✅ Installation

pip install datafruit

Step 1: Define Base Tables

Create a simple schema using Python classes. This file isn’t executed; it is just declarative config.
# models.py
import datafruit as dft
from typing import Optional

class users(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    name: str
    email: str
    is_active: bool = True

class orders(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    user_id: int = dft.Field(foreign_key="users.id")
    amount: float
    status: str = "pending"

databases = {
    "warehouse": dft.PostgresDB("postgresql://localhost/warehouse", tables=[users, orders])
}

dft.export(databases=databases)
$ dft plan

Planning schema changes...
Target: warehouse

Datafruit will perform the following actions:

+ Table: orders
│ + Add column id (INTEGER)
│ + Add column user_id (INTEGER)
│ + Add column amount (FLOAT)
│ + Add column status (VARCHAR)

+ Table: users
│ + Add column id (INTEGER)
│ + Add column name (VARCHAR)
│ + Add column email (VARCHAR)
│ + Add column is_active (BOOLEAN)

✓ Plan saved to .dft/plan.json
Run 'dft apply' to apply these changes.
$ dft apply

Applying schema changes...
✓ Successfully applied changes to 'warehouse'
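
To make the idea of a plan concrete, here is a minimal sketch of diffing a declared schema against what a database currently contains. It is not datafruit's implementation: `plan_changes` and the dict-based schema format are hypothetical, chosen only for illustration.

```python
# Hypothetical illustration only; this is not how datafruit is implemented.
# A schema is modeled as {table_name: {column_name: sql_type}}.

def plan_changes(declared, live):
    """Return a list of readable actions that would make `live` match `declared`."""
    actions = []
    for table in sorted(declared):
        columns = declared[table]
        if table not in live:
            # The whole table is missing: create it with all of its columns.
            actions.append(f"+ Table: {table}")
            actions.extend(
                f"  + Add column {name} ({sql_type})"
                for name, sql_type in columns.items()
            )
            continue
        # The table exists: report only the columns it lacks.
        for name, sql_type in columns.items():
            if name not in live[table]:
                actions.append(f"~ Table: {table}: add column {name} ({sql_type})")
    # Tables present in the database but no longer declared.
    for table in sorted(set(live) - set(declared)):
        actions.append(f"- Table: {table}")
    return actions


declared = {
    "users": {"id": "INTEGER", "name": "VARCHAR", "email": "VARCHAR", "is_active": "BOOLEAN"},
    "orders": {"id": "INTEGER", "user_id": "INTEGER", "amount": "FLOAT", "status": "VARCHAR"},
}
live = {}  # an empty database

for line in plan_changes(declared, live):
    print(line)
```

Running the same function with `live` equal to `declared` returns an empty list, which corresponds to a plan with nothing to apply.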

Step 2: Add Views and Models with @query

Use @dft.query(db=..., type="...") to define transformations. The type argument selects the materialization:

Materialization type    Meaning
view                    Stored as a SQL view
table                   Materialized and written to a table
# models.py (continued)
db = databases["warehouse"]

@dft.query(db=db, type="view")
def active_users():
    return f"SELECT * FROM {users} WHERE is_active = true"

@dft.query(db=db, type="view")
def completed_orders():
    return f"""
    SELECT * FROM {orders}
    WHERE status = 'completed' AND amount > 0
    """

@dft.query(db=db, type="table")
def user_stats():
    return f"""
    SELECT 
        u.id AS user_id,
        COUNT(o.id) AS total_orders,
        SUM(o.amount) AS total_spent,
        AVG(o.amount) AS avg_order_value
    FROM {active_users} u
    LEFT JOIN {completed_orders} o ON u.id = o.user_id
    GROUP BY u.id
    """
And define the table schema for user_stats:
class user_stats(dft.Table):
    user_id: int = dft.Field(primary_key=True)
    total_orders: int
    total_spent: float
    avg_order_value: float
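
The practical difference between the two materialization types can be sketched with the standard-library `sqlite3` module. This is plain SQL rather than datafruit code, and the object names `active_users_view` and `active_users_snapshot` are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY, name TEXT, email TEXT, is_active BOOLEAN DEFAULT 1
    );
    INSERT INTO users (name, email, is_active) VALUES ('Ada', 'ada@example.com', 1);

    -- "view": only the query is stored; rows are computed on every read
    CREATE VIEW active_users_view AS SELECT * FROM users WHERE is_active = 1;

    -- "table": the query result is written out once, as a snapshot
    CREATE TABLE active_users_snapshot AS SELECT * FROM users WHERE is_active = 1;
""")

# Add a row after both objects already exist.
conn.execute(
    "INSERT INTO users (name, email, is_active) VALUES ('Grace', 'grace@example.com', 1)"
)

view_count = conn.execute("SELECT COUNT(*) FROM active_users_view").fetchone()[0]
snapshot_count = conn.execute("SELECT COUNT(*) FROM active_users_snapshot").fetchone()[0]
print(view_count, snapshot_count)  # 2 1
```

The view sees the new row immediately, while the snapshot table keeps the data from when it was written and has to be rebuilt to pick up changes.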

Step 3: Plan, Apply, Run

$ dft plan

Planning schema changes...
Target: warehouse

+ View: active_users
+ View: completed_orders
+ Table: user_stats
│ + Add column user_id (INTEGER)
│ + Add column total_orders (INTEGER)
│ + Add column total_spent (FLOAT)
│ + Add column avg_order_value (FLOAT)

✓ Plan saved to .dft/plan.json
$ dft apply

Applying schema changes...
✓ Created views: active_users, completed_orders
✓ Created table: user_stats
✓ Schema synced
$ dft run user_stats

→ Executing DAG:
   • active_users (view)
   • completed_orders (view)
   • user_stats (table)

✓ user_stats completed (1,247 rows)
✓ Data written to table: user_stats
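
An execution order like the one above can be derived from the references between queries. The sketch below uses Python's standard `graphlib` module (Python 3.9+). The `dependencies` mapping is written by hand from the queries in Step 2, and `execution_order` is a hypothetical helper, not part of datafruit.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it reads from (taken from the queries in Step 2).
dependencies = {
    "active_users": {"users"},
    "completed_orders": {"orders"},
    "user_stats": {"active_users", "completed_orders"},
}


def execution_order(target, deps):
    """Return the nodes needed to build `target`, with upstream nodes first."""
    # Collect the target and everything it transitively reads from.
    needed, stack = set(), [target]
    while stack:
        node = stack.pop()
        if node not in needed:
            needed.add(node)
            stack.extend(deps.get(node, ()))
    # Sort only that subgraph so every node comes after its inputs.
    subgraph = {node: deps.get(node, set()) for node in needed}
    return list(TopologicalSorter(subgraph).static_order())


order = execution_order("user_stats", dependencies)
print(order)
```

The base tables `users` and `orders` appear in the result because they are inputs; a runner would presumably skip them and execute only the query nodes. The relative order of independent nodes, such as the two views, is not guaranteed.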

🧠 Auto-Suggest Missing Schema

If you forget the user_stats class:
$ dft plan

✓ Computation graph valid

⚠️ Table 'user_stats' is declared as persistent, but has no schema defined.

Suggested schema:

class user_stats(dft.Table):
    user_id: int = dft.Field(primary_key=True)
    total_orders: int
    total_spent: float
    avg_order_value: float

Add this to your models and re-run:
  dft plan
  dft apply
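
One way a schema suggestion like this could be produced is to run the query and inspect the result. The following is a rough sketch using `sqlite3`, not a description of how datafruit infers types. `suggest_schema` and the `ANNOTATIONS` mapping are hypothetical, and the sketch cannot tell which column should be the primary key.

```python
import sqlite3

# Hypothetical mapping from Python value types to annotation names.
ANNOTATIONS = {int: "int", float: "float", str: "str", bytes: "bytes"}


def suggest_schema(conn, name, sql):
    """Render a class stub from the column names and first-row value types of `sql`."""
    cursor = conn.execute(sql)
    row = cursor.fetchone()
    if row is None:
        raise ValueError("query returned no rows; cannot infer types")
    lines = [f"class {name}(dft.Table):"]
    # cursor.description holds one 7-tuple per column; the first item is the name.
    for (column, *_), value in zip(cursor.description, row):
        annotation = ANNOTATIONS.get(type(value), "str")  # fall back to str (e.g. NULL)
        lines.append(f"    {column}: {annotation}")
    return "\n".join(lines)


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, amount REAL, status TEXT);
    INSERT INTO orders (user_id, amount, status)
    VALUES (1, 20.0, 'completed'), (1, 30.0, 'completed');
""")

stub = suggest_schema(conn, "user_stats", """
    SELECT user_id,
           COUNT(id)   AS total_orders,
           SUM(amount) AS total_spent,
           AVG(amount) AS avg_order_value
    FROM orders
    GROUP BY user_id
""")
print(stub)
```

Inferring from a single row is fragile (NULLs and empty results carry no type information), so a real tool would more likely rely on the database's own type metadata.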

🛑 Safe by Default: Detecting Breakage

If you rename a column in users:
class users(dft.Table):
    id: Optional[int] = dft.Field(primary_key=True)
    name: str
    email_address: str  # renamed from 'email'
    is_active: bool = True
$ dft plan

~ Table: users
│ ~ Rename: email → email_address

BREAKING CHANGE DETECTED:

× View 'active_users' references users.email
× Query 'user_stats' depends on active_users

Fix downstream queries before applying.
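
The list of affected objects in a report like this amounts to a downstream walk of the dependency graph. Here is a minimal sketch of that walk. The `dependents` mapping is written by hand from the queries in Step 2, and `downstream_of` is a hypothetical helper rather than a datafruit function.

```python
from collections import deque

# Edges point from an object to the objects that read from it (from the queries in Step 2).
dependents = {
    "users": ["active_users"],
    "orders": ["completed_orders"],
    "active_users": ["user_stats"],
    "completed_orders": ["user_stats"],
}


def downstream_of(node, graph):
    """Return every object that directly or transitively depends on `node`, in BFS order."""
    seen, queue, affected = {node}, deque([node]), []
    while queue:
        current = queue.popleft()
        for child in graph.get(current, []):
            if child not in seen:
                seen.add(child)
                affected.append(child)
                queue.append(child)
    return affected


print(downstream_of("users", dependents))  # ['active_users', 'user_stats']
```

A change to `users` reaches `user_stats` only through `active_users`, which matches the two entries in the report above; `completed_orders` is untouched because it reads only from `orders`.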