The @sql_job decorator allows you to define data transformations as Python functions that return SQL queries. Datafruit handles the execution and materialization of these queries, allowing you to build complex data pipelines with simple, version-controlled SQL.

Basic Usage: Query to DataFrame

The simplest way to use @sql_job is to execute a query and get the results back as a Pandas DataFrame. This is useful for exploration, analysis, or as an input to a subsequent Python job.

To reference a table you’ve defined with SQLModel, use the {{ ref('ModelName') }} syntax, similar to dbt.

import datafruit as dft
from sqlmodel import Field, SQLModel
from dotenv import load_dotenv
import os
from typing import Optional
from datetime import datetime
import pandas as pd
load_dotenv()

# Assume 'users' model is defined as in the Quickstart guide
class users(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(unique=True)
    email: str = Field(unique=True)
    full_name: Optional[str] = None
    is_active: bool = Field(default=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)

db = dft.PostgresDB(os.getenv("PG_DB_URL") or "", [users])
dft.export([db])

# --- Define a SQL Job ---
@dft.sql_job(db=db)
def get_active_users():
    """Returns a count of active users."""
    return "SELECT COUNT(*) as active_user_count FROM {{ ref('users') }} WHERE is_active = true"

When you run this script, calling get_active_users() executes the query against your database, and active_users_df is a Pandas DataFrame containing the result.

Materializing Results: Query to Table

For production pipelines, you often want to save the results of a transformation back to the database. You can do this by providing an output_table to the decorator.

Datafruit will automatically create the output table if it doesn’t exist, based on the SQLModel you provide.

import datafruit as dft
from sqlmodel import Field, SQLModel
from dotenv import load_dotenv
import os
from typing import Optional
from datetime import datetime
load_dotenv()

# Define models for input and output
class users(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(unique=True)
    email: str = Field(unique=True)
    is_active: bool = Field(default=True)

class active_user_report(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)  # SQLModel table classes require a primary key
    active_user_count: int

# Configure the database with both models
db = dft.PostgresDB(
    os.getenv("PG_DB_URL") or "",
    [users, active_user_report]
)
dft.export([db])

# --- Define the SQL Job ---
@dft.sql_job(db=db, output_table=active_user_report)
def create_active_user_report():
    """Counts active users and saves the result to the 'active_user_report' table."""
    return "SELECT COUNT(*) as active_user_count FROM {{ ref('users') }} WHERE is_active = true"

When this job runs, it performs an INSERT INTO active_user_report SELECT ... operation, materializing the results directly in your database and returning a status message.
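
Conceptually, the rendered statement looks something like the following sketch (the exact SQL Datafruit generates may differ):

INSERT INTO active_user_report (active_user_count)
SELECT COUNT(*) AS active_user_count
FROM users
WHERE is_active = true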

Decorator Parameters

db

The PostgresDB instance to run the query against.

output_table

The SQLModel class representing the destination table. If provided, results are inserted into this table; if None (the default), results are returned as a Pandas DataFrame.

Handling Schema Changes

When you modify your schema in a way that affects your SQL jobs (e.g., renaming or removing tables or columns used in your queries), you’ll need to update those jobs accordingly. Here’s what happens:

  1. Schema Changes Detected: When you run dft plan, Datafruit will detect any schema changes that might affect your jobs.

  2. Job Validation: Before executing dft run, Datafruit will validate that all tables and columns referenced in your SQL queries exist in the current schema.

  3. Error Handling: If a job references a column or table that no longer exists, the job will fail with a clear error message indicating what’s missing.

  4. Updating Jobs: To fix the job (an example command sequence follows this list):

    • Update your SQL query to reference the correct tables/columns
    • Run dft plan to verify the changes
    • Run dft apply to update the schema
    • Run your job with dft run job_name
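
For example, assuming the job name matches the function name from the materialization example above, the sequence might look like:

dft plan
dft apply
dft run create_active_user_report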

Example of a breaking change:

# Old query (before schema change)
"""SELECT username, email FROM {{ ref('users') }}"""

# New query (after renaming 'email' to 'email_address')
"""SELECT username, email_address as email FROM {{ ref('users') }}"""

Always test your schema changes in a development environment before applying them to production.