41 Commits

Author SHA1 Message Date
fce514c49c test: expand command/PTY/git coverage, fix WebSocket close handling
Some checks failed
ci/woodpecker/pr/check Pipeline failed
Tests:
- tests/test_commands.py: unit coverage for Commands/AsyncCommands —
  payload construction (cwd, envs, tag, timeout), background dispatch,
  base64 response decoding, stream-event parsing, stream/connect iterators.
- tests/test_integration_advanced.py: live tests for cwd/env handling,
  long-running commands (apt-get), PTY sessions, streaming exec,
  process connect, and git workflows including cloning wrennhq/wrenn.
- test_filesystem_pty.py: PTY ping/pong reply tests.
- test_integration.py: poll for async process-registry prune in
  test_kill_process instead of asserting on a zero-delay list().

Fixes:
- commands.py / pty.py: stream(), connect() and the PTY iterators only
  caught WebSocketDisconnect. The server closes exec/process streams
  abruptly, raising WebSocketNetworkError — a sibling under
  HTTPXWSException — which crashed connect() entirely. Both are now
  caught via _WS_CLOSED so abrupt closes end iteration cleanly.
- pty.py: reply to the server keepalive ping with a pong so idle PTY
  sessions stay open.
2026-05-19 17:12:52 +06:00
87cc16e9e2 chore: merge origin/dev, bump version to 0.1.4
Resolve conflicts in api/openapi.yaml and src/wrenn/models/_generated.py
by keeping the fix/0.2-compatibility versions (v0.2 API is authoritative).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 15:25:22 +06:00
08f6a1ab84 Merge branch 'main' of git.omukk.dev:wrenn/python-sdk into dev
All checks were successful
ci/woodpecker/pr/check Pipeline was successful
2026-05-19 15:22:46 +06:00
51c6987515 fix: sync SDK with v0.2 API, add wait kwargs to lifecycle ops
- Drop AuthResponse from models __init__ (renamed SessionResponse server-side; SDK auths via API key, doesn't need either)
- Regenerate models from updated 0.2 openapi spec
- Add wait: bool = False kwarg to Capsule/AsyncCapsule destroy/pause/resume (instance + _static_*); 500ms poll for resume/destroy, 2s for pause
- Unify polling into _poll_until / _apoll_until + _wait_for_status helper; remove duplicated _POLL_INTERVALS tables
- wait_ready: drop implicit paused->resume side effect; treat missing as fail
- Capsule.connect: handle transient pausing (wait for paused first) before resuming, fixes hang when caller pauses then connects immediately
- Drop dead "if self._id is None" branch in Capsule.__init__ after assigning from already-truthy _capsule_id
- files.make_dir: detect already_exists across 409/wrapped error messages via shared _is_already_exists helper
- tests/test_integration.py: assertions on final lifecycle state use wait=True
2026-05-19 15:06:49 +06:00
e057ec2407 Merge branch 'main' into dev
All checks were successful
ci/woodpecker/pr/check Pipeline was successful
2026-05-19 07:10:17 +00:00
e5e4e1a85b fix: update SDK for v0.2.0 API compatibility
Some checks failed
ci/woodpecker/pr/check Pipeline failed
Sync OpenAPI spec to v0.2.0, fix type annotation shadowing by using
builtins.list in annotated signatures, guard poll interval lookup
against None status, and reorder capsule ID assignment to validate
before storing.
2026-05-16 17:57:20 +06:00
6112c71abc test: make process kill integration test resilient
Some checks failed
ci/woodpecker/pr/check Pipeline failed
2026-05-16 17:02:25 +06:00
d9c028564e Merge branch 'bugfix/timeout-related-issues' into dev
Some checks failed
ci/woodpecker/pr/check Pipeline failed
2026-05-02 21:53:33 +06:00
06b4a8cbcb Merge issues fixed
All checks were successful
ci/woodpecker/pr/check Pipeline was successful
2026-05-02 21:46:16 +06:00
04e5dc652f Fix error handling, resource leaks, and logic bugs across the SDK
Bugs fixed:
- files.py: use typed error checking (_raise_for_status) instead of raw
  raise_for_status(), ensuring WrennNotFoundError etc. are raised
  correctly
- exceptions.py: check both "capsule_ids" and "sandbox_ids" response
  keys
  for backwards compatibility
- code_interpreter: retry _ensure_kernel on 5xx errors (only fail on
  4xx),
  remove redundant TimeoutError in bare except, clean up non-standard
  top-level msg_id/msg_type from Jupyter messages

Resource leaks fixed:
- capsule.py: close WrennClient if capsule creation or init fails
- code_interpreter: add close()/__del__ for _proxy_client cleanup when
  not using context manager

Logic fixes:
- pty.py: yield exit events to callers instead of silently discarding
  them
- capsule.py: auto-resume paused capsules in wait_ready instead of
  failing
- capsule.py: log warnings on destroy failure in __exit__ instead of
  silently swallowing errors
2026-05-02 21:34:02 +06:00
4a7db8e204 fix: set httpx read timeout for long-running commands and handle
non-JSON error responses
- Set per-request httpx timeout (command timeout + 10s buffer) in
  Commands.run() and AsyncCommands.run() for foreground exec calls,
  preventing HTTP read timeouts on long-running commands
- Raise WrennInternalError instead of raw httpx.HTTPStatusError when
  handle_response() encounters a non-JSON error body (e.g. 502 from
  a reverse proxy)
2026-05-02 19:02:39 +06:00
a76be96682 Merge branch 'main' of git.omukk.dev:wrenn/python-sdk into dev 2026-05-02 05:07:13 +06:00
dc66ac24d5 Updated woodpecker def
All checks were successful
ci/woodpecker/pr/check Pipeline was successful
2026-05-02 04:50:11 +06:00
b5e2b12ef1 Version bump and other minor changes 2026-05-02 04:45:05 +06:00
213af4aee7 Increased timeout for long running API calls and updated typehints 2026-05-02 04:44:26 +06:00
aa9477ffe8 Added doc generator for SDK
All checks were successful
ci/woodpecker/push/check Pipeline was successful
2026-04-24 00:01:20 +06:00
2bb3dbd71d Merge branch 'main' of git.omukk.dev:wrenn/python-sdk into dev 2026-04-23 23:53:15 +06:00
3f26a2fbcf Merge branch 'main' into dev
Some checks failed
ci/woodpecker/push/check Pipeline was canceled
2026-04-23 12:38:41 +00:00
2faf0dd0ae Updated woodpecker config
All checks were successful
ci/woodpecker/push/check Pipeline was successful
2026-04-23 18:36:35 +06:00
68c7d0de42 ci: add test pipeline, PyPI release workflow, and lint fixes
- Update Woodpecker to run unit and integration tests in parallel
- Add GitHub Actions workflow for PyPI trusted publishing on main
- Add license, classifiers, keywords, and URLs to pyproject.toml
- Fix ruff lint errors (unused imports, duplicate class name) and formatting
2026-04-23 18:32:59 +06:00
ad64c85393 Merge pull request 'Feat: Added git support' (#5) from feat/git-support into dev
Some checks failed
ci/woodpecker/push/check Pipeline failed
Reviewed-on: #5
2026-04-22 23:45:36 +00:00
bab53aedbe Updated readme 2026-04-23 05:44:49 +06:00
82e181dd7e test: add integration tests for capsule lifecycle, commands, files, and git
43 tests across 4 classes hitting the live API. Shared capsule per class
to minimize VM boot overhead. All capsules destroyed in teardown.
Skips automatically when WRENN_API_KEY is not available.
2026-04-23 05:40:06 +06:00
ee1f55635f fix: wrap commands in /bin/sh -c for proper server-side argv expansion
The server-side agent runs commands through a nice wrapper that uses
"${@}" expansion. Sending the full command string as a single cmd field
caused nice to treat it as one executable name. Now Commands.run sends
cmd=/bin/sh args=["-c", cmd_string] so "${@}" expands into proper argv.
2026-04-23 05:16:08 +06:00
6bdf28e2ae Added git integration 2026-04-23 04:46:57 +06:00
61bc040098 Minor patches
Some checks failed
ci/woodpecker/push/check Pipeline failed
2026-04-23 02:31:47 +06:00
7b35ffb60c docs: add Google-style docstrings to all public SDK methods
Some checks failed
ci/woodpecker/push/check Pipeline failed
2026-04-17 04:29:34 +06:00
42bcc792d6 Updated dependency
Some checks failed
ci/woodpecker/push/check Pipeline failed
2026-04-17 03:29:45 +06:00
3f97c73b2f feat: redesign code interpreter with structured Execution model
Some checks failed
ci/woodpecker/push/check Pipeline failed
Replace flat CodeResult with a proper model hierarchy: Execution
(top-level), Result (per-output with typed MIME fields), Logs
(stdout/stderr as lists), and ExecutionError (structured
name/value/traceback). Handle display_data messages for rich output,
add streaming callbacks (on_result, on_stdout, on_stderr, on_error),
and remove the misleading stdout-to-text fallback.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 03:16:39 +06:00
7e7ecbd48a Merge pull request 'feat: implement client architecture and sandbox environment' (#3) from feat/client-and-sandbox-support into dev
Some checks failed
ci/woodpecker/push/check Pipeline failed
Reviewed-on: #3
2026-04-15 15:35:40 +00:00
7b9a06d1b5 chore: add python-dotenv dependency
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 21:33:53 +06:00
3d0eda5c60 feat: rename kill to destroy, improve code interpreter, update README
- Rename Capsule.kill/AsyncCapsule.kill to destroy for frontend consistency
- Add Sandbox deprecation alias to wrenn.code_interpreter module
- run_code text falls back to stripped stdout when no expression result
- Strip quotes from string expression results (matching e2b behavior)
- _ensure_kernel reuses existing Jupyter kernels before creating new ones
- Rewrite README with complete examples for capsules and code interpreter
- Remove stale AGENTS.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 18:58:59 +06:00
eecf1dc65b chore: update OpenAPI schema, generated models, and build config
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 15:31:07 +06:00
3cced768a4 feat: redesign SDK with e2b-compatible interface
Replace the WrennClient-centric API with a top-level Capsule class that
mirrors e2b's Sandbox interface, enabling drop-in migration. Key changes:

- Capsule/AsyncCapsule with direct construction (reads WRENN_API_KEY and
  WRENN_BASE_URL env vars), namespaced sub-objects (capsule.commands,
  capsule.files), dual instance/static lifecycle methods via _DualMethod
  descriptor (capsule.kill() and Capsule.kill(id))
- WrennClient simplified to API-key-only endpoints (capsules, snapshots);
  JWT-based resources (auth, hosts, teams) removed
- wrenn.code_interpreter submodule with Capsule subclass defaulting to
  code-runner-beta template and run_code() support
- Sandbox alias emits FutureWarning instead of DeprecationWarning

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 15:19:23 +06:00
0ac9bf79ee feat: created README 2026-04-13 03:16:44 +06:00
bf5914c0a8 fix: renamed sandbox to capsule 2026-04-13 03:16:27 +06:00
976af9a209 ci: woodpecker doesn't support variable expansions outside of commands 2026-04-12 03:08:34 +06:00
f3fd6865f9 ci: bug fixes 2026-04-12 03:03:33 +06:00
340ed46df6 CI for linting and testing 2026-04-12 02:51:14 +06:00
a5bf66c199 feat: add sandbox filesystem and terminal support
Add sandbox filesystem methods (list_dir, mkdir, remove, upload,
download, stream_upload, stream_download) and interactive PTY sessions
(PtySession, AsyncPtySession) with reconnect support per
FILE_TERMINAL.md spec. Refactor error handling into exceptions.py as
shared handle_response(). Replace API-key-only proxy auth with unified
_proxy_headers() supporting both API key and JWT. Fix stream_upload to
build multipart manually instead of relying on httpx files= with
generators. Switch Makefile SPEC_URL from main to dev branch. Regenerate
models from updated OpenAPI spec (adds teams, channels, metrics, PTY
endpoints). Add comprehensive unit and integration tests. Trim AGENTS.md
to verified facts only.
2026-04-12 02:35:20 +06:00
f51a962fff feat: implement client architecture and sandbox environment
Introduces the core Wrenn client and a dedicated sandbox execution
environment. This includes automated model generation and a custom
exception hierarchy to support robust integration.

- Add `WrennClient` in `src/wrenn/client.py` for API interaction.
- Implement `Sandbox` in `src/wrenn/sandbox.py` for isolated execution.
- Add Pydantic/model support via `_generated.py`.
- Define project-specific error types in `exceptions.py`.
- Include AGENTS.md documentation for specialized logic.
- Add comprehensive unit and integration tests.
- Update build system (Makefile, uv.lock, pyproject.toml) and LICENSE.
2026-04-10 22:24:50 +06:00
22 changed files with 2814 additions and 357 deletions

1
.gitignore vendored
View File

@ -181,3 +181,4 @@ CODE_EXECUTION.md
.code-review-graph/
.claude
.mcp.json
AGENTS.md

View File

@ -1,56 +0,0 @@
# AGENTS.md
## Project
Wrenn Python SDK — a client library for the Wrenn microVM platform. e2b drop-in replacement.
Package name: `wrenn`. Python 3.13+, managed with [uv](https://docs.astral.sh/uv/).
## Commands
```bash
uv sync # install deps
make lint # ruff check + format check (no auto-fix)
make test # unit tests only (tests/test_client.py)
make test-integration # all tests including integration (needs live server)
make generate # regenerate models from OpenAPI spec (fetches from remote)
make check # lint + unit test
```
- `make test` only runs `tests/test_client.py`, not all unit tests. To run a specific test file: `uv run pytest tests/test_capsule_features.py -v`
- No typecheck step in Makefile or CI. `mypy` is a dev dependency but not wired up — do not assume it runs.
## Architecture
- `src/wrenn/` — the library package
- `capsule.py` / `async_capsule.py` — high-level `Capsule` / `AsyncCapsule` (main user-facing classes)
- `client.py` — low-level `WrennClient` / `AsyncWrennClient`
- `commands.py` — command execution and streaming
- `files.py` — filesystem operations
- `pty.py` — interactive terminal (PTY) over WebSocket
- `exceptions.py` — typed error hierarchy (`WrennError` base)
- `models/_generated.py`**auto-generated** from OpenAPI spec via `datamodel-codegen` (never edit directly; run `make generate`)
- `sandbox.py` — deprecated `Sandbox` alias for `Capsule`
- `code_interpreter/` — specialized capsule for stateful Jupyter kernel execution
- `tests/` — unit tests use `respx` to mock `httpx`; integration tests are in `tests/integration/`
- `api/openapi.yaml` — downloaded OpenAPI spec used for code generation
## Key Conventions
- Generated code lives in `src/wrenn/models/_generated.py`. Never edit it. Run `make generate` to update.
- `Sandbox` is a deprecated alias for `Capsule`. New code should use `Capsule` / `AsyncCapsule`.
- Dual sync/async API: every major class has an `Async` counterpart.
- Uses `httpx` for HTTP, `httpx-ws` for WebSockets, `pydantic` for models.
- `__init__.py` uses `__getattr__` for lazy deprecated aliases (`Sandbox`, `WrennHostHasSandboxesError`).
## Testing
- Unit tests mock HTTP via `respx` (httpx mocking library).
- Integration tests require env vars: `WRENN_API_KEY` (or `WRENN_TOKEN`), optionally `WRENN_BASE_URL`.
- Integration test fixtures in `tests/integration/conftest.py` create real capsules and clean them up.
- `pytest` marker: `@pytest.mark.integration` for tests needing a live server.
## CI
Woodpecker CI (`.woodpecker/check.yml`) runs on push to `main` and `dev`:
1. `make lint`
2. `make test` (unit tests only — integration tests are not in CI)

File diff suppressed because it is too large Load Diff

View File

@ -1964,15 +1964,17 @@ inactivity TTL is set.
#### wait\_ready
```python
async def wait_ready(timeout: float = 30, interval: float = 0.5) -> None
async def wait_ready(timeout: float = 30) -> None
```
Await until the capsule status is ``running``.
Polling interval adapts to the current transient status:
0.5 s for starting/resuming, 2 s for pausing, 1 s for stopping.
**Arguments**:
- `timeout` _float_ - Maximum seconds to wait. Defaults to ``30``.
- `interval` _float_ - Polling interval in seconds. Defaults to ``0.5``.
**Raises**:
@ -2534,15 +2536,17 @@ inactivity TTL is set.
#### wait\_ready
```python
def wait_ready(timeout: float = 30, interval: float = 0.5) -> None
def wait_ready(timeout: float = 30) -> None
```
Block until the capsule status is ``running``.
Polling interval adapts to the current transient status:
0.5 s for starting/resuming, 2 s for pausing, 1 s for stopping.
**Arguments**:
- `timeout` _float_ - Maximum seconds to wait. Defaults to ``30``.
- `interval` _float_ - Polling interval in seconds. Defaults to ``0.5``.
**Raises**:
@ -2700,17 +2704,6 @@ Create a snapshot template from this capsule's current state.
# wrenn.\_config
<a id="wrenn._config.ConnectionConfig"></a>
## ConnectionConfig Objects
```python
@dataclass(frozen=True)
class ConnectionConfig()
```
Resolved credentials and base URL for Wrenn API calls.
<a id="wrenn._git._auth"></a>
# wrenn.\_git.\_auth

View File

@ -1,6 +1,6 @@
[project]
name = "wrenn"
version = "0.1.3"
version = "0.1.4"
description = "Python SDK for Wrenn"
readme = "README.md"
license = "MIT"

View File

@ -37,7 +37,7 @@ from wrenn.exceptions import (
from wrenn.models import FileEntry
from wrenn.pty import AsyncPtySession, PtyEvent, PtyEventType, PtySession
__version__ = "0.1.0"
__version__ = "0.1.4"
__all__ = [
"__version__",

View File

@ -1,8 +1,8 @@
from __future__ import annotations
import asyncio
import logging
import builtins
import logging
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
@ -10,15 +10,54 @@ from contextlib import asynccontextmanager
import httpx_ws
from wrenn._git import AsyncGit
from wrenn.capsule import _DualMethod, _build_proxy_url
from wrenn.capsule import (
_DEFAULT_WAIT_TIMEOUT,
_DESTROY_INTERVAL,
_FAIL_STATUSES,
_PAUSE_INTERVAL,
_RESUME_INTERVAL,
_START_INTERVAL,
_DualMethod,
_build_proxy_url,
)
from wrenn.client import AsyncWrennClient
from wrenn.commands import AsyncCommands
from wrenn.exceptions import WrennNotFoundError
from wrenn.files import AsyncFiles
from wrenn.models import Capsule as CapsuleModel
from wrenn.models import Status, Template
from wrenn.pty import AsyncPtySession
async def _apoll_until(
fetch,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
fail_on: set[Status] | None = None,
) -> CapsuleModel:
fail = fail_on if fail_on is not None else _FAIL_STATUSES
treat_missing_as_target = Status.missing in targets
deadline = time.monotonic() + timeout
last: CapsuleModel | None = None
while time.monotonic() < deadline:
try:
last = await fetch()
except WrennNotFoundError:
if treat_missing_as_target:
return CapsuleModel(status=Status.missing)
raise
if last.status in targets:
return last
if last.status is not None and last.status in fail:
raise RuntimeError(f"Capsule entered {last.status} state while waiting")
await asyncio.sleep(interval)
raise TimeoutError(
f"Capsule did not reach {targets} within {timeout}s "
f"(last status: {last.status if last else 'unknown'})"
)
class AsyncCapsule:
"""Async Wrenn capsule with e2b-compatible interface.
@ -139,15 +178,21 @@ class AsyncCapsule:
client = AsyncWrennClient(api_key=api_key, base_url=base_url)
info = await client.capsules.get(capsule_id)
if info.status == Status.paused:
info = await client.capsules.resume(capsule_id)
return cls(
capsule = cls(
_capsule_id=capsule_id,
_client=client,
_info=info,
)
if info.status == Status.pausing:
info = await capsule._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
if info.status == Status.paused:
await client.capsules.resume(capsule_id)
if info.status != Status.running:
await capsule.wait_ready()
return capsule
# ── Dual instance/static lifecycle ──────────────────────────
destroy = _DualMethod("_instance_destroy", "_static_destroy")
@ -155,22 +200,35 @@ class AsyncCapsule:
resume = _DualMethod("_instance_resume", "_static_resume")
get_info = _DualMethod("_instance_get_info", "_static_get_info")
async def _instance_destroy(self) -> None:
async def _instance_destroy(self, wait: bool = False) -> None:
await self._client.capsules.destroy(self._id)
if wait:
await self._wait_for_status(
{Status.stopped, Status.missing}, _DESTROY_INTERVAL
)
@classmethod
async def _static_destroy(
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> None:
async with AsyncWrennClient(api_key=api_key, base_url=base_url) as client:
await client.capsules.destroy(capsule_id)
if wait:
await _apoll_until(
lambda: client.capsules.get(capsule_id),
{Status.stopped, Status.missing},
_DESTROY_INTERVAL,
)
async def _instance_pause(self) -> CapsuleModel:
async def _instance_pause(self, wait: bool = False) -> CapsuleModel:
self._info = await self._client.capsules.pause(self._id)
if wait:
self._info = await self._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
return self._info
@classmethod
@ -178,14 +236,24 @@ class AsyncCapsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
async with AsyncWrennClient(api_key=api_key, base_url=base_url) as client:
return await client.capsules.pause(capsule_id)
info = await client.capsules.pause(capsule_id)
if wait:
info = await _apoll_until(
lambda: client.capsules.get(capsule_id),
{Status.paused},
_PAUSE_INTERVAL,
)
return info
async def _instance_resume(self) -> CapsuleModel:
async def _instance_resume(self, wait: bool = False) -> CapsuleModel:
self._info = await self._client.capsules.resume(self._id)
if wait:
self._info = await self._wait_for_status({Status.running}, _RESUME_INTERVAL)
return self._info
@classmethod
@ -193,11 +261,19 @@ class AsyncCapsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
async with AsyncWrennClient(api_key=api_key, base_url=base_url) as client:
return await client.capsules.resume(capsule_id)
info = await client.capsules.resume(capsule_id)
if wait:
info = await _apoll_until(
lambda: client.capsules.get(capsule_id),
{Status.running},
_RESUME_INTERVAL,
)
return info
async def _instance_get_info(self) -> CapsuleModel:
self._info = await self._client.capsules.get(self._id)
@ -224,31 +300,30 @@ class AsyncCapsule:
"""
await self._client.capsules.ping(self._id)
async def wait_ready(self, timeout: float = 30, interval: float = 0.5) -> None:
"""Await until the capsule status is ``running``.
async def _wait_for_status(
self,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
) -> CapsuleModel:
info = await _apoll_until(
lambda: self._client.capsules.get(self._id),
targets,
interval,
timeout,
fail_on={Status.error, Status.stopped, Status.missing} - targets,
)
self._info = info
return info
Args:
timeout (float): Maximum seconds to wait. Defaults to ``30``.
interval (float): Polling interval in seconds. Defaults to ``0.5``.
async def wait_ready(self, timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None:
"""Await until capsule status is ``running``.
Raises:
TimeoutError: If the capsule does not reach ``running`` state
within ``timeout`` seconds.
RuntimeError: If the capsule enters an error, stopped, or paused
state while waiting.
TimeoutError: If capsule does not reach ``running`` within ``timeout``.
RuntimeError: If capsule enters error/stopped/missing while waiting.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
info = await self._client.capsules.get(self._id)
if info.status == Status.running:
self._info = info
return
if info.status in (Status.error, Status.stopped):
raise RuntimeError(f"Capsule entered {info.status} state while waiting")
if info.status == Status.paused:
info = await self._client.capsules.resume(self._id)
await asyncio.sleep(interval)
raise TimeoutError(f"Capsule {self._id} did not become ready within {timeout}s")
await self._wait_for_status({Status.running}, _START_INTERVAL, timeout)
async def is_running(self) -> bool:
"""Check whether the capsule is currently running.

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import logging
import builtins
import logging
import time
from collections.abc import Iterator
from contextlib import contextmanager
@ -13,6 +13,7 @@ import httpx_ws
from wrenn._git import Git
from wrenn.client import WrennClient
from wrenn.commands import Commands
from wrenn.exceptions import WrennNotFoundError
from wrenn.files import Files
from wrenn.models import Capsule as CapsuleModel
from wrenn.models import Status, Template
@ -28,6 +29,44 @@ def _build_proxy_url(base_url: str, capsule_id: str | None, port: int) -> str:
return f"{scheme}://{port}-{capsule_id}.{host}"
_RESUME_INTERVAL = 0.5
_DESTROY_INTERVAL = 0.5
_PAUSE_INTERVAL = 2.0
_START_INTERVAL = 0.5
_DEFAULT_WAIT_TIMEOUT = 30.0
_FAIL_STATUSES = {Status.error}
def _poll_until(
fetch,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
fail_on: set[Status] | None = None,
) -> CapsuleModel:
"""Poll ``fetch()`` until status ∈ ``targets``. Raise on ``fail_on``/timeout."""
fail = fail_on if fail_on is not None else _FAIL_STATUSES
treat_missing_as_target = Status.missing in targets
deadline = time.monotonic() + timeout
last: CapsuleModel | None = None
while time.monotonic() < deadline:
try:
last = fetch()
except WrennNotFoundError:
if treat_missing_as_target:
return CapsuleModel(status=Status.missing)
raise
if last.status in targets:
return last
if last.status is not None and last.status in fail:
raise RuntimeError(f"Capsule entered {last.status} state while waiting")
time.sleep(interval)
raise TimeoutError(
f"Capsule did not reach {targets} within {timeout}s "
f"(last status: {last.status if last else 'unknown'})"
)
class _DualMethod:
"""Descriptor that dispatches to instance method or classmethod depending on call site."""
@ -100,9 +139,6 @@ class Capsule:
self._id: str = _capsule_id
self._client = _client
self._info = _info
if self._id is None:
self._client.close()
raise RuntimeError("API returned a capsule without an ID")
else:
self._client = WrennClient(api_key=api_key, base_url=base_url)
try:
@ -112,9 +148,9 @@ class Capsule:
memory_mb=memory_mb,
timeout_sec=timeout,
)
self._id = self._info.id
if self._id is None:
if self._info.id is None:
raise RuntimeError("API returned a capsule without an ID")
self._id = self._info.id
except Exception:
self._client.close()
raise
@ -213,15 +249,21 @@ class Capsule:
client = WrennClient(api_key=api_key, base_url=base_url)
info = client.capsules.get(capsule_id)
if info.status == Status.paused:
info = client.capsules.resume(capsule_id)
return cls(
capsule = cls(
_capsule_id=capsule_id,
_client=client,
_info=info,
)
if info.status == Status.pausing:
info = capsule._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
if info.status == Status.paused:
client.capsules.resume(capsule_id)
if info.status != Status.running:
capsule.wait_ready()
return capsule
# ── Dual instance/static lifecycle ──────────────────────────
destroy = _DualMethod("_instance_destroy", "_static_destroy")
@ -229,25 +271,36 @@ class Capsule:
resume = _DualMethod("_instance_resume", "_static_resume")
get_info = _DualMethod("_instance_get_info", "_static_get_info")
def _instance_destroy(self) -> None:
"""Destroy this capsule."""
def _instance_destroy(self, wait: bool = False) -> None:
"""Destroy this capsule. If ``wait``, poll until stopped/missing."""
self._client.capsules.destroy(self._id)
if wait:
self._wait_for_status({Status.stopped, Status.missing}, _DESTROY_INTERVAL)
@classmethod
def _static_destroy(
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> None:
"""Destroy a capsule by ID."""
with WrennClient(api_key=api_key, base_url=base_url) as client:
client.capsules.destroy(capsule_id)
if wait:
_poll_until(
lambda: client.capsules.get(capsule_id),
{Status.stopped, Status.missing},
_DESTROY_INTERVAL,
)
def _instance_pause(self) -> CapsuleModel:
"""Pause this capsule."""
def _instance_pause(self, wait: bool = False) -> CapsuleModel:
"""Pause this capsule. If ``wait``, poll until ``paused``."""
self._info = self._client.capsules.pause(self._id)
if wait:
self._info = self._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
return self._info
@classmethod
@ -255,16 +308,26 @@ class Capsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
"""Pause a capsule by ID."""
with WrennClient(api_key=api_key, base_url=base_url) as client:
return client.capsules.pause(capsule_id)
info = client.capsules.pause(capsule_id)
if wait:
info = _poll_until(
lambda: client.capsules.get(capsule_id),
{Status.paused},
_PAUSE_INTERVAL,
)
return info
def _instance_resume(self) -> CapsuleModel:
"""Resume this capsule."""
def _instance_resume(self, wait: bool = False) -> CapsuleModel:
"""Resume this capsule. If ``wait``, poll until ``running``."""
self._info = self._client.capsules.resume(self._id)
if wait:
self._info = self._wait_for_status({Status.running}, _RESUME_INTERVAL)
return self._info
@classmethod
@ -272,12 +335,20 @@ class Capsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
"""Resume a capsule by ID."""
with WrennClient(api_key=api_key, base_url=base_url) as client:
return client.capsules.resume(capsule_id)
info = client.capsules.resume(capsule_id)
if wait:
info = _poll_until(
lambda: client.capsules.get(capsule_id),
{Status.running},
_RESUME_INTERVAL,
)
return info
def _instance_get_info(self) -> CapsuleModel:
"""Get current info for this capsule."""
@ -306,31 +377,30 @@ class Capsule:
"""
self._client.capsules.ping(self._id)
def wait_ready(self, timeout: float = 30, interval: float = 0.5) -> None:
"""Block until the capsule status is ``running``.
def _wait_for_status(
self,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
) -> CapsuleModel:
info = _poll_until(
lambda: self._client.capsules.get(self._id),
targets,
interval,
timeout,
fail_on={Status.error, Status.stopped, Status.missing} - targets,
)
self._info = info
return info
Args:
timeout (float): Maximum seconds to wait. Defaults to ``30``.
interval (float): Polling interval in seconds. Defaults to ``0.5``.
def wait_ready(self, timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None:
"""Block until capsule status is ``running``.
Raises:
TimeoutError: If the capsule does not reach ``running`` state
within ``timeout`` seconds.
RuntimeError: If the capsule enters an error, stopped, or paused
state while waiting.
TimeoutError: If capsule does not reach ``running`` within ``timeout``.
RuntimeError: If capsule enters error/stopped/missing while waiting.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
info = self._client.capsules.get(self._id)
if info.status == Status.running:
self._info = info
return
if info.status in (Status.error, Status.stopped):
raise RuntimeError(f"Capsule entered {info.status} state while waiting")
if info.status == Status.paused:
info = self._client.capsules.resume(self._id)
time.sleep(interval)
raise TimeoutError(f"Capsule {self._id} did not become ready within {timeout}s")
self._wait_for_status({Status.running}, _START_INTERVAL, timeout)
def is_running(self) -> bool:
"""Check whether the capsule is currently running.

View File

@ -111,7 +111,7 @@ class CapsulesResource:
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = self._http.post(f"/v1/capsules/{id}/pause", timeout=_LONG_TIMEOUT)
resp = self._http.post(f"/v1/capsules/{id}/pause")
return CapsuleModel.model_validate(handle_response(resp))
def resume(self, id: str) -> CapsuleModel:
@ -227,7 +227,7 @@ class AsyncCapsulesResource:
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = await self._http.post(f"/v1/capsules/{id}/pause", timeout=_LONG_TIMEOUT)
resp = await self._http.post(f"/v1/capsules/{id}/pause")
return CapsuleModel.model_validate(handle_response(resp))
async def resume(self, id: str) -> CapsuleModel:

View File

@ -12,6 +12,11 @@ import httpx_ws
from wrenn.exceptions import handle_response
# Both signal a terminated WebSocket: ``WebSocketDisconnect`` is a clean close,
# ``WebSocketNetworkError`` an abrupt one. The Wrenn server closes exec/process
# streams abruptly, so iterators must treat either as end-of-stream.
_WS_CLOSED = (httpx_ws.WebSocketDisconnect, httpx_ws.WebSocketNetworkError)
@dataclass
class CommandResult:
@ -271,7 +276,7 @@ class Commands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
break
def stream(
@ -306,7 +311,7 @@ class Commands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
break
@ -462,7 +467,7 @@ class AsyncCommands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
pass
async def stream(
@ -497,5 +502,5 @@ class AsyncCommands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
pass

View File

@ -150,6 +150,9 @@ def handle_response(resp: httpx.Response) -> dict | list:
if resp.status_code == 204:
return {}
if not resp.content:
return {}
return resp.json()

View File

@ -9,6 +9,36 @@ from wrenn.exceptions import WrennNotFoundError, _raise_for_status, handle_respo
from wrenn.models import FileEntry, ListDirResponse, MakeDirResponse
def _is_already_exists(resp: httpx.Response) -> bool:
"""Detect server's already-exists reply across status codes / code strings.
Server may return 409 with code "conflict"/"already_exists" or wrap
"already_exists" inside an "internal" 500 message.
"""
if resp.status_code < 400:
return False
try:
body = resp.json()
except Exception:
return False
err = body.get("error", {}) if isinstance(body, dict) else {}
code = err.get("code", "")
msg = err.get("message", "") or ""
return code in {"conflict", "already_exists"} or "already_exists" in msg
def _find_entry(list_fn, path: str) -> FileEntry | None:
parent = os.path.dirname(path)
name = os.path.basename(path)
try:
for entry in list_fn(parent, depth=1):
if entry.name == name:
return entry
except WrennNotFoundError:
return None
return None
class Files:
"""Sync filesystem interface. Accessed via ``capsule.files``."""
@ -118,17 +148,10 @@ class Files:
f"/v1/capsules/{self._capsule_id}/files/mkdir",
json={"path": path},
)
if resp.status_code == 409:
try:
body = resp.json()
if body.get("error", {}).get("code") == "conflict":
parent = os.path.dirname(path)
name = os.path.basename(path)
for entry in self.list(parent, depth=1):
if entry.name == name:
return entry
except Exception:
pass
if _is_already_exists(resp):
existing = _find_entry(self.list, path)
if existing is not None:
return existing
parsed = MakeDirResponse.model_validate(handle_response(resp))
if parsed.entry is None:
raise RuntimeError("mkdir response missing entry")
@ -315,17 +338,12 @@ class AsyncFiles:
f"/v1/capsules/{self._capsule_id}/files/mkdir",
json={"path": path},
)
if resp.status_code == 409:
try:
body = resp.json()
if body.get("error", {}).get("code") == "conflict":
parent = os.path.dirname(path)
name = os.path.basename(path)
for entry in await self.list(parent, depth=1):
if entry.name == name:
return entry
except Exception:
pass
if _is_already_exists(resp):
parent = os.path.dirname(path)
name = os.path.basename(path)
for entry in await self.list(parent, depth=1):
if entry.name == name:
return entry
parsed = MakeDirResponse.model_validate(handle_response(resp))
if parsed.entry is None:
raise RuntimeError("mkdir response missing entry")

View File

@ -1,6 +1,5 @@
from wrenn.models._generated import (
APIKeyResponse,
AuthResponse,
Capsule,
CreateAPIKeyRequest,
CreateCapsuleRequest,
@ -34,7 +33,6 @@ from wrenn.models._generated import (
__all__ = [
"APIKeyResponse",
"AuthResponse",
"CreateAPIKeyRequest",
"CreateHostRequest",
"CreateHostResponse",

View File

@ -1,10 +1,10 @@
# generated by datamodel-codegen:
# filename: openapi.yaml
# timestamp: 2026-05-04T20:57:00+00:00
# timestamp: 2026-05-19T08:54:50+00:00
from __future__ import annotations
from pydantic import AwareDatetime, BaseModel, EmailStr, Field
from typing import Annotated
from typing import Annotated, Any
from datetime import date as date_aliased
from enum import StrEnum
@ -27,14 +27,20 @@ class SignupResponse(BaseModel):
] = None
class AuthResponse(BaseModel):
token: Annotated[str | None, Field(description="JWT token (valid for 6 hours)")] = (
None
)
class SessionResponse(BaseModel):
"""
Returned by login, activate, and switch-team. The actual auth credential
is the wrenn_sid cookie set on the response. The body carries identity
data the SPA needs to bootstrap.
"""
user_id: str | None = None
team_id: str | None = None
email: str | None = None
name: str | None = None
role: str | None = None
is_admin: bool | None = None
class CreateAPIKeyRequest(BaseModel):
@ -62,10 +68,17 @@ class CreateCapsuleRequest(BaseModel):
template: str | None = "minimal"
vcpus: int | None = 1
memory_mb: int | None = 512
disk_size_mb: Annotated[
int | None,
Field(
description="Maximum size of the per-capsule copy-on-write disk in MB. Capped at 5 GB by default; the actual size is max(disk_size_mb, origin rootfs size).\n"
),
] = 5120
timeout_sec: Annotated[
int | None,
Field(
description="Auto-pause TTL in seconds. The capsule is automatically paused after this duration of inactivity (no exec or ping). 0 means no auto-pause.\n"
description="Auto-pause TTL in seconds. The capsule is automatically paused after this duration of inactivity (no exec or ping). 0 means no auto-pause. Positive values below 60 are silently clamped to 60 (the agent's startup envelope).\n",
ge=0,
),
] = 0
@ -133,7 +146,10 @@ class Status(StrEnum):
pending = "pending"
starting = "starting"
running = "running"
pausing = "pausing"
paused = "paused"
resuming = "resuming"
stopping = "stopping"
hibernated = "hibernated"
stopped = "stopped"
missing = "missing"
@ -153,6 +169,13 @@ class Capsule(BaseModel):
started_at: AwareDatetime | None = None
last_active_at: AwareDatetime | None = None
last_updated: AwareDatetime | None = None
metadata: Annotated[
dict[str, str] | None,
Field(
description="Free-form key/value labels attached at create-time. Also carries\nagent-side version info (kernel_version, vmm_version,\nagent_version, envd_version) when running.\n"
),
] = None
disk_size_mb: int | None = None
class CreateSnapshotRequest(BaseModel):
@ -177,6 +200,13 @@ class Template(BaseModel):
memory_mb: int | None = None
size_bytes: int | None = None
created_at: AwareDatetime | None = None
platform: Annotated[
bool | None,
Field(
description="True when the template is platform-managed (visible to all teams,\ne.g. the built-in `minimal` rootfs). False for team-owned\nsnapshot templates.\n"
),
] = None
metadata: dict[str, str] | None = None
class ExecRequest(BaseModel):
@ -399,7 +429,7 @@ class HostDeletePreview(BaseModel):
host: Host | None = None
sandbox_ids: Annotated[
list[str] | None,
Field(description="IDs of capsulees that would be destroyed on force-delete."),
Field(description="IDs of capsules that would be destroyed on force-delete."),
] = None
@ -407,8 +437,7 @@ class Error(BaseModel):
code: Annotated[str | None, Field(examples=["host_has_sandboxes"])] = None
message: str | None = None
sandbox_ids: Annotated[
list[str] | None,
Field(description="IDs of active capsulees blocking deletion."),
list[str] | None, Field(description="IDs of active capsules blocking deletion.")
] = None
@ -476,7 +505,9 @@ class MetricPoint(BaseModel):
] = None
mem_bytes: Annotated[
int | None,
Field(description="Resident memory in bytes (VmRSS of Firecracker process)"),
Field(
description="Resident memory in bytes (VmRSS of Cloud Hypervisor process)"
),
] = None
disk_bytes: Annotated[
int | None, Field(description="Allocated disk bytes for the CoW sparse file")
@ -494,12 +525,12 @@ class Provider(StrEnum):
class Event(StrEnum):
capsule_created = "capsule.created"
capsule_running = "capsule.running"
capsule_paused = "capsule.paused"
capsule_destroyed = "capsule.destroyed"
template_snapshot_created = "template.snapshot.created"
template_snapshot_deleted = "template.snapshot.deleted"
capsule_create = "capsule.create"
capsule_pause = "capsule.pause"
capsule_resume = "capsule.resume"
capsule_destroy = "capsule.destroy"
template_snapshot_create = "template.snapshot.create"
template_snapshot_delete = "template.snapshot.delete"
host_up = "host.up"
host_down = "host.down"
@ -591,6 +622,106 @@ class Error1(BaseModel):
error: Error2 | None = None
class ActorType(StrEnum):
user = "user"
api_key = "api_key"
host = "host"
system = "system"
class Status2(StrEnum):
success = "success"
failure = "failure"
class AuditLogEntry(BaseModel):
id: str | None = None
actor_type: ActorType | None = None
actor_id: str | None = None
actor_name: str | None = None
resource_type: str | None = None
resource_id: str | None = None
action: str | None = None
scope: str | None = None
status: Status2 | None = None
metadata: dict[str, Any] | None = None
created_at: AwareDatetime | None = None
class Event2(StrEnum):
connected = "connected"
capsule_create = "capsule.create"
capsule_pause = "capsule.pause"
capsule_resume = "capsule.resume"
capsule_destroy = "capsule.destroy"
capsule_state_changed = "capsule.state.changed"
template_snapshot_create = "template.snapshot.create"
template_snapshot_delete = "template.snapshot.delete"
host_up = "host.up"
host_down = "host.down"
class Outcome(StrEnum):
"""
Present for action events (capsule.* except state.changed,
template.snapshot.*). Absent for host.up/down, capsule.state.changed,
and the connected sentinel.
"""
success = "success"
error = "error"
class Resource(BaseModel):
id: str | None = None
type: str | None = None
class Type4(StrEnum):
user = "user"
api_key = "api_key"
system = "system"
class Actor(BaseModel):
type: Type4 | None = None
id: str | None = None
name: str | None = None
class SSEEvent(BaseModel):
"""
Wire format of one SSE message body. The event name (`event:` line) is
the `kind` and the JSON below is the `data:` line.
"""
event: Event2 | None = None
outcome: Annotated[
Outcome | None,
Field(
description="Present for action events (capsule.* except state.changed,\ntemplate.snapshot.*). Absent for host.up/down, capsule.state.changed,\nand the connected sentinel.\n"
),
] = None
resource: Resource | None = None
actor: Actor | None = None
metadata: Annotated[
dict[str, str] | None,
Field(
description="Event-specific context. Examples: `reason` (ttl_expired,\nhost_failure, cleanup_after_create_error, orphaned),\n`host_ip`, `from`/`to` (for capsule.state.changed).\n"
),
] = None
error: Annotated[
str | None, Field(description="Failure reason; only set when outcome=error.")
] = None
sandbox: Annotated[
Capsule | None,
Field(description="Populated for capsule.* events; null if DB lookup failed."),
] = None
timestamp: AwareDatetime | None = None
class ListDirResponse(BaseModel):
entries: list[FileEntry] | None = None

View File

@ -9,6 +9,10 @@ from typing import Any
import httpx_ws
from pydantic import BaseModel
# A clean (``WebSocketDisconnect``) or abrupt (``WebSocketNetworkError``) close
# both mean the PTY stream has ended; iteration must stop on either.
_WS_CLOSED = (httpx_ws.WebSocketDisconnect, httpx_ws.WebSocketNetworkError)
class PtyEventType(StrEnum):
started = "started"
@ -109,6 +113,13 @@ class PtySession:
def _send_connect(self, tag: str) -> None:
self._ws.send_text(json.dumps({"type": "connect", "tag": tag}))
def _send_pong(self) -> None:
"""Reply to a server keepalive ``ping`` so the session stays open."""
try:
self._ws.send_text(json.dumps({"type": "pong"}))
except _WS_CLOSED:
pass
def write(self, data: bytes) -> None:
"""Send raw bytes to the PTY stdin.
@ -144,7 +155,7 @@ class PtySession:
raise StopIteration
try:
raw = self._ws.receive_text()
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
raise StopIteration
event = _parse_pty_event(json.loads(raw))
if event.type == PtyEventType.started:
@ -152,6 +163,8 @@ class PtySession:
self._tag = event.tag
if event.pid is not None:
self._pid = event.pid
if event.type == PtyEventType.ping:
self._send_pong()
if event.type == PtyEventType.exit:
self._done = True
return event
@ -236,6 +249,13 @@ class AsyncPtySession:
async def _send_connect(self, tag: str) -> None:
await self._ws.send_text(json.dumps({"type": "connect", "tag": tag}))
async def _send_pong(self) -> None:
"""Reply to a server keepalive ``ping`` so the session stays open."""
try:
await self._ws.send_text(json.dumps({"type": "pong"}))
except _WS_CLOSED:
pass
async def write(self, data: bytes) -> None:
"""Send raw bytes to the PTY stdin.
@ -273,7 +293,7 @@ class AsyncPtySession:
raise StopAsyncIteration
try:
raw = await self._ws.receive_text()
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
raise StopAsyncIteration
event = _parse_pty_event(json.loads(raw))
if event.type == PtyEventType.started:
@ -281,6 +301,8 @@ class AsyncPtySession:
self._tag = event.tag
if event.pid is not None:
self._pid = event.pid
if event.type == PtyEventType.ping:
await self._send_pong()
if event.type == PtyEventType.exit:
self._done = True
return event

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import httpx
import respx
from wrenn.capsule import Capsule, _build_proxy_url
@ -30,9 +31,13 @@ class TestCapsuleCreate:
@respx.mock
def test_capsule_constructor_creates(self):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-1", "status": "pending", "template": "minimal"}
202, json={"id": "cl-1", "status": "starting", "template": "minimal"}
)
cap = Capsule(
template="minimal",
api_key="wrn_test1234567890abcdef12345678",
base_url=BASE,
)
cap = Capsule(template="minimal", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert cap.capsule_id == "cl-1"
assert hasattr(cap, "commands")
assert hasattr(cap, "files")
@ -40,7 +45,7 @@ class TestCapsuleCreate:
@respx.mock
def test_capsule_create_classmethod(self):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-2", "status": "pending"}
202, json={"id": "cl-2", "status": "starting"}
)
cap = Capsule.create(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert cap.capsule_id == "cl-2"
@ -48,9 +53,9 @@ class TestCapsuleCreate:
@respx.mock
def test_capsule_context_manager_kills(self):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-1", "status": "pending"}
202, json={"id": "cl-1", "status": "starting"}
)
kill_route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(204)
kill_route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(202)
with Capsule(api_key="wrn_test1234567890abcdef12345678", base_url=BASE) as cap:
assert cap.capsule_id == "cl-1"
assert kill_route.called
@ -59,7 +64,7 @@ class TestCapsuleCreate:
def test_capsule_env_var(self, monkeypatch):
monkeypatch.setenv("WRENN_API_KEY", "wrn_from_env_key")
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-3", "status": "pending"}
202, json={"id": "cl-3", "status": "starting"}
)
cap = Capsule(base_url=BASE)
assert cap.capsule_id == "cl-3"
@ -68,17 +73,21 @@ class TestCapsuleCreate:
class TestCapsuleStaticMethods:
@respx.mock
def test_static_destroy(self):
route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(204)
Capsule._static_destroy("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(202)
Capsule._static_destroy(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert route.called
@respx.mock
def test_static_pause(self):
respx.post(f"{BASE}/v1/capsules/cl-1/pause").respond(
200, json={"id": "cl-1", "status": "paused"}
202, json={"id": "cl-1", "status": "pausing"}
)
info = Capsule._static_pause("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert info.status.value == "paused"
info = Capsule._static_pause(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert info.status.value == "pausing"
@respx.mock
def test_static_list(self):
@ -106,18 +115,24 @@ class TestCapsuleConnect:
respx.get(f"{BASE}/v1/capsules/cl-1").respond(
200, json={"id": "cl-1", "status": "running"}
)
cap = Capsule.connect("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
cap = Capsule.connect(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert cap.capsule_id == "cl-1"
@respx.mock
def test_connect_paused_resumes(self):
respx.get(f"{BASE}/v1/capsules/cl-1").respond(
200, json={"id": "cl-1", "status": "paused"}
)
get_route = respx.get(f"{BASE}/v1/capsules/cl-1")
get_route.side_effect = [
httpx.Response(200, json={"id": "cl-1", "status": "paused"}),
httpx.Response(200, json={"id": "cl-1", "status": "running"}),
]
respx.post(f"{BASE}/v1/capsules/cl-1/resume").respond(
200, json={"id": "cl-1", "status": "running"}
202, json={"id": "cl-1", "status": "resuming"}
)
cap = Capsule.connect(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
cap = Capsule.connect("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert cap.capsule_id == "cl-1"

View File

@ -36,10 +36,10 @@ class TestCapsules:
@respx.mock
def test_create(self, client):
respx.post(f"{BASE}/v1/capsules").respond(
201,
202,
json={
"id": "sb-1",
"status": "pending",
"status": "starting",
"template": "base-python",
"vcpus": 2,
"memory_mb": 1024,
@ -48,12 +48,12 @@ class TestCapsules:
resp = client.capsules.create(template="base-python", vcpus=2, memory_mb=1024)
assert isinstance(resp, Capsule)
assert resp.id == "sb-1"
assert resp.status == Status.pending
assert resp.status == Status.starting
@respx.mock
def test_create_defaults(self, client):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "sb-2", "status": "pending"}
202, json={"id": "sb-2", "status": "starting"}
)
resp = client.capsules.create()
assert resp.id == "sb-2"
@ -77,25 +77,25 @@ class TestCapsules:
@respx.mock
def test_destroy(self, client):
route = respx.delete(f"{BASE}/v1/capsules/sb-1").respond(204)
route = respx.delete(f"{BASE}/v1/capsules/sb-1").respond(202)
client.capsules.destroy("sb-1")
assert route.called
@respx.mock
def test_pause(self, client):
respx.post(f"{BASE}/v1/capsules/sb-1/pause").respond(
200, json={"id": "sb-1", "status": "paused"}
202, json={"id": "sb-1", "status": "pausing"}
)
resp = client.capsules.pause("sb-1")
assert resp.status == Status.paused
assert resp.status == Status.pausing
@respx.mock
def test_resume(self, client):
respx.post(f"{BASE}/v1/capsules/sb-1/resume").respond(
200, json={"id": "sb-1", "status": "running"}
202, json={"id": "sb-1", "status": "resuming"}
)
resp = client.capsules.resume("sb-1")
assert resp.status == Status.running
assert resp.status == Status.resuming
@respx.mock
def test_ping(self, client):
@ -238,7 +238,7 @@ class TestAsyncClient:
async def test_async_capsules_create(self, async_client):
async with async_client:
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "sb-1", "status": "pending"}
202, json={"id": "sb-1", "status": "starting"}
)
resp = await async_client.capsules.create(template="base-python")
assert resp.id == "sb-1"

490
tests/test_commands.py Normal file
View File

@ -0,0 +1,490 @@
"""Unit tests for wrenn.commands — Commands / AsyncCommands.
Covers payload construction (cwd, envs, tag, timeout), foreground/background
dispatch, base64 response decoding, stream-event parsing, and the
WebSocket-backed ``stream`` / ``connect`` iterators (with a fake WS).
"""
from __future__ import annotations
import base64
import json
from contextlib import asynccontextmanager, contextmanager
import httpx_ws
import pytest
import respx
from wrenn.client import AsyncWrennClient, WrennClient
from wrenn.commands import (
AsyncCommands,
CommandHandle,
CommandResult,
Commands,
ProcessInfo,
StreamErrorEvent,
StreamEvent,
StreamExitEvent,
StreamStartEvent,
StreamStderrEvent,
StreamStdoutEvent,
_decode_exec_response,
_parse_stream_event,
)
BASE = "https://app.wrenn.dev/api"
CAPSULE_ID = "cl-cmd123"
EXEC_URL = f"{BASE}/v1/capsules/{CAPSULE_ID}/exec"
PROC_URL = f"{BASE}/v1/capsules/{CAPSULE_ID}/processes"
def _make_commands() -> Commands:
client = WrennClient(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
return Commands(CAPSULE_ID, client.http)
def _make_async_commands() -> AsyncCommands:
client = AsyncWrennClient(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
return AsyncCommands(CAPSULE_ID, client.http)
# ── _decode_exec_response ─────────────────────────────────────────
class TestDecodeExecResponse:
def test_plain_text(self):
result = _decode_exec_response(
{"stdout": "hello\n", "stderr": "", "exit_code": 0, "duration_ms": 12}
)
assert isinstance(result, CommandResult)
assert result.stdout == "hello\n"
assert result.exit_code == 0
assert result.duration_ms == 12
def test_base64_stdout(self):
encoded = base64.b64encode(b"binary\xff\x00out").decode()
result = _decode_exec_response(
{"stdout": encoded, "encoding": "base64", "exit_code": 0}
)
assert "binary" in result.stdout
def test_base64_stderr(self):
out = base64.b64encode(b"ok").decode()
err = base64.b64encode(b"warning").decode()
result = _decode_exec_response(
{"stdout": out, "stderr": err, "encoding": "base64", "exit_code": 1}
)
assert result.stdout == "ok"
assert result.stderr == "warning"
assert result.exit_code == 1
def test_missing_fields_default(self):
result = _decode_exec_response({})
assert result.stdout == ""
assert result.stderr == ""
assert result.exit_code == -1
assert result.duration_ms is None
def test_null_stdout_coerced_to_empty(self):
result = _decode_exec_response({"stdout": None, "stderr": None})
assert result.stdout == ""
assert result.stderr == ""
# ── _parse_stream_event ───────────────────────────────────────────
class TestParseStreamEvent:
def test_start(self):
event = _parse_stream_event({"type": "start", "pid": 99})
assert isinstance(event, StreamStartEvent)
assert event.type == "start"
assert event.pid == 99
def test_stdout(self):
event = _parse_stream_event({"type": "stdout", "data": "out"})
assert isinstance(event, StreamStdoutEvent)
assert event.data == "out"
def test_stderr(self):
event = _parse_stream_event({"type": "stderr", "data": "err"})
assert isinstance(event, StreamStderrEvent)
assert event.data == "err"
def test_exit(self):
event = _parse_stream_event({"type": "exit", "exit_code": 7})
assert isinstance(event, StreamExitEvent)
assert event.exit_code == 7
def test_error(self):
event = _parse_stream_event({"type": "error", "data": "boom"})
assert isinstance(event, StreamErrorEvent)
assert event.data == "boom"
def test_unknown_type(self):
event = _parse_stream_event({"type": "weird"})
assert isinstance(event, StreamEvent)
assert event.type == "weird"
def test_missing_type(self):
event = _parse_stream_event({})
assert event.type == "unknown"
def test_exit_missing_code_defaults(self):
event = _parse_stream_event({"type": "exit"})
assert isinstance(event, StreamExitEvent)
assert event.exit_code == -1
# ── Commands.run — payload construction ───────────────────────────
class TestRunPayload:
@respx.mock
def test_foreground_basic_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"stdout": "hi", "exit_code": 0})
result = _make_commands().run("echo hi")
body = json.loads(route.calls[0].request.content)
assert body["cmd"] == "/bin/sh"
assert body["args"] == ["-c", "echo hi"]
assert body["background"] is False
assert body["timeout_sec"] == 30
assert result.stdout == "hi"
@respx.mock
def test_cwd_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("pwd", cwd="/tmp/work")
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/tmp/work"
@respx.mock
def test_cwd_omitted_when_none(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("pwd")
body = json.loads(route.calls[0].request.content)
assert "cwd" not in body
@respx.mock
def test_envs_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("env", envs={"FOO": "bar", "BAZ": "qux"})
body = json.loads(route.calls[0].request.content)
assert body["envs"] == {"FOO": "bar", "BAZ": "qux"}
@respx.mock
def test_empty_envs_still_sent(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("env", envs={})
body = json.loads(route.calls[0].request.content)
assert body["envs"] == {}
@respx.mock
def test_tag_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("echo x", tag="my-tag")
body = json.loads(route.calls[0].request.content)
assert body["tag"] == "my-tag"
@respx.mock
def test_custom_timeout_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("sleep 1", timeout=120)
body = json.loads(route.calls[0].request.content)
assert body["timeout_sec"] == 120
@respx.mock
def test_timeout_none_omits_field(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("echo x", timeout=None)
body = json.loads(route.calls[0].request.content)
assert "timeout_sec" not in body
@respx.mock
def test_all_kwargs_combined(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("echo x", timeout=60, envs={"A": "1"}, cwd="/srv", tag="t")
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/srv"
assert body["envs"] == {"A": "1"}
assert body["tag"] == "t"
assert body["timeout_sec"] == 60
class TestRunBackground:
@respx.mock
def test_background_returns_handle(self):
respx.post(EXEC_URL).respond(200, json={"pid": 1234, "tag": "bg"})
handle = _make_commands().run("sleep 100", background=True)
assert isinstance(handle, CommandHandle)
assert handle.pid == 1234
assert handle.tag == "bg"
assert handle.capsule_id == CAPSULE_ID
@respx.mock
def test_background_omits_timeout_sec(self):
route = respx.post(EXEC_URL).respond(200, json={"pid": 1, "tag": "x"})
_make_commands().run("sleep 100", background=True, timeout=30)
body = json.loads(route.calls[0].request.content)
assert "timeout_sec" not in body
assert body["background"] is True
@respx.mock
def test_background_carries_cwd_and_envs(self):
route = respx.post(EXEC_URL).respond(200, json={"pid": 5, "tag": "t"})
_make_commands().run(
"server", background=True, cwd="/app", envs={"PORT": "80"}, tag="srv"
)
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/app"
assert body["envs"] == {"PORT": "80"}
assert body["tag"] == "srv"
@respx.mock
def test_background_missing_pid_defaults_zero(self):
respx.post(EXEC_URL).respond(200, json={"tag": "x"})
handle = _make_commands().run("x", background=True)
assert handle.pid == 0
class TestListAndKill:
@respx.mock
def test_list_parses_processes(self):
respx.get(PROC_URL).respond(
200,
json={
"processes": [
{
"pid": 10,
"tag": "web",
"cmd": "/bin/sh",
"args": ["-c", "serve"],
},
{"pid": 11},
]
},
)
procs = _make_commands().list()
assert len(procs) == 2
assert isinstance(procs[0], ProcessInfo)
assert procs[0].pid == 10
assert procs[0].tag == "web"
assert procs[0].args == ["-c", "serve"]
assert procs[1].pid == 11
assert procs[1].tag is None
@respx.mock
def test_list_empty(self):
respx.get(PROC_URL).respond(200, json={"processes": []})
assert _make_commands().list() == []
@respx.mock
def test_list_missing_key(self):
respx.get(PROC_URL).respond(200, json={})
assert _make_commands().list() == []
@respx.mock
def test_kill_sends_delete(self):
route = respx.delete(f"{PROC_URL}/42").respond(204)
_make_commands().kill(42)
assert route.called
@respx.mock
def test_kill_unknown_pid_raises(self):
from wrenn.exceptions import WrennNotFoundError
respx.delete(f"{PROC_URL}/999").respond(
404, json={"error": {"code": "not_found", "message": "no such process"}}
)
with pytest.raises(WrennNotFoundError):
_make_commands().kill(999)
# ── Fake WebSocket plumbing for stream / connect ──────────────────
class _FakeWS:
"""Synchronous fake WebSocket session."""
def __init__(self, messages: list) -> None:
self._messages = list(messages)
self.sent: list[str] = []
def send_text(self, text: str) -> None:
self.sent.append(text)
def receive_json(self) -> dict:
if not self._messages:
raise httpx_ws.WebSocketDisconnect()
msg = self._messages.pop(0)
if isinstance(msg, Exception):
raise msg
return msg
class _AsyncFakeWS:
"""Asynchronous fake WebSocket session."""
def __init__(self, messages: list) -> None:
self._messages = list(messages)
self.sent: list[str] = []
async def send_text(self, text: str) -> None:
self.sent.append(text)
async def receive_json(self) -> dict:
if not self._messages:
raise httpx_ws.WebSocketDisconnect()
msg = self._messages.pop(0)
if isinstance(msg, Exception):
raise msg
return msg
def _patch_sync_ws(monkeypatch, ws: _FakeWS) -> None:
@contextmanager
def _fake_connect(url, client):
yield ws
monkeypatch.setattr("wrenn.commands.httpx_ws.connect_ws", _fake_connect)
def _patch_async_ws(monkeypatch, ws: _AsyncFakeWS) -> None:
@asynccontextmanager
async def _fake_aconnect(url, client):
yield ws
monkeypatch.setattr("wrenn.commands.httpx_ws.aconnect_ws", _fake_aconnect)
# ── Commands.stream ───────────────────────────────────────────────
class TestStream:
def test_stream_sends_shell_wrapped_start(self, monkeypatch):
ws = _FakeWS([{"type": "exit", "exit_code": 0}])
_patch_sync_ws(monkeypatch, ws)
list(_make_commands().stream("echo hi"))
start = json.loads(ws.sent[0])
assert start == {"type": "start", "cmd": "/bin/sh", "args": ["-c", "echo hi"]}
def test_stream_with_explicit_args(self, monkeypatch):
ws = _FakeWS([{"type": "exit", "exit_code": 0}])
_patch_sync_ws(monkeypatch, ws)
list(_make_commands().stream("/usr/bin/env", args=["python", "-V"]))
start = json.loads(ws.sent[0])
assert start == {
"type": "start",
"cmd": "/usr/bin/env",
"args": ["python", "-V"],
}
def test_stream_yields_events_until_exit(self, monkeypatch):
ws = _FakeWS(
[
{"type": "start", "pid": 3},
{"type": "stdout", "data": "line1"},
{"type": "stderr", "data": "warn"},
{"type": "exit", "exit_code": 0},
{"type": "stdout", "data": "after-exit-ignored"},
]
)
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().stream("echo line1"))
assert [e.type for e in events] == ["start", "stdout", "stderr", "exit"]
def test_stream_stops_on_error(self, monkeypatch):
ws = _FakeWS([{"type": "error", "data": "fatal"}])
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().stream("bad"))
assert len(events) == 1
assert events[0].type == "error"
def test_stream_handles_disconnect(self, monkeypatch):
ws = _FakeWS([{"type": "stdout", "data": "x"}]) # then disconnect
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().stream("echo x"))
assert [e.type for e in events] == ["stdout"]
# ── Commands.connect ──────────────────────────────────────────────
class TestConnect:
def test_connect_yields_until_exit(self, monkeypatch):
ws = _FakeWS(
[
{"type": "stdout", "data": "tick"},
{"type": "exit", "exit_code": 0},
]
)
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().connect(55))
assert [e.type for e in events] == ["stdout", "exit"]
def test_connect_handles_disconnect(self, monkeypatch):
ws = _FakeWS([]) # immediate disconnect
_patch_sync_ws(monkeypatch, ws)
assert list(_make_commands().connect(1)) == []
# ── AsyncCommands ─────────────────────────────────────────────────
class TestAsyncCommands:
@pytest.mark.asyncio
@respx.mock
async def test_async_run_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"stdout": "hi", "exit_code": 0})
cmds = _make_async_commands()
result = await cmds.run("echo hi", cwd="/tmp", envs={"K": "v"}, tag="z")
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/tmp"
assert body["envs"] == {"K": "v"}
assert body["tag"] == "z"
assert result.stdout == "hi"
@pytest.mark.asyncio
@respx.mock
async def test_async_run_background(self):
respx.post(EXEC_URL).respond(200, json={"pid": 7, "tag": "bg"})
handle = await _make_async_commands().run("sleep 1", background=True)
assert isinstance(handle, CommandHandle)
assert handle.pid == 7
@pytest.mark.asyncio
@respx.mock
async def test_async_list(self):
respx.get(PROC_URL).respond(200, json={"processes": [{"pid": 1, "tag": "a"}]})
procs = await _make_async_commands().list()
assert len(procs) == 1
assert procs[0].pid == 1
@pytest.mark.asyncio
@respx.mock
async def test_async_kill(self):
route = respx.delete(f"{PROC_URL}/3").respond(204)
await _make_async_commands().kill(3)
assert route.called
@pytest.mark.asyncio
async def test_async_stream(self, monkeypatch):
ws = _AsyncFakeWS(
[
{"type": "start", "pid": 1},
{"type": "stdout", "data": "out"},
{"type": "exit", "exit_code": 0},
]
)
_patch_async_ws(monkeypatch, ws)
events = [e async for e in _make_async_commands().stream("echo out")]
assert [e.type for e in events] == ["start", "stdout", "exit"]
start = json.loads(ws.sent[0])
assert start["cmd"] == "/bin/sh"
@pytest.mark.asyncio
async def test_async_connect(self, monkeypatch):
ws = _AsyncFakeWS([{"type": "exit", "exit_code": 0}])
_patch_async_ws(monkeypatch, ws)
events = [e async for e in _make_async_commands().connect(9)]
assert [e.type for e in events] == ["exit"]

View File

@ -341,6 +341,39 @@ class TestPtySessionIteration:
assert events == []
class TestPtySessionPong:
def test_ping_triggers_pong(self):
ws = MagicMock()
ws.receive_text.side_effect = [
json.dumps({"type": "ping"}),
json.dumps({"type": "exit", "exit_code": 0}),
]
session = PtySession(ws, "cl-abc")
events = list(session)
assert events[0].type == PtyEventType.ping
sent = [json.loads(c[0][0]) for c in ws.send_text.call_args_list]
assert {"type": "pong"} in sent
def test_no_pong_without_ping(self):
ws = MagicMock()
ws.receive_text.side_effect = [
json.dumps({"type": "output", "data": ""}),
json.dumps({"type": "exit", "exit_code": 0}),
]
session = PtySession(ws, "cl-abc")
list(session)
sent = [json.loads(c[0][0]) for c in ws.send_text.call_args_list]
assert {"type": "pong"} not in sent
def test_send_pong_swallows_closed_ws(self):
import httpx_ws
ws = MagicMock()
ws.send_text.side_effect = httpx_ws.WebSocketNetworkError()
session = PtySession(ws, "cl-abc")
session._send_pong() # must not raise
class TestPtySessionContextManager:
def test_exit_kills_and_closes(self):
ws = MagicMock()
@ -450,6 +483,28 @@ class TestAsyncPtySession:
assert sent["cmd"] == "/bin/zsh"
assert sent["cols"] == 100
@pytest.mark.asyncio
async def test_async_ping_triggers_pong(self):
ws = AsyncMock()
ws.receive_text.side_effect = [
json.dumps({"type": "ping"}),
json.dumps({"type": "exit", "exit_code": 0}),
]
session = AsyncPtySession(ws, "cl-abc")
events = [e async for e in session]
assert events[0].type == PtyEventType.ping
sent = [json.loads(c[0][0]) for c in ws.send_text.call_args_list]
assert {"type": "pong"} in sent
@pytest.mark.asyncio
async def test_async_send_pong_swallows_closed_ws(self):
import httpx_ws
ws = AsyncMock()
ws.send_text.side_effect = httpx_ws.WebSocketNetworkError()
session = AsyncPtySession(ws, "cl-abc")
await session._send_pong() # must not raise
@pytest.mark.asyncio
async def test_async_iteration(self):
ws = AsyncMock()

View File

@ -15,17 +15,6 @@ pytestmark = pytest.mark.integration
_env_loaded = False
def _wait_for_pid_dead(capsule: Capsule, pid: int, timeout: float = 5.0) -> bool:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
result = capsule.commands.run(f"ps -p {pid} -o stat= 2>/dev/null || true")
state = result.stdout.strip()
if not state or state.startswith("Z"):
return True
time.sleep(0.2)
return False
def _ensure_env() -> None:
global _env_loaded
if _env_loaded:
@ -57,7 +46,7 @@ class TestCapsuleLifecycle:
assert capsule_id
assert capsule.info is not None
finally:
capsule.destroy()
capsule.destroy(wait=True)
info = Capsule.get_info(capsule_id)
assert info.status in (Status.stopped, Status.missing)
@ -76,7 +65,7 @@ class TestCapsuleLifecycle:
assert capsule.is_running()
info = Capsule.get_info(capsule_id)
assert info.status in (Status.stopped, Status.missing)
assert info.status in (Status.stopping, Status.stopped, Status.missing)
def test_get_info(self):
capsule = Capsule(wait=True)
@ -91,11 +80,11 @@ class TestCapsuleLifecycle:
def test_pause_and_resume(self):
capsule = Capsule(wait=True)
try:
paused = capsule.pause()
paused = capsule.pause(wait=True)
assert paused.status == Status.paused
assert not capsule.is_running()
resumed = capsule.resume()
resumed = capsule.resume(wait=True)
assert resumed.status == Status.running
finally:
capsule.destroy()
@ -104,7 +93,7 @@ class TestCapsuleLifecycle:
capsule = Capsule(wait=True)
capsule_id = capsule.capsule_id
try:
Capsule.destroy(capsule_id)
Capsule.destroy(capsule_id, wait=True)
except Exception:
capsule.destroy()
raise
@ -229,7 +218,14 @@ class TestCommands:
def test_kill_process(self):
handle = self.capsule.commands.run("sleep 30", background=True)
self.capsule.commands.kill(handle.pid)
assert _wait_for_pid_dead(self.capsule, handle.pid)
# Registry prune runs asynchronously after the process end event,
# so poll rather than asserting on a zero-delay list().
deadline = time.monotonic() + 5
while time.monotonic() < deadline:
if handle.pid not in [p.pid for p in self.capsule.commands.list()]:
break
time.sleep(0.2)
assert handle.pid not in [p.pid for p in self.capsule.commands.list()]
def test_run_duration_ms(self):
result = self.capsule.commands.run("sleep 1")

View File

@ -0,0 +1,499 @@
"""Advanced integration tests against a live Wrenn server.
Skipped automatically when ``WRENN_API_KEY`` is not set (see conftest.py).
Covers working-directory / environment handling, long-running commands
(``apt-get``), interactive PTY sessions, streaming exec, and real ``git``
workflows including cloning ``github.com/wrennhq/wrenn``.
"""
from __future__ import annotations
import os
import time
import uuid
from pathlib import Path
import pytest
from wrenn import Capsule
from wrenn.commands import StreamExitEvent, StreamStartEvent
from wrenn.exceptions import WrennError
from wrenn.pty import PtyEventType
pytestmark = pytest.mark.integration
WRENN_REPO = "https://github.com/wrennhq/wrenn"
_env_loaded = False
def _ensure_env() -> None:
global _env_loaded
if _env_loaded:
return
_env_loaded = True
env_file = Path(__file__).resolve().parent.parent / ".env"
if not env_file.exists():
return
for line in env_file.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
key, value = key.strip(), value.strip().strip("\"'")
if key and key not in os.environ:
os.environ[key] = value
# ══════════════════════════════════════════════════════════════════
# Working directory & environment
# ══════════════════════════════════════════════════════════════════
class TestCommandEnvironment:
"""cwd / envs handling for foreground commands."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_cwd_changes_working_directory(self):
result = self.capsule.commands.run("pwd", cwd="/tmp")
assert result.exit_code == 0
assert result.stdout.strip() == "/tmp"
def test_default_cwd_is_home(self):
result = self.capsule.commands.run("pwd")
assert result.stdout.strip() == "/root"
def test_cwd_resolves_relative_paths(self):
self.capsule.files.make_dir("/tmp/cwd_probe/sub")
result = self.capsule.commands.run("ls", cwd="/tmp/cwd_probe")
assert "sub" in result.stdout
def test_cwd_nonexistent_raises(self):
with pytest.raises(WrennError):
self.capsule.commands.run("pwd", cwd="/no/such/dir/xyz")
def test_cwd_does_not_persist_between_calls(self):
# Each run is a fresh process — `cd` in one does not affect the next.
self.capsule.commands.run("cd /tmp")
result = self.capsule.commands.run("pwd")
assert result.stdout.strip() == "/root"
def test_single_env_var(self):
result = self.capsule.commands.run("echo $GREETING", envs={"GREETING": "hi"})
assert result.stdout.strip() == "hi"
def test_multiple_env_vars(self):
result = self.capsule.commands.run(
"echo $A-$B-$C", envs={"A": "1", "B": "2", "C": "3"}
)
assert result.stdout.strip() == "1-2-3"
def test_env_vars_do_not_leak_between_calls(self):
self.capsule.commands.run("echo $SECRET", envs={"SECRET": "leaky"})
result = self.capsule.commands.run("echo [$SECRET]")
assert result.stdout.strip() == "[]"
def test_env_var_with_special_chars(self):
value = "a b&c|d;e"
result = self.capsule.commands.run('printf "%s" "$X"', envs={"X": value})
assert result.stdout == value
def test_base_environment_present(self):
result = self.capsule.commands.run("echo $HOME; echo $PATH")
lines = result.stdout.strip().splitlines()
assert lines[0] == "/root"
assert "/usr/bin" in lines[1]
# ══════════════════════════════════════════════════════════════════
# Long-running commands
# ══════════════════════════════════════════════════════════════════
class TestLongRunningCommands:
"""apt-get installs and other slow commands."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_apt_get_install(self):
result = self.capsule.commands.run(
"apt-get update -qq && apt-get install -y -qq cowsay", timeout=300
)
assert result.exit_code == 0
def test_apt_installed_binary_runs(self):
# Depends on test_apt_get_install having installed the package.
self.capsule.commands.run("apt-get install -y -qq cowsay", timeout=300)
result = self.capsule.commands.run("/usr/games/cowsay moo")
assert result.exit_code == 0
assert "moo" in result.stdout
def test_foreground_timeout_raises(self):
# A command exceeding its timeout surfaces as a server-side error.
with pytest.raises(WrennError):
self.capsule.commands.run("sleep 20", timeout=2)
def test_long_sleep_in_background_returns_immediately(self):
start = time.monotonic()
handle = self.capsule.commands.run(
"sleep 60", background=True, tag="long-sleep"
)
elapsed = time.monotonic() - start
assert elapsed < 10
assert handle.pid > 0
self.capsule.commands.kill(handle.pid)
def test_slow_command_within_timeout(self):
result = self.capsule.commands.run("sleep 3 && echo done", timeout=30)
assert result.exit_code == 0
assert result.stdout.strip() == "done"
# ══════════════════════════════════════════════════════════════════
# PTY sessions
# ══════════════════════════════════════════════════════════════════
def _drain_pty(term, *, max_events: int = 200) -> tuple[bytes, int | None]:
"""Collect PTY output until exit; return (output, exit_code)."""
output = b""
exit_code: int | None = None
for i, event in enumerate(term):
if event.type == PtyEventType.output and event.data:
output += event.data
elif event.type == PtyEventType.exit:
exit_code = event.exit_code
break
elif event.type == PtyEventType.error and event.fatal:
break
if i >= max_events:
break
return output, exit_code
class TestPty:
"""Interactive PTY behaviour."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_pty_runs_command_and_exits(self):
with self.capsule.pty(cmd="/bin/bash") as term:
term.write(b"echo pty-result-$((6*7))\n")
term.write(b"exit\n")
output, exit_code = _drain_pty(term)
assert b"pty-result-42" in output
assert exit_code is not None
def test_pty_started_event_sets_tag_and_pid(self):
with self.capsule.pty(cmd="/bin/bash") as term:
term.write(b"exit\n")
_drain_pty(term)
assert term.tag is not None
assert term.tag.startswith("pty-")
assert term.pid is not None and term.pid > 0
def test_pty_respects_cwd(self):
with self.capsule.pty(cmd="/bin/bash", cwd="/tmp") as term:
term.write(b"pwd\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"/tmp" in output
def test_pty_respects_envs(self):
with self.capsule.pty(cmd="/bin/bash", envs={"PTY_VAR": "xyzzy"}) as term:
term.write(b"echo marker-$PTY_VAR\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"marker-xyzzy" in output
def test_pty_resize(self):
with self.capsule.pty(cmd="/bin/bash", cols=80, rows=24) as term:
term.resize(120, 40)
term.write(b"echo resized\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"resized" in output
def test_pty_explicit_command(self):
with self.capsule.pty(cmd="/bin/echo", args=["hello-from-argv"]) as term:
output, exit_code = _drain_pty(term)
assert b"hello-from-argv" in output
def test_pty_exit_code_nonzero(self):
with self.capsule.pty(cmd="/bin/bash") as term:
term.write(b"exit 3\n")
_, exit_code = _drain_pty(term)
assert exit_code == 3
def test_pty_survives_idle_ping_cycle(self):
# The server emits a keepalive `ping` (~every 30s); the SDK must
# auto-reply `pong` and the session must stay usable afterwards.
with self.capsule.pty(cmd="/bin/bash") as term:
saw_ping = False
for event in term:
if event.type == PtyEventType.ping:
saw_ping = True
break
if event.type == PtyEventType.exit:
break
if event.type == PtyEventType.error and event.fatal:
break
assert saw_ping, "no keepalive ping received"
term.write(b"echo still-alive\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"still-alive" in output
# ══════════════════════════════════════════════════════════════════
# Streaming exec
# ══════════════════════════════════════════════════════════════════
class TestStreamingExec:
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_stream_emits_start_and_exit(self):
events = list(self.capsule.commands.stream("echo streamed"))
types = [e.type for e in events]
assert "exit" in types
starts = [e for e in events if isinstance(e, StreamStartEvent)]
exits = [e for e in events if isinstance(e, StreamExitEvent)]
assert exits and exits[0].exit_code == 0
if starts:
assert starts[0].pid > 0
def test_stream_captures_stdout(self):
events = list(self.capsule.commands.stream("for i in 1 2 3; do echo n$i; done"))
out = "".join(
e.data for e in events if e.type == "stdout" and getattr(e, "data", None)
)
assert "n1" in out and "n3" in out
def test_stream_nonzero_exit(self):
events = list(self.capsule.commands.stream("exit 5"))
exits = [e for e in events if isinstance(e, StreamExitEvent)]
assert exits and exits[0].exit_code == 5
# ══════════════════════════════════════════════════════════════════
# Process connect — attach to a background process over WebSocket
# ══════════════════════════════════════════════════════════════════
class TestProcessConnect:
"""commands.connect — must survive the server's abrupt WebSocket close."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_connect_streams_running_process(self):
handle = self.capsule.commands.run(
"for i in $(seq 1 5); do echo tick$i; sleep 1; done",
background=True,
tag="connect-run",
)
time.sleep(0.3)
events = list(self.capsule.commands.connect(handle.pid))
types = [e.type for e in events]
assert "exit" in types
# connect streams output from the attach point onward, so early
# ticks may be missed — assert it captured the live tail.
out = "".join(
e.data for e in events if e.type == "stdout" and getattr(e, "data", None)
)
assert "tick" in out
def test_connect_to_finished_process_does_not_raise(self):
handle = self.capsule.commands.run("echo quick", background=True)
time.sleep(2)
# Process already exited — server closes the WebSocket abruptly;
# the iterator must terminate cleanly rather than raise.
events = list(self.capsule.commands.connect(handle.pid))
assert isinstance(events, list)
# ══════════════════════════════════════════════════════════════════
# Git — real workflows including cloning wrennhq/wrenn
# ══════════════════════════════════════════════════════════════════
class TestGitClone:
"""Clone github.com/wrennhq/wrenn and operate on it."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
cls.capsule.git.clone(WRENN_REPO, "/root/wrenn", depth=1, timeout=300)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_clone_created_repo(self):
assert self.capsule.files.exists("/root/wrenn/.git")
def test_clone_checked_out_files(self):
entries = self.capsule.files.list("/root/wrenn")
names = [e.name for e in entries]
assert "README.md" in names
def test_status_of_clone_is_clean(self):
status = self.capsule.git.status(cwd="/root/wrenn")
assert status.branch == "main"
assert status.is_clean
def test_branches_lists_main(self):
branches = self.capsule.git.branches(cwd="/root/wrenn")
names = [b.name for b in branches]
assert "main" in names
assert any(b.is_current for b in branches)
def test_remote_get_origin(self):
url = self.capsule.git.remote_get("origin", cwd="/root/wrenn")
assert url is not None
assert "wrennhq/wrenn" in url
def test_git_log_has_commit(self):
result = self.capsule.commands.run("git log --oneline -1", cwd="/root/wrenn")
assert result.exit_code == 0
assert result.stdout.strip()
def test_modify_add_commit(self):
marker = uuid.uuid4().hex
self.capsule.git.configure_user(
"CI Bot", "ci@example.com", cwd="/root/wrenn", scope="local"
)
self.capsule.files.write(f"/root/wrenn/sdk_probe_{marker}.txt", marker)
self.capsule.git.add([f"sdk_probe_{marker}.txt"], cwd="/root/wrenn")
staged = self.capsule.git.status(cwd="/root/wrenn")
assert staged.has_staged
result = self.capsule.git.commit("probe commit", cwd="/root/wrenn")
assert result.exit_code == 0
after = self.capsule.git.status(cwd="/root/wrenn")
assert after.is_clean
assert after.ahead >= 1
def test_create_and_checkout_branch_in_clone(self):
self.capsule.git.create_branch("sdk-feature", cwd="/root/wrenn")
branches = self.capsule.git.branches(cwd="/root/wrenn")
current = [b for b in branches if b.is_current]
assert current and current[0].name == "sdk-feature"
self.capsule.git.checkout_branch("main", cwd="/root/wrenn")
def test_diff_via_commands(self):
self.capsule.files.write("/root/wrenn/README.md", "overwritten\n")
try:
result = self.capsule.commands.run("git diff --stat", cwd="/root/wrenn")
assert "README.md" in result.stdout
finally:
self.capsule.git.restore(["README.md"], worktree=True, cwd="/root/wrenn")
class TestGitErrors:
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_clone_nonexistent_repo_raises(self):
from wrenn._git import GitError
with pytest.raises(GitError):
self.capsule.git.clone(
"https://github.com/wrennhq/this-repo-does-not-exist-xyz",
"/root/missing",
timeout=120,
)
def test_status_outside_repo_raises(self):
from wrenn._git import GitError
with pytest.raises(GitError):
self.capsule.git.status(cwd="/tmp")
def test_clone_with_branch(self):
self.capsule.git.clone(
WRENN_REPO, "/root/wrenn-main", branch="main", depth=1, timeout=300
)
status = self.capsule.git.status(cwd="/root/wrenn-main")
assert status.branch == "main"

4
uv.lock generated
View File

@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.13"
resolution-markers = [
"python_full_version >= '3.14'",
@ -1121,7 +1121,7 @@ wheels = [
[[package]]
name = "wrenn"
version = "0.1.1"
version = "0.1.4"
source = { editable = "." }
dependencies = [
{ name = "email-validator" },