# Composability
## Operators are Unix pipes

Every operator in QueryMode implements one interface:
```ts
interface Operator {
  next(): Promise<RowBatch | null>
  nextColumnar?(): Promise<ColumnarBatch | null> // optional columnar mode
  close(): Promise<void>
}
```

Each operator reads from upstream and writes downstream. Wrap one operator around another — that’s composition. Same idea as `cat | grep | sort | uniq`, except the data flows as row batches (or, optionally, columnar batches) instead of text lines.
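To make the pull model concrete, here is a minimal sketch of the driver loop a consumer runs against the top of a pipeline. The `RowBatch` alias, `ArraySource`, and `drain` below are local stand-ins for illustration, not QueryMode's actual classes:

```ts
// Local stand-ins for illustration — not QueryMode's real types.
type RowBatch = Record<string, unknown>[]

interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}

// Toy source operator: yields its rows as a single batch, then null.
class ArraySource implements Operator {
  private done = false
  constructor(private rows: RowBatch) {}
  async next() {
    if (this.done) return null
    this.done = true
    return this.rows
  }
  async close() {}
}

// The driver loop: pull batches until upstream is exhausted, then close.
async function drain(op: Operator): Promise<RowBatch> {
  const out: RowBatch = []
  try {
    let batch: RowBatch | null = null
    while ((batch = await op.next()) !== null) out.push(...batch)
  } finally {
    await op.close()
  }
  return out
}
```

Every downstream operator runs this same loop against its own upstream, which is exactly what makes wrapping one operator in another compose.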
```
ScanOperator → FilterOperator → YourCustomOperator → AggregateOperator → ProjectOperator
```

## Three levels of control
### 1. DataFrame chain

The DataFrame API handles the common case. Filter, aggregate, sort, join — all without thinking about operators:
```ts
const result = await qm.table("events")
  .filter("status", "eq", "active")
  .groupBy("region")
  .aggregate("sum", "amount", "total")
  .sort("total", "desc")
  .limit(10)
  .collect()
```

Under the hood, this builds a pipeline of operators. You don’t see them, but they’re there — doing page-level skip, SIMD decode, partial aggregation.
### 2. `.pipe()` — inject custom operators mid-chain

When you need something the built-in methods don’t cover, `.pipe()` lets you inject any operator without leaving the chain:
```ts
const result = await qm.table("events")
  .filter("created_at", "gte", "2024-01-01")
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "risk_score", fn: row => riskModel.predict(row) },
  ]))
  .pipe(upstream => new FilterOperator(upstream, [
    { column: "risk_score", op: "gt", value: 0.8 },
  ]))
  .sort("risk_score", "desc")
  .limit(50)
  .collect()
```

ML scoring inside the query pipeline. No round-trip to a separate service, no serialization. The model runs on the same row batch that the filter just produced.
### 3. `toOperator()` / `fromOperator()` — full escape hatch

For maximum control, break out of the DataFrame entirely:
```ts
// Escape: DataFrame → raw Operator
const op = await qm.table("events")
  .filter("status", "eq", "active")
  .toOperator()

// Your code: arbitrary operator composition
const enriched = new ComputedColumnOperator(op, [
  { alias: "score", fn: row => score(row) },
])
const deduped = new DistinctOperator(enriched, ["user_id"])

// Re-enter: raw Operator → DataFrame
const result = await DataFrame.fromOperator(deduped, executor)
  .sort("score", "desc")
  .limit(10)
  .collect()
```

You left the DataFrame, did whatever you wanted with raw operators, and came back. The DataFrame doesn’t know or care what happened in between.
## Write your own operators

Any object that implements `next()` and `close()` is an operator:
```ts
class SamplingOperator implements Operator {
  constructor(
    private upstream: Operator,
    private rate: number,
  ) {}

  async next(): Promise<RowBatch | null> {
    const batch = await this.upstream.next()
    if (!batch) return null
    return batch.filter(() => Math.random() < this.rate)
  }

  async close() {
    await this.upstream.close()
  }
}

// Use it
const result = await qm.table("events")
  .filter("status", "eq", "active")
  .pipe(upstream => new SamplingOperator(upstream, 0.1))
  .collect()
```

No framework, no plugin system, no registration. It’s a two-method interface.
## Why this matters

Traditional query engines are black boxes. You send a SQL string, get rows back. You can’t put a rate limiter between the scan and the filter. You can’t run ML scoring before the aggregation. You can’t swap the sort algorithm based on the data size.
With QueryMode, the pipeline is your code. The operators do real query engine work — page-level skip, SIMD decode, memory-bounded spill — but you control how they’re assembled. That’s the difference between using a query engine and building with one.