Source code for AegeanTools.BANE

#! /usr/bin/env python

"""
This module contains all of the BANE specific code
The function filter_image should be imported from elsewhere and run as is.
"""
from __future__ import division
# standard imports
from astropy.io import fits
import copy
import logging
import multiprocessing
import multiprocessing.sharedctypes
import numpy as np
import os
from scipy.interpolate import RegularGridInterpolator
import sys
from tempfile import NamedTemporaryFile
from time import gmtime, strftime

# Aegean tools
from .fits_interp import compress

__author__ = 'Paul Hancock'
__version__ = 'v1.8.0'
__date__ = '2018-09-15'

# global variables for multiprocessing
ibkg = irms = None
bkg_events = []
mask_events = []


[docs]def barrier(events, sid, kind='neighbour'): """ act as a multiprocessing barrier """ events[sid].set() # only wait for the neighbours if kind=='neighbour': if sid > 0: logging.debug("{0} is waiting for {1}".format(sid, sid - 1)) events[sid - 1].wait() if sid < len(bkg_events) - 1: logging.debug("{0} is waiting for {1}".format(sid, sid + 1)) events[sid + 1].wait() # wait for all else: [e.wait() for e in events] return
[docs]def sigmaclip(arr, lo, hi, reps=3): """ Perform sigma clipping on an array, ignoring non finite values. During each iteration return an array whose elements c obey: mean -std*lo < c < mean + std*hi where mean/std are the mean std of the input array. Parameters ---------- arr : iterable An iterable array of numeric types. lo : float The negative clipping level. hi : float The positive clipping level. reps : int The number of iterations to perform. Default = 3. Returns ------- mean : float The mean of the array, possibly nan std : float The std of the array, possibly nan Notes ----- Scipy v0.16 now contains a comparable method that will ignore nan/inf values. """ clipped = np.array(arr)[np.isfinite(arr)] if len(clipped) < 1: return np.nan, np.nan std = np.std(clipped) mean = np.mean(clipped) for _ in range(int(reps)): clipped = clipped[np.where(clipped > mean-std*lo)] clipped = clipped[np.where(clipped < mean+std*hi)] pstd = std if len(clipped) < 1: break std = np.std(clipped) mean = np.mean(clipped) if 2*abs(pstd-std)/(pstd+std) < 0.2: break return mean, std
def _sf2(args): """ A shallow wrapper for sigma_filter. Parameters ---------- args : list A list of arguments for sigma_filter Returns ------- None """ # an easier to debug traceback when multiprocessing # thanks to https://stackoverflow.com/a/16618842/1710603 try: return sigma_filter(*args) except: import traceback raise Exception("".join(traceback.format_exception(*sys.exc_info())))
[docs]def sigma_filter(filename, region, step_size, box_size, shape, domask, sid): """ Calculate the background and rms for a sub region of an image. The results are written to shared memory - irms and ibkg. Parameters ---------- filename : string Fits file to open region : list Region within the fits file that is to be processed. (row_min, row_max). step_size : (int, int) The filtering step size box_size : (int, int) The size of the box over which the filter is applied (each step). shape : tuple The shape of the fits image domask : bool If true then copy the data mask to the output. sid : int The stripe number Returns ------- None """ ymin, ymax = region logging.debug('rows {0}-{1} starting at {2}'.format(ymin, ymax, strftime("%Y-%m-%d %H:%M:%S", gmtime()))) # cut out the region of interest plus 1/2 the box size, but clip to the image size data_row_min = max(0, ymin - box_size[0]//2) data_row_max = min(shape[0], ymax + box_size[0]//2) # Figure out how many axes are in the datafile NAXIS = fits.getheader(filename)["NAXIS"] with fits.open(filename, memmap=True) as a: if NAXIS == 2: data = a[0].section[data_row_min:data_row_max, 0:shape[1]] elif NAXIS == 3: data = a[0].section[0, data_row_min:data_row_max, 0:shape[1]] elif NAXIS == 4: data = a[0].section[0, 0, data_row_min:data_row_max, 0:shape[1]] else: logging.error("Too many NAXIS for me {0}".format(NAXIS)) logging.error("fix your file to be more sane") raise Exception("Too many NAXIS") row_len = shape[1] logging.debug('data size is {0}'.format(data.shape)) def box(r, c): """ calculate the boundaries of the box centered at r,c with size = box_size """ r_min = max(0, r - box_size[0] // 2) r_max = min(data.shape[0] - 1, r + box_size[0] // 2) c_min = max(0, c - box_size[1] // 2) c_max = min(data.shape[1] - 1, c + box_size[1] // 2) return r_min, r_max, c_min, c_max # set up a grid of rows/cols at which we will compute the bkg/rms rows = list(range(ymin-data_row_min, ymax-data_row_min, step_size[0])) rows.append(ymax-data_row_min) cols = list(range(0, shape[1], step_size[1])) cols.append(shape[1]) # store the computed bkg/rms in this smaller array vals = np.zeros(shape=(len(rows),len(cols))) for i, row in enumerate(rows): for j, col in enumerate(cols): r_min, r_max, c_min, c_max = box(row, col) new = data[r_min:r_max, c_min:c_max] new = np.ravel(new) bkg, _ = sigmaclip(new, 3, 3) vals[i,j] = bkg # indices of all the pixels within our region gr, gc = np.mgrid[ymin-data_row_min:ymax-data_row_min, 0:shape[1]] logging.debug("Interpolating bkg to sharemem") ifunc = RegularGridInterpolator((rows, cols), vals) for i in range(gr.shape[0]): row = np.array(ifunc((gr[i], gc[i])), dtype=np.float32) start_idx = np.ravel_multi_index((ymin+i, 0), shape) end_idx = start_idx + row_len ibkg[start_idx:end_idx] = row # np.ctypeslib.as_ctypes(row) del ifunc logging.debug(" ... done writing bkg") # signal that the bkg is done for this region, and wait for neighbours barrier(bkg_events, sid) logging.debug("{0} background subtraction".format(sid)) for i in range(data_row_max - data_row_min): start_idx = np.ravel_multi_index((data_row_min + i, 0), shape) end_idx = start_idx + row_len data[i, :] = data[i, :] - ibkg[start_idx:end_idx] # reset/recycle the vals array vals[:] = 0 for i, row in enumerate(rows): for j, col in enumerate(cols): r_min, r_max, c_min, c_max = box(row, col) new = data[r_min:r_max, c_min:c_max] new = np.ravel(new) _ , rms = sigmaclip(new, 3, 3) vals[i,j] = rms logging.debug("Interpolating rm to sharemem rms") ifunc = RegularGridInterpolator((rows, cols), vals) for i in range(gr.shape[0]): row = np.array(ifunc((gr[i], gc[i])), dtype=np.float32) start_idx = np.ravel_multi_index((ymin+i, 0), shape) end_idx = start_idx + row_len irms[start_idx:end_idx] = row # np.ctypeslib.as_ctypes(row) del ifunc logging.debug(" .. done writing rms") if domask: barrier(mask_events, sid) logging.debug("applying mask") for i in range(gr.shape[0]): mask = np.where(np.bitwise_not(np.isfinite(data[i + ymin-data_row_min,:])))[0] for j in mask: idx = np.ravel_multi_index((i + ymin,j),shape) ibkg[idx] = np.nan irms[idx] = np.nan logging.debug(" ... done applying mask") logging.debug('rows {0}-{1} finished at {2}'.format(ymin, ymax, strftime("%Y-%m-%d %H:%M:%S", gmtime()))) return
[docs]def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None, domask=True): """ Calculate the background and noise images corresponding to the input file. The calculation is done via a box-car approach and uses multiple cores and shared memory. Parameters ---------- filename : str Filename to be filtered. step_size : (int, int) Step size for the filter. box_size : (int, int) Box size for the filter. cores : int Number of cores to use. If None then use all available. nslice : int The image will be divided into this many horizontal stripes for processing. Default = None = equal to cores shape : (int, int) The shape of the image in the given file. domask : bool True(Default) = copy data mask to output. Returns ------- bkg, rms : numpy.ndarray The interpolated background and noise images. """ if cores is None: cores = multiprocessing.cpu_count() if (nslice is None) or (cores==1): nslice = cores img_y, img_x = shape # initialise some shared memory global ibkg # bkg = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32)) # ibkg = multiprocessing.sharedctypes.Array(bkg._type_, bkg, lock=True) ibkg = multiprocessing.Array('f', img_y*img_x) global irms #rms = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32)) #irms = multiprocessing.sharedctypes.Array(rms._type_, rms, lock=True) irms = multiprocessing.Array('f', img_y * img_x) logging.info("using {0} cores".format(cores)) logging.info("using {0} stripes".format(nslice)) if nslice > 1: # box widths should be multiples of the step_size, and not zero width_y = int(max(img_y/nslice/step_size[1], 1) * step_size[1]) # locations of the box edges ymins = list(range(0, img_y, width_y)) ymaxs = list(range(width_y, img_y, width_y)) ymaxs.append(img_y) else: ymins = [0] ymaxs = [img_y] logging.debug("ymins {0}".format(ymins)) logging.debug("ymaxs {0}".format(ymaxs)) # create an event per stripe global bkg_events, mask_events bkg_events = [multiprocessing.Event() for _ in range(len(ymaxs))] mask_events = [multiprocessing.Event() for _ in range(len(ymaxs))] args = [] for i, region in enumerate(zip(ymins, ymaxs)): args.append((filename, region, step_size, box_size, shape, domask, i)) # start a new process for each task, hopefully to reduce residual memory use pool = multiprocessing.Pool(processes=cores, maxtasksperchild=1) try: # chunksize=1 ensures that we only send a single task to each process pool.map_async(_sf2, args, chunksize=1).get(timeout=10000000) except KeyboardInterrupt: logging.error("Caught keyboard interrupt") pool.close() sys.exit(1) pool.close() pool.join() bkg = np.reshape(np.array(ibkg[:], dtype=np.float32), shape) rms = np.reshape(np.array(irms[:], dtype=np.float32), shape) del ibkg, irms return bkg, rms
[docs]def filter_image(im_name, out_base, step_size=None, box_size=None, twopass=False, cores=None, mask=True, compressed=False, nslice=None): """ Create a background and noise image from an input image. Resulting images are written to `outbase_bkg.fits` and `outbase_rms.fits` Parameters ---------- im_name : str or HDUList Image to filter. Either a string filename or an astropy.io.fits.HDUList. out_base : str The output filename base. Will be modified to make _bkg and _rms files. step_size : (int,int) Tuple of the x,y step size in pixels box_size : (int,int) The size of the box in piexls twopass : bool Perform a second pass calculation to ensure that the noise is not contaminated by the background. Default = False cores : int Number of CPU corse to use. Default = all available nslice : int The image will be divided into this many horizontal stripes for processing. Default = None = equal to cores mask : bool Mask the output array to contain np.nna wherever the input array is nan or not finite. Default = true compressed : bool Return a compressed version of the background/noise images. Default = False Returns ------- None """ header = fits.getheader(im_name) shape = (header['NAXIS2'],header['NAXIS1']) if step_size is None: if 'BMAJ' in header and 'BMIN' in header: beam_size = np.sqrt(abs(header['BMAJ']*header['BMIN'])) if 'CDELT1' in header: pix_scale = np.sqrt(abs(header['CDELT1']*header['CDELT2'])) elif 'CD1_1' in header: pix_scale = np.sqrt(abs(header['CD1_1']*header['CD2_2'])) if 'CD1_2' in header and 'CD2_1' in header: if header['CD1_2'] != 0 or header['CD2_1']!=0: logging.warning("CD1_2 and/or CD2_1 are non-zero and I don't know what to do with them") logging.warning("Ingoring them") else: logging.warning("Cannot determine pixel scale, assuming 4 pixels per beam") pix_scale = beam_size/4. # default to 4x the synthesized beam width step_size = int(np.ceil(4*beam_size/pix_scale)) else: logging.info("BMAJ and/or BMIN not in fits header.") logging.info("Assuming 4 pix/beam, so we have step_size = 16 pixels") step_size = 16 step_size = (step_size, step_size) if box_size is None: # default to 6x the step size so we have ~ 30beams box_size = (step_size[0]*6, step_size[1]*6) if compressed: if not step_size[0] == step_size[1]: step_size = (min(step_size), min(step_size)) logging.info("Changing grid to be {0} so we can compress the output".format(step_size)) logging.info("using grid_size {0}, box_size {1}".format(step_size,box_size)) logging.info("on data shape {0}".format(shape)) bkg, rms = filter_mc_sharemem(im_name, step_size=step_size, box_size=box_size, cores=cores, shape=shape, nslice=nslice, domask=mask) logging.info("done") bkg_out = '_'.join([os.path.expanduser(out_base), 'bkg.fits']) rms_out = '_'.join([os.path.expanduser(out_base), 'rms.fits']) # add a comment to the fits header header['HISTORY'] = 'BANE {0}-({1})'.format(__version__, __date__) # compress if compressed: hdu = fits.PrimaryHDU(bkg) hdu.header = copy.deepcopy(header) hdulist = fits.HDUList([hdu]) compress(hdulist, step_size[0], bkg_out) hdulist[0].header = copy.deepcopy(header) hdulist[0].data = rms compress(hdulist, step_size[0], rms_out) return write_fits(bkg, header, bkg_out) write_fits(rms, header, rms_out)
### # Helper functions ###
[docs]def write_fits(data, header, file_name): """ Combine data and a fits header to write a fits file. Parameters ---------- data : numpy.ndarray The data to be written. header : astropy.io.fits.hduheader The header for the fits file. file_name : string The file to write Returns ------- None """ hdu = fits.PrimaryHDU(data) hdu.header = header hdulist = fits.HDUList([hdu]) hdulist.writeto(file_name, overwrite=True) logging.info("Wrote {0}".format(file_name)) return