Source code for panoptes.utils.images.cr2

import os
import shutil
import subprocess
from json import loads
from pathlib import Path
from typing import BinaryIO, TextIO
from warnings import warn

import numpy as np
from astropy.io import fits
from dateutil.parser import parse as date_parse
from loguru import logger
from PIL import Image, ImageDraw, ImageFont

from panoptes.utils import error
from panoptes.utils.images import fits as fits_utils
from panoptes.utils.utils import normalize_file_input


[docs] def cr2_to_fits( cr2_fname: str | Path | TextIO | BinaryIO, fits_fname: str | Path | TextIO | BinaryIO = None, overwrite: bool = False, headers: dict = None, fits_headers: dict = None, remove_cr2: bool = False, **kwargs, ) -> Path | None: # pragma: no cover """Convert a CR2 file to FITS. This is a convenience function that first converts the CR2 to PGM via ~cr2_to_pgm. Also adds keyword headers to the FITS file. Note: The intermediate PGM file is automatically removed Arguments: cr2_fname: Name of the CR2 file to be converted. Can be a string path, pathlib.Path object, or open filehandle. fits_fname: Name of the FITS file to output. Can be a string path, pathlib.Path object, or open filehandle. Default is `None`, in which case the `cr2_fname` is used as the base. overwrite (bool, optional): Overwrite existing FITS, default False. headers (dict, optional): Header data added to the FITS file. fits_headers (dict, optional): Header data added to the FITS file without filtering. remove_cr2 (bool, optional): If CR2 should be removed after processing, default False. **kwargs: Description Returns: str: The full path to the generated FITS file. """ if fits_headers is None: fits_headers = {} if headers is None: headers = {} # Normalize file inputs to string paths cr2_fname = normalize_file_input(cr2_fname) if fits_fname is not None: fits_fname = normalize_file_input(fits_fname) else: fits_fname = cr2_fname.replace(".cr2", ".fits") if not os.path.exists(fits_fname) or overwrite: logger.debug(f"Converting CR2 to PGM: {cr2_fname}") # Convert the CR2 to a PGM file then delete PGM try: pgm = read_pgm(cr2_to_pgm(cr2_fname), remove_after=True) except error.InvalidSystemCommand: logger.warning("No dcraw on the system, cannot proceed.") return None # Add the EXIF information from the CR2 file exif = read_exif(cr2_fname) # Set the PGM as the primary data for the FITS file hdu = fits.PrimaryHDU(pgm) obs_date = date_parse(exif.get("DateTimeOriginal", "").replace(":", "-", 2)).isoformat() # Set some default headers hdu.header.set("FILTER", "RGGB") hdu.header.set("ISO", exif.get("ISO", "")) hdu.header.set("EXPTIME", exif.get("ExposureTime", "Seconds")) hdu.header.set("CAMTEMP", exif.get("CameraTemperature", ""), "Celsius - From CR2") hdu.header.set("CIRCCONF", exif.get("CircleOfConfusion", ""), "From CR2") hdu.header.set("COLORTMP", exif.get("ColorTempMeasured", ""), "From CR2") hdu.header.set("FILENAME", exif.get("FileName", ""), "From CR2") hdu.header.set("INTSN", exif.get("InternalSerialNumber", ""), "From CR2") hdu.header.set("CAMSN", exif.get("SerialNumber", ""), "From CR2") hdu.header.set("MEASEV", exif.get("MeasuredEV", ""), "From CR2") hdu.header.set("MEASEV2", exif.get("MeasuredEV2", ""), "From CR2") hdu.header.set("MEASRGGB", exif.get("MeasuredRGGB", ""), "From CR2") hdu.header.set("WHTLVLN", exif.get("NormalWhiteLevel", ""), "From CR2") hdu.header.set("WHTLVLS", exif.get("SpecularWhiteLevel", ""), "From CR2") hdu.header.set("REDBAL", exif.get("RedBalance", ""), "From CR2") hdu.header.set("BLUEBAL", exif.get("BlueBalance", ""), "From CR2") hdu.header.set("WBRGGB", exif.get("WB RGGBLevelAsShot", ""), "From CR2") hdu.header.set("DATE-OBS", obs_date) for key, value in fits_headers.items(): try: hdu.header.set(key.upper()[0:8], value) except Exception: pass try: logger.debug(f"Saving fits file to: {fits_fname}") hdu.writeto(fits_fname, output_verify="silentfix", overwrite=overwrite) except Exception as e: warn(f"Problem writing FITS file: {e}") else: if remove_cr2: os.unlink(cr2_fname) fits_utils.update_observation_headers(fits_fname, headers) return Path(fits_fname)
[docs] def cr2_to_pgm( cr2_fname: str | Path | TextIO | BinaryIO, pgm_fname: str | Path | TextIO | BinaryIO = None, overwrite=True, *args, **kwargs, ): # pragma: no cover """Convert CR2 file to PGM Converts a raw Canon CR2 file to a netpbm PGM file via `dcraw`. Assumes `dcraw` is installed on the system Note: This is a blocking call Arguments: cr2_fname: Name of CR2 file to convert. Can be a string path, pathlib.Path object, or open filehandle. **kwargs {dict} -- Additional keywords to pass to script Keyword Arguments: pgm_fname: Name of PGM file to output. Can be a string path, pathlib.Path object, or open filehandle. If None (default) then use same name as CR2 (default: {None}) dcraw {str} -- Path to installed `dcraw` (default: {'dcraw'}) overwrite {bool} -- A bool indicating if existing PGM should be overwritten (default: {True}) Returns: str -- Filename of PGM that was created """ # Normalize file inputs to string paths cr2_fname = normalize_file_input(cr2_fname) dcraw = shutil.which("dcraw") if dcraw is None: raise error.InvalidCommand("dcraw not found") if pgm_fname is None: pgm_fname = cr2_fname.replace(".cr2", ".pgm") else: pgm_fname = normalize_file_input(pgm_fname) if os.path.exists(pgm_fname) and not overwrite: logger.warning(f"PGM file exists, returning existing file: {pgm_fname}") else: try: # Build the command for this file command = f"{dcraw} -t 0 -D -4 {cr2_fname}" cmd_list = command.split() logger.debug(f"PGM Conversion command: \n {cmd_list}") # Run the command if subprocess.check_call(cmd_list) == 0: logger.debug("PGM Conversion command successful") except subprocess.CalledProcessError as err: raise error.InvalidSystemCommand(msg=f"File: {cr2_fname} \n err: {err}") return pgm_fname
[docs] def read_exif(fname: str | Path | TextIO | BinaryIO, exiftool="exiftool"): # pragma: no cover """Read the EXIF information Gets the EXIF information using exiftool Note: Assumes the `exiftool` is installed Args: fname: Name of file (CR2) to read. Can be a string path, pathlib.Path object, or open filehandle. Keyword Args: exiftool {str} -- Location of exiftool (default: {'/usr/bin/exiftool'}) Returns: dict -- Dictionary of EXIF information """ # Normalize the file input to a string path fname = normalize_file_input(fname) assert os.path.exists(fname), warn(f"File does not exist: {fname}") exif = {} try: # Build the command for this file command = f"{exiftool} -j {fname}" cmd_list = command.split() # Run the command exif = loads(subprocess.check_output(cmd_list).decode("utf-8")) except subprocess.CalledProcessError as err: raise error.InvalidSystemCommand(msg=f"File: {fname} \n err: {err}") return exif[0]
[docs] def read_pgm(fname: str | Path | TextIO | BinaryIO, byteorder=">", remove_after=False): # pragma: no cover """Return image data from a raw PGM file as numpy array. Note: Format Spec: http://netpbm.sourceforge.net/doc/pgm.html Source: http://stackoverflow.com/questions/7368739/numpy-and-16-bit-pgm Note: This is correctly processed as a Big endian even though the CR2 itself marks it as a Little endian. See the notes in Source page above as well as the comment about significant bit in the Format Spec Args: fname: Filename of PGM to be converted. Can be a string path, pathlib.Path object, or open filehandle. byteorder(str): Big endian remove_after(bool): Delete fname file after reading, defaults to False. overwrite(bool): overwrite existing PGM or not, defaults to True Returns: numpy.array: The raw data from the PGMx """ # Normalize the file input to a string path fname = normalize_file_input(fname) with open(fname, "rb") as f: buffer = f.read() # We know our header info is 19 chars long header_offset = 19 img_type, img_size, img_max_value, _ = buffer[0:header_offset].decode().split("\n") assert img_type == "P5", warn("Not a PGM file") # Get the width and height (as strings) width, height = img_size.split(" ") data = np.flipud( np.frombuffer( buffer[header_offset:], dtype=byteorder + "u2", ).reshape((int(height), int(width))) ) if remove_after: os.remove(fname) return data
[docs] def cr2_to_jpg( cr2_fname: str | Path | TextIO | BinaryIO, jpg_fname: str | Path | TextIO | BinaryIO = None, title: str = "", overwrite: bool = False, remove_cr2: bool = False, ) -> Path | None: """Extract a JPG image from a CR2, return the new path name.""" exiftool = shutil.which("exiftool") if not exiftool: # pragma: no cover raise error.InvalidSystemCommand("exiftool not found") # Handle different input types for cr2_fname if isinstance(cr2_fname, (str, Path)) or hasattr(cr2_fname, "name"): cr2_path = Path(normalize_file_input(cr2_fname)) else: raise ValueError("cr2_fname must be a string path, Path object, or file-like object with name") # Handle different input types for jpg_fname if jpg_fname is None: jpg_fname = cr2_path.with_suffix(".jpg") else: jpg_fname = Path(normalize_file_input(jpg_fname)) if jpg_fname.exists() and overwrite is False: raise error.AlreadyExists(f"{jpg_fname} already exists and overwrite is False") cmd = [exiftool, "-b", "-PreviewImage", cr2_path.as_posix()] comp_proc = subprocess.run(cmd, check=True, stdout=jpg_fname.open("wb")) if comp_proc.returncode != 0: # pragma: no cover raise error.InvalidSystemCommand(f"{comp_proc.returncode}") if title and title > "": try: im = Image.open(jpg_fname) id = ImageDraw.Draw(im) im.info["title"] = title try: fnt = ImageFont.truetype("FreeMono.ttf", 120) except Exception: # pragma: no cover fnt = ImageFont.load_default() bottom_padding = 25 position = (im.size[0] / 2, im.size[1] - bottom_padding) id.text(position, title, font=fnt, fill=(255, 0, 0), anchor="ms") logger.debug(f"Adding title={title} to {jpg_fname.as_posix()}") im.save(jpg_fname) except Exception: raise error.InvalidSystemCommand(f"Error adding title to {jpg_fname.as_posix()}") if remove_cr2: logger.debug(f"Removing {cr2_path}") cr2_path.unlink() return jpg_fname