Stay up to date with our latest developments and upcoming features. This roadmap reflects our current priorities and may change based on user feedback and business needs.
Currently Building
PyJob
Define standalone Python functions with the @dft.py_job decorator. These can run independently or be composed into DAGs.
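A rough sketch of the intended shape, mirroring the @dft.py_job form used in the examples further down (the exact signature is still in flux):

import datafruit as dft

# A standalone Python job: runnable on its own, or referenced as a node in a DAG
@dft.py_job()
def say_hello():
    print("Hello from a standalone Python job")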
SQLJob
Define SQL logic directly in Python functions using the @dft.sql_job decorator.
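A minimal sketch, assuming db is a configured database connection as in the examples below:

# SQL logic lives in the function body and is returned as a string
@dft.sql_job(db)
def active_user_count():
    return """
    SELECT COUNT(*) AS active_users
    FROM users
    WHERE email IS NOT NULL
    """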
Planned Features
Lineage Tracking
Track data dependencies and lineage for better data governance and auditability.
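The command and output below are purely illustrative of the kind of lineage view this could expose (no such command exists yet):

dft lineage user_summary
user_summary
│ └─ References user.email
│ └─ References user.full_name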
Backwards compatibility with DBT
Allow import of existing DBT projects into Datafruit for migration or hybrid use.
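One possible shape for this, shown with a hypothetical import command:

dft import-dbt ./my_dbt_project
✔ Imported dbt models as Datafruit SQL jobs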
Automatic Impact Analysis
Automatically detect and report how schema changes affect jobs and dependencies.
# Required imports at the top of your dft.py
from sqlmodel import SQLModel, Field
from typing import Optional
from datetime import datetime

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(unique=True)
    email: str = Field(unique=True)
    full_name: Optional[str] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
Running dft plan or dft run shows the impact and prevents broken jobs:
Datafruit will perform the following actions:
~ Table: user
│ - Remove column email (VARCHAR)
│ - Remove column full_name (VARCHAR)
⚠️ The following jobs will be affected:
│ ❌ user_summary
│ └─ References user.email (being removed)
│ └─ References user.full_name (being removed)
│
│ ❌ email_report
│ └─ References user.email (being removed)
Plan saved to .dft/plan.json
❌ Cannot apply - fix job dependencies first
Incremental Execution
Process only new or changed data since the last run for efficient updates.
# The incremental_key specifies which column to use for tracking new records
# Datafruit will automatically track the maximum value between runs
@dft.sql_job(db, incremental=True, incremental_key="created_at")
def daily_user_metrics():
    """Only process records newer than last run"""
    return """
    -- This query will only process new records since the last run
    INSERT INTO user_daily_metrics
    SELECT
        DATE(u.created_at) as date,
        COUNT(*) as new_users
    FROM users u
    -- {last_run_timestamp} is automatically replaced by Datafruit
    WHERE u.created_at > {last_run_timestamp}
    GROUP BY DATE(u.created_at)
    """
When executed, Datafruit automatically:
- Looks up the last successful run timestamp
- Substitutes {last_run_timestamp} in the SQL
- Only processes new data since the last run
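For instance, if the previous successful run finished at 2024-01-15 08:00:00 (an illustrative value; the exact timestamp formatting may differ), the executed query would look roughly like this:

INSERT INTO user_daily_metrics
SELECT
    DATE(u.created_at) as date,
    COUNT(*) as new_users
FROM users u
WHERE u.created_at > '2024-01-15 08:00:00'
GROUP BY DATE(u.created_at)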
Advanced Job Scheduling
Support for scheduled jobs, retries, and event-based triggers.
# Schedule a job to run daily with automatic retries
# Will be executed once per day, with up to 3 retries on failure
@dft.sql_job(db, schedule="daily", retry=3)
def daily_report():
    return """
    -- This query runs daily to get the latest aggregates
    SELECT * FROM daily_aggregates
    WHERE date = CURRENT_DATE
    """

# This job runs automatically after any schema changes
# Useful for keeping materialized views in sync
@dft.sql_job(db, trigger="after_schema_change")
def refresh_materialized_views():
    return """
    -- Refresh any materialized views that depend on changed tables
    REFRESH MATERIALIZED VIEW user_summary
    """

# This job runs when any of its data dependencies change
# Automatically detects which tables are referenced in the query
@dft.sql_job(db, trigger="on_dependency_change")
def user_derived_metrics():
    return """
    -- This will automatically run when the 'users' table changes
    SELECT user_id, calculated_score
    FROM users
    """
Parameterized Jobs
Create reusable job templates with runtime parameters.
# Define a job with default parameters
# These can be overridden at runtime via command line
@dft.sql_job(db, parameters={"lookback_days": 30, "min_score": 0.5})
def parameterized_job():
    return """
    -- Use {parameter_name} syntax for parameters
    -- These are automatically escaped and formatted
    SELECT user_id, score
    FROM user_activity
    WHERE created_at > CURRENT_DATE - INTERVAL '{lookback_days} days'
      AND score > {min_score}
    """
Override parameters at runtime:
dft run parameterized_job --lookback_days=7
Job Ignore Flag
Mark jobs as ignored to temporarily exclude them from execution.
# The @dft.ignore decorator tells Datafruit to skip this job during normal execution
# Useful for one-time backfills or deprecated jobs that you want to keep for reference
@dft.ignore
@dft.sql_job(db)
def backfill_user_scores():
    """
    One-time backfill - already ran in production
    This job is kept for reference but won't run with dft run --all
    """
    return """
    -- This was a one-time update to calculate historical scores
    -- The job is now ignored but kept in the codebase for reference
    UPDATE users
    SET score = calculate_historical_score(id)
    WHERE score IS NULL
    """
Running Python Jobs (Useful for EL/ETL)
Insert data into a table by defining a function that returns a DataFrame:
import pandas as pd

@dft.py_job(output_table=User)
def process_user_events(events_data):
    """Loads ALL data into memory at once"""
    df = pd.DataFrame(events_data)  # Could be huge!
    # Process entire dataset in memory
    df['email'] = df['email'].str.lower()
    df = df[df['email'].str.contains('@')]
    return df  # ⚠️ Entire DataFrame held in memory until insertion
Stream data into the table by defining a function that yields DataFrame chunks:
@dft.py_job(output_table=User, streaming=True)
def process_user_events(events_data):
    """Processes and yields chunks"""
    for chunk in pd.read_csv(events_data, chunksize=1000):
        # Process small chunk
        chunk['email'] = chunk['email'].str.lower()
        chunk = chunk[chunk['email'].str.contains('@')]
        yield chunk  # ✅ Immediate insertion, chunk gets garbage collected
Parallelize insertion by defining a function that processes one row of input data at a time:
@dft.parallelized_job(output_table=UserScore, workers=4)
def process_user_events(user_data: dict) -> UserScore:
    """Input data must be a list that can be chunked; each entry corresponds to one row in the table."""
    user_id = user_data['id']
    score = complex_ml_model.predict(user_data['features'])
    category = categorize_user(score)
    # Return Pydantic model instance (guaranteed single row)
    # The decorator will:
    # 1. Distribute user_data items across 4 workers
    # 2. Call this function on each item in parallel
    # 3. Collect all UserScore objects
    # 4. Convert to DataFrame and insert into the UserScore table
    return UserScore(
        user_id=user_id,
        score=score,
        category=category
    )
To make the input data available at run time, add USER_DATA to the exports somewhere in your config:
# Not run until the job is called
USER_DATA = load_massive_dataset()
export([db], USER_DATA)
Then run:
dft run process_user_events USER_DATA
Interactive Visualization
Display dependencies and execution flow in a graph view for navigation and debugging.
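A plausible entry point would be a single command that renders the job graph locally; the command name below is hypothetical:

dft graph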
Asset Impact Tracking
Show downstream assets impacted by changes in any pipeline or view.
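This would extend the plan output shown earlier into an on-demand query; the command and output below are hypothetical:

dft impact users
⚠️ Downstream assets affected by changes to users:
│ user_summary
│ └─ daily_user_metrics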
Object Storage Management
Manage object storage for data assets.
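A sketch of how this might be declared, following the export([...]) pattern shown above; dft.S3Bucket is a hypothetical resource type:

# Hypothetical: declare a bucket alongside the database connection
lake = dft.S3Bucket("s3://my-data-lake")
export([db, lake])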
Iceberg/Parquet Support
Support for Iceberg and Parquet formats.
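For example, a job could eventually write to the lake directly; output_path and output_format below are illustrative parameter names, not final API:

# Hypothetical: materialize job output as Parquet in object storage
@dft.py_job(output_path="s3://my-data-lake/users/", output_format="parquet")
def snapshot_users(users_df):
    return users_df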
Automatic Backfill Planning
Compute which downstream views and jobs require recomputation after a change.
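In the spirit of the plan output above, a hypothetical backfill plan might report:

dft backfill --since 2024-01-01
~ Requires recomputation:
│ daily_user_metrics
│ └─ user_summary (downstream of daily_user_metrics)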