Source code for intake_bluesky.core

import collections
import copy
import event_model
from datetime import datetime
import dask
import dask.bag
from dask import array
import functools
import heapq
import importlib
import itertools
import intake.catalog.base
import errno
import intake.container.base
import intake_xarray.base
from intake.compat import unpack_kwargs
import msgpack
import requests
from requests.compat import urljoin
import numpy
import os
import warnings
import xarray


def tail(filename, n=1, bsize=2048):
    """
    Returns a generator with the last n lines of a file.

    Thanks to Martijn Pieters for this solution:
    https://stackoverflow.com/a/12295054/6513183

    Parameters
    ----------
    filename : string
    n : int
        number of lines
    bsize : int
        seek step size

    Returns
    -------
    line : generator
    """

    # get newlines type, open in universal mode to find it
    with open(filename, 'r', newline=None) as hfile:
        if not hfile.readline():
            return  # empty, no point
        sep = hfile.newlines  # After reading a line, python gives us this
    assert isinstance(sep, str), 'multiple newline types found, aborting'

    # find a suitable seek position in binary mode
    with open(filename, 'rb') as hfile:
        hfile.seek(0, os.SEEK_END)
        linecount = 0
        pos = 0

        while linecount <= n + 1:
            # read at least n lines + 1 more; we need to skip a partial line later on
            try:
                hfile.seek(-bsize, os.SEEK_CUR)           # go backwards
                linecount += hfile.read(bsize).count(sep.encode())  # count newlines
                hfile.seek(-bsize, os.SEEK_CUR)           # go back again
            except IOError as e:
                if e.errno == errno.EINVAL:
                    # Attempted to seek past the start, can't go further
                    bsize = hfile.tell()
                    hfile.seek(0, os.SEEK_SET)
                    pos = 0
                    linecount += hfile.read(bsize).count(sep.encode())
                    break
                raise  # Some other I/O exception, re-raise
            pos = hfile.tell()

    # Re-open in text mode
    with open(filename, 'r') as hfile:
        hfile.seek(pos, os.SEEK_SET)  # our file position from above
        for line in hfile:
            # We've located n lines *or more*, so skip if needed
            if linecount > n:
                linecount -= 1
                continue
            # The rest we yield
            yield line.rstrip()


def to_event_pages(get_event_cursor, page_size):
    """
    Decorator that changes get_event_cursor to get_event_pages.

    get_event_cursor yields events, get_event_pages yields event_pages.

    Parameters
    ----------
    get_event_cursor : function
    page_size : int
        Number of events per page.

    Returns
    -------
    get_event_pages : function
    """
    @functools.wraps(get_event_cursor)
    def get_event_pages(*args, **kwargs):
        event_cursor = get_event_cursor(*args, **kwargs)
        while True:
            result = list(itertools.islice(event_cursor, page_size))
            if result:
                yield event_model.pack_event_page(*result)
            else:
                break
    return get_event_pages
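
# Illustrative usage sketch (hypothetical cursor function): wrap a callable
# that yields single Event documents so that it yields event_page documents
# instead. ``get_events_for_descriptor`` and the page size are assumptions.
#
#     get_event_pages = to_event_pages(get_events_for_descriptor, page_size=50)
#     for page in get_event_pages(descriptor_uid):
#         print(len(page['seq_num']), 'events in this page')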


def to_datum_pages(get_datum_cursor, page_size):
    """
    Decorator that changes get_datum_cursor to get_datum_pages.

    get_datum_cursor yields datum, get_datum_pages yields datum_pages.

    Parameters
    ----------
    get_datum_cursor : function
    page_size : int
        Number of datum documents per page.

    Returns
    -------
    get_datum_pages : function
    """
    @functools.wraps(get_datum_cursor)
    def get_datum_pages(*args, **kwargs):
        datum_cursor = get_datum_cursor(*args, **kwargs)
        while True:
            result = list(itertools.islice(datum_cursor, page_size))
            if result:
                yield event_model.pack_datum_page(*result)
            else:
                break
    return get_datum_pages


def flatten_event_page_gen(gen):
    """
    Converts an event_page generator to an event generator.

    Parameters
    ----------
    gen : generator

    Returns
    -------
    event_generator : generator
    """
    for page in gen:
        yield from event_model.unpack_event_page(page)
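
# Illustrative sketch (hypothetical callables): unpack event_pages back into
# individual Event documents. ``get_event_pages`` and ``descriptor_uid`` are
# assumptions standing in for the callables used elsewhere in this module.
#
#     for event in flatten_event_page_gen(get_event_pages(descriptor_uid)):
#         print(event['seq_num'], event['data'])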


def interlace_event_pages(*gens):
    """
    Take event_page generators and interlace their results by timestamp.
    This is a modification of https://github.com/bluesky/databroker/pull/378/

    Parameters
    ----------
    gens : generators
        Generators of event_page documents.

    Yields
    ------
    val : dict
        The next Event document, in time order.

    """
    iters = [iter(flatten_event_page_gen(g)) for g in gens]
    heap = []

    def safe_next(indx):
        try:
            val = next(iters[indx])
        except StopIteration:
            return
        heapq.heappush(heap, (val['time'], indx, val))
    for i in range(len(iters)):
        safe_next(i)
    while heap:
        _, indx, val = heapq.heappop(heap)
        yield val
        safe_next(indx)


def interlace_event_page_chunks(*gens, chunk_size):
    """
    Take event_page generators and interlace their results by timestamp.

    This is a modification of https://github.com/bluesky/databroker/pull/378/

    Parameters
    ----------
    gens : generators
        Generators of event_page documents.
    chunk_size : integer
        Size of pages to yield

    Yields
    ------
    val : dict
        The next event_page, in time order, rechunked to ``chunk_size``.

    """
    iters = [iter(event_model.rechunk_event_pages(g, chunk_size)) for g in gens]
    heap = []

    def safe_next(indx):
        try:
            val = next(iters[indx])
        except StopIteration:
            return
        heapq.heappush(heap, (val['time'][0], indx, val))
    for i in range(len(iters)):
        safe_next(i)
    while heap:
        _, indx, val = heapq.heappop(heap)
        yield val
        safe_next(indx)


def documents_to_xarray(*, start_doc, stop_doc, descriptor_docs,
                        get_event_pages, filler, get_resource,
                        lookup_resource_for_datum, get_datum_pages,
                        include=None, exclude=None):
    """
    Represent the data in one Event stream as an xarray.

    Parameters
    ----------
    start_doc : dict
        RunStart Document
    stop_doc : dict
        RunStop Document
    descriptor_docs : list
        EventDescriptor Documents
    filler : event_model.Filler
    get_resource : callable
        Expected signature ``get_resource(resource_uid) -> Resource``
    lookup_resource_for_datum : callable
        Expected signature ``lookup_resource_for_datum(datum_id) -> resource_uid``
    get_datum_pages : callable
        Expected signature ``get_datum_pages(resource_uid) -> generator``
        where ``generator`` yields datum_page documents
    get_event_pages : callable
        Expected signature ``get_event_pages(descriptor_uid) -> generator``
        where ``generator`` yields event_page documents
    include : list, optional
        Fields ('data keys') to include. By default all are included. This
        parameter is mutually exclusive with ``exclude``.
    exclude : list, optional
        Fields ('data keys') to exclude. By default none are excluded. This
        parameter is mutually exclusive with ``include``.

    Returns
    -------
    dataset : xarray.Dataset
    """
    if include is None:
        include = []
    if exclude is None:
        exclude = []
    if include and exclude:
        raise ValueError(
            "The parameters `include` and `exclude` are mutually exclusive.")
    # Data keys must not change within one stream, so we can safely sample
    # just the first Event Descriptor.
    if descriptor_docs:
        data_keys = descriptor_docs[0]['data_keys']
        if include:
            keys = list(set(data_keys) & set(include))
        elif exclude:
            keys = list(set(data_keys) - set(exclude))
        else:
            keys = list(data_keys)

    # Collect a Dataset for each descriptor. Merge at the end.
    datasets = []
    for descriptor in descriptor_docs:
        events = list(flatten_event_page_gen(get_event_pages(descriptor['uid'])))
        if not events:
            continue
        if any(data_keys[key].get('external') for key in keys):
            filler('descriptor', descriptor)
            for event in events:
                try:
                    filler('event', event)
                except event_model.UnresolvableForeignKeyError as err:
                    datum_id = err.key
                    resource_uid = lookup_resource_for_datum(datum_id)
                    resource = get_resource(resource_uid)
                    filler('resource', resource)
                    # Pre-fetch all datum for this resource.
                    for datum_page in get_datum_pages(resource_uid):
                        filler('datum_page', datum_page)
                    # TODO -- When to clear the datum cache in filler?
                    filler('event', event)
        times = [ev['time'] for ev in events]
        seq_nums = [ev['seq_num'] for ev in events]
        uids = [ev['uid'] for ev in events]
        data_table = _transpose(events, keys, 'data')
        # external_keys = [k for k in data_keys if 'external' in data_keys[k]]

        # Collect a DataArray for each field in Event, each field in
        # configuration, and 'seq_num'. The Event 'time' will be the
        # default coordinate.
        data_arrays = {}

        # Make DataArrays for Event data.
        for key in keys:
            field_metadata = data_keys[key]
            # Verify the actual ndim by looking at the data.
            ndim = numpy.asarray(data_table[key][0]).ndim
            dims = None
            if 'dims' in field_metadata:
                # As of this writing no Devices report dimension names ('dims')
                # but they could in the future.
                reported_ndim = len(field_metadata['dims'])
                if reported_ndim == ndim:
                    dims = tuple(field_metadata['dims'])
                else:
                    # TODO Warn
                    ...
            if dims is None:
                # Construct the same default dimension names xarray would.
                dims = tuple(f'dim_{i}' for i in range(ndim))
            data_arrays[key] = xarray.DataArray(data=data_table[key],
                                                dims=('time',) + dims,
                                                coords={'time': times},
                                                name=key)

        # Make DataArrays for configuration data.
        for object_name, config in descriptor.get('configuration', {}).items():
            data_keys = config['data_keys']
            # For configuration, label the dimension specially to
            # avoid key collisions.
            scoped_data_keys = {key: f'{object_name}:{key}'
                                for key in data_keys}
            if include:
                keys = {k: v for k, v in scoped_data_keys.items()
                        if v in include}
            elif exclude:
                keys = {k: v for k, v in scoped_data_keys.items()
                        if v not in exclude}
            else:
                keys = scoped_data_keys
            for key, scoped_key in keys.items():
                field_metadata = data_keys[key]
                # Verify the actual ndim by looking at the data.
                ndim = numpy.asarray(config['data'][key]).ndim
                dims = None
                if 'dims' in field_metadata:
                    # As of this writing no Devices report dimension names
                    # ('dims') but they could in the future.
                    reported_ndim = len(field_metadata['dims'])
                    if reported_ndim == ndim:
                        dims = tuple(field_metadata['dims'])
                    else:
                        # TODO Warn
                        ...
                if dims is None:
                    # Construct the same default dimension names xarray would.
                    dims = tuple(f'dim_{i}' for i in range(ndim))
                data_arrays[scoped_key] = xarray.DataArray(
                    # TODO Once we know we have one Event Descriptor
                    # per stream we can be more efficient about this.
                    data=numpy.tile(config['data'][key],
                                    (len(times),) + ndim * (1,)),
                    dims=('time',) + dims,
                    coords={'time': times},
                    name=key)

        # Finally, make DataArrays for 'seq_num' and 'uid'.
        data_arrays['seq_num'] = xarray.DataArray(data=seq_nums,
                                                  dims=('time',),
                                                  coords={'time': times},
                                                  name='seq_num')
        data_arrays['uid'] = xarray.DataArray(data=uids,
                                              dims=('time',),
                                              coords={'time': times},
                                              name='uid')

        datasets.append(xarray.Dataset(data_vars=data_arrays))
    # Merge Datasets from all Event Descriptors into one representing the
    # whole stream. (In the future we may simplify to one Event Descriptor
    # per stream, but as of this writing we must account for the
    # possibility of multiple.)
    return xarray.merge(datasets)
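
# Illustrative usage sketch (hypothetical inputs, not part of this module's
# API): render one stream as an xarray.Dataset from already-cached documents.
# ``start``, ``stop``, ``descriptor``, and ``cached_event_pages`` are assumed
# to exist; the stream is assumed to have no externally-stored data, so the
# resource/datum callables are never invoked.
#
#     ds = documents_to_xarray(
#         start_doc=start,
#         stop_doc=stop,
#         descriptor_docs=[descriptor],
#         get_event_pages=lambda uid: iter(cached_event_pages[uid]),
#         filler=event_model.Filler({}, inplace=True),
#         get_resource=None,
#         lookup_resource_for_datum=None,
#         get_datum_pages=None)
#     ds['seq_num']  # one DataArray per data key, plus 'seq_num' and 'uid'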


class RemoteBlueskyRun(intake.catalog.base.RemoteCatalog):
    """
    Catalog representing one Run.

    This is a client-side proxy to a BlueskyRun stored on a remote server.

    Parameters
    ----------
    url: str
        Address of the server
    headers: dict
        HTTP headers to use in calls
    name: str
        handle to reference this data
    parameters: dict
        To pass to the server when it instantiates the data source
    metadata: dict
        Additional info
    kwargs: ignored
    """
    name = 'bluesky-run'

    def __init__(self, url, http_args, name, parameters, metadata=None, **kwargs):
        super().__init__(url=url, http_args=http_args, name=name,
                         metadata=metadata)
        self.url = url
        self.name = name
        self.parameters = parameters
        self.http_args = http_args
        self._source_id = None
        self.metadata = metadata or {}
        self._get_source_id()
        self.bag = None

    def _get_source_id(self):
        if self._source_id is None:
            payload = dict(action='open', name=self.name,
                           parameters=self.parameters)
            req = requests.post(urljoin(self.url, '/v1/source'),
                                data=msgpack.packb(payload, use_bin_type=True),
                                **self.http_args)
            req.raise_for_status()
            response = msgpack.unpackb(req.content, **unpack_kwargs)
            self._parse_open_response(response)

    def _parse_open_response(self, response):
        self.npartitions = response['npartitions']
        self.metadata = response['metadata']
        self._schema = intake.source.base.Schema(
            datashape=None, dtype=None,
            shape=self.shape,
            npartitions=self.npartitions,
            metadata=self.metadata)
        self._source_id = response['source_id']

    def _load_metadata(self):
        if self.bag is None:
            self.raw_parts = [
                dask.delayed(intake.container.base.get_partition)(
                    self.url, self.http_args, self._source_id,
                    self.container, (i, True))
                for i in range(self.npartitions)]
            self.parts = [
                dask.delayed(intake.container.base.get_partition)(
                    self.url, self.http_args, self._source_id,
                    self.container, (i, False))
                for i in range(self.npartitions)]
            self.bag = dask.bag.from_delayed(self.parts)
        return self._schema

    def _get_partition(self, index):
        self._load_metadata()
        i, raw = index
        if raw:
            parts = self.raw_parts
        else:
            parts = self.parts
        return parts[i].compute()

    def read(self):
        raise NotImplementedError(
            "Reading the BlueskyRun itself is not supported. Instead read one "
            "of its entries, representing individual Event Streams.")

    def to_dask(self):
        raise NotImplementedError(
            "Reading the BlueskyRun itself is not supported. Instead read one "
            "of its entries, representing individual Event Streams.")

    def _close(self):
        self.bag = None

    def canonical(self):
        for i in range(self.npartitions):
            for name, doc in self._get_partition((i, False)):
                yield name, doc

    def read_canonical(self):
        warnings.warn(
            "The method read_canonical has been renamed canonical. This alias "
            "may be removed in a future release.")
        yield from self.canonical()

    def canonical_unfilled(self):
        for i in range(self.npartitions):
            for name, doc in self._get_partition((i, True)):
                yield name, doc

    def __repr__(self):
        self._load()
        try:
            start = self.metadata['start']
            stop = self.metadata['stop']
            out = (f"Run Catalog\n"
                   f"  uid={start['uid']!r}\n"
                   f"  exit_status={stop.get('exit_status')!r}\n"
                   f"  {_ft(start['time'])} -- {_ft(stop.get('time', '?'))}\n"
                   f"  Streams:\n")
            for stream_name in self:
                out += f"    * {stream_name}\n"
        except Exception as exc:
            out = f"<Intake catalog: Run *REPR_RENDERING_FAILURE* {exc!r}>"
        return out

    def search(self):
        raise NotImplementedError("Cannot search within one run.")


class BlueskyRun(intake.catalog.Catalog):
    """
    Catalog representing one Run.

    Parameters
    ----------
    get_run_start : callable
        Expected signature ``get_run_start() -> RunStart``
    get_run_stop : callable
        Expected signature ``get_run_stop() -> RunStop``
    get_event_descriptors : callable
        Expected signature ``get_event_descriptors() -> List[EventDescriptors]``
    get_event_pages : callable
        Expected signature ``get_event_pages(descriptor_uid) -> generator``
        where ``generator`` yields event_page documents
    get_event_count : callable
        Expected signature ``get_event_count(descriptor_uid) -> int``
    get_resource : callable
        Expected signature ``get_resource(resource_uid) -> Resource``
    lookup_resource_for_datum : callable
        Expected signature ``lookup_resource_for_datum(datum_id) -> resource_uid``
    get_datum_pages : callable
        Expected signature ``get_datum_pages(resource_uid) -> generator``
        where ``generator`` yields datum_page documents
    filler : event_model.Filler
    **kwargs :
        Additional keyword arguments are passed through to the base class,
        Catalog.
    """
    container = 'bluesky-run'
    version = '0.0.1'
    partition_access = True
    PARTITION_SIZE = 100

    def __init__(self, get_run_start, get_run_stop, get_event_descriptors,
                 get_event_pages, get_event_count, get_resource,
                 lookup_resource_for_datum, get_datum_pages, filler, **kwargs):
        # All **kwargs are passed up to base class. TODO: spell them out
        # explicitly.
        self.urlpath = ''  # TODO Not sure why I had to add this.
        self._get_run_start = get_run_start
        self._get_run_stop = get_run_stop
        self._get_event_descriptors = get_event_descriptors
        self._get_event_pages = get_event_pages
        self._get_event_count = get_event_count
        self._get_resource = get_resource
        self._lookup_resource_for_datum = lookup_resource_for_datum
        self._get_datum_pages = get_datum_pages
        self.filler = filler
        super().__init__(**kwargs)

    def __repr__(self):
        try:
            start = self._run_start_doc
            stop = self._run_stop_doc or {}
            out = (f"Run Catalog\n"
                   f"  uid={start['uid']!r}\n"
                   f"  exit_status={stop.get('exit_status')!r}\n"
                   f"  {_ft(start['time'])} -- {_ft(stop.get('time', '?'))}\n"
                   f"  Streams:\n")
            for stream_name in self:
                out += f"    * {stream_name}\n"
        except Exception as exc:
            out = f"<Intake catalog: Run *REPR_RENDERING_FAILURE* {exc!r}>"
        return out

    def _load(self):
        # Count the total number of documents in this run.
        self._run_stop_doc = self._get_run_stop()
        self._run_start_doc = self._get_run_start()
        self._descriptors = self._get_event_descriptors()
        self._offset = len(self._descriptors) + 1
        self.metadata.update({'start': self._run_start_doc})
        self.metadata.update({'stop': self._run_stop_doc})

        count = 1
        descriptor_uids = [doc['uid'] for doc in self._descriptors]
        count += len(descriptor_uids)
        for doc in self._descriptors:
            count += self._get_event_count(doc['uid'])
        count += (self._run_stop_doc is not None)

        self.npartitions = int(numpy.ceil(count / self.PARTITION_SIZE))

        self._schema = intake.source.base.Schema(
            datashape=None,
            dtype=None,
            shape=(count,),
            npartitions=self.npartitions,
            metadata=self.metadata)

        # Make a BlueskyEventStream for each stream_name.
        for doc in self._descriptors:
            if 'name' not in doc:
                warnings.warn(
                    f"EventDescriptor {doc['uid']!r} has no 'name', likely "
                    f"because it was generated using an old version of "
                    f"bluesky. The name 'primary' will be used.")
        descriptors_by_name = collections.defaultdict(list)
        for doc in self._descriptors:
            descriptors_by_name[doc.get('name', 'primary')].append(doc)
        for stream_name, descriptors in descriptors_by_name.items():
            args = dict(
                get_run_start=self._get_run_start,
                stream_name=stream_name,
                get_run_stop=self._get_run_stop,
                get_event_descriptors=self._get_event_descriptors,
                get_event_pages=self._get_event_pages,
                get_event_count=self._get_event_count,
                get_resource=self._get_resource,
                lookup_resource_for_datum=self._lookup_resource_for_datum,
                get_datum_pages=self._get_datum_pages,
                filler=self.filler,
                metadata={'descriptors': descriptors})
            self._entries[stream_name] = intake.catalog.local.LocalCatalogEntry(
                name=stream_name,
                description={},  # TODO
                driver='intake_bluesky.core.BlueskyEventStream',
                direct_access='forbid',
                args=args,
                cache=None,  # ???
                metadata={'descriptors': descriptors},
                catalog_dir=None,
                getenv=True,
                getshell=True,
                catalog=self)

    def read_canonical(self):
        warnings.warn(
            "The method read_canonical has been renamed canonical. This alias "
            "may be removed in a future release.")
        yield from self.canonical()

    def canonical(self):
        for i in range(self.npartitions):
            for name, doc in self.read_partition((i, False)):
                yield name, doc

    def canonical_unfilled(self):
        for i in range(self.npartitions):
            for name, doc in self.read_partition((i, True)):
                yield name, doc

    def read_partition_unfilled(self, i):
        """Fetch one chunk of documents.
        """
        self._load()
        payload = []
        start = i * self.PARTITION_SIZE
        stop = (1 + i) * self.PARTITION_SIZE
        if start < self._offset:
            payload.extend(
                itertools.islice(
                    itertools.chain(
                        (('start', self._get_run_start()),),
                        (('descriptor', doc) for doc in self._descriptors)),
                    start,
                    stop))
        descriptor_uids = [doc['uid'] for doc in self._descriptors]
        skip = max(0, start - len(payload))
        limit = stop - start - len(payload)
        # print('start, stop, skip, limit', start, stop, skip, limit)
        datum_ids = set()
        if limit > 0:
            events = itertools.islice(
                interlace_event_pages(
                    *(self._get_event_pages(descriptor_uid=descriptor_uid)
                      for descriptor_uid in descriptor_uids)),
                skip,
                limit)
            for event in events:
                for key, is_filled in event['filled'].items():
                    if not is_filled:
                        datum_id = event['data'][key]
                        if datum_id not in datum_ids:
                            if '/' in datum_id:
                                resource_uid, _ = datum_id.split('/', 1)
                            else:
                                resource_uid = self._lookup_resource_for_datum(datum_id)
                            resource = self._get_resource(uid=resource_uid)
                            payload.append(('resource', resource))
                            for datum_page in self._get_datum_pages(resource_uid):
                                # TODO Greedily cache but lazily emit.
                                payload.append(('datum_page', datum_page))
                                datum_ids |= set(datum_page['datum_id'])
                payload.append(('event', event))
        if i == self.npartitions - 1 and self._run_stop_doc is not None:
            payload.append(('stop', self._run_stop_doc))
        for _, doc in payload:
            doc.pop('_id', None)
        return payload

    def read_partition(self, index):
        """Fetch one chunk of documents.
        """
        i, raw = index
        if raw:
            return self.read_partition_unfilled(i)
        self._load()
        payload = []
        start = i * self.PARTITION_SIZE
        stop = (1 + i) * self.PARTITION_SIZE
        if start < self._offset:
            payload.extend(
                itertools.islice(
                    itertools.chain(
                        (('start', self._get_run_start()),),
                        (('descriptor', doc) for doc in self._descriptors)),
                    start,
                    stop))
        descriptor_uids = [doc['uid'] for doc in self._descriptors]
        skip = max(0, start - len(payload))
        limit = stop - start - len(payload)
        if limit > 0:
            events = itertools.islice(
                interlace_event_pages(
                    *(self._get_event_pages(descriptor_uid=descriptor_uid)
                      for descriptor_uid in descriptor_uids)),
                skip,
                limit)
            for descriptor in self._descriptors:
                self.filler('descriptor', descriptor)
            for event in events:
                self._fill(event)  # in place (for now)
                payload.append(('event', event))
        if i == self.npartitions - 1 and self._run_stop_doc is not None:
            payload.append(('stop', self._run_stop_doc))
        for _, doc in payload:
            doc.pop('_id', None)
        return payload

    def _fill(self, event, last_datum_id=None):
        try:
            self.filler('event', event)
        except event_model.UnresolvableForeignKeyError as err:
            datum_id = err.key
            if datum_id == last_datum_id:
                # We tried to fetch this Datum on the last trip through this
                # method, and apparently it did not work. We are in an
                # infinite loop. Bail!
                raise
            if '/' in datum_id:
                resource_uid, _ = datum_id.split('/', 1)
            else:
                resource_uid = self._lookup_resource_for_datum(datum_id)
            resource = self._get_resource(uid=resource_uid)
            self.filler('resource', resource)
            # Pre-fetch all datum for this resource.
            for datum_page in self._get_datum_pages(resource_uid=resource_uid):
                self.filler('datum_page', datum_page)
            # TODO -- When to clear the datum cache in filler?

            # Re-enter and try again now that the Filler has consumed the
            # missing Datum. There might be another missing Datum in this same
            # Event document (hence this re-entrant structure) or it might be
            # good to go.
            self._fill(event, last_datum_id=datum_id)

    def read(self):
        raise NotImplementedError(
            "Reading the BlueskyRun itself is not supported. Instead read one "
            "of its entries, representing individual Event Streams.")

    def to_dask(self):
        raise NotImplementedError(
            "Reading the BlueskyRun itself is not supported. Instead read one "
            "of its entries, representing individual Event Streams.")
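
# Illustrative sketch (hypothetical catalog): replay a run's documents as
# (name, doc) pairs through any consumer, for example a simple printer.
# ``some_catalog`` and the run lookup are assumptions, not part of this module.
#
#     run = some_catalog[-1]             # a BlueskyRun entry
#     for name, doc in run.canonical():
#         print(name, doc.get('uid', ''))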


class BlueskyEventStream(intake_xarray.base.DataSourceMixin):
    """
    Catalog representing one Event Stream from one Run.

    Parameters
    ----------
    get_run_start : callable
        Expected signature ``get_run_start() -> RunStart``
    stream_name : string
        Stream name, such as 'primary'.
    get_run_stop : callable
        Expected signature ``get_run_stop() -> RunStop``
    get_event_descriptors : callable
        Expected signature ``get_event_descriptors() -> List[EventDescriptors]``
    get_event_pages : callable
        Expected signature ``get_event_pages(descriptor_uid) -> generator``
        where ``generator`` yields event_page documents
    get_event_count : callable
        Expected signature ``get_event_count(descriptor_uid) -> int``
    get_resource : callable
        Expected signature ``get_resource(resource_uid) -> Resource``
    lookup_resource_for_datum : callable
        Expected signature ``lookup_resource_for_datum(datum_id) -> resource_uid``
    get_datum_pages : callable
        Expected signature ``get_datum_pages(resource_uid) -> generator``
        where ``generator`` yields datum_page documents
    filler : event_model.Filler
    metadata : dict
        passed through to base class
    include : list, optional
        Fields ('data keys') to include. By default all are included. This
        parameter is mutually exclusive with ``exclude``.
    exclude : list, optional
        Fields ('data keys') to exclude. By default none are excluded. This
        parameter is mutually exclusive with ``include``.
    **kwargs :
        Additional keyword arguments are passed through to the base class.
    """
    container = 'xarray'
    name = 'bluesky-event-stream'
    version = '0.0.1'
    partition_access = True

    def __init__(self, get_run_start, stream_name, get_run_stop,
                 get_event_descriptors, get_event_pages, get_event_count,
                 get_resource, lookup_resource_for_datum, get_datum_pages,
                 filler, metadata, include=None, exclude=None, **kwargs):
        # self._partition_size = 10
        # self._default_chunks = 10
        self._get_run_start = get_run_start
        self._stream_name = stream_name
        self._get_event_descriptors = get_event_descriptors
        self._get_run_stop = get_run_stop
        self._get_event_pages = get_event_pages
        self._get_event_count = get_event_count
        self._get_resource = get_resource
        self._lookup_resource_for_datum = lookup_resource_for_datum
        self._get_datum_pages = get_datum_pages
        self.filler = filler
        self.urlpath = ''  # TODO Not sure why I had to add this.
        self._ds = None  # set by _open_dataset below
        self.include = include
        self.exclude = exclude
        super().__init__(metadata=metadata)

    def __repr__(self):
        try:
            out = (f"<Intake catalog: Stream {self._stream_name!r} "
                   f"from Run {self._get_run_start()['uid'][:8]}...>")
        except Exception as exc:
            out = f"<Intake catalog: Stream *REPR_RENDERING_FAILURE* {exc!r}>"
        return out

    def _open_dataset(self):
        self._run_stop_doc = self._get_run_stop()
        self._run_start_doc = self._get_run_start()
        self.metadata.update({'start': self._run_start_doc})
        self.metadata.update({'stop': self._run_stop_doc})
        descriptor_docs = [doc for doc in self._get_event_descriptors()
                           if doc.get('name') == self._stream_name]
        self._ds = documents_to_xarray(
            start_doc=self._run_start_doc,
            stop_doc=self._run_stop_doc,
            descriptor_docs=descriptor_docs,
            get_event_pages=self._get_event_pages,
            filler=self.filler,
            get_resource=self._get_resource,
            lookup_resource_for_datum=self._lookup_resource_for_datum,
            get_datum_pages=self._get_datum_pages,
            include=self.include,
            exclude=self.exclude)


class DocumentCache(event_model.DocumentRouter):
    def __init__(self):
        self.descriptors = {}
        self.resources = {}
        self.event_pages = collections.defaultdict(list)
        self.datum_pages_by_resource = collections.defaultdict(list)
        self.resource_uid_by_datum_id = {}
        self.start_doc = None
        self.stop_doc = None

    def start(self, doc):
        self.start_doc = doc

    def stop(self, doc):
        self.stop_doc = doc

    def event_page(self, doc):
        self.event_pages[doc['descriptor']].append(doc)

    def datum_page(self, doc):
        self.datum_pages_by_resource[doc['resource']].append(doc)
        for datum_id in doc['datum_id']:
            self.resource_uid_by_datum_id[datum_id] = doc['resource']

    def descriptor(self, doc):
        self.descriptors[doc['uid']] = doc

    def resource(self, doc):
        self.resources[doc['uid']] = doc


class BlueskyRunFromGenerator(BlueskyRun):

    def __init__(self, gen_func, gen_args, gen_kwargs, filler=None, **kwargs):

        if filler is None:
            filler = event_model.Filler({}, inplace=True)

        document_cache = DocumentCache()

        for item in gen_func(*gen_args, **gen_kwargs):
            document_cache(*item)

        assert document_cache.start_doc is not None

        def get_run_start():
            return document_cache.start_doc

        def get_run_stop():
            return document_cache.stop_doc

        def get_event_descriptors():
            return document_cache.descriptors.values()

        def get_event_pages(descriptor_uid, skip=0, limit=None):
            if skip != 0 and limit is not None:
                raise NotImplementedError
            return document_cache.event_pages[descriptor_uid]

        def get_event_count(descriptor_uid):
            return sum(len(page['seq_num'])
                       for page in document_cache.event_pages[descriptor_uid])

        def get_resource(uid):
            return document_cache.resources[uid]

        def lookup_resource_for_datum(datum_id):
            return document_cache.resource_uid_by_datum_id[datum_id]

        def get_datum_pages(resource_uid, skip=0, limit=None):
            if skip != 0 and limit is not None:
                raise NotImplementedError
            return document_cache.datum_pages_by_resource[resource_uid]

        super().__init__(
            get_run_start=get_run_start,
            get_run_stop=get_run_stop,
            get_event_descriptors=get_event_descriptors,
            get_event_pages=get_event_pages,
            get_event_count=get_event_count,
            get_resource=get_resource,
            lookup_resource_for_datum=lookup_resource_for_datum,
            get_datum_pages=get_datum_pages,
            filler=filler,
            **kwargs)


def _transpose(in_data, keys, field):
    """Turn a list of dicts into dict of lists

    Parameters
    ----------
    in_data : list
        A list of dicts which contain at least one dict.
        All of the inner dicts must have at least the keys
        in `keys`
    keys : list
        The list of keys to extract
    field : str
        The field in the outer dict to use

    Returns
    -------
    transpose : dict
        The transpose of the data
    """
    out = {k: [None] * len(in_data) for k in keys}
    for j, ev in enumerate(in_data):
        dd = ev[field]
        for k in keys:
            out[k][j] = dd[k]
    return out


def _ft(timestamp):
    "format timestamp"
    if isinstance(timestamp, str):
        return timestamp
    # Truncate microseconds to milliseconds. Do not bother to round.
    return (datetime.fromtimestamp(timestamp)
            .strftime('%Y-%m-%d %H:%M:%S.%f'))[:-3]


def xarray_to_event_gen(data_xarr, ts_xarr, page_size):
    for start_idx in range(0, len(data_xarr['time']), page_size):
        stop_idx = start_idx + page_size
        data = {name: variable.values
                for name, variable in
                data_xarr.isel({'time': slice(start_idx, stop_idx)}).items()
                if ':' not in name}
        ts = {name: variable.values
              for name, variable in
              ts_xarr.isel({'time': slice(start_idx, stop_idx)}).items()
              if ':' not in name}
        event_page = {}
        seq_num = data.pop('seq_num')
        ts.pop('seq_num')
        uids = data.pop('uid')
        ts.pop('uid')
        event_page['data'] = data
        event_page['timestamps'] = ts
        event_page['time'] = data_xarr['time'][start_idx:stop_idx].values
        event_page['uid'] = uids
        event_page['seq_num'] = seq_num
        event_page['filled'] = {}
        yield event_page


def parse_handler_registry(handler_registry):
    """
    Parse mapping of spec name to 'import path' into mapping to class itself.

    Parameters
    ----------
    handler_registry : dict
        Values may be string 'import paths' to classes or actual classes.

    Examples
    --------
    Pass in name; get back actual class.

    >>> parse_handler_registry({'my_spec': 'package.module.ClassName'})
    {'my_spec': <package.module.ClassName>}
    """
    result = {}
    for spec, handler_str in handler_registry.items():
        if isinstance(handler_str, str):
            module_name, _, class_name = handler_str.rpartition('.')
            class_ = getattr(importlib.import_module(module_name), class_name)
        else:
            class_ = handler_str
        result[spec] = class_
    return result


intake.registry['remote-bluesky-run'] = RemoteBlueskyRun
intake.container.container_map['bluesky-run'] = RemoteBlueskyRun


def concat_dataarray_pages(dataarray_pages):
    """
    Combines an iterable of dataarray_pages into a single dataarray_page.

    Parameters
    ----------
    dataarray_pages : Iterable
        An iterable of event_pages with xarray.DataArrays in the data,
        timestamp, and filled fields.

    Returns
    -------
    event_page : dict
        A single event_page with xarray.DataArrays in the data, timestamp,
        and filled fields.
    """
    pages = list(dataarray_pages)
    if len(pages) == 1:
        return pages[0]

    array_keys = ['seq_num', 'time', 'uid']
    data_keys = pages[0]['data'].keys()

    return {'descriptor': pages[0]['descriptor'],
            **{key: list(itertools.chain.from_iterable(
                    [page[key] for page in pages])) for key in array_keys},
            'data': {key: xarray.concat([page['data'][key] for page in pages])
                     for key in data_keys},
            'timestamps': {key: xarray.concat([page['timestamps'][key]
                                               for page in pages])
                           for key in data_keys},
            'filled': {key: xarray.concat([page['filled'][key]
                                           for page in pages])
                       for key in data_keys}}


def event_page_to_dataarray_page(event_page, dims=None, coords=None):
    """
    Converts the event_page's data, timestamps, and filled to xarray.DataArray.

    Parameters
    ----------
    event_page : dict
        An EventPage document
    dims : tuple
        Tuple of dimension names associated with the array
    coords : dict-like
        Dictionary-like container of coordinate arrays

    Returns
    -------
    event_page : dict
        An event_page with xarray.DataArrays in the data, timestamp, and
        filled fields.
    """
    if coords is None:
        coords = {'time': event_page['time']}
    if dims is None:
        dims = ('time',)

    array_keys = ['seq_num', 'time', 'uid']
    data_keys = event_page['data'].keys()

    return {'descriptor': event_page['descriptor'],
            **{key: event_page[key] for key in array_keys},
            'data': {key: xarray.DataArray(
                         event_page['data'][key],
                         dims=dims, coords=coords, name=key)
                     for key in data_keys},
            'timestamps': {key: xarray.DataArray(
                               event_page['timestamps'][key],
                               dims=dims, coords=coords, name=key)
                           for key in data_keys},
            'filled': {key: xarray.DataArray(
                           event_page['filled'][key],
                           dims=dims, coords=coords, name=key)
                       for key in data_keys}}


def dataarray_page_to_dataset_page(dataarray_page):
    """
    Converts the dataarray_page's data, timestamps, and filled to xarray.Dataset.

    Parameters
    ----------
    dataarray_page : dict

    Returns
    -------
    dataset_page : dict
    """
    array_keys = ['seq_num', 'time', 'uid']

    return {'descriptor': dataarray_page['descriptor'],
            **{key: dataarray_page[key] for key in array_keys},
            'data': xarray.merge(dataarray_page['data'].values()),
            'timestamps': xarray.merge(dataarray_page['timestamps'].values()),
            'filled': xarray.merge(dataarray_page['filled'].values())}


class DaskFiller(event_model.Filler):
    def __init__(self, *args, inplace=False, **kwargs):
        if inplace:
            raise NotImplementedError("DaskFiller inplace is not supported.")
        # The DaskFiller will not mutate documents passed in by the user, but
        # it will ask the base class to mutate *internal* copies in place, so
        # we set inplace=True here, even though the user documents will never
        # be modified in place.
        super().__init__(*args, inplace=True, **kwargs)

    def event_page(self, doc):

        @dask.delayed
        def delayed_fill(event_page, key):
            self.fill_event_page(event_page, include=key)
            return numpy.asarray(event_page['data'][key])

        descriptor = self._descriptor_cache[doc['descriptor']]
        needs_filling = {key for key, val in descriptor['data_keys'].items()
                         if 'external' in val}
        filled_doc = copy.deepcopy(doc)

        for key in needs_filling:
            shape = extract_shape(descriptor, key)
            dtype = extract_dtype(descriptor, key)
            filled_doc['data'][key] = array.from_delayed(
                delayed_fill(filled_doc, key), shape=shape, dtype=dtype)
        return filled_doc

    def event(self, doc):

        @dask.delayed
        def delayed_fill(event, key):
            self.fill_event(event, include=key)
            return numpy.asarray(event['data'][key])

        descriptor = self._descriptor_cache[doc['descriptor']]
        needs_filling = {key for key, val in descriptor['data_keys'].items()
                         if 'external' in val}
        filled_doc = copy.deepcopy(doc)

        for key in needs_filling:
            shape = extract_shape(descriptor, key)
            dtype = extract_dtype(descriptor, key)
            filled_doc['data'][key] = array.from_delayed(
                delayed_fill(filled_doc, key), shape=shape, dtype=dtype)
        return filled_doc


def extract_shape(descriptor, key):
    """
    Work around bug in https://github.com/bluesky/ophyd/pull/746
    """
    # Ideally this code would just be
    # descriptor['data_keys'][key]['shape']
    # but we have to do some heuristics to make up for errors in the reporting.

    # Broken ophyd reports (x, y, 0). We want (num_images, y, x).
    data_key = descriptor['data_keys'][key]
    if len(data_key['shape']) == 3 and data_key['shape'][-1] == 0:
        object_keys = descriptor.get('object_keys', {})
        for object_name, data_keys in object_keys.items():
            if key in data_keys:
                break
        else:
            raise RuntimeError(f"Could not figure out shape of {key}")
        num_images = descriptor['configuration'][object_name]['data'].get('num_images', -1)
        x, y, _ = data_key['shape']
        shape = (num_images, y, x)
    else:
        shape = descriptor['data_keys'][key]['shape']
    return shape


def extract_dtype(descriptor, key):
    """
    Work around the fact that we currently report jsonschema data types.
    """
    reported = descriptor['data_keys'][key]['dtype']
    if reported == 'array':
        return float  # guess!
    else:
        return reported
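
# Illustrative sketch (made-up descriptor fragment): a broken ophyd report of
# shape (1024, 768, 0) is rewritten to (num_images, y, x) using the device's
# configuration, and the jsonschema dtype 'array' falls back to float.
#
#     descriptor = {
#         'data_keys': {'image': {'shape': [1024, 768, 0], 'dtype': 'array'}},
#         'object_keys': {'camera': ['image']},
#         'configuration': {'camera': {'data': {'num_images': 5}}},
#     }
#     extract_shape(descriptor, 'image')   # -> (5, 768, 1024)
#     extract_dtype(descriptor, 'image')   # -> float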