API Documentation

Schemas and Names

The event-model Python package contains tooling for composing, validating, and transforming documents in the model.

class event_model.DocumentNames(value)[source]

An enumeration.

bulk_datum = 'bulk_datum'
bulk_events = 'bulk_events'
datum = 'datum'
datum_page = 'datum_page'
descriptor = 'descriptor'
event = 'event'
event_page = 'event_page'
resource = 'resource'
start = 'start'
stop = 'stop'

Two dictionaries, event_model.schemas and event_model.schema_validators, are keyed on the members of the event_model.DocumentNames enum. They map each member to its schema and to an associated jsonschema.IValidator, respectively.
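
As a rough illustration of this lookup pattern, the sketch below uses a stand-in enum and placeholder schemas. The names MiniDocumentNames and mini_schemas are hypothetical and invented for this example; they are not part of event_model, and the schema contents are made up.

```python
from enum import Enum


class MiniDocumentNames(Enum):
    # Hypothetical stand-in for a few members of event_model.DocumentNames.
    start = 'start'
    stop = 'stop'
    event = 'event'


# Hypothetical placeholder schemas keyed on enum members (not the real schemas).
mini_schemas = {
    MiniDocumentNames.start: {'type': 'object', 'required': ['uid', 'time']},
    MiniDocumentNames.stop: {'type': 'object', 'required': ['uid', 'run_start']},
    MiniDocumentNames.event: {'type': 'object', 'required': ['uid', 'descriptor']},
}

# A document name arriving as a plain string can be converted to the enum
# member by value, and the member then used as the dictionary key.
name = 'start'
schema = mini_schemas[MiniDocumentNames(name)]
```

With the package installed, the analogous lookup would presumably be event_model.schemas[event_model.DocumentNames.start].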

Routers

class event_model.RunRouter(factories, handler_registry=None, *, root_map=None, filler_class=<class 'event_model.Filler'>, fill_or_fail=False)[source]

Routes documents, by run, to callbacks it creates from factory functions.

A RunRouter is callable, and it has the signature router(name, doc), suitable for subscribing to the RunEngine.

It is configured with a list of factory functions that produce callbacks in a two-layered scheme, described below.

Warning

This is experimental. In a future release, it may be changed in a backward-incompatible way or fully removed.

Parameters
factories: list

A list of callables with the signature:

factory('start', start_doc) -> List[Callbacks], List[SubFactories]

which should return two lists, which may be empty. All items in the first list should be callbacks — callables with the signature:

callback(name, doc)

that will receive that RunStart document and all subsequent documents from the run including the RunStop document. All items in the second list should be “subfactories” with the signature:

subfactory('descriptor', descriptor_doc) -> List[Callbacks]

These will receive each of the EventDescriptor documents for the run, as they arrive. They must return one list, which may be empty, containing callbacks that will receive the RunStart document, that EventDescriptor, all Events that reference that EventDescriptor and finally the RunStop document for the run.

handler_registry: dict, optional

This is passed to the Filler or whatever class is given in the filler_class parameter below.

Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.

A ‘handler class’ may be any callable with the signature:

handler_class(full_path, **resource_kwargs)

It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:

handler_instance(**datum_kwargs)

As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.

root_map: dict, optional

This is passed to Filler or whatever class is given in the filler_class parameter below.

str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.

filler_class: type

This is Filler by default. It can be a Filler subclass, functools.partial(Filler, ...), or any class that provides the same methods as DocumentRouter.

fill_or_fail: boolean, optional

By default (False), if a document with a spec not in handler_registry is encountered, let it pass through unfilled. But if set to True, fill everything and raise UndefinedAssetSpecification if an unknown spec is encountered.
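
The two-layered scheme described under the factories parameter can be sketched without the router itself. In the minimal example below, make_collector, run_level, and stream_level are hypothetical names, and the factory is exercised by hand, so the example does not reproduce the exact order in which a RunRouter delivers documents.

```python
def make_collector(collected):
    """Return a callback with the signature callback(name, doc)."""
    def callback(name, doc):
        collected.append((name, doc.get('uid')))
    return callback


run_level = []      # filled by the run-level callback
stream_level = []   # filled by the per-descriptor callback


def factory(name, start_doc):
    # Called once per run with ('start', start_doc).
    def subfactory(name, descriptor_doc):
        # Called once per EventDescriptor; subscribe only to 'primary'.
        if descriptor_doc.get('name') == 'primary':
            return [make_collector(stream_level)]
        return []

    # First list: callbacks for the whole run. Second list: subfactories.
    return [make_collector(run_level)], [subfactory]


# Exercise the factory by hand, playing the part of the router.
start_doc = {'uid': 'run-1'}
callbacks, subfactories = factory('start', start_doc)
for cb in callbacks:
    cb('start', start_doc)

descriptor_doc = {'uid': 'desc-1', 'name': 'primary', 'run_start': 'run-1'}
for cb in callbacks:
    cb('descriptor', descriptor_doc)
stream_callbacks = [
    cb for sf in subfactories for cb in sf('descriptor', descriptor_doc)
]
```

A factory written this way would be passed to the router in a list, as in RunRouter([factory]).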

datum_page(doc)[source]
descriptor(descriptor_doc)[source]
event_page(doc)[source]
resource(doc)[source]
start(start_doc)[source]
stop(doc)[source]
class event_model.SingleRunDocumentRouter[source]

A DocumentRouter intended to process events from exactly one run.

get_descriptor(doc)[source]

Convenience method returning the descriptor associated with the specified document.

Parameters
doc: dict

event-model document

Returns
descriptor document: dict
get_start()[source]

Convenience method returning the start document for the associated run.

If no start document has been processed, EventModelError will be raised.

Returns
start document: dict
get_stream_name(doc)[source]

Convenience method returning the name of the stream for the specified document.

Parameters
doc: dict

event-model document

Returns
stream name: str
class event_model.DocumentRouter(*, emit=None)[source]

Route each document by type to a corresponding method.

When an instance is called with a document type and a document like:

router(name, doc)

the document is passed to the method of the corresponding name, as in:

getattr(router, name)(doc)

The method is expected to return None or a valid document of the same type. It may be the original instance (passed through), a copy, or a different dict altogether.

Finally, the call to router(name, doc) returns:

(name, getattr(router, name)(doc))
Parameters
emit: callable, optional

Expected signature f(name, doc)
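
The dispatch-by-name behavior described above can be illustrated with a small stand-in class. MiniRouter and Doubler are hypothetical and are not event_model's implementation. The sketch assumes that a method returning None means "pass the original document through".

```python
class MiniRouter:
    """Hypothetical stand-in illustrating dispatch by document name."""

    def __call__(self, name, doc):
        # Look up the method named after the document type and call it.
        output = getattr(self, name)(doc)
        # Assumption of this sketch: None means "pass the input through".
        return name, (doc if output is None else output)

    def start(self, doc):
        return None

    def event(self, doc):
        return None


class Doubler(MiniRouter):
    def event(self, doc):
        # Return a new document of the same type instead of mutating the input.
        data = {key: 2 * value for key, value in doc['data'].items()}
        return dict(doc, data=data)


router = Doubler()
name, doc = router('event', {'uid': 'e1', 'data': {'x': 3}})
```

With the package installed, the same idea would be expressed by subclassing event_model.DocumentRouter and overriding the methods of interest.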

bulk_datum(doc)[source]
bulk_events(doc)[source]
datum(doc)[source]
datum_page(doc)[source]
descriptor(doc)[source]
emit(name, doc)[source]

Emit to the callable provided at instantiation time, if any.

event(doc)[source]
event_page(doc)[source]
resource(doc)[source]
start(doc)[source]
stop(doc)[source]
class event_model.Filler(handler_registry, *, include=None, exclude=None, root_map=None, coerce='as_is', handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024))[source]

Pass documents through, loading any externally-referenced data.

It is recommended to use the Filler as a context manager. Because the Filler manages caches of potentially expensive resources (e.g. large data in memory), managing its lifecycle is important. If used as a context manager, it will drop references to its caches upon exit from the context. Unless the user holds additional references to those caches, they will be garbage collected.

But for some applications, such as taking multiple passes over the same data, it may be useful to keep a longer-lived Filler instance and then manually delete it when finished.

See Examples below.

Parameters
handler_registry: dict

Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.

A ‘handler class’ may be any callable with the signature:

handler_class(full_path, **resource_kwargs)

It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:

handler_instance(**datum_kwargs)

As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.

include: Iterable

The set of fields to fill. By default all unfilled fields are filled. This parameter is mutually incompatible with the exclude parameter.

exclude: Iterable

The set of fields to skip filling. By default all unfilled fields are filled. This parameter is mutually incompatible with the include parameter.

root_map: dict

str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.

coerce: {‘as_is’, ‘numpy’}

Default is ‘as_is’. Other options (e.g. ‘delayed’) may be registered by external packages at runtime.

handler_cache: dict, optional

A cache of handler instances. If None, a dict is used.

resource_cache: dict, optional

A cache of Resource documents. If None, a dict is used.

datum_cache: dict, optional

A cache of Datum documents. If None, a dict is used.

descriptor_cache: dict, optional

A cache of EventDescriptor documents. If None, a dict is used.

retry_intervals: Iterable, optional

If data is not found on the first try, there may be a race between the I/O systems creating the external data and this stream of Documents that reference it. If Filler encounters an IOError, it will wait a bit and retry. This list specifies how long to sleep (in seconds) between subsequent attempts. Set to None to try only once before raising DataNotAccessible. A subclass may catch this exception and implement a different retry mechanism, for example using a different implementation of sleep from an async framework. But by default, a sequence of several retries with increasing sleep intervals is used. The default sequence should not be considered stable; it may change at any time as the authors tune it.

Raises
DataNotAccessible

If an IOError is raised when loading the data after the configured number of attempts. See the retry_intervals parameter for details.
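
The retry behavior described for retry_intervals can be sketched as a plain loop. The names load_with_retries, DataNotAccessibleSketch, and flaky_load are hypothetical, and the loop is an illustration of the idea under stated assumptions rather than the Filler's actual code.

```python
import time


class DataNotAccessibleSketch(Exception):
    """Hypothetical stand-in for the DataNotAccessible exception."""


def load_with_retries(load, retry_intervals=(0.001, 0.002, 0.004), sleep=time.sleep):
    # Try once, then once more after each interval; None means a single attempt.
    intervals = [] if retry_intervals is None else list(retry_intervals)
    error = None
    for interval in [0] + intervals:
        if interval:
            sleep(interval)
        try:
            return load()
        except IOError as err:
            error = err
    raise DataNotAccessibleSketch('gave up after retries') from error


# Simulate data that becomes readable on the third attempt.
attempts = []


def flaky_load():
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError('file not written yet')
    return [1, 2, 3]


naps = []  # record requested sleeps instead of actually sleeping
result = load_with_retries(flaky_load, sleep=naps.append)
```

Passing the sleep function as a parameter mirrors the remark above that a different implementation of sleep, such as one from an async framework, could be substituted.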

Examples

A Filler may be used as a context manager.

>>> with Filler(handler_registry) as filler:
...     for name, doc in stream:
...         filler(name, doc)  # mutates doc in place
...         # Do some analysis or export with name and doc.

Or as a long-lived object.

>>> filler = Filler(handler_registry)
>>> for name, doc in stream:
...     filler(name, doc)  # mutates doc in place
...     # Do some analysis or export with name and doc.
...
>>> del filler  # Free up memory from potentially large caches.
clear_document_caches()[source]

Clear any cached documents.

clear_handler_cache()[source]

Clear any cached handler instances.

This operation may free significant memory, depending on the implementation of the handlers.

clone(handler_registry=None, *, root_map=None, coerce=None, handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=None)[source]

Create a new Filler instance from this one.

By default it will be created with the same settings that this Filler has. Individual settings may be overridden here.

The clone does not share any caches or internal state with the original.

close()[source]

Drop cached documents and handlers.

They are not explicitly cleared, so if there are other references to these caches they will remain.

deregister_handler(spec)[source]

Deregister a handler.

If no handler is registered for this spec, this is a no-op and returns None.

Parameters
spec: str
Returns
handler: Handler or None
See https://blueskyproject.io/event-model/external.html
get_handler(resource)[source]

Return a new Handler instance for this Resource.

Parameters
resource: dict
Returns
handler: Handler
register_handler(spec, handler, overwrite=False)[source]

Register a handler.

Parameters
spec: str
handler: Handler
overwrite: boolean, optional

False by default

Raises
DuplicateHandler

If a handler is already registered for spec and overwrite is False

See https://blueskyproject.io/event-model/external.html
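
To make the handler contract concrete, here is a minimal sketch of a handler class and of how a Resource document might be turned into a handler instance, including the root_map substitution. LineHandler, the 'TEXT_LINES' spec, the line datum keyword, and the make_handler helper are all hypothetical; make_handler is a simplification written for this example, not the library's get_handler.

```python
import os


class LineHandler:
    """Hypothetical handler: reads one line of a text file per datum."""

    def __init__(self, full_path, **resource_kwargs):
        # Called once per Resource; keep per-file state here.
        self._path = full_path
        self._encoding = resource_kwargs.get('encoding', 'utf-8')

    def __call__(self, **datum_kwargs):
        # Called once per Datum; 'line' is a made-up datum keyword.
        with open(self._path, encoding=self._encoding) as f:
            return f.read().splitlines()[datum_kwargs['line']]


handler_registry = {'TEXT_LINES': LineHandler}  # spec -> handler class


def make_handler(resource, registry, root_map=None):
    # Resolve the file path, substituting the root if root_map names it.
    root = resource.get('root', '')
    root = (root_map or {}).get(root, root)
    full_path = os.path.join(root, resource['resource_path'])
    handler_class = registry[resource['spec']]
    return handler_class(full_path, **resource['resource_kwargs'])
```

A handler class like this would be registered with register_handler('TEXT_LINES', LineHandler) or supplied through the handler_registry argument.
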
class event_model.NoFiller(*args, **kwargs)[source]

This does not fill the documents; it merely validates them.

It checks that all the references between the documents are resolvable and could be filled. This is useful when the filling will be done later, as a delayed computation, but we want to make sure in advance that we have all the information that we will need when that computation occurs.

event_model.register_coercion(name, func, overwrite=False)[source]

Register a new option for Filler’s coerce argument.

This is an advanced feature. See source code for comments and examples.

Parameters
name: string

The new value for coerce that will invoke this function.

func: callable

Expected signature:

func(handler_class, filler_state) -> handler_class
overwrite: boolean, optional

False by default. Name collisions will raise EventModelValueError unless this is set to True.

event_model.as_is(handler_class, filler_state)[source]

A no-op coercion function that returns handler_class unchanged.

event_model.force_numpy(handler_class, filler_state)[source]

A coercion that makes handler_class.__call__ return actual numpy.ndarray.
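
A coercion with the same shape as the two functions above might look like the following sketch. It follows the (handler_class, filler_state) signature shown for as_is and force_numpy. The names force_tuple and RangeHandler are hypothetical, and tuple stands in for a numpy conversion so the example needs only the standard library.

```python
def force_tuple(handler_class, filler_state):
    """Hypothetical coercion: make handler_class.__call__ return a tuple."""

    class Subclass(handler_class):
        def __call__(self, *args, **kwargs):
            raw_result = super().__call__(*args, **kwargs)
            return tuple(raw_result)

    return Subclass


class RangeHandler:
    # Minimal made-up handler whose __call__ returns a lazy range object.
    def __init__(self, full_path, **resource_kwargs):
        self.full_path = full_path

    def __call__(self, **datum_kwargs):
        return range(datum_kwargs['stop'])


# filler_state is unused by this sketch, so None is passed for illustration.
CoercedHandler = force_tuple(RangeHandler, filler_state=None)
result = CoercedHandler('unused/path')(stop=3)
```

A function like force_tuple could then be made available as a coerce option through register_coercion, under a name of the caller's choosing.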

Document Minting

To use these functions, start with compose_run(), which returns a ComposeRunBundle.

event_model.compose_run(*, uid=None, time=None, metadata=None, validate=True)[source]

Compose a RunStart document and factory functions for related documents.

Parameters
uid: string, optional

Unique identifier for this run, conventionally a UUID4. If None is given, a UUID4 will be generated.

time: float, optional

UNIX epoch time of start of this run. If None is given, the current time will be used.

metadata: dict, optional

Additional metadata to include in the document.

validate: boolean, optional

Whether to validate that this document conforms to the schema.

Returns
ComposeRunBundle
class event_model.ComposeRunBundle(start_doc, compose_descriptor, compose_resource, compose_stop)
event_model.compose_descriptor(*, start, streams, event_counter, name, data_keys, uid=None, time=None, object_keys=None, configuration=None, hints=None, validate=True)[source]
class event_model.ComposeDescriptorBundle(descriptor_doc, compose_event, compose_event_page)
event_model.compose_event(*, descriptor, event_counter, data, timestamps, seq_num=None, filled=None, uid=None, time=None, validate=True)[source]
event_model.compose_event_page(*, descriptor, event_counter, data, timestamps, seq_num, filled=None, uid=None, time=None, validate=True)[source]
event_model.compose_resource(*, start, spec, root, resource_path, resource_kwargs, path_semantics='posix', uid=None, validate=True)[source]
class event_model.ComposeResourceBundle(resource_doc, compose_datum, compose_datum_page)
event_model.compose_datum(*, resource, counter, datum_kwargs, validate=True)[source]
event_model.compose_datum_page(*, resource, counter, datum_kwargs, validate=True)[source]
event_model.compose_stop(*, start, event_counter, poison_pill, exit_status='success', reason='', uid=None, time=None, validate=True)[source]
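
The signatures above suggest a pattern in which each bundle carries a document plus factory functions that already know which run they belong to. The sketch below illustrates that pattern with reduced, hypothetical stand-ins (MiniRunBundle, mini_compose_run, mini_compose_stop). It omits event counters, validation, and most parameters, and it is not the library's implementation.

```python
import time
import uuid
from collections import namedtuple
from functools import partial

# Hypothetical, reduced bundle: the documented ComposeRunBundle has more fields.
MiniRunBundle = namedtuple('MiniRunBundle', 'start_doc compose_stop')


def mini_compose_stop(*, start, exit_status='success', reason='', uid=None, time_=None):
    # The stop document refers back to its run through 'run_start'.
    return {
        'uid': str(uuid.uuid4()) if uid is None else uid,
        'time': time.time() if time_ is None else time_,
        'run_start': start['uid'],
        'exit_status': exit_status,
        'reason': reason,
    }


def mini_compose_run(*, uid=None, time_=None, metadata=None):
    start_doc = dict(metadata or {})
    start_doc['uid'] = str(uuid.uuid4()) if uid is None else uid
    start_doc['time'] = time.time() if time_ is None else time_
    # Bind the start document so callers need not pass it again.
    return MiniRunBundle(start_doc, partial(mini_compose_stop, start=start_doc))


bundle = mini_compose_run(metadata={'sample': 'Cu'})
stop_doc = bundle.compose_stop(exit_status='abort', reason='user request')
```

With the real package, the analogous first step is to call compose_run() and then use the functions on the returned ComposeRunBundle.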

Document Munging

event_model.pack_event_page(*events)[source]

Transform one or more Event documents into an EventPage document.

Parameters
*events: dicts

any number of Event documents

Returns
event_page: dict
event_model.unpack_event_page(event_page)[source]

Transform an EventPage document into individual Event documents.

Parameters
event_page: dict
Yields
event: dict
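
The relationship between Events and an EventPage is essentially a transposition from rows to columns. The simplified sketch below shows that idea with hypothetical helpers (mini_pack_event_page, mini_unpack_event_page). It ignores the filled field and any validation, so it is an illustration of the data layout rather than a substitute for the library functions.

```python
def mini_pack_event_page(*events):
    """Simplified sketch: transpose Event rows into EventPage columns."""
    if not events:
        raise ValueError('need at least one event')
    keys = list(events[0]['data'])
    return {
        'descriptor': events[0]['descriptor'],  # shared by every row
        'uid': [ev['uid'] for ev in events],
        'seq_num': [ev['seq_num'] for ev in events],
        'time': [ev['time'] for ev in events],
        'data': {k: [ev['data'][k] for ev in events] for k in keys},
        'timestamps': {k: [ev['timestamps'][k] for ev in events] for k in keys},
    }


def mini_unpack_event_page(page):
    """Inverse of the sketch above: yield one Event per row."""
    for i, uid in enumerate(page['uid']):
        yield {
            'descriptor': page['descriptor'],
            'uid': uid,
            'seq_num': page['seq_num'][i],
            'time': page['time'][i],
            'data': {k: v[i] for k, v in page['data'].items()},
            'timestamps': {k: v[i] for k, v in page['timestamps'].items()},
        }


events = [
    {'descriptor': 'd1', 'uid': 'e1', 'seq_num': 1, 'time': 10.0,
     'data': {'x': 1}, 'timestamps': {'x': 10.0}},
    {'descriptor': 'd1', 'uid': 'e2', 'seq_num': 2, 'time': 11.0,
     'data': {'x': 2}, 'timestamps': {'x': 11.0}},
]
page = mini_pack_event_page(*events)
```
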
event_model.pack_datum_page(*datum)[source]

Transform one or more Datum documents into a DatumPage document.

Parameters
*datum: dicts

any number of Datum documents

Returns
datum_page: dict
event_model.unpack_datum_page(datum_page)[source]

Transform a DatumPage document into individual Datum documents.

Parameters
datum_page: dict
Yields
datum: dict
event_model.sanitize_doc(doc)[source]

Return a copy with any numpy objects converted to built-in Python types.

This function takes in an event-model document and returns a copy with any numpy objects converted to built-in Python types. It is useful for sanitizing documents prior to sending to any consumer that does not recognize numpy types, such as a MongoDB database or a JSON encoder.

Parameters
doc: dict

The event-model document to be sanitized

Returns
sanitized_doc: event-model document

The event-model document with numpy objects converted to built-in Python types.
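
One plausible way to obtain such a copy is a JSON round trip through an encoder that knows how to convert the non-built-in types. The sketch below shows that approach using only the standard library, with array.array playing the role of a numpy array. ArrayEncoder and mini_sanitize_doc are hypothetical names, and the approach is an assumption of this example, not a statement of how sanitize_doc is implemented.

```python
import json
from array import array


class ArrayEncoder(json.JSONEncoder):
    """Stand-in encoder: converts array.array (playing the role of numpy)."""

    def default(self, obj):
        if isinstance(obj, array):
            return obj.tolist()
        return super().default(obj)


def mini_sanitize_doc(doc):
    # Encoding and decoding again yields a deep copy made of built-in types.
    return json.loads(json.dumps(doc, cls=ArrayEncoder))


doc = {'uid': 'e1', 'data': {'image': array('i', [1, 2, 3])}}
clean = mini_sanitize_doc(doc)
```

The original document is left untouched; only the returned copy contains plain lists.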

event_model.verify_filled(event_page)[source]

Take an event_page document and verify that it is completely filled.

Parameters
event_page: event_page document

The event page document to check

Raises
UnfilledData

Raised if any of the data in the event_page is unfilled. When raised, it includes a list of unfilled data objects in the exception message.
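
The check can be pictured as a scan over the page's filled field. The sketch below assumes that filled maps each field name to one boolean flag per row. UnfilledDataSketch and mini_verify_filled are hypothetical stand-ins, not the library's exception or function.

```python
class UnfilledDataSketch(Exception):
    """Hypothetical stand-in for the UnfilledData exception."""


def mini_verify_filled(event_page):
    # Assumption: 'filled' maps each field name to one flag per row.
    unfilled = [
        field
        for field, flags in event_page.get('filled', {}).items()
        if not all(flags)
    ]
    if unfilled:
        raise UnfilledDataSketch(f'unfilled data found in {unfilled!r}')


filled_page = {'filled': {'image': [True, True]}}
partial_page = {'filled': {'image': [True, False], 'mask': [True, True]}}

mini_verify_filled(filled_page)  # passes silently
try:
    mini_verify_filled(partial_page)
except UnfilledDataSketch as err:
    message = str(err)
```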

class event_model.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

A json.JSONEncoder for encoding numpy objects using built-in Python types.

Examples

Encode a Python object that includes an arbitrarily-nested numpy object.

>>> json.dumps({'a': {'b': numpy.array([1, 2, 3])}}, cls=NumpyEncoder)
default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)