Source code for panoptes.utils.database.file

import os
from contextlib import suppress
from glob import glob
from uuid import uuid4

from loguru import logger

from panoptes.utils import error
from panoptes.utils.database import AbstractPanDB
from panoptes.utils.database.base import create_storage_obj
from panoptes.utils.serializers import from_json, to_json


class PanFileDB(AbstractPanDB):
    """Stores collections as files of JSON records.

    Every collection is backed by files inside ``<storage_dir>/<db_name>``:
    ``<collection>.json`` for the permanent records and
    ``current_<collection>.json`` for the most recent ("current") record.
    """

    def __init__(self, db_name="panoptes", storage_dir="json_store", **kwargs):
        """Flat file storage for json records.

        This will simply store each json record inside a file corresponding
        to the type. Each entry will be stored in a single line.

        Args:
            db_name (str, optional): Name of the database containing the
                collections.
            storage_dir (str, optional): The name of the directory where the
                database files will be stored. Default is `json_store` in
                current directory. Pass an absolute path for non-relative.
            **kwargs: Forwarded unchanged to the parent class constructor.
        """
        super().__init__(db_name=db_name, **kwargs)

        self.db_folder = db_name

        # Set up storage directory; it is created on construction if missing.
        self.storage_dir = os.path.join(storage_dir, self.db_folder)
        os.makedirs(self.storage_dir, exist_ok=True)

    def insert_current(self, collection, obj, store_permanently=True):
        """Insert object as current item in collection.

        The current-record file for the collection is overwritten with the
        new record.

        Args:
            collection (str): Collection name to insert into.
            obj: Object to insert.
            store_permanently (bool): Whether to also store in permanent
                collection.

        Returns:
            str: The id returned by `insert` when `store_permanently` is
                true, otherwise the id written to the current record.

        Raises:
            error.InvalidSerialization: If the record cannot be written.
        """
        obj_id = self._make_id()
        storage_obj = create_storage_obj(collection, obj, obj_id)
        current_fn = self._get_file(collection, permanent=False)

        try:
            # Overwrite current collection file with obj.
            to_json(storage_obj, filename=current_fn, append=False)
        except Exception as e:
            raise error.InvalidSerialization(
                f"Problem serializing before insert: {e!r} {obj!r}"
            ) from e

        if store_permanently:
            # NOTE: `insert` generates its own id, so the permanent record's
            # id differs from the one stored in the current file. The
            # permanent id is the one handed back to the caller.
            return self.insert(collection, obj)

        return obj_id

    def insert(self, collection, obj):
        """Insert object into collection.

        Args:
            collection (str): Collection name to insert into.
            obj: Object to insert.

        Returns:
            str: Object ID of inserted item.

        Raises:
            error.InvalidSerialization: If the record cannot be written.
        """
        obj_id = self._make_id()
        storage_obj = create_storage_obj(collection, obj, obj_id)
        collection_fn = self._get_file(collection)

        try:
            # Insert record into file.
            to_json(storage_obj, filename=collection_fn)
        except Exception as e:
            raise error.InvalidSerialization(
                f"Problem serializing before insert: {e!r} {storage_obj!r}"
            ) from e

        return obj_id

    def get_current(self, collection):
        """Get current object from collection.

        Args:
            collection (str): Collection name to get current from.

        Returns:
            dict or None: Current object in collection, or None if not found.
        """
        current_fn = self._get_file(collection, permanent=False)

        try:
            with open(current_fn) as f:
                return from_json(f.read())
        except FileNotFoundError:
            # A missing file simply means nothing has been stored yet.
            logger.warning(f"No record found for {collection}")
            return None

    def find(self, collection, obj_id):
        """Find object by ID in collection.

        The permanent collection file is scanned line by line and the first
        line containing `obj_id` is parsed and returned.

        Args:
            collection (str): Collection name to search in.
            obj_id (str): Object ID to find.

        Returns:
            dict or None: Found object, or None if not found (including when
                the collection file does not exist).
        """
        collection_fn = self._get_file(collection)

        with suppress(FileNotFoundError):
            with open(collection_fn) as f:
                for line in f:
                    # NOTE: this is a plain substring test on the raw line,
                    # not a comparison against a parsed id field.
                    if obj_id in line:
                        return from_json(line)

        return None

    def clear_current(self, record_type):
        """Clears the current record of the given type.

        Args:
            record_type (str): The record type, e.g. 'weather',
                'environment', etc.
        """
        current_f = self._get_file(record_type, permanent=False)

        # Nothing to clear if the file was never written.
        with suppress(FileNotFoundError):
            os.remove(current_f)

    def _get_file(self, collection, permanent=True):
        """Get file path for collection.

        Args:
            collection (str): Collection name.
            permanent (bool): Whether to get permanent or current file.

        Returns:
            str: Full file path for the collection.
        """
        if permanent:
            name = f"{collection}.json"
        else:
            name = f"current_{collection}.json"

        return os.path.join(self.storage_dir, name)

    def _make_id(self):
        """Generate a unique ID for database objects.

        Returns:
            str: String form of a freshly generated `uuid4`.
        """
        return str(uuid4())

    @classmethod
    def permanently_erase_database(cls, db_name, storage_dir=None):
        """Permanently erase the database.

        For testing purposes only. Removes all JSON files from storage
        directory.

        Args:
            db_name (str): Database name.
            storage_dir (str, optional): Storage directory path. When omitted
                (or None) the constructor's default, `json_store`, is used.
        """
        if storage_dir is None:
            # `os.path.join(None, ...)` raises TypeError, so fall back to the
            # same default directory the constructor uses.
            storage_dir = "json_store"

        # Clear out any .json files.
        db_dir = os.path.join(storage_dir, db_name)
        for json_file in glob(os.path.join(db_dir, "*.json")):
            os.remove(json_file)