While @sql_job is great for SQL-based transformations, some logic is too complex or better expressed in Python. The @py_job and @parallelized_job decorators are designed for these scenarios.

Python Jobs

(Useful for EL/ETL)

Insert data into a table by defining a function that returns a DataFrame

@dft.py_job(output_table=User)
def process_user_events(events_data):
    """Loads ALL data into memory at once"""
    df = pd.DataFrame(events_data)  # Could be huge!
    
    # Process entire dataset in memory
    df['email'] = df['email'].str.lower()
    df = df[df['email'].str.contains('@')]
    
    return df  # ⚠️ Entire DataFrame held in memory until insertion

You can also insert data into the table in a more memory-efficient way by defining a generator that yields DataFrame chunks

@dft.py_job(output_table=User)
def process_user_events(events_data):
    """Processes and yields chunks"""
    for chunk in pd.read_csv(events_data, chunksize=1000):
        # Process small chunk
        chunk['email'] = chunk['email'].str.lower()
        chunk = chunk[chunk['email'].str.contains('@')]
        
        yield chunk 

Somewhere in your code, you need to export the data so that it is available to the CLI

# Not run until the job is called
USER_DATA = load_massive_dataset()

dft.export(USER_DATA)

Then you would run

dft run process_user_events USER_DATA

Key parameters:

output_table

The output schema the job will conform to.

streaming

Defaults to False. Controls whether the function processes the entire dataset at once or yields it incrementally as a generator.
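
A rough sketch of how these parameters combine with the generator pattern above, assuming streaming is passed as a keyword argument to @dft.py_job and that a minimal User model with an email field is defined as shown (both are assumptions rather than confirmed API details)

import pandas as pd
from datafruit import dft
from sqlmodel import SQLModel

# Assumed output schema; the real User model may declare more fields
class User(SQLModel):
    email: str

# streaming=True (assumed usage) signals that the job yields chunks,
# so each chunk can be inserted as it is produced instead of holding
# the full result in memory
@dft.py_job(output_table=User, streaming=True)
def process_user_events(events_data):
    for chunk in pd.read_csv(events_data, chunksize=1000):
        chunk['email'] = chunk['email'].str.lower()
        yield chunk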

Benefits of using Python / parallelized jobs

  1. Schema Validation: When you run dft plan, Datafruit checks if all referenced tables and columns exist in the current schema.
  2. Runtime Validation: If your job reads from or writes to database tables, Datafruit validates the schema at runtime before processing begins.
  3. Error Handling: If your job tries to access a missing column or table, it will fail with a clear error message indicating what’s missing.
  4. Updating Jobs: To update your job after schema changes:
    • Modify your function to work with the new schema
    • Update any SQL queries if using table_name or query parameters
    • If using a SQLModel for output, ensure it matches the new schema
    • Test with dft plan before applying changes

For example, if a schema change removes columns that a job still references, running the job fails with output like the following

❌ Job execution failed: process_users

🔍 Schema Validation Error:
The job 'process_users' references columns that no longer exist in the schema:

Missing columns:
├─ user.email (referenced in job code)
└─ user.full_name (referenced in job code)

📋 Job code analysis:
   Line 3: df['email_domain'] = df['email'].str.split('@').str[1]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           ❌ Column 'email' not found in User table schema

   Line 4: df['display_name'] = df['full_name'].fillna(df['username'])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           ❌ Column 'full_name' not found in User table schema

⚠️  Job not executed - fix schema conflicts first

Always test your schema changes in a development environment before applying them to production.
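
As a concrete illustration of the update steps above, here is a hedged sketch of how process_users from the failing run might be rewritten, assuming the migration renamed email to primary_email and dropped full_name (hypothetical column names chosen purely for illustration)

import pandas as pd
from datafruit import dft

# User is the output model used in the earlier examples
@dft.py_job(output_table=User)
def process_users(users_data):
    df = pd.DataFrame(users_data)

    # Reference the renamed column instead of the removed one
    # (primary_email is a hypothetical post-migration name)
    df['email_domain'] = df['primary_email'].str.split('@').str[1]

    # full_name no longer exists, so derive display_name from username alone
    df['display_name'] = df['username']

    return df

Run dft plan again to confirm the job validates against the new schema before applying the change.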

Parallelized Jobs

You write a simple Python function that processes a single row or record, and @parallelized_job handles the parallel execution across your entire dataset using Ray.

  • Row-Level Logic: You define the logic for one item, and Datafruit applies it to all.
  • Parallel Execution: Jobs run in parallel automatically, speeding up large transformations.
  • Flexible I/O: Read from and write to DataFrames or database tables seamlessly.

The most straightforward way to use @parallelized_job is to insert data row by row

import pandas as pd
from datafruit import dft
from sqlmodel import SQLModel

# Define Pydantic model for cleaned user data
class CleanUser(SQLModel):
    name: str
    email: str

@dft.parallelized_job(output_table=CleanUser, workers=2)
def clean_user_data(user_row: dict) -> CleanUser:
    """
    Takes a single user dictionary, cleans it,
    and returns a CleanUser Pydantic model instance.
    
    The input data must be a list that can be chunked;
    each entry corresponds to one user record to be cleaned.
    """

    # The framework will:
    # 1. Distribute the input items across the 2 workers
    # 2. Call the body of this function on each item in parallel
    # 3. Collect all CleanUser objects
    # 4. Convert them to a DataFrame and insert into the cleanuser table
    ...

    return CleanUser(
        name=user_row['name'].title(),
        email=user_row['email'].lower()
    )

Assuming that somewhere in your code you have written

dft.export(DIRTY_USER_DATA)

you can run the parallelized job by executing

dft run clean_user_data DIRTY_USER_DATA
🚀 Running parallelized job on 2 workers: clean_user_data

Processing...
[████████████████████████████████████████] 100% (1000/1000)

Worker 1: Processed 500 items in 12.3s
Worker 2: Processed 500 items in 11.8s

📝 Results:
   ├─ Total processed: 1,000 records
   ├─ Successful: 987 records  
   ├─ Validation errors: 13 records (invalid email format)
   └─ Processing time: 12.3s

💾 Database insertion:
   ├─ Table: cleanuser
   ├─ Rows inserted: 987
   └─ Insertion time: 0.4s

✅ Job completed successfully!

Sample processed records:
┌─────────────┬──────────────────────┐
│ name        │ email                │
├─────────────┼──────────────────────┤
│ John Smith  │ john.smith@email.com │
│ Jane Doe    │ jane.doe@email.com   │
│ Bob Johnson │ bob.johnson@email.com│
└─────────────┴──────────────────────┘

Key Parameters

output_table

The output schema the job will conform to.

workers

The number of parallel workers the job will be distributed across.
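
The earlier example hard-codes workers=2; sizing it from the available CPU cores is one option, assuming the parameter accepts any positive integer (an assumption, not a documented constraint)

import os
from datafruit import dft

# CleanUser is the output model defined in the example above
# os.cpu_count() can return None, so fall back to 2 workers
@dft.parallelized_job(output_table=CleanUser, workers=os.cpu_count() or 2)
def clean_user_data(user_row: dict) -> CleanUser:
    return CleanUser(
        name=user_row['name'].title(),
        email=user_row['email'].lower()
    )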