# Operators

Every query operation is a composable operator implementing the same interface:

```ts
interface Operator {
  next(): Promise<RowBatch | null>
  nextColumnar?(): Promise<ColumnarBatch | null> // optional columnar mode
  close(): Promise<void>
}
```

Pull-based: call `next()` to get the next batch of rows. Returns `null` when done.
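To make the pull contract concrete, here is a toy standalone sketch (simplified types, not the actual querymode definitions): a source emits fixed-size batches, and a drain loop pulls until `null`.

```typescript
// Toy sketch of the pull-based operator contract. Types are simplified
// stand-ins, not the library's own definitions.
type Row = Record<string, unknown>
type RowBatch = Row[]

interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}

// A source that yields its input array in fixed-size batches.
function fromRows(rows: Row[], batchSize = 2): Operator {
  let i = 0
  return {
    async next() {
      if (i >= rows.length) return null
      const batch = rows.slice(i, i + batchSize)
      i += batchSize
      return batch
    },
    async close() {},
  }
}

// Drain loop: pull batches until the operator signals completion with null.
async function drain(op: Operator): Promise<Row[]> {
  const out: Row[] = []
  for (let b = await op.next(); b !== null; b = await op.next()) out.push(...b)
  await op.close()
  return out
}

void (async () => {
  const rows = await drain(fromRows([{ id: 1 }, { id: 2 }, { id: 3 }]))
  console.log(rows.length) // 3
})()
```

Because every stage speaks this same protocol, any operator can sit downstream of any other; that is what makes the pipeline composable.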
## Operator reference

| Category | Operator | Description |
|---|---|---|
| Scan | ScanOperator (internal) | Page reads with prefetch, coalesced I/O, WASM SIMD filter |
| Filter | FilterOperator | eq, neq, gt, gte, lt, lte, in, not_in, between, not_between, like, not_like, is_null, is_not_null |
| Filter | SubqueryInOperator | Semi-join filter against a value set |
| Project | ProjectOperator | Column selection |
| Transform | ComputedColumnOperator | Arbitrary (row) => value per row |
| Aggregate | AggregateOperator | sum, avg, min, max, count, count_distinct, stddev, variance, median, percentile |
| Aggregate | WasmAggregateOperator (internal) | SIMD-accelerated numeric aggregates (int64, float64) |
| Sort | ExternalSortOperator | Disk-spilling merge sort with R2 spill |
| Sort | InMemorySortOperator | In-memory sort for small datasets |
| Sort | TopKOperator | Heap-based top-K without full sort |
| Join | HashJoinOperator | inner, left, right, full, cross with Grace hash spill |
| Window | WindowOperator | row_number, rank, dense_rank, lag, lead, rolling aggregates |
| Dedup | DistinctOperator | Hash-based deduplication on column set |
| Set | SetOperator | union, union_all, intersect, except |
| Limit | LimitOperator | Row limiting with offset |
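As an aside on the TopKOperator row above: heap-based top-K keeps only K candidates at a time instead of sorting the whole input. A standalone sketch of the idea follows (it evicts the smallest candidate by a linear scan for brevity; a real implementation, like the library's, would use a binary min-heap):

```typescript
// Standalone sketch: top-K by descending value without sorting the full
// input. Keeps at most k candidates; evicts the current minimum when a
// larger value arrives. Scan-based eviction here is O(n·k); a min-heap
// would make it O(n·log k).
function topK(values: number[], k: number): number[] {
  const keep: number[] = []
  for (const v of values) {
    if (keep.length < k) {
      keep.push(v)
    } else {
      let minIdx = 0
      for (let i = 1; i < keep.length; i++) {
        if (keep[i] < keep[minIdx]) minIdx = i
      }
      if (v > keep[minIdx]) keep[minIdx] = v
    }
  }
  return keep.sort((a, b) => b - a) // only the final K get sorted
}

console.log(topK([5, 1, 9, 3, 7, 2], 3)) // [9, 7, 5]
```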
## Compose operators directly

```ts
import {
  FilterOperator,
  AggregateOperator,
  TopKOperator,
  drainPipeline,
  type Operator,
  type RowBatch,
} from "querymode"

// Your data source — any async batch producer
const source: Operator = {
  async next() { /* return RowBatch or null */ },
  async close() {},
}

// Chain operators
const filtered = new FilterOperator(source, [
  { column: "age", op: "gt", value: 25 },
])
const aggregated = new AggregateOperator(filtered, {
  table: "users",
  filters: [],
  projections: [],
  groupBy: ["region"],
  aggregates: [{ fn: "sum", column: "amount", alias: "total" }],
})
// "HAVING" is just a filter after aggregation
const having = new FilterOperator(aggregated, [
  { column: "total", op: "gt", value: 1000 },
])
const top10 = new TopKOperator(having, "total", true, 10)

const rows = await drainPipeline(top10)
```

## Memory-bounded with R2 spill

Operators that accumulate state accept a memory budget. When exceeded, they spill to R2:
```ts
import { HashJoinOperator, ExternalSortOperator, R2SpillBackend } from "querymode"

const spill = new R2SpillBackend(env.DATA_BUCKET, "__spill/query-123")

// 64MB budget — spills to R2 via Grace hash partitioning
const join = new HashJoinOperator(
  left,
  right,
  "user_id",
  "id",
  "inner",
  64 * 1024 * 1024,
  spill,
)

// 64MB budget — spills to R2 via external merge sort
const sorted = new ExternalSortOperator(
  join,
  "created_at",
  true,
  0,
  64 * 1024 * 1024,
  spill,
)

const rows = await drainPipeline(sorted)
await spill.cleanup()
```

Spill backends:

- `R2SpillBackend` — Cloudflare R2 (edge)
- `FsSpillBackend` — local filesystem (Node/Bun)
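The Grace hash strategy mentioned above works by bucketing both join inputs on a hash of the join key, so that matching keys always land in the same partition and each partition can be joined independently within the memory budget. A standalone sketch of the partitioning step (not the library's spill code; `hashKey` is an illustrative hash):

```typescript
// Standalone sketch of Grace hash partitioning: rows are bucketed by
// hash(key) % P. Matching keys on both sides of a join land in the same
// partition index, so partition i of the left side only ever needs to be
// joined against partition i of the right side.
type JoinRow = Record<string, unknown>

// Illustrative string hash (FNV-style multiply-and-add).
function hashKey(key: string): number {
  let h = 0
  for (let i = 0; i < key.length; i++) h = (h * 31 + key.charCodeAt(i)) >>> 0
  return h
}

function partition(rows: JoinRow[], keyCol: string, p: number): JoinRow[][] {
  const parts: JoinRow[][] = Array.from({ length: p }, () => [])
  for (const row of rows) {
    parts[hashKey(String(row[keyCol])) % p].push(row)
  }
  return parts
}
```

In the spilling operator, each partition would be written to the spill backend and later rehydrated one partition at a time, which is what keeps peak memory bounded.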
## WASM SIMD aggregates

For numeric columns, the WasmAggregateOperator uses Zig SIMD vector instructions:

- `sumInt64()` — Vec2i64 accumulation
- `minInt64()` / `maxInt64()` — SIMD min/max
- Float64 aggregates via Vec4f64
The DataFrame API automatically selects WASM aggregates when available.
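The vector accumulation pattern behind these aggregates can be illustrated in scalar TypeScript: keep one independent accumulator per SIMD lane and combine them at the end. This is an illustration of the pattern only, not the actual Zig code.

```typescript
// Scalar sketch of SIMD-style summation: two independent lanes are
// accumulated in parallel (as a Vec2i64 would hold two int64 lanes),
// then combined after the loop. The real operator does this in Zig
// with actual vector instructions.
function sumTwoLane(values: number[]): number {
  let lane0 = 0
  let lane1 = 0
  let i = 0
  for (; i + 1 < values.length; i += 2) {
    lane0 += values[i]
    lane1 += values[i + 1]
  }
  if (i < values.length) lane0 += values[i] // odd-length tail element
  return lane0 + lane1
}

console.log(sumTwoLane([1, 2, 3, 4, 5])) // 15
```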
## Columnar fast path (nextColumnar)

Operators can optionally implement `nextColumnar()` alongside `next()` for zero-copy performance. When the pipeline detects a columnar-capable operator chain, it uses `nextColumnar()` to pass column data directly without creating Row[] objects.

```ts
import type { Operator, RowBatch, ColumnarBatch, DecodedValue } from "querymode"

class DoublingOperator implements Operator {
  constructor(private source: Operator, private column: string) {}

  // Standard path — always required
  async next(): Promise<RowBatch | null> {
    const batch = await this.source.next()
    if (!batch) return null
    for (const row of batch) row[this.column] = (row[this.column] as number) * 2
    return batch
  }

  // Columnar path — optional, avoids Row[] creation
  async nextColumnar(): Promise<ColumnarBatch | null> {
    if (!this.source.nextColumnar) return null
    const batch = await this.source.nextColumnar()
    if (!batch) return null
    const col = batch.columns.get(this.column)
    if (col) {
      for (let i = 0; i < col.length; i++) {
        col[i] = ((col[i] as number) ?? 0) * 2
      }
    }
    return batch
  }

  async close() {
    await this.source.close()
  }
}
```

The pipeline ColumnarBatch (from operators.ts) uses `Map<string, DecodedValue[]>` for columns and an optional `selection?: Uint32Array` for post-filter row indices. This is distinct from the QMCB wire-format ColumnarBatch documented in Columnar Format — see that page for the distinction.

When to implement `nextColumnar()`:

- Do: numeric transforms on large datasets (avoids `Row[]` allocation)
- Skip: operators that reshape data (joins, aggregates) or need random access across rows
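The `selection?: Uint32Array` field mentioned above lets a columnar filter record which rows survived without copying or compacting the column arrays. A standalone sketch of producing such a selection vector from a predicate (illustrative only, not the library's filter implementation):

```typescript
// Standalone sketch: a columnar filter emits a selection vector of
// surviving row indices instead of rewriting the column data. Downstream
// operators then iterate only the selected indices.
function filterToSelection(
  col: number[],
  pred: (v: number) => boolean,
): Uint32Array {
  const sel: number[] = []
  for (let i = 0; i < col.length; i++) {
    if (pred(col[i])) sel.push(i)
  }
  return Uint32Array.from(sel)
}

const ages = [17, 42, 30, 12]
const sel = filterToSelection(ages, (v) => v >= 18)
console.log(Array.from(sel)) // [1, 2]
```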
## Why composable operators matter

Traditional engines give you a fixed query language. You can’t put a window function before a join, run custom logic between pipeline stages, or swap the sort implementation.

With QueryMode, operators are building blocks. Your code assembles the pipeline, controls the memory budget, decides when to spill.

```ts
// ML scoring INSIDE the pipeline
const filtered = new FilterOperator(source, filters)
const scored = new ComputedColumnOperator(filtered, [
  { alias: "ml_score", fn: (row) => myModel.predict(row) },
])
const top10 = new TopKOperator(scored, "ml_score", true, 10)
```

## Pipe: inject custom stages into the DataFrame
The DataFrame API’s `.pipe()` method lets you inject any custom operator into the pipeline without leaving the fluent chain:

```ts
import { QueryMode, ComputedColumnOperator } from "querymode"

const qm = QueryMode.local()
const result = await qm.table("events")
  .filter("created_at", "gte", "2024-01-01")
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "score", fn: row => scoreModel.predict(row) },
  ]))
  .sort("score", "desc")
  .limit(10)
  .collect()
```

Chain multiple `.pipe()` calls — each one wraps the previous stage:

```ts
const result = await qm.table("events")
  .filter("status", "eq", "active")
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "risk", fn: row => computeRisk(row) },
  ]))
  .pipe(upstream => new FilterOperator(upstream, [
    { column: "risk", op: "gt", value: 0.8 },
  ]))
  .collect()
```

## Escape hatch: toOperator / fromOperator
For full control, break out of the DataFrame into raw operators, then re-enter:

```ts
// Escape: get the raw Operator pipeline
const op = await qm.table("events")
  .filter("status", "eq", "active")
  .toOperator()

// Your code: wrap with custom operators
const scored = new ComputedColumnOperator(op, [
  { alias: "score", fn: row => scoreModel.predict(row) },
])
const filtered = new FilterOperator(scored, [
  { column: "score", op: "gt", value: 0.9 },
])

// Re-enter: wrap back into a DataFrame
const result = await DataFrame.fromOperator(filtered, executor)
  .sort("score", "desc")
  .limit(10)
  .collect()
```

Three levels of control:

- DataFrame chain — convenience methods for common operations
- `.pipe()` — inject custom operators without leaving the chain
- `toOperator()` / `fromOperator()` — full escape hatch for arbitrary composition