Skip to content

Bindings API

All Cloudflare bindings are available through the env object passed to your on_fetch handler. Behind the scenes, these use WASM host imports — direct calls from Python to the Cloudflare runtime with no JS interop overhead.

def on_fetch(request, env):
    """Demonstrate reading, writing, and deleting a key on the MY_KV binding."""
    kv = env.MY_KV

    # Read: the optional `type` argument selects the form of the returned value.
    raw = kv.get("key")  # Returns bytes or None
    as_text = kv.get("key", type="text")  # Returns str
    parsed = kv.get("key", type="json")  # Returns dict/list

    # Write: string, binary, and JSON values.
    kv.put("key", "value")
    kv.put("key", b"\x00\x01\x02")
    kv.put("key", {"a": 1}, type="json")

    # Delete
    kv.delete("key")

Configure the KV namespace binding in pyproject.toml:

[tool.pymode.kv_namespaces]
# Key: the binding name, accessed in the handler as env.MY_KV.
# Value: placeholder for your KV namespace ID.
MY_KV = "your-kv-namespace-id"
def on_fetch(request, env):
    """Demonstrate reading and writing objects on the MY_R2 binding."""
    bucket = env.MY_R2

    # Read: only inspect the object when the lookup returned something truthy.
    photo = bucket.get("images/photo.jpg")
    if photo:
        body = photo.body  # bytes
        size = photo.size  # int
        etag = photo.etag  # str

    # Write
    bucket.put("images/photo.jpg", image_bytes)

    # Write with metadata
    json_metadata = {"contentType": "application/json"}
    bucket.put("data.json", json_bytes, http_metadata=json_metadata)

Configure the R2 bucket binding in pyproject.toml:

[tool.pymode.r2_buckets]
# Key: the binding name, accessed in the handler as env.MY_R2.
# Value: placeholder for your R2 bucket name.
MY_R2 = "your-r2-bucket-name"
def on_fetch(request, env):
    """Demonstrate querying, inserting, and batching on the MY_DB binding.

    Every statement uses `?` placeholders with bind() rather than string
    formatting.
    """
    db = env.MY_DB

    # Query
    adults = db.prepare("SELECT * FROM users WHERE age > ?").bind(21)
    for row in adults.all():
        print(row["name"], row["age"])

    # Insert
    insert_user = db.prepare("INSERT INTO users (name, age) VALUES (?, ?)")
    insert_user.bind("Alice", 30).run()

    # Batch: one prepared-and-bound statement per log message.
    db.batch([
        db.prepare("INSERT INTO logs (msg) VALUES (?)").bind(message)
        for message in ("event1", "event2")
    ])

Configure the D1 database binding in pyproject.toml:

[tool.pymode.d1_databases]
# Key: the binding name, accessed in the handler as env.MY_DB.
# Value: placeholder for your D1 database ID.
MY_DB = "your-d1-database-id"

Make outbound HTTP requests from your handler:

from pymode.http import fetch
def on_fetch(request, env):
    """Demonstrate outbound GET and POST requests with fetch()."""
    # GET request, decoding the response body as JSON
    get_resp = fetch("https://api.example.com/data")
    data = get_resp.json()

    # POST with body and headers
    json_headers = {"Content-Type": "application/json"}
    post_resp = fetch(
        "https://api.example.com/submit",
        method="POST",
        body=b'{"key": "value"}',
        headers=json_headers,
    )

    print(post_resp.status)  # 200
    print(post_resp.headers)  # dict
    print(post_resp.text)  # str

The fetch() function uses Asyncify to suspend the WASM stack while waiting for the response. From Python’s perspective, it’s a synchronous call.

Raw TCP connections for database drivers and custom protocols:

from pymode.tcp import connect
def on_fetch(request, env):
    """Open a raw TCP connection, exchange one message, and always close it.

    Connects to my-database.example.com on port 5432, sends a greeting, and
    calls recv(4096) for the reply.
    """
    # Connect to a database
    sock = connect("my-database.example.com", 5432)
    try:
        # Send data
        sock.send(b"HELLO\r\n")
        # Receive data
        data = sock.recv(4096)
    finally:
        # Close even when send/recv raises, so the connection is not leaked.
        sock.close()

TCP connections persist within the Durable Object’s lifetime, enabling connection pooling across requests routed to the same Durable Object (DO).

Access environment variables and secrets:

def on_fetch(request, env):
    """Read settings from `env` and return a 500 response if the API key is unset."""
    secret = env.API_KEY  # From wrangler secrets or .dev.vars
    debug_flag = env.DEBUG_MODE  # String value

    if not secret:
        return Response("API key not configured", status=500)

Set secrets:

Terminal window
# Local development: append the secret to .dev.vars (read in the handler as env.API_KEY)
echo "API_KEY=sk-123" >> .dev.vars
# Production: store the secret with wrangler
wrangler secret put API_KEY

Spawn child Durable Objects for CPU-intensive work:

from pymode.parallel import spawn, gather
def on_fetch(request, env):
    """Fan CPU-intensive work out to child Durable Objects and return the results.

    Shows both styles: individual spawn()/join() pairs, and gather() for
    several independent tasks. Only the gather() results are returned.
    """
    # Work items to split across tasks. Assumes the request body is a JSON
    # array — TODO confirm for your payload.
    data = request.json()

    # Spawn individual tasks (each gets 30s CPU, 128MB memory)
    task1 = spawn(process_chunk, data[:1000])
    task2 = spawn(process_chunk, data[1000:])
    result1 = task1.join()
    result2 = task2.join()

    # Or use gather for multiple independent tasks; each entry is a
    # (function, argument list) pair.
    results = gather(
        (process_chunk, [data[:1000]]),
        (process_chunk, [data[1000:]]),
    )
    return Response.json({"results": results})
def process_chunk(data):
    """Return a list with `transform` applied to every item of `data`.

    Runs in a separate Durable Object when invoked through spawn() or gather().
    """
    return list(map(transform, data))

Note: Functions passed to spawn and gather must be picklable (module-level functions, not lambdas). See API Reference for full details.

Multi-step workflows with retries and state persistence:

from pymode.workflows import Workflow
# Workflow named "order-processing"; the functions below are attached to it
# with the @workflow.step decorator.
workflow = Workflow("order-processing")
@workflow.step(retries=3, backoff=2.0)
def validate(ctx):
    """Check that the order in `ctx.input` has items and total their prices.

    Returns:
        A dict with "valid" set to True and "total" set to the sum of each
        item's "price".

    Raises:
        ValueError: If the order has no "items" entry or it is empty.
    """
    items = ctx.input.get("items")
    if not items:
        raise ValueError("No items in order")

    total = sum(item["price"] for item in items)
    return {"valid": True, "total": total}
@workflow.step(retries=3, backoff=2.0)
def charge(ctx):
    """Charge the total computed by the "validate" step.

    Returns:
        A dict with "charged" set to True and "amount" set to that total.
    """
    validated = ctx.results["validate"]
    amount = validated["total"]
    # Process payment...
    return {"charged": True, "amount": amount}
@workflow.step()
def fulfill(ctx):
    """Final step: ship the order and report it as shipped."""
    # Ship order...
    shipment = {"shipped": True}
    return shipment
def on_fetch(request, env):
    """Pass the request's JSON body to workflow.run() and return the result as JSON."""
    outcome = workflow.run(request.json())
    return Response.json(outcome)