# Source code for panoptes.utils.utils

import collections.abc
import contextlib
import os
import re
import shutil
import signal
from urllib.parse import unquote as unquote_url

import numpy as np
from astropy import units as u
from astropy.coordinates import AltAz
from astropy.coordinates import ICRS
from astropy.coordinates import SkyCoord
from panoptes.utils.time import current_time

# Pattern for pulling identifiers out of an image path or URI. After any
# (non-greedy) leading text it expects four fields, each followed by a single
# ``/`` or ``_`` separator (except the last):
#   unit_id:     the literal ``PAN`` followed by three digits.
#   camera_id:   six characters drawn from ``a-g``, ``A-G`` and ``0-9``.
#   sequence_id: 8 digits, a ``T``, then 6 digits (e.g. ``20190820T111638``).
#   image_id:    same shape as ``sequence_id``.
# Compiled with ``re.VERBOSE`` so the whitespace used for layout inside the
# pattern is ignored.
PATH_MATCHER = re.compile(r'''
    .*?
    (?P<unit_id>PAN\d{3})[/_]{1}
    (?P<camera_id>[a-gA-G0-9]{6})[/_]{1}
    (?P<sequence_id>[0-9]{8}T[0-9]{6})[/_]{1}
    (?P<image_id>[0-9]{8}T[0-9]{6})
    .*?
''', re.VERBOSE)


def listify(obj):
    """ Coerce the given object into a list.

    The result is always a list:

    * ``None`` becomes an empty list.
    * A ``list`` is handed back unchanged (the very same object, not a copy).
    * A ``dict`` yields a list of *only* its values.
    * Dictionary key/value views are expanded into a list of their items.
    * Anything else is wrapped in a single-element list.

    .. doctest::

        >>> listify(42)
        [42]
        >>> listify('foo')
        ['foo']
        >>> listify(None)
        []
        >>> listify(['a'])
        ['a']

        >>> my_dict = dict(a=42, b='foo')
        >>> listify(my_dict)
        [42, 'foo']
        >>> listify(my_dict.values())
        [42, 'foo']
        >>> listify(my_dict.keys())
        ['a', 'b']

    Args:
        obj: Any object.

    Returns:
        list: The list form of ``obj`` as described above.
    """
    # Lists pass straight through without copying.
    if isinstance(obj, list):
        return obj

    if obj is None:
        return []

    if isinstance(obj, dict):
        return list(obj.values())

    # Only key and value views are expanded; everything else gets wrapped.
    view_types = (collections.abc.ValuesView, collections.abc.KeysView)
    return list(obj) if isinstance(obj, view_types) else [obj]
def get_free_space(directory=None):
    """Return the amount of free space, in gigabytes, for the given directory.

    >>> from panoptes.utils import get_free_space
    >>> get_free_space()
    <Quantity ... Gbyte>

    >>> get_free_space(directory='/')
    <Quantity ... Gbyte>

    Args:
        directory (str, optional): Path to directory. If None, the value of
            the ``PANDIR`` environment variable is used.

    Returns:
        astropy.units.Quantity: The number of gigabytes available in the folder.
    """
    # Fall back to $PANDIR only when the caller gave no directory at all.
    target = os.getenv('PANDIR') if directory is None else directory

    usage = shutil.disk_usage(target)
    return (usage.free * u.byte).to(u.gigabyte)
