docs: add Google-style docstrings to all public SDK methods
Some checks failed
ci/woodpecker/push/check Pipeline failed

This commit is contained in:
2026-04-17 04:29:34 +06:00
parent 42bcc792d6
commit 7b35ffb60c
11 changed files with 829 additions and 49 deletions

2
.gitignore vendored
View File

@ -175,5 +175,3 @@ cython_debug/
.pypirc
CODE_EXECUTION.md
docs/

View File

@ -12,7 +12,6 @@ dependencies = [
"httpx>=0.28.1",
"httpx-ws>=0.9.0",
"pydantic>=2.12.5",
"python-dotenv>=1.2.2",
]
[build-system]

View File

@ -47,10 +47,21 @@ class AsyncCapsule:
@property
def capsule_id(self) -> str:
"""The capsule's unique identifier.
Returns:
str: Capsule ID assigned by the Wrenn API.
"""
return self._id
@property
def info(self) -> CapsuleModel | None:
"""Cached capsule metadata from the last API call.
Returns:
CapsuleModel | None: The last-fetched capsule model, or ``None``
if the capsule was connected without an initial fetch.
"""
return self._info
# ── Factory classmethods ────────────────────────────────────
@ -67,7 +78,21 @@ class AsyncCapsule:
api_key: str | None = None,
base_url: str | None = None,
) -> AsyncCapsule:
"""Create a new capsule."""
"""Create a new capsule.
Args:
template (str | None): Template name to boot from.
vcpus (int | None): Number of virtual CPUs.
memory_mb (int | None): Memory in MiB.
timeout (int | None): Inactivity TTL in seconds before auto-pause.
wait (bool): Await until the capsule reaches ``running`` status.
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
AsyncCapsule: A new capsule instance.
"""
client = AsyncWrennClient(api_key=api_key, base_url=base_url)
info = await client.capsules.create(
template=template,
@ -92,7 +117,20 @@ class AsyncCapsule:
api_key: str | None = None,
base_url: str | None = None,
) -> AsyncCapsule:
"""Connect to an existing capsule. Resumes it if paused."""
"""Connect to an existing capsule, resuming it if paused.
Args:
capsule_id (str): ID of the capsule to connect to.
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
AsyncCapsule: A capsule instance bound to the existing capsule.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
client = AsyncWrennClient(api_key=api_key, base_url=base_url)
info = await client.capsules.get(capsule_id)
@ -174,9 +212,26 @@ class AsyncCapsule:
# ── Instance-only methods ───────────────────────────────────
async def ping(self) -> None:
"""Reset the capsule inactivity timer.
Call this to prevent the capsule from being auto-paused when the
inactivity TTL is set.
"""
await self._client.capsules.ping(self._id)
async def wait_ready(self, timeout: float = 30, interval: float = 0.5) -> None:
"""Await until the capsule status is ``running``.
Args:
timeout (float): Maximum seconds to wait. Defaults to ``30``.
interval (float): Polling interval in seconds. Defaults to ``0.5``.
Raises:
TimeoutError: If the capsule does not reach ``running`` state
within ``timeout`` seconds.
RuntimeError: If the capsule enters an error, stopped, or paused
state while waiting.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
info = await self._client.capsules.get(self._id)
@ -193,6 +248,13 @@ class AsyncCapsule:
)
async def is_running(self) -> bool:
"""Check whether the capsule is currently running.
Makes a live API call to fetch current status.
Returns:
bool: ``True`` if the capsule status is ``running``.
"""
info = await self._instance_get_info()
return info.status == Status.running
@ -205,6 +267,16 @@ class AsyncCapsule:
api_key: str | None = None,
base_url: str | None = None,
) -> list[CapsuleModel]:
"""List all capsules belonging to the team.
Args:
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
list[CapsuleModel]: All capsules for the authenticated team.
"""
async with AsyncWrennClient(api_key=api_key, base_url=base_url) as client:
return await client.capsules.list()
@ -220,6 +292,30 @@ class AsyncCapsule:
envs: dict[str, str] | None = None,
cwd: str | None = None,
) -> AsyncIterator[AsyncPtySession]:
"""Open an async interactive PTY session backed by a WebSocket.
Use as an async context manager and async iterate over
:class:`PtyEvent` objects::
async with capsule.pty() as term:
await term.write(b"echo hello\\n")
async for event in term:
if event.type == "output":
print(event.data.decode())
Args:
cmd (str): Command to run inside the PTY. Defaults to
``"/bin/bash"``.
args (list[str] | None): Additional arguments for ``cmd``.
cols (int): Initial terminal column count. Defaults to ``80``.
rows (int): Initial terminal row count. Defaults to ``24``.
envs (dict[str, str] | None): Additional environment variables
to inject into the process.
cwd (str | None): Working directory for the process.
Yields:
AsyncPtySession: An interactive async PTY session.
"""
async with httpx_ws.aconnect_ws(
f"/v1/capsules/{self._id}/pty", client=self._client.http
) as ws:
@ -231,6 +327,14 @@ class AsyncCapsule:
@asynccontextmanager
async def pty_connect(self, tag: str) -> AsyncIterator[AsyncPtySession]:
"""Reconnect to an existing PTY session by tag.
Args:
tag (str): Session tag returned in the ``started`` PTY event.
Yields:
AsyncPtySession: The reconnected async PTY session.
"""
async with httpx_ws.aconnect_ws(
f"/v1/capsules/{self._id}/pty", client=self._client.http
) as ws:
@ -241,6 +345,15 @@ class AsyncCapsule:
# ── Proxy helpers ───────────────────────────────────────────
def get_url(self, port: int) -> str:
"""Get the proxy URL for a port exposed inside this capsule.
Args:
port (int): Port number to proxy.
Returns:
str: A ``wss://`` (or ``ws://``) URL that proxies to the given
port inside the capsule.
"""
return _build_proxy_url(self._client._base_url, self._id, port)
# ── Snapshots ───────────────────────────────────────────────
@ -248,6 +361,17 @@ class AsyncCapsule:
async def create_snapshot(
self, name: str | None = None, overwrite: bool = False
) -> Template:
"""Create a snapshot template from this capsule's current state.
Args:
name (str | None): Name for the snapshot template. Auto-generated
if not provided.
overwrite (bool): If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
Returns:
Template: The created snapshot template.
"""
return await self._client.snapshots.create(
capsule_id=self._id, name=name, overwrite=overwrite
)

