# wrenn
# wrenn.client
## CapsulesResource Objects
```python
class CapsulesResource()
```
Sync capsule control-plane operations.
#### create
```python
def create(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout_sec: int | None = None) -> CapsuleModel
```
Create a new capsule.
**Arguments**:
- `template` _str | None_ - Template name to boot from.
- `vcpus` _int | None_ - Number of virtual CPUs.
- `memory_mb` _int | None_ - Memory in MiB.
- `timeout_sec` _int | None_ - Inactivity TTL in seconds before
auto-pause. ``0`` disables auto-pause.
**Returns**:
- `CapsuleModel` - The newly created capsule.
#### list
```python
def list() -> list[CapsuleModel]
```
List all capsules for the authenticated team.
**Returns**:
- `list[CapsuleModel]` - All capsules belonging to the team.
#### get
```python
def get(id: str) -> CapsuleModel
```
Get a capsule by ID.
**Arguments**:
- `id` _str_ - Capsule ID.
**Returns**:
- `CapsuleModel` - Current state of the capsule.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### destroy
```python
def destroy(id: str) -> None
```
Destroy a capsule permanently.
**Arguments**:
- `id` _str_ - Capsule ID.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### pause
```python
def pause(id: str) -> CapsuleModel
```
Pause a running capsule.
**Arguments**:
- `id` _str_ - Capsule ID.
**Returns**:
- `CapsuleModel` - Updated capsule state.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### resume
```python
def resume(id: str) -> CapsuleModel
```
Resume a paused capsule.
**Arguments**:
- `id` _str_ - Capsule ID.
**Returns**:
- `CapsuleModel` - Updated capsule state.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### ping
```python
def ping(id: str) -> None
```
Reset the inactivity timer for a capsule.
**Arguments**:
- `id` _str_ - Capsule ID.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
## AsyncCapsulesResource Objects
```python
class AsyncCapsulesResource()
```
Async capsule control-plane operations.
#### create
```python
async def create(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout_sec: int | None = None) -> CapsuleModel
```
Create a new capsule.
**Arguments**:
- `template` _str | None_ - Template name to boot from.
- `vcpus` _int | None_ - Number of virtual CPUs.
- `memory_mb` _int | None_ - Memory in MiB.
- `timeout_sec` _int | None_ - Inactivity TTL in seconds before
auto-pause. ``0`` disables auto-pause.
**Returns**:
- `CapsuleModel` - The newly created capsule.
#### list
```python
async def list() -> list[CapsuleModel]
```
List all capsules for the authenticated team.
**Returns**:
- `list[CapsuleModel]` - All capsules belonging to the team.
#### get
```python
async def get(id: str) -> CapsuleModel
```
Get a capsule by ID.
**Arguments**:
- `id` _str_ - Capsule ID.
**Returns**:
- `CapsuleModel` - Current state of the capsule.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### destroy
```python
async def destroy(id: str) -> None
```
Destroy a capsule permanently.
**Arguments**:
- `id` _str_ - Capsule ID.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### pause
```python
async def pause(id: str) -> CapsuleModel
```
Pause a running capsule.
**Arguments**:
- `id` _str_ - Capsule ID.
**Returns**:
- `CapsuleModel` - Updated capsule state.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### resume
```python
async def resume(id: str) -> CapsuleModel
```
Resume a paused capsule.
**Arguments**:
- `id` _str_ - Capsule ID.
**Returns**:
- `CapsuleModel` - Updated capsule state.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### ping
```python
async def ping(id: str) -> None
```
Reset the inactivity timer for a capsule.
**Arguments**:
- `id` _str_ - Capsule ID.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
## SnapshotsResource Objects
```python
class SnapshotsResource()
```
Sync snapshot operations.
#### create
```python
def create(capsule_id: str,
name: str | None = None,
overwrite: bool = False) -> Template
```
Create a snapshot template from a running capsule.
**Arguments**:
- `capsule_id` _str_ - ID of the capsule to snapshot.
- `name` _str | None_ - Name for the snapshot template. Auto-generated
if not provided.
- `overwrite` _bool_ - If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
**Returns**:
- `Template` - The created snapshot template.
#### list
```python
def list(type: str | None = None) -> list[Template]
```
List snapshot templates.
**Arguments**:
- `type` _str | None_ - Filter by template type. Returns all templates
if not provided.
**Returns**:
- `list[Template]` - Matching snapshot templates.
#### delete
```python
def delete(name: str) -> None
```
Delete a snapshot template by name.
**Arguments**:
- `name` _str_ - Template name to delete.
**Raises**:
- `WrennNotFoundError` - If no template with the given name exists.
## AsyncSnapshotsResource Objects
```python
class AsyncSnapshotsResource()
```
Async snapshot operations.
#### create
```python
async def create(capsule_id: str,
name: str | None = None,
overwrite: bool = False) -> Template
```
Create a snapshot template from a running capsule.
**Arguments**:
- `capsule_id` _str_ - ID of the capsule to snapshot.
- `name` _str | None_ - Name for the snapshot template. Auto-generated
if not provided.
- `overwrite` _bool_ - If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
**Returns**:
- `Template` - The created snapshot template.
#### list
```python
async def list(type: str | None = None) -> list[Template]
```
List snapshot templates.
**Arguments**:
- `type` _str | None_ - Filter by template type. Returns all templates
if not provided.
**Returns**:
- `list[Template]` - Matching snapshot templates.
#### delete
```python
async def delete(name: str) -> None
```
Delete a snapshot template by name.
**Arguments**:
- `name` _str_ - Template name to delete.
**Raises**:
- `WrennNotFoundError` - If no template with the given name exists.
## WrennClient Objects
```python
class WrennClient()
```
Synchronous client for the Wrenn API.
Authenticates with an API key.
**Arguments**:
- `api_key` - API key (``wrn_...``). Falls back to ``WRENN_API_KEY`` env var.
- `base_url` - Wrenn API base URL.
#### http
```python
@property
def http() -> httpx.Client
```
The underlying httpx.Client (for sub-objects that need direct access).
#### close
```python
def close() -> None
```
Close the underlying HTTP connection pool.
## AsyncWrennClient Objects
```python
class AsyncWrennClient()
```
Asynchronous client for the Wrenn API.
Authenticates with an API key.
**Arguments**:
- `api_key` - API key (``wrn_...``). Falls back to ``WRENN_API_KEY`` env var.
- `base_url` - Wrenn API base URL. Falls back to ``WRENN_BASE_URL`` env var.
#### http
```python
@property
def http() -> httpx.AsyncClient
```
The underlying httpx.AsyncClient.
#### aclose
```python
async def aclose() -> None
```
Close the underlying async HTTP connection pool.
# wrenn.sandbox
# wrenn.commands
## CommandResult Objects
```python
@dataclass
class CommandResult()
```
Result from a foreground command execution.
## CommandHandle Objects
```python
@dataclass
class CommandHandle()
```
Handle for a background process.
## ProcessInfo Objects
```python
@dataclass
class ProcessInfo()
```
Information about a running process.
## StreamEvent Objects
```python
class StreamEvent()
```
Base class for streaming exec events.
## Commands Objects
```python
class Commands()
```
Sync command execution interface. Accessed via ``capsule.commands``.
#### run
```python
def run(cmd: str,
*,
background: bool = False,
timeout: int | None = 30,
envs: dict[str, str] | None = None,
cwd: str | None = None,
tag: str | None = None) -> CommandResult | CommandHandle
```
Execute a shell command inside the capsule.
**Arguments**:
- `cmd` _str_ - Shell command string to execute.
- `background` _bool_ - If ``True``, launch the process in the
background and return a :class:`CommandHandle` immediately.
Defaults to ``False``.
- `timeout` _int | None_ - Seconds before the foreground command times
out. Ignored for background commands. Defaults to ``30``.
- `envs` _dict[str, str] | None_ - Additional environment variables
to set for the process.
- `cwd` _str | None_ - Working directory for the process.
- `tag` _str | None_ - Optional label attached to background processes
for later retrieval via :meth:`connect`.
**Returns**:
- `CommandResult` - stdout, stderr, exit code, and duration for
foreground commands (``background=False``).
- `CommandHandle` - PID and tag for background commands
(``background=True``).
#### list
```python
def list() -> list[ProcessInfo]
```
List all running background processes in the capsule.
**Returns**:
- `list[ProcessInfo]` - Running processes with their PID, tag, and
command information.
#### kill
```python
def kill(pid: int) -> None
```
Send SIGKILL to a background process.
**Arguments**:
- `pid` _int_ - PID of the process to kill.
**Raises**:
- `WrennNotFoundError` - If no process with the given PID exists.
#### connect
```python
def connect(pid: int) -> Iterator[StreamEvent]
```
Connect to a running background process and stream its output.
**Arguments**:
- `pid` _int_ - PID of the background process to attach to.
**Yields**:
- `StreamEvent` - Successive output events. Stops on
:class:`StreamExitEvent` or :class:`StreamErrorEvent`.
#### stream
```python
def stream(cmd: str,
args: builtins.list[str] | None = None) -> Iterator[StreamEvent]
```
Execute a command via WebSocket, streaming output as events.
**Arguments**:
- `cmd` _str_ - Command to execute.
- `args` _list[str] | None_ - Additional arguments for the command.
When omitted, *cmd* is interpreted as a shell command
string and executed via ``/bin/sh -c``.
**Yields**:
- `StreamEvent` - Successive events including :class:`StreamStartEvent`,
:class:`StreamStdoutEvent`, :class:`StreamStderrEvent`,
:class:`StreamExitEvent`, and :class:`StreamErrorEvent`.
## AsyncCommands Objects
```python
class AsyncCommands()
```
Async command execution interface. Accessed via ``capsule.commands``.
#### run
```python
async def run(cmd: str,
*,
background: bool = False,
timeout: int | None = 30,
envs: dict[str, str] | None = None,
cwd: str | None = None,
tag: str | None = None) -> CommandResult | CommandHandle
```
Execute a shell command inside the capsule.
**Arguments**:
- `cmd` _str_ - Shell command string to execute.
- `background` _bool_ - If ``True``, launch the process in the
background and return a :class:`CommandHandle` immediately.
Defaults to ``False``.
- `timeout` _int | None_ - Seconds before the foreground command times
out. Ignored for background commands. Defaults to ``30``.
- `envs` _dict[str, str] | None_ - Additional environment variables
to set for the process.
- `cwd` _str | None_ - Working directory for the process.
- `tag` _str | None_ - Optional label attached to background processes
for later retrieval via :meth:`connect`.
**Returns**:
- `CommandResult` - stdout, stderr, exit code, and duration for
foreground commands (``background=False``).
- `CommandHandle` - PID and tag for background commands
(``background=True``).
#### list
```python
async def list() -> list[ProcessInfo]
```
List all running background processes in the capsule.
**Returns**:
- `list[ProcessInfo]` - Running processes with their PID, tag, and
command information.
#### kill
```python
async def kill(pid: int) -> None
```
Send SIGKILL to a background process.
**Arguments**:
- `pid` _int_ - PID of the process to kill.
**Raises**:
- `WrennNotFoundError` - If no process with the given PID exists.
#### connect
```python
async def connect(pid: int) -> AsyncIterator[StreamEvent]
```
Connect to a running background process and stream its output.
**Arguments**:
- `pid` _int_ - PID of the background process to attach to.
**Yields**:
- `StreamEvent` - Successive output events. Stops on
:class:`StreamExitEvent` or :class:`StreamErrorEvent`.
#### stream
```python
async def stream(
cmd: str,
args: builtins.list[str] | None = None) -> AsyncIterator[StreamEvent]
```
Execute a command via WebSocket, streaming output as events.
**Arguments**:
- `cmd` _str_ - Command to execute.
- `args` _list[str] | None_ - Additional arguments for the command.
When omitted, *cmd* is interpreted as a shell command
string and executed via ``/bin/sh -c``.
**Yields**:
- `StreamEvent` - Successive events including :class:`StreamStartEvent`,
:class:`StreamStdoutEvent`, :class:`StreamStderrEvent`,
:class:`StreamExitEvent`, and :class:`StreamErrorEvent`.
# wrenn.files
## Files Objects
```python
class Files()
```
Sync filesystem interface. Accessed via ``capsule.files``.
#### read
```python
def read(path: str) -> str
```
Read a file as a UTF-8 string.
**Arguments**:
- `path` _str_ - Absolute path to the file inside the capsule.
**Returns**:
- `str` - File contents decoded as UTF-8.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### read\_bytes
```python
def read_bytes(path: str) -> bytes
```
Read a file as raw bytes.
**Arguments**:
- `path` _str_ - Absolute path to the file inside the capsule.
**Returns**:
- `bytes` - Raw file contents.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### write
```python
def write(path: str, data: str | bytes) -> None
```
Write data to a file inside the capsule.
Creates parent directories if they do not exist.
**Arguments**:
- `path` _str_ - Absolute destination path inside the capsule.
- `data` _str | bytes_ - Content to write. Strings are UTF-8 encoded.
#### list
```python
def list(path: str, depth: int = 1) -> list[FileEntry]
```
List directory contents.
**Arguments**:
- `path` _str_ - Absolute path to the directory inside the capsule.
- `depth` _int_ - Recursion depth. ``1`` lists only immediate children.
Defaults to ``1``.
**Returns**:
- `list[FileEntry]` - Entries in the directory.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### exists
```python
def exists(path: str) -> bool
```
Check whether a path exists inside the capsule.
**Arguments**:
- `path` _str_ - Absolute path to check.
**Returns**:
- `bool` - ``True`` if the path exists.
#### make\_dir
```python
def make_dir(path: str) -> FileEntry
```
Create a directory (with parents). Idempotent.
**Arguments**:
- `path` _str_ - Absolute path of the directory to create.
**Returns**:
- `FileEntry` - The created (or already-existing) directory entry.
#### remove
```python
def remove(path: str) -> None
```
Remove a file or directory recursively.
**Arguments**:
- `path` _str_ - Absolute path to remove.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### upload\_stream
```python
def upload_stream(path: str, stream: Iterator[bytes]) -> None
```
Stream a large file into the capsule.
Prefer this over :meth:`write` when the file is too large to hold in
memory.
**Arguments**:
- `path` _str_ - Absolute destination path inside the capsule.
- `stream` _Iterator[bytes]_ - Iterable of byte chunks to upload.
#### download\_stream
```python
def download_stream(path: str) -> Iterator[bytes]
```
Stream a large file out of the capsule.
Prefer this over :meth:`read_bytes` when the file is too large to hold
in memory.
**Arguments**:
- `path` _str_ - Absolute path to the file inside the capsule.
**Yields**:
- `bytes` - Successive byte chunks of the file.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
## AsyncFiles Objects
```python
class AsyncFiles()
```
Async filesystem interface. Accessed via ``capsule.files``.
#### read
```python
async def read(path: str) -> str
```
Read a file as a UTF-8 string.
**Arguments**:
- `path` _str_ - Absolute path to the file inside the capsule.
**Returns**:
- `str` - File contents decoded as UTF-8.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### read\_bytes
```python
async def read_bytes(path: str) -> bytes
```
Read a file as raw bytes.
**Arguments**:
- `path` _str_ - Absolute path to the file inside the capsule.
**Returns**:
- `bytes` - Raw file contents.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### write
```python
async def write(path: str, data: str | bytes) -> None
```
Write data to a file inside the capsule.
Creates parent directories if they do not exist.
**Arguments**:
- `path` _str_ - Absolute destination path inside the capsule.
- `data` _str | bytes_ - Content to write. Strings are UTF-8 encoded.
#### list
```python
async def list(path: str, depth: int = 1) -> list[FileEntry]
```
List directory contents.
**Arguments**:
- `path` _str_ - Absolute path to the directory inside the capsule.
- `depth` _int_ - Recursion depth. ``1`` lists only immediate children.
Defaults to ``1``.
**Returns**:
- `list[FileEntry]` - Entries in the directory.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### exists
```python
async def exists(path: str) -> bool
```
Check whether a path exists inside the capsule.
**Arguments**:
- `path` _str_ - Absolute path to check.
**Returns**:
- `bool` - ``True`` if the path exists.
#### make\_dir
```python
async def make_dir(path: str) -> FileEntry
```
Create a directory (with parents). Idempotent.
**Arguments**:
- `path` _str_ - Absolute path of the directory to create.
**Returns**:
- `FileEntry` - The created (or already-existing) directory entry.
#### remove
```python
async def remove(path: str) -> None
```
Remove a file or directory recursively.
**Arguments**:
- `path` _str_ - Absolute path to remove.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
#### upload\_stream
```python
async def upload_stream(path: str, stream: AsyncIterator[bytes]) -> None
```
Stream a large file into the capsule.
Prefer this over :meth:`write` when the file is too large to hold in
memory.
**Arguments**:
- `path` _str_ - Absolute destination path inside the capsule.
- `stream` _AsyncIterator[bytes]_ - Async iterable of byte chunks to
upload.
#### download\_stream
```python
async def download_stream(path: str) -> AsyncIterator[bytes]
```
Stream a large file out of the capsule.
Prefer this over :meth:`read_bytes` when the file is too large to hold
in memory.
**Arguments**:
- `path` _str_ - Absolute path to the file inside the capsule.
**Yields**:
- `bytes` - Successive byte chunks of the file.
**Raises**:
- `WrennNotFoundError` - If the path does not exist.
# wrenn.code\_interpreter.models
Deprecated — use :mod:`wrenn.code_runner.models`.
# wrenn.code\_interpreter.async\_capsule
Deprecated — use :mod:`wrenn.code_runner.async_capsule`.
# wrenn.code\_interpreter
Deprecated alias for :mod:`wrenn.code_runner`.
Importing from ``wrenn.code_interpreter`` emits a ``FutureWarning``.
Use ``wrenn.code_runner`` instead.
# wrenn.code\_interpreter.capsule
Deprecated — use :mod:`wrenn.code_runner.capsule`.
# wrenn.exceptions
## WrennError Objects
```python
class WrennError(Exception)
```
Base exception for all Wrenn SDK errors.
All SDK exceptions inherit from this class, so you can catch
``WrennError`` to handle any API error generically.
**Attributes**:
- `code` _str_ - Machine-readable error code from the API
(e.g. ``"not_found"``).
- `message` _str_ - Human-readable error description.
- `status_code` _int_ - HTTP status code of the response.
#### \_\_init\_\_
```python
def __init__(code: str, message: str, status_code: int) -> None
```
Initialize a WrennError.
**Arguments**:
- `code` _str_ - Machine-readable error code.
- `message` _str_ - Human-readable error description.
- `status_code` _int_ - HTTP status code of the response.
## WrennValidationError Objects
```python
class WrennValidationError(WrennError)
```
400 — Invalid request parameters.
## WrennAuthenticationError Objects
```python
class WrennAuthenticationError(WrennError)
```
401 — Invalid or missing authentication.
## WrennForbiddenError Objects
```python
class WrennForbiddenError(WrennError)
```
403 — Authenticated but not authorized.
## WrennNotFoundError Objects
```python
class WrennNotFoundError(WrennError)
```
404 — Resource not found.
## WrennConflictError Objects
```python
class WrennConflictError(WrennError)
```
409 — State conflict (e.g. invalid_state).
## WrennHostHasCapsulesError Objects
```python
class WrennHostHasCapsulesError(WrennConflictError)
```
409 — Host still has running capsules.
**Attributes**:
- `capsule_ids` _list[str]_ - IDs of the capsules still running on the host.
#### \_\_init\_\_
```python
def __init__(code: str, message: str, status_code: int,
capsule_ids: list[str]) -> None
```
Initialize a WrennHostHasCapsulesError.
**Arguments**:
- `code` _str_ - Machine-readable error code.
- `message` _str_ - Human-readable error description.
- `status_code` _int_ - HTTP status code of the response.
- `capsule_ids` _list[str]_ - IDs of capsules still on the host.
## WrennHostUnavailableError Objects
```python
class WrennHostUnavailableError(WrennError)
```
503 — No suitable host available.
## WrennAgentError Objects
```python
class WrennAgentError(WrennError)
```
502 — Host agent returned an error.
## WrennInternalError Objects
```python
class WrennInternalError(WrennError)
```
500 — Unexpected server error.
# wrenn.async\_capsule
## AsyncCapsule Objects
```python
class AsyncCapsule()
```
Async Wrenn capsule with e2b-compatible interface.
Create via classmethod::
capsule = await AsyncCapsule.create(template="minimal")
Use as async context manager::
async with await AsyncCapsule.create() as capsule:
await capsule.commands.run("echo hello")
#### capsule\_id
```python
@property
def capsule_id() -> str
```
The capsule's unique identifier.
**Returns**:
- `str` - Capsule ID assigned by the Wrenn API.
#### info
```python
@property
def info() -> CapsuleModel | None
```
Cached capsule metadata from the last API call.
**Returns**:
CapsuleModel | None: The last-fetched capsule model, or ``None``
if the capsule was connected without an initial fetch.
#### create
```python
@classmethod
async def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> AsyncCapsule
```
Create a new capsule.
**Arguments**:
- `template` _str | None_ - Template name to boot from.
- `vcpus` _int | None_ - Number of virtual CPUs.
- `memory_mb` _int | None_ - Memory in MiB.
- `timeout` _int | None_ - Inactivity TTL in seconds before auto-pause.
- `wait` _bool_ - Await until the capsule reaches ``running`` status.
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `AsyncCapsule` - A new capsule instance.
#### connect
```python
@classmethod
async def connect(cls,
capsule_id: str,
*,
api_key: str | None = None,
base_url: str | None = None) -> AsyncCapsule
```
Connect to an existing capsule, resuming it if paused.
**Arguments**:
- `capsule_id` _str_ - ID of the capsule to connect to.
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `AsyncCapsule` - A capsule instance bound to the existing capsule.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### ping
```python
async def ping() -> None
```
Reset the capsule inactivity timer.
Call this to prevent the capsule from being auto-paused when the
inactivity TTL is set.
#### wait\_ready
```python
async def wait_ready(timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None
```
Await until capsule status is ``running``.
**Raises**:
- `TimeoutError` - If capsule does not reach ``running`` within ``timeout``.
- `RuntimeError` - If capsule enters error/stopped/missing while waiting.
#### is\_running
```python
async def is_running() -> bool
```
Check whether the capsule is currently running.
Makes a live API call to fetch current status.
**Returns**:
- `bool` - ``True`` if the capsule status is ``running``.
#### list
```python
@classmethod
async def list(cls,
*,
api_key: str | None = None,
base_url: str | None = None) -> list[CapsuleModel]
```
List all capsules belonging to the team.
**Arguments**:
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `list[CapsuleModel]` - All capsules for the authenticated team.
#### pty
```python
@asynccontextmanager
async def pty(cmd: str = "/bin/bash",
args: builtins.list[str] | None = None,
cols: int = 80,
rows: int = 24,
envs: dict[str, str] | None = None,
cwd: str | None = None) -> AsyncIterator[AsyncPtySession]
```
Open an async interactive PTY session backed by a WebSocket.
Use as an async context manager and async iterate over
:class:`PtyEvent` objects::
async with capsule.pty() as term:
await term.write(b"echo hello\n")
async for event in term:
if event.type == "output":
print(event.data.decode())
**Arguments**:
- `cmd` _str_ - Command to run inside the PTY. Defaults to
``"/bin/bash"``.
- `args` _list[str] | None_ - Additional arguments for ``cmd``.
- `cols` _int_ - Initial terminal column count. Defaults to ``80``.
- `rows` _int_ - Initial terminal row count. Defaults to ``24``.
- `envs` _dict[str, str] | None_ - Additional environment variables
to inject into the process.
- `cwd` _str | None_ - Working directory for the process.
**Yields**:
- `AsyncPtySession` - An interactive async PTY session.
#### pty\_connect
```python
@asynccontextmanager
async def pty_connect(tag: str) -> AsyncIterator[AsyncPtySession]
```
Reconnect to an existing PTY session by tag.
**Arguments**:
- `tag` _str_ - Session tag returned in the ``started`` PTY event.
**Yields**:
- `AsyncPtySession` - The reconnected async PTY session.
#### get\_url
```python
def get_url(port: int) -> str
```
Get the HTTP proxy URL for a port exposed inside this capsule.
**Arguments**:
- `port` _int_ - Port number to proxy.
**Returns**:
- `str` - A ``https://`` (or ``http://``) URL that proxies HTTP
requests to the given port inside the capsule. For raw
WebSocket access, see the lower-level ``_build_proxy_url``
helper or the ``pty()`` API.
#### create\_snapshot
```python
async def create_snapshot(name: str | None = None,
overwrite: bool = False) -> Template
```
Create a snapshot template from this capsule's current state.
**Arguments**:
- `name` _str | None_ - Name for the snapshot template. Auto-generated
if not provided.
- `overwrite` _bool_ - If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
**Returns**:
- `Template` - The created snapshot template.
# wrenn.pty
## PtySession Objects
```python
class PtySession()
```
Interactive PTY session backed by a WebSocket.
Use as a context manager and iterate over events::
with sb.pty(cmd="/bin/bash") as term:
term.write(b"ls -la\n")
for event in term:
if event.type == "output":
sys.stdout.buffer.write(event.data)
elif event.type == "exit":
break
#### tag
```python
@property
def tag() -> str | None
```
Session tag. Available after the ``started`` event.
#### pid
```python
@property
def pid() -> int | None
```
Process PID. Available after the ``started`` event.
#### write
```python
def write(data: bytes) -> None
```
Send raw bytes to the PTY stdin.
**Arguments**:
- `data` - Raw bytes to send. Base64-encoded internally.
#### resize
```python
def resize(cols: int, rows: int) -> None
```
Resize the PTY terminal.
**Arguments**:
- `cols` - New column count. Must be > 0.
- `rows` - New row count. Must be > 0.
**Raises**:
- `ValueError` - If cols or rows is 0.
#### kill
```python
def kill() -> None
```
Send SIGKILL to the PTY process.
## AsyncPtySession Objects
```python
class AsyncPtySession()
```
Async interactive PTY session backed by a WebSocket.
Use as an async context manager and async iterate over events::
async with sb.pty(cmd="/bin/bash") as term:
await term.write(b"ls -la\n")
async for event in term:
if event.type == "output":
sys.stdout.buffer.write(event.data)
elif event.type == "exit":
break
#### tag
```python
@property
def tag() -> str | None
```
Session tag. Available after the ``started`` event.
#### pid
```python
@property
def pid() -> int | None
```
Process PID. Available after the ``started`` event.
#### write
```python
async def write(data: bytes) -> None
```
Send raw bytes to the PTY stdin.
**Arguments**:
- `data` - Raw bytes to send. Base64-encoded internally.
#### resize
```python
async def resize(cols: int, rows: int) -> None
```
Resize the PTY terminal.
**Arguments**:
- `cols` - New column count. Must be > 0.
- `rows` - New row count. Must be > 0.
**Raises**:
- `ValueError` - If cols or rows is 0.
#### kill
```python
async def kill() -> None
```
Send SIGKILL to the PTY process.
# wrenn.models.\_generated
## SessionResponse Objects
```python
class SessionResponse(BaseModel)
```
Returned by login, activate, and switch-team. The actual auth credential
is the wrenn_sid cookie set on the response. The body carries identity
data the SPA needs to bootstrap.
## Peaks Objects
```python
class Peaks(BaseModel)
```
Maximum values over the last 30 days.
## Series Objects
```python
class Series(BaseModel)
```
Parallel arrays for chart rendering.
## Encoding Objects
```python
class Encoding(StrEnum)
```
Output encoding. "base64" when stdout/stderr contain binary data.
## Type2 Objects
```python
class Type2(StrEnum)
```
Host type. Regular hosts are shared; BYOC hosts belong to a team.
## Outcome Objects
```python
class Outcome(StrEnum)
```
Present for action events (capsule.* except state.changed,
template.snapshot.*). Absent for host.up/down, capsule.state.changed,
and the connected sentinel.
## SSEEvent Objects
```python
class SSEEvent(BaseModel)
```
Wire format of one SSE message body. The event name (`event:` line) is
the `kind` and the JSON below is the `data:` line.
# wrenn.models
# wrenn.capsule
## Capsule Objects
```python
class Capsule()
```
A Wrenn capsule (sandbox) with e2b-compatible interface.
Create directly::
capsule = Capsule(api_key="wrn_...")
capsule = Capsule(template="minimal") # reads WRENN_API_KEY env
Or via classmethod::
capsule = Capsule.create(template="minimal")
Use as context manager for automatic cleanup::
with Capsule() as capsule:
capsule.commands.run("echo hello")
#### \_\_init\_\_
```python
def __init__(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
_capsule_id: str | None = None,
_client: WrennClient | None = None,
_info: CapsuleModel | None = None) -> None
```
Create and start a new capsule.
**Arguments**:
- `template` _str | None_ - Template name to boot from. Defaults to
the server-side default (``"minimal"``).
- `vcpus` _int | None_ - Number of virtual CPUs. Defaults to the
server-side default.
- `memory_mb` _int | None_ - Memory in MiB. Defaults to the
server-side default.
- `timeout` _int | None_ - Inactivity TTL in seconds before the capsule
is auto-paused. ``0`` disables auto-pause.
- `wait` _bool_ - If ``True``, block until the capsule status is
``running`` before returning.
- `api_key` _str | None_ - Wrenn API key (``wrn_...``). Falls back to
the ``WRENN_API_KEY`` environment variable.
- `base_url` _str | None_ - Wrenn API base URL. Falls back to
``WRENN_BASE_URL`` or the default production endpoint.
#### capsule\_id
```python
@property
def capsule_id() -> str
```
The capsule's unique identifier.
**Returns**:
- `str` - Capsule ID assigned by the Wrenn API.
#### info
```python
@property
def info() -> CapsuleModel | None
```
Cached capsule metadata from the last API call.
**Returns**:
CapsuleModel | None: The last-fetched capsule model, or ``None``
if the capsule was connected without an initial fetch.
#### create
```python
@classmethod
def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> Capsule
```
Create a new capsule.
Equivalent to calling ``Capsule(...)`` directly.
**Arguments**:
- `template` _str | None_ - Template name to boot from.
- `vcpus` _int | None_ - Number of virtual CPUs.
- `memory_mb` _int | None_ - Memory in MiB.
- `timeout` _int | None_ - Inactivity TTL in seconds before auto-pause.
- `wait` _bool_ - Block until the capsule reaches ``running`` status.
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `Capsule` - A new capsule instance.
#### connect
```python
@classmethod
def connect(cls,
capsule_id: str,
*,
api_key: str | None = None,
base_url: str | None = None) -> Capsule
```
Connect to an existing capsule, resuming it if paused.
**Arguments**:
- `capsule_id` _str_ - ID of the capsule to connect to.
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `Capsule` - A capsule instance bound to the existing capsule.
**Raises**:
- `WrennNotFoundError` - If no capsule with the given ID exists.
#### ping
```python
def ping() -> None
```
Reset the capsule inactivity timer.
Call this to prevent the capsule from being auto-paused when the
inactivity TTL is set.
#### wait\_ready
```python
def wait_ready(timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None
```
Block until capsule status is ``running``.
**Raises**:
- `TimeoutError` - If capsule does not reach ``running`` within ``timeout``.
- `RuntimeError` - If capsule enters error/stopped/missing while waiting.
#### is\_running
```python
def is_running() -> bool
```
Check whether the capsule is currently running.
Makes a live API call to fetch current status.
**Returns**:
- `bool` - ``True`` if the capsule status is ``running``.
#### list
```python
@classmethod
def list(cls,
*,
api_key: str | None = None,
base_url: str | None = None) -> list[CapsuleModel]
```
List all capsules belonging to the team.
**Arguments**:
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `list[CapsuleModel]` - All capsules for the authenticated team.
#### pty
```python
@contextmanager
def pty(cmd: str = "/bin/bash",
args: builtins.list[str] | None = None,
cols: int = 80,
rows: int = 24,
envs: dict[str, str] | None = None,
cwd: str | None = None) -> Iterator[PtySession]
```
Open an interactive PTY session backed by a WebSocket.
Use as a context manager and iterate over :class:`PtyEvent` objects::
with capsule.pty() as term:
term.write(b"echo hello\n")
for event in term:
if event.type == "output":
print(event.data.decode())
**Arguments**:
- `cmd` _str_ - Command to run inside the PTY. Defaults to
``"/bin/bash"``.
- `args` _list[str] | None_ - Additional arguments for ``cmd``.
- `cols` _int_ - Initial terminal column count. Defaults to ``80``.
- `rows` _int_ - Initial terminal row count. Defaults to ``24``.
- `envs` _dict[str, str] | None_ - Additional environment variables to
inject into the process.
- `cwd` _str | None_ - Working directory for the process.
**Yields**:
- `PtySession` - An interactive PTY session.
#### pty\_connect
```python
@contextmanager
def pty_connect(tag: str) -> Iterator[PtySession]
```
Reconnect to an existing PTY session by tag.
**Arguments**:
- `tag` _str_ - Session tag returned in the ``started`` PTY event.
**Yields**:
- `PtySession` - The reconnected PTY session.
#### get\_url
```python
def get_url(port: int) -> str
```
Get the HTTP proxy URL for a port exposed inside this capsule.
**Arguments**:
- `port` _int_ - Port number to proxy.
**Returns**:
- `str` - A ``https://`` (or ``http://``) URL that proxies HTTP
requests to the given port inside the capsule. For raw
WebSocket access, see the lower-level ``_build_proxy_url``
helper or the ``pty()`` API.
#### create\_snapshot
```python
def create_snapshot(name: str | None = None,
overwrite: bool = False) -> Template
```
Create a snapshot template from this capsule's current state.
**Arguments**:
- `name` _str | None_ - Name for the snapshot template. Auto-generated
if not provided.
- `overwrite` _bool_ - If ``True``, overwrite an existing template with
the same name. Defaults to ``False``.
**Returns**:
- `Template` - The created snapshot template.
# wrenn.code\_runner.models
## ExecutionError Objects
```python
@dataclass
class ExecutionError()
```
Error raised during code execution.
**Attributes**:
- `name` - Exception class name (e.g. ``"NameError"``).
- `value` - Exception message.
- `traceback` - Full traceback string.
## Logs Objects
```python
@dataclass
class Logs()
```
Captured stdout/stderr streams.
Each element in the list is one chunk of text as it arrived from
the kernel.
## Result Objects
```python
@dataclass
class Result()
```
A single rich output from code execution.
Jupyter cells can produce multiple outputs — one ``execute_result``
(the expression value) and zero or more ``display_data`` messages
(from ``plt.show()``, ``display()``, etc.). Each becomes a
``Result``.
Known MIME types are unpacked into named attributes; anything else
lands in :pyattr:`extra`.
#### text
``text/plain`` representation.
#### html
``text/html`` representation.
#### markdown
``text/markdown`` representation.
#### svg
``image/svg+xml`` representation.
#### png
``image/png`` — base64-encoded.
#### jpeg
``image/jpeg`` — base64-encoded.
#### gif
``image/gif`` — base64-encoded.
#### pdf
``application/pdf`` — base64-encoded.
#### latex
``text/latex`` representation.
#### json
``application/json`` representation.
#### javascript
``application/javascript`` representation.
#### plotly
``application/vnd.plotly.v1+json`` representation.
#### extra
MIME types not covered by the named fields above.
#### is\_main\_result
``True`` when this came from an ``execute_result`` message
(i.e. the value of the last expression in the cell). ``False``
for ``display_data`` outputs.
#### from\_bundle
```python
@classmethod
def from_bundle(cls,
bundle: dict[str, str],
*,
is_main_result: bool = False) -> Result
```
Build a ``Result`` from a Jupyter MIME bundle dict.
#### formats
```python
def formats() -> list[str]
```
Return names of non-``None`` MIME-type fields.
## Execution Objects
```python
@dataclass
class Execution()
```
Complete result of a ``run_code`` call.
**Attributes**:
- `results` - All rich outputs produced by the cell — charts, tables,
images, expression values, etc.
- `logs` - Captured stdout/stderr text.
- `error` - Populated when the cell raised an exception.
- `execution_count` - Jupyter execution counter (the ``[N]`` number).
#### timed\_out
``True`` when execution was cut short by the ``timeout`` parameter
(or by the kernel WebSocket dropping). Pairs with ``error`` of name
``"Timeout"`` or ``"Disconnected"``.
#### text
```python
@property
def text() -> str | None
```
Convenience — ``text/plain`` of the main ``execute_result``,
or ``None`` if the cell had no expression value.
# wrenn.code\_runner.async\_capsule
## AsyncCapsule Objects
```python
class AsyncCapsule(BaseAsyncCapsule)
```
Async code runner capsule with ``run_code`` support.
Uses ``code-runner-beta`` template and the ``wrenn`` Jupyter
kernelspec by default::
from wrenn.code_runner import AsyncCapsule
capsule = await AsyncCapsule.create()
result = await capsule.run_code("print('hello')")
#### create
```python
@classmethod
async def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
kernel: str | None = None,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> AsyncCapsule
```
Create a new async code runner capsule.
**Arguments**:
- `template` _str | None_ - Template to boot from. Defaults to
``"code-runner-beta"``.
- `vcpus` _int | None_ - Number of virtual CPUs.
- `memory_mb` _int | None_ - Memory in MiB.
- `timeout` _int | None_ - Inactivity TTL in seconds before auto-pause.
- `kernel` _str | None_ - Jupyter kernelspec name. Defaults to
``"wrenn"``.
- `wait` _bool_ - Await until the capsule reaches ``running`` status.
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `AsyncCapsule` - A new async code runner capsule instance.
#### run\_code
```python
async def run_code(
code: str,
language: str = "python",
timeout: float = 30,
jupyter_timeout: float = 30,
on_result: Callable[[Result], Any] | None = None,
on_stdout: Callable[[str], Any] | None = None,
on_stderr: Callable[[str], Any] | None = None,
on_error: Callable[[ExecutionError], Any] | None = None) -> Execution
```
Execute code in a persistent Jupyter kernel (async).
**Arguments**:
- `code` - Code string to execute.
- `language` - Execution backend language. Currently only ``"python"``.
- `timeout` - Maximum seconds to wait for execution to complete.
- `jupyter_timeout` - Maximum seconds to wait for Jupyter to become
available.
- `on_result` - Called for each rich output (charts, images, expression
values).
- `on_stdout` - Called for each stdout chunk.
- `on_stderr` - Called for each stderr chunk.
- `on_error` - Called when the cell raises an exception.
**Returns**:
An :class:`Execution` with ``.results``, ``.logs``, ``.error``,
and a convenience ``.text`` property.
# wrenn.code\_runner
Code runner — execute code in persistent Jupyter kernels.
Uses the ``code-runner-beta`` template and the ``wrenn`` Jupyter
kernelspec by default.
Example::
from wrenn.code_runner import Capsule
with Capsule(wait=True) as capsule:
result = capsule.run_code("print('hello')")
print(result.logs.stdout)
# wrenn.code\_runner.capsule
## Capsule Objects
```python
class Capsule(BaseCapsule)
```
Code runner capsule with ``run_code`` support.
Uses ``code-runner-beta`` template and the ``wrenn`` Jupyter
kernelspec by default::
from wrenn.code_runner import Capsule
capsule = Capsule()
result = capsule.run_code("print('hello')")
print(result.logs.stdout) # ["hello\n"]
#### \_\_init\_\_
```python
def __init__(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
kernel: str | None = None,
api_key: str | None = None,
base_url: str | None = None,
**kwargs) -> None
```
Create a code runner capsule.
**Arguments**:
- `template` _str | None_ - Template to boot from. Defaults to
``"code-runner-beta"``.
- `vcpus` _int | None_ - Number of virtual CPUs.
- `memory_mb` _int | None_ - Memory in MiB.
- `timeout` _int | None_ - Inactivity TTL in seconds before auto-pause.
- `kernel` _str | None_ - Jupyter kernelspec name. Defaults to
``"wrenn"``.
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
#### create
```python
@classmethod
def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
kernel: str | None = None,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> Capsule
```
Create a new code runner capsule.
**Arguments**:
- `template` _str | None_ - Template to boot from. Defaults to
``"code-runner-beta"``.
- `vcpus` _int | None_ - Number of virtual CPUs.
- `memory_mb` _int | None_ - Memory in MiB.
- `timeout` _int | None_ - Inactivity TTL in seconds before auto-pause.
- `kernel` _str | None_ - Jupyter kernelspec name. Defaults to
``"wrenn"``.
- `wait` _bool_ - Block until the capsule reaches ``running`` status.
- `api_key` _str | None_ - Wrenn API key. Falls back to
``WRENN_API_KEY`` env var.
- `base_url` _str | None_ - API base URL override.
**Returns**:
- `Capsule` - A new code runner capsule instance.
#### run\_code
```python
def run_code(
code: str,
language: str = "python",
timeout: float = 30,
jupyter_timeout: float = 30,
on_result: Callable[[Result], Any] | None = None,
on_stdout: Callable[[str], Any] | None = None,
on_stderr: Callable[[str], Any] | None = None,
on_error: Callable[[ExecutionError], Any] | None = None) -> Execution
```
Execute code in a persistent Jupyter kernel.
Variables, imports, and function definitions survive across calls.
**Arguments**:
- `code` - Code string to execute.
- `language` - Execution backend language. Currently only ``"python"``
is supported; passing anything else raises ``ValueError``.
To target a non-Python kernel, set ``kernel=`` on the
capsule constructor.
- `timeout` - Maximum seconds to wait for execution to complete.
- `jupyter_timeout` - Maximum seconds to wait for Jupyter to become
available.
- `on_result` - Called for each rich output (charts, images, expression
values).
- `on_stdout` - Called for each stdout chunk.
- `on_stderr` - Called for each stderr chunk.
- `on_error` - Called when the cell raises an exception.
**Returns**:
An :class:`Execution` with ``.results``, ``.logs``, ``.error``,
and a convenience ``.text`` property.
# wrenn.code\_runner.\_protocol
Shared Jupyter protocol helpers used by both sync and async capsules.
Pure functions only — no I/O, no sync/async coupling.
#### build\_execute\_request
```python
def build_execute_request(code: str) -> dict
```
Build a Jupyter ``execute_request`` message envelope.
**Returns**:
- `dict` - A fully-formed Jupyter shell-channel message ready to be
JSON-serialized over the kernel WebSocket. The caller is
expected to read ``msg["header"]["msg_id"]`` to correlate
responses.
#### build\_ws\_url
```python
def build_ws_url(base_url: str, capsule_id: str, kernel_id: str) -> str
```
Build the Jupyter kernel WebSocket URL for the given capsule.
# wrenn.\_config
# wrenn.\_git.\_auth
#### embed\_credentials
```python
def embed_credentials(url: str, username: str, password: str) -> str
```
Embed HTTP(S) credentials into a git URL.
**Arguments**:
- `url` - Git repository URL.
- `username` - Username for authentication.
- `password` - Password or personal access token.
**Returns**:
URL with ``username:password@`` embedded in the netloc.
**Raises**:
- `ValueError` - If the URL scheme is not ``http`` or ``https``.
#### strip\_credentials
```python
def strip_credentials(url: str) -> str
```
Remove embedded credentials from a git URL.
**Arguments**:
- `url` - Git repository URL, possibly with credentials.
**Returns**:
URL with credentials removed. Non-HTTP(S) URLs are returned
unchanged.
#### is\_auth\_error
```python
def is_auth_error(stderr: str) -> bool
```
Check whether git stderr indicates an authentication failure.
**Arguments**:
- `stderr` - Combined stderr output from a git command.
**Returns**:
``True`` if any known auth-failure pattern is found.
#### build\_credential\_approve\_cmd
```python
def build_credential_approve_cmd(username: str,
password: str,
host: str = "github.com",
protocol: str = "https") -> str
```
Build a shell command that pipes credentials into ``git credential approve``.
**Arguments**:
- `username` - Git username.
- `password` - Password or personal access token.
- `host` - Target host. Defaults to ``"github.com"``.
- `protocol` - Protocol. Defaults to ``"https"``.
**Returns**:
A shell command string safe to pass to ``commands.run()``.
# wrenn.\_git.\_cmd
Pure functions that build git argument lists and parse git output.
No I/O, no network, no imports from ``wrenn``. Every ``build_*`` function
returns a ``list[str]`` suitable for ``shlex.join()``. Every ``parse_*``
function takes raw stdout and returns a typed structure.
## FileStatus Objects
```python
@dataclass
class FileStatus()
```
A single entry from ``git status --porcelain=v1``.
**Attributes**:
- `path` _str_ - File path relative to the repository root.
- `index_status` _str_ - Index (staged) status character.
- `work_tree_status` _str_ - Working-tree status character.
- `renamed_from` _str | None_ - Original path when status is a rename.
#### staged
```python
@property
def staged() -> bool
```
Whether the change is staged in the index.
#### status
```python
@property
def status() -> str
```
Normalized human-readable status label.
## GitStatus Objects
```python
@dataclass
class GitStatus()
```
Parsed output of ``git status --porcelain=v1 --branch``.
**Attributes**:
- `branch` _str | None_ - Current branch name, or ``None`` if detached.
- `upstream` _str | None_ - Upstream tracking branch.
- `ahead` _int_ - Commits ahead of upstream.
- `behind` _int_ - Commits behind upstream.
- `detached` _bool_ - Whether HEAD is detached.
- `files` _list[FileStatus]_ - Per-file status entries.
#### is\_clean
```python
@property
def is_clean() -> bool
```
``True`` when there are no changed or untracked files.
#### has\_staged
```python
@property
def has_staged() -> bool
```
``True`` when at least one file has staged changes.
#### has\_untracked
```python
@property
def has_untracked() -> bool
```
``True`` when at least one file is untracked.
#### has\_conflicts
```python
@property
def has_conflicts() -> bool
```
``True`` when at least one file has merge conflicts.
## GitBranch Objects
```python
@dataclass
class GitBranch()
```
A single branch entry.
**Attributes**:
- `name` _str_ - Branch name (short ref).
- `is_current` _bool_ - Whether this is the checked-out branch.
#### build\_clone
```python
def build_clone(url: str,
dest: str | None = None,
*,
branch: str | None = None,
depth: int | None = None) -> list[str]
```
Build ``git clone`` arguments.
#### build\_init
```python
def build_init(path: str = ".",
*,
bare: bool = False,
initial_branch: str | None = None) -> list[str]
```
Build ``git init`` arguments.
#### build\_add
```python
def build_add(paths: list[str] | None = None,
*,
all: bool = False) -> list[str]
```
Build ``git add`` arguments.
#### build\_commit
```python
def build_commit(message: str,
*,
allow_empty: bool = False,
author_name: str | None = None,
author_email: str | None = None) -> list[str]
```
Build ``git commit`` arguments.
#### build\_push
```python
def build_push(remote: str = "origin",
branch: str | None = None,
*,
force: bool = False,
set_upstream: bool = False) -> list[str]
```
Build ``git push`` arguments.
#### build\_pull
```python
def build_pull(remote: str = "origin",
branch: str | None = None,
*,
rebase: bool = False,
ff_only: bool = False) -> list[str]
```
Build ``git pull`` arguments.
#### build\_status
```python
def build_status() -> list[str]
```
Build ``git status`` arguments for porcelain parsing.
#### build\_branches
```python
def build_branches() -> list[str]
```
Build ``git branch`` arguments for structured parsing.
#### build\_create\_branch
```python
def build_create_branch(name: str,
*,
start_point: str | None = None) -> list[str]
```
Build ``git checkout -b`` arguments.
#### build\_checkout
```python
def build_checkout(name: str) -> list[str]
```
Build ``git checkout`` arguments.
#### build\_delete\_branch
```python
def build_delete_branch(name: str, *, force: bool = False) -> list[str]
```
Build ``git branch -d/-D`` arguments.
#### build\_remote\_add
```python
def build_remote_add(name: str, url: str, *, fetch: bool = False) -> list[str]
```
Build ``git remote add`` arguments.
#### build\_remote\_get\_url
```python
def build_remote_get_url(name: str = "origin") -> list[str]
```
Build ``git remote get-url`` arguments.
#### build\_remote\_set\_url
```python
def build_remote_set_url(name: str, url: str) -> list[str]
```
Build ``git remote set-url`` arguments.
#### build\_reset
```python
def build_reset(*,
mode: str | None = None,
ref: str | None = None,
paths: list[str] | None = None) -> list[str]
```
Build ``git reset`` arguments.
**Arguments**:
- `mode` - Reset mode (``soft``, ``mixed``, ``hard``, ``merge``, ``keep``).
- `ref` - Commit, branch, or ref to reset to.
- `paths` - Paths to reset (mutually exclusive with ``mode``).
#### build\_restore
```python
def build_restore(paths: list[str],
*,
staged: bool = False,
worktree: bool = False,
source: str | None = None) -> list[str]
```
Build ``git restore`` arguments.
**Arguments**:
- `paths` - Paths to restore.
- `staged` - Restore the index (unstage).
- `worktree` - Restore working-tree files.
- `source` - Commit or ref to restore from.
#### build\_config\_set
```python
def build_config_set(key: str,
value: str,
*,
scope: str = "local",
repo_path: str | None = None) -> list[str]
```
Build ``git config`` set arguments.
#### build\_config\_get
```python
def build_config_get(key: str,
*,
scope: str = "local",
repo_path: str | None = None) -> list[str]
```
Build ``git config --get`` arguments.
#### parse\_status
```python
def parse_status(stdout: str) -> GitStatus
```
Parse ``git status --porcelain=v1 --branch`` output.
**Arguments**:
- `stdout` - Raw stdout from the git status command.
**Returns**:
Parsed :class:`GitStatus`.
#### parse\_branches
```python
def parse_branches(stdout: str) -> list[GitBranch]
```
Parse ``git branch --format=%(refname:short)\t%(HEAD)`` output.
**Arguments**:
- `stdout` - Raw stdout from the git branch command.
**Returns**:
List of :class:`GitBranch`.
# wrenn.\_git.exceptions
## GitError Objects
```python
class GitError(Exception)
```
Base exception for all git operations inside a capsule.
Not a subclass of :class:`WrennError` because git errors originate
from a process exit code, not an HTTP response.
**Attributes**:
- `message` _str_ - Human-readable error description.
- `stderr` _str_ - Raw stderr output from the git process.
- `exit_code` _int_ - Process exit code.
## GitCommandError Objects
```python
class GitCommandError(GitError)
```
A git command exited with a non-zero exit code.
## GitAuthError Objects
```python
class GitAuthError(GitError)
```
Authentication failed when communicating with a remote.
# wrenn.\_git
Git operations inside a Wrenn capsule.
Provides :class:`Git` (sync) and :class:`AsyncGit` (async) interfaces
accessed via ``capsule.git``. All operations execute the real ``git``
binary inside the capsule through :class:`~wrenn.commands.Commands`.
## Git Objects
```python
class Git()
```
Sync git interface. Accessed via ``capsule.git``.
Executes the real ``git`` binary inside the capsule through
:meth:`Commands.run`. Methods raise :class:`GitCommandError` (or
:class:`GitAuthError`) on non-zero exit codes.
#### clone
```python
def clone(url: str,
dest: str | None = None,
*,
branch: str | None = None,
depth: int | None = None,
username: str | None = None,
password: str | None = None,
dangerously_store_credentials: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 300) -> CommandResult
```
Clone a remote repository into the capsule.
**Arguments**:
- `url` - Remote repository URL.
- `dest` - Destination path. Defaults to the repository name
derived from the URL.
- `branch` - Branch or tag to check out.
- `depth` - Create a shallow clone with this many commits.
- `username` - Username for HTTP(S) authentication.
- `password` - Password or token for HTTP(S) authentication.
- `dangerously_store_credentials` - If ``True``, leave credentials
embedded in the remote URL after cloning.
- `cwd` - Working directory for the command.
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds. Defaults to ``300``.
**Returns**:
Command result with stdout, stderr, exit_code, and duration.
**Raises**:
- `GitAuthError` - If the remote rejected authentication.
- `GitCommandError` - If clone failed for another reason.
- `ValueError` - If *password* is provided without *username*.
#### init
```python
def init(path: str = ".",
*,
bare: bool = False,
initial_branch: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Initialize a new git repository.
**Arguments**:
- `path` - Destination path for the repository.
- `bare` - Create a bare repository.
- `initial_branch` - Name for the initial branch (e.g. ``"main"``).
- `cwd` - Working directory for the command.
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If init failed.
#### add
```python
def add(paths: list[str] | None = None,
*,
all: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Stage files for commit.
**Arguments**:
- `paths` - Specific files to stage. If ``None``, stages the
current directory (or all with ``all=True``).
- `all` - Stage all changes including untracked files.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If add failed.
#### commit
```python
def commit(message: str,
*,
allow_empty: bool = False,
author_name: str | None = None,
author_email: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Create a commit.
**Arguments**:
- `message` - Commit message.
- `allow_empty` - Allow creating a commit with no changes.
- `author_name` - Override the commit author name.
- `author_email` - Override the commit author email.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If commit failed.
#### push
```python
def push(remote: str = "origin",
branch: str | None = None,
*,
force: bool = False,
set_upstream: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
```
Push commits to a remote.
**Arguments**:
- `remote` - Remote name. Defaults to ``"origin"``.
- `branch` - Branch to push. Defaults to the current branch.
- `force` - Force-push.
- `set_upstream` - Set upstream tracking reference.
- `username` - Username for HTTP(S) authentication.
- `password` - Password or token for HTTP(S) authentication.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitAuthError` - If authentication failed.
- `GitCommandError` - If push failed.
#### pull
```python
def pull(remote: str = "origin",
branch: str | None = None,
*,
rebase: bool = False,
ff_only: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
```
Pull changes from a remote.
**Arguments**:
- `remote` - Remote name. Defaults to ``"origin"``.
- `branch` - Branch to pull.
- `rebase` - Rebase instead of merge.
- `ff_only` - Only allow fast-forward merges.
- `username` - Username for HTTP(S) authentication.
- `password` - Password or token for HTTP(S) authentication.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitAuthError` - If authentication failed.
- `GitCommandError` - If pull failed.
#### status
```python
def status(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> GitStatus
```
Get repository status.
**Arguments**:
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Parsed :class:`GitStatus` with branch info and file changes.
**Raises**:
- `GitCommandError` - If the command failed.
#### branches
```python
def branches(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> list[GitBranch]
```
List local branches.
**Arguments**:
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
List of :class:`GitBranch`.
**Raises**:
- `GitCommandError` - If the command failed.
#### create\_branch
```python
def create_branch(name: str,
*,
start_point: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Create and check out a new branch.
**Arguments**:
- `name` - Branch name.
- `start_point` - Commit or ref to branch from.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If the command failed.
#### checkout\_branch
```python
def checkout_branch(name: str,
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Check out an existing branch.
**Arguments**:
- `name` - Branch name.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If the command failed.
#### delete\_branch
```python
def delete_branch(name: str,
*,
force: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Delete a branch.
**Arguments**:
- `name` - Branch name.
- `force` - Force-delete with ``-D``.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If the command failed.
#### remote\_add
```python
def remote_add(name: str,
url: str,
*,
fetch: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Add a remote.
**Arguments**:
- `name` - Remote name (e.g. ``"origin"``).
- `url` - Remote URL.
- `fetch` - Fetch after adding.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If the command failed.
#### remote\_get
```python
def remote_get(name: str = "origin",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
```
Get the URL of a remote.
Returns ``None`` if the remote does not exist rather than raising.
**Arguments**:
- `name` - Remote name. Defaults to ``"origin"``.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Remote URL or ``None``.
#### reset
```python
def reset(*,
mode: str | None = None,
ref: str | None = None,
paths: list[str] | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Reset the current HEAD.
**Arguments**:
- `mode` - Reset mode (``soft``, ``mixed``, ``hard``, ``merge``,
``keep``).
- `ref` - Commit, branch, or ref to reset to.
- `paths` - Paths to reset.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If the command failed.
#### restore
```python
def restore(paths: list[str],
*,
staged: bool = False,
worktree: bool = False,
source: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Restore working-tree files or unstage changes.
**Arguments**:
- `paths` - Paths to restore.
- `staged` - Restore the index (unstage).
- `worktree` - Restore working-tree files.
- `source` - Commit or ref to restore from.
- `cwd` - Working directory (repository root).
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If the command failed.
#### set\_config
```python
def set_config(key: str,
value: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Set a git config value.
**Arguments**:
- `key` - Config key (e.g. ``"user.name"``).
- `value` - Config value.
- `scope` - Config scope: ``"local"``, ``"global"``, or
``"system"``.
- `cwd` - Working directory (repository root). Required when
scope is ``"local"``.
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Command result.
**Raises**:
- `GitCommandError` - If the command failed.
#### get\_config
```python
def get_config(key: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
```
Get a git config value.
Returns ``None`` if the key is not set rather than raising.
**Arguments**:
- `key` - Config key (e.g. ``"user.name"``).
- `scope` - Config scope: ``"local"``, ``"global"``, or
``"system"``.
- `cwd` - Working directory (repository root). Required when
scope is ``"local"``.
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Returns**:
Config value or ``None``.
#### configure\_user
```python
def configure_user(name: str,
email: str,
*,
scope: str = "global",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
```
Configure git user name and email.
**Arguments**:
- `name` - Git user name.
- `email` - Git user email.
- `scope` - Config scope. Defaults to ``"global"``.
- `cwd` - Working directory (repository root). Required when
scope is ``"local"``.
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Raises**:
- `ValueError` - If *name* or *email* is empty.
- `GitCommandError` - If a config command failed.
#### dangerously\_authenticate
```python
def dangerously_authenticate(username: str,
password: str,
host: str = "github.com",
protocol: str = "https",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
```
Persist git credentials via the credential store.
.. warning::
Credentials are written in plain text to the capsule
filesystem and are accessible to any process running inside
the capsule. Prefer per-operation ``username``/``password``
parameters on :meth:`clone`, :meth:`push`, and :meth:`pull`
instead.
**Arguments**:
- `username` - Git username.
- `password` - Password or personal access token.
- `host` - Target host. Defaults to ``"github.com"``.
- `protocol` - Protocol. Defaults to ``"https"``.
- `cwd` - Working directory.
- `envs` - Extra environment variables.
- `timeout` - Command timeout in seconds.
**Raises**:
- `ValueError` - If *username* or *password* is empty.
- `GitCommandError` - If a command failed.
## AsyncGit Objects
```python
class AsyncGit()
```
Async git interface. Accessed via ``capsule.git``.
Async mirror of :class:`Git`. See that class for full method
documentation.
#### clone
```python
async def clone(url: str,
dest: str | None = None,
*,
branch: str | None = None,
depth: int | None = None,
username: str | None = None,
password: str | None = None,
dangerously_store_credentials: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 300) -> CommandResult
```
Clone a remote repository into the capsule.
#### init
```python
async def init(path: str = ".",
*,
bare: bool = False,
initial_branch: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Initialize a new git repository.
#### add
```python
async def add(paths: list[str] | None = None,
*,
all: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Stage files for commit.
#### commit
```python
async def commit(message: str,
*,
allow_empty: bool = False,
author_name: str | None = None,
author_email: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Create a commit.
#### push
```python
async def push(remote: str = "origin",
branch: str | None = None,
*,
force: bool = False,
set_upstream: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
```
Push commits to a remote.
#### pull
```python
async def pull(remote: str = "origin",
branch: str | None = None,
*,
rebase: bool = False,
ff_only: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
```
Pull changes from a remote.
#### status
```python
async def status(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> GitStatus
```
Get repository status.
#### branches
```python
async def branches(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> list[GitBranch]
```
List local branches.
#### create\_branch
```python
async def create_branch(name: str,
*,
start_point: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Create and check out a new branch.
#### checkout\_branch
```python
async def checkout_branch(name: str,
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Check out an existing branch.
#### delete\_branch
```python
async def delete_branch(name: str,
*,
force: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Delete a branch.
#### remote\_add
```python
async def remote_add(name: str,
url: str,
*,
fetch: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Add a remote.
#### remote\_get
```python
async def remote_get(name: str = "origin",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
```
Get the URL of a remote. Returns ``None`` if not found.
#### reset
```python
async def reset(*,
mode: str | None = None,
ref: str | None = None,
paths: list[str] | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Reset the current HEAD.
#### restore
```python
async def restore(paths: list[str],
*,
staged: bool = False,
worktree: bool = False,
source: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Restore working-tree files or unstage changes.
#### set\_config
```python
async def set_config(key: str,
value: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
```
Set a git config value.
#### get\_config
```python
async def get_config(key: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
```
Get a git config value. Returns ``None`` if not set.
#### configure\_user
```python
async def configure_user(name: str,
email: str,
*,
scope: str = "global",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
```
Configure git user name and email.
#### dangerously\_authenticate
```python
async def dangerously_authenticate(username: str,
password: str,
host: str = "github.com",
protocol: str = "https",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
```
Persist git credentials via the credential store.
.. warning::
Credentials are written in plain text to the capsule
filesystem. Prefer per-operation ``username``/``password``
parameters instead.