Wrenn Python SDK
Python client for the Wrenn microVM code execution platform. Create isolated capsules, execute commands, manage files, run interactive terminals, and execute persistent code — all from Python.
Installation
pip install wrenn
Requires Python 3.13+.
Quick Start
from wrenn import WrennClient
client = WrennClient(api_key="wrn_your_api_key_here")
# Create a capsule and run a command
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60)
result = cap.exec("echo", args=["hello world"])
print(result.stdout) # "hello world"
print(result.exit_code) # 0
Authentication
The SDK supports two authentication methods:
# API key
client = WrennClient(api_key="wrn_...")
# JWT token
client = WrennClient(token="eyJ...")
You can obtain an API key via the dashboard or create one programmatically:
with WrennClient(token="jwt_token") as client:
key = client.api_keys.create(name="my-key")
print(key.key) # wrn_...
Capsules
Capsules are isolated microVM environments. Create, manage, and interact with them:
# Create
cap = client.capsules.create(
template="base-python",
vcpus=2,
memory_mb=1024,
timeout_sec=300,
)
# List
for c in client.capsules.list():
print(c.id, c.status)
# Get
cap = client.capsules.get("cl-abc123")
# Destroy
client.capsules.destroy("cl-abc123")
Context Manager
Use capsules as context managers for automatic cleanup:
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60)
cap.exec("python -c 'print(42)'")
# cap.destroy() is called automatically
Command Execution
exec() — One-off Commands
Starts a fresh process for each call. No state persists between calls.
result = cap.exec("python", args=["-c", "import os; print(os.getcwd())"])
print(result.stdout) # "/home/user\n"
print(result.stderr) # ""
print(result.exit_code) # 0
print(result.duration_ms) # 42
exec_stream() — Streaming Output
Stream real-time output from long-running commands:
for event in cap.exec_stream("python", args=["-u", "train.py"]):
match event.type:
case "stdout":
print(event.data, end="")
case "stderr":
print(event.data, end="", file=sys.stderr)
case "exit":
print(f"\nExited with code {event.exit_code}")
run_code() — Stateful Code Execution
Execute Python code in a persistent Jupyter kernel. Variables, imports, and function definitions survive across calls:
with client.capsules.create(template="python-interpreter-v0-beta") as cap:
cap.wait_ready(timeout=60)
cap.run_code("x = 42")
r = cap.run_code("x * 2")
print(r.text) # "84"
cap.run_code("def greet(name): return f'hello {name}'")
r = cap.run_code("greet('world')")
print(r.text) # "'hello world'"
r = cap.run_code("1/0")
print(r.error) # "ZeroDivisionError: division by zero\n..."
CodeResult fields:
| Field | Type | Description |
|---|---|---|
text |
str | None |
Plain text representation |
data |
dict | None |
Rich MIME bundle (e.g. {"image/png": "..."}) |
stdout |
str |
Accumulated stdout |
stderr |
str |
Accumulated stderr |
error |
str | None |
Error traceback string |
Filesystem
Upload, download, and manage files inside capsules:
# Upload / Download
cap.upload("/app/main.py", b"print('hello')")
content = cap.download("/app/main.py")
# Streaming (for large files)
def chunks():
yield b"chunk1"
yield b"chunk2"
cap.stream_upload("/data/large.bin", chunks())
for chunk in cap.stream_download("/data/large.bin"):
process(chunk)
# Directory operations
entries = cap.list_dir("/home/user", depth=1)
for entry in entries:
print(entry.name, entry.type, entry.size)
cap.mkdir("/home/user/data")
cap.remove("/home/user/old_data")
Interactive Terminal (PTY)
Open a full interactive terminal session over WebSocket:
with cap.pty(cmd="/bin/bash", cols=120, rows=40, cwd="/home/user") as term:
term.write(b"ls -la\n")
for event in term:
if event.type == "output":
sys.stdout.buffer.write(event.data)
elif event.type == "exit":
break
PtySession methods:
| Method | Description |
|---|---|
write(data: bytes) |
Send raw bytes to stdin |
resize(cols, rows) |
Resize the terminal |
kill() |
Send SIGKILL to the process |
tag |
Session tag (available after started event) |
pid |
Process PID (available after started event) |
Reconnect to an existing session using the tag:
with cap.pty_connect(term.tag) as term:
term.write(b"echo reconnected\n")
Lifecycle
Pause and resume capsules to save resources:
cap = client.capsules.create(template="minimal")
cap.wait_ready(timeout=60)
# Pause (snapshots and releases resources)
cap.pause()
print(cap.status) # "paused"
# Resume (restores from snapshot)
cap.resume()
cap.wait_ready(timeout=60)
Keep a capsule alive with ping():
cap.ping() # Resets the inactivity timer
Proxy URL
Access services running inside a capsule through the proxy:
url = cap.get_url(8888)
# "wss://8888-cl-abc123.api.wrenn.dev"
# Pre-configured HTTP client targeting port 8888
resp = cap.http_client.get("/api/kernels")
Snapshots
Create templates from running capsules:
# Create a snapshot
template = client.snapshots.create(
capsule_id="cl-abc123",
name="my-template",
overwrite=True,
)
# List templates
for t in client.snapshots.list():
print(t.name, t.type)
# Delete
client.snapshots.delete("my-template")
Hosts
Manage host machines:
host = client.hosts.create(type="regular")
client.hosts.list()
client.hosts.get("h-1")
client.hosts.delete("h-1")
client.hosts.regenerate_token("h-1")
client.hosts.list_tags("h-1")
client.hosts.add_tag("h-1", "gpu")
client.hosts.remove_tag("h-1", "gpu")
Async Support
All operations have async variants. Use AsyncWrennClient and prefix capsule methods with async_:
from wrenn import AsyncWrennClient
async with AsyncWrennClient(api_key="wrn_...") as client:
cap = await client.capsules.create(template="minimal")
await cap.async_wait_ready(timeout=60)
result = await cap.async_exec("echo", args=["hello"])
await cap.async_upload("/app/file.txt", b"data")
entries = await cap.async_list_dir("/home/user")
r = await cap.async_run_code("42 * 2")
await cap.async_destroy()
Async method mapping:
| Sync | Async |
|---|---|
exec() |
async_exec() |
upload() |
async_upload() |
download() |
async_download() |
stream_upload() |
async_stream_upload() |
stream_download() |
async_stream_download() |
list_dir() |
async_list_dir() |
mkdir() |
async_mkdir() |
remove() |
async_remove() |
wait_ready() |
async_wait_ready() |
pause() |
async_pause() |
resume() |
async_resume() |
destroy() |
async_destroy() |
ping() |
async_ping() |
run_code() |
async_run_code() |
Error Handling
The SDK maps server error codes to typed exceptions:
from wrenn import (
WrennError,
WrennValidationError, # 400
WrennAuthenticationError, # 401
WrennForbiddenError, # 403
WrennNotFoundError, # 404
WrennConflictError, # 409
WrennHostHasCapsulesError, # 409 — host has running capsules
WrennAgentError, # 502
WrennInternalError, # 500
WrennHostUnavailableError, # 503
)
try:
client.capsules.get("nonexistent")
except WrennNotFoundError as e:
print(e.code) # "not_found"
print(e.message) # "capsule not found"
print(e.status_code) # 404
All exceptions inherit from WrennError and expose .code, .message, and .status_code.
Development
This project uses uv for dependency management.
# Install dependencies
uv sync
# Run linting
make lint
# Run unit tests
make test
# Run all tests (including integration)
make test-integration
# Regenerate models from OpenAPI spec
make generate
Running Integration Tests
Integration tests require a live Wrenn server. Set environment variables:
export WRENN_API_KEY="wrn_..."
export WRENN_BASE_URL="http://localhost:8080" # optional
make test-integration
License
MIT