Operators

Every query operation is a composable operator implementing the same interface:

interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}

Pull-based: call next() to get the next batch of rows. Returns null when done.
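A minimal self-contained sketch of the pull contract, with toy Row/RowBatch types assumed for illustration (the library's actual batch shape may differ):

```typescript
// Toy types for illustration; querymode's real RowBatch may carry more.
type Row = Record<string, unknown>
type RowBatch = { rows: Row[] }

interface Operator {
  next(): Promise<RowBatch | null>
  close(): Promise<void>
}

// A source that yields an in-memory array in fixed-size batches.
class ArraySource implements Operator {
  private pos = 0
  constructor(private data: Row[], private batchSize = 2) {}
  async next(): Promise<RowBatch | null> {
    if (this.pos >= this.data.length) return null // exhausted
    const rows = this.data.slice(this.pos, this.pos + this.batchSize)
    this.pos += rows.length
    return { rows }
  }
  async close(): Promise<void> {}
}

// Draining a pipeline is just a loop over next() until null.
async function drain(op: Operator): Promise<Row[]> {
  const out: Row[] = []
  for (let batch = await op.next(); batch !== null; batch = await op.next()) {
    out.push(...batch.rows)
  }
  await op.close()
  return out
}
```

querymode's drainPipeline plays the role of the drain loop above.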

| Category  | Operator               | Description |
| --------- | ---------------------- | ----------- |
| Filter    | FilterOperator         | eq, neq, gt, gte, lt, lte, in |
| Filter    | SubqueryInOperator     | Semi-join filter against a value set |
| Project   | ProjectOperator        | Column selection |
| Transform | ComputedColumnOperator | Arbitrary (row) => value per row |
| Aggregate | AggregateOperator      | sum, avg, min, max, count, count_distinct, stddev, variance, median, percentile |
| Sort      | ExternalSortOperator   | Disk-spilling merge sort with R2 spill |
| Sort      | InMemorySortOperator   | In-memory sort for small datasets |
| Sort      | TopKOperator           | Heap-based top-K without full sort |
| Join      | HashJoinOperator       | inner, left, right, full, cross with Grace hash spill |
| Window    | WindowOperator         | row_number, rank, dense_rank, lag, lead, rolling aggregates |
| Dedup     | DistinctOperator       | Hash-based deduplication on column set |
| Set       | SetOperator            | union, union_all, intersect, except |
| Limit     | LimitOperator          | Row limiting with offset |
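The heap-based top-K entry above keeps only the k best values seen so far, so memory stays O(k) and no full sort is needed. A sketch of the technique over plain numbers, as an illustration rather than TopKOperator's actual implementation:

```typescript
// Heap-based top-K: maintain a size-k min-heap of the best values.
// heap[0] is the smallest kept value; any new value larger than it
// evicts it, so we never hold more than k values in memory.
function topK(values: number[], k: number): number[] {
  const heap: number[] = []
  const siftUp = (i: number) => {
    while (i > 0) {
      const parent = (i - 1) >> 1
      if (heap[parent] <= heap[i]) break
      ;[heap[parent], heap[i]] = [heap[i], heap[parent]]
      i = parent
    }
  }
  const siftDown = (i: number) => {
    for (;;) {
      let min = i
      const l = 2 * i + 1, r = 2 * i + 2
      if (l < heap.length && heap[l] < heap[min]) min = l
      if (r < heap.length && heap[r] < heap[min]) min = r
      if (min === i) break
      ;[heap[min], heap[i]] = [heap[i], heap[min]]
      i = min
    }
  }
  for (const v of values) {
    if (heap.length < k) { heap.push(v); siftUp(heap.length - 1) }
    else if (v > heap[0]) { heap[0] = v; siftDown(0) }
  }
  return heap.sort((a, b) => b - a) // descending: largest first
}
```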
import {
  FilterOperator, AggregateOperator, TopKOperator,
  drainPipeline, type Operator, type RowBatch,
} from "querymode"

// Your data source — any async batch producer
const source: Operator = {
  async next() { /* return RowBatch or null */ },
  async close() {},
}

// Chain operators
const filtered = new FilterOperator(source, [
  { column: "age", op: "gt", value: 25 },
])
const aggregated = new AggregateOperator(filtered, {
  table: "users", filters: [], projections: [],
  groupBy: ["region"],
  aggregates: [{ fn: "sum", column: "amount", alias: "total" }],
})

// "HAVING" is just a filter after aggregation
const having = new FilterOperator(aggregated, [
  { column: "total", op: "gt", value: 1000 },
])
const top10 = new TopKOperator(having, "total", true, 10)
const rows = await drainPipeline(top10)

Operators that accumulate state accept a memory budget. When the budget is exceeded, they spill to a backend such as R2:

import {
  HashJoinOperator, ExternalSortOperator, R2SpillBackend,
} from "querymode"

const spill = new R2SpillBackend(env.DATA_BUCKET, "__spill/query-123")

// 32MB budget — spills to R2 via Grace hash partitioning
const join = new HashJoinOperator(
  left, right, "user_id", "id", "inner",
  32 * 1024 * 1024, spill,
)

// 32MB budget — spills to R2 via external merge sort
const sorted = new ExternalSortOperator(
  join, "created_at", true, 0,
  32 * 1024 * 1024, spill,
)

const rows = await drainPipeline(sorted)
await spill.cleanup()

Spill backends:

  • R2SpillBackend — Cloudflare R2 (edge)
  • FsSpillBackend — local filesystem (Node/Bun)
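For tests, a spill backend can be stubbed in memory. The method names below (writePartition, readPartition) are assumptions made for illustration and not the library's actual SpillBackend surface; only cleanup() appears in the examples above:

```typescript
// Hypothetical in-memory spill backend for tests. Partitions are
// keyed by name; batches are appended and read back in write order.
type Row = Record<string, unknown>

class MemorySpillBackend {
  private partitions = new Map<string, Row[][]>()

  // Append a batch of rows to a named spill partition.
  async writePartition(key: string, rows: Row[]): Promise<void> {
    const batches = this.partitions.get(key) ?? []
    batches.push(rows)
    this.partitions.set(key, batches)
  }

  // Read all spilled batches for a partition, in write order.
  async readPartition(key: string): Promise<Row[]> {
    return (this.partitions.get(key) ?? []).flat()
  }

  // Drop all spilled state once the query finishes.
  async cleanup(): Promise<void> {
    this.partitions.clear()
  }
}
```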

For numeric columns, the WasmAggregateOperator uses Zig SIMD vector instructions:

  • sumInt64() — Vec2i64 accumulation
  • minInt64() / maxInt64() — SIMD min/max
  • Float64 aggregates via Vec4f64

The DataFrame API automatically selects WASM aggregates when available.
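The lane-wise accumulation pattern behind a Vec4f64-style sum can be illustrated in plain TypeScript. The real kernels are Zig SIMD; this sketch only shows the four-accumulator structure:

```typescript
// Illustration of lane-wise accumulation (not the Zig kernel itself).
// Four independent accumulators mirror the four f64 lanes of a SIMD
// vector; a SIMD unit would do each round of four adds in one instruction.
function sumFloat64(data: Float64Array): number {
  const lanes = [0, 0, 0, 0]
  const n = data.length - (data.length % 4)
  for (let i = 0; i < n; i += 4) {
    lanes[0] += data[i]
    lanes[1] += data[i + 1]
    lanes[2] += data[i + 2]
    lanes[3] += data[i + 3]
  }
  // Horizontal reduction of the lanes, then a scalar tail loop.
  let total = lanes[0] + lanes[1] + lanes[2] + lanes[3]
  for (let i = n; i < data.length; i++) total += data[i]
  return total
}
```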

Traditional engines give you a fixed query language. You can’t put a window function before a join, run custom logic between pipeline stages, or swap the sort implementation.

With QueryMode, operators are building blocks. Your code assembles the pipeline, controls the memory budget, and decides when to spill.

// ML scoring INSIDE the pipeline
const filtered = new FilterOperator(source, filters)
const scored = new ComputedColumnOperator(filtered, [
  { alias: "ml_score", fn: (row) => myModel.predict(row) },
])
const top10 = new TopKOperator(scored, "ml_score", true, 10)

Pipe: inject custom stages into the DataFrame


The DataFrame API’s .pipe() method lets you inject any custom operator into the pipeline without leaving the fluent chain:

import { QueryMode, ComputedColumnOperator } from "querymode"

const qm = QueryMode.local()
const result = await qm.table("events")
  .filter("created_at", "gte", "2024-01-01")
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "score", fn: row => scoreModel.predict(row) },
  ]))
  .sort("score", "desc")
  .limit(10)
  .collect()

Chain multiple .pipe() calls — each one wraps the previous stage:

const result = await qm.table("events")
  .filter("status", "eq", "active")
  .pipe(upstream => new ComputedColumnOperator(upstream, [
    { alias: "risk", fn: row => computeRisk(row) },
  ]))
  .pipe(upstream => new FilterOperator(upstream, [
    { column: "risk", op: "gt", value: 0.8 },
  ]))
  .collect()

For full control, break out of the DataFrame into raw operators, then re-enter:

// Escape: get the raw Operator pipeline
const op = await qm.table("events")
  .filter("status", "eq", "active")
  .toOperator()

// Your code: wrap with custom operators
const scored = new ComputedColumnOperator(op, [
  { alias: "score", fn: row => scoreModel.predict(row) },
])
const filtered = new FilterOperator(scored, [
  { column: "score", op: "gt", value: 0.9 },
])

// Re-enter: wrap back into a DataFrame
const result = await DataFrame.fromOperator(filtered, executor)
  .sort("score", "desc")
  .limit(10)
  .collect()

Three levels of control:

  1. DataFrame chain — convenience methods for common operations
  2. .pipe() — inject custom operators without leaving the chain
  3. toOperator() / fromOperator() — full escape hatch for arbitrary composition