API Documentation
Schemas and Names
The event-model Python package contains tooling for composing, validating, and transforming documents in the model.
-
class
event_model.
DocumentNames
(value)[source]¶ An enumeration.
-
bulk_datum
= 'bulk_datum'¶
-
bulk_events
= 'bulk_events'¶
-
datum
= 'datum'¶
-
datum_page
= 'datum_page'¶
-
descriptor
= 'descriptor'¶
-
event
= 'event'¶
-
event_page
= 'event_page'¶
-
resource
= 'resource'¶
-
start
= 'start'¶
-
stop
= 'stop'¶
-
There are two dictionaries, event_model.schemas and event_model.schema_validators, which are keyed on the members of the event_model.DocumentNames enum and which are mapped, respectively, to a schema and an associated jsonschema.IValidator.
Routers
-
class
event_model.
RunRouter
(factories, handler_registry=None, *, root_map=None, filler_class=<class 'event_model.Filler'>, fill_or_fail=False)[source]¶ Routes documents, by run, to callbacks it creates from factory functions.
A RunRouter is callable, and it has the signature
router(name, doc)
, suitable for subscribing to the RunEngine. It is configured with a list of factory functions that produce callbacks in a two-layered scheme, described below.
Warning
This is experimental. In a future release, it may be changed in a backward-incompatible way or fully removed.
- Parameters
- factories: list
A list of callables with the signature:
factory('start', start_doc) -> List[Callbacks], List[SubFactories]
which should return two lists, which may be empty. All items in the first list should be callbacks — callables with the signature:
callback(name, doc)
that will receive that RunStart document and all subsequent documents from the run including the RunStop document. All items in the second list should be “subfactories” with the signature:
subfactory('descriptor', descriptor_doc) -> List[Callbacks]
These will receive each of the EventDescriptor documents for the run, as they arrive. They must return one list, which may be empty, containing callbacks that will receive the RunStart document, that EventDescriptor, all Events that reference that EventDescriptor and finally the RunStop document for the run.
- handler_registry: dict, optional
This is passed to the Filler or whatever class is given in the filler_class parameter below.
Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.
A ‘handler class’ may be any callable with the signature:
handler_class(full_path, **resource_kwargs)
It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:
handler_instance(**datum_kwargs)
As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.
- root_map: dict, optional
This is passed to Filler or whatever class is given in the filler_class parameter below.
str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.
- filler_class: type
This is Filler by default. It can be a Filler subclass, functools.partial(Filler, ...), or any class that provides the same methods as DocumentRouter.
- fill_or_fail: boolean, optional
By default (False), if a document with a spec not in handler_registry is encountered, let it pass through unfilled. But if set to True, fill everything and raise UndefinedAssetSpecification if some unknown spec is encountered.
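The two-layered scheme described above can be sketched with plain functions. This is a minimal illustration of the factory and subfactory signatures only; it does not construct a RunRouter, and the names collected, run_callback, stream_callback, factory, and subfactory, as well as the document contents, are hypothetical.

```python
# Hypothetical sketch of the factory/subfactory contract; no RunRouter is built.
collected = []


def run_callback(name, doc):
    # Would receive every document of the run, from RunStart through RunStop.
    collected.append(("run", name))


def stream_callback(name, doc):
    # Would receive RunStart, one EventDescriptor, its Events, and RunStop.
    collected.append(("stream", name))


def subfactory(name, descriptor_doc):
    # Called with ('descriptor', descriptor_doc); returns one list of callbacks.
    if descriptor_doc.get("name") == "primary":
        return [stream_callback]
    return []


def factory(name, start_doc):
    # Called with ('start', start_doc); returns (callbacks, subfactories).
    return [run_callback], [subfactory]


# Exercise the contract by hand with made-up documents.
callbacks, subfactories = factory("start", {"uid": "example-run"})
stream_callbacks = subfactories[0]("descriptor", {"name": "primary"})
```

Assuming the constructor signature shown above, such a factory would be passed to the router as RunRouter([factory]).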
-
class
event_model.
SingleRunDocumentRouter
[source]¶ A DocumentRouter intended to process events from exactly one run.
-
get_descriptor
(doc)[source]¶ Convenience method returning the descriptor associated with the specified document.
- Parameters
- doc: dict
event-model document
- Returns
- descriptor document: dict
-
-
class
event_model.
DocumentRouter
(*, emit=None)[source]¶ Route each document by type to a corresponding method.
When an instance is called with a document type and a document like:
router(name, doc)
the document is passed to the method of the corresponding name, as in:
getattr(router, name)(doc)
The method is expected to return None or a valid document of the same type. It may be the original instance (passed through), a copy, or a different dict altogether.
Finally, the call to router(name, doc) returns:
(name, getattr(router, name)(doc))
- Parameters
- emit: callable, optional
Expected signature
f(name, doc)
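The dispatch rule above can be illustrated without the library. The class below is a hypothetical stand-in that mimics only the documented getattr(router, name)(doc) behavior; it is not the DocumentRouter implementation, and it leaves out emit and any handling of a None return value.

```python
class MiniRouter:
    """Hypothetical stand-in showing name-based dispatch."""

    def __call__(self, name, doc):
        # Look up the method named after the document type and call it.
        return name, getattr(self, name)(doc)

    def start(self, doc):
        # Pass the original document through unchanged.
        return doc

    def event(self, doc):
        # Return a modified copy instead of mutating the input.
        return {**doc, "seen": True}


router = MiniRouter()
result = router("event", {"seq_num": 1})
```

Here result is the pair of the document name and whatever the event method returned, which mirrors the return value described above.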
-
class
event_model.
Filler
(handler_registry, *, include=None, exclude=None, root_map=None, coerce='as_is', handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024))[source]¶ Pass documents through, loading any externally-referenced data.
It is recommended to use the Filler as a context manager. Because the Filler manages caches of potentially expensive resources (e.g. large data in memory), managing its lifecycle is important. If used as a context manager, it will drop references to its caches upon exit from the context. Unless the user holds additional references to those caches, they will be garbage collected.
But for some applications, such as taking multiple passes over the same data, it may be useful to keep a longer-lived Filler instance and then manually delete it when finished.
See Examples below.
- Parameters
- handler_registry: dict
Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.
A ‘handler class’ may be any callable with the signature:
handler_class(full_path, **resource_kwargs)
It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:
handler_instance(**datum_kwargs)
As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.
- include: Iterable
The set of fields to fill. By default all unfilled fields are filled. This parameter is mutually incompatible with the exclude parameter.
- exclude: Iterable
The set of fields to skip filling. By default all unfilled fields are filled. This parameter is mutually incompatible with the include parameter.
- root_map: dict
str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.
- coerce: {‘as_is’, ‘numpy’}
Default is ‘as_is’. Other options (e.g. ‘delayed’) may be registered by external packages at runtime.
- handler_cache: dict, optional
A cache of handler instances. If None, a dict is used.
- resource_cache: dict, optional
A cache of Resource documents. If None, a dict is used.
- datum_cache: dict, optional
A cache of Datum documents. If None, a dict is used.
- descriptor_cache: dict, optional
A cache of EventDescriptor documents. If None, a dict is used.
- retry_intervals: Iterable, optional
If data is not found on the first try, there may be a race between the I/O systems creating the external data and this stream of Documents that reference it. If Filler encounters an IOError it will wait a bit and retry. This list specifies how long to sleep (in seconds) between subsequent attempts. Set to None to try only once before raising DataNotAccessible. A subclass may catch this exception and implement a different retry mechanism, for example using a different implementation of sleep from an async framework. But by default, a sequence of several retries with increasing sleep intervals is used. The default sequence should not be considered stable; it may change at any time as the authors tune it.
- Raises
- DataNotAccessible
If an IOError is raised when loading the data after the configured number of attempts. See the retry_intervals parameter for details.
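The retry behavior described for retry_intervals can be sketched as follows. This is an illustration under stated assumptions, not the Filler's own code: load_with_retries, loader, and flaky_loader are hypothetical, a local DataNotAccessible class stands in for the library's exception, and a sleep argument is injected so that a different sleep implementation can be swapped in.

```python
import time


class DataNotAccessible(IOError):
    """Local stand-in for the exception named in the text."""


def load_with_retries(loader, retry_intervals=(0.001, 0.002, 0.004), sleep=time.sleep):
    # Try once, then once more after each interval; None means a single attempt.
    intervals = [] if retry_intervals is None else list(retry_intervals)
    error = None
    for interval in [0] + intervals:
        if interval:
            sleep(interval)
        try:
            return loader()
        except IOError as err:
            error = err
    raise DataNotAccessible("data not found after all attempts") from error


attempts = []


def flaky_loader():
    # Fails twice, then succeeds, imitating data that is still being written.
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError("file not there yet")
    return "payload"


# A no-op sleep keeps the demonstration instantaneous.
value = load_with_retries(flaky_loader, sleep=lambda seconds: None)
```

Injecting sleep is a design choice of this sketch; it reflects the note above that an async framework might supply its own implementation.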
Examples
A Filler may be used as a context manager.
>>> with Filler(handler_registry) as filler:
...     for name, doc in stream:
...         filler(name, doc)  # mutates doc in place
...         # Do some analysis or export with name and doc.
Or as a long-lived object.
>>> filler = Filler(handler_registry)
>>> for name, doc in stream:
...     filler(name, doc)  # mutates doc in place
...     # Do some analysis or export with name and doc.
...
>>> del filler  # Free up memory from potentially large caches.
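A handler following the ‘handler class’ and ‘handler instance’ signatures described for handler_registry might look like the sketch below. TextLineHandler, the 'TEXT_LINES' spec, the encoding resource keyword, and the line datum keyword are all hypothetical, and the handler is exercised directly rather than through a Filler.

```python
import os
import tempfile


class TextLineHandler:
    """Hypothetical handler: opens a text file, returns one line per datum."""

    def __init__(self, full_path, encoding="utf-8"):
        # Matches handler_class(full_path, **resource_kwargs).
        with open(full_path, encoding=encoding) as f:
            self._lines = f.read().splitlines()

    def __call__(self, line):
        # Matches handler_instance(**datum_kwargs).
        return self._lines[line]


# A registry maps each spec string to a handler class.
handler_registry = {"TEXT_LINES": TextLineHandler}

# Exercise the handler directly on a temporary file.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write("first\nsecond\n")
    handler = handler_registry["TEXT_LINES"](path, encoding="utf-8")
    second_line = handler(line=1)
```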
-
clear_handler_cache
()[source]¶ Clear any cached handler instances.
This operation may free significant memory, depending on the implementation of the handlers.
-
clone
(handler_registry=None, *, root_map=None, coerce=None, handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=None)[source]¶ Create a new Filler instance from this one.
By default it will be created with the same settings that this Filler has. Individual settings may be overridden here.
The clone does not share any caches or internal state with the original.
-
close
()[source]¶ Drop cached documents and handlers.
They are not explicitly cleared, so if there are other references to these caches they will remain.
-
deregister_handler
(spec)[source]¶ Deregister a handler.
If no handler is registered for this spec, this is a no-op and returns None.
- Parameters
- spec: str
- Returns
- handler: Handler or None
- See https://blueskyproject.io/event-model/external.html
-
get_handler
(resource)[source]¶ Return a new Handler instance for this Resource.
- Parameters
- resource: dict
- Returns
- handler: Handler
-
register_handler
(spec, handler, overwrite=False)[source]¶ Register a handler.
- Parameters
- spec: str
- handler: Handler
- overwrite: boolean, optional
False by default
- Raises
- DuplicateHandler
If a handler is already registered for spec and overwrite is False
- See https://blueskyproject.io/event-model/external.html
-
class
event_model.
NoFiller
(*args, **kwargs)[source]¶ This does not fill the documents; it merely validates them.
It checks that all the references between the documents are resolvable and could be filled. This is useful when the filling will be done later, as a delayed computation, but we want to make sure in advance that we have all the information that we will need when that computation occurs.
-
event_model.
register_coercion
(name, func, overwrite=False)[source]¶ Register a new option for
Filler’s coerce argument.
This is an advanced feature. See source code for comments and examples.
- Parameters
- name: string
The new value for coerce that will invoke this function.
- func: callable
Expected signature:
func(filler, handler_class) -> handler_class
- overwrite: boolean, optional
False by default. Name collisions will raise EventModelValueError unless this is set to True.
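A coercion function with the expected signature might be sketched as below. This is an illustration resting on assumptions: coerce_to_list and RangeHandler are hypothetical, the function is exercised directly rather than through a Filler, and subclassing only works when handler_class is an actual class rather than an arbitrary callable.

```python
def coerce_to_list(filler, handler_class):
    # Matches func(filler, handler_class) -> handler_class.
    # `filler` is unused in this sketch.
    class ListHandler(handler_class):
        def __call__(self, *args, **kwargs):
            # Convert whatever the wrapped handler returns into a list.
            return list(super().__call__(*args, **kwargs))

    return ListHandler


class RangeHandler:
    """Hypothetical handler whose instances return a range object."""

    def __init__(self, full_path):
        self.full_path = full_path

    def __call__(self, stop):
        return range(stop)


CoercedHandler = coerce_to_list(None, RangeHandler)
values = CoercedHandler("ignored/path")(stop=3)
```

Under the signature documented above, such a function would be registered with register_coercion('list', coerce_to_list), where 'list' is a made-up option name.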
Document Minting
To use these functions, start with compose_run(), which will return a ComposeRunBundle.
-
event_model.
compose_run
(*, uid=None, time=None, metadata=None, validate=True)[source]¶ Compose a RunStart document and factory functions for related documents.
- Parameters
- uid: string, optional
Unique identifier for this run, conventionally a UUID4. If None is given, a UUID4 will be generated.
- time: float, optional
UNIX epoch time of start of this run. If None is given, the current time will be used.
- metadata: dict, optional
Additional metadata to include in the document.
- validate: boolean, optional
Validate that this document conforms to the schema.
- Returns
- ComposeRunBundle
-
class
event_model.
ComposeRunBundle
(start_doc, compose_descriptor, compose_resource, compose_stop)¶
-
event_model.
compose_descriptor
(*, start, streams, event_counter, name, data_keys, uid=None, time=None, object_keys=None, configuration=None, hints=None, validate=True)[source]¶
-
class
event_model.
ComposeDescriptorBundle
(descriptor_doc, compose_event, compose_event_page)¶
-
event_model.
compose_event
(*, descriptor, event_counter, data, timestamps, seq_num=None, filled=None, uid=None, time=None, validate=True)[source]¶
-
event_model.
compose_event_page
(*, descriptor, event_counter, data, timestamps, seq_num, filled=None, uid=None, time=None, validate=True)[source]¶
-
event_model.
compose_resource
(*, start, spec, root, resource_path, resource_kwargs, path_semantics='posix', uid=None, validate=True)[source]¶
-
class
event_model.
ComposeResourceBundle
(resource_doc, compose_datum, compose_datum_page)¶
Document Munging
-
event_model.
pack_event_page
(*events)[source]¶ Transform one or more Event documents into an EventPage document.
- Parameters
- *events: dicts
any number of Event documents
- Returns
- event_page: dict
-
event_model.
unpack_event_page
(event_page)[source]¶ Transform an EventPage document into individual Event documents.
- Parameters
- event_page: dict
- Yields
- event: dict
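The relationship between Events and an EventPage can be pictured as a transposition in which per-event values become lists. The functions below are a simplified, hypothetical sketch of that idea (sketch_pack and sketch_unpack are not library functions), restricted to the descriptor, seq_num, data, and timestamps fields; real documents carry more fields, and pack_event_page and unpack_event_page should be used in practice.

```python
def sketch_pack(*events):
    # Collect per-event values into lists; all events share one descriptor.
    keys = events[0]["data"].keys()
    return {
        "descriptor": events[0]["descriptor"],
        "seq_num": [ev["seq_num"] for ev in events],
        "data": {k: [ev["data"][k] for ev in events] for k in keys},
        "timestamps": {k: [ev["timestamps"][k] for ev in events] for k in keys},
    }


def sketch_unpack(page):
    # Yield one Event-like dict per row of the page.
    for i, seq_num in enumerate(page["seq_num"]):
        yield {
            "descriptor": page["descriptor"],
            "seq_num": seq_num,
            "data": {k: v[i] for k, v in page["data"].items()},
            "timestamps": {k: v[i] for k, v in page["timestamps"].items()},
        }


events = [
    {"descriptor": "d1", "seq_num": 1, "data": {"x": 0.5}, "timestamps": {"x": 10.0}},
    {"descriptor": "d1", "seq_num": 2, "data": {"x": 0.7}, "timestamps": {"x": 11.0}},
]
page = sketch_pack(*events)
round_trip = list(sketch_unpack(page))
```

Packing and then unpacking returns the original Event-like dicts, which is the round-trip property the two library functions are paired to provide.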
-
event_model.
pack_datum_page
(*datum)[source]¶ Transform one or more Datum documents into a DatumPage document.
- Parameters
- *datum: dicts
any number of Datum documents
- Returns
- datum_page: dict
-
event_model.
unpack_datum_page
(datum_page)[source]¶ Transform a DatumPage document into individual Datum documents.
- Parameters
- datum_page: dict
- Yields
- datum: dict
-
event_model.
sanitize_doc
(doc)[source]¶ Return a copy with any numpy objects converted to built-in Python types.
This function takes in an event-model document and returns a copy with any numpy objects converted to built-in Python types. It is useful for sanitizing documents prior to sending to any consumer that does not recognize numpy types, such as a MongoDB database or a JSON encoder.
- Parameters
- doc: dict
The event-model document to be sanitized
- Returns
- sanitized_doc: event-model document
The event-model document with numpy objects converted to built-in Python types.
-
event_model.
verify_filled
(event_page)[source]¶ Take an event_page document and verify that it is completely filled.
- Parameters
- event_page: event_page document
The event page document to check
- Raises
- UnfilledData
Raised if any of the data in the event_page is unfilled. When raised, it includes a list of unfilled data objects in the exception message.
-
class
event_model.
NumpyEncoder
(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]¶ A json.JSONEncoder for encoding numpy objects using built-in Python types.
Examples
Encode a Python object that includes an arbitrarily-nested numpy object.
>>> json.dumps({'a': {'b': numpy.array([1, 2, 3])}}, cls=NumpyEncoder)
-
default
(obj)[source]¶ Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
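The default hook shown above can be exercised with the standard library alone. In this sketch IterEncoder is a hypothetical subclass of json.JSONEncoder rather than of NumpyEncoder, so it runs without numpy.

```python
import json


class IterEncoder(json.JSONEncoder):
    """Hypothetical encoder that serializes arbitrary iterables as lists."""

    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError.
        return super().default(o)


# A range is not JSON-serializable by default, so the hook above is invoked.
encoded = json.dumps({"a": {"b": range(3)}}, cls=IterEncoder)
```

Objects that are neither serializable nor iterable still fall through to the base class and raise TypeError.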
-