Source code for mwr_raw2l1.write_netcdf

import datetime as dt
from copy import deepcopy

import numpy as np
from pkg_resources import get_distribution

import mwr_raw2l1
from mwr_raw2l1.errors import MissingConfig, OutputDimensionError
from mwr_raw2l1.log import logger
from mwr_raw2l1.utils.config_utils import get_inst_config, get_nc_format_config

# value for _FillValue attribute of variables encoding field to have unset _FillValue in NetCDF
ENC_NO_FILLVALUE = None  # tutorials from 2017 said False must be used, but with xarray 0.20.1 only None works


[docs]class Writer(object): """Class for writing data (Dataset) to NetCDF according to the format definition in conf_file Args: data_in: :class:`xarray.Dataset` or :class:`DataArray` containing data to write to file. Some support is also provided if data_in is a dictionary, but this option is deprecated. filename: name and path of output NetCDF file conf_nc: configuration dict of yaml file defining the format and contents of the output NetCDF file conf_inst: configuration dict of yaml file with instrument specifications (contains global attrs for NetCDF) nc_format: NetCDF format type of the output file. Default is NETCDF4 copy_data (bool): In case of False, the dataset might experience in-place modifications which is suitable when the dataset is not used in its original form after calling the write function, for True a copy is modified. Defaults to False. """ def __init__(self, data_in, filename, conf_nc, conf_inst, nc_format='NETCDF4', copy_data=False): self.filename = filename self.nc_format = nc_format if copy_data: self.data = deepcopy(data_in) else: self.data = data_in # read in config file if config was not provided as dict self.conf_nc = conf_nc if not isinstance(self.conf_nc, dict): self.conf_nc = get_nc_format_config(self.conf_nc) self.conf_inst = conf_inst if not isinstance(self.conf_inst, dict): self.conf_inst = get_inst_config(self.conf_inst) self.config_dims = self.conf_nc['dimensions']['unlimited'] + self.conf_nc['dimensions']['fixed']
[docs] def run(self): """write Dataset to NetCDF according to the format definition in conf_file by using the :class:`xarray` module """ logger.info('Starting to write to ' + self.filename) self.prepare_datavars() self.global_attrs_from_conf(self.conf_nc, attr_key='attributes') self.global_attrs_from_conf(self.conf_inst, attr_key='nc_attributes') self.add_title_attr() # compose title self.add_history_attr() self.data.to_netcdf(self.filename, format=self.nc_format) # write to output NetCDF file logger.info('Data written to ' + self.filename)
[docs] def prepare_datavars(self): """prepare data variables :class:`xarray.Dataset` for writing to file standard specified in 'conf_nc'""" self.data.encoding.update( # acts during to_netcdf() unlimited_dims=self.conf_nc['dimensions']['unlimited']) # default is fixed, i.e. only need to set unlimited for var, specs in self.conf_nc['variables'].items(): # check availability and fill absent variables if var not in self.data.keys(): if specs['optional']: shape_var = tuple(map(lambda x: len(self.data[x]), specs['dim'])) self.data[var] = (specs['dim'], np.full(shape_var, np.nan)) else: raise KeyError('Variable {} is a mandatory input but was not found in input dictionary'.format(var)) # dimensions, fill value and encoding self.check_dims(var, specs) self.set_fillvalue(var, specs) self.data[var].encoding.update(dtype=specs['type']) # set attributes and make sure that encoding for flag_values and flag_masks corresponds to data type self.data[var].attrs.update(specs['attributes']) for att in ['flag_values', 'flag_masks']: if att in self.data[var].attrs: self.data[var].attrs[att] = np.array(self.data[var].attrs[att], dtype=specs['type']) self.prepare_time() self.append_qc_thresholds() self.remove_vars() self.rename_vars() # must be last step
[docs] def global_attrs_from_conf(self, conf, attr_key): """add global attributes from configuration dictionary Args: conf: configuration dictionary with global attributes under the key given by attr_key attr_key: string specifying the key under which attributes are stored in conf dict. Usually 'attributes' or 'nc_attributes' """ for attname, attval in conf[attr_key].items(): self.data.attrs[attname] = attval
[docs] def add_title_attr(self): """add global attribute 'title' recombining info from other previously set global attributes""" if 'title' in self.data.attrs: return # do not overwrite a title deliberately set by config # specify global attributes in order they will be used to set the title att_seq = ['instrument_model', 'instrument_generation', 'site_location', 'institution'] for att in att_seq: if att not in self.data.attrs: raise MissingConfig("cannot set global attribute 'title' as attribute '{}' was not found in config" .format(att)) self.data.attrs['title'] = '{} {} MWR at {} ({})'.format(*[self.data.attrs[att] for att in att_seq])
[docs] def add_history_attr(self): """add global attribute 'history' with date and version of mwr_raw2l1 code run""" current_time_str = dt.datetime.now(tz=dt.timezone(dt.timedelta(0))).strftime('%Y%m%d') # ensure UTC proj_dir = mwr_raw2l1.__file__.split('/')[-2] try: proj_dist = get_distribution(proj_dir) hist_str = '{}: {} ({})'.format(current_time_str, proj_dist.project_name, proj_dist.version) except Exception as err: # noqa E722 # Don't want code to fail for just writing history hist_str = '{}: mwr_raw2l1'.format(current_time_str) logger.warning('Received error {} while trying to set history global attribute. Therefore, will be using ' 'hardcoded project name without version number'.format(err)) self.data.attrs['history'] = hist_str
[docs] def check_dims(self, var, specs): """check dims of var (retain order of config specs, but order of dims returned by xarray Dataset is arbitrary) Args: var (str): the name of the variable of whom the dimension shall be checked specs: specifications for this variable from config. Must contain the key 'dim' with a list of dimensions. """ if sorted(list(self.data[var].dims)) != sorted(specs['dim']): # if last dim of specs is missing in data and scalar, add it to data (no need for subsequent check) if sorted(list(self.data[var].dims)) == sorted(specs['dim'][:-1]) \ and specs['dim'][-1] in self.data and len(self.data[specs['dim'][-1]]) == 1: newdim = specs['dim'][-1] tmp = self.data[var].expand_dims({newdim: 1}, axis=-1) self.data[var] = tmp.assign_coords({newdim: self.data[newdim]}) else: err_msg = "dimensions in data['{}'] (['{}']) do not match specs for output file (['{}'])".format( var, "', '".join(list(self.data[var].dims)), "', '".join(specs['dim'])) raise OutputDimensionError(err_msg)
[docs] def set_fillvalue(self, var, specs): """set the fill value of var by taking care not to remove any fill value for dimensions for CF compliance Args: var (str): the name of the variable of whom the dimension shall be checked specs: specifications for this variable from config. Must contain the key 'dim' with a list of dimensions. """ if var in self.config_dims or specs['_FillValue'] is None: # using None in fillna (else clause) destroys dtype self.data[var].encoding.update(_FillValue=ENC_NO_FILLVALUE) else: self.data[var] = self.data[var].fillna(specs['_FillValue']) # don't use with _FillValue=None, dtype problem self.data[var].encoding.update(_FillValue=specs['_FillValue'])
[docs] def prepare_time(self): """workaround for correctly setting units and calendar of time variable (use encoding instead of attrs)""" time_vars = ['time'] # 'time' variable assumed to be always present time_vars.extend([s for s in self.data.keys() if 'time_' in s or '_time' in s]) # get also *time_* and *_time* time_vars.extend([var for var in self.data.keys() if 'calendar' in self.data[var].attrs]) # all with calendar for var in time_vars: encs = {} for att in ['units', 'calendar']: if att in self.data[var].attrs: encs[att] = self.data[var].attrs.pop(att) self.data[var].encoding.update(encs)
[docs] def append_qc_thresholds(self): """append quality control thresholds to comment attribute of quality_flag if not refused by 'conf_nc'""" var = 'quality_flag' # cases that need no action by this method if var not in self.conf_nc['variables']: return if ('append_thresholds' in self.conf_nc['variables'][var] and not self.conf_nc['variables'][var]['append_thresholds']): return # append thresholds to comment (or set new comment if absent) if 'comment' in self.data[var].attrs: new_comment = ' '.join([self.data[var].attrs['comment'], str(self.data.qc_thresholds.values)]) else: new_comment = self.data.qc_thresholds.values self.data[var].attrs.update({'comment': new_comment})
[docs] def rename_vars(self): """set variable and dimension names to the ones set in conf_nc (CARE: must be last operation before save!)""" varname_map = {var: specs['name'] for var, specs in self.conf_nc['variables'].items()} self.data = self.data.rename(varname_map) # take care of encoding set for unlimited dims renamed_unlim_dim = [varname_map[dim] for dim in self.data.encoding['unlimited_dims']] self.data.encoding['unlimited_dims'] = renamed_unlim_dim
[docs] def remove_vars(self): """remove undesired variables and dimensions from data (all that are not in the conf_nc)""" vars_to_drop = [] for var in self.data.variables: if var not in self.conf_nc['variables']: vars_to_drop.append(var) self.data = self.data.drop_vars(vars_to_drop) dims_to_drop = [] for var in self.data.dims: if var not in self.config_dims: dims_to_drop.append(var) self.data = self.data.drop_dims(dims_to_drop)