Source code for panoptes.utils.database.file
import os
from contextlib import suppress
from glob import glob
from uuid import uuid4
from loguru import logger
from panoptes.utils import error
from panoptes.utils.database import AbstractPanDB
from panoptes.utils.database.base import create_storage_obj
from panoptes.utils.serializers import from_json
from panoptes.utils.serializers import to_json
[docs]class PanFileDB(AbstractPanDB):
"""Stores collections as files of JSON records."""
def __init__(self, db_name='panoptes', storage_dir='json_store', **kwargs):
"""Flat file storage for json records.
This will simply store each json record inside a file corresponding
to the type. Each entry will be stored in a single line.
Args:
db_name (str, optional): Name of the database containing the collections.
storage_dir (str, optional): The name of the directory where the
database files will be stored. Default is `json_store` in current
directory. Pass an absolute path for non-relative.
"""
super().__init__(db_name=db_name, **kwargs)
self.db_folder = db_name
# Set up storage directory.
self.storage_dir = os.path.join(storage_dir, self.db_folder)
os.makedirs(self.storage_dir, exist_ok=True)
[docs] def insert_current(self, collection, obj, store_permanently=True):
obj_id = self._make_id()
result = obj_id
storage_obj = create_storage_obj(collection, obj, obj_id)
current_fn = self._get_file(collection, permanent=False)
try:
# Overwrite current collection file with obj.
to_json(storage_obj, filename=current_fn, append=False)
except Exception as e:
raise error.InvalidSerialization(f"Problem serializing before insert: {e!r} {obj!r}")
if store_permanently:
result = self.insert(collection, obj)
return result
[docs] def insert(self, collection, obj):
obj_id = self._make_id()
obj = create_storage_obj(collection, obj, obj_id)
collection_fn = self._get_file(collection)
try:
# Insert record into file
to_json(obj, filename=collection_fn)
return obj_id
except Exception as e:
raise error.InvalidSerialization(f"Problem serializing before insert: {e!r} {obj!r}")
[docs] def get_current(self, collection):
current_fn = self._get_file(collection, permanent=False)
try:
with open(current_fn) as f:
msg = from_json(f.read())
return msg
except FileNotFoundError:
logger.warning(f"No record found for {collection}")
return None
[docs] def find(self, collection, obj_id):
collection_fn = self._get_file(collection)
obj = None
with suppress(FileNotFoundError):
with open(collection_fn, 'r') as f:
for line in f:
if obj_id in line:
obj = from_json(line)
break
return obj
[docs] def clear_current(self, record_type):
"""Clears the current record of the given type.
Args:
record_type (str): The record type, e.g. 'weather', 'environment', etc.
"""
current_f = self._get_file(record_type, permanent=False)
with suppress(FileNotFoundError):
os.remove(current_f)
def _get_file(self, collection, permanent=True):
if permanent:
name = f'{collection}.json'
else:
name = f'current_{collection}.json'
return os.path.join(self.storage_dir, name)
def _make_id(self):
return str(uuid4())
[docs] @classmethod
def permanently_erase_database(cls, db_name, storage_dir=None):
# Clear out any .json files.
storage_dir = os.path.join(storage_dir, db_name)
for f in glob(os.path.join(storage_dir, '*.json')):
os.remove(f)