# Pipeline
## The Spark replacement

Traditional data pipelines need Spark, YARN, and a cluster. QueryMode replaces all of that with TypeScript orchestration over Durable Objects and R2.
Each stage executes a query, writes intermediate results to a table, and feeds them to the next stage. The intermediate tables live in R2 — the same storage backing everything else. No separate shuffle service, no HDFS, no job scheduler.
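The stage-chaining model described above can be sketched in a few lines, with an in-memory `Map` standing in for R2-backed tables. This is an illustration of the execution model, not QueryMode internals; `runStages` and the intermediate-table names are invented for the sketch.

```typescript
// Minimal sketch of stage chaining: each stage reads the previous stage's
// table, computes a result, and writes it to a new table for the next stage.
type Row = Record<string, unknown>
type Stage = (rows: Row[]) => Row[]

const storage = new Map<string, Row[]>() // stand-in for R2-backed tables

function runStages(source: string, stages: Stage[]): Row[] {
  let current = source
  for (let i = 0; i < stages.length; i++) {
    const out = stages[i](storage.get(current) ?? [])
    const next = `__intermediate_${i}` // each stage writes its own table
    storage.set(next, out)
    current = next
  }
  return storage.get(current) ?? []
}

// Usage: filter clicks, then count them
storage.set("events", [
  { type: "click", page: "/a" },
  { type: "view", page: "/a" },
  { type: "click", page: "/b" },
])
const result = runStages("events", [
  rows => rows.filter(r => r.type === "click"),
  rows => [{ clicks: rows.length }],
])
// result: [{ clicks: 2 }]
```

The key property is that every stage boundary is just a table write followed by a table read, which is why no shuffle service or scheduler is needed.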
## Three ways to chain stages

### 1. Pipeline class — auto-managed

```ts
import { Pipeline } from "querymode"

const result = await Pipeline.create(qm.table("events"))
  .stage(df => df
    .filter("type", "eq", "click")
    .groupBy("page")
    .aggregate("count", "*")
    .aggregate("avg", "duration"))
  .stage(df => df
    .filter("count_*", "gt", 100)
    .sort("avg_duration", "desc")
    .limit(50))
  .run()
```

Intermediate tables are auto-named (`__pipe_{id}_stage_{n}`) and auto-cleaned after `run()` completes — even if a stage fails.
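The cleanup-on-failure guarantee is commonly implemented with a `finally` block: track intermediate tables as they are created, then drop them whether or not a stage threw. The sketch below shows that pattern; `PipelineSketch` and the `dropTable` callback are illustrative, not the QueryMode implementation.

```typescript
// Sketch of auto-cleanup: intermediates are recorded as they're created and
// dropped in `finally`, so a throwing stage still triggers cleanup.
class PipelineSketch<T> {
  private stages: Array<(input: T) => T> = []
  constructor(private input: T, private dropTable: (name: string) => void) {}

  stage(fn: (input: T) => T): this {
    this.stages.push(fn)
    return this
  }

  async run(): Promise<T> {
    const created: string[] = []
    try {
      let current = this.input
      this.stages.forEach((fn, n) => {
        current = fn(current)
        created.push(`__pipe_demo_stage_${n}`) // auto-named intermediate
      })
      return current
    } finally {
      // Runs on success and on a thrown stage error
      for (const name of created) this.dropTable(name)
    }
  }
}

// Usage: a failing second stage still cleans up the first stage's table
const dropped: string[] = []
const demo = new PipelineSketch(10, name => dropped.push(name))
  .stage(x => x + 1)
  .stage(() => { throw new Error("boom") })

demo.run().catch(() => {
  // dropped now contains "__pipe_demo_stage_0"
})
```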
### 2. materializeAs() — manual control

```ts
const clicksByPage = await qm.table("events")
  .filter("type", "eq", "click")
  .groupBy("page")
  .aggregate("count", "*")
  .materializeAs("clicks_by_page")

const topPages = await clicksByPage
  .filter("count_*", "gt", 100)
  .sort("count_*", "desc")
  .limit(50)
  .collect()

// Clean up when done
await qm.table("clicks_by_page").dropTable()
```

You name the intermediate tables. You control when they're cleaned up. Use this when you need to branch — run multiple downstream queries against the same intermediate.
### 3. pipe() — operator injection (single stage)

```ts
const result = await qm.table("events")
  .filter("created_at", "gte", startDate)
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "risk_score", fn: row => riskModel.predict(row) },
  ]))
  .sort("risk_score", "desc")
  .limit(50)
  .collect()
```

No intermediate table — the custom operator runs inline within a single query's pipeline. Use this for per-row transforms like ML scoring, enrichment, or custom filtering.
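The essence of an inline operator is a wrapper around the upstream row stream that transforms rows as they pass through, with nothing written to storage. A minimal generator-based sketch (the `EventRow` type and `addColumn` helper are assumptions, not QueryMode's operator interface):

```typescript
// Sketch of operator injection: wrap an upstream iterable and compute an
// extra column per row, streaming, with no intermediate table.
type EventRow = Record<string, unknown>

function* addColumn(
  upstream: Iterable<EventRow>,
  alias: string,
  fn: (row: EventRow) => unknown,
): Iterable<EventRow> {
  for (const row of upstream) {
    yield { ...row, [alias]: fn(row) } // computed inline, row by row
  }
}

// Usage: score rows as they stream past; sorting happens downstream
const upstream: EventRow[] = [{ amount: 50 }, { amount: 500 }]
const scored = [...addColumn(upstream, "risk_score",
  r => (r.amount as number) > 100 ? 1 : 0)]
// scored: [{ amount: 50, risk_score: 0 }, { amount: 500, risk_score: 1 }]
```

Because the operator only sees one row at a time, memory stays flat regardless of input size, which is what makes this safe inside a single query's pipeline.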
## Branching DAGs

`materializeAs()` supports branching — materialize once, query multiple ways:

```ts
// Shared intermediate
const dailyAgg = await qm.table("events")
  .filter("date", "eq", "2026-03-11")
  .groupBy("page", "country")
  .aggregate("count", "*")
  .materializeAs("daily_agg")

// Branch 1: top countries
const topCountries = await dailyAgg
  .groupBy("country")
  .aggregate("sum", "count_*")
  .sort("sum_count_*", "desc")
  .collect()

// Branch 2: anomalies
const anomalies = await dailyAgg
  .filter("count_*", "gt", 10000)
  .collect()

// Cleanup
await qm.table("daily_agg").dropTable()
```

## Lineage tracking
Pass metadata through `materializeAs()` for pipeline observability:

```ts
const stage1 = await qm.table("events")
  .filter("type", "eq", "click")
  .materializeAs("clicks", {
    metadata: {
      pipeline: "daily-etl",
      stage: "filter-clicks",
      source: "events",
      ttl: "24h",
    },
  })
```

Metadata is attached to the write operation and visible to any catalog layer reading the table's manifest.
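To make the lineage idea concrete, here is a hypothetical catalog-side consumer: it models a manifest carrying the metadata object passed to `materializeAs()` and scans manifests for tables produced by a given pipeline. The `Manifest` shape and `tablesForPipeline` helper are assumptions for illustration, not a QueryMode API.

```typescript
// Sketch of a catalog layer reading lineage metadata from table manifests.
interface Manifest {
  table: string
  metadata?: { pipeline?: string; stage?: string; source?: string; ttl?: string }
}

function tablesForPipeline(manifests: Manifest[], pipeline: string): string[] {
  return manifests
    .filter(m => m.metadata?.pipeline === pipeline) // lineage match
    .map(m => m.table)
}

// Usage
const catalog: Manifest[] = [
  { table: "clicks", metadata: { pipeline: "daily-etl", stage: "filter-clicks" } },
  { table: "events" }, // source table, no lineage metadata
]
const etlTables = tablesForPipeline(catalog, "daily-etl")
// etlTables: ["clicks"]
```

The same scan could drive TTL-based cleanup or a pipeline dashboard, since every intermediate carries its own provenance.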
## How it scales

Each stage runs through the full QueryMode execution path:
- Fragment pruning — partition catalog + min/max stats skip irrelevant data
- Fan-out — Fragment DOs scan in parallel
- Hierarchical reduction — tree merge keeps memory bounded at any scale
- Write — results written to R2 as Lance fragments
- Next stage — reads the intermediate table like any other table
The intermediate tables are real tables — they benefit from the same pruning, caching, and parallel scan as source data.
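The hierarchical-reduction step above can be illustrated with a standalone sketch: partial aggregates from parallel fragment scans are merged pairwise up a tree, so each round halves the number of partials and no single merge ever holds more than two of them. `treeMerge` here is an illustration of the technique, not the actual reducer.

```typescript
// Sketch of tree-merge reduction for partial counts (e.g. page -> count).
type PartialAgg = Map<string, number>

function mergePair(a: PartialAgg, b: PartialAgg): PartialAgg {
  const out = new Map(a)
  for (const [k, v] of b) out.set(k, (out.get(k) ?? 0) + v)
  return out
}

function treeMerge(partials: PartialAgg[]): PartialAgg {
  let level = partials
  while (level.length > 1) {
    const next: PartialAgg[] = []
    for (let i = 0; i < level.length; i += 2) {
      // Odd element out is carried up unmerged
      next.push(i + 1 < level.length ? mergePair(level[i], level[i + 1]) : level[i])
    }
    level = next // each round halves the number of partials
  }
  return level[0] ?? new Map()
}

// Usage: four fragment scans, each returning partial counts
const merged = treeMerge([
  new Map([["/a", 3]]),
  new Map([["/a", 2], ["/b", 1]]),
  new Map([["/b", 4]]),
  new Map([["/a", 5]]),
])
// merged: /a -> 10, /b -> 5
```

Merging pairwise rather than all-at-once is what keeps peak memory bounded: the widest merge is always two partials, no matter how many fragments fanned out.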
## Comparison

| | Spark | QueryMode Pipeline |
|---|---|---|
| Scheduler | YARN / Mesos / K8s | TypeScript |
| Shuffle storage | HDFS / local disk | R2 |
| Compute | JVM executors | Durable Objects (WASM) |
| Cluster management | Yes | No (serverless) |
| Config | spark-defaults.conf, 100+ knobs | Zero config |
| Latency | Seconds to minutes (job startup) | Milliseconds (DO wake) |
| Cost at rest | Cluster always running | Zero (DOs hibernate) |