Stay up to date with our latest developments and upcoming features. This roadmap reflects our current priorities and may change based on user feedback and business needs.

In Development
Actively being built

Currently Building

1

PyJob

Define standalone Python functions with @pyjob. These can run independently or be composed in DAGs.
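
A rough sketch of the shape this could take, assuming the decorator ends up exposed as dft.py_job (the form used by the Python job examples further down this page) and reusing the User model from the impact-analysis example:

import pandas as pd
import datafruit as dft  # assumed module alias, for illustration only

@dft.py_job(output_table=User)
def normalize_usernames(users_df: pd.DataFrame) -> pd.DataFrame:
    # Standalone transformation; could equally be wired into a larger DAG
    users_df["username"] = users_df["username"].str.strip().str.lower()
    return users_df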

2

SQLJob

Define SQL logic directly in Python functions using @sqljob decorators.
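
For example, following the @dft.sql_job form used in the planned-feature examples below (illustrative only):

@dft.sql_job(db)
def recent_signups():
    return """
        -- Plain SQL authored inside a Python function
        SELECT id, username, created_at
        FROM users
        WHERE created_at > CURRENT_DATE - INTERVAL '7 days'
    """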

Planned
On our near-term roadmap

Planned Features

1

Lineage Tracking

Track data dependencies and lineage for better data governance and auditability.

2

Backwards compatibility with DBT

Allow import of existing DBT projects into Datafruit for migration or hybrid use.

3

Automatic Impact Analysis

Automatically detect and report how schema changes affect jobs and dependencies.

# dft.py: required imports and the current table definition
from sqlmodel import SQLModel, Field
from typing import Optional
from datetime import datetime

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(unique=True)
    email: str = Field(unique=True)
    full_name: Optional[str] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)

If you remove the email and full_name fields from this model and run dft plan or dft run, Datafruit shows the impact and prevents jobs from breaking:

Datafruit will perform the following actions:

~ Table: user
 - Remove column email (VARCHAR)
 - Remove column full_name (VARCHAR)

⚠️  The following jobs will be affected:

 user_summary
   └─ References user.email (being removed)
   └─ References user.full_name (being removed)

 email_report  
   └─ References user.email (being removed)

Plan saved to .dft/plan.json
 Cannot apply - fix job dependencies first
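
For context, the flagged user_summary job could look something like the sketch below (illustrative only, following the @dft.sql_job form used elsewhere on this page); it breaks because it selects the columns being removed:

@dft.sql_job(db)
def user_summary():
    return """
        -- References user.email and user.full_name, both slated for removal
        SELECT username, email, full_name
        FROM user
    """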
4

Incremental Execution

Process only new or changed data since the last run for efficient updates.

# The incremental_key specifies which column to use for tracking new records
# Datafruit will automatically track the maximum value between runs
@dft.sql_job(db, incremental=True, incremental_key="created_at")
def daily_user_metrics():
    """Only process records newer than last run"""
    return """
        -- This query will only process new records since the last run
        INSERT INTO user_daily_metrics
        SELECT 
            DATE(u.created_at) as date,
            COUNT(*) as new_users
        FROM users u
        -- {last_run_timestamp} is automatically replaced by Datafruit
        WHERE u.created_at > {last_run_timestamp}
        GROUP BY DATE(u.created_at)
    """

When executed, Datafruit automatically:

  1. Looks up the timestamp of the last successful run
  2. Substitutes {last_run_timestamp} into the SQL
  3. Processes only data created since that run
5

Advanced Job Scheduling

Support for scheduled jobs, retries, and event-based triggers.

# Schedule a job to run daily with automatic retries
# Will be executed once per day, with up to 3 retries on failure
@dft.sql_job(db, schedule="daily", retry=3)
def daily_report():
    return """
        -- This query runs daily to get the latest aggregates
        SELECT * FROM daily_aggregates 
        WHERE date = CURRENT_DATE
    """

# This job runs automatically after any schema changes
# Useful for keeping materialized views in sync
@dft.sql_job(db, trigger="after_schema_change")
def refresh_materialized_views():
    return """
        -- Refresh any materialized views that depend on changed tables
        REFRESH MATERIALIZED VIEW user_summary
    """

# This job runs when any of its data dependencies change
# Automatically detects which tables are referenced in the query
@dft.sql_job(db, trigger="on_dependency_change")
def user_derived_metrics():
    return """
        -- This will automatically run when the 'users' table changes
        SELECT user_id, calculated_score 
        FROM users
    """
6

Parameterized Jobs

Create reusable job templates with runtime parameters.

# Define a job with default parameters
# These can be overridden at runtime via command line
@dft.sql_job(db, parameters={"lookback_days": 30, "min_score": 0.5})
def parameterized_job():
    return """
        -- Use {parameter_name} syntax for parameters
        -- These are automatically escaped and formatted
        SELECT user_id, score
        FROM user_activity 
        WHERE created_at > CURRENT_DATE - INTERVAL '{lookback_days} days'
        AND score > {min_score}
    """

Override parameters at runtime:

dft run parameterized_job --lookback_days=7
7

Job Ignore Flag

Mark jobs as ignored to temporarily exclude them from execution.

# The @dft.ignore decorator tells Datafruit to skip this job during normal execution
# Useful for one-time backfills or deprecated jobs that you want to keep for reference
@dft.ignore
@dft.sql_job(db)
def backfill_user_scores():
    """
    One-time backfill - already ran in production
    This job is kept for reference but won't run with dft run --all
    """
    return """
        -- This was a one-time update to calculate historical scores
        -- The job is now ignored but kept in the codebase for reference
        UPDATE users 
        SET score = calculate_historical_score(id)
        WHERE score IS NULL
    """
8

Running Python Jobs

(Useful for EL/ETL)

Insert data into a table by defining a function which returns a DataFrame

import pandas as pd

@dft.py_job(output_table=User)
def process_user_events(events_data):
    """Loads ALL data into memory at once"""
    df = pd.DataFrame(events_data)  # Could be huge!
    
    # Process entire dataset in memory
    df['email'] = df['email'].str.lower()
    df = df[df['email'].str.contains('@')]
    
    return df  # ⚠️ Entire DataFrame held in memory until insertion

Stream data into the table by defining a function which yields DataFrame chunks

@dft.py_job(output_table=User, streaming=True)
def process_user_events(events_data):
    """Processes and yields chunks"""
    for chunk in pd.read_csv(events_data, chunksize=1000):
        # Process small chunk
        chunk['email'] = chunk['email'].str.lower()
        chunk = chunk[chunk['email'].str.contains('@')]
        
        yield chunk  # ✅ Immediate insertion, chunk gets garbage collected

Insert data into the table in parallel by defining a function which is executed row by row over the input data

@dft.parallelized_job(output_table=UserScore, workers=4)
def process_user_events(user_data: dict) -> UserScore:
    """Input data must be a list which can be chunked, each entry in list should correspond to a row in the table."""
    user_id = user_data['id']
    score = complex_ml_model.predict(user_data['features'])
    category = categorize_user(score)
    
    # Return a UserScore model instance (guaranteed single row)
    # The decorator will:
    #   1. Distribute user_data items across 4 workers
    #   2. Call this function on each item in parallel
    #   3. Collect all UserScore objects
    #   4. Convert them to a DataFrame and insert into the UserScore table
    return UserScore(
        user_id=user_id,
        score=score,
        category=category
    )
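
The UserScore model used above is not defined in this snippet; it would be an ordinary SQLModel table in the style of the User model shown earlier. The field names and foreign key below are assumptions chosen to match the example:

class UserScore(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="user.id")
    score: float
    category: str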

To make the input data available at run time, add USER_DATA to the exports somewhere in your config

# Not run until the job is called
USER_DATA = load_massive_dataset()...

export([db], USER_DATA)

Then you would run:

dft run process_user_events USER_DATA
9

Interactive Visualization

Display dependencies and execution flow in a graph view for navigation and debugging.

10

Asset Impact Tracking

Show downstream assets impacted by changes in any pipeline or view.

11

Object Storage Management

Manage object storage for data assets.

12

Iceberg/Parquet Support

Support for Iceberg and Parquet formats.

13

Automatic Backfill Planning

Compute which downstream views and jobs require recomputation.