Source code for panoptes.utils.database.memory

import threading
import weakref
from contextlib import suppress
from uuid import uuid4

from panoptes.utils import error
from panoptes.utils.database import AbstractPanDB
from panoptes.utils.database.base import create_storage_obj
from panoptes.utils.serializers import from_json
from panoptes.utils.serializers import to_json


[docs]class PanMemoryDB(AbstractPanDB): """In-memory store of serialized objects. We serialize the objects in order to test the same code path used when storing in an external database. """ active_dbs = weakref.WeakValueDictionary()
[docs] @classmethod def get_or_create(cls, db_name=None, **kwargs): """Returns the named db, creating if needed. This method exists because PanDB gets called multiple times for the same database name. With mongo or a file store where the storage is external from the instance, that is not a problem, but with PanMemoryDB the instance is the store, so the instance must be shared.""" db = PanMemoryDB.active_dbs.get(db_name) if not db: db = PanMemoryDB(db_name=db_name, **kwargs) PanMemoryDB.active_dbs[db_name] = db return db
def __init__(self, **kwargs): super().__init__(**kwargs) self.current = {} self.collections = {} self.lock = threading.Lock() def _make_id(self): return str(uuid4())
[docs] def insert_current(self, collection, obj, store_permanently=True): obj_id = self._make_id() obj = create_storage_obj(collection, obj, obj_id) try: obj = to_json(obj) except Exception as e: raise error.InvalidSerialization(f"Problem serializing object for insertion: {e} {obj!r}") with self.lock: self.current[collection] = obj if store_permanently: self.collections.setdefault(collection, {})[obj_id] = obj return obj_id
[docs] def insert(self, collection, obj): obj_id = self._make_id() obj = create_storage_obj(collection, obj, obj_id) try: obj = to_json(obj) except Exception as e: raise error.InvalidSerialization(f"Problem inserting object into collection: {e}, {obj!r}") with self.lock: self.collections.setdefault(collection, {})[obj_id] = obj return obj_id
[docs] def get_current(self, collection): with self.lock: obj = self.current.get(collection, None) if obj: obj = from_json(obj) return obj
[docs] def find(self, collection, obj_id): with self.lock: obj = self.collections.get(collection, {}).get(obj_id) if obj: obj = from_json(obj) return obj
[docs] def clear_current(self, entry_type): with suppress(KeyError): del self.current[entry_type]
[docs] @classmethod def permanently_erase_database(cls, *args, **kwargs): # For some reason we're not seeing all the references disappear # after tests. Perhaps there is some global variable pointing at # the db or one of its referrers, or perhaps a pytest fixture # hasn't been removed. PanMemoryDB.active_dbs = weakref.WeakValueDictionary()