Source code for panoptes.utils.database.memory

import threading
import weakref
from contextlib import suppress
from uuid import uuid4

from panoptes.utils import error
from panoptes.utils.database import AbstractPanDB
from panoptes.utils.database.base import create_storage_obj
from panoptes.utils.serializers import from_json, to_json


[docs] class PanMemoryDB(AbstractPanDB): """In-memory store of serialized objects. We serialize the objects in order to test the same code path used when storing in an external database. """ active_dbs = weakref.WeakValueDictionary()
[docs] @classmethod def get_or_create(cls, db_name=None, **kwargs): """Returns the named db, creating if needed. This method exists because PanDB gets called multiple times for the same database name. With mongo or a file store where the storage is external from the instance, that is not a problem, but with PanMemoryDB the instance is the store, so the instance must be shared.""" db = PanMemoryDB.active_dbs.get(db_name) if not db: db = PanMemoryDB(db_name=db_name, **kwargs) PanMemoryDB.active_dbs[db_name] = db return db
def __init__(self, **kwargs): """Initialize in-memory database. Args: **kwargs: Additional keyword arguments passed to parent class. """ super().__init__(**kwargs) self.current = {} self.collections = {} self.lock = threading.Lock() def _make_id(self): """Generate a unique ID for database objects. Returns: str: Unique identifier string. """ return str(uuid4())
[docs] def insert_current(self, collection, obj, store_permanently=True): """Insert object as current item in collection. Args: collection (str): Collection name to insert into. obj: Object to insert. store_permanently (bool): Whether to also store in permanent collection. Returns: str: Object ID of inserted item. """ obj_id = self._make_id() obj = create_storage_obj(collection, obj, obj_id) try: obj = to_json(obj) except Exception as e: raise error.InvalidSerialization(f"Problem serializing object for insertion: {e} {obj!r}") with self.lock: self.current[collection] = obj if store_permanently: self.collections.setdefault(collection, {})[obj_id] = obj return obj_id
[docs] def insert(self, collection, obj): """Insert object into collection. Args: collection (str): Collection name to insert into. obj: Object to insert. Returns: str: Object ID of inserted item. """ obj_id = self._make_id() obj = create_storage_obj(collection, obj, obj_id) try: obj = to_json(obj) except Exception as e: raise error.InvalidSerialization(f"Problem inserting object into collection: {e}, {obj!r}") with self.lock: self.collections.setdefault(collection, {})[obj_id] = obj return obj_id
[docs] def get_current(self, collection): """Get current object from collection. Args: collection (str): Collection name to get current from. Returns: dict or None: Current object in collection, or None if not found. """ with self.lock: obj = self.current.get(collection, None) if obj: obj = from_json(obj) return obj
[docs] def find(self, collection, obj_id): """Find object by ID in collection. Args: collection (str): Collection name to search in. obj_id (str): Object ID to find. Returns: dict or None: Found object, or None if not found. """ with self.lock: obj = self.collections.get(collection, {}).get(obj_id) if obj: obj = from_json(obj) return obj
[docs] def clear_current(self, entry_type): """Clear current entry for specified type. Args: entry_type (str): Entry type to clear. """ with suppress(KeyError): del self.current[entry_type]
[docs] @classmethod def permanently_erase_database(cls, *args, **kwargs): """Permanently erase the database. For testing purposes only. Erases all data and references. Args: *args: Positional arguments (ignored). **kwargs: Keyword arguments (ignored). """ # For some reason we're not seeing all the references disappear # after tests. Perhaps there is some global variable pointing at # the db or one of its referrers, or perhaps a pytest fixture # hasn't been removed. PanMemoryDB.active_dbs = weakref.WeakValueDictionary()