# Source code for AegeanTools.cluster

```
#! /usr/bin/env python
"""
Cluster and crossmatch tools and analysis functions.
Includes:
- DBSCAN clustering
"""
from __future__ import print_function
__author__= "Paul Hancock"
import numpy as np
import math
from .angle_tools import gcd, bear
from .catalogs import load_table, table_to_source_list
# join the Aegean logger
import logging
log = logging.getLogger('Aegean')
cc2fwhm = (2 * math.sqrt(2 * math.log(2)))
fwhm2cc = 1/cc2fwhm
[docs]def norm_dist(src1, src2):
"""
Calculate the normalised distance between two sources.
Sources are elliptical Gaussians.
The normalised distance is calculated as the GCD distance between the centers,
divided by quadrature sum of the radius of each ellipse along a line joining the two ellipses.
For ellipses that touch at a single point, the normalized distance will be 1/sqrt(2).
Parameters
----------
src1, src2 : object
The two positions to compare. Objects must have the following parameters: (ra, dec, a, b, pa).
Returns
-------
dist: float
The normalised distance.
"""
if np.all(src1 == src2):
return 0
dist = gcd(src1.ra, src1.dec, src2.ra, src2.dec) # degrees
# the angle between the ellipse centers
phi = bear(src1.ra, src1.dec, src2.ra, src2.dec) # Degrees
# Calculate the radius of each ellipse along a line that joins their centers.
r1 = src1.a*src1.b / np.hypot(src1.a * np.sin(np.radians(phi - src1.pa)),
src1.b * np.cos(np.radians(phi - src1.pa)))
r2 = src2.a*src2.b / np.hypot(src2.a * np.sin(np.radians(180 + phi - src2.pa)),
src2.b * np.cos(np.radians(180 + phi - src2.pa)))
R = dist / (np.hypot(r1, r2) / 3600)
return R
[docs]def sky_dist(src1, src2):
"""
Great circle distance between two sources.
A check is made to determine if the two sources are the same object, in this case
the distance is zero.
Parameters
----------
src1, src2 : object
Two sources to check. Objects must have parameters (ra,dec) in degrees.
Returns
-------
distance : float
The distance between the two sources.
See Also
--------
:func:`AegeanTools.angle_tools.gcd`
"""
if np.all(src1 == src2):
return 0
return gcd(src1.ra, src1.dec, src2.ra, src2.dec) # degrees
[docs]def pairwise_ellpitical_binary(sources, eps, far=None):
"""
Do a pairwise comparison of all sources and determine if they have a normalized distance within
eps.
Form this into a matrix of shape NxN.
Parameters
----------
sources : list
A list of sources (objects with parameters: ra,dec,a,b,pa)
eps : float
Normalised distance constraint.
far : float
If sources have a dec that differs by more than this amount then they are considered to be not matched.
This is a short-cut around performing GCD calculations.
Returns
-------
prob : numpy.ndarray
A 2d array of True/False.
See Also
--------
:func:`AegeanTools.cluster.norm_dist`
"""
if far is None:
far = max(a.a/3600 for a in sources)
l = len(sources)
distances = np.zeros((l, l), dtype=bool)
for i in range(l):
for j in range(i, l):
if i == j:
distances[i, j] = False
continue
src1 = sources[i]
src2 = sources[j]
if src2.dec - src1.dec > far:
break
if abs(src2.ra - src1.ra)*np.cos(np.radians(src1.dec)) > far:
continue
distances[i, j] = norm_dist(src1, src2) > eps
distances[j, i] = distances[i, j]
return distances
[docs]def regroup_vectorized(srccat, eps, far=None, dist=norm_dist):
"""
Regroup the islands of a catalog according to their normalised distance.
Assumes srccat is recarray-like for efficiency.
Return a list of island groups.
Parameters
----------
srccat : np.rec.arry or pd.DataFrame
Should have the following fields[units]:
ra[deg],dec[deg], a[arcsec],b[arcsec],pa[deg], peak_flux[any]
eps : float
maximum normalised distance within which sources are considered to be
grouped
far : float
(degrees) sources that are further than this distance apart will not
be grouped, and will not be tested.
Default = 0.5.
dist : func
a function that calculates the distance between a source and each
element of an array of sources.
Default = :func:`AegeanTools.cluster.norm_dist`
Returns
-------
islands : list of lists
Each island contians integer indices for members from srccat
(in descending dec order).
"""
if far is None:
far = 0.5 # 10*max(a.a/3600 for a in srccat)
# most negative declination first
# XXX: kind='mergesort' ensures stable sorting for determinism.
# Do we need this?
order = np.argsort(srccat.dec, kind='mergesort')[::-1]
# TODO: is it better to store groups as arrays even if appends are more
# costly?
groups = [[order[0]]]
for idx in order[1:]:
rec = srccat[idx]
# TODO: Find out if groups are big enough for this to give us a speed
# gain. If not, get distance to all entries in groups above
# decmin simultaneously.
decmin = rec.dec - far
for group in reversed(groups):
# when an island's largest (last) declination is smaller than
# decmin, we don't need to look at any more islands
if srccat.dec[group[-1]] < decmin:
# new group
groups.append([idx])
rafar = far / np.cos(np.radians(rec.dec))
group_recs = np.take(srccat, group, mode='clip')
group_recs = group_recs[abs(rec.ra - group_recs.ra) <= rafar]
if len(group_recs) and dist(rec, group_recs).min() < eps:
group.append(idx)
break
else:
# new group
groups.append([idx])
# TODO?: a more numpy-like interface would return only an array providing
# the mapping:
# group_idx = np.empty(len(srccat), dtype=int)
# for i, group in enumerate(groups):
# group_idx[group] = i
# return group_idx
return groups
[docs]def regroup(catalog, eps, far=None, dist=norm_dist):
"""
Regroup the islands of a catalog according to their normalised distance.
Return a list of island groups. Sources have their (island,source) parameters relabeled.
Parameters
----------
catalog : str or object
Either a filename to read into a source list, or a list of objects with the following properties[units]:
ra[deg],dec[deg], a[arcsec],b[arcsec],pa[deg], peak_flux[any]
eps : float
maximum normalised distance within which sources are considered to be grouped
far : float
(degrees) sources that are further than this distance appart will not be grouped, and will not be tested.
Default = None.
dist : func
a function that calculates the distance between two sources must accept two SimpleSource objects.
Default = :func:`AegeanTools.cluster.norm_dist`
Returns
-------
islands : list
A list of islands. Each island is a list of sources.
See Also
--------
:func:`AegeanTools.cluster.norm_dist`
"""
if isinstance(catalog, str):
table = load_table(catalog)
srccat = table_to_source_list(table)
else:
try:
srccat = catalog
_ = catalog[0].ra, catalog[0].dec, catalog[0].a, catalog[0].b, catalog[0].pa, catalog[0].peak_flux
except AttributeError as e:
log.error("catalog is not understood.")
log.error("catalog: Should be a list of objects with the following properties[units]:\n" +
"ra[deg],dec[deg], a[arcsec],b[arcsec],pa[deg], peak_flux[any]")
raise e
log.info("Regrouping islands within catalog")
log.debug("Calculating distances")
if far is None:
far = 0.5 # 10*max(a.a/3600 for a in srccat)
srccat_array = np.rec.fromrecords(
[(s.ra, s.dec, s.a, s.b, s.pa, s.peak_flux)
for s in srccat],
names=['ra', 'dec', 'a', 'b', 'pa', 'peak_flux'])
groups = regroup_vectorized(srccat_array, eps=eps, far=far, dist=dist)
groups = [[srccat[idx] for idx in group]
for group in groups]
islands = []
# now that we have the groups, we relabel the sources to have (island,component) in flux order
# note that the order of sources within an island list is not changed - just their labels
for isle, group in enumerate(groups):
for comp, src in enumerate(sorted(group, key=lambda x: -1*x.peak_flux)):
src.island = isle
src.source = comp
islands.append(group)
return islands
```