Source code for bluesky.callbacks.broker

import os
import time as ttime
from .core import CallbackBase
from ..utils import ensure_uid
import numpy as np


class BrokerCallbackBase(CallbackBase):
    """
    Base class for callbacks which need filled documents

    Parameters
    ----------
    fields: Iterable of str
        Names of data fields in an Event
    db: Broker instance, optional
        The Broker instance to pull the data from
    """

    def __init__(self, fields, *, db=None):
        self.db = db
        self.fields = fields
        self.descriptor_dict = {}

    def clear(self):
        self.descriptor_dict.clear()

    def stop(self, doc):
        self.clear()

    def descriptor(self, doc):
        self.descriptor_dict[doc['uid']] = doc

    def event(self, doc):
        # the subset of self.fields that are (1) in the doc and (2) unfilled
        # and (3) external
        fields = [field for field in self.fields
                  if (field in doc['data'] and
                      not doc.get('filled', {}).get(field) and
                      'external' in self.descriptor_dict[
                          doc['descriptor']]['data_keys'][field])]
        if fields:
            if self.db is None:
                raise RuntimeError('Either the data must be pre-loaded or '
                                   'a Broker instance must be provided '
                                   'via the db parameter of '
                                   'BrokerCallbackBase.')
            res, = self.db.fill_events(
                events=[doc],
                descriptors=[self.descriptor_dict[doc['descriptor']]],
                fields=fields)
            doc['data'].update(**res['data'])  # modify in place
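
# A minimal usage sketch of subclassing BrokerCallbackBase (illustrative
# only; the class name, the 'image' field and the print-based sink below are
# assumptions, not part of the library). The base class fills
# externally-stored data in each Event, so the subclass only needs to read
# ``doc['data']``:
#
#     class ImageShapePrinter(BrokerCallbackBase):
#         def __init__(self, field='image', *, db=None):
#             super().__init__((field,), db=db)
#             self.field = field
#
#         def event(self, doc):
#             super().event(doc)  # fills external references via db
#             print(doc['seq_num'],
#                   np.asarray(doc['data'][self.field]).shape)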


class LiveImage(BrokerCallbackBase):
    """
    Stream 2D images in a cross-section viewer.

    Parameters
    ----------
    field : string
        name of data field in an Event
    db : Broker instance, optional
        The Broker instance to pull the data from
    cmap : str, colormap, or None
        color map to use. Defaults to gray
    norm : Normalize or None
        Normalization function to use
    limit_func : callable, optional
        function that takes in the image and returns clim values
    auto_redraw : bool, optional
    interpolation : str, optional
        Interpolation method to use. List of valid options can be found in
        CrossSection2DView.interpolation
    window_title : str, optional
        Title of the figure window
    """

    def __init__(self, field, *, db=None, cmap=None, norm=None,
                 limit_func=None, auto_redraw=True, interpolation=None,
                 window_title=None):
        from xray_vision.backend.mpl.cross_section_2d import CrossSection
        import matplotlib.pyplot as plt
        super().__init__((field,), db=db)
        fig = plt.figure()
        self.field = field
        self.cs = CrossSection(fig, cmap, norm, limit_func, auto_redraw,
                               interpolation)
        if window_title:
            self.cs._fig.canvas.set_window_title(window_title)
        self.cs._fig.show()

    def event(self, doc):
        super().event(doc)
        data = doc['data'][self.field]
        self.update(data)

    def update(self, data):
        self.cs.update_image(data)
        self.cs._fig.canvas.draw_idle()
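

# A usage sketch for LiveImage (illustrative only; ``RE``, ``det`` and ``db``
# are assumed to come from the user's session, and 'det_image' is an assumed
# data key). Each Event's image is filled from the Broker if it is stored
# externally and pushed to the cross-section viewer as it arrives:
#
#     from bluesky.plans import count
#
#     live_image = LiveImage('det_image', db=db)
#     RE(count([det], num=5), live_image)

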
def post_run(callback, db, fill=False):
    """Trigger a callback to process all the Documents from a run at the end.

    This function does not receive the Document stream during collection. It
    retrieves the complete set of Documents from the DataBroker after
    collection is complete.

    Parameters
    ----------
    callback : callable
        Expected signature ::

            def func(doc_name, doc):
                pass

    db : Broker
        The databroker instance to use
    fill : boolean, optional
        Whether to dereference externally-stored data in the documents.
        False by default.

    Returns
    -------
    func : function
        a function that accepts a RunStop Document

    Examples
    --------
    Print a table with the full (lossless) result set at the end of a run.

    >>> table = LiveTable(['det', 'motor'])
    >>> RE(scan([det], motor, 1, 3, 3), {'stop': post_run(table, db)})
    +------------+-------------------+----------------+----------------+
    |   seq_num  |             time  |            det |          motor |
    +------------+-------------------+----------------+----------------+
    |          3 |  14:02:32.218348  |           5.00 |           3.00 |
    |          2 |  14:02:32.158503  |           5.00 |           2.00 |
    |          1 |  14:02:32.099807  |           5.00 |           1.00 |
    +------------+-------------------+----------------+----------------+
    """
    def f(name, doc):
        if name != 'stop':
            return
        uid = ensure_uid(doc['run_start'])
        header = db[uid]
        for name, doc in header.documents(fill=fill):
            callback(name, doc)
        # Depending on the order that this callback and the
        # databroker-insertion callback were called in, the databroker might
        # not yet have the 'stop' document that we currently have, so we'll
        # use our copy instead of expecting the header to include one.
        if name != 'stop':
            callback('stop', doc)
    return f


def make_restreamer(callback, db):
    """
    Run a callback whenever a uid is updated.

    Parameters
    ----------
    callback : callable
        expected signature is `f(name, doc)`
    db : Broker
        The databroker instance to use

    Example
    -------
    Run a callback whenever a uid is updated.

    >>> def f(name, doc):
    ...     # do stuff
    ...
    >>> g = make_restreamer(f, db)

    To use this with `ophyd.callbacks.LastUidSignal`:

    >>> last_uid_signal.subscribe(g)
    """
    def cb(value, **kwargs):
        return db.process(db[value], callback)

    return cb


def verify_files_saved(name, doc, db):
    "This is a brute-force approach. We retrieve all the data."
    ttime.sleep(0.1)  # Wait for data to be saved.
    if name != 'stop':
        return
    print(" Verifying that all the run's Documents were saved...")
    try:
        header = db[ensure_uid(doc['run_start'])]
    except Exception as e:
        print(" Verification Failed! Error: {0}".format(e))
        return
    else:
        print('\x1b[1A\u2713')  # print a checkmark on the previous line
    print(" Verifying that all externally-stored files are accessible...")
    try:
        list(db.get_events(header, fill=True))
    except Exception as e:
        print(" Verification Failed! Error: {0}".format(e))
    else:
        print('\x1b[1A\u2713')  # print a checkmark on the previous line
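

# A usage sketch for verify_files_saved (illustrative only; ``RE``, ``det``
# and ``db`` are assumed to come from the user's session). The extra ``db``
# argument is not part of the ``(name, doc)`` callback signature, so it is
# bound with functools.partial before subscribing to 'stop' documents:
#
#     from functools import partial
#     from bluesky.plans import count
#
#     RE(count([det]), {'stop': partial(verify_files_saved, db=db)})

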
class LiveTiffExporter(BrokerCallbackBase):
    """
    Save TIFF files.

    Incorporate metadata and data from individual data points in the
    filenames.

    Parameters
    ----------
    field : str
        a data key, e.g., 'image'
    template : str
        A templated file path, where curly brackets will be filled in with
        the attributes of 'start', 'event', and (for image stacks) 'i', a
        sequential number.
        e.g., "dir/scan{start[scan_id]}_by_{start[experimenter]}_{i}.tiff"
    dryrun : bool
        default to False; if True, do not write any files
    overwrite : bool
        default to False, raising an OSError if file exists
    db : Broker, optional
        The databroker instance to use, if not provided use databroker
        singleton

    Attributes
    ----------
    filenames : list of filenames written in ongoing or most recent run
    """

    def __init__(self, field, template, dryrun=False, overwrite=False,
                 db=None):
        try:
            import tifffile
        except ImportError:
            print("Tifffile is required by this callback. Please install "
                  "tifffile and then try again."
                  "\n\n\tpip install tifffile\n\nor\n\n\tconda install "
                  "tifffile")
            raise
        else:
            # stash a reference so the module is accessible in
            # self._save_image
            self._tifffile = tifffile
        try:
            import doct
        except ImportError:
            print('doct is required by LiveTiffExporter')
            raise
        else:
            self._doct = doct
        self.field = field
        super().__init__((field,), db=db)
        self.template = template
        self.dryrun = dryrun
        self.overwrite = overwrite
        self.filenames = []
        self._start = None

    def _save_image(self, image, filename):
        if not self.overwrite:
            if os.path.isfile(filename):
                raise OSError("There is already a file at {}. Delete "
                              "it and try again.".format(filename))
        if not self.dryrun:
            self._tifffile.imsave(filename, np.asarray(image))
        self.filenames.append(filename)

    def start(self, doc):
        self.filenames = []
        # Convert doc from dict into dottable dict, more convenient
        # in Python format strings: doc.key == doc['key']
        self._start = self._doct.Document('start', doc)
        super().start(doc)

    def event(self, doc):
        if self.field not in doc['data']:
            return
        super().event(doc)
        image = np.asarray(doc['data'][self.field])
        if image.ndim == 2:
            filename = self.template.format(start=self._start, event=doc)
            self._save_image(image, filename)
        if image.ndim == 3:
            for i, plane in enumerate(image):
                filename = self.template.format(i=i, start=self._start,
                                                event=doc)
                self._save_image(plane, filename)

    def stop(self, doc):
        self._start = None
        self.filenames = []
        super().stop(doc)
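

# A usage sketch for LiveTiffExporter (illustrative only; ``RE``, ``camera``
# and ``db`` are assumed to come from the user's session, and 'image' is an
# assumed data key). The template can draw on the RunStart document and the
# Event, so values such as the scan_id and seq_num end up in each filename;
# for image stacks, also include '{i}' in the template:
#
#     from bluesky.plans import count
#
#     exporter = LiveTiffExporter(
#         'image',
#         template='/tmp/scan{start[scan_id]}_{event[seq_num]}.tiff',
#         db=db)
#     RE(count([camera], num=3), exporter)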