ETL Pipeline Workflow
This guide explains the structure of the ETL (Extract, Transform, Load) pipeline and how to use Prefect to execute it.
Core Concepts
The ETL pipeline extracts data from Google Sheets, transforms it using pandas, and loads it into the PostgreSQL database.
- Structure: The core logic is organized into three main packages within
src/ca_biositing/pipeline/ca_biositing/pipeline/etl/: extract: Functions to pull raw data from sources (e.g., Google Sheets).transform: Functions to clean, process, and structure the raw data.-
load: Functions to insert the transformed data into the database using SQLAlchemy. -
Hierarchical Pipelines: Transform and load logic are organized into subdirectories reflecting the data they handle (e.g.,
products,usda,analysis).
Running the ETL Pipelines
The ETL system runs in a containerized Prefect environment.
Step 1: Start Services
pixi run start-services
Step 2: Apply Datamodel
pixi run migrate
Step 3: Deploy Flows
pixi run deploy
Step 4: Run the Master Pipeline
pixi run run-etl
Step 5: Monitor Access the Prefect UI at http://localhost:4200.
How to Add a New ETL Flow
Step 1: Create the Task Files Create the three Python files for your
extract, transform, and load logic under
src/ca_biositing/pipeline/ca_biositing/pipeline/etl/. Extract tasks go
directly in extract/; transform and load tasks go in appropriately named
subdirectories (e.g., transform/products/, load/products/). Decorate each
function with @task.
Step 2: Create the Pipeline Flow Create a new file in
src/ca_biositing/pipeline/ca_biositing/pipeline/flows/ to define the flow.
from prefect import flow
from ca_biositing.pipeline.etl.extract.my_source import extract
from ca_biositing.pipeline.etl.transform.products.my_product import transform
from ca_biositing.pipeline.etl.load.products.my_product import load
@flow
def my_product_flow():
raw_data = extract()
transformed_data = transform(raw_data)
load(transformed_data)
Step 3: Register the New Flow Add your flow to the AVAILABLE_FLOWS
dictionary in resources/prefect/run_prefect_flow.py.
Step 4: Deploy and Run
pixi run deploy
pixi run run-etl
Using Templates
Template files are available in
src/ca_biositing/pipeline/ca_biositing/pipeline/etl/templates/ to help you get
started with new tasks.