9 Commits

Author SHA1 Message Date
c4296ddd22 Updated SDK to match v0.1.1 2026-04-20 02:51:58 +06:00
2002c3f7a7 Modularized the integration tests 2026-04-18 03:26:47 +06:00
0ac9bf79ee feat: created README 2026-04-13 03:16:44 +06:00
bf5914c0a8 fix: renamed sandbox to capsule 2026-04-13 03:16:27 +06:00
976af9a209 ci: woodpecker doesn't support variable expansions outside of commands 2026-04-12 03:08:34 +06:00
f3fd6865f9 ci: bug fixes 2026-04-12 03:03:33 +06:00
340ed46df6 CI for linting and testing 2026-04-12 02:51:14 +06:00
a5bf66c199 feat: add sandbox filesystem and terminal support
Add sandbox filesystem methods (list_dir, mkdir, remove, upload,
download, stream_upload, stream_download) and interactive PTY sessions
(PtySession, AsyncPtySession) with reconnect support per
FILE_TERMINAL.md spec. Refactor error handling into exceptions.py as
shared handle_response(). Replace API-key-only proxy auth with unified
_proxy_headers() supporting both API key and JWT. Fix stream_upload to
build multipart manually instead of relying on httpx files= with
generators. Switch Makefile SPEC_URL from main to dev branch. Regenerate
models from updated OpenAPI spec (adds teams, channels, metrics, PTY
endpoints). Add comprehensive unit and integration tests. Trim AGENTS.md
to verified facts only.
2026-04-12 02:35:20 +06:00
f51a962fff feat: implement client architecture and sandbox environment
Introduces the core Wrenn client and a dedicated sandbox execution
environment. This includes automated model generation and a custom
exception hierarchy to support robust integration.

- Add `WrennClient` in `src/wrenn/client.py` for API interaction.
- Implement `Sandbox` in `src/wrenn/sandbox.py` for isolated execution.
- Add Pydantic/model support via `_generated.py`.
- Define project-specific error types in `exceptions.py`.
- Include AGENTS.md documentation for specialized logic.
- Add comprehensive unit and integration tests.
- Update build system (Makefile, uv.lock, pyproject.toml) and LICENSE.
2026-04-10 22:24:50 +06:00
29 changed files with 7980 additions and 134 deletions

1
.gitignore vendored
View File

@ -174,3 +174,4 @@ cython_debug/
# PyPI configuration file
.pypirc
CODE_EXECUTION.md

46
.woodpecker/check.yml Normal file
View File

@ -0,0 +1,46 @@
when:
event: push
branch:
- main
- dev
variables:
- &python_image "ghcr.io/astral-sh/uv:python3.13-bookworm-slim"
- &uv_cache_dir "/root/.cache/uv"
steps:
- name: restore-cache
image: woodpeckerci/plugin-cache
settings:
restore: true
cache_key: "uv-{{ checksum \"uv.lock\" }}"
mount:
- /root/.cache/uv
- name: lint
image: *python_image
environment:
UV_CACHE_DIR: *uv_cache_dir
UV_FROZEN: 1
commands:
- uv sync --no-install-project
- make lint
- name: test
image: *python_image
environment:
UV_CACHE_DIR: *uv_cache_dir
UV_FROZEN: 1
commands:
- uv sync --no-install-project
- make test
- name: rebuild-cache
image: woodpeckerci/plugin-cache
when:
- status: [success]
settings:
rebuild: true
cache_key: "uv-{{ checksum \"uv.lock\" }}"
mount:
- /root/.cache/uv

80
AGENTS.md Normal file
View File

@ -0,0 +1,80 @@
# AGENTS.md
## What this repo is
Python SDK for **Wrenn** (microVM code execution platform). Communicates with the Control Plane via REST + WebSockets only — no gRPC. The `envd` and `HostAgentService` are internal to the Go backend and never reachable from this SDK.
## Build & dev commands
All commands go through `uv` and the `Makefile`. Never use raw `pip`, `venv`, or `python -m venv`.
```bash
make generate # Fetch openapi.yaml → src/wrenn/models/_generated.py
make lint # ruff check + ruff format --check on src/
make test # runs ONLY tests/test_client.py
make test-integration # runs ALL tests (unit + integration, needs live server)
make check # lint + test (test_client.py only)
```
To run all unit tests (not just test_client.py):
```bash
uv run pytest tests/test_client.py tests/test_sandbox_features.py tests/test_filesystem_pty.py -v
```
To run a single test:
```bash
uv run pytest tests/test_client.py::TestAuth::test_signup -v
```
## Code generation (CRITICAL)
Models in `src/wrenn/models/_generated.py` are generated by `datamodel-codegen` from `api/openapi.yaml`.
1. **Never edit `_generated.py`** — overwritten on next `make generate`.
2. All user-facing models must be re-exported in `src/wrenn/models/__init__.py` via `__all__`.
3. To extend a generated model with custom methods, subclass it (e.g. `Sandbox` in `sandbox.py` subclasses the generated `SandboxModel`).
## Dependency management
```bash
uv add <package> # runtime dep
uv add --dev <package> # dev dep
uv run <command> # run in managed .venv
```
## Implemented resource namespaces
Only these are currently implemented in `client.py`:
- **`client.auth`** — `signup`, `login`
- **`client.api_keys`** — `create`, `list`, `delete`
- **`client.sandboxes`** — `create`, `list`, `get`, `destroy`
- **`client.snapshots`** — `create`, `list`, `delete`
- **`client.hosts`** — `create`, `list`, `get`, `delete`, `regenerate_token`, `list_tags`, `add_tag`, `remove_tag`
Both sync and async variants exist for every resource.
## Architecture notes
- **Sync/async parity**: `WrennClient` + `AsyncWrennClient` in `client.py`, using `httpx.Client`/`httpx.AsyncClient`. Async methods on `Sandbox` are prefixed `async_` (e.g. `async_exec`, `async_upload`).
- **WebSocket library**: `httpx-ws` (not `websockets`). Used for `exec_stream`, `pty`, and `run_code`.
- **Sandbox proxy URL**: `get_url(port)` returns `ws://` or `wss://` scheme. The `http_client` property converts to `http://`/`https://` automatically.
- **`Sandbox`** (in `sandbox.py`) is the main developer-facing class — subclasses generated model, adds lifecycle methods (`exec`, `upload`, `download`, `list_dir`, `mkdir`, `remove`, `pty`, `run_code`, `wait_ready`, `pause`, `resume`, `destroy`, `ping`, `metrics`), context manager support, and proxy helpers.
- **Error handling**: `handle_response()` in `exceptions.py` maps server error `code` field to typed exceptions (not just HTTP status). All inherit from `WrennError` with `.code`, `.message`, `.status_code`.
## Testing
- **HTTP mocking**: `respx` library (not `responses` or `pytest-httpx`). Mock routes with `@respx.mock` decorator or `respx.mock` context manager.
- **Async tests**: use `@pytest.mark.asyncio` (backed by `pytest-asyncio`).
- **Integration tests**: in `test_integration.py`, require env vars `WRENN_API_KEY` or `WRENN_TOKEN` (plus optional `WRENN_BASE_URL`, `WRENN_TEST_EMAIL`, `WRENN_TEST_PASSWORD`). They are skipped via `@requires_auth` if credentials are absent.
- **Fixtures**: test fixtures create `WrennClient(api_key="wrn_test1234567890abcdef12345678")` with context manager cleanup.
## Coding conventions
- **Python 3.13+** with modern syntax (`|` unions, `list[str]` generics).
- **Strict typing** throughout. `pyright`/`mypy` available but not in CI.
- **`ruff`** is the sole linter and formatter. Do not use `black`, `isort`, or `flake8`.
- **Google-style docstrings** on all public APIs.
- **No comments** unless explicitly asked.

20
LICENSE
View File

@ -1,18 +1,18 @@
MIT License
Copyright (c) 2026 wrenn
Copyright (c) 2026 M/S Omukk, Bangladesh
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the "Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the "Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the
following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial
The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO
EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO
EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -1,8 +1,8 @@
# Makefile
.PHONY: generate
.PHONY: generate lint test check test-integration
# Variables
SPEC_URL = "https://git.omukk.dev/wrenn/sandbox/raw/branch/main/internal/api/openapi.yaml"
SPEC_URL = "https://git.omukk.dev/wrenn/wrenn/raw/branch/dev/internal/api/openapi.yaml"
SPEC_PATH = "api/openapi.yaml"
generate:
@ -22,3 +22,15 @@ generate:
--target-python-version 3.13 \
--use-annotated \
--openapi-scopes schemas
lint:
uv run ruff check src/
uv run ruff format --check src/
test:
uv run pytest tests/test_client.py -v
test-integration:
uv run pytest tests/ -v -m "integration or not integration"
check: lint test

371
README.md
View File

