Source code for AegeanTools.fits_interp

#! /usr/bin/env python

"""
A module to allow fits files to be shrunk in size using decimation, and to be
grown in size using interpolation.
"""
import numpy as np
from astropy.io import fits
from scipy.interpolate import  RegularGridInterpolator
import logging

__author__ = 'Paul Hancock'
__date__ = '2018-08-09'


[docs]def load_file_or_hdu(filename): """ Load a file from disk and return an HDUList If filename is already an HDUList return that instead Parameters ---------- filename : str or HDUList File or HDU to be loaded Returns ------- hdulist : HDUList """ if isinstance(filename, fits.HDUList): hdulist = filename else: hdulist = fits.open(filename, ignore_missing_end=True) return hdulist
[docs]def compress(datafile, factor, outfile=None): """ Compress a file using decimation. Parameters ---------- datafile : str or HDUList Input data to be loaded. (HDUList will be modified if passed). factor : int Decimation factor. outfile : str File to be written. Default = None, which means don't write a file. Returns ------- hdulist : HDUList A decimated HDUList See Also -------- :func:`AegeanTools.fits_interp.expand` """ if not (factor > 0 and isinstance(factor, int)): logging.error("factor must be a positive integer") return None hdulist = load_file_or_hdu(datafile) header = hdulist[0].header data = np.squeeze(hdulist[0].data) cx, cy = data.shape[0], data.shape[1] nx = cx // factor ny = cy // factor # check to see if we will have some residual data points lcx = cx % factor lcy = cy % factor if lcx > 0: nx += 1 if lcy > 0: ny += 1 # decimate the data new_data = np.empty((nx + 1, ny + 1)) new_data[:nx, :ny] = data[::factor, ::factor] # copy the last row/col across new_data[-1, :ny] = data[-1, ::factor] new_data[:nx, -1] = data[::factor, -1] new_data[-1, -1] = data[-1, -1] # TODO: Figure out what to do when CD2_1 and CD1_2 are non-zero if 'CDELT1' in header: header['CDELT1'] *= factor elif 'CD1_1' in header: header['CD1_1'] *= factor else: logging.error("Error: Can't find CDELT1 or CD1_1") return None if 'CDELT2' in header: header['CDELT2'] *= factor elif "CD2_2" in header: header['CD2_2'] *= factor else: logging.error("Error: Can't find CDELT2 or CD2_2") return None # Move the reference pixel so that the WCS is correct header['CRPIX1'] = (header['CRPIX1'] + factor - 1) / factor header['CRPIX2'] = (header['CRPIX2'] + factor - 1) / factor # Update the header so that we can do the correct interpolation later on header['BN_CFAC'] = (factor, "Compression factor (grid size) used by BANE") header['BN_NPX1'] = (header['NAXIS1'], 'original NAXIS1 value') header['BN_NPX2'] = (header['NAXIS2'], 'original NAXIS2 value') header['BN_RPX1'] = (lcx, 'Residual on axis 1') header['BN_RPX2'] = (lcy, 'Residual on axis 2') header['HISTORY'] = "Compressed by a factor of {0}".format(factor) # save the changes hdulist[0].data = np.array(new_data, dtype=np.float32) hdulist[0].header = header if outfile is not None: hdulist.writeto(outfile, overwrite=True) logging.info("Wrote: {0}".format(outfile)) return hdulist
[docs]def expand(datafile, outfile=None): """ Expand and interpolate the given data file using the given method. Datafile can be a filename or an HDUList It is assumed that the file has been compressed and that there are `BN_?` keywords in the fits header that describe how the compression was done. Parameters ---------- datafile : str or HDUList filename or HDUList of file to work on outfile : str filename to write to (default = None) Returns ------- hdulist : HDUList HDUList of the expanded data. See Also -------- :func:`AegeanTools.fits_interp.compress` """ hdulist = load_file_or_hdu(datafile) header = hdulist[0].header data = hdulist[0].data # Check for the required key words, only expand if they exist if not all(a in header for a in ['BN_CFAC', 'BN_NPX1', 'BN_NPX2', 'BN_RPX1', 'BN_RPX2']): return hdulist factor = header['BN_CFAC'] (gx, gy) = np.mgrid[0:header['BN_NPX2'], 0:header['BN_NPX1']] # fix the last column of the grid to account for residuals lcx = header['BN_RPX2'] lcy = header['BN_RPX1'] rows = (np.arange(data.shape[0]) + int(lcx/factor))*factor cols = (np.arange(data.shape[1]) + int(lcy/factor))*factor # Do the interpolation hdulist[0].data = np.array(RegularGridInterpolator((rows,cols), data)((gx, gy)), dtype=np.float32) # update the fits keywords so that the WCS is correct header['CRPIX1'] = (header['CRPIX1'] - 1) * factor + 1 header['CRPIX2'] = (header['CRPIX2'] - 1) * factor + 1 if 'CDELT1' in header: header['CDELT1'] /= factor elif 'CD1_1' in header: header['CD1_1'] /= factor else: logging.error("Error: Can't find CD1_1 or CDELT1") return None if 'CDELT2' in header: header['CDELT2'] /= factor elif "CD2_2" in header: header['CD2_2'] /= factor else: logging.error("Error: Can't find CDELT2 or CD2_2") return None header['HISTORY'] = 'Expanded by factor {0}'.format(factor) # don't need these any more so delete them. del header['BN_CFAC'], header['BN_NPX1'], header['BN_NPX2'], header['BN_RPX1'], header['BN_RPX2'] hdulist[0].header = header if outfile is not None: hdulist.writeto(outfile, overwrite=True) logging.info("Wrote: {0}".format(outfile)) return hdulist