def string_to_params(opts):
    """Parses a single string into parameters that can be passed to a function.

    A user of the `peas_shell` can supply positional and keyword arguments to the
    command being called, however the `Cmd` module that is used for the shell does
    not parse these options but instead passes this as a single string.

    This utility method does some simple parsing of that string and returns a
    list of positional parameters and a dictionary of keyword arguments.  A keyword
    argument is considered anything that contains an equal sign (e.g. `exptime=30`).
    Any leading `--` to a keyword argument will be stripped during parsing.

    A list of items can be passed by specifying the keyword argument multiple times.

    Options are separated by any run of whitespace, so an empty string, or extra
    spaces between or around options, never produce empty positional arguments.

    Note:

        This function will attempt to parse keyword values as floats if possible.
        If a string is required include a single quote around the value, e.g.
        `param='42'` will keep the value as the string `'42'`.

    >>> from panoptes.utils import string_to_params
    >>> args, kwargs = string_to_params("parg1 parg2 key1=a_str key2=2 key2='2' key3=03")
    >>> args
    ['parg1', 'parg2']
    >>> kwargs
    {'key1': 'a_str', 'key2': [2.0, '2'], 'key3': 3.0}
    >>> isinstance(kwargs['key2'][0], float)
    True
    >>> isinstance(kwargs['key2'][1], str)
    True
    >>> kwargs['key2'][1] == '2'
    True

    >>> args, kwargs = string_to_params('--key1=val1 --key1-2=val1-2')
    >>> kwargs
    {'key1': 'val1', 'key1-2': 'val1-2'}

    >>> string_to_params('')
    ([], {})

    Args:
        opts (str): A single string containing everything beyond the actual
            command that is called.

    Returns:
        tuple(list, dict): Returns a list of positional parameters and a dictionary
        of keyword arguments. These correspond to the *args and **kwargs that
        a typical function would receive.
    """
    args = []
    kwargs = {}

    # `split()` with no separator collapses runs of whitespace and yields
    # nothing for empty/blank input, whereas `split(' ')` would emit ''.
    for opt in opts.split():
        if '=' not in opt:
            args.append(opt)
            continue

        # Only the first '=' separates name from value.
        name, value = opt.split('=', maxsplit=1)

        # Strip a leading `--`, but leave a bare `--` name untouched.
        if name.startswith('--') and len(name) > 2:
            name = name[2:]

        if "'" in value:
            # Remove the explicit single quotes and keep the value as a string.
            value = value.replace("'", "")
        else:
            # Make it a number if possible, otherwise leave the string as is.
            with contextlib.suppress(ValueError):
                value = float(value)

        if name in kwargs:
            # A repeated keyword accumulates its values in a list.
            kwargs[name] = listify(kwargs[name])
            kwargs[name].append(value)
        else:
            kwargs[name] = value

    return args, kwargs
def altaz_to_radec(alt=None, az=None, location=None, obstime=None, **kwargs):
    """Turn an altitude/azimuth pair into an RA/Dec `SkyCoord`.

    >>> from panoptes.utils import altaz_to_radec
    >>> from astropy.coordinates import EarthLocation
    >>> from astropy import units as u
    >>> keck = EarthLocation.of_site('Keck Observatory')
    ...

    >>> altaz_to_radec(alt=75, az=180, location=keck, obstime='2020-02-02T20:20:02.02')
    <SkyCoord (ICRS): (ra, dec) in deg
        (281.78..., 4.807...)>

    >>> # Can use quantities or not.
    >>> alt = 4500 * u.arcmin
    >>> az = 180 * u.degree
    >>> altaz_to_radec(alt=alt, az=az, location=keck, obstime='2020-02-02T20:20:02.02')
    <SkyCoord (ICRS): (ra, dec) in deg
        (281.78..., 4.807...)>

    >>> # Will use current time if none given.
    >>> altaz_to_radec(alt=35, az=90, location=keck)
    <SkyCoord (ICRS): (ra, dec) in deg
        (..., ...)>

    >>> # Must pass a `location` instance.
    >>> altaz_to_radec()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      ...
        assert location is not None
    AssertionError

    Args:
        alt (astropy.units.Quantity or scalar): Altitude; plain numbers are
            taken to be degrees.
        az (astropy.units.Quantity or scalar): Azimuth; plain numbers are
            taken to be degrees.
        location (astropy.coordinates.EarthLocation, required): A valid location.
        obstime (None, optional): Time for object, defaults to `current_time`.
        **kwargs: Accepted but not used by this function.

    Returns:
        astropy.coordinates.SkyCoord: Coordinates corresponding to the AltAz.
    """
    assert location is not None

    when = current_time() if obstime is None else obstime

    # Normalize both angles to degree quantities, whether or not they
    # arrived carrying units.
    alt_deg = get_quantity_value(alt, 'degree') * u.degree
    az_deg = get_quantity_value(az, 'degree') * u.degree

    horizontal = AltAz(alt=alt_deg, az=az_deg, location=location, obstime=when)
    return SkyCoord(horizontal.transform_to(ICRS))
