from collections import deque
from itertools import count, tee
import time as ttime
from event_model import DocumentNames
from .log import doc_logger
from .utils import (
new_uid,
IllegalMessageSequence,
_rearrange_into_parallel_dicts,
short_uid,
Msg,
)
class RunBundler:
def __init__(self, md, record_interruptions, emit, emit_sync, log, *, loop):
# state stolen from the RE
self.bundling = False # if we are in the middle of bundling readings
self._bundle_name = None # name given to event descriptor
self._run_start_uid = None # The (future) runstart uid
self._objs_read = deque() # objects read in one Event
self._read_cache = deque() # cache of obj.read() in one Event
self._asset_docs_cache = deque() # cache of obj.collect_asset_docs()
self._describe_cache = dict() # cache of all obj.describe() output
self._config_desc_cache = dict() # " obj.describe_configuration()
self._config_values_cache = dict() # " obj.read_configuration() values
self._config_ts_cache = dict() # " obj.read_configuration() timestamps
self._descriptors = dict() # cache of {name: (objs_frozen_set, doc)}
self._sequence_counters = dict() # a seq_num counter per stream
self._teed_sequence_counters = dict() # for if we redo data-points
self._monitor_params = dict() # cache of {obj: (cb, kwargs)}
self.run_is_open = False
self._uncollected = set() # objects after kickoff(), before collect()
# we expect the RE to take care of the composition
self._md = md
# this is state on the RE, mirror it here rather than refer to
# the parent
self.record_interruptions = record_interruptions
# this is RE.emit, but lifted to this context
self.emit = emit
self.emit_sync = emit_sync
self.log = log
self.loop = loop
async def open_run(self, msg):
self.run_is_open = True
self._run_start_uid = new_uid()
self._interruptions_desc_uid = None # uid for a special Event Desc.
self._interruptions_counter = count(1) # seq_num, special Event stream
doc = dict(uid=self._run_start_uid, time=ttime.time(), **self._md)
await self.emit(DocumentNames.start, doc)
doc_logger.debug("[start] document is emitted (run_uid=%r)", self._run_start_uid,
extra={'doc_name': 'start',
'run_uid': self._run_start_uid})
await self.reset_checkpoint_state_coro()
# Emit an Event Descriptor for recording any interruptions as Events.
if self.record_interruptions:
self._interruptions_desc_uid = new_uid()
dk = {"dtype": "string", "shape": [], "source": "RunEngine"}
interruptions_desc = dict(
time=ttime.time(),
uid=self._interruptions_desc_uid,
name="interruptions",
data_keys={"interruption": dk},
run_start=self._run_start_uid,
)
await self.emit(DocumentNames.descriptor, interruptions_desc)
return self._run_start_uid
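# Illustrative shape of the RunStart document emitted above; the extra
# fields come from whatever the metadata dict ``md`` carries ('plan_name'
# here is a hypothetical example of such a field):
#
#     {'uid': '<new_uid()>', 'time': 1700000000.0, 'plan_name': 'count', ...}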
async def close_run(self, msg):
"""Instruct the RunEngine to write the RunStop document
Expected message object is::
Msg('close_run', None, exit_status=None, reason=None)
If *exit_status* and *reason* are not provided, use the values
stashed on the RE.
"""
if not self.run_is_open:
raise IllegalMessageSequence(
"A 'close_run' message was received but there is no run "
"open. If this occurred after a pause/resume, add "
"a 'checkpoint' message after the 'close_run' message."
)
self.log.debug("Stopping run %r", self._run_start_uid)
# Clear any uncleared monitoring callbacks.
for obj, (cb, kwargs) in list(self._monitor_params.items()):
obj.clear_sub(cb)
del self._monitor_params[obj]
# Count the number of Events in each stream.
num_events = {}
for bundle_name, counter in self._sequence_counters.items():
if bundle_name is None:
# rare but possible via Msg('create', name='primary')
continue
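# The counter was started at 1 and advanced once per emitted Event,
# so the next value it yields is (number of Events) + 1; hence the -1.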
num_events[bundle_name] = next(counter) - 1
reason = msg.kwargs.get("reason", None)
if reason is None:
reason = ""
exit_status = msg.kwargs.get("exit_status", "success") or "success"
doc = dict(
run_start=self._run_start_uid,
time=ttime.time(),
uid=new_uid(),
exit_status=exit_status,
reason=reason,
num_events=num_events,
)
await self.emit(DocumentNames.stop, doc)
doc_logger.debug("[stop] document is emitted (run_uid=%r)", self._run_start_uid,
extra={'doc_name': 'stop',
'run_uid': self._run_start_uid})
await self.reset_checkpoint_state_coro()
self.run_is_open = False
return doc["run_start"]
async def create(self, msg):
"""Trigger the run engine to start bundling future obj.read() calls for
an Event document
Expected message object is::
Msg('create', None, name='primary')
Msg('create', name='primary')
Note that the `name` kwarg will be the 'name' field of the resulting
descriptor. So descriptor['name'] = msg.kwargs['name'].
Also note that changing the 'name' of the Event will create a new
Descriptor document.
"""
if self.bundling:
raise IllegalMessageSequence(
"A second 'create' message is not "
"allowed until the current event "
"bundle is closed with a 'save' or "
"'drop' message."
)
self._read_cache.clear()
self._asset_docs_cache.clear()
self._objs_read.clear()
self.bundling = True
command, obj, args, kwargs, _ = msg
try:
self._bundle_name = kwargs["name"]
except KeyError:
try:
self._bundle_name, = args
except ValueError:
raise ValueError(
"Msg('create') now requires a stream name, given as "
"Msg('create', name) or Msg('create', name=name)"
) from None
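# Plan-side sketch of the bundling protocol that 'create'/'read'/'save'
# implement (``det`` is a hypothetical Readable object):
#
#     def one_event(det):
#         yield Msg('create', name='primary')
#         yield Msg('read', det)
#         yield Msg('save')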
async def read(self, msg, reading):
"""
Add a reading to the open event bundle.
Expected message object is::
Msg('read', obj)
"""
if self.bundling:
obj = msg.obj
# if the object is not in the _describe_cache, cache it
if obj not in self._describe_cache:
# Validate that there is no data key name collision.
data_keys = obj.describe()
self._describe_cache[obj] = data_keys
self._config_desc_cache[obj] = obj.describe_configuration()
self._cache_config(obj)
# check that current read collides with nothing else in
# current event
cur_keys = set(self._describe_cache[obj].keys())
for read_obj in self._objs_read:
# that is, field names
known_keys = self._describe_cache[read_obj].keys()
if set(known_keys) & cur_keys:
raise ValueError(
f"Data keys (field names) from {obj!r} "
f"collide with those from {read_obj!r}. "
f"The colliding keys are {set(known_keys) & cur_keys}"
)
# add this object to the cache of things we have read
self._objs_read.append(obj)
# Stash the results, which will be emitted the next time _save is
# called --- or never emitted if _drop is called instead.
self._read_cache.append(reading)
# Ask the object for any resource or datum documents it has cached
# and cache them as well. Likewise, these will be emitted if and
# when _save is called.
if hasattr(obj, "collect_asset_docs"):
self._asset_docs_cache.extend(
obj.collect_asset_docs(*msg.args, **msg.kwargs)
)
return reading
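# For orientation, a ``reading`` passed in above is expected to follow the
# Readable convention of {data_key: {'value': ..., 'timestamp': ...}}, e.g.
#
#     {'det': {'value': 5.0, 'timestamp': 1700000000.0}}
#
# ('det' and the values are hypothetical, for illustration only).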
def _cache_config(self, obj):
"Read the object's configuration and cache it."
config_values = {}
config_ts = {}
for key, val in obj.read_configuration().items():
config_values[key] = val["value"]
config_ts[key] = val["timestamp"]
self._config_values_cache[obj] = config_values
self._config_ts_cache[obj] = config_ts
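# A minimal illustration of the split performed above (names and values
# are hypothetical):
#
#     obj.read_configuration()  ->  {'exposure': {'value': 0.1, 'timestamp': t0}}
#     config_values             ->  {'exposure': 0.1}
#     config_ts                 ->  {'exposure': t0}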
async def monitor(self, msg):
"""
Monitor a signal. Emit event documents asynchronously.
A descriptor document is emitted immediately. Then, a closure is
defined that emits Event documents associated with that descriptor
from a separate thread. This process is not related to the main
bundling process (create/read/save).
Expected message object is::
Msg('monitor', obj, **kwargs)
Msg('monitor', obj, name='event-stream-name', **kwargs)
where kwargs are passed through to ``obj.subscribe()``
"""
obj = msg.obj
if msg.args:
raise ValueError(
"The 'monitor' Msg does not accept positional arguments."
)
kwargs = dict(msg.kwargs)
name = kwargs.pop("name", short_uid("monitor"))
if obj in self._monitor_params:
raise IllegalMessageSequence(
"A 'monitor' message was sent for {} "
"which is already monitored".format(obj)
)
descriptor_uid = new_uid()
data_keys = obj.describe()
config = {obj.name: {"data": {}, "timestamps": {}}}
config[obj.name]["data_keys"] = obj.describe_configuration()
for key, val in obj.read_configuration().items():
config[obj.name]["data"][key] = val["value"]
config[obj.name]["timestamps"][key] = val["timestamp"]
object_keys = {obj.name: list(data_keys)}
hints = {}
if hasattr(obj, "hints"):
hints.update({obj.name: obj.hints})
desc_doc = dict(
run_start=self._run_start_uid,
time=ttime.time(),
data_keys=data_keys,
uid=descriptor_uid,
configuration=config,
hints=hints,
name=name,
object_keys=object_keys,
)
doc_logger.debug("[descriptor] document is emitted with name %r containing "
"data keys %r (run_uid=%r)", name, data_keys.keys(),
self._run_start_uid,
extra={'doc_name': 'descriptor',
'run_uid': self._run_start_uid,
'data_keys': data_keys.keys()})
seq_num_counter = count(1)
def emit_event(*args, **kwargs):
# Ignore the inputs. Use this call as a signal to call read on the
# object, a crude way to be sure we get all the info we need.
data, timestamps = _rearrange_into_parallel_dicts(obj.read())
doc = dict(
descriptor=descriptor_uid,
time=ttime.time(),
data=data,
timestamps=timestamps,
seq_num=next(seq_num_counter),
uid=new_uid(),
)
self.emit_sync(DocumentNames.event, doc)
self._monitor_params[obj] = emit_event, kwargs
await self.emit(DocumentNames.descriptor, desc_doc)
obj.subscribe(emit_event, **kwargs)
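# Usage sketch (``sig`` is a hypothetical signal object); kwargs beyond
# ``name`` are forwarded to ``sig.subscribe()``:
#
#     yield Msg('monitor', sig, name='sig_monitor')
#     ...                      # asynchronous Events are emitted meanwhile
#     yield Msg('unmonitor', sig)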
def record_interruption(self, content):
"""
Emit an event in the 'interruptions' event stream.
If we are not inside a run or if self.record_interruptions is False,
nothing is done.
"""
if self._interruptions_desc_uid is not None:
# We are inside a run and self.record_interruptions is True.
doc = dict(
descriptor=self._interruptions_desc_uid,
time=ttime.time(),
uid=new_uid(),
seq_num=next(self._interruptions_counter),
data={"interruption": content},
timestamps={"interruption": ttime.time()},
)
self.emit_sync(DocumentNames.event, doc)
def rewind(self):
self._sequence_counters.clear()
self._sequence_counters.update(self._teed_sequence_counters)
# This is needed to 'cancel' an open bundling (e.g. create) if
# the pause happens after a 'checkpoint', after a 'create', but
# before the paired 'save'.
self.bundling = False
async def unmonitor(self, msg):
"""
Stop monitoring; i.e., remove the callback emitting event documents.
Expected message object is::
Msg('unmonitor', obj)
"""
obj = msg.obj
if obj not in self._monitor_params:
raise IllegalMessageSequence(
f"Cannot 'unmonitor' {obj}; it is not being monitored."
)
cb, kwargs = self._monitor_params[obj]
obj.clear_sub(cb)
del self._monitor_params[obj]
await self.reset_checkpoint_state_coro()
async def save(self, msg):
"""Save the event that is currently being bundled
Create and emit an Event document containing the data read from devices
in self._objs_read. Emit any Resource and Datum documents cached by
those devices before emitting the Event document. If this is the first
Event of its stream then create and emit the Event Descriptor document
before emitting Resource, Datum, and Event documents.
Expected message object is::
Msg('save')
"""
if not self.bundling:
raise IllegalMessageSequence(
"A 'create' message must be sent, to "
"open an event bundle, before that "
"bundle can be saved with 'save'."
)
# Short-circuit if nothing has been read. (Do not create empty Events.)
if not self._objs_read:
self.bundling = False
self._bundle_name = None
return
# The Event Descriptor is uniquely defined by the set of objects
# read in this Event grouping.
objs_read = frozenset(self._objs_read)
# Event Descriptor key
desc_key = self._bundle_name
# This is a separate check because it can be reset on resume.
seq_num_key = desc_key
if seq_num_key not in self._sequence_counters:
counter = count(1)
counter_copy1, counter_copy2 = tee(counter)
self._sequence_counters[seq_num_key] = counter_copy1
self._teed_sequence_counters[seq_num_key] = counter_copy2
self.bundling = False
self._bundle_name = None
d_objs, descriptor_doc = self._descriptors.get(desc_key, (None, None))
if d_objs is not None and d_objs != objs_read:
raise RuntimeError(
"Mismatched objects read, expected {!s}, "
"got {!s}".format(d_objs, objs_read)
)
if descriptor_doc is None:
# We do not have an Event Descriptor for this set
# so one must be created.
data_keys = {}
config = {}
object_keys = {}
hints = {}
for obj in objs_read:
dks = self._describe_cache[obj]
obj_name = obj.name
# dks is an OrderedDict. Record that order as a list.
object_keys[obj.name] = list(dks)
for field, dk in dks.items():
dk["object_name"] = obj_name
data_keys.update(dks)
config[obj_name] = {}
config[obj_name]["data"] = self._config_values_cache[obj]
config[obj_name]["timestamps"] = self._config_ts_cache[obj]
config[obj_name]["data_keys"] = self._config_desc_cache[obj]
if hasattr(obj, "hints"):
hints[obj_name] = obj.hints
descriptor_uid = new_uid()
descriptor_doc = dict(
run_start=self._run_start_uid,
time=ttime.time(),
data_keys=data_keys,
uid=descriptor_uid,
configuration=config,
name=desc_key,
hints=hints,
object_keys=object_keys,
)
await self.emit(DocumentNames.descriptor, descriptor_doc)
doc_logger.debug(
"[descriptor] document emitted with name %r containing "
"data keys %r (run_uid=%r)",
desc_key,
data_keys.keys(),
self._run_start_uid,
extra={
'doc_name': 'descriptor',
'run_uid': self._run_start_uid,
'data_keys': data_keys.keys()}
)
self._descriptors[desc_key] = (objs_read, descriptor_doc)
descriptor_uid = descriptor_doc["uid"]
# Resource and Datum documents
for resource_or_datum_name, resource_or_datum_doc in self._asset_docs_cache:
# Add a 'run_start' field to resource documents on their way out
# since this field could not have been set correctly before this point.
if resource_or_datum_name == "resource":
resource_or_datum_doc["run_start"] = self._run_start_uid
doc_logger.debug(
"[%s] document emitted %r",
resource_or_datum_name,
resource_or_datum_doc,
extra={
"doc_name": resource_or_datum_name,
"run_uid": self._run_start_uid,
"doc": resource_or_datum_doc
}
)
await self.emit(
DocumentNames(resource_or_datum_name),
resource_or_datum_doc
)
# Event document
seq_num = next(self._sequence_counters[seq_num_key])
event_uid = new_uid()
# Merge list of readings into single dict.
readings = {k: v for d in self._read_cache for k, v in d.items()}
data, timestamps = _rearrange_into_parallel_dicts(readings)
# Mark all externally-stored data as not filled so that consumers
# know that the corresponding data are identifiers, not dereferenced
# data.
filled = {
k: False
for k, v in self._descriptors[desc_key][1]["data_keys"].items()
if "external" in v
}
event_doc = dict(
descriptor=descriptor_uid,
time=ttime.time(),
data=data,
timestamps=timestamps,
seq_num=seq_num,
uid=event_uid,
filled=filled,
)
await self.emit(DocumentNames.event, event_doc)
doc_logger.debug(
"[event] document emitted with data keys %r (run_uid=%r)",
data.keys(),
self._run_start_uid,
extra={
'doc_name': 'event',
'run_uid': self._run_start_uid,
'data_keys': data.keys()}
)
def clear_monitors(self):
for obj, (cb, kwargs) in list(self._monitor_params.items()):
try:
obj.clear_sub(cb)
except Exception:
self.log.exception("Failed to stop monitoring %r.", obj)
else:
del self._monitor_params[obj]
def reset_checkpoint_state(self):
# Keep a safe separate copy of the sequence counters to use if we
# rewind and retake some data points.
for key, counter in list(self._sequence_counters.items()):
counter_copy1, counter_copy2 = tee(counter)
self._sequence_counters[key] = counter_copy1
self._teed_sequence_counters[key] = counter_copy2
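# A minimal illustration of the ``tee`` trick above: tee-ing yields two
# independent iterators over the same underlying sequence, so one copy can
# be consumed while the other is held back as a rewind point.
#
#     from itertools import count, tee
#     a, b = tee(count(1))
#     next(a), next(a)   # -> 1, 2; ``a`` has advanced
#     next(b)            # -> 1; ``b`` is unaffected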
async def reset_checkpoint_state_coro(self):
self.reset_checkpoint_state()
async def suspend_monitors(self):
for obj, (cb, kwargs) in self._monitor_params.items():
obj.clear_sub(cb)
async def restore_monitors(self):
for obj, (cb, kwargs) in self._monitor_params.items():
obj.subscribe(cb, **kwargs)
async def clear_checkpoint(self, msg):
self._teed_sequence_counters.clear()
async def drop(self, msg):
"""Drop the event that is currently being bundled
Expected message object is::
Msg('drop')
"""
if not self.bundling:
raise IllegalMessageSequence(
"A 'create' message must be sent, to "
"open an event bundle, before that "
"bundle can be dropped with 'drop'."
)
self.bundling = False
self._bundle_name = None
self.log.debug("Dropped open event bundle")
async def kickoff(self, msg):
"""Start a flyscan object.
Expected message object is:
If *flyer_object* has a ``kickoff`` function that takes no arguments::
Msg('kickoff', flyer_object)
Msg('kickoff', flyer_object, group=<name>)
If *flyer_object* has a ``kickoff`` function that takes
``(start, stop, step)`` as its function arguments::
Msg('kickoff', flyer_object, start, stop, step)
Msg('kickoff', flyer_object, start, stop, step, group=<name>)
"""
self._uncollected.add(msg.obj)
async def complete(self, msg):
"""
Tell a flyer, 'stop collecting, whenever you are ready'.
The flyer returns a status object. Some flyers respond to this
command by stopping collection and returning a finished status
object immediately. Other flyers continue their given course and
finish in their own time, irrespective of when this command is
issued.
Expected message object is::
Msg('complete', flyer, group=<GROUP>)
where <GROUP> is a hashable identifier.
"""
...
async def collect(self, msg):
"""
Collect data cached by a flyer and emit documents.
Expected message object is::
Msg('collect', collect_obj)
Msg('collect', collect_obj, stream=True, return_payload=False)
"""
collect_obj = msg.obj
if not self.run_is_open:
# sanity check -- 'kickoff' should catch this and make this
# code path impossible
raise IllegalMessageSequence(
"A 'collect' message was sent but no run is open."
)
self._uncollected.discard(collect_obj)
if hasattr(collect_obj, "collect_asset_docs"):
# Resource and Datum documents
for name, doc in collect_obj.collect_asset_docs():
# Add a 'run_start' field to the resource document on its way out.
if name == "resource":
doc["run_start"] = self._run_start_uid
await self.emit(DocumentNames(name), doc)
collect_obj_config = {}
if hasattr(collect_obj, "read_configuration"):
doc_logger.debug("reading configuration from %s", collect_obj)
collect_obj_config[collect_obj.name] = {
"data": {},
"timestamps": {},
"data_keys": collect_obj.describe_configuration()
}
for config_key, config in collect_obj.read_configuration().items():
collect_obj_config[collect_obj.name]["data"][config_key] = config["value"]
collect_obj_config[collect_obj.name]["timestamps"][config_key] = config["timestamp"]
else:
doc_logger.debug("%s has no read_configuration method", collect_obj)
bulk_data = {}
local_descriptors = {} # hashed on objs_read, not (name, objs_read)
# collect_obj.describe_collect() returns a dictionary like this:
# {name_for_desc1: data_keys_for_desc1,
# name_for_desc2: data_keys_for_desc2, ...}
for stream_name, stream_data_keys in collect_obj.describe_collect().items():
if stream_name not in self._descriptors:
# We do not have an Event Descriptor for this set.
descriptor_uid = new_uid()
hints = {}
if hasattr(collect_obj, "hints"):
hints.update({collect_obj.name: collect_obj.hints})
doc = dict(
run_start=self._run_start_uid,
time=ttime.time(),
data_keys=stream_data_keys,
uid=descriptor_uid,
name=stream_name,
configuration=collect_obj_config,
hints=hints,
object_keys={collect_obj.name: list(stream_data_keys)},
)
await self.emit(DocumentNames.descriptor, doc)
doc_logger.debug("[descriptor] document is emitted with name %r "
"containing data keys %r (run_uid=%r)", stream_name,
stream_data_keys.keys(), self._run_start_uid,
extra={'doc_name': 'descriptor',
'run_uid': self._run_start_uid,
'data_keys': stream_data_keys.keys()})
self._descriptors[stream_name] = (stream_data_keys, doc)
self._sequence_counters[stream_name] = count(1)
else:
objs_read, doc = self._descriptors[stream_name]
if stream_data_keys != objs_read:
raise RuntimeError(
"Mismatched objects read, "
"expected {!s}, "
"got {!s}".format(stream_data_keys, objs_read)
)
descriptor_uid = doc["uid"]
local_descriptors[frozenset(stream_data_keys)] = (stream_name, descriptor_uid)
bulk_data[descriptor_uid] = []
# If stream is True, run 'event' subscription per document.
# If stream is False, run 'bulk_events' subscription once.
stream = msg.kwargs.get("stream", False)
# If True, accumulate all the Events in memory and return them at the
# end, providing the plan access to the Events. If False, do not
# accumulate, and return None.
return_payload = msg.kwargs.get('return_payload', True)
payload = []
for ev in collect_obj.collect():
if return_payload:
payload.append(ev)
objs_read = frozenset(ev["data"])
stream_name, descriptor_uid = local_descriptors[objs_read]
seq_num = next(self._sequence_counters[stream_name])
event_uid = new_uid()
reading = ev["data"]
for key in ev["data"]:
reading[key] = reading[key]
ev["data"] = reading
ev["descriptor"] = descriptor_uid
ev["seq_num"] = seq_num
ev["uid"] = event_uid
if stream:
doc_logger.debug("[event] document is emitted with data keys %r (run_uid=%r)",
ev['data'].keys(), self._run_start_uid,
event_uid,
extra={'doc_name': 'event',
'run_uid': self._run_start_uid,
'data_keys': ev['data'].keys()})
await self.emit(DocumentNames.event, ev)
else:
bulk_data[descriptor_uid].append(ev)
if not stream:
await self.emit(DocumentNames.bulk_events, bulk_data)
doc_logger.debug("[bulk events] document is emitted for descriptors (run_uid=%r)",
self._run_start_uid,
extra={'doc_name': 'bulk_events',
'run_uid': self._run_start_uid})
if return_payload:
return payload
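# Plan-side sketch of a full fly scan driving kickoff/complete/collect
# (``flyer`` is a hypothetical flyable object; the 'wait' messages block
# on the named group before moving to the next step):
#
#     yield Msg('kickoff', flyer, group='fly')
#     yield Msg('wait', None, group='fly')
#     yield Msg('complete', flyer, group='fly')
#     yield Msg('wait', None, group='fly')
#     yield Msg('collect', flyer)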
async def backstop_collect(self):
for obj in list(self._uncollected):
try:
await self.collect(Msg("collect", obj))
except Exception:
self.log.exception("Failed to collect %r.", obj)