View File

@ -74,6 +74,24 @@ class Capsule:
_client: WrennClient | None = None,
_info: CapsuleModel | None = None,
) -> None:
"""Create and start a new capsule.
Args:
template (str | None): Template name to boot from. Defaults to
the server-side default (``"minimal"``).
vcpus (int | None): Number of virtual CPUs. Defaults to the
server-side default.
memory_mb (int | None): Memory in MiB. Defaults to the
server-side default.
timeout (int | None): Inactivity TTL in seconds before the capsule
is auto-paused. ``0`` disables auto-pause.
wait (bool): If ``True``, block until the capsule status is
``running`` before returning.
api_key (str | None): Wrenn API key (``wrn_...``). Falls back to
the ``WRENN_API_KEY`` environment variable.
base_url (str | None): Wrenn API base URL. Falls back to
``WRENN_BASE_URL`` or the default production endpoint.
"""
if _capsule_id is not None:
# Internal construction path (from create/connect classmethods)
assert _client is not None
@ -101,10 +119,21 @@ class Capsule:
@property
def capsule_id(self) -> str:
"""The capsule's unique identifier.
Returns:
str: Capsule ID assigned by the Wrenn API.
"""
return self._id
@property
def info(self) -> CapsuleModel | None:
"""Cached capsule metadata from the last API call.
Returns:
CapsuleModel | None: The last-fetched capsule model, or ``None``
if the capsule was connected without an initial fetch.
"""
return self._info
# ── Factory classmethods ────────────────────────────────────
@ -121,7 +150,23 @@ class Capsule:
api_key: str | None = None,
base_url: str | None = None,
) -> Capsule:
"""Create a new capsule. Alias for ``Capsule(...)``."""
"""Create a new capsule.
Equivalent to calling ``Capsule(...)`` directly.
Args:
template (str | None): Template name to boot from.
vcpus (int | None): Number of virtual CPUs.
memory_mb (int | None): Memory in MiB.
timeout (int | None): Inactivity TTL in seconds before auto-pause.
wait (bool): Block until the capsule reaches ``running`` status.
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
Capsule: A new capsule instance.
"""
return cls(
template=template,
vcpus=vcpus,
@ -140,7 +185,20 @@ class Capsule:
api_key: str | None = None,
base_url: str | None = None,
) -> Capsule:
"""Connect to an existing capsule. Resumes it if paused."""
"""Connect to an existing capsule, resuming it if paused.
Args:
capsule_id (str): ID of the capsule to connect to.
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
Capsule: A capsule instance bound to the existing capsule.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
client = WrennClient(api_key=api_key, base_url=base_url)
info = client.capsules.get(capsule_id)
@ -230,11 +288,26 @@ class Capsule:
# ── Instance-only methods ───────────────────────────────────
def ping(self) -> None:
"""Reset the capsule inactivity timer."""
"""Reset the capsule inactivity timer.
Call this to prevent the capsule from being auto-paused when the
inactivity TTL is set.
"""
self._client.capsules.ping(self._id)
def wait_ready(self, timeout: float = 30, interval: float = 0.5) -> None:
"""Block until the capsule status is ``running``."""
"""Block until the capsule status is ``running``.
Args:
timeout (float): Maximum seconds to wait. Defaults to ``30``.
interval (float): Polling interval in seconds. Defaults to ``0.5``.
Raises:
TimeoutError: If the capsule does not reach ``running`` state
within ``timeout`` seconds.
RuntimeError: If the capsule enters an error, stopped, or paused
state while waiting.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
info = self._client.capsules.get(self._id)
@ -251,6 +324,13 @@ class Capsule:
)
def is_running(self) -> bool:
"""Check whether the capsule is currently running.
Makes a live API call to fetch current status.
Returns:
bool: ``True`` if the capsule status is ``running``.
"""
info = self._instance_get_info()
return info.status == Status.running
@ -263,7 +343,16 @@ class Capsule:
api_key: str | None = None,
base_url: str | None = None,
) -> list[CapsuleModel]:
"""List all capsules for the team."""
"""List all capsules belonging to the team.
Args:
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
list[CapsuleModel]: All capsules for the authenticated team.
"""
with WrennClient(api_key=api_key, base_url=base_url) as client:
return client.capsules.list()
@ -279,7 +368,29 @@ class Capsule:
envs: dict[str, str] | None = None,
cwd: str | None = None,
) -> Iterator[PtySession]:
"""Open an interactive PTY session."""
"""Open an interactive PTY session backed by a WebSocket.
Use as a context manager and iterate over :class:`PtyEvent` objects::
with capsule.pty() as term:
term.write(b"echo hello\\n")
for event in term:
if event.type == "output":
print(event.data.decode())
Args:
cmd (str): Command to run inside the PTY. Defaults to
``"/bin/bash"``.
args (list[str] | None): Additional arguments for ``cmd``.
cols (int): Initial terminal column count. Defaults to ``80``.
rows (int): Initial terminal row count. Defaults to ``24``.
envs (dict[str, str] | None): Additional environment variables to
inject into the process.
cwd (str | None): Working directory for the process.
Yields:
PtySession: An interactive PTY session.
"""
with httpx_ws.connect_ws(
f"/v1/capsules/{self._id}/pty", client=self._client.http
) as ws:
@ -291,7 +402,14 @@ class Capsule:
@contextmanager
def pty_connect(self, tag: str) -> Iterator[PtySession]:
"""Reconnect to an existing PTY session by tag."""
"""Reconnect to an existing PTY session by tag.
Args:
tag (str): Session tag returned in the ``started`` PTY event.
Yields:
PtySession: The reconnected PTY session.
"""
with httpx_ws.connect_ws(
f"/v1/capsules/{self._id}/pty", client=self._client.http
) as ws:
@ -302,7 +420,15 @@ class Capsule:
# ── Proxy helpers ───────────────────────────────────────────
def get_url(self, port: int) -> str:
"""Get the proxy URL for a port inside this capsule."""
"""Get the proxy URL for a port exposed inside this capsule.
Args:
port (int): Port number to proxy.
Returns:
str: A ``wss://`` (or ``ws://``) URL that proxies to the given
port inside the capsule.
"""
return _build_proxy_url(self._client._base_url, self._id, port)
# ── Snapshots ───────────────────────────────────────────────
@ -310,7 +436,17 @@ class Capsule:
def create_snapshot(
self, name: str | None = None, overwrite: bool = False
) -> Template:
"""Create a snapshot template from this capsule."""
"""Create a snapshot template from this capsule's current state.
Args:
name (str | None): Name for the snapshot template. Auto-generated
if not provided.
overwrite (bool): If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
Returns:
Template: The created snapshot template.
"""
return self._client.snapshots.create(
capsule_id=self._id, name=name, overwrite=overwrite
)

