Source code for intake_bluesky.mongo_normalized

import collections.abc
import event_model
from functools import partial
import intake
import intake.catalog
import intake.catalog.local
import intake.source.base
import pymongo
import pymongo.errors

from .core import parse_handler_registry
from .core import to_event_pages
from .core import to_datum_pages


class _Entries(collections.abc.Mapping):
    "Mock the dict interface around a MongoDB query result."
    def __init__(self, catalog):
        self.catalog = catalog

    def _doc_to_entry(self, run_start_doc):
        uid = run_start_doc['uid']
        run_start_doc.pop('_id')
        entry_metadata = {'start': run_start_doc,
                          'stop': self.catalog._get_run_stop(uid)}

        def get_run_start():
            return run_start_doc

        args = dict(
            get_run_start=get_run_start,
            get_run_stop=partial(self.catalog._get_run_stop, uid),
            get_event_descriptors=partial(self.catalog._get_event_descriptors, uid),
            # 2500 was selected as the page_size because it worked well durring
            # benchmarks, for HXN data a full page had roughly 3500 events.
            get_event_pages=to_event_pages(self.catalog._get_event_cursor, 2500),
            get_event_count=self.catalog._get_event_count,
            get_resource=self.catalog._get_resource,
            lookup_resource_for_datum=self.catalog._lookup_resource_for_datum,
            # 2500 was selected as the page_size because it worked well durring
            # benchmarks.
            get_datum_pages=to_datum_pages(self.catalog._get_datum_cursor, 2500),
            filler=self.catalog.filler)
        return intake.catalog.local.LocalCatalogEntry(
            name=run_start_doc['uid'],
            description={},  # TODO
            driver='intake_bluesky.core.BlueskyRun',
            direct_access='forbid',  # ???
            args=args,
            cache=None,  # ???
            parameters=[],
            metadata=entry_metadata,
            catalog_dir=None,
            getenv=True,
            getshell=True,
            catalog=self.catalog)

    def __iter__(self):
        cursor = self.catalog._run_start_collection.find(
            self.catalog._query, sort=[('time', pymongo.DESCENDING)])
        for run_start_doc in cursor:
            yield run_start_doc['uid']

    def __getitem__(self, name):
        # If this came from a client, we might be getting '-1'.
        collection = self.catalog._run_start_collection
        try:
            N = int(name)
        except ValueError:
            query = {'$and': [self.catalog._query, {'uid': name}]}
            run_start_doc = collection.find_one(query)
            if run_start_doc is None:
                regex_query = {
                    '$and': [self.catalog._query,
                             {'uid': {'$regex': f'{name}.*'}}]}
                matches = list(collection.find(regex_query).limit(10))
                if not matches:
                    raise KeyError(name)
                elif len(matches) == 1:
                    run_start_doc, = matches
                else:
                    match_list = '\n'.join(doc['uid'] for doc in matches)
                    raise ValueError(
                        f"Multiple matches to partial uid {name!r}. "
                        f"Up to 10 listed here:\n"
                        f"{match_list}")
        else:
            if N < 0:
                # Interpret negative N as "the Nth from last entry".
                query = self.catalog._query
                cursor = (collection.find(query)
                          .sort('time', pymongo.DESCENDING)
                          .skip(-N - 1)
                          .limit(1))
                try:
                    run_start_doc, = cursor
                except ValueError:
                    raise IndexError(
                        f"Catalog only contains {len(self.catalog)} "
                        f"runs.")
            else:
                # Interpret positive N as
                # "most recent entry with scan_id == N".
                query = {'$and': [self.catalog._query, {'scan_id': N}]}
                cursor = (collection.find(query)
                          .sort('time', pymongo.DESCENDING)
                          .limit(1))
                try:
                    run_start_doc, = cursor
                except ValueError:
                    raise KeyError(f"No run with scan_id={N}")
        if run_start_doc is None:
            raise KeyError(name)
        return self._doc_to_entry(run_start_doc)

    def __contains__(self, key):
        # Avoid iterating through all entries.
        try:
            self[key]
        except KeyError:
            return False
        else:
            return True

    def __len__(self):
        return len(self.catalog)


