Source code for panoptes.utils.images.cr2

import os
import shutil
import subprocess
from json import loads
from pathlib import Path
from typing import Union, Optional
from warnings import warn

import numpy as np
from PIL import Image, ImageDraw, ImageFont
from astropy.io import fits
from dateutil.parser import parse as date_parse
from loguru import logger

from panoptes.utils import error
from panoptes.utils.images import fits as fits_utils


[docs]def cr2_to_fits( cr2_fname: Union[str, Path], fits_fname: str = None, overwrite: bool = False, headers: dict = None, fits_headers: dict = None, remove_cr2: bool = False, **kwargs) -> Union[Path, None]: # pragma: no cover """Convert a CR2 file to FITS. This is a convenience function that first converts the CR2 to PGM via ~cr2_to_pgm. Also adds keyword headers to the FITS file. Note: The intermediate PGM file is automatically removed Arguments: cr2_fname (str): Name of the CR2 file to be converted. fits_fname (str, optional): Name of the FITS file to output. Default is `None`, in which case the `cr2_fname` is used as the base. overwrite (bool, optional): Overwrite existing FITS, default False. headers (dict, optional): Header data added to the FITS file. fits_headers (dict, optional): Header data added to the FITS file without filtering. remove_cr2 (bool, optional): If CR2 should be removed after processing, default False. **kwargs: Description Returns: str: The full path to the generated FITS file. """ if fits_headers is None: fits_headers = {} if headers is None: headers = {} # Convert path to just a str. if isinstance(cr2_fname, Path): cr2_fname = str(cr2_fname) if isinstance(fits_fname, Path): fits_fname = str(fits_fname) if fits_fname is None: fits_fname = cr2_fname.replace('.cr2', '.fits') if not os.path.exists(fits_fname) or overwrite: logger.debug(f'Converting CR2 to PGM: {cr2_fname}') # Convert the CR2 to a PGM file then delete PGM try: pgm = read_pgm(cr2_to_pgm(cr2_fname), remove_after=True) except error.InvalidSystemCommand: logger.warning(f'No dcraw on the system, cannot proceed.') return None # Add the EXIF information from the CR2 file exif = read_exif(cr2_fname) # Set the PGM as the primary data for the FITS file hdu = fits.PrimaryHDU(pgm) obs_date = date_parse(exif.get('DateTimeOriginal', '').replace(':', '-', 2)).isoformat() # Set some default headers hdu.header.set('FILTER', 'RGGB') hdu.header.set('ISO', exif.get('ISO', '')) hdu.header.set('EXPTIME', exif.get('ExposureTime', 'Seconds')) hdu.header.set('CAMTEMP', exif.get('CameraTemperature', ''), 'Celsius - From CR2') hdu.header.set('CIRCCONF', exif.get('CircleOfConfusion', ''), 'From CR2') hdu.header.set('COLORTMP', exif.get('ColorTempMeasured', ''), 'From CR2') hdu.header.set('FILENAME', exif.get('FileName', ''), 'From CR2') hdu.header.set('INTSN', exif.get('InternalSerialNumber', ''), 'From CR2') hdu.header.set('CAMSN', exif.get('SerialNumber', ''), 'From CR2') hdu.header.set('MEASEV', exif.get('MeasuredEV', ''), 'From CR2') hdu.header.set('MEASEV2', exif.get('MeasuredEV2', ''), 'From CR2') hdu.header.set('MEASRGGB', exif.get('MeasuredRGGB', ''), 'From CR2') hdu.header.set('WHTLVLN', exif.get('NormalWhiteLevel', ''), 'From CR2') hdu.header.set('WHTLVLS', exif.get('SpecularWhiteLevel', ''), 'From CR2') hdu.header.set('REDBAL', exif.get('RedBalance', ''), 'From CR2') hdu.header.set('BLUEBAL', exif.get('BlueBalance', ''), 'From CR2') hdu.header.set('WBRGGB', exif.get('WB RGGBLevelAsShot', ''), 'From CR2') hdu.header.set('DATE-OBS', obs_date) for key, value in fits_headers.items(): try: hdu.header.set(key.upper()[0: 8], value) except Exception: pass try: logger.debug(f'Saving fits file to: {fits_fname}') hdu.writeto(fits_fname, output_verify='silentfix', overwrite=overwrite) except Exception as e: warn(f'Problem writing FITS file: {e}') else: if remove_cr2: os.unlink(cr2_fname) fits_utils.update_observation_headers(fits_fname, headers) return Path(fits_fname)
[docs]def cr2_to_pgm( cr2_fname, pgm_fname=None, overwrite=True, *args, **kwargs): # pragma: no cover """ Convert CR2 file to PGM Converts a raw Canon CR2 file to a netpbm PGM file via `dcraw`. Assumes `dcraw` is installed on the system Note: This is a blocking call Arguments: cr2_fname {str} -- Name of CR2 file to convert **kwargs {dict} -- Additional keywords to pass to script Keyword Arguments: pgm_fname {str} -- Name of PGM file to output, if None (default) then use same name as CR2 (default: {None}) dcraw {str} -- Path to installed `dcraw` (default: {'dcraw'}) overwrite {bool} -- A bool indicating if existing PGM should be overwritten (default: {True}) Returns: str -- Filename of PGM that was created """ dcraw = shutil.which('dcraw') if dcraw is None: raise error.InvalidCommand('dcraw not found') if pgm_fname is None: pgm_fname = cr2_fname.replace('.cr2', '.pgm') if os.path.exists(pgm_fname) and not overwrite: logger.warning(f'PGM file exists, returning existing file: {pgm_fname}') else: try: # Build the command for this file command = f'{dcraw} -t 0 -D -4 {cr2_fname}' cmd_list = command.split() logger.debug(f'PGM Conversion command: \n {cmd_list}') # Run the command if subprocess.check_call(cmd_list) == 0: logger.debug('PGM Conversion command successful') except subprocess.CalledProcessError as err: raise error.InvalidSystemCommand(msg=f"File: {cr2_fname} \n err: {err}") return pgm_fname
[docs]def read_exif(fname, exiftool='exiftool'): # pragma: no cover """ Read the EXIF information Gets the EXIF information using exiftool Note: Assumes the `exiftool` is installed Args: fname {str} -- Name of file (CR2) to read Keyword Args: exiftool {str} -- Location of exiftool (default: {'/usr/bin/exiftool'}) Returns: dict -- Dictionary of EXIF information """ assert os.path.exists(fname), warn(f"File does not exist: {fname}") exif = {} try: # Build the command for this file command = f'{exiftool} -j {fname}' cmd_list = command.split() # Run the command exif = loads(subprocess.check_output(cmd_list).decode('utf-8')) except subprocess.CalledProcessError as err: raise error.InvalidSystemCommand(msg=f"File: {fname} \n err: {err}") return exif[0]
[docs]def read_pgm(fname, byteorder='>', remove_after=False): # pragma: no cover """Return image data from a raw PGM file as numpy array. Note: Format Spec: http://netpbm.sourceforge.net/doc/pgm.html Source: http://stackoverflow.com/questions/7368739/numpy-and-16-bit-pgm Note: This is correctly processed as a Big endian even though the CR2 itself marks it as a Little endian. See the notes in Source page above as well as the comment about significant bit in the Format Spec Args: fname(str): Filename of PGM to be converted byteorder(str): Big endian remove_after(bool): Delete fname file after reading, defaults to False. overwrite(bool): overwrite existing PGM or not, defaults to True Returns: numpy.array: The raw data from the PGMx """ with open(fname, 'rb') as f: buffer = f.read() # We know our header info is 19 chars long header_offset = 19 img_type, img_size, img_max_value, _ = buffer[ 0:header_offset].decode().split('\n') assert img_type == 'P5', warn("Not a PGM file") # Get the width and height (as strings) width, height = img_size.split(' ') data = np.flipud(np.frombuffer(buffer[header_offset:], dtype=byteorder + 'u2', ).reshape((int(height), int(width)))) if remove_after: os.remove(fname) return data
[docs]def cr2_to_jpg( cr2_fname: Path, jpg_fname: str = None, title: str = '', overwrite: bool = False, remove_cr2: bool = False, ) -> Optional[Path]: """Extract a JPG image from a CR2, return the new path name.""" exiftool = shutil.which('exiftool') if not exiftool: # pragma: no cover raise error.InvalidSystemCommand('exiftool not found') jpg_fname = Path(jpg_fname) if jpg_fname else cr2_fname.with_suffix('.jpg') if jpg_fname.exists() and overwrite is False: raise error.AlreadyExists(f'{jpg_fname} already exists and overwrite is False') cmd = [exiftool, '-b', '-PreviewImage', cr2_fname.as_posix()] comp_proc = subprocess.run(cmd, check=True, stdout=jpg_fname.open('wb')) if comp_proc.returncode != 0: # pragma: no cover raise error.InvalidSystemCommand(f'{comp_proc.returncode}') if title and title > '': try: im = Image.open(jpg_fname) id = ImageDraw.Draw(im) im.info['title'] = title try: fnt = ImageFont.truetype('FreeMono.ttf', 120) except Exception: # pragma: no cover fnt = ImageFont.load_default() bottom_padding = 25 position = (im.size[0] / 2, im.size[1] - bottom_padding) id.text(position, title, font=fnt, fill=(255, 0, 0), anchor='ms') print(f'Adding title={title} to {jpg_fname.as_posix()}') im.save(jpg_fname) except Exception: raise error.InvalidSystemCommand(f'Error adding title to {jpg_fname.as_posix()}') if remove_cr2: print(f'Removing {cr2_fname}') cr2_fname.unlink() return jpg_fname