import itertools
import uuid
from cycler import cycler
from . import utils
import operator
from functools import reduce
from collections.abc import Iterable
import time
import warnings
try:
# cytools is a drop-in replacement for toolz, implemented in Cython
from cytools import partition
except ImportError:
from toolz import partition
from .utils import (
separate_devices,
all_safe_rewind,
Msg,
ensure_generator,
short_uid as _short_uid,
)
[docs]def create(name='primary'):
"""
Bundle future readings into a new Event document.
Parameters
----------
name : string, optional
name given to event stream, used to convenient identification
default is 'primary'
Yields
------
msg : Msg
Msg('create', name=name)
See Also
--------
:func:`bluesky.plan_stubs.save`
"""
return (yield Msg('create', name=name))
[docs]def save():
"""
Close a bundle of readings and emit a completed Event document.
Yields
-------
msg : Msg
Msg('save')
See Also
--------
:func:`bluesky.plan_stubs.create`
"""
return (yield Msg('save'))
[docs]def drop():
"""
Drop a bundle of readings without emitting a completed Event document.
Yields
------
msg : Msg
Msg('drop')
See Also
--------
:func:`bluesky.plan_stubs.save`
:func:`bluesky.plan_stubs.create`
"""
return (yield Msg('drop'))
[docs]def read(obj):
"""
Take a reading and add it to the current bundle of readings.
Parameters
----------
obj : Device or Signal
Yields
------
msg : Msg
Msg('read', obj)
"""
return (yield Msg('read', obj))
[docs]def monitor(obj, *, name=None, **kwargs):
"""
Asynchronously monitor for new values and emit Event documents.
Parameters
----------
obj : Signal
args :
passed through to ``obj.subscribe()``
name : string, optional
name of event stream; default is None
kwargs :
passed through to ``obj.subscribe()``
Yields
------
msg : Msg
``Msg('monitor', obj, *args, **kwargs)``
See Also
--------
:func:`bluesky.plan_stubs.unmonitor`
"""
return (yield Msg('monitor', obj, name=name, **kwargs))
[docs]def unmonitor(obj):
"""
Stop monitoring.
Parameters
----------
obj : Signal
Yields
------
msg : Msg
Msg('unmonitor', obj)
See Also
--------
:func:`bluesky.plan_stubs.monitor`
"""
return (yield Msg('unmonitor', obj))
[docs]def null():
"""
Yield a no-op Message. (Primarily for debugging and testing.)
Yields
------
msg : Msg
Msg('null')
"""
return (yield Msg('null'))
[docs]def abs_set(obj, *args, group=None, wait=False, **kwargs):
"""
Set a value. Optionally, wait for it to complete before continuing.
Parameters
----------
obj : Device
group : string (or any hashable object), optional
identifier used by 'wait'
wait : boolean, optional
If True, wait for completion before processing any more messages.
False by default.
args :
passed to obj.set()
kwargs :
passed to obj.set()
Yields
------
msg : Msg
See Also
--------
:func:`bluesky.plan_stubs.rel_set`
:func:`bluesky.plan_stubs.wait`
:func:`bluesky.plan_stubs.mv`
"""
if wait and group is None:
group = str(uuid.uuid4())
ret = yield Msg('set', obj, *args, group=group, **kwargs)
if wait:
yield Msg('wait', None, group=group)
return ret
[docs]def rel_set(obj, *args, group=None, wait=False, **kwargs):
"""
Set a value relative to current value. Optionally, wait before continuing.
Parameters
----------
obj : Device
group : string (or any hashable object), optional
identifier used by 'wait'; None by default
wait : boolean, optional
If True, wait for completion before processing any more messages.
False by default.
args :
passed to obj.set()
kwargs :
passed to obj.set()
Yields
------
msg : Msg
See Also
--------
:func:`bluesky.plan_stubs.abs_set`
:func:`bluesky.plan_stubs.wait`
"""
from .preprocessors import relative_set_wrapper
return (
yield from relative_set_wrapper(
abs_set(obj, *args, group=group, wait=wait, **kwargs)
)
)
[docs]def mv(*args, group=None, **kwargs):
"""
Move one or more devices to a setpoint. Wait for all to complete.
If more than one device is specifed, the movements are done in parallel.
Parameters
----------
args :
device1, value1, device2, value2, ...
group : string, optional
Used to mark these as a unit to be waited on.
kwargs :
passed to obj.set()
Yields
------
msg : Msg
See Also
--------
:func:`bluesky.plan_stubs.abs_set`
:func:`bluesky.plan_stubs.mvr`
"""
group = group or str(uuid.uuid4())
status_objects = []
cyl = reduce(operator.add, [cycler(obj, [val]) for obj, val in partition(2, args)])
(step,) = utils.merge_cycler(cyl)
for obj, val in step.items():
ret = yield Msg('set', obj, val, group=group, **kwargs)
status_objects.append(ret)
yield Msg('wait', None, group=group)
return tuple(status_objects)
mov = mv # synonym
[docs]def mvr(*args, group=None, **kwargs):
"""
Move one or more devices to a relative setpoint. Wait for all to complete.
If more than one device is specifed, the movements are done in parallel.
Parameters
----------
args :
device1, value1, device2, value2, ...
group : string, optional
Used to mark these as a unit to be waited on.
kwargs :
passed to obj.set()
Yields
------
msg : Msg
See Also
--------
:func:`bluesky.plan_stubs.rel_set`
:func:`bluesky.plan_stubs.mv`
"""
objs = []
for obj, val in partition(2, args):
objs.append(obj)
from .preprocessors import relative_set_decorator
@relative_set_decorator(objs)
def inner_mvr():
return (yield from mv(*args, group=group, **kwargs))
return (yield from inner_mvr())
movr = mvr # synonym
[docs]def rd(obj, *, default_value=0):
"""Reads a single-value non-triggered object
This is a helper plan to get the scalar value out of a Device
(such as an EpicsMotor or a single EpicsSignal).
For devices that have more than one read key the following rules are used:
- if exactly 1 field is hinted that value is used
- if no fields are hinted and there is exactly 1 value in the
reading that value is used
- if more than one field is hinted an Exception is raised
- if no fields are hinted and there is more than one key in the reading an
Exception is raised
The devices is not triggered and this plan does not create any Events
Parameters
----------
obj : Device
The device to be read
default_value : Any
The value to return when not running in a "live" RunEngine.
This come ups when ::
ret = yield Msg('read', obj)
assert ret is None
the plan is passed to `list` or some other iterator that
repeatedly sends `None` into the plan to advance the
generator.
Returns
-------
val : Any or None
The "single" value of the device
"""
hints = getattr(obj, 'hints', {}).get("fields", [])
if len(hints) > 1:
msg = (
f"Your object {obj} ({obj.name}.{getattr(obj, 'dotted_name', '')}) "
f"has {len(hints)} items hinted ({hints}). We do not know how to "
"pick out a single value. Please adjust the hinting by setting the "
"kind of the components of this device or by rd ing one of it's components"
)
raise ValueError(msg)
elif len(hints) == 0:
hint = None
if hasattr(obj, "read_attrs"):
if len(obj.read_attrs) != 1:
msg = (
f"Your object {obj} ({obj.name}.{getattr(obj, 'dotted_name', '')}) "
f"and has {len(obj.read_attrs)} read attrs. We do not know how to "
"pick out a single value. Please adjust the hinting/read_attrs by "
"setting the kind of the components of this device or by rd ing one "
"of its components"
)
raise ValueError(msg)
# len(hints) == 1
else:
(hint,) = hints
ret = yield from read(obj)
# list-ify mode
if ret is None:
return default_value
if hint is not None:
return ret[hint]["value"]
# handle the no hint 1 field case
try:
(data,) = ret.values()
except ValueError as er:
msg = (
f"Your object {obj} ({obj.name}.{getattr(obj, 'dotted_name', '')}) "
f"and has {len(ret)} read values. We do not know how to pick out a "
"single value. Please adjust the hinting/read_attrs by setting the "
"kind of the components of this device or by rd ing one of its components"
)
raise ValueError(msg) from er
else:
return data["value"]
[docs]def stop(obj):
"""
Stop a device.
Parameters
----------
obj : Device
Yields
------
msg : Msg
"""
return (yield Msg('stop', obj))
[docs]def trigger(obj, *, group=None, wait=False):
"""
Trigger and acquisition. Optionally, wait for it to complete.
Parameters
----------
obj : Device
group : string (or any hashable object), optional
identifier used by 'wait'; None by default
wait : boolean, optional
If True, wait for completion before processing any more messages.
False by default.
Yields
------
msg : Msg
"""
ret = yield Msg('trigger', obj, group=group)
if wait:
yield Msg('wait', None, group=group)
return ret
[docs]def sleep(time):
"""
Tell the RunEngine to sleep, while asynchronously doing other processing.
This is not the same as ``import time; time.sleep()`` because it allows
other actions, like interruptions, to be processed during the sleep.
Parameters
----------
time : float
seconds
Yields
------
msg : Msg
Msg('sleep', None, time)
"""
return (yield Msg('sleep', None, time))
[docs]def wait(group=None):
"""
Wait for all statuses in a group to report being finished.
Parameters
----------
group : string (or any hashable object), optional
idenified given to `abs_set`, `rel_set`, `trigger`; None by default
Yields
------
msg : Msg
Msg('wait', None, group=group)
"""
return (yield Msg('wait', None, group=group))
_wait = wait # for internal references to avoid collision with 'wait' kwarg
[docs]def checkpoint():
"""
If interrupted, rewind to this point.
Yields
------
msg : Msg
Msg('checkpoint')
See Also
--------
:func:`bluesky.plan_stubs.clear_checkpoint`
"""
return (yield Msg('checkpoint'))
[docs]def clear_checkpoint():
"""
Designate that it is not safe to resume. If interrupted or paused, abort.
Yields
------
msg : Msg
Msg('clear_checkpoint')
See Also
--------
:func:`bluesky.plan_stubs.checkpoint`
"""
return (yield Msg('clear_checkpoint'))
[docs]def pause():
"""
Pause and wait for the user to resume.
Yields
------
msg : Msg
Msg('pause')
See Also
--------
:func:`bluesky.plan_stubs.deferred_pause`
:func:`bluesky.plan_stubs.sleep`
"""
return (yield Msg('pause', None, defer=False))
[docs]def deferred_pause():
"""
Pause at the next checkpoint.
Yields
------
msg : Msg
Msg('pause', defer=True)
See Also
--------
:func:`bluesky.plan_stubs.pause`
:func:`bluesky.plan_stubs.sleep`
"""
return (yield Msg('pause', None, defer=True))
[docs]def kickoff(obj, *, group=None, wait=False, **kwargs):
"""
Kickoff a fly-scanning device.
Parameters
----------
obj : fly-able
Device with 'kickoff', 'complete', and 'collect' methods
group : string (or any hashable object), optional
identifier used by 'wait'
wait : boolean, optional
If True, wait for completion before processing any more messages.
False by default.
kwargs
passed through to ``obj.kickoff()``
Yields
------
msg : Msg
Msg('kickoff', obj)
See Also
--------
:func:`bluesky.plan_stubs.complete`
:func:`bluesky.plan_stubs.collect`
:func:`bluesky.plan_stubs.wait`
"""
ret = (yield Msg('kickoff', obj, group=group, **kwargs))
if wait:
yield from _wait(group=group)
return ret
[docs]def complete(obj, *, group=None, wait=False, **kwargs):
"""
Tell a flyer, 'stop collecting, whenver you are ready'.
The flyer returns a status object. Some flyers respond to this
command by stopping collection and returning a finished status
object immedately. Other flyers finish their given course and
finish whenever they finish, irrespective of when this command is
issued.
Parameters
----------
obj : fly-able
Device with 'kickoff', 'complete', and 'collect' methods
group : string (or any hashable object), optional
identifier used by 'wait'
wait : boolean, optional
If True, wait for completion before processing any more messages.
False by default.
kwargs
passed through to ``obj.complete()``
Yields
------
msg : Msg
a 'complete' Msg and maybe a 'wait' message
See Also
--------
:func:`bluesky.plan_stubs.kickoff`
:func:`bluesky.plan_stubs.collect`
:func:`bluesky.plan_stubs.wait`
"""
ret = yield Msg('complete', obj, group=group, **kwargs)
if wait:
yield from _wait(group=group)
return ret
[docs]def collect(obj, *, stream=False, return_payload=True):
"""
Collect data cached by a fly-scanning device and emit documents.
Parameters
----------
obj : fly-able
Device with 'kickoff', 'complete', and 'collect' methods
stream : boolean, optional
If False (default), emit Event documents in one bulk dump. If True,
emit events one at time.
return_payload: boolean, optional
If True (default), return the collected Events. If False, return None.
Using ``stream=True`` and ``return_payload=False`` together avoids
accumulating the documents in memory: they are emmitted as they are
collected, and they are not accumulated.
Yields
------
msg : Msg
Msg('collect', obj)
See Also
--------
:func:`bluesky.plan_stubs.kickoff`
:func:`bluesky.plan_stubs.complete`
:func:`bluesky.plan_stubs.wait`
"""
return (yield Msg('collect', obj, stream=stream, return_payload=return_payload))
[docs]def stage(obj):
"""
'Stage' a device (i.e., prepare it for use, 'arm' it).
Parameters
----------
obj : Device
Yields
------
msg : Msg
Msg('stage', obj)
See Also
--------
:func:`bluesky.plan_stubs.unstage`
"""
return (yield Msg('stage', obj))
[docs]def unstage(obj):
"""
'Unstage' a device (i.e., put it in standby, 'disarm' it).
Parameters
----------
obj : Device
Yields
------
msg : Msg
Msg('unstage', obj)
See Also
--------
:func:`bluesky.plan_stubs.stage`
"""
return (yield Msg('unstage', obj))
[docs]def subscribe(name, func):
"""
Subscribe the stream of emitted documents.
Parameters
----------
name : {'all', 'start', 'descriptor', 'event', 'stop'}
func : callable
Expected signature: ``f(name, doc)`` where ``name`` is one of the
strings above ('all, 'start', ...) and ``doc`` is a dict
Yields
------
msg : Msg
Msg('subscribe', None, func, name)
See Also
--------
:func:`bluesky.plan_stubs.unsubscribe`
"""
return (yield Msg('subscribe', None, func, name))
[docs]def unsubscribe(token):
"""
Remove a subscription.
Parameters
----------
token : int
token returned by processing a 'subscribe' message
Yields
------
msg : Msg
Msg('unsubscribe', token=token)
See Also
--------
:func:`bluesky.plan_stubs.subscribe`
"""
return (yield Msg('unsubscribe', token=token))
[docs]def install_suspender(suspender):
"""
Install a suspender during a plan.
Parameters
----------
suspender : :class:`bluesky.suspenders.SuspenderBase`
The suspender to install
Yields
------
msg : Msg
Msg('install_suspender', None, suspender)
See Also
--------
:func:`bluesky.plan_stubs.remove_suspender`
"""
return (yield Msg('install_suspender', None, suspender))
[docs]def remove_suspender(suspender):
"""
Remove a suspender during a plan.
Parameters
----------
suspender : :class:`bluesky.suspenders.SuspenderBase`
The suspender to remove
Yields
------
msg : Msg
Msg('remove_suspender', None, suspender)
See Also
--------
:func:`bluesky.plan_stubs.install_suspender`
"""
return (yield Msg('remove_suspender', None, suspender))
[docs]def open_run(md=None):
"""
Mark the beginning of a new 'run'. Emit a RunStart document.
Parameters
----------
md : dict, optional
metadata
Yields
------
msg : Msg
``Msg('open_run', **md)``
See Also
--------
:func:`bluesky.plans_stubs.close_run`
"""
return (yield Msg('open_run', **(md or {})))
[docs]def close_run(exit_status=None, reason=None):
"""
Mark the end of the current 'run'. Emit a RunStop document.
Yields
------
msg : Msg
Msg('close_run')
exit_status : {None, 'success', 'abort', 'fail'}
The exit status to report in the Stop document
reason : str, optional
Long-form description of why the run ended
See Also
--------
:func:`bluesky.plans_stubs.open_run`
"""
return (yield Msg('close_run', exit_status=exit_status, reason=reason))
[docs]def wait_for(futures, **kwargs):
"""
Low-level: wait for a list of ``asyncio.Future`` objects to set (complete).
Parameters
----------
futures : collection
collection of asyncio.Future objects
kwargs
passed through to ``asyncio.wait()``
Yields
------
msg : Msg
``Msg('wait_for', None, futures, **kwargs)``
See Also
--------
:func:`bluesky.plan_stubs.wait`
"""
return (yield Msg('wait_for', None, futures, **kwargs))
[docs]def trigger_and_read(devices, name='primary'):
"""
Trigger and read a list of detectors and bundle readings into one Event.
Parameters
----------
devices : iterable
devices to trigger (if they have a trigger method) and then read
name : string, optional
event stream name, a convenient human-friendly identifier; default
name is 'primary'
Yields
------
msg : Msg
messages to 'trigger', 'wait' and 'read'
"""
# If devices is empty, don't emit 'create'/'save' messages.
if not devices:
yield from null()
devices = separate_devices(devices) # remove redundant entries
rewindable = all_safe_rewind(devices) # if devices can be re-triggered
def inner_trigger_and_read():
grp = _short_uid('trigger')
no_wait = True
for obj in devices:
if hasattr(obj, 'trigger'):
no_wait = False
yield from trigger(obj, group=grp)
# Skip 'wait' if none of the devices implemented a trigger method.
if not no_wait:
yield from wait(group=grp)
yield from create(name)
ret = {} # collect and return readings to give plan access to them
for obj in devices:
reading = (yield from read(obj))
if reading is not None:
ret.update(reading)
yield from save()
return ret
from .preprocessors import rewindable_wrapper
return (yield from rewindable_wrapper(inner_trigger_and_read(),
rewindable))
[docs]def broadcast_msg(command, objs, *args, **kwargs):
"""
Generate many copies of a mesasge, applying it to a list of devices.
Parameters
----------
command : string
devices : iterable
``*args``
args for message
``**kwargs``
kwargs for message
Yields
------
msg : Msg
"""
return_vals = []
for o in objs:
ret = yield Msg(command, o, *args, **kwargs)
return_vals.append(ret)
return return_vals
[docs]def repeater(n, gen_func, *args, **kwargs):
"""
Generate n chained copies of the messages from gen_func
Parameters
----------
n : int or None
total number of repetitions; if None, infinite
gen_func : callable
returns generator instance
``*args``
args for gen_func
``**kwargs``
kwargs for gen_func
Yields
------
msg : Msg
See Also
--------
:func:`bluesky.plan_stubs.caching_repeater`
"""
it = range
if n is None:
n = 0
it = itertools.count
for j in it(n):
yield from gen_func(*args, **kwargs)
[docs]def caching_repeater(n, plan):
"""
Generate n chained copies of the messages in a plan.
This is different from ``repeater`` above because it takes in a
generator or iterator, not a function that returns one.
Parameters
----------
n : int or None
total number of repetitions; if None, infinite
plan : iterable
Yields
------
msg : Msg
See Also
--------
:func:`bluesky.plan_stubs.repeater`
"""
warnings.warn("The caching_repeater will be removed in a future version "
"of bluesky.", stacklevel=2)
if n is None:
gen = itertools.count(0)
else:
gen = range(n)
lst_plan = list(plan)
for _ in gen:
yield from (m for m in lst_plan)
[docs]def one_shot(detectors, take_reading=trigger_and_read):
"""Inner loop of a count.
This is the default function for ``per_shot`` in count plans.
Parameters
----------
detectors : Iterable[OphydObj]
devices to read
take_reading : plan, optional
function to do the actual acquisition ::
def take_reading(dets, name='primary'):
yield from ...
Callable[List[OphydObj], Optional[str]] -> Generator[Msg], optional
Defaults to `trigger_and_read`
"""
yield Msg('checkpoint')
yield from take_reading(list(detectors))
[docs]def one_1d_step(detectors, motor, step, take_reading=trigger_and_read):
"""
Inner loop of a 1D step scan
This is the default function for ``per_step`` param in 1D plans.
Parameters
----------
detectors : iterable
devices to read
motor : Settable
The motor to move
step : Any
Where to move the motor to
take_reading : plan, optional
function to do the actual acquisition ::
def take_reading(dets, name='primary'):
yield from ...
Callable[List[OphydObj], Optional[str]] -> Generator[Msg], optional
Defaults to `trigger_and_read`
"""
def move():
grp = _short_uid('set')
yield Msg('checkpoint')
yield Msg('set', motor, step, group=grp)
yield Msg('wait', None, group=grp)
yield from move()
return (yield from take_reading(list(detectors) + [motor]))
[docs]def move_per_step(step, pos_cache):
"""
Inner loop of an N-dimensional step scan without any readings
This can be used as a building block for custom ``per_step`` stubs.
Parameters
----------
step : dict
mapping motors to positions in this step
pos_cache : dict
mapping motors to their last-set positions
"""
yield Msg('checkpoint')
grp = _short_uid('set')
for motor, pos in step.items():
if pos == pos_cache[motor]:
# This step does not move this motor.
continue
yield Msg('set', motor, pos, group=grp)
pos_cache[motor] = pos
yield Msg('wait', None, group=grp)
[docs]def one_nd_step(detectors, step, pos_cache, take_reading=trigger_and_read):
"""
Inner loop of an N-dimensional step scan
This is the default function for ``per_step`` param`` in ND plans.
Parameters
----------
detectors : iterable
devices to read
step : dict
mapping motors to positions in this step
pos_cache : dict
mapping motors to their last-set positions
take_reading : plan, optional
function to do the actual acquisition ::
def take_reading(dets, name='primary'):
yield from ...
Callable[List[OphydObj], Optional[str]] -> Generator[Msg], optional
Defaults to `trigger_and_read`
"""
motors = step.keys()
yield from move_per_step(step, pos_cache)
yield from take_reading(list(detectors) + list(motors))
[docs]def repeat(plan, num=1, delay=None):
"""
Repeat a plan num times with delay and checkpoint between each repeat.
This is different from ``repeater`` and ``caching_repeater`` in that it
adds ``checkpoint`` and optionally ``sleep`` messages if delay is provided.
This is intended for users who need the structure of ``count`` but do not
want to reimplement the control flow.
Parameters
----------
plan: callable
Callable that returns an iterable of Msg objects
num : integer, optional
number of readings to take; default is 1
If None, capture data until canceled
delay : iterable or scalar, optional
time delay between successive readings; default is 0
Notes
-----
If ``delay`` is an iterable, it must have at least ``num - 1`` entries or
the plan will raise a ``ValueError`` during iteration.
"""
# Create finite or infinite counter
if num is None:
iterator = itertools.count()
else:
iterator = range(num)
# If delay is a scalar, repeat it forever. If it is an iterable, leave it.
if not isinstance(delay, Iterable):
delay = itertools.repeat(delay)
else:
try:
num_delays = len(delay)
except TypeError:
# No way to tell in advance if we have enough delays.
pass
else:
if num - 1 > num_delays:
raise ValueError("num=%r but delays only provides %r "
"entries" % (num, num_delays))
delay = iter(delay)
def repeated_plan():
for i in iterator:
now = time.time() # Intercept the flow in its earliest moment.
yield Msg('checkpoint')
yield from ensure_generator(plan())
try:
d = next(delay)
except StopIteration:
if i + 1 == num:
break
elif num is None:
break
else:
# num specifies a number of iterations less than delay
raise ValueError("num=%r but delays only provides %r "
"entries" % (num, i))
if d is not None:
d = d - (time.time() - now)
if d > 0: # Sleep if and only if time is left to do it.
yield Msg('sleep', None, d)
return (yield from repeated_plan())