API Reference

The core module for handling HTTP requests and responses.

from pymode.workers import Request, Response, Headers, Env

Constructed automatically from the incoming HTTP request. Do not instantiate directly.

| Property / Method | Return Type | Description |
| --- | --- | --- |
| request.method | str | HTTP method (GET, POST, etc.) |
| request.url | str | Full request URL |
| request.path | str | URL path (parsed via urlparse) |
| request.query | dict[str, list[str]] | Query parameters (via parse_qs, values are lists) |
| request.headers | Headers | Case-insensitive headers |
| request.text() | str | Body as UTF-8 string |
| request.json() | dict \| list | Body parsed as JSON |
| request.bytes() | bytes | Raw body bytes |

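Because request.query is built with parse_qs, every value is a list, even for a parameter that appears once. The sketch below uses only the standard library to show the shapes that request.path and request.query would take for a made-up URL; it does not call PyMode itself.

```python
from urllib.parse import urlparse, parse_qs

# Hypothetical URL, used only for illustration.
url = "https://example.com/search?tag=a&tag=b&page=2"

parts = urlparse(url)
path = parts.path              # the shape request.path is documented to have
query = parse_qs(parts.query)  # the shape request.query is documented to have

# A single-valued parameter still arrives as a one-element list,
# so index into it (with a list default) to get a scalar.
first_page = query.get("page", ["1"])[0]
```

Repeated parameters such as tag keep every value, in order, in the same list.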
# Text response
Response("Hello", status=200, headers={"X-Custom": "value"})
# Auto-detects content type:
# str → text/plain
# bytes → application/octet-stream
# dict/list → application/json (auto-serialized)
Response({"key": "value"})
# Class methods
Response.json(data, status=200, headers=None)
Response.redirect(url, status=302)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| body | str \| bytes \| dict \| list | "" | Response body |
| status | int | 200 | HTTP status code |
| headers | dict \| None | auto | Response headers |
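The content-type rules listed above can be written down as a small function. This is only a sketch of the documented mapping; detect_content_type is a hypothetical helper, not part of PyMode, and the real Response may differ in details such as charset parameters.

```python
def detect_content_type(body):
    """Return the content type the docs describe for each body type.

    Hypothetical helper for illustration; not a PyMode function.
    """
    if isinstance(body, (dict, list)):
        return "application/json"          # dict/list bodies are serialized as JSON
    if isinstance(body, bytes):
        return "application/octet-stream"  # raw bytes
    if isinstance(body, str):
        return "text/plain"                # plain strings
    raise TypeError(f"unsupported body type: {type(body).__name__}")
```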

Case-insensitive HTTP header container.

| Method | Return Type | Description |
| --- | --- | --- |
| headers.get(name, default=None) | str \| None | Get header value (case-insensitive) |
| headers[name] | str | Get header (raises KeyError if missing) |
| name in headers | bool | Check if header exists |
| headers.items() | list[tuple] | All (name, value) pairs |
| headers.keys() | list[str] | All header names |
| headers.values() | list[str] | All header values |
| headers.to_dict() | dict | Convert to plain dict |
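To make the lookup semantics concrete, here is a minimal stand-in that behaves like the table describes for get, indexing, membership, and to_dict. CaseInsensitiveHeaders is a hypothetical class written for this illustration; how PyMode's Headers stores names internally is not stated in this reference.

```python
class CaseInsensitiveHeaders:
    """Illustrative stand-in for a case-insensitive header container."""

    def __init__(self, initial=None):
        # Key by lowercased name, but remember the original spelling.
        self._store = {}
        for name, value in (initial or {}).items():
            self._store[name.lower()] = (name, value)

    def get(self, name, default=None):
        entry = self._store.get(name.lower())
        return entry[1] if entry else default

    def __getitem__(self, name):
        # Raises KeyError when the header is missing.
        return self._store[name.lower()][1]

    def __contains__(self, name):
        return name.lower() in self._store

    def to_dict(self):
        return {name: value for name, value in self._store.values()}


headers = CaseInsensitiveHeaders({"Content-Type": "text/html"})
```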

Attribute access returns binding objects or string values:

env.MY_KV # → KVBinding (name ends with _KV or is "KV")
env.MY_R2 # → R2Binding (name ends with _R2 or is "R2")
env.MY_DB # → D1Binding (name ends with _D1, _DB, or is "D1")
env.API_KEY # → str (environment variable / secret)

Binding type is auto-detected from the attribute name suffix.
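The naming rules in the comments above can be expressed as a small classifier. binding_kind is a hypothetical helper that only restates those rules; it is useful for checking a name before deploying, not a description of how Env is implemented.

```python
def binding_kind(name):
    """Classify an env attribute name using the documented suffix rules.

    Hypothetical helper for illustration; not a PyMode function.
    """
    if name == "KV" or name.endswith("_KV"):
        return "KVBinding"
    if name == "R2" or name.endswith("_R2"):
        return "R2Binding"
    if name == "D1" or name.endswith(("_D1", "_DB")):
        return "D1Binding"
    # Anything else is treated as a plain string variable or secret.
    return "str"
```

One consequence of suffix-based detection is that a secret named, say, BACKUP_DB would be classified as a D1 binding under these rules, so names for plain variables should avoid the reserved suffixes.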

| Method | Description |
| --- | --- |
| kv.get(key, type="text") | Get value. type: "text" (str), "json" (dict), or "arrayBuffer" (bytes) |
| kv.put(key, value) | Store value (str or bytes) |
| kv.delete(key) | Delete key |

| Method | Description |
| --- | --- |
| r2.get(key) | Get object (returns bytes or None) |
| r2.put(key, value) | Store object (str auto-encoded to bytes) |

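The three type values accepted by kv.get correspond to three ways of decoding the same stored bytes. The sketch below shows that mapping with a hypothetical decode_kv_value helper; it assumes values are UTF-8 encoded and is not PyMode code.

```python
import json


def decode_kv_value(raw, type="text"):
    """Decode stored bytes the way each kv.get type is documented to.

    Hypothetical helper; assumes UTF-8 for "text" and "json".
    """
    if type == "text":
        return raw.decode("utf-8")
    if type == "json":
        return json.loads(raw.decode("utf-8"))
    if type == "arrayBuffer":
        return raw  # bytes, unchanged
    raise ValueError(f"unknown type: {type!r}")


stored = b'{"visits": 3}'
```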
# Returns a D1Statement
stmt = db.prepare("SELECT * FROM users WHERE id = ?")
# Bind parameters and execute
rows = stmt.bind(42).all() # {"results": [...]}
row = stmt.bind(42).first() # First row or None
row = stmt.bind(42).first("name") # Single column from first row
stmt.bind(42).run() # Execute without returning rows
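To see the return shapes of all() and first() without a deployed database, the following sketch imitates the prepare/bind pattern on top of the standard sqlite3 module. SketchStatement is a hypothetical stand-in; it assumes rows come back as dicts and that bind returns a new statement, neither of which is confirmed by this reference.

```python
import sqlite3


class SketchStatement:
    """Local imitation of the statement API shown above (illustration only)."""

    def __init__(self, conn, sql, params=()):
        self._conn, self._sql, self._params = conn, sql, params

    def bind(self, *params):
        # Assumption: binding yields a fresh statement, leaving the original reusable.
        return SketchStatement(self._conn, self._sql, params)

    def _rows(self):
        cursor = self._conn.execute(self._sql, self._params)
        return [dict(row) for row in cursor.fetchall()]

    def all(self):
        return {"results": self._rows()}

    def first(self, column=None):
        rows = self._rows()
        if not rows:
            return None
        return rows[0][column] if column else rows[0]


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets each row convert cleanly to a dict
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users VALUES (42, 'Ada')")

stmt = SketchStatement(conn, "SELECT * FROM users WHERE id = ?")
```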

Outbound HTTP requests via WASM host imports.

from pymode.http import fetch, get, post

fetch(url, method="GET", headers=None, body=None)

Performs an HTTP request. The call suspends the WASM stack via Asyncify, so it is synchronous from Python's perspective.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| url | str | required | Request URL |
| method | str | "GET" | HTTP method |
| headers | dict \| None | None | Request headers |
| body | str \| bytes \| None | None | Request body |

Returns an HTTPResponse:

| Property / Method | Type | Description |
| --- | --- | --- |
| resp.status | int | HTTP status code |
| resp.headers | dict | Response headers (lowercase keys) |
| resp.read(amt=-1) | bytes | Read body (all or amt bytes) |
| resp.getheader(name, default=None) | str \| None | Get response header |
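Since read() returns bytes, JSON responses need an explicit decode step. The sketch below shows one way to wrap that, using only the status and read members documented above. read_json and FakeResponse are hypothetical names; FakeResponse exists solely so the example runs without a network, and UTF-8 is assumed.

```python
import json


def read_json(resp):
    """Parse a JSON body from any object exposing .status and .read()."""
    if not 200 <= resp.status < 300:
        raise RuntimeError(f"request failed with status {resp.status}")
    return json.loads(resp.read().decode("utf-8"))


class FakeResponse:
    """Test double with the same shape as the documented response object."""

    def __init__(self, status, body):
        self.status = status
        self._body = body

    def read(self, amt=-1):
        return self._body


payload = read_json(FakeResponse(200, b'{"ok": true}'))
```

With the real API, the argument would be the object returned by fetch, get, or post.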

get is shorthand for fetch(url, method="GET", headers=headers).

post is shorthand for fetch(url, method="POST", headers=headers, body=body).

Patches urllib.request.urlopen to route through PyMode’s HTTP bridge. Call once at startup if you need urllib compatibility.


Raw TCP connections via WASM host imports.

from pymode.tcp import PyModeSocket

Drop-in socket replacement for TCP connections. Connections persist within the Durable Object’s lifetime.

sock = PyModeSocket()
sock.connect(("db.example.com", 5432))
sock.send(b"HELLO\r\n")
data = sock.recv(4096)
sock.close()

| Method | Description |
| --- | --- |
| sock.connect((host, port)) | Open TCP connection |
| sock.send(data) | Send bytes |
| sock.recv(bufsize) | Receive up to bufsize bytes |
| sock.close() | Close connection |
| sock.settimeout(seconds) | Set timeout (None = blocking) |
| sock.makefile(mode, buffering) | Wrap in file-like object |
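Because recv returns up to bufsize bytes, a single call may deliver less than a full protocol message. A common pattern is to loop until the expected length has arrived. The sketch below assumes PyModeSocket follows the standard socket convention of returning empty bytes when the peer closes; recv_exactly and ChunkedFakeSocket are hypothetical names, and the fake socket is there only so the example runs offline.

```python
def recv_exactly(sock, n):
    """Read exactly n bytes from sock, looping over short reads."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            # Assumption: empty bytes means the connection was closed.
            raise ConnectionError("connection closed before all bytes arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


class ChunkedFakeSocket:
    """Test double that hands data back in small pieces."""

    def __init__(self, data, chunk_size):
        self._data = data
        self._chunk_size = chunk_size

    def recv(self, bufsize):
        size = min(bufsize, self._chunk_size)
        chunk, self._data = self._data[:size], self._data[size:]
        return chunk


message = recv_exactly(ChunkedFakeSocket(b"HELLO WORLD", chunk_size=3), 5)
```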

Patches the socket module so database drivers (psycopg2, etc.) use PyMode TCP transparently.


Real parallelism via child Durable Objects. Each task gets its own 30s CPU budget and 128MB memory.

from pymode.parallel import spawn, gather, map_parallel

Constraint: Arguments and return values must be picklable. Functions must be module-level (not lambdas or closures).
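A quick way to check the picklability constraint before spawning is to try pickling the object locally. The sketch below uses the standard pickle module; is_picklable is a hypothetical helper, and PyMode may apply additional checks of its own.

```python
import pickle


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Functions are pickled by reference to their qualified name, so a lambda,
# whose name is "<lambda>", cannot be looked up again and fails to pickle.
lambda_ok = is_picklable(lambda x: x)
builtin_ok = is_picklable(len)
data_ok = is_picklable({"items": [1, 2, 3]})
```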

Spawn a function in a child DO. Returns a TaskHandle.

handle = spawn(process_data, my_list)
result = handle.join() # blocks until complete

Block until the task completes. Returns the function’s return value. Raises RuntimeError if the child raised an exception.

Run multiple independent tasks in parallel. Each task is (fn, args) or (fn, args, kwargs).

results = gather(
    (fetch_users, [page]),
    (compute_stats, [data]),
    (send_email, [msg]),
)
# results[0] = fetch_users result, etc.
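The two accepted task shapes, (fn, args) and (fn, args, kwargs), can be unpacked as sketched below. This version runs the tasks one after another in the current process purely to show the tuple handling and result ordering; normalize_task and run_sequentially are hypothetical names, and no child Durable Objects are involved.

```python
def normalize_task(task):
    """Expand a (fn, args) or (fn, args, kwargs) tuple to three parts."""
    if len(task) == 2:
        fn, args = task
        kwargs = {}
    elif len(task) == 3:
        fn, args, kwargs = task
    else:
        raise ValueError("task must be (fn, args) or (fn, args, kwargs)")
    return fn, list(args), dict(kwargs)


def run_sequentially(*tasks):
    """Sequential stand-in: results come back in task order."""
    results = []
    for task in tasks:
        fn, args, kwargs = normalize_task(task)
        results.append(fn(*args, **kwargs))
    return results


results = run_sequentially(
    (pow, [2, 3]),
    (sorted, [[3, 1, 2]], {"reverse": True}),
)
```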

map_parallel(fn, items, max_concurrent=32)

Map a function over items in parallel. Returns results in order.

results = map_parallel(process_item, items)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| fn | callable | required | Function to apply (must be picklable) |
| items | iterable | required | Items to process |
| max_concurrent | int | 32 | Max simultaneous child DOs |
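For local experimentation, the ordering and concurrency-cap behavior can be approximated with a thread pool from the standard library. This is a substitute technique, not how map_parallel works: threads share one process, whereas the real function runs each item in a child Durable Object. map_local and square are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def map_local(fn, items, max_concurrent=32):
    """Thread-pool approximation: ordered results, bounded concurrency."""
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        # Executor.map yields results in input order, whatever the finish order.
        return list(pool.map(fn, items))


def square(n):
    return n * n


squares = map_local(square, range(5), max_concurrent=2)
```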

Durable multi-step workflows with retries and state persistence.

from pymode.workflows import Workflow

Create a named workflow. Steps execute sequentially.

@workflow.step / @workflow.step(retries=N, backoff=F, timeout=T)

Register a step function. Each step receives a StepContext.

app = Workflow("order-processing")

@app.step
def validate(ctx):
    return {"valid": True}

@app.step(retries=3, backoff=2.0)
def charge(ctx):
    prev = ctx.results["validate"]
    return {"charged": True}

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| retries | int | 0 | Max retry attempts on failure |
| backoff | float | 1.0 | Base backoff delay (exponential: backoff * 2^attempt) |
| timeout | float \| None | None | Step timeout in seconds |

| Property | Type | Description |
| --- | --- | --- |
| ctx.workflow_id | str | Unique workflow execution ID |
| ctx.input | any | Original workflow input data |
| ctx.results | dict | Results from completed steps (keyed by step function name) |
| ctx.env | Env | Access to CF bindings |
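The backoff formula, backoff * 2^attempt, can be tabulated to see how long a step might wait between retries. The sketch below assumes attempt is counted from 0 for the first retry, which this reference does not state explicitly; backoff_delays is a hypothetical helper.

```python
def backoff_delays(retries, backoff=1.0):
    """List the delay before each retry, assuming attempt starts at 0."""
    return [backoff * 2 ** attempt for attempt in range(retries)]


# Matches the charge step above: retries=3, backoff=2.0
delays = backoff_delays(3, backoff=2.0)
total_wait = sum(delays)
```

Under that assumption, the charge step would wait 2, 4, and 8 seconds before its three retries, 14 seconds in total.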

workflow.run(workflow_id, input_data, env, journal=None)

Execute the workflow. Pass journal from a previous run to resume from the last completed step.
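The idea of resuming from a journal can be sketched as a loop that skips any step whose result is already recorded. Everything here is an assumption made for illustration: the journal is modeled as a dict keyed by step function name, and steps take (input_data, results) rather than a StepContext. The actual journal format used by Workflow.run is not described in this reference.

```python
def run_steps(steps, input_data, journal=None):
    """Run steps in order, skipping those already present in the journal."""
    results = dict(journal or {})
    for step in steps:
        name = step.__name__
        if name in results:
            continue  # completed in a previous run; reuse the stored result
        results[name] = step(input_data, results)
    return results


calls = []


def validate(input_data, results):
    calls.append("validate")
    return {"valid": True}


def charge(input_data, results):
    calls.append("charge")
    return {"charged": results["validate"]["valid"]}


# Resume a run in which validate had already completed.
resumed = run_steps([validate, charge], {"order": 1},
                    journal={"validate": {"valid": True}})
```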

Returns a WorkflowResult:

| Property | Type | Description |
| --- | --- | --- |
| result.workflow_id | str | Workflow execution ID |
| result.status | str | "completed" or "failed" |
| result.results | dict | All step results |
| result.error | dict \| None | Error details if failed |
| result.to_dict() | dict | Serialize to JSON-compatible dict |