@ -1,3 +1,370 @@
# python-sdk
# Wrenn Python SDK
Python SDK for wrenn
Python client for the [Wrenn](https://wrenn.dev) microVM code execution platform. Create isolated capsules, execute commands, manage files, run interactive terminals, and execute persistent code — all from Python.
## Installation
```bash
pip install wrenn
```
Requires Python 3.13+.
## Quick Start
```python
from wrenn import WrennClient
client = WrennClient(api_key="wrn_your_api_key_here")
# Create a capsule and run a command
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60)
result = cap.exec("echo", args=["hello world"])
print(result.stdout) # "hello world"
print(result.exit_code) # 0
```
## Authentication
The SDK supports two authentication methods:
```python
# API key
client = WrennClient(api_key="wrn_...")
# JWT token
client = WrennClient(token="eyJ...")
```
You can obtain an API key via the dashboard or create one programmatically:
```python
with WrennClient(token="jwt_token") as client:
key = client.api_keys.create(name="my-key")
print(key.key) # wrn_...
```
## Capsules
Capsules are isolated microVM environments. Create, manage, and interact with them:
```python
# Create
cap = client.capsules.create(
template="base-python",
vcpus=2,
memory_mb=1024,
timeout_sec=300,
)
# List
for c in client.capsules.list():
print(c.id, c.status)
# Get
cap = client.capsules.get("cl-abc123")
# Destroy
client.capsules.destroy("cl-abc123")
```
### Context Manager
Use capsules as context managers for automatic cleanup:
```python
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60)
cap.exec("python -c 'print(42)'")
# cap.destroy() is called automatically
```
## Command Execution
### `exec()` — One-off Commands
Starts a fresh process for each call. No state persists between calls.
```python
result = cap.exec("python", args=["-c", "import os; print(os.getcwd())"])
print(result.stdout) # "/home/user\n"
print(result.stderr) # ""
print(result.exit_code) # 0
print(result.duration_ms) # 42
```
### `exec_stream()` — Streaming Output
Stream real-time output from long-running commands:
```python
for event in cap.exec_stream("python", args=["-u", "train.py"]):
match event.type:
case "stdout":
print(event.data, end="")
case "stderr":
print(event.data, end="", file=sys.stderr)
case "exit":
print(f"\nExited with code {event.exit_code}")
```
### `run_code()` — Stateful Code Execution
Execute Python code in a persistent Jupyter kernel. Variables, imports, and function definitions survive across calls:
```python
with client.capsules.create(template="python-interpreter-v0-beta") as cap:
cap.wait_ready(timeout=60)
cap.run_code("x = 42")
r = cap.run_code("x * 2")
print(r.text) # "84"
cap.run_code("def greet(name): return f'hello {name}'")
r = cap.run_code("greet('world')")
print(r.text) # "'hello world'"
r = cap.run_code("1/0")
print(r.error) # "ZeroDivisionError: division by zero\n..."
```
**`CodeResult` fields:**
| Field | Type | Description |
|-------|------|-------------|
| `text` | `str \| None` | Plain text representation |
| `data` | `dict \| None` | Rich MIME bundle (e.g. `{"image/png": "..."}`) |
| `stdout` | `str` | Accumulated stdout |
| `stderr` | `str` | Accumulated stderr |
| `error` | `str \| None` | Error traceback string |
## Filesystem
Upload, download, and manage files inside capsules:
```python
# Upload / Download
cap.upload("/app/main.py", b"print('hello')")
content = cap.download("/app/main.py")
# Streaming (for large files)
def chunks():
yield b"chunk1"
yield b"chunk2"
cap.stream_upload("/data/large.bin", chunks())
for chunk in cap.stream_download("/data/large.bin"):
process(chunk)
# Directory operations
entries = cap.list_dir("/home/user", depth=1)
for entry in entries:
print(entry.name, entry.type, entry.size)
cap.mkdir("/home/user/data")
cap.remove("/home/user/old_data")
```
## Interactive Terminal (PTY)
Open a full interactive terminal session over WebSocket:
```python
with cap.pty(cmd="/bin/bash", cols=120, rows=40, cwd="/home/user") as term:
term.write(b"ls -la\n")
for event in term:
if event.type == "output":
sys.stdout.buffer.write(event.data)
elif event.type == "exit":
break
```
**PtySession methods:**
| Method | Description |
|--------|-------------|
| `write(data: bytes)` | Send raw bytes to stdin |
| `resize(cols, rows)` | Resize the terminal |
| `kill()` | Send SIGKILL to the process |
| `tag` | Session tag (available after `started` event) |
| `pid` | Process PID (available after `started` event) |
Reconnect to an existing session using the tag:
```python
with cap.pty_connect(term.tag) as term:
term.write(b"echo reconnected\n")
```
## Lifecycle
Pause and resume capsules to save resources:
```python
cap = client.capsules.create(template="minimal")
cap.wait_ready(timeout=60)
# Pause (snapshots and releases resources)
cap.pause()
print(cap.status) # "paused"
# Resume (restores from snapshot)
cap.resume()
cap.wait_ready(timeout=60)
```
Keep a capsule alive with `ping()`:
```python
cap.ping() # Resets the inactivity timer
```
## Proxy URL
Access services running inside a capsule through the proxy:
```python
url = cap.get_url(8888)
# "wss://8888-cl-abc123.api.wrenn.dev"
# Pre-configured HTTP client targeting port 8888
resp = cap.http_client.get("/api/kernels")
```
## Snapshots
Create templates from running capsules:
```python
# Create a snapshot
template = client.snapshots.create(
capsule_id="cl-abc123",
name="my-template",
overwrite=True,
)
# List templates
for t in client.snapshots.list():
print(t.name, t.type)
# Delete
client.snapshots.delete("my-template")
```
## Hosts
Manage host machines:
```python
host = client.hosts.create(type="regular")
client.hosts.list()
client.hosts.get("h-1")
client.hosts.delete("h-1")
client.hosts.regenerate_token("h-1")
client.hosts.list_tags("h-1")
client.hosts.add_tag("h-1", "gpu")
client.hosts.remove_tag("h-1", "gpu")
```
## Async Support
All operations have async variants. Use `AsyncWrennClient` and prefix capsule methods with `async_`:
```python
from wrenn import AsyncWrennClient
async with AsyncWrennClient(api_key="wrn_...") as client:
cap = await client.capsules.create(template="minimal")
await cap.async_wait_ready(timeout=60)
result = await cap.async_exec("echo", args=["hello"])
await cap.async_upload("/app/file.txt", b"data")
entries = await cap.async_list_dir("/home/user")
r = await cap.async_run_code("42 * 2")
await cap.async_destroy()
```
**Async method mapping:**
| Sync | Async |
|------|-------|
| `exec()` | `async_exec()` |
| `upload()` | `async_upload()` |
| `download()` | `async_download()` |
| `stream_upload()` | `async_stream_upload()` |
| `stream_download()` | `async_stream_download()` |
| `list_dir()` | `async_list_dir()` |
| `mkdir()` | `async_mkdir()` |
| `remove()` | `async_remove()` |
| `wait_ready()` | `async_wait_ready()` |
| `pause()` | `async_pause()` |
| `resume()` | `async_resume()` |
| `destroy()` | `async_destroy()` |
| `ping()` | `async_ping()` |
| `run_code()` | `async_run_code()` |
## Error Handling
The SDK maps server error codes to typed exceptions:
```python
from wrenn import (
WrennError,
WrennValidationError, # 400
WrennAuthenticationError, # 401
WrennForbiddenError, # 403
WrennNotFoundError, # 404
WrennConflictError, # 409
WrennHostHasCapsulesError, # 409 — host has running capsules
WrennAgentError, # 502
WrennInternalError, # 500
WrennHostUnavailableError, # 503
)
try:
client.capsules.get("nonexistent")
except WrennNotFoundError as e:
print(e.code) # "not_found"
print(e.message) # "capsule not found"
print(e.status_code) # 404
```
All exceptions inherit from `WrennError` and expose `.code`, `.message`, and `.status_code`.
## Development
This project uses [uv](https://docs.astral.sh/uv/) for dependency management.
```bash
# Install dependencies
uv sync
# Run linting
make lint
# Run unit tests
make test
# Run all tests (including integration)
make test-integration
# Regenerate models from OpenAPI spec
make generate
```
### Running Integration Tests
Integration tests require a live Wrenn server. Set environment variables:
```bash
export WRENN_API_KEY="wrn_..."
export WRENN_BASE_URL="http://localhost:8080" # optional
make test-integration
```
## License
MIT

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,9 @@ authors = [
]
requires-python = ">=3.13"
dependencies = [
"email-validator>=2.3.0",
"httpx>=0.28.1",
"httpx-ws>=0.9.0",
"pydantic>=2.12.5",
]
@ -22,5 +24,11 @@ dev = [
"mypy>=1.20.0",
"pytest>=9.0.3",
"pytest-asyncio>=1.3.0",
"respx>=0.23.1",
"ruff>=0.15.10",
]
[tool.pytest.ini_options]
markers = [
"integration: integration tests (require live server)",
]

View File

@ -1,2 +1,82 @@
def hello() -> str:
return "Hello from wrenn!"
from wrenn.capsule import (
Capsule,
CodeResult,
ExecResult,
StreamErrorEvent,
StreamEvent,
StreamExitEvent,
StreamStartEvent,
StreamStderrEvent,
StreamStdoutEvent,
)
from wrenn.client import AsyncWrennClient, WrennClient
from wrenn.exceptions import (
WrennAgentError,
WrennAuthenticationError,
WrennConflictError,
WrennError,
WrennForbiddenError,
WrennHostHasCapsulesError,
WrennHostUnavailableError,
WrennInternalError,
WrennNotFoundError,
WrennValidationError,
)
from wrenn.models import FileEntry
from wrenn.pty import AsyncPtySession, PtyEvent, PtyEventType, PtySession
__version__ = "0.1.0"
__all__ = [
"__version__",
"AsyncPtySession",
"AsyncWrennClient",
"Capsule",
"CodeResult",
"ExecResult",
"FileEntry",
"PtyEvent",
"PtyEventType",
"PtySession",
"Sandbox",
"StreamErrorEvent",
"StreamEvent",
"StreamExitEvent",
"StreamStartEvent",
"StreamStderrEvent",
"StreamStdoutEvent",
"WrennAgentError",
"WrennAuthenticationError",
"WrennClient",
"WrennConflictError",
"WrennError",
"WrennForbiddenError",
"WrennHostHasCapsulesError",
"WrennHostHasSandboxesError",
"WrennHostUnavailableError",
"WrennInternalError",
"WrennNotFoundError",
"WrennValidationError",
]
def __getattr__(name: str) -> type:
if name == "Sandbox":
import warnings
warnings.warn(
"'Sandbox' is deprecated, use 'Capsule' instead",
DeprecationWarning,
stacklevel=2,
)
return Capsule
if name == "WrennHostHasSandboxesError":
import warnings
warnings.warn(
"'WrennHostHasSandboxesError' is deprecated, use 'WrennHostHasCapsulesError' instead",
DeprecationWarning,
stacklevel=2,
)
return WrennHostHasCapsulesError
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

1324
src/wrenn/capsule.py Normal file

File diff suppressed because it is too large Load Diff

1023
src/wrenn/client.py Normal file

File diff suppressed because it is too large Load Diff

126
src/wrenn/exceptions.py Normal file
View File

@ -0,0 +1,126 @@
from __future__ import annotations
import warnings
import httpx
class WrennError(Exception):
"""Base exception for all Wrenn SDK errors."""
def __init__(self, code: str, message: str, status_code: int) -> None:
self.code = code
self.message = message
self.status_code = status_code
super().__init__(message)
class WrennValidationError(WrennError):
"""400 — Invalid request parameters."""
class WrennAuthenticationError(WrennError):
"""401 — Invalid or missing authentication."""
class WrennForbiddenError(WrennError):
"""403 — Authenticated but not authorized."""
class WrennNotFoundError(WrennError):
"""404 — Resource not found."""
class WrennConflictError(WrennError):
"""409 — State conflict (e.g. invalid_state)."""
class WrennHostHasCapsulesError(WrennConflictError):
"""409 — Host still has running capsules."""
def __init__(
self, code: str, message: str, status_code: int, capsule_ids: list[str]
) -> None:
self.capsule_ids = capsule_ids
super().__init__(code, message, status_code)
@property
def sandbox_ids(self) -> list[str]:
warnings.warn(
"'sandbox_ids' is deprecated, use 'capsule_ids' instead",
DeprecationWarning,
stacklevel=2,
)
return self.capsule_ids
class WrennHostUnavailableError(WrennError):
"""503 — No suitable host available."""
class WrennAgentError(WrennError):
"""502 — Host agent returned an error."""
class WrennInternalError(WrennError):
"""500 — Unexpected server error."""
_ERROR_MAP: dict[str, type[WrennError]] = {
"invalid_request": WrennValidationError,
"unauthorized": WrennAuthenticationError,
"forbidden": WrennForbiddenError,
"not_found": WrennNotFoundError,
"invalid_state": WrennConflictError,
"conflict": WrennConflictError,
"host_has_sandboxes": WrennHostHasCapsulesError,
"host_has_capsules": WrennHostHasCapsulesError,
"host_unavailable": WrennHostUnavailableError,
"agent_error": WrennAgentError,
"internal_error": WrennInternalError,
}
def handle_response(resp: httpx.Response) -> dict | list:
if resp.status_code >= 400:
try:
body = resp.json()
except Exception:
resp.raise_for_status()
raise
err = body.get("error", {})
code = err.get("code", "internal_error")
message = err.get("message", resp.text)
exc_cls = _ERROR_MAP.get(code, WrennError)
if exc_cls is WrennHostHasCapsulesError:
raise WrennHostHasCapsulesError(
code=code,
message=message,
status_code=resp.status_code,
capsule_ids=body.get("sandbox_ids", []),
)
raise exc_cls(
code=code,
message=message,
status_code=resp.status_code,
)
if resp.status_code == 204:
return {}
return resp.json()
def __getattr__(name: str) -> type:
if name == "WrennHostHasSandboxesError":
warnings.warn(
"'WrennHostHasSandboxesError' is deprecated, use 'WrennHostHasCapsulesError' instead",
DeprecationWarning,
stacklevel=2,
)
return WrennHostHasCapsulesError
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@ -0,0 +1,113 @@
from wrenn.models._generated import (
APIKeyResponse,
AuthResponse,
BackgroundExecResponse,
Capsule,
CapsuleMetrics,
CapsuleStats,
ChangePasswordRequest,
ChannelResponse,
CreateAPIKeyRequest,
CreateCapsuleRequest,
CreateChannelRequest,
CreateHostRequest,
CreateHostResponse,
CreateSnapshotRequest,
Encoding,
Error,
Error1,
ExecRequest,
ExecResponse,
FileEntry,
Host,
HostDeletePreview,
ListDirRequest,
ListDirResponse,
LoginRequest,
MakeDirRequest,
MakeDirResponse,
MeResponse,
MetricPoint,
ProcessEntry,
ProcessListResponse,
ReadFileRequest,
RefreshHostTokenRequest,
RefreshHostTokenResponse,
RegisterHostRequest,
RegisterHostResponse,
RemoveRequest,
RotateConfigRequest,
SignupRequest,
SignupResponse,
Status,
Status1,
Template,
Team,
TeamDetail,
TeamMember,
TeamWithRole,
TestChannelRequest,
Type,
Type1,
Type2,
UpdateChannelRequest,
UsageResponse,
UserSearchResult,
)
__all__ = [
"APIKeyResponse",
"AuthResponse",
"BackgroundExecResponse",
"Capsule",
"CapsuleMetrics",
"CapsuleStats",
"ChangePasswordRequest",
"ChannelResponse",
"CreateAPIKeyRequest",
"CreateCapsuleRequest",
"CreateChannelRequest",
"CreateHostRequest",
"CreateHostResponse",
"CreateSnapshotRequest",
"Encoding",
"Error",
"Error1",
"ExecRequest",
"ExecResponse",
"FileEntry",
"Host",
"HostDeletePreview",
"ListDirRequest",
"ListDirResponse",
"LoginRequest",
"MakeDirRequest",
"MakeDirResponse",
"MeResponse",
"MetricPoint",
"ProcessEntry",
"ProcessListResponse",
"ReadFileRequest",
"RefreshHostTokenRequest",
"RefreshHostTokenResponse",
"RegisterHostRequest",
"RegisterHostResponse",
"RemoveRequest",
"RotateConfigRequest",
"SignupRequest",
"SignupResponse",
"Status",
"Status1",
"Template",
"Team",
"TeamDetail",
"TeamMember",
"TeamWithRole",
"TestChannelRequest",
"Type",
"Type1",
"Type2",
"UpdateChannelRequest",
"UsageResponse",
"UserSearchResult",
]

View File

@ -0,0 +1,627 @@
# generated by datamodel-codegen:
# filename: openapi.yaml
# timestamp: 2026-04-19T19:56:15+00:00
from __future__ import annotations
from datetime import date as date_aliased
from enum import StrEnum
from typing import Annotated
from pydantic import AwareDatetime, BaseModel, EmailStr, Field
class SignupRequest(BaseModel):
email: EmailStr
password: Annotated[str, Field(min_length=8)]
name: Annotated[str, Field(max_length=100)]
class LoginRequest(BaseModel):
email: EmailStr
password: str
class SignupResponse(BaseModel):
message: Annotated[
str | None,
Field(description="Confirmation message instructing user to check email"),
] = None
class AuthResponse(BaseModel):
token: Annotated[str | None, Field(description="JWT token (valid for 6 hours)")] = (
None
)
user_id: str | None = None
team_id: str | None = None
email: str | None = None
name: str | None = None
class CreateAPIKeyRequest(BaseModel):
name: str | None = "Unnamed API Key"
class APIKeyResponse(BaseModel):
id: str | None = None
team_id: str | None = None
name: str | None = None
key_prefix: Annotated[
str | None, Field(description='Display prefix (e.g. "wrn_ab12cd34...")')
] = None
created_at: AwareDatetime | None = None
last_used: AwareDatetime | None = None
key: Annotated[
str | None,
Field(
description="Full plaintext key. Only returned on creation, never again."
),
] = None
class CreateCapsuleRequest(BaseModel):
template: str | None = "minimal"
vcpus: int | None = 1
memory_mb: int | None = 512
timeout_sec: Annotated[
int | None,
Field(
description="Auto-pause TTL in seconds. The capsule is automatically paused after this duration of inactivity (no exec or ping). 0 means no auto-pause.\n"
),
] = 0
class Point(BaseModel):
date: date_aliased | None = None
cpu_minutes: float | None = None
ram_mb_minutes: float | None = None
class UsageResponse(BaseModel):
from_: Annotated[date_aliased | None, Field(alias="from")] = None
to: date_aliased | None = None
points: list[Point] | None = None
class Range(StrEnum):
field_5m = "5m"
field_1h = "1h"
field_6h = "6h"
field_24h = "24h"
field_30d = "30d"
class Current(BaseModel):
running_count: int | None = None
vcpus_reserved: int | None = None
memory_mb_reserved: int | None = None
sampled_at: AwareDatetime | None = None
class Peaks(BaseModel):
"""
Maximum values over the last 30 days.
"""
running_count: int | None = None
vcpus: int | None = None
memory_mb: int | None = None
class Series(BaseModel):
"""
Parallel arrays for chart rendering.
"""
labels: list[AwareDatetime] | None = None
running: list[int] | None = None
vcpus: list[int] | None = None
memory_mb: list[int] | None = None
class CapsuleStats(BaseModel):
range: Range | None = None
current: Current | None = None
peaks: Annotated[
Peaks | None, Field(description="Maximum values over the last 30 days.")
] = None
series: Annotated[
Series | None, Field(description="Parallel arrays for chart rendering.")
] = None
class Status(StrEnum):
pending = "pending"
starting = "starting"
running = "running"
paused = "paused"
hibernated = "hibernated"
stopped = "stopped"
missing = "missing"
error = "error"
class Capsule(BaseModel):
id: str | None = None
status: Status | None = None
template: str | None = None
vcpus: int | None = None
memory_mb: int | None = None
timeout_sec: int | None = None
guest_ip: str | None = None
host_ip: str | None = None
created_at: AwareDatetime | None = None
started_at: AwareDatetime | None = None
last_active_at: AwareDatetime | None = None
last_updated: AwareDatetime | None = None
class CreateSnapshotRequest(BaseModel):
sandbox_id: Annotated[
str, Field(description="ID of the running capsule to snapshot.")
]
name: Annotated[
str | None,
Field(description="Name for the snapshot template. Auto-generated if omitted."),
] = None
class Type(StrEnum):
base = "base"
snapshot = "snapshot"
class Template(BaseModel):
name: str | None = None
type: Type | None = None
vcpus: int | None = None
memory_mb: int | None = None
size_bytes: int | None = None
created_at: AwareDatetime | None = None
class ExecRequest(BaseModel):
cmd: str
args: list[str] | None = None
timeout_sec: Annotated[
int | None,
Field(description="Timeout in seconds (foreground exec only, default 30)"),
] = 30
background: Annotated[
bool | None,
Field(
description="If true, starts the process in the background and returns immediately with a PID and tag (HTTP 202)"
),
] = False
tag: Annotated[
str | None,
Field(
description="Optional user-chosen tag for the background process. Auto-generated if omitted. Only used when background is true."
),
] = None
envs: Annotated[
dict[str, str] | None,
Field(
description="Environment variables for the process (background exec only)"
),
] = None
cwd: Annotated[
str | None,
Field(description="Working directory for the process (background exec only)"),
] = None
class BackgroundExecResponse(BaseModel):
sandbox_id: str | None = None
cmd: str | None = None
pid: int | None = None
tag: str | None = None
class ProcessEntry(BaseModel):
pid: int | None = None
tag: str | None = None
cmd: str | None = None
args: list[str] | None = None
class ProcessListResponse(BaseModel):
processes: list[ProcessEntry] | None = None
class Encoding(StrEnum):
"""
Output encoding. "base64" when stdout/stderr contain binary data.
"""
utf_8 = "utf-8"
base64 = "base64"
class ExecResponse(BaseModel):
sandbox_id: str | None = None
cmd: str | None = None
stdout: str | None = None
stderr: str | None = None
exit_code: int | None = None
duration_ms: int | None = None
encoding: Annotated[
Encoding | None,
Field(
description='Output encoding. "base64" when stdout/stderr contain binary data.'
),
] = None
class ReadFileRequest(BaseModel):
path: Annotated[str, Field(description="Absolute file path inside the capsule")]
class ListDirRequest(BaseModel):
path: Annotated[str, Field(description="Directory path inside the capsule")]
depth: Annotated[
int | None,
Field(
description="Recursion depth (0 = non-recursive, 1 = immediate children)"
),
] = 1
class Type1(StrEnum):
file = "file"
directory = "directory"
symlink = "symlink"
class FileEntry(BaseModel):
name: str | None = None
path: str | None = None
type: Type1 | None = None
size: int | None = None
mode: int | None = None
permissions: Annotated[
str | None, Field(description='Human-readable permissions (e.g. "-rwxr-xr-x")')
] = None
owner: str | None = None
group: str | None = None
modified_at: Annotated[
int | None, Field(description="Unix timestamp (seconds)")
] = None
symlink_target: str | None = None
class MakeDirRequest(BaseModel):
path: Annotated[
str, Field(description="Directory path to create inside the capsule")
]
class MakeDirResponse(BaseModel):
entry: FileEntry | None = None
class RemoveRequest(BaseModel):
path: Annotated[str, Field(description="Path to remove inside the capsule")]
class Type2(StrEnum):
"""
Host type. Regular hosts are shared; BYOC hosts belong to a team.
"""
regular = "regular"
byoc = "byoc"
class CreateHostRequest(BaseModel):
type: Annotated[
Type2,
Field(
description="Host type. Regular hosts are shared; BYOC hosts belong to a team."
),
]
team_id: Annotated[str | None, Field(description="Required for BYOC hosts.")] = None
provider: Annotated[
str | None,
Field(description="Cloud provider (e.g. aws, gcp, hetzner, bare-metal)."),
] = None
availability_zone: Annotated[
str | None, Field(description="Availability zone (e.g. us-east, eu-west).")
] = None
class RegisterHostRequest(BaseModel):
token: Annotated[
str, Field(description="One-time registration token from POST /v1/hosts.")
]
arch: Annotated[
str | None, Field(description="CPU architecture (e.g. x86_64, aarch64).")
] = None
cpu_cores: int | None = None
memory_mb: int | None = None
disk_gb: int | None = None
address: Annotated[str, Field(description="Host agent address (ip:port).")]
class Type3(StrEnum):
regular = "regular"
byoc = "byoc"
class Status1(StrEnum):
pending = "pending"
online = "online"
offline = "offline"
draining = "draining"
unreachable = "unreachable"
class Host(BaseModel):
id: str | None = None
type: Type3 | None = None
team_id: str | None = None
provider: str | None = None
availability_zone: str | None = None
arch: str | None = None
cpu_cores: int | None = None
memory_mb: int | None = None
disk_gb: int | None = None
address: str | None = None
status: Status1 | None = None
last_heartbeat_at: AwareDatetime | None = None
created_by: str | None = None
created_at: AwareDatetime | None = None
updated_at: AwareDatetime | None = None
class RefreshHostTokenRequest(BaseModel):
refresh_token: Annotated[
str,
Field(
description="Refresh token obtained from registration or a previous refresh."
),
]
class RefreshHostTokenResponse(BaseModel):
host: Host | None = None
token: Annotated[
str | None, Field(description="New host JWT. Valid for 7 days.")
] = None
refresh_token: Annotated[
str | None,
Field(
description="New refresh token. Valid for 60 days; old token is revoked."
),
] = None
class HostDeletePreview(BaseModel):
host: Host | None = None
sandbox_ids: Annotated[
list[str] | None,
Field(description="IDs of capsulees that would be destroyed on force-delete."),
] = None
class Error(BaseModel):
code: Annotated[str | None, Field(examples=["host_has_sandboxes"])] = None
message: str | None = None
sandbox_ids: Annotated[
list[str] | None,
Field(description="IDs of active capsulees blocking deletion."),
] = None
class HostHasCapsulesError(BaseModel):
error: Error | None = None
class AddTagRequest(BaseModel):
tag: str
class UserSearchResult(BaseModel):
user_id: str | None = None
email: str | None = None
class Team(BaseModel):
id: str | None = None
name: str | None = None
slug: Annotated[
str | None, Field(description="Immutable 12-char hex slug (e.g. a1b2c3-d1e2f3)")
] = None
created_at: AwareDatetime | None = None
class Role(StrEnum):
owner = "owner"
admin = "admin"
member = "member"
class TeamWithRole(Team):
role: Role | None = None
class TeamMember(BaseModel):
user_id: str | None = None
email: str | None = None
role: Role | None = None
joined_at: AwareDatetime | None = None
class TeamDetail(BaseModel):
team: Team | None = None
members: list[TeamMember] | None = None
class Range1(StrEnum):
field_5m = "5m"
field_10m = "10m"
field_1h = "1h"
field_2h = "2h"
field_6h = "6h"
field_12h = "12h"
field_24h = "24h"
class MetricPoint(BaseModel):
timestamp_unix: int | None = None
cpu_pct: Annotated[
float | None,
Field(
description="CPU utilization percentage (0-100), normalized to vCPU count"
),
] = None
mem_bytes: Annotated[
int | None,
Field(description="Resident memory in bytes (VmRSS of Firecracker process)"),
] = None
disk_bytes: Annotated[
int | None, Field(description="Allocated disk bytes for the CoW sparse file")
] = None
class Provider(StrEnum):
discord = "discord"
slack = "slack"
teams = "teams"
googlechat = "googlechat"
telegram = "telegram"
matrix = "matrix"
webhook = "webhook"
class Event(StrEnum):
capsule_created = "capsule.created"
capsule_running = "capsule.running"
capsule_paused = "capsule.paused"
capsule_destroyed = "capsule.destroyed"
template_snapshot_created = "template.snapshot.created"
template_snapshot_deleted = "template.snapshot.deleted"
host_up = "host.up"
host_down = "host.down"
class CreateChannelRequest(BaseModel):
name: Annotated[str, Field(description="Unique channel name within the team.")]
provider: Provider
config: Annotated[
dict[str, str],
Field(
description='Provider-specific configuration fields. Discord/Slack/Teams/Google Chat: {"webhook_url": "..."}. Telegram: {"bot_token": "...", "chat_id": "..."}. Matrix: {"homeserver_url": "...", "access_token": "...", "room_id": "..."}. Webhook: {"url": "...", "secret": "..."} (secret is auto-generated if omitted).\n'
),
]
events: list[Event]
class TestChannelRequest(BaseModel):
provider: Provider
config: Annotated[
dict[str, str],
Field(
description="Provider-specific configuration fields (same as CreateChannelRequest.config)."
),
]
class RotateConfigRequest(BaseModel):
config: Annotated[
dict[str, str],
Field(
description="New provider configuration fields. Must include all required fields for the channel's provider. Replaces the existing config entirely.\n"
),
]
class UpdateChannelRequest(BaseModel):
name: str
events: list[Event]
class ChannelResponse(BaseModel):
id: str | None = None
team_id: str | None = None
name: str | None = None
provider: Provider | None = None
events: list[str] | None = None
created_at: AwareDatetime | None = None
updated_at: AwareDatetime | None = None
secret: Annotated[
str | None,
Field(description="Webhook secret. Only returned on creation, never again."),
] = None
class MeResponse(BaseModel):
name: str | None = None
email: EmailStr | None = None
has_password: Annotated[
bool | None,
Field(
description="Whether the user has a password set (false for OAuth-only accounts)"
),
] = None
providers: Annotated[
list[str] | None,
Field(description='List of linked OAuth provider names (e.g. ["github"])'),
] = None
class ChangePasswordRequest(BaseModel):
current_password: Annotated[
str | None, Field(description="Required when changing an existing password")
] = None
new_password: Annotated[str, Field(min_length=8)]
confirm_password: Annotated[
str | None,
Field(
description="Required when adding a password to an OAuth-only account (must match new_password)"
),
] = None
class Error2(BaseModel):
code: str | None = None
message: str | None = None
class Error1(BaseModel):
error: Error2 | None = None
class ListDirResponse(BaseModel):
entries: list[FileEntry] | None = None
class CreateHostResponse(BaseModel):
host: Host | None = None
registration_token: Annotated[
str | None,
Field(
description="One-time registration token for the host agent. Expires in 1 hour."
),
] = None
class RegisterHostResponse(BaseModel):
host: Host | None = None
token: Annotated[
str | None,
Field(description="Host JWT for X-Host-Token header. Valid for 7 days."),
] = None
refresh_token: Annotated[
str | None,
Field(
description="Refresh token for obtaining new JWTs. Valid for 60 days; rotated on each use."
),
] = None
class CapsuleMetrics(BaseModel):
sandbox_id: str | None = None
range: Range1 | None = None
points: list[MetricPoint] | None = None

306
src/wrenn/pty.py Normal file
View File

@ -0,0 +1,306 @@
from __future__ import annotations
import base64
import json
from collections.abc import AsyncIterator, Iterator
from enum import StrEnum
from typing import Any
import httpx_ws
from pydantic import BaseModel
class PtyEventType(StrEnum):
started = "started"
output = "output"
exit = "exit"
error = "error"
ping = "ping"
class PtyEvent(BaseModel):
type: PtyEventType
pid: int | None = None
tag: str | None = None
data: bytes | str | None = None
exit_code: int | None = None
fatal: bool | None = None
def _parse_pty_event(raw: dict[str, Any]) -> PtyEvent:
msg_type = raw.get("type", "")
if msg_type == "started":
return PtyEvent(
type=PtyEventType.started,
pid=raw.get("pid"),
tag=raw.get("tag"),
)
if msg_type == "output":
raw_data = raw.get("data", "")
decoded = base64.b64decode(raw_data) if raw_data else b""
return PtyEvent(type=PtyEventType.output, data=decoded)
if msg_type == "exit":
return PtyEvent(type=PtyEventType.exit, exit_code=raw.get("exit_code", -1))
if msg_type == "error":
return PtyEvent(
type=PtyEventType.error,
data=raw.get("data", ""),
fatal=raw.get("fatal", False),
)
if msg_type == "ping":
return PtyEvent(type=PtyEventType.ping)
return PtyEvent(type=PtyEventType(msg_type) if msg_type else PtyEventType.ping)
class PtySession:
"""Interactive PTY session backed by a WebSocket.
Use as a context manager and iterate over events::
with sb.pty(cmd="/bin/bash") as term:
term.write(b"ls -la\\n")
for event in term:
if event.type == "output":
sys.stdout.buffer.write(event.data)
elif event.type == "exit":
break
"""
def __init__(self, ws: httpx_ws.WebSocketSession, capsule_id: str) -> None:
self._ws = ws
self._capsule_id = capsule_id
self._tag: str | None = None
self._pid: int | None = None
self._done = False
@property
def tag(self) -> str | None:
"""Session tag. Available after the ``started`` event."""
return self._tag
@property
def pid(self) -> int | None:
"""Process PID. Available after the ``started`` event."""
return self._pid
def _send_start(
self,
cmd: str = "/bin/bash",
args: list[str] | None = None,
cols: int = 80,
rows: int = 24,
envs: dict[str, str] | None = None,
cwd: str | None = None,
) -> None:
msg: dict[str, Any] = {
"type": "start",
"cmd": cmd,
"cols": cols or 80,
"rows": rows or 24,
}
if args:
msg["args"] = args
if envs:
msg["envs"] = envs
if cwd:
msg["cwd"] = cwd
self._ws.send_text(json.dumps(msg))
def _send_connect(self, tag: str) -> None:
self._ws.send_text(json.dumps({"type": "connect", "tag": tag}))
def write(self, data: bytes) -> None:
"""Send raw bytes to the PTY stdin.
Args:
data: Raw bytes to send. Base64-encoded internally.
"""
encoded = base64.b64encode(data).decode("ascii")
self._ws.send_text(json.dumps({"type": "input", "data": encoded}))
def resize(self, cols: int, rows: int) -> None:
"""Resize the PTY terminal.
Args:
cols: New column count. Must be > 0.
rows: New row count. Must be > 0.
Raises:
ValueError: If cols or rows is 0.
"""
if cols <= 0 or rows <= 0:
raise ValueError("cols and rows must be greater than 0")
self._ws.send_text(json.dumps({"type": "resize", "cols": cols, "rows": rows}))
def kill(self) -> None:
"""Send SIGKILL to the PTY process."""
self._ws.send_text(json.dumps({"type": "kill"}))
def __iter__(self) -> Iterator[PtyEvent]:
return self
def __next__(self) -> PtyEvent:
if self._done:
raise StopIteration
try:
raw = self._ws.receive_text()
except httpx_ws.WebSocketDisconnect:
raise StopIteration
event = _parse_pty_event(json.loads(raw))
if event.type == PtyEventType.started:
if event.tag is not None:
self._tag = event.tag
if event.pid is not None:
self._pid = event.pid
if event.type == PtyEventType.exit:
raise StopIteration
if event.type == PtyEventType.error and event.fatal:
self._done = True
return event
return event
def __enter__(self) -> PtySession:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: object,
) -> None:
try:
self.kill()
except Exception:
pass
try:
self._ws.close()
except Exception:
pass
class AsyncPtySession:
"""Async interactive PTY session backed by a WebSocket.
Use as an async context manager and async iterate over events::
async with sb.pty(cmd="/bin/bash") as term:
await term.write(b"ls -la\\n")
async for event in term:
if event.type == "output":
sys.stdout.buffer.write(event.data)
elif event.type == "exit":
break
"""
def __init__(self, ws: httpx_ws.AsyncWebSocketSession, capsule_id: str) -> None:
self._ws = ws
self._capsule_id = capsule_id
self._tag: str | None = None
self._pid: int | None = None
self._done = False
@property
def tag(self) -> str | None:
"""Session tag. Available after the ``started`` event."""
return self._tag
@property
def pid(self) -> int | None:
"""Process PID. Available after the ``started`` event."""
return self._pid
async def _send_start(
self,
cmd: str = "/bin/bash",
args: list[str] | None = None,
cols: int = 80,
rows: int = 24,
envs: dict[str, str] | None = None,
cwd: str | None = None,
) -> None:
msg: dict[str, Any] = {
"type": "start",
"cmd": cmd,
"cols": cols or 80,
"rows": rows or 24,
}
if args:
msg["args"] = args
if envs:
msg["envs"] = envs
if cwd:
msg["cwd"] = cwd
await self._ws.send_text(json.dumps(msg))
async def _send_connect(self, tag: str) -> None:
await self._ws.send_text(json.dumps({"type": "connect", "tag": tag}))
async def write(self, data: bytes) -> None:
"""Send raw bytes to the PTY stdin.
Args:
data: Raw bytes to send. Base64-encoded internally.
"""
encoded = base64.b64encode(data).decode("ascii")
await self._ws.send_text(json.dumps({"type": "input", "data": encoded}))
async def resize(self, cols: int, rows: int) -> None:
"""Resize the PTY terminal.
Args:
cols: New column count. Must be > 0.
rows: New row count. Must be > 0.
Raises:
ValueError: If cols or rows is 0.
"""
if cols <= 0 or rows <= 0:
raise ValueError("cols and rows must be greater than 0")
await self._ws.send_text(
json.dumps({"type": "resize", "cols": cols, "rows": rows})
)
async def kill(self) -> None:
"""Send SIGKILL to the PTY process."""
await self._ws.send_text(json.dumps({"type": "kill"}))
def __aiter__(self) -> AsyncIterator[PtyEvent]:
return self
async def __anext__(self) -> PtyEvent:
if self._done:
raise StopAsyncIteration
try:
raw = await self._ws.receive_text()
except httpx_ws.WebSocketDisconnect:
raise StopAsyncIteration
event = _parse_pty_event(json.loads(raw))
if event.type == PtyEventType.started:
if event.tag is not None:
self._tag = event.tag
if event.pid is not None:
self._pid = event.pid
if event.type == PtyEventType.exit:
raise StopAsyncIteration
if event.type == PtyEventType.error and event.fatal:
self._done = True
return event
return event
async def __aenter__(self) -> AsyncPtySession:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: object,
) -> None:
try:
await self.kill()
except Exception:
pass
try:
await self._ws.close()
except Exception:
pass

