While @sql_job is great for SQL-based transformations, some logic is too complex or better expressed in Python. The @py_job and @parallelized_job decorators are designed for these scenarios.
Python Jobs
(Useful for EL/ETL)
Insert data into a table by defining a function that returns a DataFrame:
import pandas as pd
from datafruit import dft

@dft.py_job(output_table=User)  # User is a SQLModel table defined elsewhere in your project
def process_user_events(events_data):
    """Loads ALL data into memory at once."""
    df = pd.DataFrame(events_data)  # Could be huge!
    # Process the entire dataset in memory
    df['email'] = df['email'].str.lower()
    df = df[df['email'].str.contains('@')]
    return df  # ⚠️ Entire DataFrame held in memory until insertion
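The examples in this section assume a User table has already been defined as a SQLModel elsewhere in your project. A minimal, hypothetical sketch of what it might look like (your actual fields will differ):

from sqlmodel import SQLModel

# Hypothetical schema for illustration only; replace these fields with your own.
class User(SQLModel):
    username: str
    email: str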
You can also insert data into the table in a more memory-efficient way by defining a generator that yields DataFrames in chunks:
@dft.py_job(output_table=User)
def process_user_events(events_data):
    """Processes and yields chunks."""
    # Here events_data is expected to be a path to a CSV file
    for chunk in pd.read_csv(events_data, chunksize=1000):
        # Process one small chunk at a time
        chunk['email'] = chunk['email'].str.lower()
        chunk = chunk[chunk['email'].str.contains('@')]
        yield chunk
Somewhere in your code, you also need to export the data so it is available to the CLI:
# Not run until the job is called
USER_DATA = load_massive_dataset()
...
dft.export(USER_DATA)
Then you would run:
dft run process_user_events USER_DATA
Key parameters:
- output_table: The output schema the job will conform to.
- streaming: Defaults to False. Whether the function returns the entire dataset at once or yields it in chunks as a generator.
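As a minimal sketch, assuming streaming is passed like any other keyword argument to the decorator and that True selects the generator behaviour, the chunked example above would look like:

@dft.py_job(output_table=User, streaming=True)
def process_user_events(events_data):
    """Yields chunks instead of returning one large DataFrame."""
    for chunk in pd.read_csv(events_data, chunksize=1000):
        chunk['email'] = chunk['email'].str.lower()
        yield chunk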
Benefits of using Python / parallelized jobs
- Schema Validation: When you run dft plan, Datafruit checks if all referenced tables and columns exist in the current schema.
- Runtime Validation: If your job reads from or writes to database tables, Datafruit validates the schema at runtime before processing begins.
- Error Handling: If your job tries to access a missing column or table, it will fail with a clear error message indicating what's missing.
- Updating Jobs: To update your job after schema changes:
  - Modify your function to work with the new schema
  - Update any SQL queries if using table_name or query parameters
  - If using a SQLModel for output, ensure it matches the new schema
  - Test with dft plan before applying changes
For example, a run that references columns which no longer exist fails like this:
❌ Job execution failed: process_users
🔍 Schema Validation Error:
The job 'process_users' references columns that no longer exist in the schema:
Missing columns:
├─ user.email (referenced in job code)
└─ user.full_name (referenced in job code)
📋 Job code analysis:
Line 3: df['email_domain'] = df['email'].str.split('@').str[1]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
❌ Column 'email' not found in User table schema
Line 4: df['display_name'] = df['full_name'].fillna(df['username'])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
❌ Column 'full_name' not found in User table schema
⚠️ Job not executed - fix schema conflicts first
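To recover from a failure like this, update the job so it no longer touches the removed columns. A hypothetical sketch, assuming process_users is an ordinary @dft.py_job and following the column names in the error output above:

@dft.py_job(output_table=User)
def process_users(users_data):
    df = pd.DataFrame(users_data)
    # `email` and `full_name` were removed from the User table, so drop the
    # email_domain logic and derive display_name from username alone.
    df['display_name'] = df['username']
    return df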
Always test your schema changes in a development environment before applying them to production.
Parallelized Jobs
You write a simple Python function that processes a single row or record, and @parallelized_job handles the parallel execution across your entire dataset using Ray.
- Row-Level Logic: You define the logic for one item, and Datafruit applies it to all.
- Parallel Execution: Jobs run in parallel automatically, speeding up large transformations.
- Flexible I/O: Read from and write to DataFrames or database tables seamlessly.
The most straightforward way to use @parallelized_job is to insert data row by row:
import pandas as pd
from datafruit import dft
from sqlmodel import SQLModel

# Define the output model for cleaned user data
class CleanUser(SQLModel):
    name: str
    email: str

@dft.parallelized_job(output_table=CleanUser, workers=2)
def clean_user_data(user_row: dict) -> CleanUser:
    """
    Takes a single user dictionary, cleans it,
    and returns a CleanUser model instance.

    The input data must be a list that can be chunked;
    each entry corresponds to one user record to be cleaned.
    """
    # Datafruit will:
    # 1. Distribute the input items across 2 workers
    # 2. Call the body of this function on each item in parallel
    # 3. Collect all CleanUser objects
    # 4. Convert them to a DataFrame and insert into the CleanUser table
    ...
    return CleanUser(
        name=user_row['name'].title(),
        email=user_row['email'].lower()
    )
Assuming that somewhere in your code you have written
dft.export(DIRTY_USER_DATA)
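Here, DIRTY_USER_DATA is whatever list of raw user records you want cleaned; a hypothetical sample:

# Hypothetical raw input; each entry is one user record to be cleaned.
DIRTY_USER_DATA = [
    {"name": "john smith", "email": "John.Smith@Email.com"},
    {"name": "JANE DOE", "email": "Jane.Doe@Email.com"},
]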
You can then run the parallelized job by executing:
dft run clean_user_data DIRTY_USER_DATA
🚀 Running parallelized job on 2 workers: clean_user_data
Processing...
[████████████████████████████████████████] 100% (1000/1000)
Worker 1: Processed 500 items in 12.3s
Worker 2: Processed 500 items in 11.8s
📝 Results:
├─ Total processed: 1,000 records
├─ Successful: 987 records
├─ Validation errors: 13 records (invalid email format)
└─ Processing time: 12.3s
💾 Database insertion:
├─ Table: cleanuser
├─ Rows inserted: 987
└─ Insertion time: 0.4s
✅ Job completed successfully!
Sample processed records:
┌─────────────┬───────────────────────┐
│ name        │ email                 │
├─────────────┼───────────────────────┤
│ John Smith  │ john.smith@email.com  │
│ Jane Doe    │ jane.doe@email.com    │
│ Bob Johnson │ bob.johnson@email.com │
└─────────────┴───────────────────────┘
Key parameters:
- output_table: The output schema the job will conform to.
- workers: The number of parallel workers the job will be distributed across.
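Since workers is just another decorator argument, scaling out is a one-line change; a minimal sketch reusing the job above:

@dft.parallelized_job(output_table=CleanUser, workers=8)  # more workers for a larger dataset
def clean_user_data(user_row: dict) -> CleanUser:
    ...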