import glob
import itertools
import re
import os
import tempfile
import astropy.table as atpy
import numpy as np
from .utils import (get_data_path, get_data_path_for_grid, tail_head,
_get_cubic_coeffs, _interpolator_4d, _interpolator_5d)
POINTS_NPY = 'bolom_points.npy'
FILT_NPY = 'filt_%s.npy'
def _interpolator_4cubic(grid, ws, idxs):
"""
Perform 4D cubic interpolation for bolometric corrections.
"""
return _interpolator_4d(grid, ws[0], idxs[0], ws[1], idxs[1], ws[2],
idxs[2], ws[3], idxs[3])
def _interpolator_5cubic(grid, ws, idxs):
"""
Perform 5D cubic interpolation for bolometric corrections.
"""
return _interpolator_5d(grid, ws[0], idxs[0], ws[1], idxs[1], ws[2],
idxs[2], ws[3], idxs[3], ws[4], idxs[4])
def _header_preview_file(fin, nout=10):
"""
Create a temporary file starting from the detected header line.
Supports both v1.2 and v2.5 BC table formats.
"""
fp = open(fin, 'r')
lines = []
for i, ll in enumerate(fp):
lines.append(ll)
if i > 200:
break
fp.close()
header_idx = None
for i, ll in enumerate(lines):
lls = ll.strip()
if not lls.startswith('#'):
continue
if ('Teff' in lls) or ('lgTef' in lls) or ('logT' in lls):
header_idx = i
break
if header_idx is None:
return tail_head(fin, 5, nout)
fpout = tempfile.NamedTemporaryFile(delete=False, mode='w')
for ll in lines[header_idx:header_idx + nout + 1]:
print(ll, file=fpout, end='')
fpout.close()
return fpout.name
[docs]
def read_bolom(filt, iprefix):
"""
Read the bolometric corrections files for
a given filter system.
Parameters:
-----------
filt: string
Filter system/group like UBVRIplus or WISE
iprefix: string
Location of the bc correction files
"""
fs = sorted(glob.glob('%s/*%s' % (iprefix, filt)))
if len(fs) == 0:
raise RuntimeError(
'Filter system %s bolometric correction not found in %s' %
(filt, iprefix))
tmpfile = _header_preview_file(fs[0], 10)
tab0 = atpy.Table().read(tmpfile, format='ascii.fast_commented_header')
os.unlink(tmpfile)
tabs = []
colnames = list(tab0.columns)
for f in fs:
curt = atpy.Table().read(f,
format='ascii.basic',
names=colnames,
comment='#')
tabs.append(curt)
tabs = atpy.vstack(tabs)
return tabs
[docs]
class BCInterpolator:
def __init__(self, prefix, filts):
"""
Initialize bolometric-correction interpolation grids.
Parameters
----------
prefix: str
Directory containing `bolom_points.npy` and `filt_*.npy` files.
filts: iterable of str
Filter names to load from `filt_<name>.npy`.
"""
filts = set(filts)
vec = np.load(prefix + '/' + POINTS_NPY)
ndim = vec.shape[0]
self.ndim = ndim
uids = [np.unique(vec[i, :], return_inverse=True) for i in range(ndim)]
self.uvecs = [uids[_][0] for _ in range(ndim)]
self.uids = [uids[_][1] for _ in range(ndim)]
size = [len(self.uvecs[_]) for _ in range(ndim)]
self.filts = filts
self.dats = {}
self.box_list = []
for a in itertools.product(*[[0, 1]] * self.ndim):
self.box_list.append((a))
self.box_list = np.array(self.box_list)
for f in filts:
curd = np.zeros(size) - np.nan
curd[tuple(self.uids)] = np.load(prefix + '/' + FILT_NPY % (f, ))
self.dats[f] = curd
def __call__(self, p):
"""
Return bolometric corrections given the stellar parameters
The input is an array shaped Nx4
where the 4 dimensions corresponds to
logteff, logg ,feh, A_V
and N for the number of stars
"""
res = {}
bad = np.zeros(p.shape[0], dtype=bool)
ws = []
idxs = []
for i in range(self.ndim):
p_dim = p[:, i]
if i == 2:
p_dim = np.maximum(p_dim, self.uvecs[i][0])
pos = np.searchsorted(self.uvecs[i], p_dim, 'right') - 1
bad = bad | (pos < 0) | (pos >= (len(self.uvecs[i]) - 1))
pos_clipped = np.clip(pos, 0, len(self.uvecs[i]) - 2)
w, idx = _get_cubic_coeffs(p_dim, self.uvecs[i], pos_clipped)
ws.append(w)
idxs.append(idx)
for f in self.filts:
if self.ndim == 4:
curres = _interpolator_4cubic(self.dats[f], ws, idxs)
elif self.ndim == 5:
curres = _interpolator_5cubic(self.dats[f], ws, idxs)
else:
raise RuntimeError(f'Unsupported BC dimensionality: {self.ndim}')
res[f] = curres
res[f][bad] = np.nan
return res
[docs]
def list_filters(path=None, mist_version='1.2', vvcrit=0.4):
"""
Return filter names available in prepared bolometric-correction data.
Parameters
----------
path: str or None
Directory to scan. If None, resolve from `mist_version` and `vvcrit`.
mist_version: str
MIST version used when resolving the default path.
vvcrit: float
Rotation value used when resolving the default path.
"""
if path is None:
path = get_data_path_for_grid(mist_version=mist_version,
vvcrit=vvcrit,
create=False)
if not os.path.isdir(path):
path = get_data_path()
fs = glob.glob(os.path.join(path, FILT_NPY % '*'))
filts = []
for f in fs:
filts.append(
re.match(FILT_NPY % '(.*)',
f.split(os.path.sep)[-1]).group(1))
return filts
[docs]
def prepare(iprefix,
oprefix,
filters=('SDSSugriz', 'SkyMapper', 'UBVRIplus', 'DECam', 'WISE',
'GALEX')):
"""
Read bolometric-correction tables and save compact `.npy` grids.
Parameters
----------
iprefix: str
Input directory containing raw BC table files.
oprefix: str
Output directory where `bolom_points.npy` and `filt_*.npy` are saved.
filters: iterable of str
Filter-system groups to read from the input directory.
"""
cols_ex = ['Teff', 'logg', '[Fe/H]', 'Av', 'Rv', 'lgTef', 'Fe_H', 'a_Fe']
last_vec = None
for i, filt in enumerate(filters):
tabs = read_bolom(filt, iprefix)
if 'Teff' in tabs.colnames:
vec = np.array(
[np.log10(tabs['Teff']), tabs['logg'], tabs['[Fe/H]'],
tabs['Av']])
elif 'lgTef' in tabs.colnames:
vec = np.array(
[tabs['lgTef'], tabs['logg'], tabs['Fe_H'], tabs['a_Fe'],
tabs['Av']])
else:
raise RuntimeError('Unrecognized BC table columns')
if last_vec is not None and (last_vec != vec).sum() > 0:
raise Exception("shouldn't happen")
last_vec = vec.copy()
if i == 0:
np.save(os.path.join(oprefix, POINTS_NPY), vec)
for k in tabs.columns:
if k not in cols_ex:
np.save(os.path.join(oprefix, FILT_NPY % k), tabs[k])