ecpet.ecpre module

EC-PeT Preprocessing Module

Comprehensive quality control and preprocessing routines for eddy-covariance data. Implements the quality checks of [Vickers and Mahrt, 1997] and extended tests from [Mauder et al., 2013]. Provides multiprocessing support for efficient processing of large datasets.

The module performs:
  • Spike detection and removal

  • Data resolution and dropout analysis

  • Statistical moment analysis

  • Discontinuity detection using Haar wavelets

  • Stationarity tests

  • Lag correlation analysis

  • Instrument diagnostic filtering

@author: druee

ecpet.ecpre.getconf(conf, par, kind='float')

Get configuration value from QC group with type validation.

Parameters:
  • conf (object) – Configuration object

  • par (str) – Parameter name to retrieve

  • kind (str) – Expected data type, defaults to ‘float’

Returns:

Parameter value converted to specified type

Return type:

type specified by kind parameter

ecpet.ecpre.north_angle(conf)

Returns anemometer orientation relative to north.

Determines the orientation phi of the anemometer’s own north direction, in degrees clockwise from geographic north, and the handedness of its coordinate system.

Parameters:

conf (object) – Configuration object containing apparatus settings

Returns:

Tuple of (phi, hand) where phi is orientation angle and hand is ±1 for handedness

Return type:

tuple(float, float)

Raises:

ValueError – If apparatus type is not a recognized anemometer

Note:

Zero degrees indicates anemometer x-axis aligned with geographic east. For right-handed systems, y-axis points north; for left-handed, south.

Example:

Campbell CSAT3: the direction from the sensor heads to the mounting node defines the positive x direction. If the device “points” west, the heading is zero. Its handedness is right.
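To make the convention concrete, here is a minimal sketch that rotates anemometer-frame components into geographic east/north components from phi and hand. It encodes our reading of the Note above (phi = 0 puts the x-axis to the east; hand = -1 flips the y-axis to the south); the helper name to_geographic is hypothetical and not part of ecpet.

```python
import numpy as np


def to_geographic(u, v, phi, hand):
    """Hypothetical helper: anemometer-frame (u, v) -> (east, north).

    Assumes phi [deg] is the compass direction of the anemometer's north
    (phi = 0: x-axis points east) and hand = +1 (right) or -1 (left).
    """
    p = np.deg2rad(phi)
    # x-axis points to compass direction phi + 90 deg
    x_east, x_north = np.cos(p), -np.sin(p)
    # y-axis points to phi (right-handed) or phi + 180 deg (left-handed)
    y_east, y_north = hand * np.sin(p), hand * np.cos(p)
    return u * x_east + v * y_east, u * x_north + v * y_north


# CSAT3 "pointing" west: phi = 0, right-handed, so +u is an eastward wind
east, north = to_geographic(1.0, 0.0, 0.0, 1.0)
```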

ecpet.ecpre.mask_diag(conf, dat)

Remove flagged values based on instrument diagnostic flags.

Applies quality masks for sonic anemometer and gas analyzer diagnostics. Sets values to NaN when instrument flags indicate problems.

Parameters:
  • conf (object) – Configuration object with diagnostic thresholds

  • dat (pandas.DataFrame) – DataFrame containing raw measurement data

Returns:

DataFrame with flagged values masked as NaN

Return type:

pandas.DataFrame

Note:

CSAT diagnostic uses a bitmask; IRGA uses an AGC threshold and flag bits:

  • `csatmask = b'1111000000000000'`: bits that must be low

  • `irgamask = b'11110000'`: bits that must be high

  • `agclimit = 70`: AGC threshold
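A sketch of how such masks can be applied with pandas: a CSAT sample is bad if any masked bit is set, an IRGA sample is bad if any masked bit is cleared. The column names diag_csat, diag_irga and agc, and the rule that an AGC value above agclimit marks the IRGA sample as bad, are assumptions for illustration.

```python
import numpy as np
import pandas as pd

CSATMASK = int('1111000000000000', 2)  # CSAT diagnostic bits that must be low
IRGAMASK = int('11110000', 2)          # IRGA diagnostic bits that must be high


def mask_diag_sketch(dat, sonic_cols, irga_cols, agclimit=70.0):
    """Set samples with bad diagnostics to NaN (illustrative only).

    The column names 'diag_csat', 'diag_irga' and 'agc' are hypothetical.
    """
    out = dat.copy()
    csat_bad = (dat['diag_csat'].astype(int) & CSATMASK) != 0
    irga_bad = (dat['diag_irga'].astype(int) & IRGAMASK) != IRGAMASK
    # assumption: an AGC value above the limit marks the IRGA sample as bad
    irga_bad |= dat['agc'] > agclimit
    out.loc[csat_bad, sonic_cols] = np.nan
    out.loc[irga_bad, irga_cols] = np.nan
    return out


dat = pd.DataFrame({'u': [1.0, 2.0, 3.0], 'co2': [400.0, 401.0, 402.0],
                    'diag_csat': [0, 0x1000, 0], 'diag_irga': [0xF0, 0xF0, 0x70],
                    'agc': [50.0, 50.0, 50.0]})
clean = mask_diag_sketch(dat, ['u'], ['co2'])
```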

ecpet.ecpre.vmspike(conf, dat, dt)

Spike detection and removal following Vickers & Mahrt.

Quality check (a) from [Vickers and Mahrt, 1997]. Iteratively identifies and removes spikes based on rolling standard deviation thresholds.

Parameters:
  • conf (object) – Configuration object with spike detection parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

  • dt (float) – Time step between measurements [s]

Returns:

Tuple of (flags, quality_measures, despiked_data)

Return type:

tuple(dict, dict, pandas.DataFrame)

Note:

Uses an iterative approach with increasing thresholds over up to 10 iterations. Only short sequences of consecutive outliers are removed as spikes.
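A simplified pandas sketch of the iterative procedure: points deviating more than thresh rolling standard deviations from the rolling mean count as spikes only if they form runs of at most maxrun samples; spikes are replaced by linear interpolation and the threshold is raised on each pass. The window length in samples (a time window divided by dt), the start threshold 3.5, the increment 0.1 and maxrun=3 are illustrative assumptions, not values read from the ecpet configuration.

```python
import numpy as np
import pandas as pd


def vm_despike_sketch(x, window, thresh=3.5, step=0.1, maxrun=3, maxiter=10):
    """Iterative rolling-std despiking (illustrative parameters)."""
    x = pd.Series(x, dtype=float)
    nspikes = 0
    for _ in range(maxiter):
        roll = x.rolling(window, center=True, min_periods=1)
        out = (x - roll.mean()).abs() > thresh * roll.std()
        # label runs of consecutive outliers and measure their lengths
        run_id = (out != out.shift()).cumsum()
        run_len = out.groupby(run_id).transform('size')
        spike = out & (run_len <= maxrun)
        if not spike.any():
            break
        nspikes += int(spike.sum())
        # replace spikes by linear interpolation
        x = x.mask(spike).interpolate(limit_direction='both')
        thresh += step  # raise the threshold for the next pass
    return x, nspikes


i = np.arange(200)
x = np.sin(2 * np.pi * i / 50)
x[100] = 20.0  # single artificial spike
despiked, nspikes = vm_despike_sketch(x, window=21)
```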

ecpet.ecpre.ampres(conf, dat)

Amplitude resolution test following Vickers & Mahrt.

Quality check (b) from [Vickers and Mahrt, 1997]. Tests if data resolution is sufficient for flux calculations by examining distribution of values in bins.

Parameters:
  • conf (object) – Configuration object with resolution test parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Uses half-overlapping windows to test for empty bins in histogram. Pressure variables are typically skipped as they often fail without consequence.
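A sketch of the empty-bin criterion on half-overlapping windows, assuming 100 histogram bins spanning each window's min-max range and a flag when more than 70 % of the bins are empty; these numbers and the function name are illustrative, not ecpet's configured values.

```python
import numpy as np


def ampres_sketch(x, window=1000, nbins=100, max_empty=0.7):
    """Return (flag, largest fraction of empty bins over all windows)."""
    x = np.asarray(x, dtype=float)
    x = x[np.isfinite(x)]
    if x.size == 0:
        return False, np.nan
    step = max(window // 2, 1)  # half-overlapping windows
    worst = 0.0
    for start in range(0, max(x.size - window, 0) + 1, step):
        seg = x[start:start + window]
        if seg.max() == seg.min():
            frac = 1.0  # a constant segment has no resolution at all
        else:
            counts, _ = np.histogram(seg, bins=nbins)
            frac = float(np.mean(counts == 0))
        worst = max(worst, frac)
    return worst > max_empty, worst


rng = np.random.default_rng(0)
smooth = rng.normal(size=4000)
coarse = np.round(smooth)  # only a handful of distinct values
```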

ecpet.ecpre.dropout(conf, dat)

Dropout test following Vickers & Mahrt for values “sticking” to certain levels.

Quality check (c) from [Vickers and Mahrt, 1997]. Detects when data values remain constant for too long, indicating sensor problems.

Parameters:
  • conf (object) – Configuration object with dropout test parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Examines consecutive identical values in histogram bins across half-overlapping windows. Different thresholds applied for center bins vs. edge bins.
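The following sketch run-length encodes the sequence of histogram-bin indices and reports the longest run as a fraction of the record, separately for center and edge bins. For brevity it scans the whole record instead of half-overlapping windows; the edge-bin definition (outer 10 bins) and the limits of 10 % and 6 % are assumptions for illustration.

```python
import numpy as np


def dropout_sketch(x, nbins=100, edge=10, max_center=0.10, max_edge=0.06):
    """Return (flag, worst center-bin run fraction, worst edge-bin run fraction)."""
    x = np.asarray(x, dtype=float)
    x = x[np.isfinite(x)]
    bins = np.linspace(x.min(), x.max(), nbins + 1)
    idx = np.clip(np.digitize(x, bins) - 1, 0, nbins - 1)
    # run-length encode the sequence of bin indices
    starts = np.r_[0, np.flatnonzero(np.diff(idx)) + 1]
    lengths = np.diff(np.r_[starts, idx.size])
    run_bins = idx[starts]
    is_edge = (run_bins < edge) | (run_bins >= nbins - edge)
    frac = lengths / idx.size
    worst_center = float(frac[~is_edge].max(initial=0.0))
    worst_edge = float(frac[is_edge].max(initial=0.0))
    flag = worst_center > max_center or worst_edge > max_edge
    return flag, worst_center, worst_edge


rng = np.random.default_rng(2)
x = rng.normal(size=1000)
ok = dropout_sketch(x)
x[300:450] = x[300]  # sensor "sticks" for 15 % of the record
stuck = dropout_sketch(x)
```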

ecpet.ecpre.limit(conf, dat)

Absolute limits test following Vickers & Mahrt.

Quality check (d) from [Vickers and Mahrt, 1997]. Checks whether wind, temperature, and gas concentrations fall within physically reasonable ranges.

Parameters:
  • conf (object) – Configuration object with limit parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data with derived variables

Returns:

Dictionary of quality flags by variable

Return type:

dict

Note:

Tests horizontal wind speed, vertical wind speed, temperatures, specific humidity, and CO2 mixing ratio against configured limits.
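A minimal sketch of the absolute-limits check. The column names (ux, uy, w, ts) and the limit values are hypothetical placeholders for the configured ones.

```python
import numpy as np
import pandas as pd

# Hypothetical limits: (lower, upper) per variable, in the data's units
LIMITS = {'uh': (0.0, 30.0), 'w': (-5.0, 5.0), 'ts': (-40.0, 50.0)}


def limit_sketch(dat, limits=LIMITS):
    """Return {variable: True if any value lies outside its limits}."""
    # horizontal wind speed from the two horizontal components
    dat = dat.assign(uh=np.hypot(dat['ux'], dat['uy']))
    flags = {}
    for col, (lo, hi) in limits.items():
        x = dat[col]
        flags[col] = bool(((x < lo) | (x > hi)).any())
    return flags


sample = pd.DataFrame({'ux': [1.0, 2.0], 'uy': [0.0, 40.0],
                       'w': [0.1, -0.2], 'ts': [15.0, 16.0]})
flags = limit_sketch(sample)
```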

ecpet.ecpre.himom(conf, dat)

Higher moments test following Vickers & Mahrt (1997).

Quality check (e) from [Vickers and Mahrt, 1997]. Examines skewness and kurtosis of detrended data to detect unusual statistical distributions.

Parameters:
  • conf (object) – Configuration object with moment test parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Uses scipy’s biased moments following Vickers & Mahrt methodology. Data is linearly detrended before moment calculation.
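The moments can be reproduced with plain NumPy: after removing a least-squares linear trend, the biased skewness m3/m2^1.5 and kurtosis m4/m2^2 equal scipy.stats.skew(bias=True) and scipy.stats.kurtosis(fisher=False, bias=True). The acceptance ranges below (|skewness| ≤ 2, 1 < kurtosis < 8) are illustrative assumptions.

```python
import numpy as np


def himom_sketch(x, skew_lim=2.0, kurt_lim=(1.0, 8.0)):
    """Return (flag, skewness, kurtosis) of the linearly detrended data."""
    x = np.asarray(x, dtype=float)
    x = x[np.isfinite(x)]
    t = np.arange(x.size)
    # remove a least-squares linear trend (residuals then have zero mean)
    resid = x - np.polyval(np.polyfit(t, x, 1), t)
    m2 = np.mean(resid ** 2)
    skew = np.mean(resid ** 3) / m2 ** 1.5  # biased skewness
    kurt = np.mean(resid ** 4) / m2 ** 2    # biased (non-excess) kurtosis
    flag = bool(abs(skew) > skew_lim or not (kurt_lim[0] < kurt < kurt_lim[1]))
    return flag, skew, kurt


rng = np.random.default_rng(3)
gauss = himom_sketch(rng.normal(size=5000))
sparse = np.zeros(1000)
sparse[::100] = 10.0  # rare large excursions: strongly leptokurtic
peaky = himom_sketch(sparse)
```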

ecpet.ecpre.disco(conf, dat, dt)

Discontinuity detection using Haar wavelets after Vickers & Mahrt.

Quality check (f) from [Vickers and Mahrt, 1997]. Uses Haar transformation to detect sudden jumps or discontinuities in the data.

Parameters:
  • conf (object) – Configuration object with discontinuity test parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

  • dt (float) – Time step between measurements [s]

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Applies Haar wavelets and examines variance changes in sliding windows. Uses data standard deviation for normalization.
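A minimal sketch of the Haar step: at every split point, the mean of the following half-window minus the mean of the preceding half-window, normalized by the record's standard deviation. The half-window length in samples would follow from a window length in seconds divided by dt; this sketch covers only jumps in the mean, not the variance part, and the name haar_jumps is hypothetical.

```python
import numpy as np


def haar_jumps(x, half):
    """Normalized Haar coefficient at every admissible split point."""
    x = np.asarray(x, dtype=float)
    c = np.concatenate(([0.0], np.cumsum(x)))  # c[k] = sum of x[:k]
    i = np.arange(half, x.size - half + 1)     # split points
    left = (c[i] - c[i - half]) / half         # mean of x[i-half:i]
    right = (c[i + half] - c[i]) / half        # mean of x[i:i+half]
    return i, (right - left) / np.std(x)


step = np.r_[np.zeros(500), np.ones(500)]
split, h = haar_jumps(step, half=50)
k = int(np.argmax(np.abs(h)))
```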

ecpet.ecpre.nonstat(conf, dat)

Stationarity test for wind following Vickers & Mahrt.

Quality check (g) from [Vickers and Mahrt, 1997]. Tests stationarity by examining wind speed reduction and relative nonstationarity parameters.

Parameters:
  • conf (object) – Configuration object with stationarity test parameters

  • dat (pandas.DataFrame) – DataFrame containing wind measurement data

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Calculates mean wind vector, rotates to alongwind/crosswind coordinates, and examines linear trends as indicators of nonstationarity.
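A sketch of the rotation and trend analysis. The relative nonstationarity parameters are taken as the change of the fitted linear trend over the record divided by the mean vector wind speed; treating the wind speed reduction as the ratio of vector-averaged to scalar-averaged speed is an assumption about what ecpet reports.

```python
import numpy as np


def nonstat_sketch(u, v):
    """Relative nonstationarity of alongwind/crosswind components (sketch)."""
    u = np.asarray(u, dtype=float)
    v = np.asarray(v, dtype=float)
    ubar, vbar = u.mean(), v.mean()
    U = np.hypot(ubar, vbar)            # mean vector wind speed
    ua = (u * ubar + v * vbar) / U      # alongwind component
    va = (-u * vbar + v * ubar) / U     # crosswind component
    t = np.arange(u.size)
    span = u.size - 1
    # change over the record implied by a least-squares linear trend
    dua = np.polyfit(t, ua, 1)[0] * span
    dva = np.polyfit(t, va, 1)[0] * span
    return {
        'RNu': abs(dua) / U,
        'RNv': abs(dva) / U,
        # assumption: speed reduction as vector mean / scalar mean speed
        'speed_ratio': U / np.mean(np.hypot(u, v)),
    }


steady = nonstat_sketch(np.full(100, 3.0), np.full(100, 4.0))
ramp = nonstat_sketch(np.linspace(2.0, 4.0, 100), np.zeros(100))
```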

ecpet.ecpre.crosscorr(datax, datay, lag=0)

Calculate lag-N cross correlation between two time series.

Quality check (h) from [Vickers and Mahrt, 1997]. Tests whether there is hidden crosstalk between the vertical wind and the other variables.

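Parameters:
  • datax (pandas.Series) – First time series

  • datay (pandas.Series) – Second time series, shifted by lag

  • lag (int) – Lag in samples, defaults to 0
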
Returns:

Cross correlation coefficient

Return type:

float

Note:

Based on https://stackoverflow.com/a/37215839
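The referenced answer reduces to pandas' Series.corr applied to a shifted copy of the second series. A minimal version, assuming both inputs share the same index (ecpet's implementation may differ in details):

```python
import numpy as np
import pandas as pd


def crosscorr(datax, datay, lag=0):
    """Lag-N cross correlation: corr(datax(t), datay(t - lag))."""
    return datax.corr(datay.shift(lag))


# usage: y trails x by 3 samples, so lag = -3 realigns the two series
rng = np.random.default_rng(0)
x = pd.Series(rng.normal(size=200))
y = x.shift(3)
r0 = crosscorr(x, y, 0)
r3 = crosscorr(x, y, -3)
```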

ecpet.ecpre.lagcor(conf, dat, dt)

Lag correlation test following Vickers & Mahrt.

Quality check (h) from [Vickers and Mahrt, 1997]. Detects hidden lag between vertical wind and scalar measurements by examining cross-correlations.

Parameters:
  • conf (object) – Configuration object with lag test parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

  • dt (float) – Time step between measurements [s]

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Searches for maximum correlation at different lags to detect sensor synchronization problems or physical transport delays.
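A sketch of the lag search, built on the same shift-and-correlate idea as crosscorr. Here a lag k means corr(w(t), c(t + k)), so a positive lag says the scalar c trails w; the search range maxlag_s and the name lag_scan are assumptions.

```python
import numpy as np
import pandas as pd


def lag_scan(w, c, dt, maxlag_s):
    """Lag [s] of max |corr(w(t), c(t + lag))| and its ratio to the zero-lag value."""
    w = pd.Series(w, dtype=float)
    c = pd.Series(c, dtype=float)
    nmax = int(round(maxlag_s / dt))
    lags = np.arange(-nmax, nmax + 1)
    # c.shift(-k) aligns c(t + k) with w(t)
    r = np.array([w.corr(c.shift(-int(k))) for k in lags])
    best = int(np.nanargmax(np.abs(r)))
    return lags[best] * dt, abs(r[best]) / abs(r[nmax])


rng = np.random.default_rng(1)
w = pd.Series(rng.normal(size=2000))
c = w.shift(5)  # scalar trails w by 5 samples
lag, gain = lag_scan(w, c, dt=0.1, maxlag_s=2.0)
```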

ecpet.ecpre.ratespike(conf, dat)

Spike detection based on change rate (Quality check 9).

Extended quality test detecting spikes by examining rate of change between consecutive measurements. Inspired by [Rebmann et al., 2005].

Parameters:
  • conf (object) – Configuration object with rate spike parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures, despiked_data)

Return type:

tuple(dict, dict, pandas.DataFrame)

Note:

Uses forward differences to detect excessive change rates. Different thresholds applied per variable type.
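A sketch of the rate criterion. Treating a sample as a spike only when the jumps into and out of it both exceed maxrate with opposite signs is an assumption made here so that the sample after a spike is not flagged as well; maxrate stands in for the per-variable thresholds.

```python
import numpy as np
import pandas as pd


def ratespike_sketch(x, maxrate):
    """Return (number of spikes, series with spikes set to NaN)."""
    x = pd.Series(x, dtype=float)
    d_in = x.diff()        # x[i] - x[i-1]
    d_out = -x.diff(-1)    # x[i+1] - x[i] (forward difference)
    spike = ((d_in.abs() > maxrate) & (d_out.abs() > maxrate)
             & (np.sign(d_in) != np.sign(d_out)))
    return int(spike.sum()), x.mask(spike)


n, clean = ratespike_sketch([0.0, 0.1, 0.2, 5.0, 0.3, 0.4], maxrate=1.0)
```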

ecpet.ecpre.derive(v, o)

Utility function to calculate the derivative of a time series, if requested.

Parameters:
  • v (pandas.Series) – Time series data

  • o (int) – Derivative order (0=none, 1=first, 2=second)

Returns:

Derivative time series or original data

Return type:

pandas.Series

Note:

Uses centered differences for the derivative calculation.
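A minimal sketch of the centered differences, expressed per sample step (dividing by dt or dt² would give physical units). The function name is hypothetical; the first and last samples are undefined and come out as NaN.

```python
import pandas as pd


def derive_sketch(v, o):
    """Hypothetical helper: centered derivative of order o per sample step."""
    if o == 0:
        return v
    if o == 1:
        # (v[i+1] - v[i-1]) / 2
        return (v.shift(-1) - v.shift(1)) / 2.0
    if o == 2:
        # v[i+1] - 2 v[i] + v[i-1]
        return v.shift(-1) - 2.0 * v + v.shift(1)
    raise ValueError(f"unsupported derivative order: {o}")


v = pd.Series([0.0, 1.0, 4.0, 9.0, 16.0])  # v = i**2
d1 = derive_sketch(v, 1)
d2 = derive_sketch(v, 2)
```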

ecpet.ecpre.madspike(conf, dat)

Spike detection using Median Absolute Deviation (Quality check 10).

Extended quality test using MAD for robust spike detection across the entire record [Mauder et al., 2013].

Parameters:
  • conf (object) – Configuration object with MAD spike parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures, despiked_data)

Return type:

tuple(dict, dict, pandas.DataFrame)

Note:

Uses the MAD as a robust measure of variability (for normally distributed data, the standard deviation is \(\approx 1.48 \times\) MAD). Can operate on derivatives of the data.
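A sketch of the MAD criterion: samples whose distance from the median exceeds q · MAD / 0.6745 are treated as spikes. The value q = 7 is an illustrative assumption, and the sketch works on the data themselves rather than on derivatives.

```python
import numpy as np
import pandas as pd


def madspike_sketch(x, q=7.0):
    """Return (boolean spike mask, series with spikes set to NaN)."""
    x = pd.Series(x, dtype=float)
    med = x.median()
    mad = (x - med).abs().median()
    # MAD / 0.6745 (about 1.4826 * MAD) estimates the std. dev. of normal data
    spike = (x - med).abs() > q * mad / 0.6745
    return spike, x.mask(spike)


rng = np.random.default_rng(4)
data = rng.normal(size=1000)
data[500] = 50.0  # one gross outlier
spike, clean = madspike_sketch(data)
```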

ecpet.ecpre.fwstat(conf, dat)

Stationarity test following Foken & Wichura (Quality check 11).

Extended stationarity test from [Foken and Wichura, 1996] comparing covariances from subrecords with the full-record covariance.

Parameters:
  • conf (object) – Configuration object with F&W stationarity parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Divides record into subrecords and compares their mean covariance with the full-record covariance following Foken & Wichura methodology.
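A sketch of the Foken & Wichura comparison: the mean of the subrecord covariances relative to the full-record covariance. Six subrecords (e.g. 5 min within 30 min) and the 30 % limit used in the usage check are assumptions here, not values read from ecpet.

```python
import numpy as np


def fwstat_sketch(x, y, nsub=6):
    """Relative difference between mean subrecord and full-record covariance."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    n = x.size // nsub * nsub
    xs = x[:n].reshape(nsub, -1)
    ys = y[:n].reshape(nsub, -1)
    # covariance inside each subrecord (deviations from subrecord means)
    xd = xs - xs.mean(axis=1, keepdims=True)
    yd = ys - ys.mean(axis=1, keepdims=True)
    cov_sub = (xd * yd).mean(axis=1)
    cov_full = np.mean((x - x.mean()) * (y - y.mean()))
    return abs((cov_sub.mean() - cov_full) / cov_full)


rng = np.random.default_rng(5)
noise = rng.normal(size=6000)
ramp = np.linspace(0.0, 1.0, 6000)
stat = fwstat_sketch(noise, noise)
drift = fwstat_sketch(ramp, ramp)
```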

ecpet.ecpre.cotrend(conf, dat)

Stationarity test for trend influence (Quality check 12).

Extended stationarity test comparing covariances of raw vs. detrended data to detect the influence of linear trends. Inspired by [Graf et al., 2010].

Parameters:
  • conf (object) – Configuration object with cotrend parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Removes linear trends and compares covariances to assess whether trends significantly affect flux calculations.
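A sketch of the comparison, assuming the measure is the relative change of the covariance when both series are linearly detrended; the exact normalization used in ecpet is not given here.

```python
import numpy as np


def _detrend(x):
    """Remove a least-squares linear trend."""
    t = np.arange(x.size)
    return x - np.polyval(np.polyfit(t, x, 1), t)


def cotrend_sketch(x, y):
    """Relative change of cov(x, y) caused by linear detrending."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    cov_raw = np.mean((x - x.mean()) * (y - y.mean()))
    cov_det = np.mean(_detrend(x) * _detrend(y))
    return abs(cov_raw - cov_det) / abs(cov_raw)


rng = np.random.default_rng(6)
a = rng.normal(size=3000)
b = 0.5 * a + rng.normal(size=3000)
trend = np.linspace(0.0, 10.0, 3000)
small = cotrend_sketch(a, b)
large = cotrend_sketch(trend + rng.normal(size=3000), trend + rng.normal(size=3000))
```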

In the beta distribution analysis (ecpet.ecpre.beta), quality-measure parameters 1-4 can be used to estimate the pdf of the data:

\[p_\mathrm{df}(x) = \frac{\Gamma(p+q)}{\Gamma(p) \Gamma(q)} \frac{(x-x_\mathrm{min})^{p-1} (x_\mathrm{max}-x)^{q-1}} {(x_\mathrm{max}-x_\mathrm{min})^{p+q-1}}\]

where:

  • \(\Gamma\) = the Gamma function

  • \(p\) = param(1), \(q\) = param(2)

  • \(x_\mathrm{min}\) = param(3), \(x_\mathrm{max}\) = param(4)

\(x_\mathrm{min}\) and \(x_\mathrm{max}\) can also be used for comparison with the actual sample min and max of \(x\), or boundary-layer min & max.

Distribution Properties

  Distribution type              r value    rootterm
  -----------------------------  ---------  --------
  Lowest possible kurtosis       0          >0
  Beta distribution              >0         >0
  Gaussian, beta/leptokurtic     1/0        1/0
  More leptokurtic than beta     <0         NaN

ecpet.ecpre.beta(conf, dat)

Beta distribution analysis (Quality check 13).

Extended statistical test comparing the data distribution to a beta distribution to assess departure from Gaussian behavior. Inspired by [Graf et al., 2010].

Parameters:
  • conf (object) – Configuration object with beta distribution parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Calculates beta distribution parameters from moments and flags data that is too leptokurtic or shows bimodal characteristics.
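The shape parameters follow from the sample skewness and kurtosis through the standard method of moments for a four-parameter beta distribution (Pearson type I). With \(\beta_1\) the squared skewness and \(\beta_2\) the (non-excess) kurtosis, \(r = p + q = 6(\beta_2 - \beta_1 - 1)/(6 + 3\beta_1 - 2\beta_2)\), which reproduces the cases of the Distribution Properties table. Identifying rootterm with \((r+2)^2\beta_1 + 16(r+1)\) and flagging bimodality as p < 1 and q < 1 are assumptions about ecpet's internals.

```python
import numpy as np


def beta_shape_sketch(skew, kurt):
    """Four-parameter beta shape from skewness and (non-excess) kurtosis."""
    b1 = skew ** 2
    with np.errstate(divide='ignore', invalid='ignore'):
        # r = p + q; infinite for Gaussian moments, negative if too leptokurtic
        r = np.float64(6.0 * (kurt - b1 - 1.0)) / (6.0 + 3.0 * b1 - 2.0 * kurt)
        rootterm = (r + 2.0) ** 2 * b1 + 16.0 * (r + 1.0)
        root = np.sqrt(rootterm)  # NaN when rootterm < 0
        p = r / 2.0 * (1.0 - (r + 2.0) * skew / root)
        q = r / 2.0 * (1.0 + (r + 2.0) * skew / root)
    leptokurtic = bool(r < 0)
    bimodal = bool(p < 1 and q < 1)  # assumption: U-shaped pdf means bimodal
    return r, rootterm, p, q, leptokurtic, bimodal


uniform = beta_shape_sketch(0.0, 1.8)  # uniform: p = q = 1
```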

ecpet.ecpre.varstat(conf, dat, dt)

Variance stationarity test (Quality check 14).

Extended test detecting discontinuities in variance using sliding windows. Inspired by [Drüe and Heinemann, 2007].

Parameters:
  • conf (object) – Configuration object with variance stationarity parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

  • dt (float) – Time step between measurements [s]

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Compares variances in adjacent windows to detect sudden changes in data variability that might indicate instrument problems.
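A sketch of the sliding-window comparison: for each split point, the variances of the adjacent half-windows are compared and the larger of the two ratios is kept. Window variances come from cumulative sums; the half-window length (in samples, i.e. seconds divided by dt) and the name var_ratio_scan are assumptions.

```python
import numpy as np


def var_ratio_scan(x, half):
    """Return (split index, largest variance ratio of adjacent windows)."""
    x = np.asarray(x, dtype=float)
    c1 = np.concatenate(([0.0], np.cumsum(x)))
    c2 = np.concatenate(([0.0], np.cumsum(x * x)))
    i = np.arange(half, x.size - half + 1)

    def window_var(a, b):
        # population variance of x[a:b] for windows of length `half`
        m = (c1[b] - c1[a]) / half
        return (c2[b] - c2[a]) / half - m * m

    v_left = window_var(i - half, i)
    v_right = window_var(i, i + half)
    ratio = np.maximum(v_right / v_left, v_left / v_right)
    k = int(np.argmax(ratio))
    return i[k], ratio[k]


rng = np.random.default_rng(7)
a = rng.normal(size=2000)
flat = var_ratio_scan(a, 200)
b = a.copy()
b[1000:] *= 3.0  # variance jumps by a factor of 9 halfway through
jump = var_ratio_scan(b, 200)
```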

ecpet.ecpre.fturb(conf, dat, dt)

Turbulent fraction test (Quality check 15).

Extended test detecting intermittent turbulence by analyzing what fraction of the record contributes most of the flux. Inspired by [Drüe and Heinemann, 2007].

Parameters:
  • conf (object) – Configuration object with turbulent fraction parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

  • dt (float) – Time step between measurements [s]

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Divides record into subrecords, sorts covariances by magnitude, and determines what fraction of subrecords contains 90% of total flux.
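A sketch of the turbulent-fraction measure: subrecord contributions to the covariance (fluctuations relative to the full-record means) are sorted in the direction of the total flux, and the smallest fraction of subrecords reaching 90 % of the total is returned. The number of subrecords is an assumption.

```python
import numpy as np


def fturb_sketch(x, y, nsub=30, share=0.9):
    """Fraction of subrecords needed to carry `share` of the total flux."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    n = x.size // nsub * nsub
    # fluctuation products relative to the full-record means
    prod = ((x[:n] - x[:n].mean()) * (y[:n] - y[:n].mean())).reshape(nsub, -1)
    contrib = prod.mean(axis=1)
    total = contrib.sum()
    # largest contributions (in the sign of the total flux) first
    ordered = np.sort(contrib * np.sign(total))[::-1]
    cum = np.cumsum(ordered)
    nneeded = int(np.argmax(cum >= share * abs(total))) + 1
    return nneeded / nsub


burst = np.zeros(1000)
burst[:100] = np.tile([1.0, -1.0], 50)  # all flux in the first subrecord
even = np.tile([1.0, -1.0], 500)        # flux spread evenly
f_burst = fturb_sketch(burst, burst, nsub=10)
f_even = fturb_sketch(even, even, nsub=10)
```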

ecpet.ecpre.survive(conf, dat, ch)

Data survival fraction test (Quality check 16).

Extended test examining what fraction of data survives all quality checks and corrections.

Parameters:
  • conf (object) – Configuration object with survival parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

  • ch (pandas.DataFrame) – DataFrame tracking which values were changed/removed

Returns:

Tuple of (flags, quality_measures)

Return type:

tuple(dict, dict)

Note:

Low survival rates indicate severe data quality problems or overly aggressive quality control settings.

ecpet.ecpre.docovmax(conf, dat, dt)

Lag correction by covariance maximization (Quality check 17).

Extended lag correction that finds the shift of maximum correlation, then corrects it based on the expected physical transport time. Inspired by [Foken and Wichura, 1996].

Parameters:
  • conf (object) – Configuration object with lag correction parameters

  • dat (pandas.DataFrame) – DataFrame containing measurement data

  • dt (float) – Time step between measurements [s]

Returns:

Tuple of (quality_measures, corrected_data)

Return type:

tuple(dict, pandas.DataFrame)

Note:

Calculates expected lag from sensor separation and wind speed, compares to correlation-based lag, and applies correction.
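A sketch of the correction step, assuming the scalar trails the vertical wind by roughly separation / speed; the covariance maximum is searched within ±search_s seconds of that expected lag and the scalar is shifted accordingly. All parameter names are hypothetical.

```python
import numpy as np
import pandas as pd


def covmax_shift(w, c, dt, separation, speed, search_s):
    """Shift c to the lag of maximum |cov(w, c)| found near the expected lag."""
    w = pd.Series(w, dtype=float)
    c = pd.Series(c, dtype=float)
    expected = int(round(separation / speed / dt))  # expected lag [samples]
    nsearch = int(round(search_s / dt))
    lags = np.arange(expected - nsearch, expected + nsearch + 1)
    # c.shift(-k) aligns c(t + k) with w(t)
    cov = np.array([w.cov(c.shift(-int(k))) for k in lags])
    best = int(lags[np.nanargmax(np.abs(cov))])
    return best * dt, c.shift(-best)


rng = np.random.default_rng(8)
w = pd.Series(rng.normal(size=3000))
c = w.shift(4)  # scalar trails w by 4 samples
lag, shifted = covmax_shift(w, c, dt=0.1, separation=0.2, speed=0.5, search_s=0.3)
```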

ecpet.ecpre.init_intervals(conf)

Initialize processing intervals from configuration dates.

Expands start/end date definitions into individual averaging intervals based on the specified averaging period.

Parameters:

conf (object) – Configuration object with date and interval settings

Returns:

DataFrame with interval start/end times

Return type:

pandas.DataFrame

Note:

Creates intervals with right-closed boundaries for compatibility with standard eddy-covariance processing conventions.
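A minimal sketch with pandas, assuming the averaging period is given in minutes; pd.IntervalIndex with closed='right' expresses the right-closed convention, so a sample stamped exactly at an interval end belongs to that interval. The helper name is hypothetical.

```python
import pandas as pd


def init_intervals_sketch(start, end, avg_minutes):
    """Hypothetical helper: consecutive averaging intervals from start to end."""
    edges = pd.date_range(start, end, freq=pd.Timedelta(minutes=avg_minutes))
    return pd.DataFrame({'begin': edges[:-1], 'end': edges[1:]})


intervals = init_intervals_sketch('2024-01-01 00:00', '2024-01-01 02:00', 30)
# right-closed: each interval covers (begin, end]
iv = pd.IntervalIndex.from_arrays(intervals['begin'], intervals['end'], closed='right')
```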

ecpet.ecpre.intervals_to_file(conf, intervals)

Write interval information to output file.

Creates formatted output file with interval boundaries and metadata for subsequent processing steps.

Parameters:
  • conf (object) – Configuration object with output settings

  • intervals (pandas.DataFrame) – DataFrame containing interval data and results

Note:

Output format compatible with EC-PACK interval file conventions.

ecpet.ecpre.process_slow(conf, intervals, progress=100)

Process slow-response meteorological data.

Retrieves and processes “slow data”, i.e. slowly varying reference measurements (pressure, temperature, humidity) for use in quality control and flux calculations.

Parameters:
  • conf (object) – Configuration object with slow data settings

  • intervals (pandas.DataFrame) – DataFrame with processing intervals

  • progress (float) – Progress reporting weight, defaults to 100

Returns:

Updated intervals DataFrame with slow data

Return type:

pandas.DataFrame

Note:

Resamples slow data to interval averages and calculates derived quantities like water vapor density from relative humidity.
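A sketch of the two steps named above: right-closed, right-labelled resampling of slow data, and water vapour density from temperature and relative humidity. The Magnus coefficients, the column names t and rh, and computing the density from averaged inputs are assumptions, not necessarily what ecpet does.

```python
import numpy as np
import pandas as pd

RV = 461.5  # specific gas constant of water vapour [J kg-1 K-1]


def vapour_density(t_c, rh_pct):
    """Water vapour density [kg m-3] from temperature [degC] and RH [%].

    Assumption: Magnus formula over water (6.112 hPa, 17.62, 243.12 degC).
    """
    es = 611.2 * np.exp(17.62 * t_c / (243.12 + t_c))  # saturation pressure [Pa]
    e = rh_pct / 100.0 * es
    return e / (RV * (t_c + 273.15))


def slow_to_intervals(slow, avg='30min'):
    """Average slow data over right-closed, right-labelled intervals."""
    means = slow.resample(avg, closed='right', label='right').mean()
    return means.assign(rhov=vapour_density(means['t'], means['rh']))


rho_sat = vapour_density(20.0, 100.0)
idx = pd.date_range('2024-01-01 00:01', periods=60, freq='min')
slow = pd.DataFrame({'t': 20.0, 'rh': 50.0}, index=idx)
means = slow_to_intervals(slow)
```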

ecpet.ecpre.qc_raw_init(conf, interval, dat)

Initialize quality control by calculating derived variables.

Converts raw measurements to derived quantities needed for quality control tests (specific humidity, CO2 mixing ratio, etc.).

Parameters:
  • conf (object) – Configuration object

  • interval (dict) – Single interval data record

  • dat (pandas.DataFrame) – DataFrame with raw measurement data

Returns:

Updated interval record

Return type:

dict

Note:

Handles missing pressure/temperature by interpolation or using reference values from slow measurements.

ecpet.ecpre.qc_raw_run(conf, interval, dat)

Execute comprehensive quality control test suite.

Runs the complete set of quality control tests following [Vickers and Mahrt, 1997] and extended tests, storing flags and quality measures.

Parameters:
  • conf (object) – Configuration object with QC parameters

  • interval (dict) – Single interval data record

  • dat (pandas.DataFrame) – DataFrame with measurement data

Returns:

Tuple of (updated_interval, processed_data)

Return type:

tuple(dict, pandas.DataFrame)

Note:

Tests are conditionally executed based on configuration settings. Data can be despiked using various methods (spk, chr, mad).

ecpet.ecpre.dat_to_netcdf(conf, dat)

Write processed measurement data to NetCDF file.

Parameters:
  • conf (object) – Configuration object with output settings

  • dat (pandas.DataFrame) – DataFrame containing processed measurement data

Returns:

Generated NetCDF filename or empty string if no data

Return type:

str

Note:

Creates time variables and applies NaN replacement if configured.

ecpet.ecpre.dat_to_toa5(conf, dat)

Write processed measurement data to TOA5 format file.

Parameters:
  • conf (object) – Configuration object with output settings

  • dat (pandas.DataFrame) – DataFrame containing processed measurement data

Returns:

Generated TOA5 filename

Return type:

str

Note:

TOA5 is Campbell Scientific’s table-oriented ASCII format.

ecpet.ecpre.flags1_to_file(conf, intervals)

Write quality control flags and measures to output file.

Parameters:
  • conf (object) – Configuration object with output directory settings

  • intervals (pandas.DataFrame) – DataFrame containing interval results with flags

Note:

Creates the flags1.dat file with test flags and quality measures.

ecpet.ecpre.process_fast_interval(args)

Process single interval of high-frequency measurement data.

Parameters:

args (tuple) – Tuple of (conf, interval, lock) for multiprocessing

Returns:

Updated interval record with QC results

Return type:

dict

Note:

Retrieves raw data, applies QC tests, calibrates, and stores results. Designed for parallel execution with multiprocessing.

ecpet.ecpre.process_fast(conf, intervals, progress=100.0)

Process all high-frequency measurement intervals with quality control.

Parameters:
  • conf (object) – Configuration object with processing parameters

  • intervals (pandas.DataFrame) – DataFrame with processing intervals

  • progress (float) – Progress reporting weight, defaults to 100

Returns:

Updated intervals DataFrame with QC results

Return type:

pandas.DataFrame

Note:

Supports parallel processing using multiprocessing Pool. Number of processes controlled by conf[‘nproc’].

ecpet.ecpre.preprocessor(conf)

Main preprocessing routine for eddy-covariance data.

Parameters:

conf (object) – Configuration object with all processing parameters

Returns:

DataFrame with processed intervals and QC results

Return type:

pandas.DataFrame

Note:

Orchestrates the complete preprocessing workflow:
  • Initialize processing intervals

  • Process slow reference data

  • Process fast measurement data with QC

  • Generate output files