import os
from contextlib import suppress
from glob import glob
from uuid import uuid4
from panoptes.utils import error
from panoptes.utils.database import AbstractPanDB
from panoptes.utils.database.base import create_storage_obj
from panoptes.utils.logging import logger
from panoptes.utils.serializers import from_json
from panoptes.utils.serializers import to_json
[docs]class PanFileDB(AbstractPanDB):
"""Stores collections as files of JSON records."""
[docs] def __init__(self, db_name='panoptes', storage_dir='json_store', **kwargs):
"""Flat file storage for json records.
This will simply store each json record inside a file corresponding
to the type. Each entry will be stored in a single line.
Args:
db_name (str, optional): Name of the database containing the collections.
storage_dir (str, optional): The name of the directory in $PANDIR where
the database files will be stored. Default is `json_store` for backwards
compatibility.
"""
super().__init__(db_name=db_name, **kwargs)
self.db_folder = db_name
# Set up storage directory.
self.storage_dir = os.path.join(os.environ['PANDIR'], storage_dir, self.db_folder)
os.makedirs(self.storage_dir, exist_ok=True)
[docs] def insert_current(self, collection, obj, store_permanently=True):
obj_id = self._make_id()
obj = create_storage_obj(collection, obj, obj_id)
current_fn = self._get_file(collection, permanent=False)
result = obj_id
try:
# Overwrite current collection file with obj.
to_json(obj, filename=current_fn, append=False)
except Exception as e:
raise error.InvalidSerialization(f"Problem serializing object for insertion: {e} {current_fn} {obj!r}")
if not store_permanently:
return result
else:
return self.insert(collection, obj)
[docs] def insert(self, collection, obj):
obj_id = self._make_id()
obj = create_storage_obj(collection, obj, obj_id)
collection_fn = self._get_file(collection)
try:
# Insert record into file
to_json(obj, filename=collection_fn)
return obj_id
except Exception as e:
raise error.InvalidSerialization(f"Problem inserting object into collection: {e}, {obj!r}")
[docs] def get_current(self, collection):
current_fn = self._get_file(collection, permanent=False)
try:
with open(current_fn) as f:
msg = from_json(f.read())
return msg
except FileNotFoundError:
logger.warning(f"No record found for {collection}")
return None
[docs] def find(self, collection, obj_id):
collection_fn = self._get_file(collection)
obj = None
with suppress(FileNotFoundError):
with open(collection_fn, 'r') as f:
for line in f:
if obj_id in line:
obj = from_json(line)
break
return obj
[docs] def clear_current(self, record_type):
"""Clears the current record of the given type.
Args:
record_type (str): The record type, e.g. 'weather', 'environment', etc.
"""
current_f = self._get_file(record_type, permanent=False)
with suppress(FileNotFoundError):
os.remove(current_f)
def _get_file(self, collection, permanent=True):
if permanent:
name = f'{collection}.json'
else:
name = f'current_{collection}.json'
return os.path.join(self.storage_dir, name)
def _make_id(self):
return str(uuid4())
[docs] @classmethod
def permanently_erase_database(cls, db_name, storage_dir='json_store'):
# Clear out any .json files.
storage_dir = os.path.join(os.environ['PANDIR'], storage_dir, db_name)
for f in glob(os.path.join(storage_dir, '*.json')):
os.remove(f)