
Pipeline

Traditional data pipelines need Spark, YARN, and a cluster. QueryMode replaces all of that with TypeScript orchestration over Durable Objects and R2.

Each stage executes a query, writes intermediate results to a table, and feeds them to the next stage. The intermediate tables live in R2 — the same storage backing everything else. No separate shuffle service, no HDFS, no job scheduler.

```ts
import { Pipeline } from "querymode"

const result = await Pipeline.create(qm.table("events"))
  .stage(df => df
    .filter("type", "eq", "click")
    .groupBy("page")
    .aggregate("count", "*")
    .aggregate("avg", "duration"))
  .stage(df => df
    .filter("count_*", "gt", 100)
    .sort("avg_duration", "desc")
    .limit(50))
  .run()
```

Intermediate tables are auto-named (__pipe_{id}_stage_{n}) and auto-cleaned after run() completes — even if a stage fails.
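That cleanup guarantee is easiest to picture as a try/finally around the stage loop. Here is a minimal, self-contained sketch of the lifecycle — the names (`TableStore`, `runPipeline`) are hypothetical, not QueryMode's internals; only the `__pipe_{id}_stage_{n}` naming convention comes from the docs:

```ts
// Hypothetical sketch: `TableStore` stands in for R2-backed tables.
type Rows = Record<string, unknown>[]
type Stage = (input: Rows) => Rows

class TableStore {
  private tables = new Map<string, Rows>()
  write(name: string, rows: Rows) { this.tables.set(name, rows) }
  drop(name: string) { this.tables.delete(name) }
  list(): string[] { return [...this.tables.keys()] }
}

function runPipeline(store: TableStore, id: string, input: Rows, stages: Stage[]): Rows {
  const created: string[] = []
  try {
    let current = input
    stages.forEach((stage, n) => {
      current = stage(current)
      const name = `__pipe_${id}_stage_${n}`
      store.write(name, current) // intermediate result lands in storage
      created.push(name)
    })
    return current
  } finally {
    // Auto-clean every intermediate that was created, even if a stage threw.
    for (const name of created) store.drop(name)
  }
}
```

Because the drops run in `finally`, a stage that throws midway still leaves no orphaned intermediate tables behind.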

```ts
const clicksByPage = await qm.table("events")
  .filter("type", "eq", "click")
  .groupBy("page")
  .aggregate("count", "*")
  .materializeAs("clicks_by_page")

const topPages = await clicksByPage
  .filter("count_*", "gt", 100)
  .sort("count_*", "desc")
  .limit(50)
  .collect()

// Clean up when done
await qm.table("clicks_by_page").dropTable()
```

You name the intermediate tables. You control when they’re cleaned up. Use this when you need to branch — run multiple downstream queries against the same intermediate.

3. pipe() — operator injection (single stage)

```ts
const result = await qm.table("events")
  .filter("created_at", "gte", startDate)
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "risk_score", fn: row => riskModel.predict(row) },
  ]))
  .sort("risk_score", "desc")
  .limit(50)
  .collect()
```

No intermediate table — the custom operator runs inline within a single query’s pipeline. Use this for per-row transforms like ML scoring, enrichment, or custom filtering.
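The docs don't spell out the operator interface, but the idea is easy to demonstrate with a self-contained stand-in. Assuming an operator wraps an upstream row iterable and yields transformed rows, a minimal computed-column operator might look like this (the interface shape is an assumption, not QueryMode's actual API):

```ts
// Hypothetical operator shape: wrap an upstream iterable, yield rows
// with extra computed columns appended. Illustrative only.
type Row = Record<string, unknown>

interface ColumnSpec {
  alias: string
  fn: (row: Row) => unknown
}

class ComputedColumnOperator {
  constructor(
    private upstream: Iterable<Row>,
    private columns: ColumnSpec[],
  ) {}

  *[Symbol.iterator](): IterableIterator<Row> {
    for (const row of this.upstream) {
      const extra: Row = {}
      for (const c of this.columns) extra[c.alias] = c.fn(row) // compute each new column per row
      yield { ...row, ...extra }
    }
  }
}
```

Because the transform is a plain per-row function, anything from a lookup-table enrichment to an in-process ML model fits the same slot.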

materializeAs() supports branching — materialize once, query multiple ways:

```ts
// Shared intermediate
const dailyAgg = await qm.table("events")
  .filter("date", "eq", "2026-03-11")
  .groupBy("page", "country")
  .aggregate("count", "*")
  .materializeAs("daily_agg")

// Branch 1: top countries
const topCountries = await dailyAgg
  .groupBy("country")
  .aggregate("sum", "count_*")
  .sort("sum_count_*", "desc")
  .collect()

// Branch 2: anomalies
const anomalies = await dailyAgg
  .filter("count_*", "gt", 10000)
  .collect()

// Cleanup
await qm.table("daily_agg").dropTable()
```

Pass metadata through materializeAs() for pipeline observability:

```ts
const stage1 = await qm.table("events")
  .filter("type", "eq", "click")
  .materializeAs("clicks", {
    metadata: {
      pipeline: "daily-etl",
      stage: "filter-clicks",
      source: "events",
      ttl: "24h",
    },
  })
```

Metadata is attached to the write operation and visible to any catalog layer reading the table’s manifest.

Each stage runs through the full QueryMode execution path:

  1. Fragment pruning — partition catalog + min/max stats skip irrelevant data
  2. Fan-out — Fragment DOs scan in parallel
  3. Hierarchical reduction — tree merge keeps memory bounded at any scale
  4. Write — results written to R2 as Lance fragments
  5. Next stage — reads the intermediate table like any other table
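The hierarchical reduction in step 3 can be pictured as a pairwise tree merge of partial aggregates: each round combines partials two at a time, so no single step holds more than two partials in memory no matter how wide the fan-out was. A simplified sketch using per-page counts (illustrative, not QueryMode's actual merge code):

```ts
// One partial aggregate per fragment: page -> count.
type PartialCounts = Map<string, number>

function mergeTwo(a: PartialCounts, b: PartialCounts): PartialCounts {
  const out = new Map(a)
  for (const [k, v] of b) out.set(k, (out.get(k) ?? 0) + v)
  return out
}

function treeReduce(partials: PartialCounts[]): PartialCounts {
  let level = partials
  while (level.length > 1) {
    const next: PartialCounts[] = []
    for (let i = 0; i < level.length; i += 2) {
      // Merge pairs; an odd leftover passes through to the next round.
      next.push(i + 1 < level.length ? mergeTwo(level[i], level[i + 1]) : level[i])
    }
    level = next // each round halves the number of partials
  }
  return level[0] ?? new Map()
}
```

Each round halves the partial count, so 1,000 fragment outputs collapse in about ten rounds while memory stays bounded by the size of two partials.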

The intermediate tables are real tables — they benefit from the same pruning, caching, and parallel scan as source data.
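To make the pruning step concrete: if each fragment records per-column min/max statistics at write time, the skip decision is a cheap range check. A sketch under that assumption (the stats shape and function names here are hypothetical):

```ts
// Hypothetical per-fragment stats recorded at write time.
interface FragmentStats {
  id: string
  min: Record<string, number>
  max: Record<string, number>
}

// Keep only fragments whose [min, max] range could contain rows
// matching a `gte` predicate; the rest are skipped without being read.
function pruneGte(fragments: FragmentStats[], column: string, value: number): FragmentStats[] {
  return fragments.filter(f => f.max[column] >= value)
}
```

A filter like `.filter("created_at", "gte", startDate)` can therefore discard whole fragments from the scan plan before any Fragment DO wakes up.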

|  | Spark | QueryMode Pipeline |
| --- | --- | --- |
| Scheduler | YARN / Mesos / K8s | TypeScript |
| Shuffle storage | HDFS / local disk | R2 |
| Compute | JVM executors | Durable Objects (WASM) |
| Cluster management | Yes | No (serverless) |
| Config | spark-defaults.conf, 100+ knobs | Zero config |
| Latency | Seconds to minutes (job startup) | Milliseconds (DO wake) |
| Cost at rest | Cluster always running | Zero (DOs hibernate) |