import pymongo
from .objects import TYPES_TO_COLLECTION_NAMES, Container, Sample
[docs]class Client:
"""
This connects to several MongoDB collections for sample management.
For each collection, we have a traitlets-based object to represent
documents from that collection and automatically sync any changes back to
the database.
Each collection has a counterpart named {collection_name}_revisions that
stores previous version of the document. This approach was inspired by:
https://www.mongodb.com/blog/post/building-with-patterns-the-document-versioning-pattern
"""
def __init__(self, database):
"""
Connect to a MongoDB datbase.
Parameters
----------
database: pymongo.Database or URI string
"""
if database is None:
raise ValueError("Database should be URI or pymongo-like object.")
if isinstance(database, str):
database = _get_database(database)
self._db = database
self._samples = CollectionAccessor(self, Sample)
self._containers = CollectionAccessor(self, Container)
@property
def samples(self):
"""
Accessor for creating and searching Samples
"""
return self._samples
@property
def containers(self):
"""
Accessor for creating and searching Containers
"""
return self._containers
def _new_document(self, obj_type, args, kwargs):
"""
Insert a new document with a new uuid.
"""
# Make a new object (e.g. Sample)
obj = obj_type(self, *args, **kwargs)
# Find the assocaited MongoDB collection.
collection_name = TYPES_TO_COLLECTION_NAMES[obj_type]
collection = self._db[collection_name]
# Insert the new object.
collection.insert_one(obj.to_dict())
# Observe any updates to the object and sync them to MongoDB.
obj.observe(self._update)
return obj
def _update(self, change):
"""
Sync a change to an object, observed via traitlets, to MongoDB.
"""
# The 'revision' trait is a read-only trait, so if it is being changed
# it is being changed by us, and we don't need to process it.
# Short-circuit here to avoid an infinite recursion.
if change['name'] == 'revision':
return
owner = change['owner']
collection_name = TYPES_TO_COLLECTION_NAMES[type(owner)]
collection = self._db[collection_name]
revisions = self._db[f'{collection_name}_revisions']
# We need the JSON-safe value of the change, so do this instead of
# change['new'].
new = change['owner'].to_dict()[change['name']]
filter = {'uuid': owner.uuid}
update = {'$set': {change['name']: new},
'$inc': {'revision': 1}}
# TODO Use transactions for this once we have MongoDB 4.0+.
# Increment the revision number.
owner.set_trait('revision', owner.revision + 1)
# Update the document in {collection_name}.
original = collection.find_one_and_update(filter, update)
# Remove the internal MongoDB id.
original.pop('_id')
# Insert the old version in {collection_name}_revisions
revisions = revisions.insert_one(original)
def _purge(self, obj_type, uuid):
collection_name = TYPES_TO_COLLECTION_NAMES[obj_type]
collection = self._db[collection_name]
revisions = self._db[f'{collection_name}_revisions']
filter = {'uuid': uuid}
original = collection.find_one(filter)
# Remove the internal MongoDB id.
_id = original.pop('_id')
revisions = revisions.insert(original)
# Restore the internal MongoDB id.
original['_id'] = _id
collection.delete_one(original)
def _document_to_obj(self, obj_type, document):
"""
Convert a dict returned by pymongo to our traitlets-based object.
"""
# Handle the read_only traits separately.
uuid = document.pop('uuid')
revision = document.pop('revision')
obj = obj_type(self, **document)
obj.set_trait('uuid', uuid)
obj.set_trait('revision', revision)
# Observe any updates to the object and sync them to MongoDB.
obj.observe(self._update)
return obj
def _revisions(self, obj):
"""
Access all revisions to an object with the most recent first.
"""
revisions = self._db[f'{TYPES_TO_COLLECTION_NAMES[type(obj)]}_revisions']
type_ = type(obj)
for document in (revisions.find({'uuid': obj.uuid})
.sort('revision', pymongo.DESCENDING)):
document.pop('_id') # Remove the internal MongoDB id.
yield type_.from_document(self, document)
class CollectionAccessor:
"""
Accessor used on Clients
"""
def __init__(self, client, obj_type):
self._client = client
self._obj_type = obj_type
self._collection = client._db[TYPES_TO_COLLECTION_NAMES[self._obj_type]]
def new(self, *args, **kwargs):
return self._client._new_document(self._obj_type, args, kwargs)
def find(self, filter):
if filter is None:
filter = {}
for document in self._collection.find(filter):
document.pop('_id') # Remove the internal MongoDB id.
yield self._obj_type.from_document(self._client, document)
def find_one(self, filter):
document = self._collection.find_one(filter)
if document is None:
return None
document.pop('_id') # Remove the internal MongoDB id.
return self._obj_type.from_document(self._client, document)
def _get_database(uri):
client = pymongo.MongoClient(uri)
try:
# Called with no args, get_database() returns the database
# specified in the client's uri --- or raises if there was none.
# There is no public method for checking this in advance, so we
# just catch the error.
return client.get_database()
except pymongo.errors.ConfigurationError as err:
raise ValueError(
f"Invalid client: {client} "
f"Did you forget to include a database?") from err