Merge pull request 'fix: update SDK for v0.2.0 API compatibility' (#10) from fix/0.2-compatibility into dev
Some checks failed
ci/woodpecker/pr/check Pipeline failed

Reviewed-on: #10
This commit is contained in:
2026-05-19 11:16:20 +00:00
22 changed files with 2814 additions and 357 deletions

1
.gitignore vendored
View File

@ -181,3 +181,4 @@ CODE_EXECUTION.md
.code-review-graph/
.claude
.mcp.json
AGENTS.md

View File

@ -1,56 +0,0 @@
# AGENTS.md
## Project
Wrenn Python SDK — a client library for the Wrenn microVM platform. e2b drop-in replacement.
Package name: `wrenn`. Python 3.13+, managed with [uv](https://docs.astral.sh/uv/).
## Commands
```bash
uv sync # install deps
make lint # ruff check + format check (no auto-fix)
make test # unit tests only (tests/test_client.py)
make test-integration # all tests including integration (needs live server)
make generate # regenerate models from OpenAPI spec (fetches from remote)
make check # lint + unit test
```
- `make test` only runs `tests/test_client.py`, not all unit tests. To run a specific test file: `uv run pytest tests/test_capsule_features.py -v`
- No typecheck step in Makefile or CI. `mypy` is a dev dependency but not wired up — do not assume it runs.
## Architecture
- `src/wrenn/` — the library package
- `capsule.py` / `async_capsule.py` — high-level `Capsule` / `AsyncCapsule` (main user-facing classes)
- `client.py` — low-level `WrennClient` / `AsyncWrennClient`
- `commands.py` — command execution and streaming
- `files.py` — filesystem operations
- `pty.py` — interactive terminal (PTY) over WebSocket
- `exceptions.py` — typed error hierarchy (`WrennError` base)
- `models/_generated.py`**auto-generated** from OpenAPI spec via `datamodel-codegen` (never edit directly; run `make generate`)
- `sandbox.py` — deprecated `Sandbox` alias for `Capsule`
- `code_interpreter/` — specialized capsule for stateful Jupyter kernel execution
- `tests/` — unit tests use `respx` to mock `httpx`; integration tests are in `tests/integration/`
- `api/openapi.yaml` — downloaded OpenAPI spec used for code generation
## Key Conventions
- Generated code lives in `src/wrenn/models/_generated.py`. Never edit it. Run `make generate` to update.
- `Sandbox` is a deprecated alias for `Capsule`. New code should use `Capsule` / `AsyncCapsule`.
- Dual sync/async API: every major class has an `Async` counterpart.
- Uses `httpx` for HTTP, `httpx-ws` for WebSockets, `pydantic` for models.
- `__init__.py` uses `__getattr__` for lazy deprecated aliases (`Sandbox`, `WrennHostHasSandboxesError`).
## Testing
- Unit tests mock HTTP via `respx` (httpx mocking library).
- Integration tests require env vars: `WRENN_API_KEY` (or `WRENN_TOKEN`), optionally `WRENN_BASE_URL`.
- Integration test fixtures in `tests/integration/conftest.py` create real capsules and clean them up.
- `pytest` marker: `@pytest.mark.integration` for tests needing a live server.
## CI
Woodpecker CI (`.woodpecker/check.yml`) runs on push to `main` and `dev`:
1. `make lint`
2. `make test` (unit tests only — integration tests are not in CI)

File diff suppressed because it is too large Load Diff

View File

@ -1964,15 +1964,17 @@ inactivity TTL is set.
#### wait\_ready
```python
async def wait_ready(timeout: float = 30, interval: float = 0.5) -> None
async def wait_ready(timeout: float = 30) -> None
```
Await until the capsule status is ``running``.
Polling interval adapts to the current transient status:
0.5 s for starting/resuming, 2 s for pausing, 1 s for stopping.
**Arguments**:
- `timeout` _float_ - Maximum seconds to wait. Defaults to ``30``.
- `interval` _float_ - Polling interval in seconds. Defaults to ``0.5``.
**Raises**:
@ -2534,15 +2536,17 @@ inactivity TTL is set.
#### wait\_ready
```python
def wait_ready(timeout: float = 30, interval: float = 0.5) -> None
def wait_ready(timeout: float = 30) -> None
```
Block until the capsule status is ``running``.
Polling interval adapts to the current transient status:
0.5 s for starting/resuming, 2 s for pausing, 1 s for stopping.
**Arguments**:
- `timeout` _float_ - Maximum seconds to wait. Defaults to ``30``.
- `interval` _float_ - Polling interval in seconds. Defaults to ``0.5``.
**Raises**:
@ -2700,17 +2704,6 @@ Create a snapshot template from this capsule's current state.
# wrenn.\_config
<a id="wrenn._config.ConnectionConfig"></a>
## ConnectionConfig Objects
```python
@dataclass(frozen=True)
class ConnectionConfig()
```
Resolved credentials and base URL for Wrenn API calls.
<a id="wrenn._git._auth"></a>
# wrenn.\_git.\_auth

View File

@ -1,6 +1,6 @@
[project]
name = "wrenn"
version = "0.1.3"
version = "0.1.4"
description = "Python SDK for Wrenn"
readme = "README.md"
license = "MIT"

View File

@ -37,7 +37,7 @@ from wrenn.exceptions import (
from wrenn.models import FileEntry
from wrenn.pty import AsyncPtySession, PtyEvent, PtyEventType, PtySession
__version__ = "0.1.0"
__version__ = "0.1.4"
__all__ = [
"__version__",

View File

@ -1,8 +1,8 @@
from __future__ import annotations
import asyncio
import logging
import builtins
import logging
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
@ -10,15 +10,54 @@ from contextlib import asynccontextmanager
import httpx_ws
from wrenn._git import AsyncGit
from wrenn.capsule import _DualMethod, _build_proxy_url
from wrenn.capsule import (
_DEFAULT_WAIT_TIMEOUT,
_DESTROY_INTERVAL,
_FAIL_STATUSES,
_PAUSE_INTERVAL,
_RESUME_INTERVAL,
_START_INTERVAL,
_DualMethod,
_build_proxy_url,
)
from wrenn.client import AsyncWrennClient
from wrenn.commands import AsyncCommands
from wrenn.exceptions import WrennNotFoundError
from wrenn.files import AsyncFiles
from wrenn.models import Capsule as CapsuleModel
from wrenn.models import Status, Template
from wrenn.pty import AsyncPtySession
async def _apoll_until(
fetch,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
fail_on: set[Status] | None = None,
) -> CapsuleModel:
fail = fail_on if fail_on is not None else _FAIL_STATUSES
treat_missing_as_target = Status.missing in targets
deadline = time.monotonic() + timeout
last: CapsuleModel | None = None
while time.monotonic() < deadline:
try:
last = await fetch()
except WrennNotFoundError:
if treat_missing_as_target:
return CapsuleModel(status=Status.missing)
raise
if last.status in targets:
return last
if last.status is not None and last.status in fail:
raise RuntimeError(f"Capsule entered {last.status} state while waiting")
await asyncio.sleep(interval)
raise TimeoutError(
f"Capsule did not reach {targets} within {timeout}s "
f"(last status: {last.status if last else 'unknown'})"
)
class AsyncCapsule:
"""Async Wrenn capsule with e2b-compatible interface.
@ -139,15 +178,21 @@ class AsyncCapsule:
client = AsyncWrennClient(api_key=api_key, base_url=base_url)
info = await client.capsules.get(capsule_id)
if info.status == Status.paused:
info = await client.capsules.resume(capsule_id)
return cls(
capsule = cls(
_capsule_id=capsule_id,
_client=client,
_info=info,
)
if info.status == Status.pausing:
info = await capsule._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
if info.status == Status.paused:
await client.capsules.resume(capsule_id)
if info.status != Status.running:
await capsule.wait_ready()
return capsule
# ── Dual instance/static lifecycle ──────────────────────────
destroy = _DualMethod("_instance_destroy", "_static_destroy")
@ -155,22 +200,35 @@ class AsyncCapsule:
resume = _DualMethod("_instance_resume", "_static_resume")
get_info = _DualMethod("_instance_get_info", "_static_get_info")
async def _instance_destroy(self) -> None:
async def _instance_destroy(self, wait: bool = False) -> None:
await self._client.capsules.destroy(self._id)
if wait:
await self._wait_for_status(
{Status.stopped, Status.missing}, _DESTROY_INTERVAL
)
@classmethod
async def _static_destroy(
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> None:
async with AsyncWrennClient(api_key=api_key, base_url=base_url) as client:
await client.capsules.destroy(capsule_id)
if wait:
await _apoll_until(
lambda: client.capsules.get(capsule_id),
{Status.stopped, Status.missing},
_DESTROY_INTERVAL,
)
async def _instance_pause(self) -> CapsuleModel:
async def _instance_pause(self, wait: bool = False) -> CapsuleModel:
self._info = await self._client.capsules.pause(self._id)
if wait:
self._info = await self._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
return self._info
@classmethod
@ -178,14 +236,24 @@ class AsyncCapsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
async with AsyncWrennClient(api_key=api_key, base_url=base_url) as client:
return await client.capsules.pause(capsule_id)
info = await client.capsules.pause(capsule_id)
if wait:
info = await _apoll_until(
lambda: client.capsules.get(capsule_id),
{Status.paused},
_PAUSE_INTERVAL,
)
return info
async def _instance_resume(self) -> CapsuleModel:
async def _instance_resume(self, wait: bool = False) -> CapsuleModel:
self._info = await self._client.capsules.resume(self._id)
if wait:
self._info = await self._wait_for_status({Status.running}, _RESUME_INTERVAL)
return self._info
@classmethod
@ -193,11 +261,19 @@ class AsyncCapsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
async with AsyncWrennClient(api_key=api_key, base_url=base_url) as client:
return await client.capsules.resume(capsule_id)
info = await client.capsules.resume(capsule_id)
if wait:
info = await _apoll_until(
lambda: client.capsules.get(capsule_id),
{Status.running},
_RESUME_INTERVAL,
)
return info
async def _instance_get_info(self) -> CapsuleModel:
self._info = await self._client.capsules.get(self._id)
@ -224,31 +300,30 @@ class AsyncCapsule:
"""
await self._client.capsules.ping(self._id)
async def wait_ready(self, timeout: float = 30, interval: float = 0.5) -> None:
"""Await until the capsule status is ``running``.
async def _wait_for_status(
self,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
) -> CapsuleModel:
info = await _apoll_until(
lambda: self._client.capsules.get(self._id),
targets,
interval,
timeout,
fail_on={Status.error, Status.stopped, Status.missing} - targets,
)
self._info = info
return info
Args:
timeout (float): Maximum seconds to wait. Defaults to ``30``.
interval (float): Polling interval in seconds. Defaults to ``0.5``.
async def wait_ready(self, timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None:
"""Await until capsule status is ``running``.
Raises:
TimeoutError: If the capsule does not reach ``running`` state
within ``timeout`` seconds.
RuntimeError: If the capsule enters an error, stopped, or paused
state while waiting.
TimeoutError: If capsule does not reach ``running`` within ``timeout``.
RuntimeError: If capsule enters error/stopped/missing while waiting.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
info = await self._client.capsules.get(self._id)
if info.status == Status.running:
self._info = info
return
if info.status in (Status.error, Status.stopped):
raise RuntimeError(f"Capsule entered {info.status} state while waiting")
if info.status == Status.paused:
info = await self._client.capsules.resume(self._id)
await asyncio.sleep(interval)
raise TimeoutError(f"Capsule {self._id} did not become ready within {timeout}s")
await self._wait_for_status({Status.running}, _START_INTERVAL, timeout)
async def is_running(self) -> bool:
"""Check whether the capsule is currently running.

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import logging
import builtins
import logging
import time
from collections.abc import Iterator
from contextlib import contextmanager
@ -13,6 +13,7 @@ import httpx_ws
from wrenn._git import Git
from wrenn.client import WrennClient
from wrenn.commands import Commands
from wrenn.exceptions import WrennNotFoundError
from wrenn.files import Files
from wrenn.models import Capsule as CapsuleModel
from wrenn.models import Status, Template
@ -28,6 +29,44 @@ def _build_proxy_url(base_url: str, capsule_id: str | None, port: int) -> str:
return f"{scheme}://{port}-{capsule_id}.{host}"
_RESUME_INTERVAL = 0.5
_DESTROY_INTERVAL = 0.5
_PAUSE_INTERVAL = 2.0
_START_INTERVAL = 0.5
_DEFAULT_WAIT_TIMEOUT = 30.0
_FAIL_STATUSES = {Status.error}
def _poll_until(
fetch,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
fail_on: set[Status] | None = None,
) -> CapsuleModel:
"""Poll ``fetch()`` until status ∈ ``targets``. Raise on ``fail_on``/timeout."""
fail = fail_on if fail_on is not None else _FAIL_STATUSES
treat_missing_as_target = Status.missing in targets
deadline = time.monotonic() + timeout
last: CapsuleModel | None = None
while time.monotonic() < deadline:
try:
last = fetch()
except WrennNotFoundError:
if treat_missing_as_target:
return CapsuleModel(status=Status.missing)
raise
if last.status in targets:
return last
if last.status is not None and last.status in fail:
raise RuntimeError(f"Capsule entered {last.status} state while waiting")
time.sleep(interval)
raise TimeoutError(
f"Capsule did not reach {targets} within {timeout}s "
f"(last status: {last.status if last else 'unknown'})"
)
class _DualMethod:
"""Descriptor that dispatches to instance method or classmethod depending on call site."""
@ -100,9 +139,6 @@ class Capsule:
self._id: str = _capsule_id
self._client = _client
self._info = _info
if self._id is None:
self._client.close()
raise RuntimeError("API returned a capsule without an ID")
else:
self._client = WrennClient(api_key=api_key, base_url=base_url)
try:
@ -112,9 +148,9 @@ class Capsule:
memory_mb=memory_mb,
timeout_sec=timeout,
)
self._id = self._info.id
if self._id is None:
if self._info.id is None:
raise RuntimeError("API returned a capsule without an ID")
self._id = self._info.id
except Exception:
self._client.close()
raise
@ -213,15 +249,21 @@ class Capsule:
client = WrennClient(api_key=api_key, base_url=base_url)
info = client.capsules.get(capsule_id)
if info.status == Status.paused:
info = client.capsules.resume(capsule_id)
return cls(
capsule = cls(
_capsule_id=capsule_id,
_client=client,
_info=info,
)
if info.status == Status.pausing:
info = capsule._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
if info.status == Status.paused:
client.capsules.resume(capsule_id)
if info.status != Status.running:
capsule.wait_ready()
return capsule
# ── Dual instance/static lifecycle ──────────────────────────
destroy = _DualMethod("_instance_destroy", "_static_destroy")
@ -229,25 +271,36 @@ class Capsule:
resume = _DualMethod("_instance_resume", "_static_resume")
get_info = _DualMethod("_instance_get_info", "_static_get_info")
def _instance_destroy(self) -> None:
"""Destroy this capsule."""
def _instance_destroy(self, wait: bool = False) -> None:
"""Destroy this capsule. If ``wait``, poll until stopped/missing."""
self._client.capsules.destroy(self._id)
if wait:
self._wait_for_status({Status.stopped, Status.missing}, _DESTROY_INTERVAL)
@classmethod
def _static_destroy(
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> None:
"""Destroy a capsule by ID."""
with WrennClient(api_key=api_key, base_url=base_url) as client:
client.capsules.destroy(capsule_id)
if wait:
_poll_until(
lambda: client.capsules.get(capsule_id),
{Status.stopped, Status.missing},
_DESTROY_INTERVAL,
)
def _instance_pause(self) -> CapsuleModel:
"""Pause this capsule."""
def _instance_pause(self, wait: bool = False) -> CapsuleModel:
"""Pause this capsule. If ``wait``, poll until ``paused``."""
self._info = self._client.capsules.pause(self._id)
if wait:
self._info = self._wait_for_status({Status.paused}, _PAUSE_INTERVAL)
return self._info
@classmethod
@ -255,16 +308,26 @@ class Capsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
"""Pause a capsule by ID."""
with WrennClient(api_key=api_key, base_url=base_url) as client:
return client.capsules.pause(capsule_id)
info = client.capsules.pause(capsule_id)
if wait:
info = _poll_until(
lambda: client.capsules.get(capsule_id),
{Status.paused},
_PAUSE_INTERVAL,
)
return info
def _instance_resume(self) -> CapsuleModel:
"""Resume this capsule."""
def _instance_resume(self, wait: bool = False) -> CapsuleModel:
"""Resume this capsule. If ``wait``, poll until ``running``."""
self._info = self._client.capsules.resume(self._id)
if wait:
self._info = self._wait_for_status({Status.running}, _RESUME_INTERVAL)
return self._info
@classmethod
@ -272,12 +335,20 @@ class Capsule:
cls,
capsule_id: str,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
) -> CapsuleModel:
"""Resume a capsule by ID."""
with WrennClient(api_key=api_key, base_url=base_url) as client:
return client.capsules.resume(capsule_id)
info = client.capsules.resume(capsule_id)
if wait:
info = _poll_until(
lambda: client.capsules.get(capsule_id),
{Status.running},
_RESUME_INTERVAL,
)
return info
def _instance_get_info(self) -> CapsuleModel:
"""Get current info for this capsule."""
@ -306,31 +377,30 @@ class Capsule:
"""
self._client.capsules.ping(self._id)
def wait_ready(self, timeout: float = 30, interval: float = 0.5) -> None:
"""Block until the capsule status is ``running``.
def _wait_for_status(
self,
targets: set[Status],
interval: float,
timeout: float = _DEFAULT_WAIT_TIMEOUT,
) -> CapsuleModel:
info = _poll_until(
lambda: self._client.capsules.get(self._id),
targets,
interval,
timeout,
fail_on={Status.error, Status.stopped, Status.missing} - targets,
)
self._info = info
return info
Args:
timeout (float): Maximum seconds to wait. Defaults to ``30``.
interval (float): Polling interval in seconds. Defaults to ``0.5``.
def wait_ready(self, timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None:
"""Block until capsule status is ``running``.
Raises:
TimeoutError: If the capsule does not reach ``running`` state
within ``timeout`` seconds.
RuntimeError: If the capsule enters an error, stopped, or paused
state while waiting.
TimeoutError: If capsule does not reach ``running`` within ``timeout``.
RuntimeError: If capsule enters error/stopped/missing while waiting.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
info = self._client.capsules.get(self._id)
if info.status == Status.running:
self._info = info
return
if info.status in (Status.error, Status.stopped):
raise RuntimeError(f"Capsule entered {info.status} state while waiting")
if info.status == Status.paused:
info = self._client.capsules.resume(self._id)
time.sleep(interval)
raise TimeoutError(f"Capsule {self._id} did not become ready within {timeout}s")
self._wait_for_status({Status.running}, _START_INTERVAL, timeout)
def is_running(self) -> bool:
"""Check whether the capsule is currently running.

View File

@ -111,7 +111,7 @@ class CapsulesResource:
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = self._http.post(f"/v1/capsules/{id}/pause", timeout=_LONG_TIMEOUT)
resp = self._http.post(f"/v1/capsules/{id}/pause")
return CapsuleModel.model_validate(handle_response(resp))
def resume(self, id: str) -> CapsuleModel:
@ -227,7 +227,7 @@ class AsyncCapsulesResource:
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = await self._http.post(f"/v1/capsules/{id}/pause", timeout=_LONG_TIMEOUT)
resp = await self._http.post(f"/v1/capsules/{id}/pause")
return CapsuleModel.model_validate(handle_response(resp))
async def resume(self, id: str) -> CapsuleModel:

View File

@ -12,6 +12,11 @@ import httpx_ws
from wrenn.exceptions import handle_response
# Both signal a terminated WebSocket: ``WebSocketDisconnect`` is a clean close,
# ``WebSocketNetworkError`` an abrupt one. The Wrenn server closes exec/process
# streams abruptly, so iterators must treat either as end-of-stream.
_WS_CLOSED = (httpx_ws.WebSocketDisconnect, httpx_ws.WebSocketNetworkError)
@dataclass
class CommandResult:
@ -271,7 +276,7 @@ class Commands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
break
def stream(
@ -306,7 +311,7 @@ class Commands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
break
@ -462,7 +467,7 @@ class AsyncCommands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
pass
async def stream(
@ -497,5 +502,5 @@ class AsyncCommands:
yield event
if event.type in ("exit", "error"):
break
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
pass

View File

@ -150,6 +150,9 @@ def handle_response(resp: httpx.Response) -> dict | list:
if resp.status_code == 204:
return {}
if not resp.content:
return {}
return resp.json()

View File

@ -9,6 +9,36 @@ from wrenn.exceptions import WrennNotFoundError, _raise_for_status, handle_respo
from wrenn.models import FileEntry, ListDirResponse, MakeDirResponse
def _is_already_exists(resp: httpx.Response) -> bool:
"""Detect server's already-exists reply across status codes / code strings.
Server may return 409 with code "conflict"/"already_exists" or wrap
"already_exists" inside an "internal" 500 message.
"""
if resp.status_code < 400:
return False
try:
body = resp.json()
except Exception:
return False
err = body.get("error", {}) if isinstance(body, dict) else {}
code = err.get("code", "")
msg = err.get("message", "") or ""
return code in {"conflict", "already_exists"} or "already_exists" in msg
def _find_entry(list_fn, path: str) -> FileEntry | None:
parent = os.path.dirname(path)
name = os.path.basename(path)
try:
for entry in list_fn(parent, depth=1):
if entry.name == name:
return entry
except WrennNotFoundError:
return None
return None
class Files:
"""Sync filesystem interface. Accessed via ``capsule.files``."""
@ -118,17 +148,10 @@ class Files:
f"/v1/capsules/{self._capsule_id}/files/mkdir",
json={"path": path},
)
if resp.status_code == 409:
try:
body = resp.json()
if body.get("error", {}).get("code") == "conflict":
parent = os.path.dirname(path)
name = os.path.basename(path)
for entry in self.list(parent, depth=1):
if entry.name == name:
return entry
except Exception:
pass
if _is_already_exists(resp):
existing = _find_entry(self.list, path)
if existing is not None:
return existing
parsed = MakeDirResponse.model_validate(handle_response(resp))
if parsed.entry is None:
raise RuntimeError("mkdir response missing entry")
@ -315,17 +338,12 @@ class AsyncFiles:
f"/v1/capsules/{self._capsule_id}/files/mkdir",
json={"path": path},
)
if resp.status_code == 409:
try:
body = resp.json()
if body.get("error", {}).get("code") == "conflict":
parent = os.path.dirname(path)
name = os.path.basename(path)
for entry in await self.list(parent, depth=1):
if entry.name == name:
return entry
except Exception:
pass
if _is_already_exists(resp):
parent = os.path.dirname(path)
name = os.path.basename(path)
for entry in await self.list(parent, depth=1):
if entry.name == name:
return entry
parsed = MakeDirResponse.model_validate(handle_response(resp))
if parsed.entry is None:
raise RuntimeError("mkdir response missing entry")

View File

@ -1,6 +1,5 @@
from wrenn.models._generated import (
APIKeyResponse,
AuthResponse,
Capsule,
CreateAPIKeyRequest,
CreateCapsuleRequest,
@ -34,7 +33,6 @@ from wrenn.models._generated import (
__all__ = [
"APIKeyResponse",
"AuthResponse",
"CreateAPIKeyRequest",
"CreateHostRequest",
"CreateHostResponse",

View File

@ -1,10 +1,10 @@
# generated by datamodel-codegen:
# filename: openapi.yaml
# timestamp: 2026-05-04T20:57:00+00:00
# timestamp: 2026-05-19T08:54:50+00:00
from __future__ import annotations
from pydantic import AwareDatetime, BaseModel, EmailStr, Field
from typing import Annotated
from typing import Annotated, Any
from datetime import date as date_aliased
from enum import StrEnum
@ -27,14 +27,20 @@ class SignupResponse(BaseModel):
] = None
class AuthResponse(BaseModel):
token: Annotated[str | None, Field(description="JWT token (valid for 6 hours)")] = (
None
)
class SessionResponse(BaseModel):
"""
Returned by login, activate, and switch-team. The actual auth credential
is the wrenn_sid cookie set on the response. The body carries identity
data the SPA needs to bootstrap.
"""
user_id: str | None = None
team_id: str | None = None
email: str | None = None
name: str | None = None
role: str | None = None
is_admin: bool | None = None
class CreateAPIKeyRequest(BaseModel):
@ -62,10 +68,17 @@ class CreateCapsuleRequest(BaseModel):
template: str | None = "minimal"
vcpus: int | None = 1
memory_mb: int | None = 512
disk_size_mb: Annotated[
int | None,
Field(
description="Maximum size of the per-capsule copy-on-write disk in MB. Capped at 5 GB by default; the actual size is max(disk_size_mb, origin rootfs size).\n"
),
] = 5120
timeout_sec: Annotated[
int | None,
Field(
description="Auto-pause TTL in seconds. The capsule is automatically paused after this duration of inactivity (no exec or ping). 0 means no auto-pause.\n"
description="Auto-pause TTL in seconds. The capsule is automatically paused after this duration of inactivity (no exec or ping). 0 means no auto-pause. Positive values below 60 are silently clamped to 60 (the agent's startup envelope).\n",
ge=0,
),
] = 0
@ -133,7 +146,10 @@ class Status(StrEnum):
pending = "pending"
starting = "starting"
running = "running"
pausing = "pausing"
paused = "paused"
resuming = "resuming"
stopping = "stopping"
hibernated = "hibernated"
stopped = "stopped"
missing = "missing"
@ -153,6 +169,13 @@ class Capsule(BaseModel):
started_at: AwareDatetime | None = None
last_active_at: AwareDatetime | None = None
last_updated: AwareDatetime | None = None
metadata: Annotated[
dict[str, str] | None,
Field(
description="Free-form key/value labels attached at create-time. Also carries\nagent-side version info (kernel_version, vmm_version,\nagent_version, envd_version) when running.\n"
),
] = None
disk_size_mb: int | None = None
class CreateSnapshotRequest(BaseModel):
@ -177,6 +200,13 @@ class Template(BaseModel):
memory_mb: int | None = None
size_bytes: int | None = None
created_at: AwareDatetime | None = None
platform: Annotated[
bool | None,
Field(
description="True when the template is platform-managed (visible to all teams,\ne.g. the built-in `minimal` rootfs). False for team-owned\nsnapshot templates.\n"
),
] = None
metadata: dict[str, str] | None = None
class ExecRequest(BaseModel):
@ -399,7 +429,7 @@ class HostDeletePreview(BaseModel):
host: Host | None = None
sandbox_ids: Annotated[
list[str] | None,
Field(description="IDs of capsulees that would be destroyed on force-delete."),
Field(description="IDs of capsules that would be destroyed on force-delete."),
] = None
@ -407,8 +437,7 @@ class Error(BaseModel):
code: Annotated[str | None, Field(examples=["host_has_sandboxes"])] = None
message: str | None = None
sandbox_ids: Annotated[
list[str] | None,
Field(description="IDs of active capsulees blocking deletion."),
list[str] | None, Field(description="IDs of active capsules blocking deletion.")
] = None
@ -476,7 +505,9 @@ class MetricPoint(BaseModel):
] = None
mem_bytes: Annotated[
int | None,
Field(description="Resident memory in bytes (VmRSS of Firecracker process)"),
Field(
description="Resident memory in bytes (VmRSS of Cloud Hypervisor process)"
),
] = None
disk_bytes: Annotated[
int | None, Field(description="Allocated disk bytes for the CoW sparse file")
@ -494,12 +525,12 @@ class Provider(StrEnum):
class Event(StrEnum):
capsule_created = "capsule.created"
capsule_running = "capsule.running"
capsule_paused = "capsule.paused"
capsule_destroyed = "capsule.destroyed"
template_snapshot_created = "template.snapshot.created"
template_snapshot_deleted = "template.snapshot.deleted"
capsule_create = "capsule.create"
capsule_pause = "capsule.pause"
capsule_resume = "capsule.resume"
capsule_destroy = "capsule.destroy"
template_snapshot_create = "template.snapshot.create"
template_snapshot_delete = "template.snapshot.delete"
host_up = "host.up"
host_down = "host.down"
@ -591,6 +622,106 @@ class Error1(BaseModel):
error: Error2 | None = None
class ActorType(StrEnum):
user = "user"
api_key = "api_key"
host = "host"
system = "system"
class Status2(StrEnum):
success = "success"
failure = "failure"
class AuditLogEntry(BaseModel):
id: str | None = None
actor_type: ActorType | None = None
actor_id: str | None = None
actor_name: str | None = None
resource_type: str | None = None
resource_id: str | None = None
action: str | None = None
scope: str | None = None
status: Status2 | None = None
metadata: dict[str, Any] | None = None
created_at: AwareDatetime | None = None
class Event2(StrEnum):
connected = "connected"
capsule_create = "capsule.create"
capsule_pause = "capsule.pause"
capsule_resume = "capsule.resume"
capsule_destroy = "capsule.destroy"
capsule_state_changed = "capsule.state.changed"
template_snapshot_create = "template.snapshot.create"
template_snapshot_delete = "template.snapshot.delete"
host_up = "host.up"
host_down = "host.down"
class Outcome(StrEnum):
"""
Present for action events (capsule.* except state.changed,
template.snapshot.*). Absent for host.up/down, capsule.state.changed,
and the connected sentinel.
"""
success = "success"
error = "error"
class Resource(BaseModel):
id: str | None = None
type: str | None = None
class Type4(StrEnum):
user = "user"
api_key = "api_key"
system = "system"
class Actor(BaseModel):
type: Type4 | None = None
id: str | None = None
name: str | None = None
class SSEEvent(BaseModel):
"""
Wire format of one SSE message body. The event name (`event:` line) is
the `kind` and the JSON below is the `data:` line.
"""
event: Event2 | None = None
outcome: Annotated[
Outcome | None,
Field(
description="Present for action events (capsule.* except state.changed,\ntemplate.snapshot.*). Absent for host.up/down, capsule.state.changed,\nand the connected sentinel.\n"
),
] = None
resource: Resource | None = None
actor: Actor | None = None
metadata: Annotated[
dict[str, str] | None,
Field(
description="Event-specific context. Examples: `reason` (ttl_expired,\nhost_failure, cleanup_after_create_error, orphaned),\n`host_ip`, `from`/`to` (for capsule.state.changed).\n"
),
] = None
error: Annotated[
str | None, Field(description="Failure reason; only set when outcome=error.")
] = None
sandbox: Annotated[
Capsule | None,
Field(description="Populated for capsule.* events; null if DB lookup failed."),
] = None
timestamp: AwareDatetime | None = None
class ListDirResponse(BaseModel):
entries: list[FileEntry] | None = None

View File

@ -9,6 +9,10 @@ from typing import Any
import httpx_ws
from pydantic import BaseModel
# A clean (``WebSocketDisconnect``) or abrupt (``WebSocketNetworkError``) close
# both mean the PTY stream has ended; iteration must stop on either.
_WS_CLOSED = (httpx_ws.WebSocketDisconnect, httpx_ws.WebSocketNetworkError)
class PtyEventType(StrEnum):
started = "started"
@ -109,6 +113,13 @@ class PtySession:
def _send_connect(self, tag: str) -> None:
self._ws.send_text(json.dumps({"type": "connect", "tag": tag}))
def _send_pong(self) -> None:
"""Reply to a server keepalive ``ping`` so the session stays open."""
try:
self._ws.send_text(json.dumps({"type": "pong"}))
except _WS_CLOSED:
pass
def write(self, data: bytes) -> None:
"""Send raw bytes to the PTY stdin.
@ -144,7 +155,7 @@ class PtySession:
raise StopIteration
try:
raw = self._ws.receive_text()
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
raise StopIteration
event = _parse_pty_event(json.loads(raw))
if event.type == PtyEventType.started:
@ -152,6 +163,8 @@ class PtySession:
self._tag = event.tag
if event.pid is not None:
self._pid = event.pid
if event.type == PtyEventType.ping:
self._send_pong()
if event.type == PtyEventType.exit:
self._done = True
return event
@ -236,6 +249,13 @@ class AsyncPtySession:
async def _send_connect(self, tag: str) -> None:
await self._ws.send_text(json.dumps({"type": "connect", "tag": tag}))
async def _send_pong(self) -> None:
"""Reply to a server keepalive ``ping`` so the session stays open."""
try:
await self._ws.send_text(json.dumps({"type": "pong"}))
except _WS_CLOSED:
pass
async def write(self, data: bytes) -> None:
"""Send raw bytes to the PTY stdin.
@ -273,7 +293,7 @@ class AsyncPtySession:
raise StopAsyncIteration
try:
raw = await self._ws.receive_text()
except httpx_ws.WebSocketDisconnect:
except _WS_CLOSED:
raise StopAsyncIteration
event = _parse_pty_event(json.loads(raw))
if event.type == PtyEventType.started:
@ -281,6 +301,8 @@ class AsyncPtySession:
self._tag = event.tag
if event.pid is not None:
self._pid = event.pid
if event.type == PtyEventType.ping:
await self._send_pong()
if event.type == PtyEventType.exit:
self._done = True
return event

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import httpx
import respx
from wrenn.capsule import Capsule, _build_proxy_url
@ -30,9 +31,13 @@ class TestCapsuleCreate:
@respx.mock
def test_capsule_constructor_creates(self):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-1", "status": "pending", "template": "minimal"}
202, json={"id": "cl-1", "status": "starting", "template": "minimal"}
)
cap = Capsule(
template="minimal",
api_key="wrn_test1234567890abcdef12345678",
base_url=BASE,
)
cap = Capsule(template="minimal", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert cap.capsule_id == "cl-1"
assert hasattr(cap, "commands")
assert hasattr(cap, "files")
@ -40,7 +45,7 @@ class TestCapsuleCreate:
@respx.mock
def test_capsule_create_classmethod(self):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-2", "status": "pending"}
202, json={"id": "cl-2", "status": "starting"}
)
cap = Capsule.create(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert cap.capsule_id == "cl-2"
@ -48,9 +53,9 @@ class TestCapsuleCreate:
@respx.mock
def test_capsule_context_manager_kills(self):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-1", "status": "pending"}
202, json={"id": "cl-1", "status": "starting"}
)
kill_route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(204)
kill_route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(202)
with Capsule(api_key="wrn_test1234567890abcdef12345678", base_url=BASE) as cap:
assert cap.capsule_id == "cl-1"
assert kill_route.called
@ -59,7 +64,7 @@ class TestCapsuleCreate:
def test_capsule_env_var(self, monkeypatch):
monkeypatch.setenv("WRENN_API_KEY", "wrn_from_env_key")
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "cl-3", "status": "pending"}
202, json={"id": "cl-3", "status": "starting"}
)
cap = Capsule(base_url=BASE)
assert cap.capsule_id == "cl-3"
@ -68,17 +73,21 @@ class TestCapsuleCreate:
class TestCapsuleStaticMethods:
@respx.mock
def test_static_destroy(self):
route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(204)
Capsule._static_destroy("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(202)
Capsule._static_destroy(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert route.called
@respx.mock
def test_static_pause(self):
respx.post(f"{BASE}/v1/capsules/cl-1/pause").respond(
200, json={"id": "cl-1", "status": "paused"}
202, json={"id": "cl-1", "status": "pausing"}
)
info = Capsule._static_pause("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert info.status.value == "paused"
info = Capsule._static_pause(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert info.status.value == "pausing"
@respx.mock
def test_static_list(self):
@ -106,18 +115,24 @@ class TestCapsuleConnect:
respx.get(f"{BASE}/v1/capsules/cl-1").respond(
200, json={"id": "cl-1", "status": "running"}
)
cap = Capsule.connect("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
cap = Capsule.connect(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert cap.capsule_id == "cl-1"
@respx.mock
def test_connect_paused_resumes(self):
respx.get(f"{BASE}/v1/capsules/cl-1").respond(
200, json={"id": "cl-1", "status": "paused"}
)
get_route = respx.get(f"{BASE}/v1/capsules/cl-1")
get_route.side_effect = [
httpx.Response(200, json={"id": "cl-1", "status": "paused"}),
httpx.Response(200, json={"id": "cl-1", "status": "running"}),
]
respx.post(f"{BASE}/v1/capsules/cl-1/resume").respond(
200, json={"id": "cl-1", "status": "running"}
202, json={"id": "cl-1", "status": "resuming"}
)
cap = Capsule.connect(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
cap = Capsule.connect("cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert cap.capsule_id == "cl-1"

View File

@ -36,10 +36,10 @@ class TestCapsules:
@respx.mock
def test_create(self, client):
respx.post(f"{BASE}/v1/capsules").respond(
201,
202,
json={
"id": "sb-1",
"status": "pending",
"status": "starting",
"template": "base-python",
"vcpus": 2,
"memory_mb": 1024,
@ -48,12 +48,12 @@ class TestCapsules:
resp = client.capsules.create(template="base-python", vcpus=2, memory_mb=1024)
assert isinstance(resp, Capsule)
assert resp.id == "sb-1"
assert resp.status == Status.pending
assert resp.status == Status.starting
@respx.mock
def test_create_defaults(self, client):
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "sb-2", "status": "pending"}
202, json={"id": "sb-2", "status": "starting"}
)
resp = client.capsules.create()
assert resp.id == "sb-2"
@ -77,25 +77,25 @@ class TestCapsules:
@respx.mock
def test_destroy(self, client):
route = respx.delete(f"{BASE}/v1/capsules/sb-1").respond(204)
route = respx.delete(f"{BASE}/v1/capsules/sb-1").respond(202)
client.capsules.destroy("sb-1")
assert route.called
@respx.mock
def test_pause(self, client):
respx.post(f"{BASE}/v1/capsules/sb-1/pause").respond(
200, json={"id": "sb-1", "status": "paused"}
202, json={"id": "sb-1", "status": "pausing"}
)
resp = client.capsules.pause("sb-1")
assert resp.status == Status.paused
assert resp.status == Status.pausing
@respx.mock
def test_resume(self, client):
respx.post(f"{BASE}/v1/capsules/sb-1/resume").respond(
200, json={"id": "sb-1", "status": "running"}
202, json={"id": "sb-1", "status": "resuming"}
)
resp = client.capsules.resume("sb-1")
assert resp.status == Status.running
assert resp.status == Status.resuming
@respx.mock
def test_ping(self, client):
@ -238,7 +238,7 @@ class TestAsyncClient:
async def test_async_capsules_create(self, async_client):
async with async_client:
respx.post(f"{BASE}/v1/capsules").respond(
201, json={"id": "sb-1", "status": "pending"}
202, json={"id": "sb-1", "status": "starting"}
)
resp = await async_client.capsules.create(template="base-python")
assert resp.id == "sb-1"

490
tests/test_commands.py Normal file
View File

@ -0,0 +1,490 @@
"""Unit tests for wrenn.commands — Commands / AsyncCommands.
Covers payload construction (cwd, envs, tag, timeout), foreground/background
dispatch, base64 response decoding, stream-event parsing, and the
WebSocket-backed ``stream`` / ``connect`` iterators (with a fake WS).
"""
from __future__ import annotations
import base64
import json
from contextlib import asynccontextmanager, contextmanager
import httpx_ws
import pytest
import respx
from wrenn.client import AsyncWrennClient, WrennClient
from wrenn.commands import (
AsyncCommands,
CommandHandle,
CommandResult,
Commands,
ProcessInfo,
StreamErrorEvent,
StreamEvent,
StreamExitEvent,
StreamStartEvent,
StreamStderrEvent,
StreamStdoutEvent,
_decode_exec_response,
_parse_stream_event,
)
BASE = "https://app.wrenn.dev/api"
CAPSULE_ID = "cl-cmd123"
EXEC_URL = f"{BASE}/v1/capsules/{CAPSULE_ID}/exec"
PROC_URL = f"{BASE}/v1/capsules/{CAPSULE_ID}/processes"
def _make_commands() -> Commands:
client = WrennClient(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
return Commands(CAPSULE_ID, client.http)
def _make_async_commands() -> AsyncCommands:
client = AsyncWrennClient(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
return AsyncCommands(CAPSULE_ID, client.http)
# ── _decode_exec_response ─────────────────────────────────────────
class TestDecodeExecResponse:
def test_plain_text(self):
result = _decode_exec_response(
{"stdout": "hello\n", "stderr": "", "exit_code": 0, "duration_ms": 12}
)
assert isinstance(result, CommandResult)
assert result.stdout == "hello\n"
assert result.exit_code == 0
assert result.duration_ms == 12
def test_base64_stdout(self):
encoded = base64.b64encode(b"binary\xff\x00out").decode()
result = _decode_exec_response(
{"stdout": encoded, "encoding": "base64", "exit_code": 0}
)
assert "binary" in result.stdout
def test_base64_stderr(self):
out = base64.b64encode(b"ok").decode()
err = base64.b64encode(b"warning").decode()
result = _decode_exec_response(
{"stdout": out, "stderr": err, "encoding": "base64", "exit_code": 1}
)
assert result.stdout == "ok"
assert result.stderr == "warning"
assert result.exit_code == 1
def test_missing_fields_default(self):
result = _decode_exec_response({})
assert result.stdout == ""
assert result.stderr == ""
assert result.exit_code == -1
assert result.duration_ms is None
def test_null_stdout_coerced_to_empty(self):
result = _decode_exec_response({"stdout": None, "stderr": None})
assert result.stdout == ""
assert result.stderr == ""
# ── _parse_stream_event ───────────────────────────────────────────
class TestParseStreamEvent:
def test_start(self):
event = _parse_stream_event({"type": "start", "pid": 99})
assert isinstance(event, StreamStartEvent)
assert event.type == "start"
assert event.pid == 99
def test_stdout(self):
event = _parse_stream_event({"type": "stdout", "data": "out"})
assert isinstance(event, StreamStdoutEvent)
assert event.data == "out"
def test_stderr(self):
event = _parse_stream_event({"type": "stderr", "data": "err"})
assert isinstance(event, StreamStderrEvent)
assert event.data == "err"
def test_exit(self):
event = _parse_stream_event({"type": "exit", "exit_code": 7})
assert isinstance(event, StreamExitEvent)
assert event.exit_code == 7
def test_error(self):
event = _parse_stream_event({"type": "error", "data": "boom"})
assert isinstance(event, StreamErrorEvent)
assert event.data == "boom"
def test_unknown_type(self):
event = _parse_stream_event({"type": "weird"})
assert isinstance(event, StreamEvent)
assert event.type == "weird"
def test_missing_type(self):
event = _parse_stream_event({})
assert event.type == "unknown"
def test_exit_missing_code_defaults(self):
event = _parse_stream_event({"type": "exit"})
assert isinstance(event, StreamExitEvent)
assert event.exit_code == -1
# ── Commands.run — payload construction ───────────────────────────
class TestRunPayload:
@respx.mock
def test_foreground_basic_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"stdout": "hi", "exit_code": 0})
result = _make_commands().run("echo hi")
body = json.loads(route.calls[0].request.content)
assert body["cmd"] == "/bin/sh"
assert body["args"] == ["-c", "echo hi"]
assert body["background"] is False
assert body["timeout_sec"] == 30
assert result.stdout == "hi"
@respx.mock
def test_cwd_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("pwd", cwd="/tmp/work")
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/tmp/work"
@respx.mock
def test_cwd_omitted_when_none(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("pwd")
body = json.loads(route.calls[0].request.content)
assert "cwd" not in body
@respx.mock
def test_envs_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("env", envs={"FOO": "bar", "BAZ": "qux"})
body = json.loads(route.calls[0].request.content)
assert body["envs"] == {"FOO": "bar", "BAZ": "qux"}
@respx.mock
def test_empty_envs_still_sent(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("env", envs={})
body = json.loads(route.calls[0].request.content)
assert body["envs"] == {}
@respx.mock
def test_tag_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("echo x", tag="my-tag")
body = json.loads(route.calls[0].request.content)
assert body["tag"] == "my-tag"
@respx.mock
def test_custom_timeout_in_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("sleep 1", timeout=120)
body = json.loads(route.calls[0].request.content)
assert body["timeout_sec"] == 120
@respx.mock
def test_timeout_none_omits_field(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("echo x", timeout=None)
body = json.loads(route.calls[0].request.content)
assert "timeout_sec" not in body
@respx.mock
def test_all_kwargs_combined(self):
route = respx.post(EXEC_URL).respond(200, json={"exit_code": 0})
_make_commands().run("echo x", timeout=60, envs={"A": "1"}, cwd="/srv", tag="t")
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/srv"
assert body["envs"] == {"A": "1"}
assert body["tag"] == "t"
assert body["timeout_sec"] == 60
class TestRunBackground:
@respx.mock
def test_background_returns_handle(self):
respx.post(EXEC_URL).respond(200, json={"pid": 1234, "tag": "bg"})
handle = _make_commands().run("sleep 100", background=True)
assert isinstance(handle, CommandHandle)
assert handle.pid == 1234
assert handle.tag == "bg"
assert handle.capsule_id == CAPSULE_ID
@respx.mock
def test_background_omits_timeout_sec(self):
route = respx.post(EXEC_URL).respond(200, json={"pid": 1, "tag": "x"})
_make_commands().run("sleep 100", background=True, timeout=30)
body = json.loads(route.calls[0].request.content)
assert "timeout_sec" not in body
assert body["background"] is True
@respx.mock
def test_background_carries_cwd_and_envs(self):
route = respx.post(EXEC_URL).respond(200, json={"pid": 5, "tag": "t"})
_make_commands().run(
"server", background=True, cwd="/app", envs={"PORT": "80"}, tag="srv"
)
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/app"
assert body["envs"] == {"PORT": "80"}
assert body["tag"] == "srv"
@respx.mock
def test_background_missing_pid_defaults_zero(self):
respx.post(EXEC_URL).respond(200, json={"tag": "x"})
handle = _make_commands().run("x", background=True)
assert handle.pid == 0
class TestListAndKill:
@respx.mock
def test_list_parses_processes(self):
respx.get(PROC_URL).respond(
200,
json={
"processes": [
{
"pid": 10,
"tag": "web",
"cmd": "/bin/sh",
"args": ["-c", "serve"],
},
{"pid": 11},
]
},
)
procs = _make_commands().list()
assert len(procs) == 2
assert isinstance(procs[0], ProcessInfo)
assert procs[0].pid == 10
assert procs[0].tag == "web"
assert procs[0].args == ["-c", "serve"]
assert procs[1].pid == 11
assert procs[1].tag is None
@respx.mock
def test_list_empty(self):
respx.get(PROC_URL).respond(200, json={"processes": []})
assert _make_commands().list() == []
@respx.mock
def test_list_missing_key(self):
respx.get(PROC_URL).respond(200, json={})
assert _make_commands().list() == []
@respx.mock
def test_kill_sends_delete(self):
route = respx.delete(f"{PROC_URL}/42").respond(204)
_make_commands().kill(42)
assert route.called
@respx.mock
def test_kill_unknown_pid_raises(self):
from wrenn.exceptions import WrennNotFoundError
respx.delete(f"{PROC_URL}/999").respond(
404, json={"error": {"code": "not_found", "message": "no such process"}}
)
with pytest.raises(WrennNotFoundError):
_make_commands().kill(999)
# ── Fake WebSocket plumbing for stream / connect ──────────────────
class _FakeWS:
"""Synchronous fake WebSocket session."""
def __init__(self, messages: list) -> None:
self._messages = list(messages)
self.sent: list[str] = []
def send_text(self, text: str) -> None:
self.sent.append(text)
def receive_json(self) -> dict:
if not self._messages:
raise httpx_ws.WebSocketDisconnect()
msg = self._messages.pop(0)
if isinstance(msg, Exception):
raise msg
return msg
class _AsyncFakeWS:
"""Asynchronous fake WebSocket session."""
def __init__(self, messages: list) -> None:
self._messages = list(messages)
self.sent: list[str] = []
async def send_text(self, text: str) -> None:
self.sent.append(text)
async def receive_json(self) -> dict:
if not self._messages:
raise httpx_ws.WebSocketDisconnect()
msg = self._messages.pop(0)
if isinstance(msg, Exception):
raise msg
return msg
def _patch_sync_ws(monkeypatch, ws: _FakeWS) -> None:
@contextmanager
def _fake_connect(url, client):
yield ws
monkeypatch.setattr("wrenn.commands.httpx_ws.connect_ws", _fake_connect)
def _patch_async_ws(monkeypatch, ws: _AsyncFakeWS) -> None:
@asynccontextmanager
async def _fake_aconnect(url, client):
yield ws
monkeypatch.setattr("wrenn.commands.httpx_ws.aconnect_ws", _fake_aconnect)
# ── Commands.stream ───────────────────────────────────────────────
class TestStream:
def test_stream_sends_shell_wrapped_start(self, monkeypatch):
ws = _FakeWS([{"type": "exit", "exit_code": 0}])
_patch_sync_ws(monkeypatch, ws)
list(_make_commands().stream("echo hi"))
start = json.loads(ws.sent[0])
assert start == {"type": "start", "cmd": "/bin/sh", "args": ["-c", "echo hi"]}
def test_stream_with_explicit_args(self, monkeypatch):
ws = _FakeWS([{"type": "exit", "exit_code": 0}])
_patch_sync_ws(monkeypatch, ws)
list(_make_commands().stream("/usr/bin/env", args=["python", "-V"]))
start = json.loads(ws.sent[0])
assert start == {
"type": "start",
"cmd": "/usr/bin/env",
"args": ["python", "-V"],
}
def test_stream_yields_events_until_exit(self, monkeypatch):
ws = _FakeWS(
[
{"type": "start", "pid": 3},
{"type": "stdout", "data": "line1"},
{"type": "stderr", "data": "warn"},
{"type": "exit", "exit_code": 0},
{"type": "stdout", "data": "after-exit-ignored"},
]
)
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().stream("echo line1"))
assert [e.type for e in events] == ["start", "stdout", "stderr", "exit"]
def test_stream_stops_on_error(self, monkeypatch):
ws = _FakeWS([{"type": "error", "data": "fatal"}])
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().stream("bad"))
assert len(events) == 1
assert events[0].type == "error"
def test_stream_handles_disconnect(self, monkeypatch):
ws = _FakeWS([{"type": "stdout", "data": "x"}]) # then disconnect
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().stream("echo x"))
assert [e.type for e in events] == ["stdout"]
# ── Commands.connect ──────────────────────────────────────────────
class TestConnect:
def test_connect_yields_until_exit(self, monkeypatch):
ws = _FakeWS(
[
{"type": "stdout", "data": "tick"},
{"type": "exit", "exit_code": 0},
]
)
_patch_sync_ws(monkeypatch, ws)
events = list(_make_commands().connect(55))
assert [e.type for e in events] == ["stdout", "exit"]
def test_connect_handles_disconnect(self, monkeypatch):
ws = _FakeWS([]) # immediate disconnect
_patch_sync_ws(monkeypatch, ws)
assert list(_make_commands().connect(1)) == []
# ── AsyncCommands ─────────────────────────────────────────────────
class TestAsyncCommands:
@pytest.mark.asyncio
@respx.mock
async def test_async_run_payload(self):
route = respx.post(EXEC_URL).respond(200, json={"stdout": "hi", "exit_code": 0})
cmds = _make_async_commands()
result = await cmds.run("echo hi", cwd="/tmp", envs={"K": "v"}, tag="z")
body = json.loads(route.calls[0].request.content)
assert body["cwd"] == "/tmp"
assert body["envs"] == {"K": "v"}
assert body["tag"] == "z"
assert result.stdout == "hi"
@pytest.mark.asyncio
@respx.mock
async def test_async_run_background(self):
respx.post(EXEC_URL).respond(200, json={"pid": 7, "tag": "bg"})
handle = await _make_async_commands().run("sleep 1", background=True)
assert isinstance(handle, CommandHandle)
assert handle.pid == 7
@pytest.mark.asyncio
@respx.mock
async def test_async_list(self):
respx.get(PROC_URL).respond(200, json={"processes": [{"pid": 1, "tag": "a"}]})
procs = await _make_async_commands().list()
assert len(procs) == 1
assert procs[0].pid == 1
@pytest.mark.asyncio
@respx.mock
async def test_async_kill(self):
route = respx.delete(f"{PROC_URL}/3").respond(204)
await _make_async_commands().kill(3)
assert route.called
@pytest.mark.asyncio
async def test_async_stream(self, monkeypatch):
ws = _AsyncFakeWS(
[
{"type": "start", "pid": 1},
{"type": "stdout", "data": "out"},
{"type": "exit", "exit_code": 0},
]
)
_patch_async_ws(monkeypatch, ws)
events = [e async for e in _make_async_commands().stream("echo out")]
assert [e.type for e in events] == ["start", "stdout", "exit"]
start = json.loads(ws.sent[0])
assert start["cmd"] == "/bin/sh"
@pytest.mark.asyncio
async def test_async_connect(self, monkeypatch):
ws = _AsyncFakeWS([{"type": "exit", "exit_code": 0}])
_patch_async_ws(monkeypatch, ws)
events = [e async for e in _make_async_commands().connect(9)]
assert [e.type for e in events] == ["exit"]

View File

@ -341,6 +341,39 @@ class TestPtySessionIteration:
assert events == []
class TestPtySessionPong:
def test_ping_triggers_pong(self):
ws = MagicMock()
ws.receive_text.side_effect = [
json.dumps({"type": "ping"}),
json.dumps({"type": "exit", "exit_code": 0}),
]
session = PtySession(ws, "cl-abc")
events = list(session)
assert events[0].type == PtyEventType.ping
sent = [json.loads(c[0][0]) for c in ws.send_text.call_args_list]
assert {"type": "pong"} in sent
def test_no_pong_without_ping(self):
ws = MagicMock()
ws.receive_text.side_effect = [
json.dumps({"type": "output", "data": ""}),
json.dumps({"type": "exit", "exit_code": 0}),
]
session = PtySession(ws, "cl-abc")
list(session)
sent = [json.loads(c[0][0]) for c in ws.send_text.call_args_list]
assert {"type": "pong"} not in sent
def test_send_pong_swallows_closed_ws(self):
import httpx_ws
ws = MagicMock()
ws.send_text.side_effect = httpx_ws.WebSocketNetworkError()
session = PtySession(ws, "cl-abc")
session._send_pong() # must not raise
class TestPtySessionContextManager:
def test_exit_kills_and_closes(self):
ws = MagicMock()
@ -450,6 +483,28 @@ class TestAsyncPtySession:
assert sent["cmd"] == "/bin/zsh"
assert sent["cols"] == 100
@pytest.mark.asyncio
async def test_async_ping_triggers_pong(self):
ws = AsyncMock()
ws.receive_text.side_effect = [
json.dumps({"type": "ping"}),
json.dumps({"type": "exit", "exit_code": 0}),
]
session = AsyncPtySession(ws, "cl-abc")
events = [e async for e in session]
assert events[0].type == PtyEventType.ping
sent = [json.loads(c[0][0]) for c in ws.send_text.call_args_list]
assert {"type": "pong"} in sent
@pytest.mark.asyncio
async def test_async_send_pong_swallows_closed_ws(self):
import httpx_ws
ws = AsyncMock()
ws.send_text.side_effect = httpx_ws.WebSocketNetworkError()
session = AsyncPtySession(ws, "cl-abc")
await session._send_pong() # must not raise
@pytest.mark.asyncio
async def test_async_iteration(self):
ws = AsyncMock()

View File

@ -15,17 +15,6 @@ pytestmark = pytest.mark.integration
_env_loaded = False
def _wait_for_pid_dead(capsule: Capsule, pid: int, timeout: float = 5.0) -> bool:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
result = capsule.commands.run(f"ps -p {pid} -o stat= 2>/dev/null || true")
state = result.stdout.strip()
if not state or state.startswith("Z"):
return True
time.sleep(0.2)
return False
def _ensure_env() -> None:
global _env_loaded
if _env_loaded:
@ -57,7 +46,7 @@ class TestCapsuleLifecycle:
assert capsule_id
assert capsule.info is not None
finally:
capsule.destroy()
capsule.destroy(wait=True)
info = Capsule.get_info(capsule_id)
assert info.status in (Status.stopped, Status.missing)
@ -76,7 +65,7 @@ class TestCapsuleLifecycle:
assert capsule.is_running()
info = Capsule.get_info(capsule_id)
assert info.status in (Status.stopped, Status.missing)
assert info.status in (Status.stopping, Status.stopped, Status.missing)
def test_get_info(self):
capsule = Capsule(wait=True)
@ -91,11 +80,11 @@ class TestCapsuleLifecycle:
def test_pause_and_resume(self):
capsule = Capsule(wait=True)
try:
paused = capsule.pause()
paused = capsule.pause(wait=True)
assert paused.status == Status.paused
assert not capsule.is_running()
resumed = capsule.resume()
resumed = capsule.resume(wait=True)
assert resumed.status == Status.running
finally:
capsule.destroy()
@ -104,7 +93,7 @@ class TestCapsuleLifecycle:
capsule = Capsule(wait=True)
capsule_id = capsule.capsule_id
try:
Capsule.destroy(capsule_id)
Capsule.destroy(capsule_id, wait=True)
except Exception:
capsule.destroy()
raise
@ -229,7 +218,14 @@ class TestCommands:
def test_kill_process(self):
handle = self.capsule.commands.run("sleep 30", background=True)
self.capsule.commands.kill(handle.pid)
assert _wait_for_pid_dead(self.capsule, handle.pid)
# Registry prune runs asynchronously after the process end event,
# so poll rather than asserting on a zero-delay list().
deadline = time.monotonic() + 5
while time.monotonic() < deadline:
if handle.pid not in [p.pid for p in self.capsule.commands.list()]:
break
time.sleep(0.2)
assert handle.pid not in [p.pid for p in self.capsule.commands.list()]
def test_run_duration_ms(self):
result = self.capsule.commands.run("sleep 1")

View File

@ -0,0 +1,499 @@
"""Advanced integration tests against a live Wrenn server.
Skipped automatically when ``WRENN_API_KEY`` is not set (see conftest.py).
Covers working-directory / environment handling, long-running commands
(``apt-get``), interactive PTY sessions, streaming exec, and real ``git``
workflows including cloning ``github.com/wrennhq/wrenn``.
"""
from __future__ import annotations
import os
import time
import uuid
from pathlib import Path
import pytest
from wrenn import Capsule
from wrenn.commands import StreamExitEvent, StreamStartEvent
from wrenn.exceptions import WrennError
from wrenn.pty import PtyEventType
pytestmark = pytest.mark.integration
WRENN_REPO = "https://github.com/wrennhq/wrenn"
_env_loaded = False
def _ensure_env() -> None:
global _env_loaded
if _env_loaded:
return
_env_loaded = True
env_file = Path(__file__).resolve().parent.parent / ".env"
if not env_file.exists():
return
for line in env_file.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
key, value = key.strip(), value.strip().strip("\"'")
if key and key not in os.environ:
os.environ[key] = value
# ══════════════════════════════════════════════════════════════════
# Working directory & environment
# ══════════════════════════════════════════════════════════════════
class TestCommandEnvironment:
"""cwd / envs handling for foreground commands."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_cwd_changes_working_directory(self):
result = self.capsule.commands.run("pwd", cwd="/tmp")
assert result.exit_code == 0
assert result.stdout.strip() == "/tmp"
def test_default_cwd_is_home(self):
result = self.capsule.commands.run("pwd")
assert result.stdout.strip() == "/root"
def test_cwd_resolves_relative_paths(self):
self.capsule.files.make_dir("/tmp/cwd_probe/sub")
result = self.capsule.commands.run("ls", cwd="/tmp/cwd_probe")
assert "sub" in result.stdout
def test_cwd_nonexistent_raises(self):
with pytest.raises(WrennError):
self.capsule.commands.run("pwd", cwd="/no/such/dir/xyz")
def test_cwd_does_not_persist_between_calls(self):
# Each run is a fresh process — `cd` in one does not affect the next.
self.capsule.commands.run("cd /tmp")
result = self.capsule.commands.run("pwd")
assert result.stdout.strip() == "/root"
def test_single_env_var(self):
result = self.capsule.commands.run("echo $GREETING", envs={"GREETING": "hi"})
assert result.stdout.strip() == "hi"
def test_multiple_env_vars(self):
result = self.capsule.commands.run(
"echo $A-$B-$C", envs={"A": "1", "B": "2", "C": "3"}
)
assert result.stdout.strip() == "1-2-3"
def test_env_vars_do_not_leak_between_calls(self):
self.capsule.commands.run("echo $SECRET", envs={"SECRET": "leaky"})
result = self.capsule.commands.run("echo [$SECRET]")
assert result.stdout.strip() == "[]"
def test_env_var_with_special_chars(self):
value = "a b&c|d;e"
result = self.capsule.commands.run('printf "%s" "$X"', envs={"X": value})
assert result.stdout == value
def test_base_environment_present(self):
result = self.capsule.commands.run("echo $HOME; echo $PATH")
lines = result.stdout.strip().splitlines()
assert lines[0] == "/root"
assert "/usr/bin" in lines[1]
# ══════════════════════════════════════════════════════════════════
# Long-running commands
# ══════════════════════════════════════════════════════════════════
class TestLongRunningCommands:
"""apt-get installs and other slow commands."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_apt_get_install(self):
result = self.capsule.commands.run(
"apt-get update -qq && apt-get install -y -qq cowsay", timeout=300
)
assert result.exit_code == 0
def test_apt_installed_binary_runs(self):
# Depends on test_apt_get_install having installed the package.
self.capsule.commands.run("apt-get install -y -qq cowsay", timeout=300)
result = self.capsule.commands.run("/usr/games/cowsay moo")
assert result.exit_code == 0
assert "moo" in result.stdout
def test_foreground_timeout_raises(self):
# A command exceeding its timeout surfaces as a server-side error.
with pytest.raises(WrennError):
self.capsule.commands.run("sleep 20", timeout=2)
def test_long_sleep_in_background_returns_immediately(self):
start = time.monotonic()
handle = self.capsule.commands.run(
"sleep 60", background=True, tag="long-sleep"
)
elapsed = time.monotonic() - start
assert elapsed < 10
assert handle.pid > 0
self.capsule.commands.kill(handle.pid)
def test_slow_command_within_timeout(self):
result = self.capsule.commands.run("sleep 3 && echo done", timeout=30)
assert result.exit_code == 0
assert result.stdout.strip() == "done"
# ══════════════════════════════════════════════════════════════════
# PTY sessions
# ══════════════════════════════════════════════════════════════════
def _drain_pty(term, *, max_events: int = 200) -> tuple[bytes, int | None]:
"""Collect PTY output until exit; return (output, exit_code)."""
output = b""
exit_code: int | None = None
for i, event in enumerate(term):
if event.type == PtyEventType.output and event.data:
output += event.data
elif event.type == PtyEventType.exit:
exit_code = event.exit_code
break
elif event.type == PtyEventType.error and event.fatal:
break
if i >= max_events:
break
return output, exit_code
class TestPty:
"""Interactive PTY behaviour."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_pty_runs_command_and_exits(self):
with self.capsule.pty(cmd="/bin/bash") as term:
term.write(b"echo pty-result-$((6*7))\n")
term.write(b"exit\n")
output, exit_code = _drain_pty(term)
assert b"pty-result-42" in output
assert exit_code is not None
def test_pty_started_event_sets_tag_and_pid(self):
with self.capsule.pty(cmd="/bin/bash") as term:
term.write(b"exit\n")
_drain_pty(term)
assert term.tag is not None
assert term.tag.startswith("pty-")
assert term.pid is not None and term.pid > 0
def test_pty_respects_cwd(self):
with self.capsule.pty(cmd="/bin/bash", cwd="/tmp") as term:
term.write(b"pwd\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"/tmp" in output
def test_pty_respects_envs(self):
with self.capsule.pty(cmd="/bin/bash", envs={"PTY_VAR": "xyzzy"}) as term:
term.write(b"echo marker-$PTY_VAR\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"marker-xyzzy" in output
def test_pty_resize(self):
with self.capsule.pty(cmd="/bin/bash", cols=80, rows=24) as term:
term.resize(120, 40)
term.write(b"echo resized\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"resized" in output
def test_pty_explicit_command(self):
with self.capsule.pty(cmd="/bin/echo", args=["hello-from-argv"]) as term:
output, exit_code = _drain_pty(term)
assert b"hello-from-argv" in output
def test_pty_exit_code_nonzero(self):
with self.capsule.pty(cmd="/bin/bash") as term:
term.write(b"exit 3\n")
_, exit_code = _drain_pty(term)
assert exit_code == 3
def test_pty_survives_idle_ping_cycle(self):
# The server emits a keepalive `ping` (~every 30s); the SDK must
# auto-reply `pong` and the session must stay usable afterwards.
with self.capsule.pty(cmd="/bin/bash") as term:
saw_ping = False
for event in term:
if event.type == PtyEventType.ping:
saw_ping = True
break
if event.type == PtyEventType.exit:
break
if event.type == PtyEventType.error and event.fatal:
break
assert saw_ping, "no keepalive ping received"
term.write(b"echo still-alive\n")
term.write(b"exit\n")
output, _ = _drain_pty(term)
assert b"still-alive" in output
# ══════════════════════════════════════════════════════════════════
# Streaming exec
# ══════════════════════════════════════════════════════════════════
class TestStreamingExec:
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_stream_emits_start_and_exit(self):
events = list(self.capsule.commands.stream("echo streamed"))
types = [e.type for e in events]
assert "exit" in types
starts = [e for e in events if isinstance(e, StreamStartEvent)]
exits = [e for e in events if isinstance(e, StreamExitEvent)]
assert exits and exits[0].exit_code == 0
if starts:
assert starts[0].pid > 0
def test_stream_captures_stdout(self):
events = list(self.capsule.commands.stream("for i in 1 2 3; do echo n$i; done"))
out = "".join(
e.data for e in events if e.type == "stdout" and getattr(e, "data", None)
)
assert "n1" in out and "n3" in out
def test_stream_nonzero_exit(self):
events = list(self.capsule.commands.stream("exit 5"))
exits = [e for e in events if isinstance(e, StreamExitEvent)]
assert exits and exits[0].exit_code == 5
# ══════════════════════════════════════════════════════════════════
# Process connect — attach to a background process over WebSocket
# ══════════════════════════════════════════════════════════════════
class TestProcessConnect:
"""commands.connect — must survive the server's abrupt WebSocket close."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_connect_streams_running_process(self):
handle = self.capsule.commands.run(
"for i in $(seq 1 5); do echo tick$i; sleep 1; done",
background=True,
tag="connect-run",
)
time.sleep(0.3)
events = list(self.capsule.commands.connect(handle.pid))
types = [e.type for e in events]
assert "exit" in types
# connect streams output from the attach point onward, so early
# ticks may be missed — assert it captured the live tail.
out = "".join(
e.data for e in events if e.type == "stdout" and getattr(e, "data", None)
)
assert "tick" in out
def test_connect_to_finished_process_does_not_raise(self):
handle = self.capsule.commands.run("echo quick", background=True)
time.sleep(2)
# Process already exited — server closes the WebSocket abruptly;
# the iterator must terminate cleanly rather than raise.
events = list(self.capsule.commands.connect(handle.pid))
assert isinstance(events, list)
# ══════════════════════════════════════════════════════════════════
# Git — real workflows including cloning wrennhq/wrenn
# ══════════════════════════════════════════════════════════════════
class TestGitClone:
"""Clone github.com/wrennhq/wrenn and operate on it."""
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
cls.capsule.git.clone(WRENN_REPO, "/root/wrenn", depth=1, timeout=300)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_clone_created_repo(self):
assert self.capsule.files.exists("/root/wrenn/.git")
def test_clone_checked_out_files(self):
entries = self.capsule.files.list("/root/wrenn")
names = [e.name for e in entries]
assert "README.md" in names
def test_status_of_clone_is_clean(self):
status = self.capsule.git.status(cwd="/root/wrenn")
assert status.branch == "main"
assert status.is_clean
def test_branches_lists_main(self):
branches = self.capsule.git.branches(cwd="/root/wrenn")
names = [b.name for b in branches]
assert "main" in names
assert any(b.is_current for b in branches)
def test_remote_get_origin(self):
url = self.capsule.git.remote_get("origin", cwd="/root/wrenn")
assert url is not None
assert "wrennhq/wrenn" in url
def test_git_log_has_commit(self):
result = self.capsule.commands.run("git log --oneline -1", cwd="/root/wrenn")
assert result.exit_code == 0
assert result.stdout.strip()
def test_modify_add_commit(self):
marker = uuid.uuid4().hex
self.capsule.git.configure_user(
"CI Bot", "ci@example.com", cwd="/root/wrenn", scope="local"
)
self.capsule.files.write(f"/root/wrenn/sdk_probe_{marker}.txt", marker)
self.capsule.git.add([f"sdk_probe_{marker}.txt"], cwd="/root/wrenn")
staged = self.capsule.git.status(cwd="/root/wrenn")
assert staged.has_staged
result = self.capsule.git.commit("probe commit", cwd="/root/wrenn")
assert result.exit_code == 0
after = self.capsule.git.status(cwd="/root/wrenn")
assert after.is_clean
assert after.ahead >= 1
def test_create_and_checkout_branch_in_clone(self):
self.capsule.git.create_branch("sdk-feature", cwd="/root/wrenn")
branches = self.capsule.git.branches(cwd="/root/wrenn")
current = [b for b in branches if b.is_current]
assert current and current[0].name == "sdk-feature"
self.capsule.git.checkout_branch("main", cwd="/root/wrenn")
def test_diff_via_commands(self):
self.capsule.files.write("/root/wrenn/README.md", "overwritten\n")
try:
result = self.capsule.commands.run("git diff --stat", cwd="/root/wrenn")
assert "README.md" in result.stdout
finally:
self.capsule.git.restore(["README.md"], worktree=True, cwd="/root/wrenn")
class TestGitErrors:
capsule: Capsule
@classmethod
def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
@classmethod
def teardown_class(cls):
try:
cls.capsule.destroy()
except Exception:
pass
def test_clone_nonexistent_repo_raises(self):
from wrenn._git import GitError
with pytest.raises(GitError):
self.capsule.git.clone(
"https://github.com/wrennhq/this-repo-does-not-exist-xyz",
"/root/missing",
timeout=120,
)
def test_status_outside_repo_raises(self):
from wrenn._git import GitError
with pytest.raises(GitError):
self.capsule.git.status(cwd="/tmp")
def test_clone_with_branch(self):
self.capsule.git.clone(
WRENN_REPO, "/root/wrenn-main", branch="main", depth=1, timeout=300
)
status = self.capsule.git.status(cwd="/root/wrenn-main")
assert status.branch == "main"

4
uv.lock generated
View File

@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.13"
resolution-markers = [
"python_full_version >= '3.14'",
@ -1121,7 +1121,7 @@ wheels = [
[[package]]
name = "wrenn"
version = "0.1.1"
version = "0.1.4"
source = { editable = "." }
dependencies = [
{ name = "email-validator" },