26
src/wrenn/sandbox.py Normal file
View File

@ -0,0 +1,26 @@
import warnings as _warnings
from wrenn.capsule import ( # noqa: F401
CodeResult,
ExecResult,
StreamErrorEvent,
StreamEvent,
StreamExitEvent,
StreamStartEvent,
StreamStderrEvent,
StreamStdoutEvent,
_build_proxy_url,
_parse_stream_event,
)
from wrenn.capsule import Capsule
def __getattr__(name: str) -> type:
if name == "Sandbox":
_warnings.warn(
"'Sandbox' is deprecated, use 'Capsule' instead",
DeprecationWarning,
stacklevel=2,
)
return Capsule
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

View File

@ -0,0 +1,95 @@
from __future__ import annotations
import os
from typing import Generator
import pytest
import pytest_asyncio
from typing_extensions import AsyncGenerator
from wrenn.capsule import Capsule
from wrenn.client import AsyncWrennClient, WrennClient
WRENN_API_KEY = os.environ.get("WRENN_API_KEY")
WRENN_TOKEN = os.environ.get("WRENN_TOKEN")
WRENN_BASE_URL = os.environ.get("WRENN_BASE_URL", "http://localhost:8080")
WRENN_TEST_EMAIL = os.environ.get("WRENN_TEST_EMAIL")
WRENN_TEST_PASSWORD = os.environ.get("WRENN_TEST_PASSWORD")
def _has_auth() -> bool:
return bool(WRENN_API_KEY or WRENN_TOKEN)
requires_auth = pytest.mark.skipif(
not _has_auth(),
reason="Set WRENN_API_KEY or WRENN_TOKEN to run integration tests",
)
@pytest.fixture
def client() -> Generator[WrennClient, None, None]:
with WrennClient(
api_key=WRENN_API_KEY,
token=WRENN_TOKEN,
base_url=WRENN_BASE_URL,
) as c:
yield c
@pytest_asyncio.fixture
async def async_client() -> AsyncGenerator[AsyncWrennClient, None]:
async with AsyncWrennClient(
api_key=WRENN_API_KEY, token=WRENN_TOKEN, base_url=WRENN_BASE_URL
) as c:
yield c
@pytest.fixture
def bearer_client() -> Generator[WrennClient, None, None]:
if WRENN_TOKEN:
with WrennClient(token=WRENN_TOKEN, base_url=WRENN_BASE_URL) as c:
yield c
elif WRENN_TEST_EMAIL and WRENN_TEST_PASSWORD:
with WrennClient(api_key=WRENN_API_KEY, base_url=WRENN_BASE_URL) as c:
resp = c.auth.login(WRENN_TEST_EMAIL, WRENN_TEST_PASSWORD)
with WrennClient(token=resp.token, base_url=WRENN_BASE_URL) as c:
yield c
else:
pytest.skip(
"Set WRENN_TOKEN or WRENN_TEST_EMAIL+WRENN_TEST_PASSWORD for bearer-auth tests"
)
@pytest_asyncio.fixture
async def async_minimal_capsule(
async_client: AsyncWrennClient,
) -> AsyncGenerator[Capsule, None]:
"""Provides a ready-to-use minimal capsule and cleans it up afterward."""
cap = await async_client.capsules.create(template="minimal", timeout_sec=120)
await cap.async_wait_ready(timeout=60, interval=1)
yield cap
await cap.async_destroy()
@pytest_asyncio.fixture
async def async_python_capsule(
async_client: AsyncWrennClient,
) -> AsyncGenerator[Capsule, None]:
"""Provides a ready-to-use Python interpreter capsule."""
cap = await async_client.capsules.create(
template="python-interpreter-v0-beta", timeout_sec=120
)
await cap.async_wait_ready(timeout=60, interval=1)
yield cap
await cap.async_destroy()
@pytest.fixture
def minimal_capsule(
client: WrennClient,
) -> Generator[Capsule, None, None]:
"""Provides a ready-to-use minimal capsule and cleans it up afterward."""
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
yield cap