[docs]class BlueskyMongoCatalog(intake.catalog.Catalog): def __init__(self, metadatastore_db, asset_registry_db, *, handler_registry=None, query=None, **kwargs): """ This Catalog is backed by a pair of MongoDBs with "layout 1". This layout uses a separate Mongo collection per document type and a separate Mongo document for each logical document. Parameters ---------- metadatastore_db : pymongo.database.Database or string Must be a Database or a URI string that includes a database name. asset_registry_db : pymongo.database.Database or string Must be a Database or a URI string that includes a database name. handler_registry : dict, optional Maps each asset spec to a handler class or a string specifying the module name and class name, as in (for example) ``{'SOME_SPEC': 'module.submodule.class_name'}``. query : dict, optional MongoDB query. Used internally by the ``search()`` method. **kwargs : Additional keyword arguments are passed through to the base class, Catalog. """ name = 'bluesky-mongo-catalog' # noqa if isinstance(metadatastore_db, str): mds_db = _get_database(metadatastore_db) else: mds_db = metadatastore_db if isinstance(asset_registry_db, str): assets_db = _get_database(asset_registry_db) else: assets_db = asset_registry_db self._run_start_collection = mds_db.get_collection('run_start') self._run_stop_collection = mds_db.get_collection('run_stop') self._event_descriptor_collection = mds_db.get_collection('event_descriptor') self._event_collection = mds_db.get_collection('event') self._resource_collection = assets_db.get_collection('resource') self._datum_collection = assets_db.get_collection('datum') self._metadatastore_db = mds_db self._asset_registry_db = assets_db self._query = query or {} if handler_registry is None: handler_registry = {} parsed_handler_registry = parse_handler_registry(handler_registry) self.filler = event_model.Filler(parsed_handler_registry, inplace=True) super().__init__(**kwargs) def _get_run_stop(self, run_start_uid): doc = self._run_stop_collection.find_one( {'run_start': run_start_uid}) # It is acceptable to return None if the document does not exist. if doc is not None: doc.pop('_id') return doc def _get_event_descriptors(self, run_start_uid): results = [] cursor = self._event_descriptor_collection.find( {'run_start': run_start_uid}, sort=[('time', pymongo.ASCENDING)]) for doc in cursor: doc.pop('_id') results.append(doc) return results def _get_event_cursor(self, descriptor_uid, skip=0, limit=None): cursor = (self._event_collection .find({'descriptor': descriptor_uid}, sort=[('time', pymongo.ASCENDING)])) descriptor = self._event_descriptor_collection.find_one( {'uid': descriptor_uid}) cursor.skip(skip) if limit is not None: cursor = cursor.limit(limit) external_keys = {k for k, v in descriptor['data_keys'].items() if 'external' in v} for doc in cursor: doc['filled'] = {k: False for k in external_keys} doc.pop('_id') yield doc def _get_event_count(self, descriptor_uid): return self._event_collection.count_documents( {'descriptor': descriptor_uid}) def _get_resource(self, uid): doc = self._resource_collection.find_one( {'uid': uid}) if doc is None: raise ValueError(f"Could not find Resource with uid={uid}") doc.pop('_id') return doc def _lookup_resource_for_datum(self, datum_id): doc = self._datum_collection.find_one( {'datum_id': datum_id}) if doc is None: raise ValueError(f"Could not find Datum with datum_id={datum_id}") return doc['resource'] def _get_datum_cursor(self, resource_uid): cursor = self._datum_collection.find({'resource': resource_uid}) for doc in cursor: doc.pop('_id') yield doc self._schema = {} # TODO This is cheating, I think. def _make_entries_container(self): return _Entries(self) def __len__(self): return self._run_start_collection.count_documents(self._query) def _close(self): self._client.close()
[docs] def search(self, query): """ Return a new Catalog with a subset of the entries in this Catalog. Parameters ---------- query : dict MongoDB query. """ if self._query: query = {'$and': [self._query, query]} cat = type(self)( metadatastore_db=self._metadatastore_db, asset_registry_db=self._asset_registry_db, query=query, handler_registry=self.filler.handler_registry, name='search results', getenv=self.getenv, getshell=self.getshell, auth=self.auth, metadata=(self.metadata or {}).copy(), storage_options=self.storage_options) return cat
def _get_database(uri): client = pymongo.MongoClient(uri) try: # Called with no args, get_database() returns the database # specified in the client's uri --- or raises if there was none. # There is no public method for checking this in advance, so we # just catch the error. return client.get_database() except pymongo.errors.ConfigurationError as err: raise ValueError( f"Invalid client: {client} " f"Did you forget to include a database?") from err intake.registry['mongo_metadatastore'] = BlueskyMongoCatalog