View File

@ -36,6 +36,18 @@ class CapsulesResource:
memory_mb: int | None = None,
timeout_sec: int | None = None,
) -> CapsuleModel:
"""Create a new capsule.
Args:
template (str | None): Template name to boot from.
vcpus (int | None): Number of virtual CPUs.
memory_mb (int | None): Memory in MiB.
timeout_sec (int | None): Inactivity TTL in seconds before
auto-pause. ``0`` disables auto-pause.
Returns:
CapsuleModel: The newly created capsule.
"""
payload: dict = {}
if template is not None:
payload["template"] = template
@ -49,26 +61,80 @@ class CapsulesResource:
return CapsuleModel.model_validate(handle_response(resp))
def list(self) -> list[CapsuleModel]:
"""List all capsules for the authenticated team.
Returns:
list[CapsuleModel]: All capsules belonging to the team.
"""
resp = self._http.get("/v1/capsules")
return [CapsuleModel.model_validate(item) for item in handle_response(resp)]
def get(self, id: str) -> CapsuleModel:
"""Get a capsule by ID.
Args:
id (str): Capsule ID.
Returns:
CapsuleModel: Current state of the capsule.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = self._http.get(f"/v1/capsules/{id}")
return CapsuleModel.model_validate(handle_response(resp))
def destroy(self, id: str) -> None:
"""Destroy a capsule permanently.
Args:
id (str): Capsule ID.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = self._http.delete(f"/v1/capsules/{id}")
handle_response(resp)
def pause(self, id: str) -> CapsuleModel:
"""Pause a running capsule.
Args:
id (str): Capsule ID.
Returns:
CapsuleModel: Updated capsule state.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = self._http.post(f"/v1/capsules/{id}/pause")
return CapsuleModel.model_validate(handle_response(resp))
def resume(self, id: str) -> CapsuleModel:
"""Resume a paused capsule.
Args:
id (str): Capsule ID.
Returns:
CapsuleModel: Updated capsule state.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = self._http.post(f"/v1/capsules/{id}/resume")
return CapsuleModel.model_validate(handle_response(resp))
def ping(self, id: str) -> None:
"""Reset the inactivity timer for a capsule.
Args:
id (str): Capsule ID.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = self._http.post(f"/v1/capsules/{id}/ping")
handle_response(resp)
@ -86,6 +152,18 @@ class AsyncCapsulesResource:
memory_mb: int | None = None,
timeout_sec: int | None = None,
) -> CapsuleModel:
"""Create a new capsule.
Args:
template (str | None): Template name to boot from.
vcpus (int | None): Number of virtual CPUs.
memory_mb (int | None): Memory in MiB.
timeout_sec (int | None): Inactivity TTL in seconds before
auto-pause. ``0`` disables auto-pause.
Returns:
CapsuleModel: The newly created capsule.
"""
payload: dict = {}
if template is not None:
payload["template"] = template
@ -99,26 +177,80 @@ class AsyncCapsulesResource:
return CapsuleModel.model_validate(handle_response(resp))
async def list(self) -> list[CapsuleModel]:
"""List all capsules for the authenticated team.
Returns:
list[CapsuleModel]: All capsules belonging to the team.
"""
resp = await self._http.get("/v1/capsules")
return [CapsuleModel.model_validate(item) for item in handle_response(resp)]
async def get(self, id: str) -> CapsuleModel:
"""Get a capsule by ID.
Args:
id (str): Capsule ID.
Returns:
CapsuleModel: Current state of the capsule.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = await self._http.get(f"/v1/capsules/{id}")
return CapsuleModel.model_validate(handle_response(resp))
async def destroy(self, id: str) -> None:
"""Destroy a capsule permanently.
Args:
id (str): Capsule ID.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = await self._http.delete(f"/v1/capsules/{id}")
handle_response(resp)
async def pause(self, id: str) -> CapsuleModel:
"""Pause a running capsule.
Args:
id (str): Capsule ID.
Returns:
CapsuleModel: Updated capsule state.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = await self._http.post(f"/v1/capsules/{id}/pause")
return CapsuleModel.model_validate(handle_response(resp))
async def resume(self, id: str) -> CapsuleModel:
"""Resume a paused capsule.
Args:
id (str): Capsule ID.
Returns:
CapsuleModel: Updated capsule state.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = await self._http.post(f"/v1/capsules/{id}/resume")
return CapsuleModel.model_validate(handle_response(resp))
async def ping(self, id: str) -> None:
"""Reset the inactivity timer for a capsule.
Args:
id (str): Capsule ID.
Raises:
WrennNotFoundError: If no capsule with the given ID exists.
"""
resp = await self._http.post(f"/v1/capsules/{id}/ping")
handle_response(resp)
@ -135,6 +267,18 @@ class SnapshotsResource:
name: str | None = None,
overwrite: bool = False,
) -> Template:
"""Create a snapshot template from a running capsule.
Args:
capsule_id (str): ID of the capsule to snapshot.
name (str | None): Name for the snapshot template. Auto-generated
if not provided.
overwrite (bool): If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
Returns:
Template: The created snapshot template.
"""
payload: dict = {"sandbox_id": capsule_id}
if name is not None:
payload["name"] = name
@ -145,6 +289,15 @@ class SnapshotsResource:
return Template.model_validate(handle_response(resp))
def list(self, type: str | None = None) -> list[Template]:
"""List snapshot templates.
Args:
type (str | None): Filter by template type. Returns all templates
if not provided.
Returns:
list[Template]: Matching snapshot templates.
"""
params: dict = {}
if type is not None:
params["type"] = type
@ -152,6 +305,14 @@ class SnapshotsResource:
return [Template.model_validate(item) for item in handle_response(resp)]
def delete(self, name: str) -> None:
"""Delete a snapshot template by name.
Args:
name (str): Template name to delete.
Raises:
WrennNotFoundError: If no template with the given name exists.
"""
resp = self._http.delete(f"/v1/snapshots/{name}")
handle_response(resp)
@ -168,6 +329,18 @@ class AsyncSnapshotsResource:
name: str | None = None,
overwrite: bool = False,
) -> Template:
"""Create a snapshot template from a running capsule.
Args:
capsule_id (str): ID of the capsule to snapshot.
name (str | None): Name for the snapshot template. Auto-generated
if not provided.
overwrite (bool): If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
Returns:
Template: The created snapshot template.
"""
payload: dict = {"sandbox_id": capsule_id}
if name is not None:
payload["name"] = name
@ -178,6 +351,15 @@ class AsyncSnapshotsResource:
return Template.model_validate(handle_response(resp))
async def list(self, type: str | None = None) -> list[Template]:
"""List snapshot templates.
Args:
type (str | None): Filter by template type. Returns all templates
if not provided.
Returns:
list[Template]: Matching snapshot templates.
"""
params: dict = {}
if type is not None:
params["type"] = type
@ -185,6 +367,14 @@ class AsyncSnapshotsResource:
return [Template.model_validate(item) for item in handle_response(resp)]
async def delete(self, name: str) -> None:
"""Delete a snapshot template by name.
Args:
name (str): Template name to delete.
Raises:
WrennNotFoundError: If no template with the given name exists.
"""
resp = await self._http.delete(f"/v1/snapshots/{name}")
handle_response(resp)

View File

@ -53,6 +53,22 @@ class AsyncCapsule(BaseAsyncCapsule):
api_key: str | None = None,
base_url: str | None = None,
) -> AsyncCapsule:
"""Create a new async code interpreter capsule.
Args:
template (str | None): Template to boot from. Defaults to
``"code-runner-beta"``.
vcpus (int | None): Number of virtual CPUs.
memory_mb (int | None): Memory in MiB.
timeout (int | None): Inactivity TTL in seconds before auto-pause.
wait (bool): Await until the capsule reaches ``running`` status.
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
AsyncCapsule: A new async code interpreter capsule instance.
"""
client = AsyncWrennClient(api_key=api_key, base_url=base_url)
info = await client.capsules.create(
template=template or DEFAULT_TEMPLATE,

View File

@ -47,6 +47,18 @@ class Capsule(BaseCapsule):
base_url: str | None = None,
**kwargs,
) -> None:
"""Create a code interpreter capsule.
Args:
template (str | None): Template to boot from. Defaults to
``"code-runner-beta"``.
vcpus (int | None): Number of virtual CPUs.
memory_mb (int | None): Memory in MiB.
timeout (int | None): Inactivity TTL in seconds before auto-pause.
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
"""
super().__init__(
template=template or DEFAULT_TEMPLATE,
vcpus=vcpus,
@ -71,6 +83,22 @@ class Capsule(BaseCapsule):
api_key: str | None = None,
base_url: str | None = None,
) -> Capsule:
"""Create a new code interpreter capsule.
Args:
template (str | None): Template to boot from. Defaults to
``"code-runner-beta"``.
vcpus (int | None): Number of virtual CPUs.
memory_mb (int | None): Memory in MiB.
timeout (int | None): Inactivity TTL in seconds before auto-pause.
wait (bool): Block until the capsule reaches ``running`` status.
api_key (str | None): Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
base_url (str | None): API base URL override.
Returns:
Capsule: A new code interpreter capsule instance.
"""
return cls(
template=template or DEFAULT_TEMPLATE,
vcpus=vcpus,

View File

@ -161,6 +161,28 @@ class Commands:
cwd: str | None = None,
tag: str | None = None,
) -> CommandResult | CommandHandle:
"""Execute a shell command inside the capsule.
Args:
cmd (str): Shell command string to execute.
background (bool): If ``True``, launch the process in the
background and return a :class:`CommandHandle` immediately.
Defaults to ``False``.
timeout (int | None): Seconds before the foreground command times
out. Ignored for background commands. Defaults to ``30``.
envs (dict[str, str] | None): Additional environment variables
to set for the process.
cwd (str | None): Working directory for the process.
tag (str | None): Optional label attached to background processes
for later retrieval via :meth:`connect`.
Returns:
CommandResult: stdout, stderr, exit code, and duration for
foreground commands (``background=False``).
CommandHandle: PID and tag for background commands
(``background=True``).
"""
payload: dict = {"cmd": cmd, "background": background}
if timeout is not None and not background:
payload["timeout_sec"] = timeout
@ -185,6 +207,12 @@ class Commands:
return _decode_exec_response(data)
def list(self) -> list[ProcessInfo]:
"""List all running background processes in the capsule.
Returns:
list[ProcessInfo]: Running processes with their PID, tag, and
command information.
"""
resp = self._http.get(f"/v1/capsules/{self._capsule_id}/processes")
data = handle_response(resp)
return [
@ -198,13 +226,29 @@ class Commands:
]
def kill(self, pid: int) -> None:
"""Send SIGKILL to a background process.
Args:
pid (int): PID of the process to kill.
Raises:
WrennNotFoundError: If no process with the given PID exists.
"""
resp = self._http.delete(
f"/v1/capsules/{self._capsule_id}/processes/{pid}"
)
handle_response(resp)
def connect(self, pid: int) -> Iterator[StreamEvent]:
"""Connect to a running background process and stream its output."""
"""Connect to a running background process and stream its output.
Args:
pid (int): PID of the background process to attach to.
Yields:
StreamEvent: Successive output events. Stops on
:class:`StreamExitEvent` or :class:`StreamErrorEvent`.
"""
with httpx_ws.connect_ws(
f"/v1/capsules/{self._capsule_id}/processes/{pid}/stream",
self._http,
@ -222,7 +266,17 @@ class Commands:
def stream(
self, cmd: str, args: list[str] | None = None
) -> Iterator[StreamEvent]:
"""Execute a command via WebSocket, yielding ``StreamEvent`` objects."""
"""Execute a command via WebSocket, streaming output as events.
Args:
cmd (str): Command to execute.
args (list[str] | None): Additional arguments for the command.
Yields:
StreamEvent: Successive events including :class:`StreamStartEvent`,
:class:`StreamStdoutEvent`, :class:`StreamStderrEvent`,
:class:`StreamExitEvent`, and :class:`StreamErrorEvent`.
"""
with httpx_ws.connect_ws(
f"/v1/capsules/{self._capsule_id}/exec/stream",
self._http,
@ -283,6 +337,28 @@ class AsyncCommands:
cwd: str | None = None,
tag: str | None = None,
) -> CommandResult | CommandHandle:
"""Execute a shell command inside the capsule.
Args:
cmd (str): Shell command string to execute.
background (bool): If ``True``, launch the process in the
background and return a :class:`CommandHandle` immediately.
Defaults to ``False``.
timeout (int | None): Seconds before the foreground command times
out. Ignored for background commands. Defaults to ``30``.
envs (dict[str, str] | None): Additional environment variables
to set for the process.
cwd (str | None): Working directory for the process.
tag (str | None): Optional label attached to background processes
for later retrieval via :meth:`connect`.
Returns:
CommandResult: stdout, stderr, exit code, and duration for
foreground commands (``background=False``).
CommandHandle: PID and tag for background commands
(``background=True``).
"""
payload: dict = {"cmd": cmd, "background": background}
if timeout is not None and not background:
payload["timeout_sec"] = timeout
@ -307,6 +383,12 @@ class AsyncCommands:
return _decode_exec_response(data)
async def list(self) -> list[ProcessInfo]:
"""List all running background processes in the capsule.
Returns:
list[ProcessInfo]: Running processes with their PID, tag, and
command information.
"""
resp = await self._http.get(
f"/v1/capsules/{self._capsule_id}/processes"
)
@ -322,13 +404,29 @@ class AsyncCommands:
]
async def kill(self, pid: int) -> None:
"""Send SIGKILL to a background process.
Args:
pid (int): PID of the process to kill.
Raises:
WrennNotFoundError: If no process with the given PID exists.
"""
resp = await self._http.delete(
f"/v1/capsules/{self._capsule_id}/processes/{pid}"
)
handle_response(resp)
async def connect(self, pid: int) -> AsyncIterator[StreamEvent]:
"""Connect to a running background process and stream its output."""
"""Connect to a running background process and stream its output.
Args:
pid (int): PID of the background process to attach to.
Yields:
StreamEvent: Successive output events. Stops on
:class:`StreamExitEvent` or :class:`StreamErrorEvent`.
"""
async with httpx_ws.aconnect_ws(
f"/v1/capsules/{self._capsule_id}/processes/{pid}/stream",
self._http,
@ -346,7 +444,17 @@ class AsyncCommands:
async def stream(
self, cmd: str, args: list[str] | None = None
) -> AsyncIterator[StreamEvent]:
"""Execute a command via WebSocket, yielding ``StreamEvent`` objects."""
"""Execute a command via WebSocket, streaming output as events.
Args:
cmd (str): Command to execute.
args (list[str] | None): Additional arguments for the command.
Yields:
StreamEvent: Successive events including :class:`StreamStartEvent`,
:class:`StreamStdoutEvent`, :class:`StreamStderrEvent`,
:class:`StreamExitEvent`, and :class:`StreamErrorEvent`.
"""
async with httpx_ws.aconnect_ws(
f"/v1/capsules/{self._capsule_id}/exec/stream",
self._http,

View File

@ -6,9 +6,26 @@ import httpx
class WrennError(Exception):
"""Base exception for all Wrenn SDK errors."""
"""Base exception for all Wrenn SDK errors.
All SDK exceptions inherit from this class, so you can catch
``WrennError`` to handle any API error generically.
Attributes:
code (str): Machine-readable error code from the API
(e.g. ``"not_found"``).
message (str): Human-readable error description.
status_code (int): HTTP status code of the response.
"""
def __init__(self, code: str, message: str, status_code: int) -> None:
"""Initialize a WrennError.
Args:
code (str): Machine-readable error code.
message (str): Human-readable error description.
status_code (int): HTTP status code of the response.
"""
self.code = code
self.message = message
self.status_code = status_code
@ -36,11 +53,23 @@ class WrennConflictError(WrennError):
class WrennHostHasCapsulesError(WrennConflictError):
"""409 — Host still has running capsules."""
"""409 — Host still has running capsules.
Attributes:
capsule_ids (list[str]): IDs of the capsules still running on the host.
"""
def __init__(
self, code: str, message: str, status_code: int, capsule_ids: list[str]
) -> None:
"""Initialize a WrennHostHasCapsulesError.
Args:
code (str): Machine-readable error code.
message (str): Human-readable error description.
status_code (int): HTTP status code of the response.
capsule_ids (list[str]): IDs of capsules still on the host.
"""
self.capsule_ids = capsule_ids
super().__init__(code, message, status_code)

View File

@ -17,11 +17,31 @@ class Files:
self._http = http
def read(self, path: str) -> str:
"""Read a file as a UTF-8 string."""
"""Read a file as a UTF-8 string.
Args:
path (str): Absolute path to the file inside the capsule.
Returns:
str: File contents decoded as UTF-8.
Raises:
WrennNotFoundError: If the path does not exist.
"""
return self.read_bytes(path).decode("utf-8", errors="replace")
def read_bytes(self, path: str) -> bytes:
"""Read a file as raw bytes."""
"""Read a file as raw bytes.
Args:
path (str): Absolute path to the file inside the capsule.
Returns:
bytes: Raw file contents.
Raises:
WrennNotFoundError: If the path does not exist.
"""
resp = self._http.post(
f"/v1/capsules/{self._capsule_id}/files/read",
json={"path": path},
@ -30,7 +50,14 @@ class Files:
return resp.content
def write(self, path: str, data: str | bytes) -> None:
"""Write data to a file inside the capsule."""
"""Write data to a file inside the capsule.
Creates parent directories if they do not exist.
Args:
path (str): Absolute destination path inside the capsule.
data (str | bytes): Content to write. Strings are UTF-8 encoded.
"""
if isinstance(data, str):
data = data.encode("utf-8")
resp = self._http.post(
@ -41,7 +68,19 @@ class Files:
resp.raise_for_status()
def list(self, path: str, depth: int = 1) -> list[FileEntry]:
"""List directory contents."""
"""List directory contents.
Args:
path (str): Absolute path to the directory inside the capsule.
depth (int): Recursion depth. ``1`` lists only immediate children.
Defaults to ``1``.
Returns:
list[FileEntry]: Entries in the directory.
Raises:
WrennNotFoundError: If the path does not exist.
"""
resp = self._http.post(
f"/v1/capsules/{self._capsule_id}/files/list",
json={"path": path, "depth": depth},
@ -50,7 +89,14 @@ class Files:
return parsed.entries or []
def exists(self, path: str) -> bool:
"""Check whether a path exists inside the capsule."""
"""Check whether a path exists inside the capsule.
Args:
path (str): Absolute path to check.
Returns:
bool: ``True`` if the path exists.
"""
parent = os.path.dirname(path)
name = os.path.basename(path)
try:
@ -60,7 +106,14 @@ class Files:
return any(e.name == name for e in entries)
def make_dir(self, path: str) -> FileEntry:
"""Create a directory (with parents). Idempotent."""
"""Create a directory (with parents). Idempotent.
Args:
path (str): Absolute path of the directory to create.
Returns:
FileEntry: The created (or already-existing) directory entry.
"""
resp = self._http.post(
f"/v1/capsules/{self._capsule_id}/files/mkdir",
json={"path": path},
@ -82,7 +135,14 @@ class Files:
return parsed.entry
def remove(self, path: str) -> None:
"""Remove a file or directory recursively."""
"""Remove a file or directory recursively.
Args:
path (str): Absolute path to remove.
Raises:
WrennNotFoundError: If the path does not exist.
"""
resp = self._http.post(
f"/v1/capsules/{self._capsule_id}/files/remove",
json={"path": path},
@ -90,7 +150,15 @@ class Files:
handle_response(resp)
def upload_stream(self, path: str, stream: Iterator[bytes]) -> None:
"""Streaming upload for large files."""
"""Stream a large file into the capsule.
Prefer this over :meth:`write` when the file is too large to hold in
memory.
Args:
path (str): Absolute destination path inside the capsule.
stream (Iterator[bytes]): Iterable of byte chunks to upload.
"""
boundary = os.urandom(16).hex().encode("utf-8")
def _multipart() -> Iterator[bytes]:
@ -114,7 +182,20 @@ class Files:
resp.raise_for_status()
def download_stream(self, path: str) -> Iterator[bytes]:
"""Streaming download for large files."""
"""Stream a large file out of the capsule.
Prefer this over :meth:`read_bytes` when the file is too large to hold
in memory.
Args:
path (str): Absolute path to the file inside the capsule.
Yields:
bytes: Successive byte chunks of the file.
Raises:
WrennNotFoundError: If the path does not exist.
"""
with self._http.stream(
"POST",
f"/v1/capsules/{self._capsule_id}/files/stream/read",
@ -132,12 +213,32 @@ class AsyncFiles:
self._http = http
async def read(self, path: str) -> str:
"""Read a file as a UTF-8 string."""
"""Read a file as a UTF-8 string.
Args:
path (str): Absolute path to the file inside the capsule.
Returns:
str: File contents decoded as UTF-8.
Raises:
WrennNotFoundError: If the path does not exist.
"""
data = await self.read_bytes(path)
return data.decode("utf-8", errors="replace")
async def read_bytes(self, path: str) -> bytes:
"""Read a file as raw bytes."""
"""Read a file as raw bytes.
Args:
path (str): Absolute path to the file inside the capsule.
Returns:
bytes: Raw file contents.
Raises:
WrennNotFoundError: If the path does not exist.
"""
resp = await self._http.post(
f"/v1/capsules/{self._capsule_id}/files/read",
json={"path": path},
@ -146,7 +247,14 @@ class AsyncFiles:
return resp.content
async def write(self, path: str, data: str | bytes) -> None:
"""Write data to a file inside the capsule."""
"""Write data to a file inside the capsule.
Creates parent directories if they do not exist.
Args:
path (str): Absolute destination path inside the capsule.
data (str | bytes): Content to write. Strings are UTF-8 encoded.
"""
if isinstance(data, str):
data = data.encode("utf-8")
resp = await self._http.post(
@ -157,7 +265,19 @@ class AsyncFiles:
resp.raise_for_status()
async def list(self, path: str, depth: int = 1) -> list[FileEntry]:
"""List directory contents."""
"""List directory contents.
Args:
path (str): Absolute path to the directory inside the capsule.
depth (int): Recursion depth. ``1`` lists only immediate children.
Defaults to ``1``.
Returns:
list[FileEntry]: Entries in the directory.
Raises:
WrennNotFoundError: If the path does not exist.
"""
resp = await self._http.post(
f"/v1/capsules/{self._capsule_id}/files/list",
json={"path": path, "depth": depth},
@ -166,7 +286,14 @@ class AsyncFiles:
return parsed.entries or []
async def exists(self, path: str) -> bool:
"""Check whether a path exists inside the capsule."""
"""Check whether a path exists inside the capsule.
Args:
path (str): Absolute path to check.
Returns:
bool: ``True`` if the path exists.
"""
parent = os.path.dirname(path)
name = os.path.basename(path)
try:
@ -176,7 +303,14 @@ class AsyncFiles:
return any(e.name == name for e in entries)
async def make_dir(self, path: str) -> FileEntry:
"""Create a directory (with parents). Idempotent."""
"""Create a directory (with parents). Idempotent.
Args:
path (str): Absolute path of the directory to create.
Returns:
FileEntry: The created (or already-existing) directory entry.
"""
resp = await self._http.post(
f"/v1/capsules/{self._capsule_id}/files/mkdir",
json={"path": path},
@ -198,7 +332,14 @@ class AsyncFiles:
return parsed.entry
async def remove(self, path: str) -> None:
"""Remove a file or directory recursively."""
"""Remove a file or directory recursively.
Args:
path (str): Absolute path to remove.
Raises:
WrennNotFoundError: If the path does not exist.
"""
resp = await self._http.post(
f"/v1/capsules/{self._capsule_id}/files/remove",
json={"path": path},
@ -206,7 +347,16 @@ class AsyncFiles:
handle_response(resp)
async def upload_stream(self, path: str, stream: AsyncIterator[bytes]) -> None:
"""Streaming upload for large files."""
"""Stream a large file into the capsule.
Prefer this over :meth:`write` when the file is too large to hold in
memory.
Args:
path (str): Absolute destination path inside the capsule.
stream (AsyncIterator[bytes]): Async iterable of byte chunks to
upload.
"""
boundary = os.urandom(16).hex().encode("utf-8")
async def _multipart() -> AsyncIterator[bytes]:
@ -230,7 +380,20 @@ class AsyncFiles:
resp.raise_for_status()
async def download_stream(self, path: str) -> AsyncIterator[bytes]:
"""Streaming download for large files."""
"""Stream a large file out of the capsule.
Prefer this over :meth:`read_bytes` when the file is too large to hold
in memory.
Args:
path (str): Absolute path to the file inside the capsule.
Yields:
bytes: Successive byte chunks of the file.
Raises:
WrennNotFoundError: If the path does not exist.
"""
async with self._http.stream(
"POST",
f"/v1/capsules/{self._capsule_id}/files/stream/read",

11
uv.lock generated
View File

@ -546,15 +546,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" },
]
[[package]]
name = "python-dotenv"
version = "1.2.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/82/ed/0301aeeac3e5353ef3d94b6ec08bbcabd04a72018415dcb29e588514bba8/python_dotenv-1.2.2.tar.gz", hash = "sha256:2c371a91fbd7ba082c2c1dc1f8bf89ca22564a087c2c287cd9b662adde799cf3", size = 50135, upload-time = "2026-03-01T16:00:26.196Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" },
]
[[package]]
name = "pytokens"
version = "0.4.1"
@ -694,7 +685,6 @@ dependencies = [
{ name = "httpx" },
{ name = "httpx-ws" },
{ name = "pydantic" },
{ name = "python-dotenv" },
]
[package.dev-dependencies]
@ -713,7 +703,6 @@ requires-dist = [
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "httpx-ws", specifier = ">=0.9.0" },
{ name = "pydantic", specifier = ">=2.12.5" },
{ name = "python-dotenv", specifier = ">=1.2.2" },
]
[package.metadata.requires-dev]