Source code for mwr_raw2l1.measurement.measurement_qc_helpers

import ephem
import numpy as np

from mwr_raw2l1.errors import UnknownManufacturer
from mwr_raw2l1.log import logger
from mwr_raw2l1.measurement.measurement_construct_helpers import drop_duplicates


[docs]def check_receiver_sanity(data, channel): """check the receiver sanity flag(s) in data Args: data: dataset, commonly Measurement.data channel: channel index for which to check the sanity flag (only used for RPG) Returns: tuple containing: mask_fail: None if check_applied=False, otherwise array of size (time,) set True where quality is bad check_applied (bool): True if check has been applied, False if check could not be applied (not enough info) """ if data['mfr'] == 'attex': logger.info('Cannot check receiver sanity for Attex as no status variable is in data file') return None, False elif data['mfr'] == 'radiometrics': # quality good if quality=0 return flag_check(data, 'quality', 1, channel=None) elif data['mfr'] == 'rpg': # quality good if channel_quality_ok=1 and alarm=0 masks_and_checks = [] # collect all output tuples from flag_check here masks_and_checks.append(flag_check(data, 'channel_quality_ok', 0, channel)) masks_and_checks.append(flag_check(data, 'alarm', 1, channel=None)) # TODO: could add checks for noisediode_ok_hum, noisediode_ok_temp, Tstab_ok_hum, Tstab_ok_temp, Tstab_ok_amb check_applied_all = [m[1] for m in masks_and_checks] if any(check_applied_all): mask_fail_all = np.stack([m[0] for m in masks_and_checks if m[1]], axis=1) return mask_fail_all.any(axis=1), True else: return None, False UnknownManufacturer("known manufacturers for this method are ['attex', 'radiometrics', 'rpg'] " "but data['mfr']={}".format(data['mfr']))
[docs]def check_rain(data): """check the receiver's rain flag in data Args: data: dataset, commonly Measurement.data Returns: tuple containing: mask_fail: None if check_applied=False, otherwise array of size (time,) set True where rain is detected check_applied (bool): True if check has been applied, False if check could not be applied (not enough info) """ if data['mfr'] == 'attex': logger.info('Cannot flag rain for Attex as this is not returned by the instruments') return None, False elif data['mfr'] == 'radiometrics': return flag_check(data, 'rainflag', 1) elif data['mfr'] == 'rpg': return flag_check(data, 'rainflag', 1) else: UnknownManufacturer("known manufacturers for this method are ['attex', 'radiometrics', 'rpg'] " "but data['mfr']={}".format(data['mfr']))
[docs]def check_sun(data, delta_ele, delta_azi): """check if sun is in beam within the tolerances 'delta_ele', 'delta_azi' ( abs(data['ele'] - ele_sun) < delta_ele ) Args: data: :class:`xarray.Dataset`, commonly Measurement.data delta_ele: elevation offset in degrees from radiometer pointing until which sun is considered to be in the beam delta_azi: azimuth offset in degrees from radiometer pointing until which sun is considered to be in the beam Returns: tuple containing: mask_fail: None if check_applied=False, otherwise array of size (time,) set True where sun is in beam check_applied: array True if check was applied, False if check could not be applied (no ele/azi in data) """ if 'azi' not in data or 'ele' not in data: logger.warning("Cannot set solar flag as 'azi' or 'ele' is missing in data.") return None, np.full(np.shape(data.time), False) ele_sun, azi_sun = orbit_position_interp(data) # additional keyword arg body='moon' could be used for lunar flag mask_fail = np.logical_and(np.abs(data.ele - ele_sun) < delta_ele, np.abs(data.azi - azi_sun) < delta_azi) check_applied = np.full(np.shape(data.time), True) # handle some missing pointing (indicated as not failing, but not checked) if data.azi.isnull().any() or data.ele.isnull().any(): logger.info("Cannot set solar flag for all times as NaN was found in 'azi' or 'ele'." " Could be due to double-side scan, when 'azi' is not unambiguous") no_pointing = np.logical_or(data.azi.isnull(), data.ele.isnull()) mask_fail[no_pointing] = False check_applied[no_pointing] = False return mask_fail, check_applied
[docs]def orbit_position_interp(data, delta_t=600, **kwargs): """wrapper to :func:`orbit_position` doing orbit calculations only each 'delta_t' seconds and interpolate to time""" time_rough = np.append(np.arange(data['time'][0].values, data['time'][-1].values, np.timedelta64(delta_t, 's')), data['time'][-1].values) # include also last time to span full grid for following interp data_rough = drop_duplicates(data.sel(time=time_rough, method='nearest'), 'time') ele_rough, azi_rough = orbit_position(data_rough, **kwargs) # append to rough dataset and interpolate to time vector of original dataset data_rough['sun_ele'] = (('time',), ele_rough) data_rough['sun_azi'] = (('time',), azi_rough) ele = data_rough.sun_ele.interp(time=data.time).values azi = data_rough.sun_azi.interp(time=data.time).values return ele, azi
[docs]def orbit_position(data, body='sun'): """calculate orbit position of sun or moon for instrument position at each time in 'data' using :class:`ephem` Args: data: :class:`xarray.Dataset`, commonly Measurement.data body (optional): name of astronomical body to calculate orbit from ('sun' or 'moon'). Defaults to 'sun' Returns: tuple containing: ele: :class:`numpy.ndarray` of elevations of the body for each time step azi: :class:`numpy.ndarray` of azimuths of the body for each time step """ obs = ephem.Observer() if body == 'sun': obj = ephem.Sun() elif body == 'moon': obj = ephem.Moon() else: raise NotImplementedError("function only implemented for 'body' in ['sun', 'moon']") ele = np.full(data['time'].shape, np.nan) azi = np.full(data['time'].shape, np.nan) for ind, time in enumerate(data['time']): # observer settings obs.lat = str(data['lat'][ind].values) # needs to be string to be interpreted as degrees obs.lon = str(data['lon'][ind].values) # needs to be string to be interpreted as degrees obs.elevation = data['altitude'][ind].values obs.date = str(time.dt.strftime('%Y/%m/%d %H:%M:%S').values) # get object's position in degrees obj.compute(obs) ele[ind] = np.rad2deg(obj.alt) azi[ind] = np.rad2deg(obj.az) return ele, azi
[docs]def flag_check(data, varname, value, channel=None): """check if flag with 'varname' in 'data' matches 'value' Args: data: dataset, commonly Measurement.data varname: name of the variable in 'data' value: value that the flag must match so that this function returns True channel (optional): channel index for which to check the flag variables of size (time, channels). If set to None a variable of size (time,) the sanity flag (only used for RPG). Defaults to None Returns: tuple containing: mask_fail: None if check_applied=False, otherwise array of size (time,) set True where value is 'matched' check_applied (bool): True if check has been applied, False if check could not be applied (not enough info) """ if varname in data: if channel is None: return (data[varname][:] == value), True return (data[varname][:, channel] == value), True else: logger.info("Cannot apply check for '{}' during quality control as variable does not exist".format(varname)) return None, False