View File

@ -0,0 +1,79 @@
from __future__ import annotations
import pytest
from wrenn.capsule import Capsule, ExecResult
from .conftest import requires_auth
# --- Tests ---
@requires_auth
class TestAsyncCapsuleLifecycle:
@pytest.mark.asyncio
async def test_async_create_exec_destroy(self, async_minimal_capsule: Capsule):
result = await async_minimal_capsule.async_exec("echo", args=["async_hello"])
assert isinstance(result, ExecResult)
assert result.exit_code == 0
assert "async_hello" in result.stdout
@pytest.mark.asyncio
async def test_async_upload_download(self, async_minimal_capsule: Capsule):
content = b"Async upload test"
await async_minimal_capsule.async_upload("/tmp/async_test.txt", content)
downloaded = await async_minimal_capsule.async_download("/tmp/async_test.txt")
assert downloaded == content
@pytest.mark.asyncio
async def test_async_run_code(self, async_python_capsule: Capsule):
r = await async_python_capsule.async_run_code("42 * 2")
assert r.text == "84"
@requires_auth
class TestAsyncFilesystem:
@pytest.mark.asyncio
async def test_async_list_dir(self, async_minimal_capsule: Capsule):
await async_minimal_capsule.async_mkdir("/tmp/async_ls_test")
await async_minimal_capsule.async_upload("/tmp/async_ls_test/file.txt", b"data")
entries = await async_minimal_capsule.async_list_dir("/tmp/async_ls_test")
assert isinstance(entries, list)
assert any(e.name == "file.txt" for e in entries)
@pytest.mark.asyncio
async def test_async_mkdir(self, async_minimal_capsule: Capsule):
entry = await async_minimal_capsule.async_mkdir("/tmp/async_mkdir_test")
assert entry.type == "directory"
assert entry.name == "async_mkdir_test"
@pytest.mark.asyncio
async def test_async_remove(self, async_minimal_capsule: Capsule):
await async_minimal_capsule.async_upload("/tmp/async_rm.txt", b"bye")
entries = await async_minimal_capsule.async_list_dir("/tmp")
assert any(e.name == "async_rm.txt" for e in entries)
await async_minimal_capsule.async_remove("/tmp/async_rm.txt")
entries = await async_minimal_capsule.async_list_dir("/tmp")
assert not any(e.name == "async_rm.txt" for e in entries)
@pytest.mark.asyncio
async def test_async_full_filesystem_roundtrip(
self, async_minimal_capsule: Capsule
):
await async_minimal_capsule.async_mkdir("/tmp/async_rt")
await async_minimal_capsule.async_upload(
"/tmp/async_rt/file.txt", b"async content"
)
entries = await async_minimal_capsule.async_list_dir("/tmp/async_rt")
assert any(e.name == "file.txt" for e in entries)
data = await async_minimal_capsule.async_download("/tmp/async_rt/file.txt")
assert data == b"async content"
await async_minimal_capsule.async_remove("/tmp/async_rt/file.txt")
entries = await async_minimal_capsule.async_list_dir("/tmp/async_rt")
assert not any(e.name == "file.txt" for e in entries)

