Source code for panoptes.utils.images.misc

import os
import shutil
import subprocess
from contextlib import suppress

import numpy as np
from astropy import units as u
from astropy.nddata import Cutout2D
from loguru import logger

from panoptes.utils import error


[docs]def make_timelapse( directory, fn_out=None, glob_pattern='20[1-9][0-9]*T[0-9]*.jpg', overwrite=False, timeout=60, **kwargs): # pragma: no cover """Create a timelapse. A timelapse is created from all the images in given ``directory`` Args: directory (str): Directory containing image files. fn_out (str, optional): Full path to output file name, if not provided, defaults to `directory` basename. glob_pattern (str, optional): A glob file pattern of images to include, default '20[1-9][0-9]*T[0-9]*.jpg', which corresponds to the observation images but excludes any pointing images. The pattern should be relative to the local directory. overwrite (bool, optional): Overwrite timelapse if exists, default False. timeout (int): Timeout for making movie, default 60 seconds. **kwargs (dict): Returns: str: Name of output file Raises: error.InvalidSystemCommand: Raised if ffmpeg command is not found. FileExistsError: Raised if fn_out already exists and overwrite=False. """ if fn_out is None: head, tail = os.path.split(directory) if tail == '': head, tail = os.path.split(head) field_name = head.split('/')[-2] cam_name = head.split('/')[-1] fname = f'{field_name}_{cam_name}_{tail}.mp4' fn_out = os.path.normpath(os.path.join(directory, fname)) if os.path.exists(fn_out) and not overwrite: raise FileExistsError("Timelapse exists. Set overwrite=True if needed") ffmpeg = shutil.which('ffmpeg') if ffmpeg is None: raise error.InvalidSystemCommand("ffmpeg not found, can't make timelapse") inputs_glob = os.path.join(directory, glob_pattern) try: ffmpeg_cmd = [ ffmpeg, '-r', '3', '-pattern_type', 'glob', '-i', inputs_glob, '-s', 'hd1080', '-vcodec', 'libx264', ] if overwrite: ffmpeg_cmd.append('-y') ffmpeg_cmd.append(fn_out) logger.debug(ffmpeg_cmd) proc = subprocess.Popen(ffmpeg_cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: # Don't wait forever outs, errs = proc.communicate(timeout=timeout) except subprocess.TimeoutExpired: proc.kill() outs, errs = proc.communicate() finally: logger.debug(f"Output: {outs}") logger.debug(f"Errors: {errs}") # Double-check for file existence if not os.path.exists(fn_out): fn_out = None except Exception as e: raise error.PanError(f"Problem creating timelapse in {fn_out}: {e!r}") return fn_out
[docs]def crop_data(data, box_width=200, center=None, data_only=True, wcs=None, **kwargs): """Return a cropped portion of the image. Shape is a box centered around the middle of the data .. plot:: :include-source: >>> from matplotlib import pyplot as plt >>> from astropy.wcs import WCS >>> from panoptes.utils.images.misc import crop_data >>> from panoptes.utils.images.plot import add_colorbar, get_palette >>> from panoptes.utils.images.fits import getdata >>> >>> fits_url = 'https://github.com/panoptes/panoptes-utils/raw/develop/tests/data/solved.fits.fz' >>> data, header = getdata(fits_url, header=True) >>> wcs = WCS(header) >>> # Crop a portion of the image by WCS and get Cutout2d object. >>> cropped = crop_data(data, center=(600, 400), box_width=100, wcs=wcs, data_only=False) >>> fig, ax = plt.subplots() >>> im = ax.imshow(cropped.data, origin='lower', cmap=get_palette()) >>> add_colorbar(im) >>> plt.show() Args: data (`numpy.array`): Array of data. box_width (int, optional): Size of box width in pixels, defaults to 200px. center (tuple(int, int), optional): Crop around set of coords, default to image center. data_only (bool, optional): If True (default), return only data. If False return the `Cutout2D` object. wcs (None|`astropy.wcs.WCS`, optional): A valid World Coordinate System (WCS) that will be cropped along with the data if provided. Returns: np.array: A clipped (thumbnailed) version of the data if `data_only=True`, otherwise a `astropy.nddata.Cutout2D` object. """ assert data.shape[ 0] >= box_width, f"Can't clip data, it's smaller than {box_width} ({data.shape})" # Get the center if center is None: x_len, y_len = data.shape x_center = int(x_len / 2) y_center = int(y_len / 2) else: y_center = int(center[0]) x_center = int(center[1]) logger.debug(f"Using center: {x_center} {y_center}") logger.debug(f"Box width: {box_width}") cutout = Cutout2D(data, (y_center, x_center), box_width, wcs=wcs) if data_only: return cutout.data return cutout
[docs]def mask_saturated(data, saturation_level=None, threshold=0.9, bit_depth=None, dtype=None): """Convert data to a masked array with saturated values masked. .. plot:: :include-source: >>> from matplotlib import pyplot as plt >>> from astropy.wcs import WCS >>> from panoptes.utils.images.misc import crop_data, mask_saturated >>> from panoptes.utils.images.plot import add_colorbar, get_palette >>> from panoptes.utils.images.fits import getdata >>> >>> fits_url = 'https://github.com/panoptes/panoptes-utils/raw/develop/tests/data/solved.fits.fz' >>> data, header = getdata(fits_url, header=True) >>> wcs = WCS(header) >>> # Crop a portion of the image by WCS and get Cutout2d object. >>> cropped = crop_data(data, center=(600, 400), box_width=100, wcs=wcs, data_only=False) >>> masked = mask_saturated(cropped.data, saturation_level=11535) >>> fig, ax = plt.subplots() >>> im = ax.imshow(masked, origin='lower', cmap=get_palette()) >>> add_colorbar(im) >>> fig.show() Args: data (array_like): The numpy data array. saturation_level (scalar, optional): The saturation level. If not given then the saturation level will be set to threshold times the maximum pixel value. threshold (float, optional): The fraction of the maximum pixel value to use as the saturation level, default 0.9. bit_depth (astropy.units.Quantity or int, optional): The effective bit depth of the data. If given the maximum pixel value will be assumed to be 2**bit_depth, otherwise an attempt will be made to infer the maximum pixel value from the data type of the data. If data is not an integer type the maximum pixel value cannot be inferred and an IllegalValue exception will be raised. dtype (numpy.dtype, optional): The requested dtype for the masked array. If not given the dtype of the masked array will be same as data. Returns: numpy.ma.array: The masked numpy array. Raises: error.IllegalValue: Raised if bit_depth is an astropy.units.Quantity object but the units are not compatible with either bits or bits/pixel. error.IllegalValue: Raised if neither saturation level or bit_depth are given, and data has a non integer data type. """ if not saturation_level: if bit_depth is not None: try: with suppress(AttributeError): bit_depth = bit_depth.to_value(unit=u.bit) except u.UnitConversionError: try: bit_depth = bit_depth.to_value(unit=u.bit / u.pixel) except u.UnitConversionError: raise error.IllegalValue("bit_depth must have units of bits or bits/pixel, " + f"got {bit_depth!r}") bit_depth = int(bit_depth) logger.trace(f"Using bit depth {bit_depth!r}") saturation_level = threshold * (2 ** bit_depth - 1) else: # No bit depth specified, try to guess. logger.trace(f"Inferring bit_depth from data type, {data.dtype!r}") try: # Try to use np.iinfo to compute machine limits. Will work for integer types. saturation_level = threshold * np.iinfo(data.dtype).max except ValueError: # ValueError from np.iinfo means not an integer type. raise error.IllegalValue("Neither saturation_level or bit_depth given, and data " + "is not an integer type. Cannot determine correct " + "saturation level.") logger.debug(f"Masking image using saturation level {saturation_level!r}") # Convert data to masked array of requested dtype, mask values above saturation level. return np.ma.array(data, mask=(data > saturation_level), dtype=dtype)