Source code for mwr_raw2l1.readers.reader_rpg_helpers

"""
helper functions for the module reader_rpg
"""
import datetime as dt

import numpy as np

from mwr_raw2l1.errors import UnknownFlagValue, WrongInputFormat


[docs]def interpret_time(time_in): """translate the time format of RPG files to datetime object""" # offset between RPG and POSIX time in seconds. Care: datetime base must be in UTC, hence tzinfo muxt be given. posix_offset = dt.datetime.timestamp(dt.datetime(2001, 1, 1, tzinfo=dt.timezone(dt.timedelta(0)))) scalar_input = False if np.isscalar(time_in): time_in = np.array([time_in]) scalar_input = True times = [dt.datetime.utcfromtimestamp(x + posix_offset) for x in time_in] out = np.array(times) if scalar_input: out = out[0] return out
[docs]def interpret_angle(x, version): """translate the angle encoding from RPG to elevation and azimuth in degrees Args: x: RPG angle. version: version of RPG angle encoding: 1: sign(ele) * (abs(ele)+1000*azi) 2: digits 1-5 = elevation*100; digits 6-10 = azimuth*100 Returns: elevation, azimuth """ scalar_input = False if np.isscalar(x): x = np.array([x]) scalar_input = True if version == 1: ele_offset = np.zeros(x.shape) ind_offset_corr = (x >= 1e6) ele_offset[ind_offset_corr] = 100 x[ind_offset_corr] -= 1e6 azi = (np.abs(x) // 100) / 10 # assume azi and ele are measured in 0.1 degree steps ele = x - np.sign(x) * azi * 1000 + ele_offset elif version == 2: ele = np.sign(x) * (np.abs(x) // 1e5) / 100 azi = (np.abs(x) - np.abs(ele) * 1e7) / 100 else: raise NotImplementedError('Known versions for angle encoding are 1 and 2, but received {:f}'.format(version)) if scalar_input: ele = ele[0] azi = azi[0] return ele, azi
[docs]def interpret_coord(x, version=2): # TODO: Ask Harald how to find out which coord version used. In manual 06/2020 version=1 is described """Translate coordinate encoding from RPG to degrees with decimal digits. Args: x: float encoding latitude of longitude coordinate in RPG binaries version: version of RPG coordinate encoding: 1: latitude or longitude in format (-)DDDMM.mmmm where DDD is degrees, MM is minutes and mmmm is the decimal fraction of MM 2: latitude and longitude already in decimal degrees, function does nothing. Returns: latitude or longitude in format decimal degrees. """ if version == 1: degabs = np.abs(x) // 100 minabs = np.abs(x) - degabs * 100 return np.sign(x) * (degabs + minabs / 60) elif version == 2: return x else: raise NotImplementedError('Known versions for coordinates encoding are 1 and 2, but received {:f}'.format( version))
[docs]def interpret_met_auxsens_code(auxsenscode): """interpret integer code for the availability of auxiliary sensors in MET files, return dict of contents vars""" auxsenscode_int8 = np.uint8(auxsenscode) auxsensbits = np.unpackbits(auxsenscode_int8, bitorder='little') if auxsensbits is None: out = {'has_windspeed': 0, 'has_winddir': 0, 'has_rainrate': 0} else: out = {'has_windspeed': auxsensbits[0], 'has_winddir': auxsensbits[1], 'has_rainrate': auxsensbits[2]} return out
[docs]def interpret_hkd_contents_code(contents_code_integer): """interpret the integer contents code from HKD files and return dict of contents variables""" contents_code_int8 = np.array([contents_code_integer]).view(np.uint8) hkdselbits = np.unpackbits(contents_code_int8, bitorder='little') out = {'has_coord': hkdselbits[0], 'has_T': hkdselbits[1], 'has_stability': hkdselbits[2], 'has_flashmemoryinfo': hkdselbits[3], 'has_qualityflag': hkdselbits[4], 'has_statusflag': hkdselbits[5]} return out
[docs]def interpret_statusflag(flag_integer): """interpret the statusflag from HKD files and return a dict of status variables (input time series or scalar)""" # order of channel-dependent quantities in HKD. All RPG instruments have the same HKD file format, independent of # the actual number of channels of the instrument receivers really have n_channels_rec = 7 # number of frequency channels per receiver. Assume all rec have same number of ch in datafile ind_start_qc_hum = 0 # index of start of frequency channel quality flag for humidity receiver ind_start_qc_temp = 8 # index of start of frequency channel quality flag for temperature receiver statusflagbits = flag_int2bits(flag_integer) # interpret the different bits according to the manual tstabflag_hum = statusflagbits[:, 24] + 2 * statusflagbits[:, 25] tstabflag_temp = statusflagbits[:, 26] + 2 * statusflagbits[:, 27] out = { 'channels_rec': np.arange(n_channels_rec), # needed as dim for channel_quality_ok_hum and channel_quality_ok_temp 'channel_quality_ok_hum': statusflagbits[:, ind_start_qc_hum:(ind_start_qc_hum + n_channels_rec)], 'channel_quality_ok_temp': statusflagbits[:, ind_start_qc_temp:(ind_start_qc_temp + n_channels_rec)], 'rainflag': statusflagbits[:, 16], 'blowerspeed_status': statusflagbits[:, 17], 'BLscan_active': statusflagbits[:, 18], 'tipcal_active': statusflagbits[:, 19], 'gaincal_active': statusflagbits[:, 20], 'noisecal_active': statusflagbits[:, 21], 'noisediode_ok_hum': statusflagbits[:, 22], 'noisediode_ok_temp': statusflagbits[:, 23], 'Tstab_ok_hum': interpret_tstab_flag(tstabflag_hum), 'Tstab_ok_temp': interpret_tstab_flag(tstabflag_temp), 'recent_powerfailure': statusflagbits[:, 28], 'Tstab_ok_amb': interpret_tstab_flag(statusflagbits[:, 29]), 'noisediode_on': statusflagbits[:, 30]} return out
[docs]def interpret_scanflag(flag_integer): """interpret flag of BLB scan Args: flag_integer: scanflag from BLB as integer. When transformed to bit, first bit interpreted as rainflag while 2nd/3rd encodes scan time (see notes) Returns: Dictionary with keys 'rainflag' and 'scan_quadrant' Notes: =================== ====================== ============================ 2nd/3rd bit (input) scan_quadrant (output) description =================== ====================== ============================ 0/0 1 first quadrant 0/1 2 second quadrant 1/0 0 average over both quadrants =================== ====================== ============================ """ scanflagbits = flag_int2bits(flag_integer) int_quadr = scanflagbits[:, 1] + 2 * scanflagbits[:, 2] out = { 'rainflag': scanflagbits[:, 0], 'scan_quadrant': interpret_quadrant_int(int_quadr)} return out
[docs]def interpret_tstab_flag(flag): """interpret temperature stability flag and return a flag saying stability ok (1), not ok (0) or unknown (NaN) Notes: ================== ================= ========================================================== tstab flag (input) tstab ok (output) description ================== ================= ========================================================== 0 NaN unknown stability (too short measurement series available) 1 1 stability ok 2 0 not ok (T sensors differ by >0.3 K) ================== ================= ========================================================== """ flag2tstab_ok = {0: np.nan, 1: 1, 2: 0} # correspondence of RPG stability flag (keys) with stability ok (value) try: if np.isscalar(flag): return flag2tstab_ok[flag] else: return np.array([flag2tstab_ok[el] for el in flag]) except KeyError as err: raise UnknownFlagValue('Expected 0, 1 or 2 for RPG temperature stability flag but found {}'.format(err))
[docs]def interpret_quadrant_int(int_quadr): """helper function for interpret scanflag for interpreting 2nd and 3rd bit (as int). See documentation from there""" int2quadrant = {0: 1, 1: 2, 2: 0} # correspondence of int(2nd and 3rd bit) (keys) with quadrant (value) try: if np.isscalar(int_quadr): int2quadrant[int_quadr] else: return np.array([int2quadrant[el] for el in int_quadr]) except KeyError as err: raise UnknownFlagValue('Expected 0, 1 or 2 for scan quadrant encoding but found {}'.format(err))
[docs]def interpret_bit_order(bit_order): """interpret bit_order for use in :func:`numpy.unpackbits` so that it also '>' and '<' can be used""" if bit_order == 'little' or bit_order == 'big': return bit_order elif bit_order == '<': return 'little' elif bit_order == '>': return 'big' else: raise ValueError("argument bit_order can receive '<', '>', 'little' or 'big' but got " + bit_order)
[docs]def flag_int2bits(flag_integer): """transform an integer value to bits. Input can be time series or scalars""" # format input to a 2d numpy array with all data along first dim if len(np.shape(flag_integer)) == 0: # input is scalar flag_integer = np.array([[flag_integer]]) elif not isinstance(flag_integer, np.ndarray): raise WrongInputFormat('input must either be a numpy array or a scalar') elif np.ndim(np.squeeze(flag_integer)) > 1: raise WrongInputFormat('input can only be vector or scalar but not matrix') elif len(flag_integer) == 1: # input is numpy array with just one input (will be made scalar by numpy squeeze) flag_integer = np.squeeze(flag_integer)[np.newaxis, np.newaxis] else: # input is column or row vector flag_integer = np.squeeze(flag_integer)[:, np.newaxis] # transform integer (time series) to flag bits flag_int8 = flag_integer.view(np.uint8) return np.unpackbits(flag_int8, axis=1, bitorder='little')