View File

@ -0,0 +1,28 @@
from __future__ import annotations
from wrenn.client import WrennClient
from .conftest import requires_auth
@requires_auth
class TestSnapshots:
def test_list_templates(self, client: WrennClient):
templates = client.snapshots.list()
assert isinstance(templates, list)
@requires_auth
class TestAPIKeys:
def test_create_list_delete(self, bearer_client: WrennClient):
key_resp = bearer_client.api_keys.create(name="integration-test-key")
assert key_resp.name == "integration-test-key"
assert key_resp.key is not None
assert key_resp.id is not None
try:
keys = bearer_client.api_keys.list()
ids = [k.id for k in keys]
assert key_resp.id in ids
finally:
bearer_client.api_keys.delete(key_resp.id)

View File

@ -0,0 +1,91 @@
from __future__ import annotations
import pytest
from wrenn.capsule import Capsule
from wrenn.client import WrennClient
from wrenn.exceptions import WrennNotFoundError, WrennValidationError
from .conftest import requires_auth
@requires_auth
class TestCapsuleLifecycle:
def test_create_exec_destroy(self, minimal_capsule: Capsule):
result = minimal_capsule.exec("echo", args=["hello"])
assert result.exit_code == 0
assert "hello" in result.stdout
def test_exec_with_args(self, minimal_capsule: Capsule):
result = minimal_capsule.exec("echo", args=["hello", "world"])
assert result.exit_code == 0
assert "hello world" in result.stdout
def test_exec_nonzero_exit(self, minimal_capsule: Capsule):
result = minimal_capsule.exec("sh", args=["-c", "exit 42"])
assert result.exit_code == 42
def test_exec_stderr(self, minimal_capsule: Capsule):
result = minimal_capsule.exec("sh", args=["-c", "echo err>&2"])
assert result.exit_code == 0
assert "err" in result.stderr
def test_context_manager_cleanup(self, client: WrennClient):
# This test explicitly requires manual management to verify the context manager
cap = client.capsules.create(template="minimal", timeout_sec=120)
cap_id = cap.id
with cap:
cap.wait_ready(timeout=60, interval=1)
fetched = client.capsules.get(cap_id)
assert fetched.status in ("stopped", "destroyed")
@requires_auth
class TestPauseResume:
def test_pause_and_resume(self, minimal_capsule: Capsule):
minimal_capsule.pause()
assert minimal_capsule.status == "paused"
minimal_capsule.resume()
minimal_capsule.wait_ready(timeout=60, interval=1)
result = minimal_capsule.exec("echo", args=["resumed"])
assert result.exit_code == 0
assert "resumed" in result.stdout
@requires_auth
class TestPing:
def test_ping_resets_timer(self, minimal_capsule: Capsule):
minimal_capsule.ping()
result = minimal_capsule.exec("echo", args=["still_alive"])
assert result.exit_code == 0
assert "still_alive" in result.stdout
@requires_auth
class TestProxy:
def test_get_url(self, minimal_capsule: Capsule):
url = minimal_capsule.get_url(8888)
assert minimal_capsule.id in url
assert "8888" in url
@requires_auth
class TestListAndGet:
def test_list_capsules(self, client: WrennClient, minimal_capsule: Capsule):
# Require minimal_capsule to ensure one exists, use client to list
boxes = client.capsules.list()
ids = [b.id for b in boxes]
assert minimal_capsule.id in ids
def test_get_existing_capsule(self, client: WrennClient, minimal_capsule: Capsule):
fetched = client.capsules.get(minimal_capsule.id)
assert fetched.id == minimal_capsule.id
assert fetched.status == "running"
def test_get_nonexistent_capsule(self, client: WrennClient):
with pytest.raises((WrennNotFoundError, WrennValidationError)):
client.capsules.get("cl-nonexistent00000000000000000")

View File

