Simulation and Error Checking
Bluesky provides three different approaches for simulating a plan without actually executing it:
1. Introspect a plan by passing it to a “simulator” instead of a RunEngine.
2. Execute a plan with the real RunEngine, but use simulated hardware objects.
3. Redefine the RunEngine commands to change their meanings.
Approaches (1) and (2) are the most straightforward and most common.
Introspection
Recall that plans yield messages that describe what should be done; they do not communicate with hardware directly. Therefore it’s easy to use (or write) a simple function that iterates through the plan and summarizes or analyzes its actions.
summarize_plan() | Print summary of plan
plot_raster_path() | Plot the raster path for this plan
check_limits() | Check that a plan will not move devices outside of their limits.
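As a rough sketch of how such an introspection function might look, the example below iterates over a plan and tallies its messages. To keep it runnable without bluesky installed, it uses a stand-in `Msg` namedtuple and a hypothetical `toy_scan` plan; neither is part of bluesky, and the helper names `count_commands` and `set_targets` are illustrative only.

```python
from collections import Counter, namedtuple

# Stand-in for bluesky's message type (assumption: only these fields are
# needed here; this is not the real bluesky.Msg).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


def toy_scan(motor_name, positions):
    """Hypothetical plan: a generator that yields messages and nothing more."""
    yield Msg("open_run", None, (), {})
    for pos in positions:
        yield Msg("set", motor_name, (pos,), {})
        yield Msg("read", motor_name, (), {})
    yield Msg("close_run", None, (), {})


def count_commands(plan):
    """Tally how many times each command appears in a plan."""
    return Counter(msg.command for msg in plan)


def set_targets(plan):
    """Collect (object, target) pairs for every 'set' message."""
    return [(msg.obj, msg.args[0]) for msg in plan if msg.command == "set"]


# A plan is a generator, so build a fresh one for each analysis.
counts = count_commands(toy_scan("motor", [1.0, 2.0, 3.0]))
targets = set_targets(toy_scan("motor", [1.0, 2.0, 3.0]))
print(dict(counts))
print(targets)
```

Because the functions only read the messages, nothing is ever sent to hardware. The same loop should work on a real plan as long as its messages expose `command`, `obj`, and `args`.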
Summarize
The simulator summarize_plan() prints a summary of what a plan would do if
executed by the RunEngine.
In [1]: from bluesky.simulators import summarize_plan
In [2]: from ophyd.sim import det, motor
In [3]: from bluesky.plans import scan
In [4]: summarize_plan(scan([det], motor, 1, 3, 3))
=================================== Open Run ===================================
motor -> 1.0
Read ['det', 'motor']
motor -> 2.0
Read ['det', 'motor']
motor -> 3.0
Read ['det', 'motor']
================================== Close Run ===================================
To see the unabridged contents of a plan, use the builtin Python function
list(). Note that it is not possible to summarize plans that have adaptive
logic, because their contents are determined dynamically during plan
execution.
In [5]: list(scan([det], motor, 1, 3, 3))
Out[5]:
[Msg('stage', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
Msg('stage', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('open_run', obj=None, args=(), kwargs={'detectors': ['det'], 'motors': ('motor',), 'num_points': 3, 'num_intervals': 2, 'plan_args': {'detectors': ["SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])"], 'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3], 'per_step': 'None'}, 'plan_name': 'scan', 'hints': {'dimensions': [(['motor'], 'primary')]}, 'plan_pattern': 'inner_product', 'plan_pattern_module': 'bluesky.plan_patterns', 'plan_pattern_args': {'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3]}}, run=None),
Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(1.0,), kwargs={'group': 'set-56f8fc'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'set-56f8fc'}, run=None),
Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-94e485'}, run=None),
Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-94e485'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-94e485'}, run=None),
Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('save', obj=None, args=(), kwargs={}, run=None),
Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(2.0,), kwargs={'group': 'set-d68d70'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'set-d68d70'}, run=None),
Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-468f07'}, run=None),
Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-468f07'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-468f07'}, run=None),
Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('save', obj=None, args=(), kwargs={}, run=None),
Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(3.0,), kwargs={'group': 'set-c733e1'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'set-c733e1'}, run=None),
Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-97ef68'}, run=None),
Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-97ef68'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-97ef68'}, run=None),
Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('save', obj=None, args=(), kwargs={}, run=None),
Msg('close_run', obj=None, args=(), kwargs={'exit_status': None, 'reason': None}, run=None),
Msg('unstage', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('unstage', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None)]
Check Limits
Suppose that this motor is configured with limits on its range of motion at
+/- 1000. The check_limits() simulator can verify whether or not a plan will
violate these limits, saving you from discovering a violation partway through
a long experiment.
In [6]: from bluesky.simulators import check_limits
In [7]: check_limits(scan([det], motor, 1, 3, 3)) # no problem here
In [8]: check_limits(scan([det], motor, 1, -3000, 3000)) # should raise an error
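The idea behind a limit check can be sketched as a loop over the plan's 'set' messages. This is an illustration under stated assumptions, not bluesky's implementation: `Msg`, `ToyMotor`, `toy_scan`, and `toy_check_limits` are hypothetical stand-ins, and the sketch assumes each device exposes a `check_value` method that raises when a target is out of range.

```python
from collections import namedtuple

# Stand-in message type (assumption: not the real bluesky.Msg).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


class ToyMotor:
    """Hypothetical device with (low, high) travel limits."""

    def __init__(self, name, limits):
        self.name = name
        self.limits = limits

    def check_value(self, value):
        """Raise ValueError if value lies outside the limits."""
        low, high = self.limits
        if not low <= value <= high:
            raise ValueError(f"{self.name}: {value} is outside [{low}, {high}]")


def toy_scan(motor, start, stop, num):
    """Hypothetical plan visiting num evenly spaced points from start to stop."""
    step = (stop - start) / (num - 1) if num > 1 else 0.0
    for i in range(num):
        yield Msg("set", motor, (start + i * step,), {})
        yield Msg("read", motor, (), {})


def toy_check_limits(plan):
    """Raise on the first 'set' message whose target the device rejects."""
    for msg in plan:
        if msg.command == "set" and hasattr(msg.obj, "check_value"):
            msg.obj.check_value(msg.args[0])


motor = ToyMotor("motor", limits=(-1000, 1000))
toy_check_limits(toy_scan(motor, 1, 3, 3))  # within limits: returns silently

try:
    toy_check_limits(toy_scan(motor, 1, -3000, 3000))
    violation = None
except ValueError as err:
    violation = str(err)
print(violation)
```

The check stops at the first offending message, so a long plan with an early violation is rejected quickly and no motion is ever requested.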
Simulated Hardware
Warning
This feature has recently been changed, and it has yet to be documented.
Customizing RunEngine Methods
The RunEngine allows you to customize the meaning of commands (like ‘set’ and ‘read’). One could use this feature to create a dummy RunEngine that, instead of actually reading and writing to hardware, merely reports what it would have done.
RunEngine.register_command(name, func)
    Register a new Message command.

    Parameters
        name : str
        func : callable
            This can be a function or a method. The signature is f(msg).

RunEngine.unregister_command(name)
    Unregister a Message command.

    Parameters
        name : str
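To illustrate the "dummy engine" idea without depending on bluesky, the sketch below builds a toy dispatcher with the same two method names. `ToyEngine`, `Msg`, `report_set`, and `report_read` are hypothetical; the real RunEngine may place further requirements on handlers (for example, that they be coroutines), so consult its API reference before adapting this.

```python
from collections import namedtuple

# Stand-in message type (assumption: not the real bluesky.Msg).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


class ToyEngine:
    """Hypothetical engine that dispatches each message to a registered handler."""

    def __init__(self):
        self._registry = {}

    def register_command(self, name, func):
        """Map a command name to a handler with signature f(msg)."""
        self._registry[name] = func

    def unregister_command(self, name):
        """Remove the handler for a command name."""
        del self._registry[name]

    def __call__(self, plan):
        """Process a plan by looking up and calling the handler for each message."""
        for msg in plan:
            self._registry[msg.command](msg)


log = []


def report_set(msg):
    """Record what would have been set, without touching hardware."""
    log.append(f"would set {msg.obj} to {msg.args[0]}")


def report_read(msg):
    """Record what would have been read, without touching hardware."""
    log.append(f"would read {msg.obj}")


engine = ToyEngine()
engine.register_command("set", report_set)
engine.register_command("read", report_read)
engine([Msg("set", "motor", (1.0,), {}), Msg("read", "det", (), {})])
print(log)
```

Swapping handlers this way leaves plans untouched: the same plan can be run against reporting handlers first and real ones later.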