ecpet.ecdb module

EC-PeT Database Module

SQLite database operations for eddy-covariance data processing workflows. Provides comprehensive data ingestion, storage, and retrieval capabilities for TOA5 datalogger files and processed results. Manages metadata, file tracking, and temporal data organization with multiprocessing support.

Key Features:
  • TOA5 file ingestion with parallel processing

  • Automatic schema creation and column management

  • File deduplication using MD5 checksums

  • Temporal data retrieval with flexible filtering

  • Configuration persistence and retrieval

  • Processing stage checkpoint management

ecpet.ecdb.timestamp_fill(stamp)

Complete partial timestamp strings to standard format.

Parameters:

stamp (str) – Timestamp string to standardize

Returns:

Standardized timestamp string

Return type:

str

Ensures timestamps match ‘%Y-%m-%d %H:%M:%S.%f’ format by padding incomplete strings with appropriate defaults.
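The padding behaviour described above can be illustrated with a minimal sketch. It is not the module's implementation: the function name fill_timestamp_sketch and the default values in TEMPLATE are assumptions, and the real function may choose different defaults.

```python
from datetime import datetime

TARGET_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
# Assumed defaults for whatever part of the timestamp is missing.
TEMPLATE = "1970-01-01 00:00:00.000000"


def fill_timestamp_sketch(stamp: str) -> str:
    """Pad a partial timestamp with the matching tail of TEMPLATE."""
    stamp = stamp.strip()
    filled = stamp + TEMPLATE[len(stamp):]
    # Raises ValueError if the padded string does not match the target format.
    datetime.strptime(filled, TARGET_FORMAT)
    return filled


date_only = fill_timestamp_sketch("2021-06-01")
minutes_only = fill_timestamp_sketch("2021-06-01 12:30")
tenths = fill_timestamp_sketch("2021-06-01 12:30:15.5")
```

With these assumed defaults, a date-only string gains a midnight time and a zero fractional part.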

ecpet.ecdb.with_retry(func, *args, **kwargs)

Execute a function that operates on the database and retry if it fails.

Parameters:
  • func (function) – Function to execute

  • args – Positional arguments to pass to the function

  • kwargs – Keyword arguments to pass to the function

Returns:

Function result

Return type:

Any

The delay between retries grows exponentially with each attempt. At most MAX_RETRIES attempts are made.
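A retry loop with exponentially growing delays might look like the sketch below. The values of MAX_RETRIES and BASE_DELAY, the doubling factor, and the choice to retry only on sqlite3.OperationalError are all assumptions made for illustration. The module's own settings are not documented here.

```python
import sqlite3
import time

MAX_RETRIES = 5     # assumed value; the module defines its own constant
BASE_DELAY = 0.01   # assumed initial delay in seconds


def retry_sketch(func, *args, **kwargs):
    """Call func, retrying on sqlite3.OperationalError with growing delays."""
    for attempt in range(MAX_RETRIES):
        try:
            return func(*args, **kwargs)
        except sqlite3.OperationalError:
            if attempt == MAX_RETRIES - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(BASE_DELAY * 2 ** attempt)  # 0.01 s, 0.02 s, 0.04 s, ...


calls = {"n": 0}


def flaky_query(value):
    """Hypothetical operation that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise sqlite3.OperationalError("database is locked")
    return value * 2


result = retry_sketch(flaky_query, 21)
```

Doubling the delay gives a briefly locked database time to become available without waiting long on the first failure.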

ecpet.ecdb.exe_sql(cur, sql)

Execute SQL statement and return results with logging.

Parameters:
  • cur (sqlite3.Cursor) – Database cursor

  • sql (str) – SQL statement to execute

Returns:

Query results

Return type:

list

Provides debug logging and error checking for database operations.

ecpet.ecdb.get_free_index(cur, tab_name, id_name, reserv=None)

Find next available index number in database table.

Parameters:
  • cur (sqlite3.Cursor) – Database cursor

  • tab_name (str) – Table name to search

  • id_name (str) – ID column name

  • reserv (list, optional) – Reserved IDs to avoid, defaults to None

Returns:

Next available index

Return type:

int

Finds lowest available ID number, accounting for reserved values and existing database entries.
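The search for a free ID can be sketched as follows. This is an illustration under assumptions: the function name free_index_sketch, the starting index of 1, and the example table are hypothetical.

```python
import sqlite3


def free_index_sketch(cur, tab_name, id_name, reserv=None):
    """Return the smallest positive integer neither stored nor reserved."""
    # Identifiers cannot be bound as SQL parameters, so they are quoted inline.
    cur.execute(f'SELECT "{id_name}" FROM "{tab_name}"')
    taken = {row[0] for row in cur.fetchall()}
    taken.update(reserv or [])
    candidate = 1  # assumed starting index
    while candidate in taken:
        candidate += 1
    return candidate


con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE files (id INTEGER, name TEXT)")
cur.executemany("INSERT INTO files VALUES (?, ?)",
                [(1, "a"), (2, "b"), (4, "d")])

first = free_index_sketch(cur, "files", "id")
second = free_index_sketch(cur, "files", "id", reserv=[first])
```

Passing previously returned IDs as reserv lets a caller hand out several IDs before any of them is written to the database.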

ecpet.ecdb.column_prepare(cur, tab_name, col_name, col_type)

Ensure column exists in table, creating if necessary.

Parameters:
  • cur (sqlite3.Cursor) – Database cursor

  • tab_name (str) – Table name

  • col_name (str) – Column name to check/create

  • col_type (str) – Column data type

Returns:

SQL query response

Return type:

list

Adds missing columns to existing tables with specified data type.
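One way to check for a column and add it on demand in SQLite is sketched below, using PRAGMA table_info and ALTER TABLE ... ADD COLUMN. The function name ensure_column_sketch, its boolean return value, and the example table are assumptions. The documented function returns the SQL query response instead.

```python
import sqlite3


def ensure_column_sketch(cur, tab_name, col_name, col_type):
    """Add col_name to tab_name unless it is already present."""
    cur.execute(f'PRAGMA table_info("{tab_name}")')
    existing = [row[1] for row in cur.fetchall()]  # row[1] is the column name
    if col_name in existing:
        return False
    cur.execute(
        f'ALTER TABLE "{tab_name}" ADD COLUMN "{col_name}" {col_type}'
    )
    return True


con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute('CREATE TABLE obs ("TIMESTAMP" TEXT)')

added_first = ensure_column_sketch(cur, "obs", "Ux", "REAL")
added_again = ensure_column_sketch(cur, "obs", "Ux", "REAL")

cur.execute('PRAGMA table_info("obs")')
column_names = [row[1] for row in cur.fetchall()]
```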

ecpet.ecdb.table_prepare(cur, tab_name, tab_cols, tab_types)

Ensure table exists with required columns, creating if necessary.

Parameters:
  • cur (sqlite3.Cursor) – Database cursor

  • tab_name (str) – Table name

  • tab_cols (list) – Required column names

  • tab_types (dict) – Column type specifications

Returns:

SQL query response

Return type:

list

Creates tables and adds missing columns according to specifications.
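The create-then-extend pattern can be sketched like this. The function name ensure_table_sketch and the fallback type TEXT for columns missing from tab_types are assumptions. The sketch returns nothing, whereas the documented function returns the SQL query response.

```python
import sqlite3


def ensure_table_sketch(cur, tab_name, tab_cols, tab_types):
    """Create tab_name if it is missing, then add any columns it lacks."""
    col_defs = ", ".join(
        f'"{col}" {tab_types.get(col, "TEXT")}' for col in tab_cols
    )
    cur.execute(f'CREATE TABLE IF NOT EXISTS "{tab_name}" ({col_defs})')
    cur.execute(f'PRAGMA table_info("{tab_name}")')
    existing = {row[1] for row in cur.fetchall()}
    for col in tab_cols:
        if col not in existing:
            col_type = tab_types.get(col, "TEXT")  # assumed fallback type
            cur.execute(
                f'ALTER TABLE "{tab_name}" ADD COLUMN "{col}" {col_type}'
            )


con = sqlite3.connect(":memory:")
cur = con.cursor()
types = {"TIMESTAMP": "TEXT", "Ux": "REAL", "Uy": "REAL"}

# The first call creates the table; the second extends it by one column.
ensure_table_sketch(cur, "obs", ["TIMESTAMP", "Ux"], types)
ensure_table_sketch(cur, "obs", ["TIMESTAMP", "Ux", "Uy"], types)

cur.execute('PRAGMA table_info("obs")')
final_columns = [row[1] for row in cur.fetchall()]
```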

ecpet.ecdb.db_prepare(cur)

Initialize database with required metadata tables.

Parameters:

cur (sqlite3.Cursor) – Database cursor

Creates headers and files metadata tables if they don’t exist.

ecpet.ecdb.dbtable_name(station, table)

Generate standardized database table name.

Parameters:
  • station (str) – Station identifier

  • table (str) – Table identifier

Returns:

Combined table name

Return type:

str

Creates table names using station:table format.
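A minimal sketch of the station:table convention follows. The identifiers "ST1" and "flux" are hypothetical, as is the function name table_name_sketch.

```python
import sqlite3


def table_name_sketch(station: str, table: str) -> str:
    """Join the station and table identifiers with a colon."""
    return f"{station}:{table}"


name = table_name_sketch("ST1", "flux")  # hypothetical identifiers

con = sqlite3.connect(":memory:")
# A colon is not valid in a bare SQL identifier, so the name is double-quoted.
con.execute(f'CREATE TABLE "{name}" ("TIMESTAMP" TEXT)')
tables = [
    row[0]
    for row in con.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
]
```

Because of the colon, any SQL statement that refers to such a table needs the name in double quotes.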

ecpet.ecdb.ingest_toa5(params)

Ingest single TOA5 file into database with multiprocessing support.

Parameters:

params (tuple) – Tuple of (filename, station_name, table_name, free_id, force)

Designed to run as a multiprocessing worker task. Handles complete ingestion of one TOA5 file, including header parsing, hash checking, and data insertion.
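The header-parsing and hash-checking steps can be sketched without a database. A TOA5 file starts with four header lines: an environment line (whose second field is the station name and whose last field is the table name), field names, units, and processing types. The sample content, the function names, and the set-based duplicate check are assumptions for illustration. How the module stores and compares hashes is not shown here.

```python
import csv
import hashlib
import io

# Hypothetical file content in TOA5 layout.
SAMPLE = (
    '"TOA5","ST1","CR3000","1234","CR3000.Std.31","CPU:ec.cr3","5678","flux"\n'
    '"TIMESTAMP","RECORD","Ux"\n'
    '"TS","RN","m/s"\n'
    '"","","Smp"\n'
    '"2021-06-01 00:00:00.1",0,1.25\n'
)


def parse_toa5_sketch(text):
    """Split TOA5 text into header metadata and data rows."""
    rows = list(csv.reader(io.StringIO(text)))
    env, names, units, sampling = rows[:4]
    header = {
        "station": env[1],
        "table": env[-1],
        "columns": list(zip(names, units, sampling)),
    }
    return header, rows[4:]


def md5_sketch(data: bytes) -> str:
    """Return the hexadecimal MD5 digest of the file content."""
    return hashlib.md5(data).hexdigest()


header, data = parse_toa5_sketch(SAMPLE)
digest = md5_sketch(SAMPLE.encode("utf-8"))

seen = {digest}  # stand-in for the hashes already recorded in the database
is_duplicate = md5_sketch(SAMPLE.encode("utf-8")) in seen
```

A file whose digest is already known can be skipped unless reprocessing is forced.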

ecpet.ecdb.ingest_df(df, station_name=None, table_name=None, time_column=None)

Ingest pandas DataFrame into database.

Parameters:
  • df (pandas.DataFrame) – DataFrame containing data to store

  • station_name (str, optional) – Station identifier, defaults to None

  • table_name (str, optional) – Table identifier, defaults to None

  • time_column (str, optional) – Column to use for timestamps, defaults to None

Includes comprehensive error handling so that it can be used safely with multiprocessing.

ecpet.ecdb.init_worker_process(database_path)

Initialize a worker process by connecting to the existing database. The worker does NOT create the database.

ecpet.ecdb.ingest(infiles, force=False, nproc=0, station_name=None, table_name=None, progress=100)

Ingest multiple TOA5 files with parallel processing support.

Parameters:
  • infiles (list) – List of file paths to process

  • force (bool, optional) – Force reprocessing of existing files, defaults to False

  • nproc (int, optional) – Number of parallel processes (0=auto), defaults to 0

  • station_name (str, optional) – Override station name, defaults to None

  • table_name (str, optional) – Override table name, defaults to None

  • progress (int, optional) – Progress reporting weight, defaults to 100

Coordinates parallel ingestion with ID reservation and progress tracking.
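The coordination step can be sketched as building one parameter tuple per file in the (filename, station_name, table_name, free_id, force) layout that ingest_toa5 expects. Reserving consecutive IDs and mapping nproc=0 to the CPU count are assumptions. The function names are hypothetical.

```python
import os


def resolve_nproc_sketch(nproc: int) -> int:
    """Treat 0 as 'auto' and fall back to the CPU count (assumed behaviour)."""
    return nproc if nproc > 0 else (os.cpu_count() or 1)


def build_tasks_sketch(infiles, first_free_id, station_name=None,
                       table_name=None, force=False):
    """Reserve one consecutive ID per file and build the worker tuples."""
    return [
        (filename, station_name, table_name, first_free_id + offset, force)
        for offset, filename in enumerate(infiles)
    ]


tasks = build_tasks_sketch(["a.dat", "b.dat"], first_free_id=7)
workers = resolve_nproc_sketch(0)
# The tuples could then be passed to a process pool, for example with
# multiprocessing.Pool(workers).map(worker_function, tasks).
```

Assigning IDs before the workers start avoids two processes claiming the same ID at the same time.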

ecpet.ecdb.dbcolumn_name(n, u, s)

Generate database column name with metadata.

Parameters:
  • n (str) – Column name

  • u (str) – Units

  • s (str) – Sampling information

Returns:

Combined column identifier

Return type:

str

Creates column names including measurement units and sampling metadata.

ecpet.ecdb.dbcolumn_split(dbcolumn, check=True)

Parse database column name into components.

Parameters:
  • dbcolumn (str) – Database column identifier

  • check (bool, optional) – Whether to validate skeleton columns, defaults to True

Returns:

Tuple of (name, units, sampling)

Return type:

tuple

Extracts measurement metadata from standardized column names.

ecpet.ecdb.db_get_columns(cur, dbtable, check=True)

Retrieve column names from database table.

Parameters:
  • cur (sqlite3.Cursor) – Database cursor

  • dbtable (str) – Table name

  • check (bool, optional) – Whether to validate metadata fields, defaults to True

Returns:

List of column names

Return type:

list

Gets data columns excluding metadata fields.

ecpet.ecdb.db_get_header(cur, opt, dbtable, time_condition)

Retrieve header information for specified table and time period.

Parameters:
  • cur (sqlite3.Cursor) – Database cursor

  • opt (str) – Header selection option (‘f’=first, ‘l’=last, ‘u’=unique, ‘m’=majority)

  • dbtable (str) – Table name

  • time_condition (str) – SQL time condition clause

Returns:

Dictionary with header information

Return type:

dict

Selects appropriate header metadata based on temporal constraints.
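The four selection options can be sketched on an in-memory list of headers ordered by time. The function name, the reduction of each header to a tuple of column names, and the decision to raise an error for 'u' when several distinct headers exist are assumptions. The documented function returns a dictionary and may treat the non-unique case differently.

```python
from collections import Counter


def select_header_sketch(opt, headers):
    """Pick one header from a time-ordered list according to opt."""
    if not headers:
        raise ValueError("no headers in the requested period")
    if opt == "f":  # first
        return headers[0]
    if opt == "l":  # last
        return headers[-1]
    if opt == "u":  # unique: all headers must be identical
        if len(set(headers)) != 1:
            raise ValueError("headers are not unique in the requested period")
        return headers[0]
    if opt == "m":  # majority: the most frequent header
        return Counter(headers).most_common(1)[0][0]
    raise ValueError(f"unknown option: {opt!r}")


# Hypothetical headers, reduced to hashable tuples of column names.
headers = [("Ux", "Uy"), ("Ux", "Uy", "Uz"), ("Ux", "Uy", "Uz")]

first_header = select_header_sketch("f", headers)
last_header = select_header_sketch("l", headers)
majority_header = select_header_sketch("m", headers)
```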

ecpet.ecdb.db_get_values(cur, dbtable, columns, time_condition, check=True)

Retrieve data values from database table with time filtering.

Parameters:
  • cur (sqlite3.Cursor) – Database cursor

  • dbtable (str) – Table name

  • columns (list or None) – Columns to retrieve (None for all)

  • time_condition (str) – SQL time condition clause

  • check (bool, optional) – Whether to validate columns, defaults to True

Returns:

Tuple of (column_names, data_rows)

Return type:

tuple

Retrieves measurement data with temporal filtering and column validation.
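A time-filtered query that returns the (column_names, data_rows) pair might be sketched as below. The function name, the table "ST1:flux", and the sample rows are hypothetical, and column validation is omitted. The sketch relies on timestamps being stored as text in '%Y-%m-%d %H:%M:%S.%f' format, which sorts chronologically when compared as strings.

```python
import sqlite3


def get_values_sketch(cur, dbtable, columns, time_condition):
    """Return (column_names, rows) for the rows matching time_condition."""
    if columns is None:
        selection = "*"
    else:
        selection = ", ".join(f'"{col}"' for col in columns)
    cur.execute(f'SELECT {selection} FROM "{dbtable}" WHERE {time_condition}')
    names = [desc[0] for desc in cur.description]
    return names, cur.fetchall()


con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute('CREATE TABLE "ST1:flux" ("TIMESTAMP" TEXT, "Ux" REAL)')
cur.executemany(
    'INSERT INTO "ST1:flux" VALUES (?, ?)',
    [
        ("2021-06-01 00:00:00.000000", 1.0),
        ("2021-06-01 00:30:00.000000", 2.0),
        ("2021-06-01 01:00:00.000000", 3.0),
    ],
)

condition = '"TIMESTAMP" >= ' + "'2021-06-01 00:30:00.000000'"
names, rows = get_values_sketch(cur, "ST1:flux", ["TIMESTAMP", "Ux"], condition)
```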

ecpet.ecdb.conf_to_db(conf)

Store configuration parameters in database.

ecpet.ecdb.conf_from_db()

Retrieve configuration parameters from database.

Returns:

Configuration object with stored parameters

Return type:

Config

Restores processing configuration for workflow continuation.

ecpet.ecdb.list_tables()

List all tables in database.

Returns:

List of table names

Return type:

list

Provides inventory of available data tables.
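In SQLite, table names can be read from the sqlite_master catalogue, as in the sketch below. The function name and the example tables are hypothetical. Whether the module filters out its metadata tables is not stated here, so the sketch returns every table.

```python
import sqlite3


def list_tables_sketch(con):
    """Return the names of all tables in the connected database."""
    rows = con.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE headers (id INTEGER)")
con.execute('CREATE TABLE "ST1:flux" ("TIMESTAMP" TEXT)')

table_names = list_tables_sketch(con)
```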

ecpet.ecdb.retrieve_columns(station_name, table_name)

Get available columns for specified station and table.

Parameters:
  • station_name (str) – Station identifier

  • table_name (str) – Table identifier

Returns:

List of column names

Return type:

list

Returns measurement variables available in specified dataset.

ecpet.ecdb.mark_finished()

Mark processing as completed in database.

Creates completion marker for workflow status tracking.

ecpet.ecdb.retrieve_df(station_name, table_name, columns=None, tbegin=None, tend=None)

Retrieve data as pandas DataFrame with temporal filtering.

Parameters:
  • station_name (str) – Station identifier

  • table_name (str) – Table identifier

  • columns (list, optional) – Columns to retrieve (None for all), defaults to None

  • tbegin (datetime, pd.Timestamp, or str, optional) – Start time for filtering, defaults to None

  • tend (datetime, pd.Timestamp, or str, optional) – End time for filtering, defaults to None

Returns:

DataFrame with requested data

Return type:

pandas.DataFrame

Main data retrieval interface with automatic type conversion and indexing.
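Turning the optional tbegin and tend arguments into a SQL time condition could look like the sketch below. The function name, the column name TIMESTAMP, the inclusive bounds, and the use of bound parameters are assumptions. Strings are passed through unchanged and are assumed to be in full timestamp format already. pandas.Timestamp is a datetime subclass, so it takes the same branch as datetime.

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def time_condition_sketch(tbegin=None, tend=None, column="TIMESTAMP"):
    """Build a WHERE clause and its parameters from optional time bounds."""
    clauses, params = [], []
    for bound, operator in ((tbegin, ">="), (tend, "<=")):
        if bound is None:
            continue
        if isinstance(bound, datetime):
            bound = bound.strftime(TIME_FORMAT)
        clauses.append(f'"{column}" {operator} ?')
        params.append(bound)
    # "1" is always true in SQLite, so no bounds means no filtering.
    return (" AND ".join(clauses) or "1"), params


no_filter = time_condition_sketch()
clause, params = time_condition_sketch(
    tbegin=datetime(2021, 6, 1),
    tend="2021-06-02 00:00:00.000000",
)
```

The clause and parameters can then be passed to cursor.execute together with the SELECT statement.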

ecpet.ecdb.retrieve_time_range(station_name, table_name)

Get temporal extent of data in specified table.

Parameters:
  • station_name (str) – Station identifier

  • table_name (str) – Table identifier

Returns:

Tuple of (earliest_time, latest_time)

Return type:

tuple

Efficiently determines data coverage for time range planning.
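The temporal extent can be obtained with a single aggregate query, sketched below. The function name, the column name TIMESTAMP, and the example table are assumptions. The module may determine the range differently.

```python
import sqlite3


def time_range_sketch(cur, dbtable, column="TIMESTAMP"):
    """Return (earliest, latest) timestamp stored in dbtable."""
    cur.execute(f'SELECT MIN("{column}"), MAX("{column}") FROM "{dbtable}"')
    return cur.fetchone()


con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute('CREATE TABLE "ST1:flux" ("TIMESTAMP" TEXT, "Ux" REAL)')
cur.executemany(
    'INSERT INTO "ST1:flux" VALUES (?, ?)',
    [
        ("2021-06-01 00:30:00.000000", 2.0),
        ("2021-06-01 00:00:00.000000", 1.0),
        ("2021-06-01 01:00:00.000000", 3.0),
    ],
)

earliest, latest = time_range_sketch(cur, "ST1:flux")
```

For an empty table, both values are None.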