@ -0,0 +1,133 @@
from __future__ import annotations
import pytest
from wrenn.client import WrennClient
from .conftest import requires_auth
@requires_auth
class TestFileIO:
def test_upload_and_download(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
content = b"Hello from integration test!"
cap.upload("/tmp/test_file.txt", content)
downloaded = cap.download("/tmp/test_file.txt")
assert downloaded == content
def test_download_nonexistent_file(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with pytest.raises(Exception):
cap.download("/tmp/no_such_file_12345")
@requires_auth
class TestFilesystemListDir:
def test_list_dir_root(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.mkdir("/tmp/ls_test_root")
cap.upload("/tmp/ls_test_root/hello.txt", b"hello")
entries = cap.list_dir("/tmp/ls_test_root")
assert isinstance(entries, list)
names = [e.name for e in entries]
assert "hello.txt" in names
def test_list_dir_after_mkdir(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.mkdir("/tmp/fs_test_dir")
entries = cap.list_dir("/tmp")
names = [e.name for e in entries]
assert "fs_test_dir" in names
def test_list_dir_file_metadata(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.upload("/tmp/meta_test.txt", b"hello world")
entries = cap.list_dir("/tmp")
match = [e for e in entries if e.name == "meta_test.txt"]
assert len(match) == 1
f = match[0]
assert f.type == "file"
assert f.size == 11
assert f.permissions is not None
assert f.owner is not None
assert f.group is not None
assert f.modified_at is not None
def test_list_dir_depth(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.mkdir("/tmp/depth_a/depth_b")
cap.upload("/tmp/depth_a/depth_b/nested.txt", b"deep")
entries = cap.list_dir("/tmp/depth_a", depth=2)
paths = [e.path for e in entries]
assert any("nested.txt" in p for p in paths)
def test_list_dir_empty_directory(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.mkdir("/tmp/empty_dir_test")
entries = cap.list_dir("/tmp/empty_dir_test")
assert entries == []
@requires_auth
class TestFilesystemMkdir:
def test_mkdir_creates_directory(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
entry = cap.mkdir("/tmp/mkdir_test")
assert entry.name == "mkdir_test"
assert entry.type == "directory"
assert entry.path == "/tmp/mkdir_test"
def test_mkdir_creates_parents(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
entry = cap.mkdir("/tmp/a/b/c/d")
assert entry.type == "directory"
def test_mkdir_already_exists(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.mkdir("/tmp/exist_test")
entry = cap.mkdir("/tmp/exist_test")
assert entry.type == "directory"
@requires_auth
class TestFilesystemRemove:
def test_remove_file(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.upload("/tmp/rm_test.txt", b"delete me")
entries_before = cap.list_dir("/tmp")
assert any(e.name == "rm_test.txt" for e in entries_before)
cap.remove("/tmp/rm_test.txt")
entries_after = cap.list_dir("/tmp")
assert not any(e.name == "rm_test.txt" for e in entries_after)
def test_remove_directory(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.mkdir("/tmp/rm_dir_test")
cap.upload("/tmp/rm_dir_test/file.txt", b"inside")
cap.remove("/tmp/rm_dir_test")
entries = cap.list_dir("/tmp")
assert not any(e.name == "rm_dir_test" for e in entries)
def test_upload_download_remove_roundtrip(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
content = b"round trip test data " * 100
cap.upload("/tmp/rt.txt", content)
downloaded = cap.download("/tmp/rt.txt")
assert downloaded == content
cap.remove("/tmp/rt.txt")
with pytest.raises(Exception):
cap.download("/tmp/rt.txt")

View File

@ -0,0 +1,77 @@
from __future__ import annotations
from wrenn.client import WrennClient
from wrenn.pty import PtyEventType
from .conftest import requires_auth
@requires_auth
class TestPty:
def test_pty_basic_output(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh", cwd="/tmp") as term:
term.write(b"echo pty_hello\n")
output = b""
for event in term:
if event.type == PtyEventType.output:
output += event.data
elif event.type == PtyEventType.exit:
break
if b"pty_hello" in output:
term.write(b"exit\n")
assert b"pty_hello" in output
def test_pty_tag_and_pid(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh") as term:
started = False
for event in term:
if event.type == PtyEventType.started:
started = True
assert term.tag is not None
assert term.pid is not None
assert term.tag.startswith("pty-")
elif event.type == PtyEventType.output:
term.write(b"exit\n")
elif event.type == PtyEventType.exit:
break
assert started
def test_pty_exit_on_command_exit(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/echo", args=["immediate"]) as term:
events = list(term)
types = [e.type for e in events]
assert PtyEventType.started in types
assert PtyEventType.output in types or PtyEventType.exit in types
def test_pty_resize(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh", cols=80, rows=24) as term:
for event in term:
if event.type == PtyEventType.started:
term.resize(120, 40)
term.write(b"exit\n")
elif event.type == PtyEventType.exit:
break
def test_pty_envs(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh", envs={"MY_VAR": "hello_env"}) as term:
output = b""
for event in term:
if event.type == PtyEventType.started:
term.write(b"echo $MY_VAR\n")
elif event.type == PtyEventType.output:
output += event.data
if b"hello_env" in output:
term.write(b"exit\n")
elif event.type == PtyEventType.exit:
break
assert b"hello_env" in output

View File

@ -0,0 +1,49 @@
from __future__ import annotations
from wrenn.client import WrennClient
from .conftest import requires_auth
@requires_auth
class TestRunCode:
def test_basic_execution(self, client: WrennClient):
with client.capsules.create(
template="python-interpreter-v0-beta", timeout_sec=120
) as cap:
cap.wait_ready(timeout=60, interval=1)
r = cap.run_code("x = 42")
assert r.error is None
r = cap.run_code("x * 2")
assert r.text == "84"
def test_state_persists(self, client: WrennClient):
with client.capsules.create(
template="python-interpreter-v0-beta", timeout_sec=120
) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.run_code("def greet(name): return f'hello {name}'")
r = cap.run_code("greet('capsule')")
assert "hello capsule" in (r.text or "")
def test_error_traceback(self, client: WrennClient):
with client.capsules.create(
template="python-interpreter-v0-beta", timeout_sec=120
) as cap:
cap.wait_ready(timeout=60, interval=1)
r = cap.run_code("1/0")
assert r.error is not None
assert "ZeroDivisionError" in r.error
def test_stdout_capture(self, client: WrennClient):
with client.capsules.create(
template="python-interpreter-v0-beta", timeout_sec=120
) as cap:
cap.wait_ready(timeout=60, interval=1)
r = cap.run_code("print('hello from kernel')")
assert "hello from kernel" in r.stdout

View File

@ -0,0 +1,30 @@
from __future__ import annotations
from wrenn.client import WrennClient
from .conftest import requires_auth
@requires_auth
class TestStreamUploadDownload:
def test_stream_upload_and_download(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
chunks = [b"chunk0_", b"chunk1_", b"chunk2"]
def data_gen():
yield from chunks
cap.stream_upload("/tmp/stream_test.bin", data_gen())
downloaded = cap.download("/tmp/stream_test.bin")
assert downloaded == b"chunk0_chunk1_chunk2"
def test_stream_download_large(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
content = b"x" * 65536 * 3
cap.upload("/tmp/large.bin", content)
collected = b""
for chunk in cap.stream_download("/tmp/large.bin"):
collected += chunk
assert collected == content

View File

@ -0,0 +1,208 @@
from __future__ import annotations
import pytest
import respx
from wrenn.capsule import Capsule, CodeResult, _build_proxy_url
from wrenn.client import WrennClient
@pytest.fixture
def client():
with WrennClient(
api_key="wrn_test1234567890abcdef12345678", token="jwt-test-token-abc123"
) as c:
yield c
class TestBuildProxyUrl:
def test_https_production(self):
url = _build_proxy_url("https://api.wrenn.dev", "cl-abc123", 8888)
assert url == "wss://8888-cl-abc123.api.wrenn.dev"
def test_http_localhost(self):
url = _build_proxy_url("http://localhost:8080", "cl-abc123", 3000)
assert url == "ws://3000-cl-abc123.localhost:8080"
def test_https_custom_port(self):
url = _build_proxy_url("https://api.example.com:9443", "sb-1", 8080)
assert url == "wss://8080-sb-1.api.example.com:9443"
def test_http_no_port(self):
url = _build_proxy_url("http://192.168.1.1", "sb-2", 5000)
assert url == "ws://5000-sb-2.192.168.1.1"
class TestCapsuleGetUrl:
@respx.mock
def test_get_url_returns_proxy_url(self, client):
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": "cl-abc", "status": "pending"}
)
cap = client.capsules.create(template="minimal")
url = cap.get_url(8888)
assert url == "wss://8888-cl-abc.api.wrenn.dev"
@respx.mock
def test_get_url_localhost(self):
with WrennClient(
api_key="wrn_test1234567890abcdef12345678",
base_url="http://localhost:8080",
) as c:
respx.post("http://localhost:8080/v1/capsules").respond(
201, json={"id": "cl-xyz", "status": "pending"}
)
cap = c.capsules.create()
url = cap.get_url(3000)
assert url == "ws://3000-cl-xyz.localhost:8080"
class TestCapsuleHttpClient:
@respx.mock
def test_http_client_has_api_key_header(self, client):
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": "cl-abc", "status": "pending"}
)
cap = client.capsules.create()
hc = cap.http_client
assert hc.headers["X-API-Key"] == "wrn_test1234567890abcdef12345678"
@respx.mock
def test_http_client_sends_to_proxy(self, client):
route = respx.get("https://8888-cl-abc.api.wrenn.dev/api/kernels").respond(
200, json=[]
)
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": "cl-abc", "status": "pending"}
)
cap = client.capsules.create()
resp = cap.http_client.get("/api/kernels")
assert resp.status_code == 200
assert route.called
def test_jwt_only_get_url_works(self):
with WrennClient(token="jwt-abc") as c:
cap = Capsule(id="cl-abc")
assert c._mgmt_http is not None
cap._bind(
c._mgmt_http, str(c._mgmt_http.base_url), api_key=None, token="jwt-abc"
)
url = cap.get_url(8888)
assert "8888-cl-abc" in url
def test_jwt_only_http_client_has_bearer_header(self):
with WrennClient(token="jwt-abc") as c:
cap = Capsule(id="cl-abc")
assert c._mgmt_http is not None
cap._bind(
c._mgmt_http, str(c._mgmt_http.base_url), api_key=None, token="jwt-abc"
)
hc = cap.http_client
assert hc.headers["Authorization"] == "Bearer jwt-abc"
class TestCreateReturnsBoundCapsule:
@respx.mock
def test_create_returns_capsule_subclass(self, client):
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": "cl-1", "status": "pending", "template": "minimal"}
)
cap = client.capsules.create(template="minimal")
assert isinstance(cap, Capsule)
assert cap.id == "cl-1"
assert hasattr(cap, "exec")
assert hasattr(cap, "run_code")
assert hasattr(cap, "get_url")
@respx.mock
def test_create_context_manager(self, client):
route = respx.delete("https://api.wrenn.dev/v1/capsules/cl-1").respond(204)
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": "cl-1", "status": "pending"}
)
cap = client.capsules.create()
with cap:
assert cap.id == "cl-1"
assert route.called
class TestCodeResult:
def test_defaults(self):
r = CodeResult()
assert r.text is None
assert r.data is None
assert r.stdout == ""
assert r.stderr == ""
assert r.error is None
def test_with_values(self):
r = CodeResult(
text="84",
data={"text/plain": "84"},
stdout="",
stderr="",
error=None,
)
assert r.text == "84"
assert r.data is not None
assert r.data["text/plain"] == "84"
def test_error_result(self):
r = CodeResult(error="ZeroDivisionError: division by zero\n...")
assert r.error is not None
assert "ZeroDivisionError" in r.error
class TestJupyterMessageFormat:
def test_execute_request_structure(self):
cap = Capsule(id="test")
msg = cap._jupyter_execute_request("x = 42")
assert msg["msg_type"] == "execute_request"
assert msg["content"]["code"] == "x = 42"
assert msg["content"]["silent"] is False
assert "msg_id" in msg
assert "header" in msg
assert msg["header"]["msg_type"] == "execute_request"
def test_execute_request_unique_ids(self):
cap = Capsule(id="test")
m1 = cap._jupyter_execute_request("a")
m2 = cap._jupyter_execute_request("b")
assert m1["msg_id"] != m2["msg_id"]
class TestDeprecationWarnings:
def test_import_sandbox_from_capsule_warns(self):
import warnings
import wrenn.capsule as capsule_mod
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
klass = capsule_mod.Sandbox
assert klass is Capsule
assert len(w) == 1
assert issubclass(w[0].category, DeprecationWarning)
assert "Sandbox" in str(w[0].message)
def test_import_sandbox_from_wrenn_warns(self):
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
from wrenn import Sandbox
assert Sandbox is Capsule
assert any(issubclass(x.category, DeprecationWarning) for x in w)
def test_client_sandboxes_property_warns(self):
import warnings
with WrennClient(api_key="wrn_test1234567890abcdef12345678") as c:
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
resource = c.sandboxes
assert resource is c.capsules
assert len(w) == 1
assert issubclass(w[0].category, DeprecationWarning)
assert "sandboxes" in str(w[0].message)

527
tests/test_client.py Normal file
View File

@ -0,0 +1,527 @@
from __future__ import annotations
import pytest
import respx
from wrenn.client import AsyncWrennClient, WrennClient
from wrenn.exceptions import (
WrennAgentError,
WrennAuthenticationError,
WrennConflictError,
WrennForbiddenError,
WrennHostHasCapsulesError,
WrennInternalError,
WrennNotFoundError,
WrennValidationError,
)
from wrenn.models import (
APIKeyResponse,
Capsule,
CreateHostResponse,
Host,
SignupResponse,
Status,
Template,
UsageResponse,
)
@pytest.fixture
def client():
with WrennClient(
api_key="wrn_test1234567890abcdef12345678", token="jwt-test-token-abc123"
) as c:
yield c
@pytest.fixture
def async_client():
return AsyncWrennClient(
api_key="wrn_test1234567890abcdef12345678", token="jwt-test-token-abc123"
)
class TestAuth:
@respx.mock
def test_signup(self, client):
respx.post("https://api.wrenn.dev/v1/auth/signup").respond(
201,
json={"message": "Account created. Check your email to activate."},
)
resp = client.auth.signup("a@b.com", "password123", "Test User")
assert isinstance(resp, SignupResponse)
assert resp.message is not None
@respx.mock
def test_signup_no_creds(self):
respx.post("https://api.wrenn.dev/v1/auth/signup").respond(
201,
json={"message": "Account created."},
)
with WrennClient() as c:
resp = c.auth.signup("a@b.com", "password123", "Test User")
assert isinstance(resp, SignupResponse)
@respx.mock
def test_login(self, client):
respx.post("https://api.wrenn.dev/v1/auth/login").respond(
200,
json={"token": "jwt-token", "email": "a@b.com"},
)
resp = client.auth.login("a@b.com", "password123")
assert resp.token == "jwt-token"
class TestAPIKeys:
@respx.mock
def test_create(self, client):
respx.post("https://api.wrenn.dev/v1/api-keys").respond(
201,
json={
"id": "key-1",
"name": "my-key",
"key_prefix": "wrn_ab12cd34",
"key": "wrn_ab12cd34fullkey",
},
)
resp = client.api_keys.create(name="my-key")
assert isinstance(resp, APIKeyResponse)
assert resp.name == "my-key"
assert resp.key == "wrn_ab12cd34fullkey"
@respx.mock
def test_list(self, client):
respx.get("https://api.wrenn.dev/v1/api-keys").respond(
200,
json=[{"id": "key-1", "name": "k1"}, {"id": "key-2", "name": "k2"}],
)
keys = client.api_keys.list()
assert len(keys) == 2
assert keys[0].id == "key-1"
@respx.mock
def test_delete(self, client):
route = respx.delete("https://api.wrenn.dev/v1/api-keys/key-1").respond(204)
client.api_keys.delete("key-1")
assert route.called
class TestCapsules:
@respx.mock
def test_create(self, client):
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201,
json={
"id": "sb-1",
"status": "pending",
"template": "base-python",
"vcpus": 2,
"memory_mb": 1024,
},
)
resp = client.capsules.create(template="base-python", vcpus=2, memory_mb=1024)
assert isinstance(resp, Capsule)
assert resp.id == "sb-1"
assert resp.status == Status.pending
@respx.mock
def test_create_defaults(self, client):
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": "sb-2", "status": "pending"}
)
resp = client.capsules.create()
assert resp.id == "sb-2"
@respx.mock
def test_list(self, client):
respx.get("https://api.wrenn.dev/v1/capsules").respond(
200, json=[{"id": "sb-1", "status": "running"}]
)
boxes = client.capsules.list()
assert len(boxes) == 1
assert boxes[0].status == Status.running
@respx.mock
def test_get(self, client):
respx.get("https://api.wrenn.dev/v1/capsules/sb-1").respond(
200, json={"id": "sb-1", "status": "running"}
)
resp = client.capsules.get("sb-1")
assert resp.id == "sb-1"
@respx.mock
def test_destroy(self, client):
route = respx.delete("https://api.wrenn.dev/v1/capsules/sb-1").respond(204)
client.capsules.destroy("sb-1")
assert route.called
@respx.mock
def test_usage(self, client):
respx.get("https://api.wrenn.dev/v1/capsules/usage").respond(
200,
json={
"from": "2026-03-21",
"to": "2026-04-20",
"points": [
{
"date": "2026-04-19",
"cpu_minutes": 12.5,
"ram_mb_minutes": 640.0,
},
{"date": "2026-04-20", "cpu_minutes": 8.0, "ram_mb_minutes": 512.0},
],
},
)
resp = client.capsules.usage()
assert isinstance(resp, UsageResponse)
assert resp.points is not None
assert len(resp.points) == 2
assert resp.points[0].cpu_minutes == 12.5
@respx.mock
def test_usage_with_dates(self, client):
route = respx.get("https://api.wrenn.dev/v1/capsules/usage").respond(
200,
json={"from": "2026-04-01", "to": "2026-04-15", "points": []},
)
client.capsules.usage(from_date="2026-04-01", to_date="2026-04-15")
req = route.calls[0].request
assert "from=2026-04-01" in str(req.url)
assert "to=2026-04-15" in str(req.url)
class TestSnapshots:
@respx.mock
def test_create(self, client):
respx.post("https://api.wrenn.dev/v1/snapshots").respond(
201,
json={"name": "snap-1", "type": "snapshot", "vcpus": 1},
)
resp = client.snapshots.create(capsule_id="sb-1", name="snap-1")
assert isinstance(resp, Template)
assert resp.name == "snap-1"
@respx.mock
def test_create_with_overwrite(self, client):
route = respx.post("https://api.wrenn.dev/v1/snapshots").respond(
201, json={"name": "snap-1", "type": "snapshot"}
)
client.snapshots.create(capsule_id="sb-1", overwrite=True)
req = route.calls[0].request
assert "overwrite=true" in str(req.url)
@respx.mock
def test_list(self, client):
respx.get("https://api.wrenn.dev/v1/snapshots").respond(
200, json=[{"name": "base-python", "type": "base"}]
)
snaps = client.snapshots.list()
assert len(snaps) == 1
@respx.mock
def test_list_with_filter(self, client):
route = respx.get("https://api.wrenn.dev/v1/snapshots").respond(200, json=[])
client.snapshots.list(type="snapshot")
req = route.calls[0].request
assert "type=snapshot" in str(req.url)
@respx.mock
def test_delete(self, client):
route = respx.delete("https://api.wrenn.dev/v1/snapshots/snap-1").respond(204)
client.snapshots.delete("snap-1")
assert route.called
class TestHosts:
@respx.mock
def test_create(self, client):
respx.post("https://api.wrenn.dev/v1/hosts").respond(
201,
json={
"host": {"id": "h-1", "type": "regular", "status": "pending"},
"registration_token": "reg-tok-123",
},
)
resp = client.hosts.create(type="regular")
assert isinstance(resp, CreateHostResponse)
assert resp.registration_token == "reg-tok-123"
@respx.mock
def test_list(self, client):
respx.get("https://api.wrenn.dev/v1/hosts").respond(
200, json=[{"id": "h-1", "status": "online"}]
)
hosts = client.hosts.list()
assert len(hosts) == 1
assert isinstance(hosts[0], Host)
@respx.mock
def test_get(self, client):
respx.get("https://api.wrenn.dev/v1/hosts/h-1").respond(
200, json={"id": "h-1", "status": "online"}
)
resp = client.hosts.get("h-1")
assert resp.id == "h-1"
@respx.mock
def test_delete(self, client):
route = respx.delete("https://api.wrenn.dev/v1/hosts/h-1").respond(204)
client.hosts.delete("h-1")
assert route.called
@respx.mock
def test_regenerate_token(self, client):
respx.post("https://api.wrenn.dev/v1/hosts/h-1/token").respond(
201,
json={
"host": {"id": "h-1"},
"registration_token": "new-tok",
},
)
resp = client.hosts.regenerate_token("h-1")
assert resp.registration_token == "new-tok"
@respx.mock
def test_list_tags(self, client):
respx.get("https://api.wrenn.dev/v1/hosts/h-1/tags").respond(
200, json=["gpu", "high-mem"]
)
tags = client.hosts.list_tags("h-1")
assert tags == ["gpu", "high-mem"]
@respx.mock
def test_add_tag(self, client):
route = respx.post("https://api.wrenn.dev/v1/hosts/h-1/tags").respond(204)
client.hosts.add_tag("h-1", "gpu")
assert route.called
@respx.mock
def test_remove_tag(self, client):
route = respx.delete("https://api.wrenn.dev/v1/hosts/h-1/tags/gpu").respond(204)
client.hosts.remove_tag("h-1", "gpu")
assert route.called
class TestErrorHandling:
@respx.mock
def test_validation_error(self, client):
respx.post("https://api.wrenn.dev/v1/capsules").respond(
400,
json={"error": {"code": "invalid_request", "message": "bad input"}},
)
with pytest.raises(WrennValidationError) as exc_info:
client.capsules.create()
assert exc_info.value.code == "invalid_request"
assert exc_info.value.status_code == 400
@respx.mock
def test_auth_error(self, client):
respx.get("https://api.wrenn.dev/v1/capsules").respond(
401,
json={"error": {"code": "unauthorized", "message": "bad key"}},
)
with pytest.raises(WrennAuthenticationError):
client.capsules.list()
@respx.mock
def test_forbidden_error(self, client):
respx.post("https://api.wrenn.dev/v1/hosts").respond(
403,
json={"error": {"code": "forbidden", "message": "nope"}},
)
with pytest.raises(WrennForbiddenError):
client.hosts.create(type="regular")
@respx.mock
def test_not_found_error(self, client):
respx.get("https://api.wrenn.dev/v1/capsules/nope").respond(
404,
json={"error": {"code": "not_found", "message": "capsule not found"}},
)
with pytest.raises(WrennNotFoundError):
client.capsules.get("nope")
@respx.mock
def test_conflict_error(self, client):
respx.get("https://api.wrenn.dev/v1/capsules/sb-1").respond(
409,
json={"error": {"code": "invalid_state", "message": "not running"}},
)
with pytest.raises(WrennConflictError):
client.capsules.get("sb-1")
@respx.mock
def test_host_has_capsules_error(self, client):
respx.delete("https://api.wrenn.dev/v1/hosts/h-1").respond(
409,
json={
"error": {
"code": "host_has_capsules",
"message": "host has running capsules",
},
"sandbox_ids": ["sb-1", "sb-2"],
},
)
with pytest.raises(WrennHostHasCapsulesError) as exc_info:
client.hosts.delete("h-1")
assert exc_info.value.capsule_ids == ["sb-1", "sb-2"]
@respx.mock
def test_agent_error(self, client):
respx.post("https://api.wrenn.dev/v1/capsules").respond(
502,
json={"error": {"code": "agent_error", "message": "host agent failed"}},
)
with pytest.raises(WrennAgentError):
client.capsules.create()
@respx.mock
def test_internal_error(self, client):
respx.get("https://api.wrenn.dev/v1/capsules/sb-1").respond(
500,
json={"error": {"code": "internal_error", "message": "oops"}},
)
with pytest.raises(WrennInternalError):
client.capsules.get("sb-1")
@respx.mock
def test_unknown_error_code_falls_back(self, client):
respx.get("https://api.wrenn.dev/v1/capsules/sb-1").respond(
418,
json={"error": {"code": "teapot", "message": "I'm a teapot"}},
)
from wrenn.exceptions import WrennError
with pytest.raises(WrennError) as exc_info:
client.capsules.get("sb-1")
assert exc_info.value.code == "teapot"
class TestAuthModes:
def test_api_key_only_creates_data_client(self):
with WrennClient(api_key="wrn_test1234567890abcdef12345678") as c:
assert c._data_http is not None
assert (
c._data_http.headers["X-API-Key"] == "wrn_test1234567890abcdef12345678"
)
assert c._mgmt_http is None
def test_token_only_creates_mgmt_client(self):
with WrennClient(token="jwt-token-abc") as c:
assert c._mgmt_http is not None
assert c._mgmt_http.headers["Authorization"] == "Bearer jwt-token-abc"
assert c._data_http is None
def test_no_auth_allowed(self):
with WrennClient() as c:
assert c._data_http is None
assert c._mgmt_http is None
assert c._public_http is not None
def test_both_creds_creates_both_clients(self):
with WrennClient(
api_key="wrn_test1234567890abcdef12345678", token="jwt-abc"
) as c:
assert c._data_http is not None
assert c._mgmt_http is not None
def test_capsule_ops_require_api_key(self):
with WrennClient(token="jwt-abc") as c:
with pytest.raises(ValueError, match="API key"):
c.capsules.list()
def test_snapshot_ops_require_api_key(self):
with WrennClient(token="jwt-abc") as c:
with pytest.raises(ValueError, match="API key"):
c.snapshots.list()
def test_mgmt_ops_require_token(self):
with WrennClient(api_key="wrn_test1234567890abcdef12345678") as c:
with pytest.raises(ValueError, match="JWT token"):
c.api_keys.list()
with pytest.raises(ValueError, match="JWT token"):
c.teams.list()
with pytest.raises(ValueError, match="JWT token"):
c.hosts.list()
with pytest.raises(ValueError, match="JWT token"):
c.channels.list()
with pytest.raises(ValueError, match="JWT token"):
c.users.search("a@b.com")
with pytest.raises(ValueError, match="JWT token"):
c.account.get()
with pytest.raises(ValueError, match="JWT token"):
c.auth.switch_team("team-1")
@respx.mock
def test_mgmt_sends_bearer_only(self):
route = respx.get("https://api.wrenn.dev/v1/api-keys").respond(200, json=[])
with WrennClient(
api_key="wrn_test1234567890abcdef12345678", token="jwt-abc"
) as c:
c.api_keys.list()
req = route.calls[0].request
assert req.headers["Authorization"] == "Bearer jwt-abc"
assert "X-API-Key" not in req.headers
@respx.mock
def test_data_sends_api_key_only(self):
route = respx.get("https://api.wrenn.dev/v1/capsules").respond(200, json=[])
with WrennClient(
api_key="wrn_test1234567890abcdef12345678", token="jwt-abc"
) as c:
c.capsules.list()
req = route.calls[0].request
assert req.headers["X-API-Key"] == "wrn_test1234567890abcdef12345678"
assert "Authorization" not in req.headers
@respx.mock
def test_public_sends_no_auth(self):
route = respx.post("https://api.wrenn.dev/v1/auth/signup").respond(
201, json={"message": "ok"}
)
with WrennClient() as c:
c.auth.signup("a@b.com", "password123", "Test")
req = route.calls[0].request
assert "X-API-Key" not in req.headers
assert "Authorization" not in req.headers
class TestAsyncClient:
@pytest.mark.asyncio
@respx.mock
async def test_async_capsules_create(self, async_client):
async with async_client:
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": "sb-1", "status": "pending"}
)
resp = await async_client.capsules.create(template="base-python")
assert resp.id == "sb-1"
@pytest.mark.asyncio
@respx.mock
async def test_async_capsules_list(self, async_client):
async with async_client:
respx.get("https://api.wrenn.dev/v1/capsules").respond(
200, json=[{"id": "sb-1"}]
)
boxes = await async_client.capsules.list()
assert len(boxes) == 1
@pytest.mark.asyncio
@respx.mock
async def test_async_hosts_list(self, async_client):
async with async_client:
respx.get("https://api.wrenn.dev/v1/hosts").respond(200, json=[])
hosts = await async_client.hosts.list()
assert hosts == []
@pytest.mark.asyncio
@respx.mock
async def test_async_error_handling(self, async_client):
async with async_client:
respx.get("https://api.wrenn.dev/v1/capsules/nope").respond(
404,
json={"error": {"code": "not_found", "message": "not found"}},
)
with pytest.raises(WrennNotFoundError):
await async_client.capsules.get("nope")

View File

@ -0,0 +1,507 @@
from __future__ import annotations
import base64
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
import respx
from wrenn.capsule import Capsule
from wrenn.client import WrennClient
from wrenn.models import FileEntry
from wrenn.pty import (
AsyncPtySession,
PtyEventType,
PtySession,
_parse_pty_event,
)
@pytest.fixture
def client():
with WrennClient(api_key="wrn_test1234567890abcdef12345678") as c:
yield c
def _make_capsule(client: WrennClient, cap_id: str = "cl-abc") -> Capsule:
respx.post("https://api.wrenn.dev/v1/capsules").respond(
201, json={"id": cap_id, "status": "running"}
)
return client.capsules.create()
class TestListDir:
@respx.mock
def test_list_dir_returns_entries(self, client):
cap = _make_capsule(client)
respx.post("https://api.wrenn.dev/v1/capsules/cl-abc/files/list").respond(
200,
json={
"entries": [
{
"name": "main.py",
"path": "/home/user/main.py",
"type": "file",
"size": 1024,
"mode": 33188,
"permissions": "-rw-r--r--",
"owner": "root",
"group": "root",
"modified_at": 1712899200,
"symlink_target": None,
},
{
"name": "config",
"path": "/home/user/config",
"type": "directory",
"size": 4096,
"mode": 16877,
"permissions": "drwxr-xr-x",
"owner": "root",
"group": "root",
"modified_at": 1712899100,
"symlink_target": None,
},
]
},
)
entries = cap.list_dir("/home/user")
assert len(entries) == 2
assert isinstance(entries[0], FileEntry)
assert entries[0].name == "main.py"
assert entries[0].type == "file"
assert entries[1].name == "config"
assert entries[1].type == "directory"
@respx.mock
def test_list_dir_with_depth(self, client):
cap = _make_capsule(client)
route = respx.post(
"https://api.wrenn.dev/v1/capsules/cl-abc/files/list"
).respond(200, json={"entries": []})
cap.list_dir("/home/user", depth=3)
body = json.loads(route.calls[0].request.content)
assert body["depth"] == 3
@respx.mock
def test_list_dir_empty(self, client):
cap = _make_capsule(client)
respx.post("https://api.wrenn.dev/v1/capsules/cl-abc/files/list").respond(
200, json={"entries": []}
)
entries = cap.list_dir("/empty")
assert entries == []
@respx.mock
def test_list_dir_symlink(self, client):
cap = _make_capsule(client)
respx.post("https://api.wrenn.dev/v1/capsules/cl-abc/files/list").respond(
200,
json={
"entries": [
{
"name": "link",
"path": "/home/user/link",
"type": "symlink",
"size": 4,
"mode": 41471,
"permissions": "lrwxrwxrwx",
"owner": "root",
"group": "root",
"modified_at": 1712899000,
"symlink_target": "/bin",
}
]
},
)
entries = cap.list_dir("/home/user")
assert len(entries) == 1
assert entries[0].type == "symlink"
assert entries[0].symlink_target == "/bin"
class TestMkdir:
@respx.mock
def test_mkdir_returns_entry(self, client):
cap = _make_capsule(client)
respx.post("https://api.wrenn.dev/v1/capsules/cl-abc/files/mkdir").respond(
200,
json={
"entry": {
"name": "data",
"path": "/home/user/data",
"type": "directory",
"size": 4096,
"mode": 16877,
"permissions": "drwxr-xr-x",
"owner": "root",
"group": "root",
"modified_at": 1712899200,
"symlink_target": None,
}
},
)
entry = cap.mkdir("/home/user/data")
assert isinstance(entry, FileEntry)
assert entry.name == "data"
assert entry.type == "directory"
@respx.mock
def test_mkdir_existing_returns_gracefully(self, client):
cap = _make_capsule(client)
respx.post("https://api.wrenn.dev/v1/capsules/cl-abc/files/mkdir").respond(
409,
json={"error": {"code": "conflict", "message": "already exists"}},
)
respx.post("https://api.wrenn.dev/v1/capsules/cl-abc/files/list").respond(
200,
json={
"entries": [
{
"name": "data",
"path": "/home/user/data",
"type": "directory",
"size": 4096,
"mode": 16877,
"permissions": "drwxr-xr-x",
"owner": "root",
"group": "root",
"modified_at": 1712899200,
"symlink_target": None,
}
]
},
)
entry = cap.mkdir("/home/user/data")
assert entry.name == "data"
class TestRemove:
@respx.mock
def test_remove_succeeds(self, client):
cap = _make_capsule(client)
route = respx.post(
"https://api.wrenn.dev/v1/capsules/cl-abc/files/remove"
).respond(204)
cap.remove("/home/user/old_data")
assert route.called
@respx.mock
def test_remove_sends_path(self, client):
cap = _make_capsule(client)
route = respx.post(
"https://api.wrenn.dev/v1/capsules/cl-abc/files/remove"
).respond(204)
cap.remove("/tmp/test.txt")
body = json.loads(route.calls[0].request.content)
assert body["path"] == "/tmp/test.txt"
class TestUpload:
@respx.mock
def test_upload_sends_multipart(self, client):
cap = _make_capsule(client)
route = respx.post(
"https://api.wrenn.dev/v1/capsules/cl-abc/files/write"
).respond(204)
cap.upload("/app/main.py", b"print('hello')")
assert route.called
req = route.calls[0].request
assert b"multipart/form-data" in req.headers.get("content-type", "").encode()
@respx.mock
def test_download_returns_bytes(self, client):
cap = _make_capsule(client)
content = b"file contents here"
respx.post("https://api.wrenn.dev/v1/capsules/cl-abc/files/read").respond(
200, content=content
)
data = cap.download("/app/main.py")
assert data == content
class TestPtyEventParsing:
def test_started_event(self):
raw = {"type": "started", "tag": "pty-a1b2c3d4", "pid": 42}
event = _parse_pty_event(raw)
assert event.type == PtyEventType.started
assert event.pid == 42
assert event.tag == "pty-a1b2c3d4"
def test_output_event_base64(self):
encoded = base64.b64encode(b"ls -la\n").decode()
raw = {"type": "output", "data": encoded}
event = _parse_pty_event(raw)
assert event.type == PtyEventType.output
assert event.data == b"ls -la\n"
def test_output_event_empty(self):
raw = {"type": "output", "data": ""}
event = _parse_pty_event(raw)
assert event.data == b""
def test_exit_event(self):
raw = {"type": "exit", "exit_code": 0}
event = _parse_pty_event(raw)
assert event.type == PtyEventType.exit
assert event.exit_code == 0
def test_error_event(self):
raw = {"type": "error", "data": "process not found", "fatal": True}
event = _parse_pty_event(raw)
assert event.type == PtyEventType.error
assert event.data == "process not found"
assert event.fatal is True
def test_error_event_non_fatal(self):
raw = {"type": "error", "data": "something", "fatal": False}
event = _parse_pty_event(raw)
assert event.fatal is False
def test_ping_event(self):
raw = {"type": "ping"}
event = _parse_pty_event(raw)
assert event.type == PtyEventType.ping
class TestPtySessionWrite:
def test_write_sends_base64_input(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
session.write(b"ls -la\n")
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "input"
assert base64.b64decode(sent["data"]) == b"ls -la\n"
class TestPtySessionResize:
def test_resize_sends_dimensions(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
session.resize(120, 40)
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "resize"
assert sent["cols"] == 120
assert sent["rows"] == 40
def test_resize_zero_raises(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
with pytest.raises(ValueError, match="greater than 0"):
session.resize(0, 40)
with pytest.raises(ValueError, match="greater than 0"):
session.resize(80, 0)
class TestPtySessionKill:
def test_kill_sends_message(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
session.kill()
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "kill"
class TestPtySessionIteration:
def test_iter_yields_events_until_exit(self):
ws = MagicMock()
messages = [
json.dumps({"type": "started", "tag": "pty-abc12345", "pid": 1}),
json.dumps({"type": "output", "data": base64.b64encode(b"hello").decode()}),
json.dumps({"type": "exit", "exit_code": 0}),
]
ws.receive_text.side_effect = messages
session = PtySession(ws, "cl-abc")
events = list(session)
assert len(events) == 2
assert events[0].type == PtyEventType.started
assert session.tag == "pty-abc12345"
assert session.pid == 1
assert events[1].type == PtyEventType.output
assert events[1].data == b"hello"
def test_iter_stops_on_fatal_error(self):
ws = MagicMock()
messages = [
json.dumps({"type": "error", "data": "fatal", "fatal": True}),
]
ws.receive_text.side_effect = messages
session = PtySession(ws, "cl-abc")
events = list(session)
assert len(events) == 1
assert events[0].type == PtyEventType.error
def test_iter_stops_on_disconnect(self):
import httpx_ws
ws = MagicMock()
ws.receive_text.side_effect = httpx_ws.WebSocketDisconnect()
session = PtySession(ws, "cl-abc")
events = list(session)
assert events == []
class TestPtySessionContextManager:
def test_exit_kills_and_closes(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
with session:
pass
ws.send_text.assert_called()
ws.close.assert_called()
def test_exit_ignores_errors(self):
ws = MagicMock()
ws.send_text.side_effect = Exception("already closed")
session = PtySession(ws, "cl-abc")
with session:
pass
class TestPtySessionSendStart:
def test_send_start_with_defaults(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
session._send_start()
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "start"
assert sent["cmd"] == "/bin/bash"
assert sent["cols"] == 80
assert sent["rows"] == 24
def test_send_start_with_all_params(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
session._send_start(
cmd="/bin/zsh",
args=["-l"],
cols=120,
rows=40,
envs={"TERM": "xterm-256color"},
cwd="/home/user",
)
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["cmd"] == "/bin/zsh"
assert sent["args"] == ["-l"]
assert sent["cols"] == 120
assert sent["rows"] == 40
assert sent["envs"] == {"TERM": "xterm-256color"}
assert sent["cwd"] == "/home/user"
class TestPtySessionSendConnect:
def test_send_connect(self):
ws = MagicMock()
session = PtySession(ws, "cl-abc")
session._send_connect("pty-abc12345")
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "connect"
assert sent["tag"] == "pty-abc12345"
class TestAsyncPtySession:
@pytest.mark.asyncio
async def test_async_write_sends_base64(self):
ws = AsyncMock()
session = AsyncPtySession(ws, "cl-abc")
await session.write(b"hello")
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "input"
assert base64.b64decode(sent["data"]) == b"hello"
@pytest.mark.asyncio
async def test_async_resize(self):
ws = AsyncMock()
session = AsyncPtySession(ws, "cl-abc")
await session.resize(100, 30)
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "resize"
assert sent["cols"] == 100
assert sent["rows"] == 30
@pytest.mark.asyncio
async def test_async_resize_zero_raises(self):
ws = AsyncMock()
session = AsyncPtySession(ws, "cl-abc")
with pytest.raises(ValueError):
await session.resize(0, 10)
@pytest.mark.asyncio
async def test_async_kill(self):
ws = AsyncMock()
session = AsyncPtySession(ws, "cl-abc")
await session.kill()
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "kill"
@pytest.mark.asyncio
async def test_async_context_manager(self):
ws = AsyncMock()
session = AsyncPtySession(ws, "cl-abc")
async with session:
pass
ws.send_text.assert_called()
ws.close.assert_called()
@pytest.mark.asyncio
async def test_async_send_start(self):
ws = AsyncMock()
session = AsyncPtySession(ws, "cl-abc")
await session._send_start(cmd="/bin/zsh", cols=100, rows=30)
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "start"
assert sent["cmd"] == "/bin/zsh"
assert sent["cols"] == 100
assert sent["rows"] == 30
@pytest.mark.asyncio
async def test_async_send_connect(self):
ws = AsyncMock()
session = AsyncPtySession(ws, "cl-abc")
await session._send_connect("pty-abc12345")
sent = json.loads(ws.send_text.call_args[0][0])
assert sent["type"] == "connect"
assert sent["tag"] == "pty-abc12345"
@pytest.mark.asyncio
async def test_async_iteration(self):
ws = AsyncMock()
messages = [
json.dumps({"type": "started", "tag": "pty-xyz", "pid": 5}),
json.dumps({"type": "output", "data": base64.b64encode(b"hi").decode()}),
json.dumps({"type": "exit", "exit_code": 0}),
]
ws.receive_text.side_effect = messages
session = AsyncPtySession(ws, "cl-abc")
events = []
async for event in session:
events.append(event)
assert len(events) == 2
assert events[0].type == PtyEventType.started
assert session.tag == "pty-xyz"
assert session.pid == 5
class TestExports:
def test_file_entry_importable(self):
from wrenn import FileEntry as FE
assert FE is not None
def test_pty_session_importable(self):
from wrenn import PtySession as PS
assert PS is not None
def test_async_pty_session_importable(self):
from wrenn import AsyncPtySession as APS
assert APS is not None
def test_pty_event_importable(self):
from wrenn import PtyEvent as PE
from wrenn import PtyEventType as PET
assert PE is not None
assert PET is not None

67
uv.lock generated
View File

@ -112,6 +112,28 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/3a/7f169ffc7a2d69a4f9158b1ac083f685b7f4a1a8a1db5d1e4abbb4e741b7/datamodel_code_generator-0.56.0-py3-none-any.whl", hash = "sha256:a0559683fbe90cdf2ce9b6637e3adae3e3a8056a8d0516df581d486e2834ead2", size = 256545, upload-time = "2026-04-04T09:46:17.582Z" },
]
[[package]]
name = "dnspython"
version = "2.8.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/8c/8b/57666417c0f90f08bcafa776861060426765fdb422eb10212086fb811d26/dnspython-2.8.0.tar.gz", hash = "sha256:181d3c6996452cb1189c4046c61599b84a5a86e099562ffde77d26984ff26d0f", size = 368251, upload-time = "2025-09-07T18:58:00.022Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ba/5a/18ad964b0086c6e62e2e7500f7edc89e3faa45033c71c1893d34eed2b2de/dnspython-2.8.0-py3-none-any.whl", hash = "sha256:01d9bbc4a2d76bf0db7c1f729812ded6d912bd318d3b1cf81d30c0f845dbf3af", size = 331094, upload-time = "2025-09-07T18:57:58.071Z" },
]
[[package]]
name = "email-validator"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "dnspython" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f5/22/900cb125c76b7aaa450ce02fd727f452243f2e91a61af068b40adba60ea9/email_validator-2.3.0.tar.gz", hash = "sha256:9fc05c37f2f6cf439ff414f8fc46d917929974a82244c20eb10231ba60c54426", size = 51238, upload-time = "2025-08-26T13:09:06.831Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/de/15/545e2b6cf2e3be84bc1ed85613edd75b8aea69807a71c26f4ca6a9258e82/email_validator-2.3.0-py3-none-any.whl", hash = "sha256:80f13f623413e6b197ae73bb10bf4eb0908faf509ad8362c5edeb0be7fd450b4", size = 35604, upload-time = "2025-08-26T13:09:05.858Z" },
]
[[package]]
name = "genson"
version = "1.3.0"
@ -158,6 +180,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[[package]]
name = "httpx-ws"
version = "0.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "httpcore" },
{ name = "httpx" },
{ name = "wsproto" },
]
sdist = { url = "https://files.pythonhosted.org/packages/cd/cd/ca91a07ae446451f7476bf3fcc909e98cb942ff032ebfda0e3fe449aca7b/httpx_ws-0.9.0.tar.gz", hash = "sha256:797373326f70eec1ae96f6e43ae9f12002fd7d73aee139a4985eaab964338a08", size = 107105, upload-time = "2026-03-28T14:11:10.781Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/98/f8/a6bc80313a9e93c888fa10534dfce2ad76ff86911b6f485777ce6de6a073/httpx_ws-0.9.0-py3-none-any.whl", hash = "sha256:71640d2fb1bf9a225775015b33cd755cfd4c5f7e21c885192fe3adc4c387b248", size = 15759, upload-time = "2026-03-28T14:11:11.887Z" },
]
[[package]]
name = "idna"
version = "3.11"
@ -564,6 +601,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
]
[[package]]
name = "respx"
version = "0.23.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
]
sdist = { url = "https://files.pythonhosted.org/packages/43/98/4e55c9c486404ec12373708d015ebce157966965a5ebe7f28ff2c784d41b/respx-0.23.1.tar.gz", hash = "sha256:242dcc6ce6b5b9bf621f5870c82a63997e8e82bc7c947f9ffe272b8f3dd5a780", size = 29243, upload-time = "2026-04-08T14:37:16.008Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1d/4a/221da6ca167db45693d8d26c7dc79ccfc978a440251bf6721c9aaf251ac0/respx-0.23.1-py2.py3-none-any.whl", hash = "sha256:b18004b029935384bccfa6d7d9d74b4ec9af73a081cc28600fffc0447f4b8c1a", size = 25557, upload-time = "2026-04-08T14:37:14.613Z" },
]
[[package]]
name = "ruff"
version = "0.15.10"
@ -627,7 +676,9 @@ name = "wrenn"
version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "email-validator" },
{ name = "httpx" },
{ name = "httpx-ws" },
{ name = "pydantic" },
]
@ -637,12 +688,15 @@ dev = [
{ name = "mypy" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
{ name = "respx" },
{ name = "ruff" },
]
[package.metadata]
requires-dist = [
{ name = "email-validator", specifier = ">=2.3.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "httpx-ws", specifier = ">=0.9.0" },
{ name = "pydantic", specifier = ">=2.12.5" },
]
@ -652,5 +706,18 @@ dev = [
{ name = "mypy", specifier = ">=1.20.0" },
{ name = "pytest", specifier = ">=9.0.3" },
{ name = "pytest-asyncio", specifier = ">=1.3.0" },
{ name = "respx", specifier = ">=0.23.1" },
{ name = "ruff", specifier = ">=0.15.10" },
]
[[package]]
name = "wsproto"
version = "1.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c7/79/12135bdf8b9c9367b8701c2c19a14c913c120b882d50b014ca0d38083c2c/wsproto-1.3.2.tar.gz", hash = "sha256:b86885dcf294e15204919950f666e06ffc6c7c114ca900b060d6e16293528294", size = 50116, upload-time = "2025-11-20T18:18:01.871Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/f5/10b68b7b1544245097b2a1b8238f66f2fc6dcaeb24ba5d917f52bd2eed4f/wsproto-1.3.2-py3-none-any.whl", hash = "sha256:61eea322cdf56e8cc904bd3ad7573359a242ba65688716b0710a5eb12beab584", size = 24405, upload-time = "2025-11-20T18:18:00.454Z" },
]