Live Feedback and Processing

Overview of Callbacks

As the RunEngine processes instructions, it creates Documents, plain Python dictionaries organized in a specified but flexible way. These Documents contain the data and metadata generated during the plan’s execution. Each time a new Document is created, the RunEngine passes it to a list of functions. These functions can do anything: store the data to disk, print a line of text to the screen, add a point to a plot, or even transfer the data to a cluster for immediate processing. These functions are called “callbacks.”

We subscribe callbacks to the live stream of Documents. You can think of a callback as a self-addressed stamped envelope. It tells the RunEngine, “When you create a Document, send it to this function for processing.”

To keep up with the scan and avoid slowing down data collection, most subscriptions skip some Documents when they fall behind: a table might skip a row, a plot might skip a point. But critical subscriptions, like saving the data, run in a lossless mode guaranteed to process all the Documents.

Simplest Example

This example passes every Document to the print function, printing each Document as it is generated during data collection.

RE(plan, print)

We will not show the lengthy output of this command here; the Documents are not so nice to read in their raw form. See LiveTable below for a more refined implementation of this basic example.

Ways to Invoke Callbacks

Subscribe on a per-run basis

As in the simple example above, pass a second argument to the RunEngine.

In [1]: dets = [det1, det2, det3]

In [2]: RE(plan, LiveTable(dets))
+-----------+------------+------------+------------+------------+
|   seq_num |       time |       det1 |       det2 |       det3 |
+-----------+------------+------------+------------+------------+
|         1 | 21:16:08.4 |      5.000 |      1.765 |      1.213 |
|         2 | 21:16:08.4 |      5.000 |      1.765 |      1.213 |
|         3 | 21:16:08.4 |      5.000 |      1.765 |      1.213 |
|         4 | 21:16:08.4 |      5.000 |      1.765 |      1.213 |
+-----------+------------+------------+------------+------------+
Scan scan ['9837eb'] (scan num: 1)
Out[2]: ['9837eb45-0666-4a71-a30e-00b7d3c6cfb7']

LiveTable takes a list of objects or names to tell it which data columns to show. It prints the lines one at a time, as data collection proceeds.

To use multiple callbacks, you may pass a list of them.

In [3]: RE(plan, [LiveTable(dets), LivePlot(det1)])
+-----------+------------+------------+------------+------------+
|   seq_num |       time |       det1 |       det2 |       det3 |
+-----------+------------+------------+------------+------------+
|         1 | 21:16:08.7 |      5.000 |      1.765 |      1.213 |
|         2 | 21:16:08.9 |      5.000 |      1.765 |      1.213 |
|         3 | 21:16:08.9 |      5.000 |      1.765 |      1.213 |
|         4 | 21:16:09.0 |      5.000 |      1.765 |      1.213 |
+-----------+------------+------------+------------+------------+
Scan scan ['cbde5c'] (scan num: 2)
Out[3]: ['cbde5c03-8bfb-47af-9b82-4e06f2d24ce5']

Use this more verbose form to filter the Documents by type, feeding only certain document types to certain callbacks.

# Give all documents to LiveTable and LivePlot.
# Send only 'start' Documents to the print function.
RE(plan, {'all': [LiveTable(dets), LivePlot(det1)], 'start': print})

The allowed keys are ‘all’, ‘start’, ‘stop’, ‘descriptor’, and ‘event’, corresponding to the names of the Documents.

Subscribe each time a certain plan is used

Often, the same subscriptions are useful each time a certain kind of plan is run. To associate particular callbacks with a given plan, give the plan a subs attribute. All the built-in plans already have a subs attribute, primed with a dictionary of empty lists.

In [4]: plan.subs
Out[4]: {'all': [], 'descriptor': [], 'event': [], 'start': [], 'stop': []}

Append functions to these lists to route Documents to them every time the plan is executed.

In [5]: plan.subs['all'].append(LiveTable(dets))

Now our plan will invoke LiveTable every time.

In [6]: RE(plan)
+-----------+------------+------------+------------+------------+
|   seq_num |       time |       det1 |       det2 |       det3 |
+-----------+------------+------------+------------+------------+
|         1 | 21:16:09.2 |      5.000 |      1.765 |      1.213 |
|         2 | 21:16:09.2 |      5.000 |      1.765 |      1.213 |
|         3 | 21:16:09.2 |      5.000 |      1.765 |      1.213 |
|         4 | 21:16:09.2 |      5.000 |      1.765 |      1.213 |
+-----------+------------+------------+------------+------------+
Scan scan ['3da7da'] (scan num: 3)
Out[6]: ['3da7da37-e6a2-46d0-a8c4-9b3d9b3e5551']

Now suppose we change the detectors used by the plan.

In [7]: plan.detectors.remove(det3)

In [8]: plan.detectors
Out[8]: [reader: det, reader: det1, reader: det2]

The LiveTable callback is now out of date; it still includes [det1, det2, det3]. How can we keep the callback in sync with the plan automatically?

To customize the callback based on the content of the plan, use a subscription factory: a function that takes in a plan and returns a callback function.

def make_table_with_detectors(plan):
    dets = plan.detectors
    return LiveTable(dets)

plan.sub_factories['all'].append(make_table_with_detectors)

When the plan is executed, it passes itself as an argument to its own sub_factories, producing customized callbacks. In this example, a new LiveTable is made on the fly. Each time the plan is executed, new callbacks are made via factory functions like this one.

A plan can have both normal subscriptions in plan.subs and subscription factories in plan.sub_factories. All will be used.
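
For example, assuming the make_table_with_detectors factory defined above, the same plan can carry both kinds at once:

# Fixed subscription: print the 'start' Document at the beginning of each run of this plan.
plan.subs['start'].append(print)

# Factory: build a fresh LiveTable matched to the plan's current detectors for each run.
plan.sub_factories['all'].append(make_table_with_detectors)

RE(plan)  # the printed 'start' Document and the new LiveTable both appear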

Subscribe for every run

The RunEngine itself can store a collection of subscriptions to be applied to every single scan it executes.

Usually, if a subscription is useful for every single run, it should be added to an IPython configuration file and subscribed automatically at startup, as in the example below.

The method RE.subscribe passes through to this method:

Dispatcher.subscribe(name, func)

Register a callback function to consume documents.

The Run Engine can execute callback functions at the start and end of a scan, and after the insertion of new Event Descriptors and Events.

Parameters:
  • name ({'start', 'descriptor', 'event', 'stop', 'all'}) –
  • func (callable) – expecting signature like f(name, document) where name is a string and document is a dict
Returns:

token – an integer token that can be used to unsubscribe

Return type:

int

Dispatcher.unsubscribe(token)

Unregister a callback function using its integer ID.

Parameters:token (int) – the integer token issued by subscribe
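
For example, in an IPython startup file you might subscribe a callback (print is just a stand-in here) to every run and keep the returned token in case you later want to remove it:

# Subscribe for every run; RE.subscribe returns an integer token.
token = RE.subscribe('all', print)

# Later, remove the subscription using that token.
RE.unsubscribe(token)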

Running Callbacks on Saved Data

Callbacks are designed to work live, but they also work retroactively on completed runs with data that has been saved to disk.

Warning

This subsection documents a feature that has not been released yet.

If the data is accessible from the Data Broker (as it is if you use the standard configuration), then you can feed data from the Data Broker into the callbacks.

from dataportal import DataBroker, stream

header = DataBroker[-1]  # fetch a run, e.g., the most recent one
stream(header, callback_func)  # replay its Documents through callback_func

Live Table

As each data point is collected (i.e., as each Event Document is generated) a row is added to the table. For nonscalar detectors, such as area detectors, the sum is shown. (See LiveImage, below, to view the images themselves.)

The only crucial parameter is the first one, which specifies which fields to include in the table. These can include specific fields (e.g., the string 'sclr_chan4') or readable objects (e.g., the object sclr).

Numerous other parameters allow you to customize the display style.
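
For example, a hypothetical table mixing a specific field name with readable objects (this sketch assumes sclr, motor, and det1 are devices configured in the session):

from bluesky.callbacks import LiveTable

# One column for the named channel, plus columns for whatever motor and det1 report.
table = LiveTable(['sclr_chan4', motor, det1])
RE(plan, table)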

class bluesky.callbacks.LiveTable(fields, *, stream_name='primary', print_header_interval=50, min_width=12, default_prec=3, extra_pad=1, logbook=None)

Live updating table

Parameters:
  • fields (list) – List of fields to add to the table.
  • stream_name (str, optional) – The event stream to watch for
  • print_header_interval (int, optional) – Reprint the header every this many lines, defaults to 50
  • min_width (int, optional) – The minimum width, in spaces, of the data columns. Defaults to 12
  • default_prec (int, optional) – Precision to use if it can not be found in descriptor, defaults to 3
  • extra_pad (int, optional) – Number of extra spaces to put around the printed data, defaults to 1
  • logbook (callable, optional) –

    Must take a string as the first positional argument

    def logbook(input_str):
        pass

Live Plot for Scalar Data

Plot scalars.

class bluesky.callbacks.LivePlot(y, x=None, legend_keys=None, xlim=None, ylim=None, fig=None, **kwargs)

Build a function that updates a plot from a stream of Events.

Note: If your figure blocks the main thread when you are trying to scan with this callback, call plt.ion() in your IPython session.

Parameters:
  • y (str) – the name of a data field in an Event
  • x (str, optional) – the name of a data field in an Event If None, use the Event’s sequence number.
  • legend_keys (list, optional) – The list of keys to extract from the RunStart document and format in the legend of the plot. The legend will always show the scan_id followed by a colon (“1: ”).
  • xlim (tuple) – passed to Axes.set_xlim
  • ylim (tuple) – passed to Axes.set_ylim
  • All additional keyword arguments are passed through to Axes.plot.

Examples

>>> my_plotter = LivePlot('det', 'motor', legend_keys=['sample'])
>>> RE(my_scan, my_plotter)

Live Image Plot

class bluesky.callbacks.broker.LiveImage(field)

Stream 2D images in a cross-section viewer.

Parameters:field (string) – name of data field in an Event

Note

Requires a matplotlib fix that is not released as of this writing. The relevant commit is a951b7.
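
A minimal usage sketch, assuming the detector stores its frames under the data key 'image':

from bluesky.callbacks.broker import LiveImage

# Display each frame in the cross-section viewer as it is collected.
RE(plan, LiveImage('image'))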

Live Raster Plot (Heat Map)

class bluesky.callbacks.LiveRaster(raster_shape, I, *, clim=None, cmap='viridis', xlabel='x', ylabel='y', extent=None)

Simple callback that fills in values based on a raster

This simply wraps around an AxesImage. seq_num is used to determine which pixel to fill in.

Parameters:
  • raster_shape (tuple) – The (row, col) shape of the raster
  • I (str) – The field to use for the color of the markers
  • clim (tuple, optional) – The color limits
  • cmap (str or colormap, optional) – The color map to use
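
A minimal usage sketch for a hypothetical 10-row by 15-column raster scan that records the field 'det4' (raster_plan and det4 are assumed names):

from bluesky.callbacks import LiveRaster

# One pixel is filled in per Event, in seq_num order, over the 10 x 15 grid.
heat_map = LiveRaster((10, 15), 'det4', clim=(0, 1), xlabel='x', ylabel='y')
RE(raster_plan, heat_map)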

Automated Data Export

Exporting Image Data as TIFF Files

First, compose a filename template. This is a simple working example.

template = "output_dir/{start.scan_id}_{event.seq_num}.tiff"

The template can include metadata or event data from the scan.

template = ("output_dir/{start.scan_id}_{start.sample_name}_"
            "{event.data.temperature}_{event.seq_num}.tiff")

It can be handy to use the metadata to sort the images into directories.

template = "{start.user}/{start.scan_id}/{event.seq_num}.tiff"

If each image data point is actually a stack of 2D image planes, the template must also include {i}, which will count through the image planes in the stack.

(Most metadata comes from the “start” document, hence start.scan_id above. See the documentation on Documents for a more comprehensive explanation of what is in the different documents.)

Next, create an exporter.

from bluesky.callbacks.broker import LiveTiffExporter

exporter = LiveTiffExporter('image', template)

Finally, to export all the images from a run when it finishes running, wrap the exporter in post_run and subscribe.

from bluesky.callbacks.broker import post_run

RE.subscribe('all', post_run(exporter))

It is also possible to write TIFFs live, hence the name LiveTiffExporter, but there is an important disadvantage to this: in order to ensure that every image is saved, a lossless subscription must be used. As a consequence, the progress of the experiment may be intermittently slowed while data is written to disk. In some circumstances, this effect on the timing of the experiment may not be acceptable.

RE.subscribe_lossless('all', exporter)

There are more configuration options available, as given in detail below.

class bluesky.callbacks.broker.LiveTiffExporter(field, template, dryrun=False, overwrite=False)

Save TIFF files.

Incorporate metadata and data from individual data points in the filenames.

Parameters:
  • field (str) – a data key, e.g., ‘image’
  • template (str) – A templated file path, where curly brackets will be filled in with the attributes of ‘start’, ‘event’, and (for image stacks) ‘i’, a sequential number. e.g., “dir/scan{start.scan_id}_by_{start.experimenter}_{i}.tiff”
  • dryrun (bool) – default to False; if True, do not write any files
  • overwrite (bool) – default to False, raising an OSError if file exists

Attributes:
  • filenames (list) – the filenames written in the ongoing or most recent run

Export All Data and Metadata in an HDF5 File

A Stop Document is emitted at the end of every run. Subscribe to it, using it as a cue to load the dataset via the DataBroker and export an HDF5 file using suitcase.

Working example:

from databroker import DataBroker as db
import suitcase

def suitcase_as_callback(name, doc):
    if name != 'stop':
        return
    run_start_uid = doc['run_start']
    header = db[run_start_uid]
    filename = '{}.h5'.format(run_start_uid)
    suitcase.export(header, filename)

RE.subscribe('stop', suitcase_as_callback)

Verify Data Has Been Saved

The following verifies that all Documents and external files from a run have been saved to disk and are accessible from the DataBroker. It prints a message indicating success or failure.

Note: If the data collection machine is not able to access the machine where some external data is being saved, it will indicate failure. This can be a false alarm.

from bluesky.callbacks.broker import post_run, verify_files_saved

RE.subscribe('all', post_run(verify_files_saved))

Writing Custom Callbacks

Any function that accepts two arguments, the Document’s name and the Document itself (a Python dictionary), can be used as a callback. Refer to the simple examples above to get started.

Two Simple Custom Callbacks

These simple examples illustrate the concept and the usage.

First, we define a function that takes two arguments

  1. the name of the Document type (‘start’, ‘stop’, ‘event’, or ‘descriptor’)
  2. the Document itself, a dictionary

This is the callback.

In [9]: def print_data(name, doc):
   ...:     print("Measured: %s" % doc['data'])
   ...: 

Then, we tell the RunEngine to call this function on each Event Document. We are setting up a subscription.

In [10]: s = Count([det])

In [11]: RE(s, {'event': print_data})
+-----------+------------+------------+------------+------------+
|   seq_num |       time |       det1 |       det2 |       det3 |
+-----------+------------+------------+------------+------------+
|         1 | 21:16:09.9 |            |            |            |
Measured: {'det': 0.00033546262790251185}
+-----------+------------+------------+------------+------------+
Count count ['cfa3ce'] (scan num: 4)
Out[11]: ['cfa3ce23-c584-471b-94a0-79e51e9ee60a']

Each time the RunEngine generates a new Event Document (i.e., a new data point), print_data is called.

There are five kinds of subscriptions matching the four kinds of Documents plus an ‘all’ subscription that receives all Documents.

  • ‘start’
  • ‘descriptor’
  • ‘event’
  • ‘stop’
  • ‘all’

We can use the ‘stop’ subscription to trigger automatic end-of-run activities. For example:

In [12]: def celebrate(name, doc):
   ....:     print("The run is finished!")
   ....: 

Let’s use both print_data and celebrate at once.

In [13]: RE(s, {'event': print_data, 'stop': celebrate})
+-----------+------------+------------+------------+------------+
|   seq_num |       time |       det1 |       det2 |       det3 |
+-----------+------------+------------+------------+------------+
|         1 | 21:16:10.1 |            |            |            |
Measured: {'det': 0.00033546262790251185}
The run is finished!
+-----------+------------+------------+------------+------------+
Count count ['20daf8'] (scan num: 5)
Out[13]: ['20daf835-0748-49ae-9fdf-6877d2abf4aa']

Using multiple document types

Some tasks use only one Document type, but we often need to use more than one. For example, LiveTable uses ‘start’ to kick off the creation of a fresh table, it uses ‘event’ to see the data, and it uses ‘stop’ to draw the bottom border.

A convenient pattern for this kind of subscription is a class with a method for each Document type.

from bluesky.callbacks import CallbackBase
class MyCallback(CallbackBase):
    def start(self, doc):
        print("I got a new 'start' Document")
        # Do something
    def descriptor(self, doc):
        print("I got a new 'descriptor' Document")
        # Do something
    def event(self, doc):
        print("I got a new 'event' Document")
        # Do something
    def stop(self, doc):
        print("I got a new 'stop' Document")
        # Do something

The base class, CallbackBase, takes care of dispatching each Document to the corresponding method. If your application does not need all four, you may simply omit the methods that aren’t required.
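
An instance of such a class is subscribed like any other callback. For example, using the MyCallback class above:

# Every Document from this run is routed to the matching method
# (start, descriptor, event, or stop) of the MyCallback instance.
RE(plan, MyCallback())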

Lossless Subscriptions for Critical Functions

Because subscriptions are processed during a scan, it’s possible that they can slow down data collection. We mitigate this by making the subscriptions lossy. That is, some Documents will be skipped if the subscription functions take too long and fall behind. For the purposes of real-time feedback, this is usually acceptable. For other purposes, like saving data to metadatastore, it is not.

Critical subscriptions are executed on every Document, no matter how long they take to run: they can slow down data collection, but they guarantee that all tasks are completed before the scan proceeds.

For example, in the standard configuration, metadatastore insertion functions are registered as critical subscriptions.

If your subscription requires the complete, lossless stream of Documents and you are willing to accept the possibility of slowing down data collection while that stream is processed, you can register your own critical subscriptions, as in the example below.

RunEngine.subscribe_lossless(name, func)

Register a callback function to consume documents.

Functions registered here are considered “critical.” They receive a lossless stream of Event documents. If they generate an exception they always abort the run. (In contrast, exceptions from normal subscriptions are ignored by default.)

The Run Engine can execute callback functions at the start and end of a scan, and after the insertion of new Event Descriptors and Events.

Parameters:
  • name ({'start', 'descriptor', 'event', 'stop', 'all'}) –
  • func (callable) – expecting signature like f(name, document) where name is a string and document is a dict
Returns:

token – an integer token that can be used to unsubscribe

Return type:

int

RunEngine.unsubscribe_lossless(token)

Un-register a ‘critical’ callback function.

Parameters:token (int) – the integer token issued by subscribe_lossless
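
For example, here is a sketch of a hypothetical critical subscription that appends every Event to a list, registered and later removed using its token:

events = []

def archive_event(name, doc):
    # Called for every Event Document, with no skipping.
    events.append(doc)

# Register as a critical (lossless) subscription.
token = RE.subscribe_lossless('event', archive_event)

# Later, remove it using the token.
RE.unsubscribe_lossless(token)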