Pipeline API Reference

Auto-generated reference for ca_biositing.pipeline — the ETL pipeline components for data ingestion, transformation, and loading.

Pipeline Package

CA Biositing ETL Pipeline Package.

This package contains ETL (Extract, Transform, Load) pipelines and workflows for processing bioeconomy data for the CA Biositing project.

ETL Core

ETL task package for CA Biositing.

This package groups extract, transform, and load task modules used by Prefect flows. Keep imports in submodules to avoid import-time side effects.

Extract

Extract tasks for loading source datasets.

Modules in this package read external data sources (files, APIs, and sheets) and return pandas/geopandas data frames for downstream transforms.

Modules

basic_sample_info

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.
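
For orientation, a minimal sketch of a module built from this template follows; the constant values and the gspread calls are assumptions for illustration, not the actual implementation.

    # Illustrative template-based extract module (assumed implementation).
    from typing import Optional

    import gspread
    import pandas as pd

    GSHEET_NAME = "my-gsheet"                     # update per module
    WORKSHEET_NAME = "01-MyWorksheet"             # update per module
    CREDENTIALS_PATH = "config/credentials.json"  # verify this path

    def extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]:
        """Return the worksheet as a raw DataFrame, or None on error."""
        try:
            gc = gspread.service_account(filename=CREDENTIALS_PATH)
            ws = gc.open(GSHEET_NAME).worksheet(WORKSHEET_NAME)
            return pd.DataFrame(ws.get_all_records())
        except Exception:
            # The real modules presumably log the error before returning None.
            return None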

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts the raw data from the '01-BasicSampleInfo' worksheet in the Google Sheet.

This function is purely for extraction (the 'E' in ETL). It connects to the data source and returns the data as is, without any transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.
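
Because every extract returns None on failure, callers should check the result before chaining transforms. A usage sketch (the import path is an assumption):

    from ca_biositing.pipeline.etl.extract.basic_sample_info import extract  # assumed path

    raw = extract()
    if raw is None:
        raise RuntimeError("basic_sample_info extraction failed")
    print(raw.shape)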

billion_ton

Functions

extract(file_id: str = '11xLy_kPTHvoqciUMy3SYA3DLCDIjkOGa', file_name: str = 'billionton_23_agri_download.csv', mime_type: str = 'text/csv', project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw Billion Ton data from a file on Google Drive.

Parameters:

    file_id (str, default '11xLy_kPTHvoqciUMy3SYA3DLCDIjkOGa'): The Google Drive File ID.
    file_name (str, default 'billionton_23_agri_download.csv'): The local filename to save as.
    mime_type (str, default 'text/csv'): The MIME type of the file.
    project_root (Optional[str], default None): Optional root directory of the project.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.
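
For reference, fetching a public Drive file by ID can be sketched with gdown; this illustrates the pattern only and is not necessarily the client this module uses.

    import gdown
    import pandas as pd

    # Download a public Google Drive file by ID (illustrative only).
    file_id = "11xLy_kPTHvoqciUMy3SYA3DLCDIjkOGa"
    local_path = gdown.download(id=file_id, output="billionton_23_agri_download.csv", quiet=True)
    df = pd.read_csv(local_path)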

biodiesel_plants

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from a .csv file.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

ca_proc_points

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from a .zip file.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

calorimetry

Calorimetry ETL extract for reading Google Sheet data.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

cmpana

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

experiments

Functions

extract_experiments() -> Optional[pd.DataFrame]

Extracts the raw data from the '03-Experiment' worksheet in the Google Sheet.

This function is purely for extraction (the 'E' in ETL). It connects to the data source and returns the data as is, without any transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

icp

ICP ETL extract for reading Google Sheet data.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Parameters:

    project_root (Optional[str], default None): Path to the project root used to locate config/credentials; defaults to the current working directory when None.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

landiq

ETL Extract for Land IQ Data.

This module provides functionality for extracting Land IQ geospatial data from shapefiles. Supports loading from a local path (Docker Compose / dev) or downloading from an HTTP URL at runtime (Cloud Run) when LANDIQ_SHAPEFILE_URL is set.

Functions

download_shapefile(url: str, logger) -> Optional[tuple[str, str]]

Download a shapefile (or zip archive containing one) from a URL.

Returns a tuple of (shp_path, tmp_dir) on success, or None on failure. The caller is responsible for cleanup of tmp_dir.
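
A hedged sketch of the download-and-unzip pattern described here, assuming requests is available; the real task may differ in details such as streaming and logging.

    import os
    import tempfile
    import zipfile
    from typing import Optional

    import requests

    def download_shapefile_sketch(url: str) -> Optional[tuple]:
        """Illustrative: fetch a zip, extract it, and locate the .shp inside."""
        tmp_dir = tempfile.mkdtemp(prefix="landiq_")
        archive = os.path.join(tmp_dir, "shapefile.zip")
        resp = requests.get(url, timeout=300)
        resp.raise_for_status()
        with open(archive, "wb") as fh:
            fh.write(resp.content)
        with zipfile.ZipFile(archive) as zf:
            zf.extractall(tmp_dir)
        for root, _dirs, files in os.walk(tmp_dir):
            for name in files:
                if name.endswith(".shp"):
                    return os.path.join(root, name), tmp_dir  # caller cleans up tmp_dir
        return None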

extract(shapefile_path: Optional[str] = None) -> Optional[gpd.GeoDataFrame]

Extracts raw data from a Land IQ shapefile.

Resolution order:
  1. shapefile_path argument (if provided and exists locally)
  2. DEFAULT_SHAPEFILE_PATH (falls back to the volume-mounted file for Docker Compose)

URL download is handled by the flow before calling this task.

Parameters:

    shapefile_path (Optional[str], default None): Path to the Land IQ shapefile. If None, uses the default path.

Returns:

    Optional[GeoDataFrame]: A geopandas GeoDataFrame containing the raw data, or None if an error occurs.
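
The resolution order amounts to a small fallback chain; a sketch under the assumption that DEFAULT_SHAPEFILE_PATH is the module constant named above:

    import os
    from typing import Optional

    DEFAULT_SHAPEFILE_PATH = "/data/landiq.shp"  # placeholder; the module defines the real value

    def resolve_shapefile_path(shapefile_path: Optional[str] = None) -> str:
        # 1. explicit argument, when the file exists locally
        if shapefile_path and os.path.exists(shapefile_path):
            return shapefile_path
        # 2. volume-mounted default (Docker Compose / dev)
        return DEFAULT_SHAPEFILE_PATH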

petroleum_pipelines

Functions

extract(project_root: Optional[str] = None) -> Optional[gpd.GeoDataFrame]

Extracts raw data from a .geojson file.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[GeoDataFrame]: A geopandas GeoDataFrame containing the raw data, or None if an error occurs.

preparation

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts the raw data from the '02-Preparation' worksheet in the Google Sheet.

This function is purely for extraction (the 'E' in ETL). It connects to the data source and returns the data as is, without any transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

provider_info

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

proximate

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

resources

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts the raw data from the '01-Resources' worksheet in the Google Sheet.

This function is purely for extraction (the 'E' in ETL). It connects to the data source and returns the data as is, without any transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

samplemetadata

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

static_resource_info

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

ultimate

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

usda_census_survey

USDA Census and Survey Data Extraction.

This module extracts agricultural census and survey data from the USDA NASS Quick Stats API for California. Data includes:
  - Census data (every 5 years): the complete agricultural census
  - Survey data (annual): preliminary and final agricultural estimates

The USDA API provides access to decades of historical data across many commodities and regions. For more information: https://quickstats.nass.usda.gov/api
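
Quick Stats is a plain HTTP GET API keyed by descriptive parameters. A hedged sketch of one request; the parameter names follow the public API documentation, and NASS_API_KEY is a placeholder:

    import os

    import pandas as pd
    import requests

    params = {
        "key": os.environ["NASS_API_KEY"],  # placeholder credential
        "source_desc": "SURVEY",            # or "CENSUS"
        "commodity_desc": "ALMONDS",
        "state_alpha": "CA",
        "year__GE": "2017",
        "format": "JSON",
    }
    resp = requests.get("https://quickstats.nass.usda.gov/api/api_GET/", params=params, timeout=60)
    resp.raise_for_status()
    df = pd.DataFrame(resp.json()["data"])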

Functions

extract() -> Optional[pd.DataFrame]

Extracts USDA data only for commodities mapped in resource_usda_commodity_map and only for priority counties (North San Joaquin Valley). New crops can therefore be added by updating the database; no code changes are needed.

xrd

XRD ETL extract for reading Google Sheet data.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

xrf

ETL Extract Template.

This module provides a template for extracting data from a Google Sheet.

To use this template:
  1. Copy this file to the appropriate subdirectory in src/etl/extract/. For example: src/etl/extract/new_module/new_data.py
  2. Update the configuration constants (GSHEET_NAME, WORKSHEET_NAME).
  3. Ensure the CREDENTIALS_PATH is correct.

Functions

extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]

Extracts raw data from the specified Google Sheet worksheet.

This function serves as the 'Extract' step in an ETL pipeline. It connects to the data source and returns the data as is, without transformation.

Returns:

    Optional[DataFrame]: A pandas DataFrame containing the raw data, or None if an error occurs.

Transform

Transform tasks for standardizing extracted data.

Modules in this package clean, validate, and reshape raw extracts into schema-aligned tables ready for load tasks.

Modules

billion_ton

ETL Transform for the Billion Ton 2023 Agricultural Dataset.

Functions

transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: int = None, lineage_group_id: int = None) -> Optional[pd.DataFrame]

Transforms raw Billion Ton data into the BillionTon2023Record format.

landiq

Modules

landiq_record

ETL Transform for Land IQ Data.

This module provides functionality for transforming Land IQ GeoDataFrames into the LandiqRecord table format.

Functions

transform_landiq_record(gdf: gpd.GeoDataFrame, etl_run_id: str = None, lineage_group_id: int = None) -> pd.DataFrame

Transforms Land IQ GeoDataFrame into the LandiqRecord table format.

Parameters:

    gdf (GeoDataFrame, required): Raw GeoDataFrame from the Land IQ shapefile.
    etl_run_id (str, default None): ID of the current ETL run.
    lineage_group_id (int, default None): ID of the lineage group.

Returns:

    DataFrame: A pandas DataFrame formatted for the landiq_record table.

prepared_sample

ETL Transform for Prepared Sample.

This module transforms raw preparation data into the prepared_sample table format.

Functions

transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: int = None, lineage_group_id: int = None) -> Optional[pd.DataFrame]

Transforms raw preparation data into a structured format for the prepared_sample table.

Parameters:

    data_sources (Dict[str, DataFrame], required): Dictionary where keys are source names and values are DataFrames.
    etl_run_id (int, default None): ID of the current ETL run.
    lineage_group_id (int, default None): ID of the lineage group.

resource

ETL Transform Template.

This module provides a template for transforming raw data from multiple sources. It includes standard cleaning, coercion, and normalization patterns used in the pipeline.

Functions

transform(data_sources: Dict[str, pd.DataFrame], etl_run_id: int = None, lineage_group_id: int = None) -> Optional[pd.DataFrame]

Transforms raw data from multiple sources into a structured format.

Parameters:

    data_sources (Dict[str, DataFrame], required): Dictionary where keys are source names and values are DataFrames.
    etl_run_id (int, default None): ID of the current ETL run.
    lineage_group_id (int, default None): ID of the lineage group.
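
Transforms following this template generally clean each source, merge, and stamp lineage columns; a minimal sketch assuming the cleaning helpers documented under Utilities below (the source key, column name, and import path are hypothetical):

    from typing import Dict, Optional

    import pandas as pd

    from ca_biositing.pipeline.etl.utils.cleaning import coerce_columns, standard_clean  # assumed path

    def transform_sketch(
        data_sources: Dict[str, pd.DataFrame],
        etl_run_id: int = None,
        lineage_group_id: int = None,
    ) -> Optional[pd.DataFrame]:
        """Illustrative skeleton; not the real resource transform."""
        raw = data_sources.get("resources")  # hypothetical source key
        if raw is None:
            return None
        df = standard_clean(raw)
        df = coerce_columns(df, float_cols=["yield_tons"])  # hypothetical column
        df["etl_run_id"] = etl_run_id
        df["lineage_group_id"] = lineage_group_id
        return df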

Load

Load tasks for persisting transformed data.

Modules in this package write transformed frames to PostgreSQL using the shared data models and ETL lineage metadata.

Modules

billion_ton

Functions

load(df: pd.DataFrame)

Loads transformed Billion Ton data into the billion_ton2023_record table. Ensures that Place records exist before loading.

field_sample

Functions

load_field_sample(df: pd.DataFrame)

Upserts FieldSample records into the database based on the 'name' column.

landiq

Functions

bulk_insert_polygons_ignore(session: Session, geoms: list[str], etl_run_id: str = None, lineage_group_id: str = None, dataset_id: int = None)

Inserts polygons in bulk, ignoring duplicates based on geom.

fetch_polygon_ids_by_geoms(session: Session, geoms: list[str]) -> dict[str, int]

Fetches polygon IDs for a list of geometries.

bulk_upsert_landiq_records(session: Session, records: list[dict]) -> int

Upserts LandiqRecords in bulk using ON CONFLICT (record_id) DO UPDATE. Returns the number of records processed.
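
The upsert relies on PostgreSQL's INSERT ... ON CONFLICT, which SQLAlchemy exposes through its postgresql dialect; a hedged sketch (the LandiqRecord import is hypothetical):

    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy.orm import Session

    from ca_biositing.datamodels import LandiqRecord  # hypothetical import

    def bulk_upsert_sketch(session: Session, records: list[dict]) -> int:
        """Illustrative ON CONFLICT (record_id) DO UPDATE upsert."""
        table = LandiqRecord.__table__
        stmt = insert(table).values(records)
        stmt = stmt.on_conflict_do_update(
            index_elements=["record_id"],
            set_={c.name: stmt.excluded[c.name] for c in table.columns if c.name != "record_id"},
        )
        session.execute(stmt)
        return len(records)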

load_landiq_record(df: pd.DataFrame)

Upserts Land IQ records into the database using optimized bulk operations.

prepared_sample

Functions

load_prepared_sample(df: pd.DataFrame)

Upserts PreparedSample records into the database based on the 'name' column.

resource

Functions

load_resource(df: pd.DataFrame)

Upserts resource records into the database.

static_resource_info

Functions

load_landiq_resource_mapping(df: pd.DataFrame)

Upserts LandiqResourceMapping records.

load_resource_availability(df: pd.DataFrame)

Upserts ResourceAvailability records.

Utilities

Shared ETL utility package.

Includes reusable helpers used across extract, transform, and load modules. See subpackages for cleaning/coercion and lookup-specific functions.

Cleaning Functions

Cleaning helpers package.

Expose commonly used cleaning and coercion helpers for the ETL pipeline. This package is intentionally small and documented; individual modules contain the implementation so unit tests can target them directly.

Functions

clean_names_df(df: pd.DataFrame) -> pd.DataFrame

Return a copy of df with cleaned column names using janitor.clean_names().

If df is not a DataFrame, the original value is returned and an error is logged.
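
pyjanitor's clean_names snake-cases and lower-cases column labels; a quick illustration:

    import pandas as pd
    import janitor  # noqa: F401  (registers .clean_names() on DataFrame)

    df = pd.DataFrame({"Sample ID": [1], "Prep Method": ["oven"]})
    print(df.clean_names().columns.tolist())
    # ['sample_id', 'prep_method']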

replace_empty_with_na(df: pd.DataFrame, columns: Optional[Iterable[str]] = None, regex: str = '^\\s*$') -> pd.DataFrame

Replace empty/whitespace-only strings with np.nan.

Parameters:

    df (DataFrame, required): Input DataFrame.
    columns (Optional[Iterable[str]], default None): Optional iterable of column names to process; if None, operate on the whole frame.
    regex (str, default '^\\s*$'): Regex used to identify empty/whitespace strings.

Returns:

    DataFrame: A new DataFrame with replacements applied.
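
This is presumably built on the standard pandas regex replacement; an equivalent idiom for reference:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({"a": ["x", "  ", ""], "b": [1, 2, 3]})
    # Whitespace-only strings become NaN; other values pass through unchanged.
    cleaned = df.replace(to_replace=r"^\s*$", value=np.nan, regex=True)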

to_lowercase_df(df: pd.DataFrame, columns: Optional[Iterable[str]] = None) -> pd.DataFrame

Lowercase string columns.

Converts selected columns (or all string-like columns) to pandas string dtype, then applies .str.lower(). Missing values are preserved.

standard_clean(df: pd.DataFrame, lowercase: bool = True, replace_empty: bool = True) -> Optional[pd.DataFrame]

Run a composed standard cleaning pipeline and return a cleaned DataFrame.

Steps
  1. clean_names_df
  2. replace_empty_with_na (optional)
  3. to_lowercase_df (optional)
  4. convert_dtypes() to allow pandas to pick improved nullable dtypes
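
In other words, standard_clean is roughly this composition; an illustrative sketch, assuming the helpers are importable as shown:

    from typing import Optional

    import pandas as pd

    from ca_biositing.pipeline.etl.utils.cleaning import (  # assumed path
        clean_names_df, replace_empty_with_na, to_lowercase_df,
    )

    def standard_clean_sketch(
        df: pd.DataFrame, lowercase: bool = True, replace_empty: bool = True
    ) -> Optional[pd.DataFrame]:
        out = clean_names_df(df)
        if replace_empty:
            out = replace_empty_with_na(out)
        if lowercase:
            out = to_lowercase_df(out)
        return out.convert_dtypes()  # step 4: nullable dtypes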

coerce_columns(df: pd.DataFrame, int_cols: Optional[Iterable[str]] = None, float_cols: Optional[Iterable[str]] = None, datetime_cols: Optional[Iterable[str]] = None, bool_cols: Optional[Iterable[str]] = None, category_cols: Optional[Iterable[str]] = None, geometry_cols: Optional[Iterable[str]] = None, dtype_map: Optional[dict] = None, float_dtype=np.float64, geometry_format: str = 'wkt') -> pd.DataFrame

Coerce groups of columns to target types.

dtype_map may be provided as an alternative mapping with keys like 'int', 'float', 'datetime', 'bool', 'category', and 'geometry'. Explicit keyword lists take precedence over dtype_map entries.

geometry_format controls how geometry columns are coerced:
  - 'wkt': parse WKT strings using shapely (default)
  - 'geodataframe': skip coercion (columns are already GeoSeries from geopandas)
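
A typical call mixes explicit keyword lists with a dtype_map; the column names here are hypothetical and the import path is assumed:

    import numpy as np
    import pandas as pd

    from ca_biositing.pipeline.etl.utils.cleaning import coerce_columns  # assumed path

    df = pd.DataFrame({"sample_count": ["3"], "collected_on": ["2024-01-15"], "yield_tons": ["1.5"]})
    df = coerce_columns(
        df,
        int_cols=["sample_count"],
        datetime_cols=["collected_on"],
        dtype_map={"float": ["yield_tons"]},  # merged in; keyword lists win on overlap
        float_dtype=np.float32,
    )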

coerce_columns_list(dfs: Iterable[pd.DataFrame], **coerce_kwargs) -> list

Apply coerce_columns to each DataFrame in dfs and return a list of results.

Non-DataFrame items are preserved with a warning.

detect_latlon_columns(df: pd.DataFrame) -> Dict[str, list]

Auto-detect latitude and longitude columns in a DataFrame.

Searches for common naming patterns such as:
  - latitude/longitude, lat/lon, desc_lat/desc_lon
  - sampling_lat/sampling_lon, prod_lat/prod_lon, etc.
  - combined columns: latlong, lat_lon, latlng, location, coordinates

Returns:

    Dict[str, list]: Dict with keys:
      - 'latitude': list of detected latitude columns
      - 'longitude': list of detected longitude columns
      - 'combined': list of combined lat/lon columns

split_combined_latlon(df: pd.DataFrame, col: str, sep: Optional[str] = None, lat_col: str = 'desc_lat', lon_col: str = 'desc_lon', keep_original: bool = False) -> pd.DataFrame

Split a combined lat/lon column into two separate columns.

Handles multiple separators: comma, space, semicolon, pipe, tab. Auto-detects delimiter if not specified.

Parameters:

    df (DataFrame, required): Input DataFrame.
    col (str, required): Name of the combined lat/lon column.
    sep (Optional[str], default None): Delimiter (e.g., ',', ';'); if None, auto-detects.
    lat_col (str, default 'desc_lat'): Name for the output latitude column.
    lon_col (str, default 'desc_lon'): Name for the output longitude column.
    keep_original (bool, default False): If True, keep the original combined column.

Returns:

    DataFrame: DataFrame with the new lat/lon columns.
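
A usage sketch with a hypothetical combined column (import path assumed):

    import pandas as pd

    from ca_biositing.pipeline.etl.utils.cleaning import split_combined_latlon  # assumed path

    df = pd.DataFrame({"location": ["36.77, -119.42", "38.58, -121.49"]})
    out = split_combined_latlon(df, col="location")  # comma delimiter auto-detected
    print(out[["desc_lat", "desc_lon"]])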

standardize_latlon(df: pd.DataFrame, lat_cols: Optional[Iterable[str]] = None, lon_cols: Optional[Iterable[str]] = None, combined_cols: Optional[Iterable[str]] = None, auto_detect: bool = True, output_lat: str = 'desc_lat', output_lon: str = 'desc_lon', sep: Optional[str] = None, coerce_to_float: bool = True) -> pd.DataFrame

Standardize latitude/longitude columns in a DataFrame.

Workflow:
  1. Auto-detect lat/lon columns if enabled
  2. Split any combined lat/lon columns
  3. Rename detected separate columns to the output names
  4. Optionally coerce to float with error handling

Parameters:

    df (DataFrame, required): Input DataFrame.
    lat_cols (Optional[Iterable[str]], default None): Explicit list of latitude columns to process.
    lon_cols (Optional[Iterable[str]], default None): Explicit list of longitude columns to process.
    combined_cols (Optional[Iterable[str]], default None): Explicit list of combined lat/lon columns to split.
    auto_detect (bool, default True): If True, automatically detect columns by name pattern.
    output_lat (str, default 'desc_lat'): Name for the standardized latitude column.
    output_lon (str, default 'desc_lon'): Name for the standardized longitude column.
    sep (Optional[str], default None): Delimiter for parsing combined columns.
    coerce_to_float (bool, default True): If True, coerce to float64.

Returns:

    DataFrame: DataFrame with standardized lat/lon columns.
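
Putting it together, the usual call lets auto-detection do the work; the column names in this sketch are hypothetical and the import path is assumed:

    import pandas as pd

    from ca_biositing.pipeline.etl.utils.cleaning import standardize_latlon  # assumed path

    df = pd.DataFrame({"sampling_lat": ["36.77"], "sampling_lon": ["-119.42"]})
    out = standardize_latlon(df)  # detects, renames to desc_lat/desc_lon, coerces to float
    print(out[["desc_lat", "desc_lon"]].dtypes)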