class DelaySigTerm(contextlib.ContextDecorator):
    """Supports delaying SIGTERM during a critical section.

    This allows one to avoid having SIGTERM interrupt a critical block of code,
    such as saving to a database.

    While the context is active a temporary SIGTERM handler merely records that
    the signal arrived. On exit the previous handler is restored and, if a
    SIGTERM was recorded, the signal is re-sent to this process.

    Example::

        with DelaySigTerm():
            db.WriteCurrentRecord(record)

    A callback may be supplied at construction time so that it is usable from
    both the ``with`` statement and the decorator form::

        with DelaySigTerm(callback=on_sigterm):
            db.WriteCurrentRecord(record)
    """

    # TODO(jamessynge): Consider generalizing as DelaySignal(signum).

    def __init__(self, callback=None):
        """
        Args:
            callback: If not None, called when SIGTERM is handled,
                with kwargs previously_caught and frame.
        """
        # `with` and decorator usage call `__enter__` with no arguments, so the
        # constructor is the only place a callback can be supplied for them.
        self._init_callback = callback

    def __enter__(self, callback=None):
        """
        Args:
            callback: If not None, called when SIGTERM is handled,
                with kwargs previously_caught and frame. Takes precedence over
                a callback given to the constructor.
        """
        self.caught = False
        self.old_handler = signal.getsignal(signal.SIGTERM)

        # getattr keeps subclasses that override __init__ without calling
        # super().__init__() working.
        callback = callback or getattr(self, '_init_callback', None)
        if callback:
            assert callable(callback)
            self.callback = callback
        else:
            self.callback = None

        def handler(signum, frame):
            # Record the signal rather than acting on it; the callback is told
            # whether this is a repeat delivery.
            previously_caught = self.caught
            self.caught = True
            if self.callback:
                self.callback(previously_caught=previously_caught, frame=frame)

        signal.signal(signal.SIGTERM, handler)
        return self

    def __exit__(self, *exc):
        signal.signal(signal.SIGTERM, self.old_handler)
        if self.caught:
            # Send SIGTERM to this process.
            os.kill(os.getpid(), signal.SIGTERM)
            # Suppress any exception caught while the context was running.
            return True
        return False
def get_quantity_value(quantity, unit=None):
    """ Extract the numerical value from a quantity, optionally converting units.

    This is a thin wrapper around `astropy.units.Quantity.to_value`. Objects
    that do not offer a `to_value` method are handed back untouched.

    >>> from astropy import units as u
    >>> from panoptes.utils import get_quantity_value

    >>> get_quantity_value(60 * u.second)
    60.0

    >>> # Can convert between units.
    >>> get_quantity_value(60 * u.minute, unit='second')
    3600.0

    >>> get_quantity_value(60 * u.minute, unit=u.second)
    3600.0

    >>> get_quantity_value(60)
    60

    Args:
        quantity (astropy.units.Quantity or scalar): Quantity to extract numerical value from.
        unit (astropy.units.Unit, optional): unit to convert to.

    Returns:
        float: numerical value of the Quantity after conversion to the specified unit.
    """
    # An AttributeError (e.g. a plain number with no `to_value`) means there is
    # nothing to convert, so control falls through to the plain return.
    with contextlib.suppress(AttributeError):
        return quantity.to_value(unit)

    return quantity
def moving_average(data_set, periods=3):
    """Compute a moving average by convolving with a uniform window.

    The output has the same length as the input because the convolution is
    performed in numpy's ``'same'`` mode.

    Args:
        data_set (`numpy.array`): An array of values over which to perform the moving average.
        periods (int, optional): Number of periods.

    Returns:
        `numpy.array`: An array of the computed averages.
    """
    # Uniform kernel: `periods` equal weights.
    kernel = np.ones(periods)
    kernel = kernel / periods

    return np.convolve(data_set, kernel, mode='same')
def image_id_from_path(path):
    """Build the `image_id` for the given path or uri.

    The path is URL-unquoted before being matched against `PATH_MATCHER`.

    >>> from panoptes.utils import image_id_from_path
    >>> path = 'gs://panoptes-raw-images/PAN012/ee04d1/20190820T111638/20190820T122447.fits'
    >>> image_id_from_path(path)
    'PAN012_ee04d1_20190820T122447'

    >>> path = 'nothing/to/match'
    >>> image_id_from_path(path)

    Args:
        path (str): A path or uri for a file.

    Returns:
        str: The image id in the form "<unit_id>_<camera_id>_<image_id>",
        or None if the path does not match.
    """
    found = PATH_MATCHER.match(unquote_url(path))
    if found is None:
        return None

    pieces = found.group('unit_id', 'camera_id', 'image_id')
    return '{}_{}_{}'.format(*pieces)
def sequence_id_from_path(path):
    """Build the `sequence_id` for the given path or uri.

    The path is URL-unquoted before being matched against `PATH_MATCHER`.

    >>> from panoptes.utils import sequence_id_from_path
    >>> path = 'gs://panoptes-raw-images/PAN012/ee04d1/20190820T111638/20190820T122447.fits'
    >>> sequence_id_from_path(path)
    'PAN012_ee04d1_20190820T111638'

    >>> path = 'nothing/to/match'
    >>> sequence_id_from_path(path)

    Args:
        path (str): A path or uri for a file.

    Returns:
        str: The sequence id in the form "<unit_id>_<camera_id>_<sequence_id>",
        or None if the path does not match.
    """
    found = PATH_MATCHER.match(unquote_url(path))
    if found is None:
        return None

    pieces = found.group('unit_id', 'camera_id', 'sequence_id')
    return '{}_{}_{}'.format(*pieces)