# Bindings API
All Cloudflare bindings are available through the `env` object passed to your `on_fetch` handler. Behind the scenes, these use WASM host imports: direct calls from Python to the Cloudflare runtime with no JS interop overhead.
## KV (Key-Value Storage)

```python
def on_fetch(request, env):
    # Read
    value = env.MY_KV.get("key")                 # Returns bytes or None
    text = env.MY_KV.get("key", type="text")     # Returns str
    data = env.MY_KV.get("key", type="json")     # Returns dict/list

    # Write
    env.MY_KV.put("key", "value")                # String value
    env.MY_KV.put("key", b"\x00\x01\x02")        # Binary value
    env.MY_KV.put("key", {"a": 1}, type="json")  # JSON value

    # Delete
    env.MY_KV.delete("key")
```

Configure in `pyproject.toml`:

```toml
[tool.pymode.kv_namespaces]
MY_KV = "your-kv-namespace-id"
```
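As a usage sketch, the read and write calls above compose into a simple per-key counter. The `InMemoryKV` class below is a hypothetical stand-in for the binding, included only so the logic can be exercised outside the Workers runtime; in production `env.MY_KV` is the real namespace. Note that KV is eventually consistent, so a read-modify-write counter is illustrative only:

```python
import json

class InMemoryKV:
    """Hypothetical stand-in for a KV binding, for local testing only."""
    def __init__(self):
        self._store = {}

    def get(self, key, type=None):
        raw = self._store.get(key)
        if raw is None:
            return None
        if type == "json":
            return json.loads(raw)
        if type == "text":
            return raw.decode()
        return raw

    def put(self, key, value, type=None):
        if type == "json":
            raw = json.dumps(value).encode()
        elif isinstance(value, bytes):
            raw = value
        else:
            raw = value.encode()
        self._store[key] = raw

    def delete(self, key):
        self._store.pop(key, None)

def increment(kv, key):
    # Read-modify-write: fine as a sketch, but not atomic, so a real
    # counter belongs in a Durable Object or D1 rather than KV.
    current = kv.get(key, type="json") or 0
    kv.put(key, current + 1, type="json")
    return current + 1

kv = InMemoryKV()
increment(kv, "hits")
increment(kv, "hits")
print(kv.get("hits", type="json"))  # 2
```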
## R2 (Object Storage)

```python
def on_fetch(request, env):
    # Read
    obj = env.MY_R2.get("images/photo.jpg")
    if obj:
        data = obj.body  # bytes
        size = obj.size  # int
        etag = obj.etag  # str

    # Write
    env.MY_R2.put("images/photo.jpg", image_bytes)

    # Write with metadata
    env.MY_R2.put("data.json", json_bytes, http_metadata={
        "contentType": "application/json"
    })
```

Configure in `pyproject.toml`:

```toml
[tool.pymode.r2_buckets]
MY_R2 = "your-r2-bucket-name"
```
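One small pattern worth noting: the `contentType` value in `http_metadata` can be guessed from the object key with the standard library. The helper below is a sketch, not part of pymode; only `mimetypes` is assumed:

```python
import mimetypes

def content_type_for(key):
    # Guess a Content-Type for http_metadata from the object key,
    # falling back to a generic binary type for unknown extensions.
    ctype, _ = mimetypes.guess_type(key)
    return ctype or "application/octet-stream"

print(content_type_for("images/photo.jpg"))  # image/jpeg
```

With a real binding this would be passed as `env.MY_R2.put(key, body, http_metadata={"contentType": content_type_for(key)})`.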
## D1 (SQL Database)

```python
def on_fetch(request, env):
    # Query
    results = env.MY_DB.prepare(
        "SELECT * FROM users WHERE age > ?"
    ).bind(21).all()

    for row in results:
        print(row["name"], row["age"])

    # Insert
    env.MY_DB.prepare(
        "INSERT INTO users (name, age) VALUES (?, ?)"
    ).bind("Alice", 30).run()

    # Batch
    env.MY_DB.batch([
        env.MY_DB.prepare("INSERT INTO logs (msg) VALUES (?)").bind("event1"),
        env.MY_DB.prepare("INSERT INTO logs (msg) VALUES (?)").bind("event2"),
    ])
```

Configure in `pyproject.toml`:

```toml
[tool.pymode.d1_databases]
MY_DB = "your-d1-database-id"
```
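Since each `?` placeholder binds a single value, an `IN (...)` clause needs one placeholder per value. The `in_clause` helper below is a hypothetical sketch of that pattern; the values themselves still go through `bind()`, never interpolated into the SQL string:

```python
def in_clause(column, values):
    # Build "col IN (?, ?, ?)" with one placeholder per value,
    # so the values are still bound as parameters.
    placeholders = ", ".join("?" for _ in values)
    return f"{column} IN ({placeholders})"

sql = f"SELECT * FROM users WHERE {in_clause('age', [21, 30, 40])}"
print(sql)  # SELECT * FROM users WHERE age IN (?, ?, ?)
# Then: env.MY_DB.prepare(sql).bind(21, 30, 40).all()
```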
## HTTP Fetch

Make outbound HTTP requests from your handler:

```python
from pymode.http import fetch

def on_fetch(request, env):
    # GET request
    resp = fetch("https://api.example.com/data")
    data = resp.json()

    # POST with body and headers
    resp = fetch("https://api.example.com/submit",
        method="POST",
        body=b'{"key": "value"}',
        headers={"Content-Type": "application/json"},
    )

    print(resp.status)   # 200
    print(resp.headers)  # dict
    print(resp.text)     # str
```

The `fetch()` function uses Asyncify to suspend the WASM stack while waiting for the response. From Python's perspective, it is a synchronous call.
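Because `fetch()` is synchronous from Python's point of view, ordinary control flow works for retries. The sketch below wraps a fetch-style callable in retry-with-backoff logic; `fetch_fn` is injected (rather than importing `pymode.http.fetch` directly) purely so the logic can be demonstrated with a fake:

```python
import time

def fetch_with_retry(fetch_fn, url, retries=3, backoff=0.5, **kwargs):
    """Hypothetical helper: retry a fetch-style call on 5xx responses."""
    delay = backoff
    for attempt in range(retries):
        resp = fetch_fn(url, **kwargs)
        if resp.status < 500:
            return resp
        if attempt < retries - 1:
            time.sleep(delay)
            delay *= 2  # exponential backoff
    return resp  # last 5xx response after exhausting retries

# Fake fetch that fails twice, then succeeds:
class FakeResp:
    def __init__(self, status):
        self.status = status

calls = []
def fake_fetch(url, **kwargs):
    calls.append(url)
    return FakeResp(503 if len(calls) < 3 else 200)

resp = fetch_with_retry(fake_fetch, "https://api.example.com/data", backoff=0.01)
print(resp.status, len(calls))  # 200 3
```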
## TCP Sockets

Raw TCP connections for database drivers and custom protocols:

```python
from pymode.tcp import connect

def on_fetch(request, env):
    # Connect to a database
    sock = connect("my-database.example.com", 5432)

    # Send data
    sock.send(b"HELLO\r\n")

    # Receive data
    data = sock.recv(4096)

    # Close
    sock.close()
```

TCP connections persist within the Durable Object's lifetime, enabling connection pooling across requests routed to the same DO.
## Environment Variables

Access environment variables and secrets:

```python
def on_fetch(request, env):
    api_key = env.API_KEY    # From wrangler secrets or .dev.vars
    debug = env.DEBUG_MODE   # String value

    if not api_key:
        return Response("API key not configured", status=500)
```

Set secrets:

```sh
# Local development
echo "API_KEY=sk-123" >> .dev.vars

# Production
wrangler secret put API_KEY
```
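The missing-key check above generalizes to a small fail-fast helper. This is a sketch, not a pymode API; `SimpleNamespace` stands in for the real `env` object so it can run locally:

```python
from types import SimpleNamespace

def require_env(env, name):
    # Raise early if a required variable or secret is unset,
    # instead of failing deep inside a request handler.
    value = getattr(env, name, None)
    if not value:
        raise RuntimeError(f"missing required environment variable: {name}")
    return value

env = SimpleNamespace(API_KEY="sk-123")
print(require_env(env, "API_KEY"))  # sk-123
```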
## Parallel Execution

Spawn child Durable Objects for CPU-intensive work:

```python
from pymode.parallel import spawn, gather

def on_fetch(request, env):
    # Spawn individual tasks (each gets 30s CPU, 128MB memory)
    task1 = spawn(process_chunk, data[:1000])
    task2 = spawn(process_chunk, data[1000:])
    result1 = task1.join()
    result2 = task2.join()

    # Or use gather for multiple independent tasks
    results = gather(
        (process_chunk, [data[:1000]]),
        (process_chunk, [data[1000:]]),
    )

    return Response.json({"results": results})

def process_chunk(data):
    # Runs in a separate Durable Object
    return [transform(item) for item in data]
```

Note: Functions passed to `spawn` and `gather` must be picklable (module-level functions, not lambdas). See the API Reference for full details.
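The hard-coded slicing above generalizes to a chunking helper. `chunk` below is a hypothetical utility, not part of `pymode.parallel`:

```python
def chunk(data, n):
    # Split data into n roughly equal contiguous slices.
    size = (len(data) + n - 1) // n  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

print(chunk(list(range(10)), 3))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

The pieces can then be fanned out, e.g. `gather(*[(process_chunk, [c]) for c in chunk(data, 4)])`.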
## Durable Workflows

Multi-step workflows with retries and state persistence:

```python
from pymode.workflows import Workflow

workflow = Workflow("order-processing")

@workflow.step(retries=3, backoff=2.0)
def validate(ctx):
    order = ctx.input
    if not order.get("items"):
        raise ValueError("No items in order")
    return {"valid": True, "total": sum(i["price"] for i in order["items"])}

@workflow.step(retries=3, backoff=2.0)
def charge(ctx):
    total = ctx.results["validate"]["total"]
    # Process payment...
    return {"charged": True, "amount": total}

@workflow.step()
def fulfill(ctx):
    # Ship order...
    return {"shipped": True}

def on_fetch(request, env):
    order = request.json()
    result = workflow.run(order)
    return Response.json(result)
```
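To make the execution model concrete, here is a toy re-implementation of the step/run pattern. It assumes nothing about pymode internals (no persistence, no backoff between retries) and only illustrates how `ctx.input` and `ctx.results` flow between registered steps:

```python
class Ctx:
    def __init__(self, payload):
        self.input = payload   # workflow input
        self.results = {}      # results of completed steps, by name

class ToyWorkflow:
    """Toy sketch of the step-registry pattern; not pymode's implementation."""
    def __init__(self, name):
        self.name = name
        self.steps = []  # (func, retries) in registration order

    def step(self, retries=1, backoff=0.0):
        def decorator(func):
            self.steps.append((func, retries))
            return func
        return decorator

    def run(self, payload):
        ctx = Ctx(payload)
        for func, retries in self.steps:
            for attempt in range(retries):
                try:
                    ctx.results[func.__name__] = func(ctx)
                    break
                except Exception:
                    if attempt == retries - 1:
                        raise  # retries exhausted
        return ctx.results

wf = ToyWorkflow("order-processing")

@wf.step(retries=3)
def validate(ctx):
    return {"total": sum(i["price"] for i in ctx.input["items"])}

@wf.step()
def charge(ctx):
    return {"charged": True, "amount": ctx.results["validate"]["total"]}

result = wf.run({"items": [{"price": 5}, {"price": 7}]})
print(result)  # {'validate': {'total': 12}, 'charge': {'charged': True, 'amount': 12}}
```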