# SQL Jobs

Use the `@sql_job` decorator to define data transformations using familiar SQL, right within your Python code.

The `@sql_job` decorator allows you to define data transformations as Python functions that return SQL queries. Datafruit handles the execution and materialization of these queries, allowing you to build complex data pipelines with simple, version-controlled SQL.
## Basic Usage: Query to DataFrame
The simplest way to use `@sql_job` is to execute a query and get the results back as a pandas DataFrame. This is useful for exploration, analysis, or as an input to a subsequent Python job.

To reference a table you've defined with `SQLModel`, use the `{{ ref('ModelName') }}` syntax, similar to dbt.
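The script this section refers to is not shown here; below is a minimal sketch of what it might look like. The import paths, the `PostgresDB` connection-string constructor, and the `User` model are assumptions for illustration:

```python
from datafruit import PostgresDB, sql_job  # assumed import path

# Hypothetical connection; replace with your own database URL.
db = PostgresDB("postgresql://localhost:5432/mydb")

@sql_job(db=db)
def get_active_users():
    # {{ ref('User') }} resolves to the table backing the User SQLModel.
    return """
        SELECT id, email, last_login
        FROM {{ ref('User') }}
        WHERE last_login > NOW() - INTERVAL '30 days'
    """

active_users_df = get_active_users()  # returns a pandas DataFrame
print(active_users_df.head())
```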
When you run this script, `get_active_users()` will execute the query against your database and `active_users_df` will be a pandas DataFrame containing the result.
## Materializing Results: Query to Table

For production pipelines, you often want to save the results of a transformation back to the database. You can do this by providing an `output_table` to the decorator.
Datafruit will automatically create the output table if it doesn’t exist, based on the SQLModel you provide.
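The code sample for this section is not shown; here is a hedged sketch. The `ActiveUserReport` model, its fields, and the import paths are hypothetical, chosen to match the `active_user_report` table named below:

```python
from datetime import datetime

from datafruit import PostgresDB, sql_job  # assumed import path
from sqlmodel import Field, SQLModel       # assumed: models are plain SQLModel classes

db = PostgresDB("postgresql://localhost:5432/mydb")  # hypothetical URL

# Destination table for the materialized results.
class ActiveUserReport(SQLModel, table=True):
    __tablename__ = "active_user_report"
    id: int = Field(primary_key=True)
    email: str
    last_login: datetime

@sql_job(db=db, output_table=ActiveUserReport)
def build_active_user_report():
    return """
        SELECT id, email, last_login
        FROM {{ ref('User') }}
        WHERE last_login > NOW() - INTERVAL '30 days'
    """

status = build_active_user_report()  # inserts rows, returns a status message
```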
When this job runs, it performs an `INSERT INTO active_user_report SELECT ...` operation, materializing the results directly in your database and returning a status message.
## Decorator Parameters

- `db`: The `PostgresDB` instance to run the query against.
- `output_table`: The `SQLModel` class representing the destination table. If provided, results are inserted into this table; if `None`, results are returned as a DataFrame.
## Handling Schema Changes
When you modify your schema in a way that affects the requirements of your SQL job (e.g., renaming or removing columns or tables used in your queries), you'll need to update your job accordingly. Here's what happens:

- **Schema changes detected:** When you run `dft plan`, Datafruit will detect any schema changes that might affect your jobs.
- **Job validation:** Before executing `dft run`, Datafruit will validate that all tables and columns referenced in your SQL queries exist in the current schema.
- **Error handling:** If a job references a column or table that no longer exists, the job will fail with a clear error message indicating what's missing.
- **Updating jobs:** To fix the job:
  - Update your SQL query to reference the correct tables/columns
  - Run `dft plan` to verify the changes
  - Run `dft apply` to update the schema
  - Run your job with `dft run job_name`
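The fix-up workflow above amounts to three commands (the job name is illustrative):

```shell
dft plan          # verify the pending schema changes
dft apply         # apply the schema changes to the database
dft run job_name  # re-run the affected job
```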
Example of a breaking change:
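The example that originally accompanied this line is not shown; the following is a hypothetical sketch, reusing the assumed `SQLModel`-based `User` model from earlier:

```python
from datetime import datetime

from sqlmodel import Field, SQLModel  # assumed import path

# Schema change: the last_login column is renamed to last_seen_at.
class User(SQLModel, table=True):
    id: int = Field(primary_key=True)
    email: str
    last_seen_at: datetime  # was: last_login

# Any job whose query still references the old column, e.g.
#   SELECT id, email FROM {{ ref('User') }} WHERE last_login > ...
# now fails validation at `dft run` with a missing-column error.
# Fix: update the query to use last_seen_at, then run
# `dft plan`, `dft apply`, and `dft run job_name`.
```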
Always test your schema changes in a development environment before applying them to production.