Operators
Every query operation is a composable operator implementing the same interface:
```typescript
interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}
```

Operators are pull-based: call next() to get the next batch of rows. It returns null when the input is exhausted.
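As a rough illustration of the pull model, the sketch below implements the interface over an in-memory array and drains it. The `Row` and `RowBatch` shapes, `ArraySource`, and `drain` are assumptions made for this example; the library's real `RowBatch` type and its `drainPipeline` helper may look different.

```typescript
// Assumed shapes for illustration only; the real RowBatch may differ.
type Row = Record<string, unknown>
type RowBatch = Row[]

interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}

// Hypothetical source: serves an in-memory array in fixed-size batches.
class ArraySource implements Operator {
  closed = false
  private offset = 0
  private readonly rows: Row[]
  private readonly batchSize: number

  constructor(rows: Row[], batchSize: number) {
    if (batchSize < 1) throw new Error("batchSize must be at least 1")
    this.rows = rows
    this.batchSize = batchSize
  }

  async next(): Promise<RowBatch | null> {
    if (this.offset >= this.rows.length) return null // exhausted
    const batch = this.rows.slice(this.offset, this.offset + this.batchSize)
    this.offset += batch.length
    return batch
  }

  async close(): Promise<void> {
    this.closed = true
  }
}

// Pull batches until the operator returns null, then always release it.
async function drain(op: Operator): Promise<Row[]> {
  const out: Row[] = []
  try {
    for (let batch = await op.next(); batch !== null; batch = await op.next()) {
      out.push(...batch)
    }
  } finally {
    await op.close()
  }
  return out
}
```

Because every operator exposes the same two methods, a consumer such as `drain` does not need to know whether it is reading from a source, a filter, or a join.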
Operator reference
| Category | Operator | Description |
|---|---|---|
| Filter | FilterOperator | eq, neq, gt, gte, lt, lte, in |
| Filter | SubqueryInOperator | Semi-join filter against a value set |
| Project | ProjectOperator | Column selection |
| Transform | ComputedColumnOperator | Arbitrary (row) => value per row |
| Aggregate | AggregateOperator | sum, avg, min, max, count, count_distinct, stddev, variance, median, percentile |
| Sort | ExternalSortOperator | Disk-spilling merge sort with R2 spill |
| Sort | InMemorySortOperator | In-memory sort for small datasets |
| Sort | TopKOperator | Heap-based top-K without full sort |
| Join | HashJoinOperator | inner, left, right, full, cross with Grace hash spill |
| Window | WindowOperator | row_number, rank, dense_rank, lag, lead, rolling aggregates |
| Dedup | DistinctOperator | Hash-based deduplication on column set |
| Set | SetOperator | union, union_all, intersect, except |
| Limit | LimitOperator | Row limiting with offset |
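To show why a heap-based top-K avoids a full sort, here is a minimal sketch of the idea behind the TopKOperator row in the table: keep at most K rows in a min-heap and evict the smallest whenever a larger row arrives. `TopKSketch`, `fromBatches`, and the `Row`/`RowBatch` shapes are hypothetical names for this example, and it only handles descending order on a numeric column. It is not the library's implementation.

```typescript
type Row = Record<string, unknown>
type RowBatch = Row[]
interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}

// Hypothetical top-K (largest first) over a numeric column.
class TopKSketch implements Operator {
  private heap: Row[] = [] // min-heap: heap[0] is the smallest kept row
  private emitted = false
  private readonly upstream: Operator
  private readonly column: string
  private readonly k: number

  constructor(upstream: Operator, column: string, k: number) {
    this.upstream = upstream
    this.column = column
    this.k = k
  }

  private keyOf(row: Row): number {
    return Number(row[this.column])
  }

  private swap(i: number, j: number): void {
    const tmp = this.heap[i]!
    this.heap[i] = this.heap[j]!
    this.heap[j] = tmp
  }

  private siftUp(i: number): void {
    while (i > 0) {
      const parent = (i - 1) >> 1
      if (this.keyOf(this.heap[parent]!) <= this.keyOf(this.heap[i]!)) break
      this.swap(parent, i)
      i = parent
    }
  }

  private siftDown(i: number): void {
    const n = this.heap.length
    for (;;) {
      let smallest = i
      const left = 2 * i + 1
      const right = 2 * i + 2
      if (left < n && this.keyOf(this.heap[left]!) < this.keyOf(this.heap[smallest]!)) smallest = left
      if (right < n && this.keyOf(this.heap[right]!) < this.keyOf(this.heap[smallest]!)) smallest = right
      if (smallest === i) return
      this.swap(i, smallest)
      i = smallest
    }
  }

  private offer(row: Row): void {
    if (this.k <= 0) return
    if (this.heap.length < this.k) {
      this.heap.push(row)
      this.siftUp(this.heap.length - 1)
    } else if (this.keyOf(row) > this.keyOf(this.heap[0]!)) {
      this.heap[0] = row // evict the current minimum
      this.siftDown(0)
    }
  }

  // Consumes all upstream batches once, then emits a single sorted batch.
  async next(): Promise<RowBatch | null> {
    if (this.emitted) return null
    this.emitted = true
    for (let b = await this.upstream.next(); b !== null; b = await this.upstream.next()) {
      for (const row of b) this.offer(row)
    }
    return [...this.heap].sort((a, b) => this.keyOf(b) - this.keyOf(a))
  }

  async close(): Promise<void> {
    await this.upstream.close()
  }
}

// Hypothetical helper: turns pre-built batches into an Operator.
function fromBatches(batches: RowBatch[]): Operator {
  let i = 0
  return {
    async next() { return i < batches.length ? batches[i++]! : null },
    async close() {},
  }
}
```

Memory stays proportional to K rather than to the input size, and only the final K rows are sorted.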
Compose operators directly
```typescript
import {
  FilterOperator,
  AggregateOperator,
  TopKOperator,
  drainPipeline,
  type Operator,
  type RowBatch,
} from "querymode"

// Your data source: any async batch producer
const source: Operator = {
  async next(): Promise<RowBatch | null> {
    return null // placeholder: return a RowBatch, or null when done
  },
  async close() {},
}

// Chain operators
const filtered = new FilterOperator(source, [
  { column: "age", op: "gt", value: 25 },
])
const aggregated = new AggregateOperator(filtered, {
  table: "users",
  filters: [],
  projections: [],
  groupBy: ["region"],
  aggregates: [{ fn: "sum", column: "amount", alias: "total" }],
})
// "HAVING" is just a filter after aggregation
const having = new FilterOperator(aggregated, [
  { column: "total", op: "gt", value: 1000 },
])
const top10 = new TopKOperator(having, "total", true, 10)

const rows = await drainPipeline(top10)
```

Memory-bounded with R2 spill
Operators that accumulate state accept a memory budget. When the budget is exceeded, they spill to R2:
```typescript
import {
  HashJoinOperator,
  ExternalSortOperator,
  R2SpillBackend,
  drainPipeline,
} from "querymode"

// left and right are upstream operators defined elsewhere
const spill = new R2SpillBackend(env.DATA_BUCKET, "__spill/query-123")

// 32MB budget; spills to R2 via Grace hash partitioning
const join = new HashJoinOperator(
  left, right, "user_id", "id", "inner",
  32 * 1024 * 1024, spill,
)

// 32MB budget; spills to R2 via external merge sort
const sorted = new ExternalSortOperator(
  join, "created_at", true, 0,
  32 * 1024 * 1024, spill,
)

const rows = await drainPipeline(sorted)
await spill.cleanup()
```

Spill backends:

- `R2SpillBackend`: Cloudflare R2 (edge)
- `FsSpillBackend`: local filesystem (Node/Bun)
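The budget-then-spill behaviour can be sketched independently of any storage service. In the example below, `SpillStore`, `MemorySpillStore`, and `BudgetedBuffer` are hypothetical: the text above only shows that a spill backend has a `cleanup()` method, so the `writeRun`/`readRun` methods and the JSON-length size estimate are assumptions made for illustration.

```typescript
type Row = Record<string, unknown>

// Hypothetical backend contract; only cleanup() appears in the docs above.
interface SpillStore {
  writeRun(id: string, rows: Row[]): Promise<void>
  readRun(id: string): Promise<Row[]>
  cleanup(): Promise<void>
}

// In-memory stand-in for R2 or the filesystem, useful for tests.
class MemorySpillStore implements SpillStore {
  readonly runs = new Map<string, string>()

  async writeRun(id: string, rows: Row[]): Promise<void> {
    this.runs.set(id, JSON.stringify(rows))
  }

  async readRun(id: string): Promise<Row[]> {
    const raw = this.runs.get(id)
    if (raw === undefined) throw new Error(`unknown run ${id}`)
    return JSON.parse(raw) as Row[]
  }

  async cleanup(): Promise<void> {
    this.runs.clear()
  }
}

// Accumulates rows and writes them out as a "run" once the budget is exceeded.
class BudgetedBuffer {
  private rows: Row[] = []
  private bytes = 0
  private readonly runIds: string[] = []
  private readonly store: SpillStore
  private readonly budgetBytes: number

  constructor(store: SpillStore, budgetBytes: number) {
    this.store = store
    this.budgetBytes = budgetBytes
  }

  get spilledRuns(): number {
    return this.runIds.length
  }

  async add(row: Row): Promise<void> {
    this.rows.push(row)
    // Rough estimate based on JSON length, not actual heap usage.
    this.bytes += JSON.stringify(row).length
    if (this.bytes > this.budgetBytes) await this.flush()
  }

  private async flush(): Promise<void> {
    if (this.rows.length === 0) return
    const id = `run-${this.runIds.length}`
    await this.store.writeRun(id, this.rows)
    this.runIds.push(id)
    this.rows = []
    this.bytes = 0
  }

  // Reads spilled runs back in write order, followed by rows still in memory.
  async readAll(): Promise<Row[]> {
    const out: Row[] = []
    for (const id of this.runIds) out.push(...(await this.store.readRun(id)))
    out.push(...this.rows)
    return out
  }
}
```

A real sort or join would merge or partition the runs rather than concatenate them, but the budget check and the final cleanup step follow the same pattern.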
WASM SIMD aggregates
For numeric columns, the WasmAggregateOperator uses Zig SIMD vector instructions:

- `sumInt64()`: Vec2i64 accumulation
- `minInt64()` / `maxInt64()`: SIMD min/max
- Float64 aggregates via Vec4f64
The DataFrame API automatically selects WASM aggregates when available.
Why composable operators matter
Traditional engines give you a fixed query language. You can’t put a window function before a join, run custom logic between pipeline stages, or swap the sort implementation.
With QueryMode, operators are building blocks. Your code assembles the pipeline, controls the memory budget, and decides when to spill.
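```typescript
import { FilterOperator, ComputedColumnOperator, TopKOperator } from "querymode"

// ML scoring INSIDE the pipeline
// (source, filters, and myModel are assumed to be defined elsewhere)
const filtered = new FilterOperator(source, filters)
const scored = new ComputedColumnOperator(filtered, [
  { alias: "ml_score", fn: (row) => myModel.predict(row) },
])
const top10 = new TopKOperator(scored, "ml_score", true, 10)
```

Pipe: inject custom stages into the DataFrame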
The DataFrame API’s .pipe() method lets you inject any custom operator into the pipeline without leaving the fluent chain:
```typescript
import { QueryMode, ComputedColumnOperator } from "querymode"

// scoreModel is assumed to be defined elsewhere
const qm = QueryMode.local()
const result = await qm.table("events")
  .filter("created_at", "gte", "2024-01-01")
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "score", fn: row => scoreModel.predict(row) },
  ]))
  .sort("score", "desc")
  .limit(10)
  .collect()
```

You can chain multiple .pipe() calls; each one wraps the previous stage:
```typescript
// computeRisk is assumed to be defined elsewhere
const result = await qm.table("events")
  .filter("status", "eq", "active")
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "risk", fn: row => computeRisk(row) },
  ]))
  .pipe(upstream => new FilterOperator(upstream, [
    { column: "risk", op: "gt", value: 0.8 },
  ]))
  .collect()
```

Escape hatch: toOperator / fromOperator
For full control, break out of the DataFrame into raw operators, then re-enter:
```typescript
// qm, DataFrame, executor, and scoreModel are assumed to be in scope

// Escape: get the raw Operator pipeline
const op = await qm.table("events")
  .filter("status", "eq", "active")
  .toOperator()

// Your code: wrap with custom operators
const scored = new ComputedColumnOperator(op, [
  { alias: "score", fn: row => scoreModel.predict(row) },
])
const filtered = new FilterOperator(scored, [
  { column: "score", op: "gt", value: 0.9 },
])

// Re-enter: wrap back into a DataFrame
const result = await DataFrame.fromOperator(filtered, executor)
  .sort("score", "desc")
  .limit(10)
  .collect()
```

Three levels of control:
- DataFrame chain: convenience methods for common operations
- `.pipe()`: inject custom operators without leaving the chain
- `toOperator()` / `fromOperator()`: full escape hatch for arbitrary composition
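One way to picture how these levels relate is a fluent wrapper that stores a list of stage functions and folds them over the source when the pipeline is materialized. The sketch below is a toy under that assumption: `MiniFrame`, `mapRows`, `keepRows`, and `fromBatches` are hypothetical names, `toOperator()` is synchronous here (the examples above await it), and nothing in it reflects how the real DataFrame is implemented.

```typescript
type Row = Record<string, unknown>
type RowBatch = Row[]
interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}
type Stage = (upstream: Operator) => Operator

// Hypothetical fluent wrapper; each pipe() returns a new frame with one more stage.
class MiniFrame {
  private readonly source: Operator
  private readonly stages: Stage[]

  constructor(source: Operator, stages: Stage[] = []) {
    this.source = source
    this.stages = stages
  }

  static fromOperator(op: Operator): MiniFrame {
    return new MiniFrame(op)
  }

  pipe(stage: Stage): MiniFrame {
    return new MiniFrame(this.source, [...this.stages, stage])
  }

  // Each stage wraps the operator produced by the stages before it.
  toOperator(): Operator {
    return this.stages.reduce<Operator>((op, stage) => stage(op), this.source)
  }

  async collect(): Promise<Row[]> {
    const op = this.toOperator()
    const out: Row[] = []
    try {
      for (let b = await op.next(); b !== null; b = await op.next()) out.push(...b)
    } finally {
      await op.close()
    }
    return out
  }
}

// Hypothetical per-row transform stage.
function mapRows(upstream: Operator, fn: (row: Row) => Row): Operator {
  return {
    async next() {
      const batch = await upstream.next()
      return batch === null ? null : batch.map(fn)
    },
    close: () => upstream.close(),
  }
}

// Hypothetical predicate filter stage.
function keepRows(upstream: Operator, pred: (row: Row) => boolean): Operator {
  return {
    async next() {
      const batch = await upstream.next()
      return batch === null ? null : batch.filter(pred)
    },
    close: () => upstream.close(),
  }
}

// Hypothetical helper: turns pre-built batches into an Operator.
function fromBatches(batches: RowBatch[]): Operator {
  let i = 0
  return {
    async next() { return i < batches.length ? batches[i++]! : null },
    async close() {},
  }
}
```

In this toy, `pipe()` is simply `toOperator()` followed by `fromOperator()` with a custom operator in between, which is why the three levels compose freely.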