import numpy as np
import xarray as xr
from mwr_raw2l1.errors import DimensionError, MissingInputArgument, TimeMismatch
from mwr_raw2l1.log import logger
from mwr_raw2l1.measurement.scan_transform import scan_to_timeseries_from_aux
[docs]def attex_to_datasets(data_all, dims, vars, vars_opt):
"""generate unique :class:`xarray.Dataset` for each type of obs in 'data' using dimensions and variables specified
Args:
data_all: single instance of the read-in class (with observations in instance variable 'data') or as a list
containing a series of instances of read-in classes.
dims: list of keys that are a dimension (must correspond to the order of dimensions in data)
vars: list of keys that are data variables (dimensions don't need to be specified again)
vars_opt: list of keys that are optional data variables
Returns:
a :class:`xarray.Dataset` containing the data
"""
if not isinstance(data_all, list): # accept also single instances of read-in class not inside a list
data_all = [data_all]
# add dim in second pos as Attex are single-channel instruments but frequency shall be present as 2nd
for dat in data_all:
if dat.data['Tb'].ndim == 2:
dat.data['Tb'] = np.expand_dims(dat.data['Tb'], 1)
return to_single_dataset([dat.data for dat in data_all], dims, vars, vars_opt)
[docs]def radiometrics_to_datasets(data_all, dims, vars, vars_opt):
"""generate unique :class:`xarray.Dataset` for each type of obs in 'data' using dimensions and variables specified
Args:
data_all: single instance of the read-in class (with observations in instance variable 'data') or as a list
containing a series of instances of read-in classes.
dims: list of keys that are a dimension (must correspond to the order of dimensions in data)
vars: list of keys that are data variables (dimensions don't need to be specified again)
vars_opt: list of keys that are optional data variables
Returns:
dictionary with one :class:`xarray.Dataset` for each key. It contains one item for each key in data
"""
if not isinstance(data_all, list): # accept also single instances of read-in class not inside a list
data_all = [data_all]
out = {}
sources = data_all[0].data.keys()
for src in sources:
out[src] = to_single_dataset([dat.data[src] for dat in data_all], dims[src], vars[src], vars_opt[src])
return out
[docs]def rpg_to_datasets(data, dims, vars, vars_opt):
"""generate unique :class:`xarray.Dataset` for each type of obs in 'data' using dimensions and variables specified
Args:
data: dictionary containing the observations by type. Its keys correspond to the type of observations (e.g. brt,
blb, irt ...). The observations themselves can be given as a single instance of the read-in class
(with observations in variable 'data') or as a list containing a series of instances of read-in classes.
dims: list of keys that are a dimension (must correspond to the order of dimensions in data)
vars: list of keys that are data variables (dimensions don't need to be specified again)
vars_opt: list of keys that are optional data variables (added as 1-dim series of NaN if missing in 'data')
Returns:
dictionary with one :class:`xarray.Dataset` for each key. It contains one item for each key in data
"""
multidim_vars_per_obstype = {'irt': {'IRT': 2}, 'brt': {'Tb': 2}, 'blb': {'Tb': 3}}
out = {}
for src, data_series in data.items():
if src in multidim_vars_per_obstype:
multidim_vars = multidim_vars_per_obstype[src]
else:
multidim_vars = {}
if not data_series: # fill in NaN variables if meas source does not exist (loop over empty data_series skipped)
if src in ('brt', 'blb'): # don't create empty datasets for missing MWR data
continue
logger.info('No {}-data available. Will generate a dataset fill values only for {}'.format(src, src))
min_time = min([x.data['time'][0] for x in data['hkd']]) # class instances in data['hkd'] can be unordered
max_time = max([x.data['time'][-1] for x in data['hkd']]) # class instances in data['hkd'] can be unordered
out[src] = make_dataset(None, dims[src], vars[src], vars_opt[src],
multidim_vars=multidim_vars, time_vector=[min_time, max_time])
continue
elif not isinstance(data_series, list): # accept also single instances of read-in class not inside a list
data_series = [data_series]
# if HKD has no statusflag (has_statusflag=0) 'channels_rec' variable will not be set, hence dim must not be set
if src == 'hkd' and 'channels_rec' not in data_series[0].data and 'channels_rec' in dims[src]:
dims[src].remove('channels_rec')
out[src] = to_single_dataset([dat.data for dat in data_series], dims[src], vars[src], vars_opt[src],
multidim_vars=multidim_vars)
return out
[docs]def rpg_to_si(all_data):
"""transform non-SI units to SI units for RPG datasets (e.g. km/h to m/s for windspeed
Args:
all_data: dictionary with a :class:`xarray.Dataset` attached to each key (output of :func:`rpg_to_datasets`)
Returns:
all_data with but with values corresponding to SI units
"""
try:
all_data['met']['windspeed'] = all_data['met']['windspeed'] / 3.6 # km/h -> m/s
except KeyError: # KeyError will only occur if quantity not in data, what can well happen. Do nothing in this case
pass
return all_data
[docs]def make_dataset(data, dims, vars, vars_opt=None, multidim_vars=None, time_vector=None):
"""generate a :class:`xarray.Dataset` from 'data' dictionary using the dimensions and variables specified
Args:
data: dictionary containing the data. If set to None or empty a placeholder dataset with all-NaN time series
(except variable IRT, which is 2d) is returned. If set to None or empty time_vector must be specified.
dims: list of keys that are a dimension (must correspond to the order of dimensions in data)
vars: list of keys that are data variables (dimensions don't need to be specified again)
vars_opt (optional): list of keys that are optional data variables (added as 1-d series of NaN if not in 'data')
multidim_vars (optional): dictionary of variables with more than time dimension. Variable name as key, number of
dimensions as values. This argument will be ignored as long as the variable is present in dataset
time_vector (optional): :class:`numpy.ndarray` of :class:`numpy.datetime64` to take as time dimension for
generating all-NaN datasets. This argument will be ignored as long as data is not None or empty
Returns:
:class:`xarray.Dataset`
"""
# config for empty datasets or variables
missing_val = np.nan
if multidim_vars is None:
multidim_vars = {}
# init
if vars_opt is None:
vars_opt = []
all_vars = vars + vars_opt
# prepare for empty variables
ndims_per_var = {var: 1 for var in dims + all_vars}
for var, nd in multidim_vars.items(): # can grow larger than keys that shall be in output, only accessed by key
ndims_per_var[var] = nd
# prepare all NaN-variables for case of data==None or empty
if data is None or not data:
if time_vector is None:
raise MissingInputArgument('if data is empty or None the input argument time_vector must be specified')
data = {'time': time_vector} # start overwriting empty data variable
for dim in dims[1:]: # assume first dimension to be 'time'
data[dim] = np.array([missing_val]) # other dimensions all one-element
for var in all_vars:
shape_act = [len(data[dims[k]]) for k in range(ndims_per_var[var])]
data[var] = np.full(shape_act, missing_val)
# add optional variables as NaN-series to data if not in input data
for varo in vars_opt:
if varo not in data:
shape_act = [len(data[dims[k]]) for k in range(ndims_per_var[varo])]
data[varo] = np.full(shape_act, missing_val)
logger.info('Optional variable {} not found in input data. Will create a all-NaN placeholder'.format(varo))
# collect specifications and data for generating xarray Dataset from dict
spec = {}
for dim in dims:
spec[dim] = dict(dims=dim, data=data[dim])
# add vars to spec
for var in all_vars:
nd = np.ndim(data[var])
if nd > len(dims):
raise DimensionError(dims, var, nd)
spec[var] = dict(dims=dims[0:nd], data=data[var])
return xr.Dataset.from_dict(spec)
[docs]def to_single_dataset(data_dicts, *args, **kwargs):
"""return a single :class:`xarray.Dataset` with unique time vector from a list of data dictionaries
Args:
data_dicts: list of data dictionaries to be concatenated to a time series
*args: dimension and variable specifications passed on to :func:`make_dataset`
**kwargs: dimension and variable specifications passed on to :func:`make_dataset`
"""
datasets = []
for dat in data_dicts:
datasets.append(make_dataset(dat, *args, **kwargs))
out = xr.concat(datasets, dim='time') # merge all datasets of the same type
out = drop_duplicates(out, dim='time') # remove duplicate measurements
return out
[docs]def merge_aux_data(mwr_data, all_data, srcs_to_ignore=None):
"""merge auxiliary data to time grid of microwave data
Args:
mwr_data: :class:`xarray.Dataset` of microwave radiometer data
all_data: Dictionary of data from different sources (keys) as :class:`xarray.Dataset` (values). Can also contain
the data in 'mwr_data' in which case it must be made sure the key is specified in 'srcs_to_ignore'
srcs_to_ignore (optional): list of sources (keys) to ignore from 'all_data' e.g. because they are already
contained in 'mwr_data'. Defaults to ['mwr', 'brt', 'blb']
Returns:
merged dataset of type :class:`xarray.Dataset`
"""
if srcs_to_ignore is None:
srcs_to_ignore = ['mwr', 'brt', 'blb']
out = mwr_data
for src in all_data:
if src in srcs_to_ignore:
continue
# to make sure no variable is overwritten rename duplicates by suffixing it with its source
for var in all_data[src]:
if var in out:
varname_map = {var: var + '_' + src}
all_data[src] = all_data[src].rename(varname_map)
# interp to same time grid (time grid from blb now stems from some interp) and merge into out
srcdat_interp = all_data[src].interp(time=out['time'], method='nearest') # nearest: flags stay integer
out = out.merge(srcdat_interp, join='left')
return out
[docs]def drop_duplicates(ds, dim):
"""drop duplicates from all data in ds for duplicates in dimension vector
Args:
ds: :class:`xarray.Dataset` or :class:`xarray.DataArray` containing the data
dim: string indicating the dimension name to check for duplicates
Returns:
ds with unique dimension vector
"""
_, ind = np.unique(ds[dim], return_index=True) # keep first index but assume duplicate values identical anyway
return ds.isel({dim: ind})
[docs]def merge_brt_blb(all_data):
"""merge brt (zenith MWR) and blb (scanning MWR) observations from an RPG instrument
Args:
all_data: dictionary with a :class:`xarray.Dataset` attached to each key (output of :func:`rpg_to_datasets`)
"""
if 'brt' in all_data:
out = all_data['brt']
if 'blb' in all_data:
if 'brt' in all_data:
blb_ts = scan_to_timeseries_from_aux(all_data['blb'], hkd=all_data['hkd'], brt=all_data['brt'])
try:
out = out.merge(blb_ts, join='outer')
except xr.MergeError: # on rare occasions end times of blb and brt are same (not due to scan tranform)
# prepare logger info and override merge problem
# logger.warning('Skipping xarray merge error: {}'.format(e)) # would need to except as e above
duplicate_times = []
for t in blb_ts.time.values:
if t in out.time:
duplicate_times.append(t)
logger.warning(
'Skipping {} of {} scanning observations due to identical timestamp with zenith obs for {}'.format(
len(duplicate_times), len(blb_ts.time), duplicate_times))
out = out.merge(blb_ts, join='outer', compat='override')
else:
out = scan_to_timeseries_from_aux(all_data['blb'], hkd=all_data['hkd'])
return out
[docs]def check_temporal_consistency(all_data, tolerance=15):
"""check that data of all file types fall into the time covered by HKD for RPG observations
Args:
all_data: dictionary with a :class:`xarray.Dataset` attached to each key (output of :func:`rpg_to_datasets`)
tolerance: time interval in seconds up to which other data sources are allowed to extend beyond HKD times
"""
min_limit = all_data['hkd'].time.min() - np.timedelta64(tolerance, 's')
max_limit = all_data['hkd'].time.max() + np.timedelta64(tolerance, 's')
for src, dat in all_data.items():
if dat.time.min() < min_limit or dat.time.max() > max_limit:
raise TimeMismatch("time period of '{}' data extends beyond time period covered by 'hkd' which is an error"
.format(src))