Files
python-sdk/docs/reference.md
pptx704 be573d07a3 v0.1.1 (#7)
Co-authored-by: Tasnim Kabir Sadik <tksadik92@gmail.com>
Reviewed-on: #7
Co-authored-by: pptx704 <rafeed@omukk.dev>
Co-committed-by: pptx704 <rafeed@omukk.dev>
2026-05-01 23:06:54 +00:00

86 KiB

wrenn

wrenn.client

CapsulesResource Objects

class CapsulesResource()

Sync capsule control-plane operations.

create

def create(template: str | None = None,
           vcpus: int | None = None,
           memory_mb: int | None = None,
           timeout_sec: int | None = None) -> CapsuleModel

Create a new capsule.

Arguments:

  • template str | None - Template name to boot from.
  • vcpus int | None - Number of virtual CPUs.
  • memory_mb int | None - Memory in MiB.
  • timeout_sec int | None - Inactivity TTL in seconds before auto-pause. 0 disables auto-pause.

Returns:

  • CapsuleModel - The newly created capsule.

list

def list() -> list[CapsuleModel]

List all capsules for the authenticated team.

Returns:

  • list[CapsuleModel] - All capsules belonging to the team.

get

def get(id: str) -> CapsuleModel

Get a capsule by ID.

Arguments:

  • id str - Capsule ID.

Returns:

  • CapsuleModel - Current state of the capsule.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

destroy

def destroy(id: str) -> None

Destroy a capsule permanently.

Arguments:

  • id str - Capsule ID.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

pause

def pause(id: str) -> CapsuleModel

Pause a running capsule.

Arguments:

  • id str - Capsule ID.

Returns:

  • CapsuleModel - Updated capsule state.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

resume

def resume(id: str) -> CapsuleModel

Resume a paused capsule.

Arguments:

  • id str - Capsule ID.

Returns:

  • CapsuleModel - Updated capsule state.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

ping

def ping(id: str) -> None

Reset the inactivity timer for a capsule.

Arguments:

  • id str - Capsule ID.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

AsyncCapsulesResource Objects

class AsyncCapsulesResource()

Async capsule control-plane operations.

create

async def create(template: str | None = None,
                 vcpus: int | None = None,
                 memory_mb: int | None = None,
                 timeout_sec: int | None = None) -> CapsuleModel

Create a new capsule.

Arguments:

  • template str | None - Template name to boot from.
  • vcpus int | None - Number of virtual CPUs.
  • memory_mb int | None - Memory in MiB.
  • timeout_sec int | None - Inactivity TTL in seconds before auto-pause. 0 disables auto-pause.

Returns:

  • CapsuleModel - The newly created capsule.

list

async def list() -> list[CapsuleModel]

List all capsules for the authenticated team.

Returns:

  • list[CapsuleModel] - All capsules belonging to the team.

get

async def get(id: str) -> CapsuleModel

Get a capsule by ID.

Arguments:

  • id str - Capsule ID.

Returns:

  • CapsuleModel - Current state of the capsule.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

destroy

async def destroy(id: str) -> None

Destroy a capsule permanently.

Arguments:

  • id str - Capsule ID.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

pause

async def pause(id: str) -> CapsuleModel

Pause a running capsule.

Arguments:

  • id str - Capsule ID.

Returns:

  • CapsuleModel - Updated capsule state.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

resume

async def resume(id: str) -> CapsuleModel

Resume a paused capsule.

Arguments:

  • id str - Capsule ID.

Returns:

  • CapsuleModel - Updated capsule state.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

ping

async def ping(id: str) -> None

Reset the inactivity timer for a capsule.

Arguments:

  • id str - Capsule ID.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

SnapshotsResource Objects

class SnapshotsResource()

Sync snapshot operations.

create

def create(capsule_id: str,
           name: str | None = None,
           overwrite: bool = False) -> Template

Create a snapshot template from a running capsule.

Arguments:

  • capsule_id str - ID of the capsule to snapshot.
  • name str | None - Name for the snapshot template. Auto-generated if not provided.
  • overwrite bool - If True, overwrite an existing template with the same name. Defaults to False.

Returns:

  • Template - The created snapshot template.

list

def list(type: str | None = None) -> list[Template]

List snapshot templates.

Arguments:

  • type str | None - Filter by template type. Returns all templates if not provided.

Returns:

  • list[Template] - Matching snapshot templates.

delete

def delete(name: str) -> None

Delete a snapshot template by name.

Arguments:

  • name str - Template name to delete.

Raises:

  • WrennNotFoundError - If no template with the given name exists.

AsyncSnapshotsResource Objects

class AsyncSnapshotsResource()

Async snapshot operations.

create

async def create(capsule_id: str,
                 name: str | None = None,
                 overwrite: bool = False) -> Template

Create a snapshot template from a running capsule.

Arguments:

  • capsule_id str - ID of the capsule to snapshot.
  • name str | None - Name for the snapshot template. Auto-generated if not provided.
  • overwrite bool - If True, overwrite an existing template with the same name. Defaults to False.

Returns:

  • Template - The created snapshot template.

list

async def list(type: str | None = None) -> list[Template]

List snapshot templates.

Arguments:

  • type str | None - Filter by template type. Returns all templates if not provided.

Returns:

  • list[Template] - Matching snapshot templates.

delete

async def delete(name: str) -> None

Delete a snapshot template by name.

Arguments:

  • name str - Template name to delete.

Raises:

  • WrennNotFoundError - If no template with the given name exists.

WrennClient Objects

class WrennClient()

Synchronous client for the Wrenn API.

Authenticates with an API key.

Arguments:

  • api_key - API key (wrn_...). Falls back to WRENN_API_KEY env var.
  • base_url - Wrenn API base URL.

http

@property
def http() -> httpx.Client

The underlying httpx.Client (for sub-objects that need direct access).

close

def close() -> None

Close the underlying HTTP connection pool.

AsyncWrennClient Objects

class AsyncWrennClient()

Asynchronous client for the Wrenn API.

Authenticates with an API key.

Arguments:

  • api_key - API key (wrn_...). Falls back to WRENN_API_KEY env var.
  • base_url - Wrenn API base URL. Falls back to WRENN_BASE_URL env var.

http

@property
def http() -> httpx.AsyncClient

The underlying httpx.AsyncClient.

aclose

async def aclose() -> None

Close the underlying async HTTP connection pool.

wrenn.sandbox

wrenn.commands

CommandResult Objects

@dataclass
class CommandResult()

Result from a foreground command execution.

CommandHandle Objects

@dataclass
class CommandHandle()

Handle for a background process.

ProcessInfo Objects

@dataclass
class ProcessInfo()

Information about a running process.

StreamEvent Objects

class StreamEvent()

Base class for streaming exec events.

Commands Objects

class Commands()

Sync command execution interface. Accessed via capsule.commands.

run

def run(cmd: str,
        *,
        background: bool = False,
        timeout: int | None = 30,
        envs: dict[str, str] | None = None,
        cwd: str | None = None,
        tag: str | None = None) -> CommandResult | CommandHandle

Execute a shell command inside the capsule.

Arguments:

  • cmd str - Shell command string to execute.
  • background bool - If True, launch the process in the background and return a CommandHandle immediately. Defaults to False.
  • timeout int | None - Seconds before the foreground command times out. Ignored for background commands. Defaults to 30.
  • envs dict[str, str] | None - Additional environment variables to set for the process.
  • cwd str | None - Working directory for the process.
  • tag str | None - Optional label attached to background processes for later retrieval via connect().

Returns:

  • CommandResult - stdout, stderr, exit code, and duration for foreground commands (background=False).

  • CommandHandle - PID and tag for background commands (background=True).

list

def list() -> list[ProcessInfo]

List all running background processes in the capsule.

Returns:

  • list[ProcessInfo] - Running processes with their PID, tag, and command information.

kill

def kill(pid: int) -> None

Send SIGKILL to a background process.

Arguments:

  • pid int - PID of the process to kill.

Raises:

  • WrennNotFoundError - If no process with the given PID exists.

connect

def connect(pid: int) -> Iterator[StreamEvent]

Connect to a running background process and stream its output.

Arguments:

  • pid int - PID of the background process to attach to.

Yields:

  • StreamEvent - Successive output events. Iteration stops on a StreamExitEvent or StreamErrorEvent.

stream

def stream(cmd: str, args: list[str] | None = None) -> Iterator[StreamEvent]

Execute a command via WebSocket, streaming output as events.

Arguments:

  • cmd str - Command to execute.
  • args list[str] | None - Additional arguments for the command. When omitted, cmd is interpreted as a shell command string and executed via /bin/sh -c.

Yields:

  • StreamEvent - Successive events including StreamStartEvent, StreamStdoutEvent, StreamStderrEvent, StreamExitEvent, and StreamErrorEvent.

AsyncCommands Objects

class AsyncCommands()

Async command execution interface. Accessed via capsule.commands.

run

async def run(cmd: str,
              *,
              background: bool = False,
              timeout: int | None = 30,
              envs: dict[str, str] | None = None,
              cwd: str | None = None,
              tag: str | None = None) -> CommandResult | CommandHandle

Execute a shell command inside the capsule.

Arguments:

  • cmd str - Shell command string to execute.
  • background bool - If True, launch the process in the background and return a CommandHandle immediately. Defaults to False.
  • timeout int | None - Seconds before the foreground command times out. Ignored for background commands. Defaults to 30.
  • envs dict[str, str] | None - Additional environment variables to set for the process.
  • cwd str | None - Working directory for the process.
  • tag str | None - Optional label attached to background processes for later retrieval via connect().

Returns:

  • CommandResult - stdout, stderr, exit code, and duration for foreground commands (background=False).

  • CommandHandle - PID and tag for background commands (background=True).

list

async def list() -> list[ProcessInfo]

List all running background processes in the capsule.

Returns:

  • list[ProcessInfo] - Running processes with their PID, tag, and command information.

kill

async def kill(pid: int) -> None

Send SIGKILL to a background process.

Arguments:

  • pid int - PID of the process to kill.

Raises:

  • WrennNotFoundError - If no process with the given PID exists.

connect

async def connect(pid: int) -> AsyncIterator[StreamEvent]

Connect to a running background process and stream its output.

Arguments:

  • pid int - PID of the background process to attach to.

Yields:

  • StreamEvent - Successive output events. Iteration stops on a StreamExitEvent or StreamErrorEvent.

stream

async def stream(cmd: str,
                 args: list[str] | None = None) -> AsyncIterator[StreamEvent]

Execute a command via WebSocket, streaming output as events.

Arguments:

  • cmd str - Command to execute.
  • args list[str] | None - Additional arguments for the command. When omitted, cmd is interpreted as a shell command string and executed via /bin/sh -c.

Yields:

  • StreamEvent - Successive events including StreamStartEvent, StreamStdoutEvent, StreamStderrEvent, StreamExitEvent, and StreamErrorEvent.

wrenn.files

Files Objects

class Files()

Sync filesystem interface. Accessed via capsule.files.

read

def read(path: str) -> str

Read a file as a UTF-8 string.

Arguments:

  • path str - Absolute path to the file inside the capsule.

Returns:

  • str - File contents decoded as UTF-8.

Raises:

  • WrennNotFoundError - If the path does not exist.

read_bytes

def read_bytes(path: str) -> bytes

Read a file as raw bytes.

Arguments:

  • path str - Absolute path to the file inside the capsule.

Returns:

  • bytes - Raw file contents.

Raises:

  • WrennNotFoundError - If the path does not exist.

write

def write(path: str, data: str | bytes) -> None

Write data to a file inside the capsule.

Creates parent directories if they do not exist.

Arguments:

  • path str - Absolute destination path inside the capsule.
  • data str | bytes - Content to write. Strings are UTF-8 encoded.

list

def list(path: str, depth: int = 1) -> list[FileEntry]

List directory contents.

Arguments:

  • path str - Absolute path to the directory inside the capsule.
  • depth int - Recursion depth. 1 lists only immediate children. Defaults to 1.

Returns:

  • list[FileEntry] - Entries in the directory.

Raises:

  • WrennNotFoundError - If the path does not exist.

exists

def exists(path: str) -> bool

Check whether a path exists inside the capsule.

Arguments:

  • path str - Absolute path to check.

Returns:

  • bool - True if the path exists.

make_dir

def make_dir(path: str) -> FileEntry

Create a directory (with parents). Idempotent.

Arguments:

  • path str - Absolute path of the directory to create.

Returns:

  • FileEntry - The created (or already-existing) directory entry.

remove

def remove(path: str) -> None

Remove a file or directory recursively.

Arguments:

  • path str - Absolute path to remove.

Raises:

  • WrennNotFoundError - If the path does not exist.

upload_stream

def upload_stream(path: str, stream: Iterator[bytes]) -> None

Stream a large file into the capsule.

Prefer this over write() when the file is too large to hold in memory.

Arguments:

  • path str - Absolute destination path inside the capsule.
  • stream Iterator[bytes] - Iterable of byte chunks to upload.

download_stream

def download_stream(path: str) -> Iterator[bytes]

Stream a large file out of the capsule.

Prefer this over read_bytes() when the file is too large to hold in memory.

Arguments:

  • path str - Absolute path to the file inside the capsule.

Yields:

  • bytes - Successive byte chunks of the file.

Raises:

  • WrennNotFoundError - If the path does not exist.

AsyncFiles Objects

class AsyncFiles()

Async filesystem interface. Accessed via capsule.files.

read

async def read(path: str) -> str

Read a file as a UTF-8 string.

Arguments:

  • path str - Absolute path to the file inside the capsule.

Returns:

  • str - File contents decoded as UTF-8.

Raises:

  • WrennNotFoundError - If the path does not exist.

read_bytes

async def read_bytes(path: str) -> bytes

Read a file as raw bytes.

Arguments:

  • path str - Absolute path to the file inside the capsule.

Returns:

  • bytes - Raw file contents.

Raises:

  • WrennNotFoundError - If the path does not exist.

write

async def write(path: str, data: str | bytes) -> None

Write data to a file inside the capsule.

Creates parent directories if they do not exist.

Arguments:

  • path str - Absolute destination path inside the capsule.
  • data str | bytes - Content to write. Strings are UTF-8 encoded.

list

async def list(path: str, depth: int = 1) -> list[FileEntry]

List directory contents.

Arguments:

  • path str - Absolute path to the directory inside the capsule.
  • depth int - Recursion depth. 1 lists only immediate children. Defaults to 1.

Returns:

  • list[FileEntry] - Entries in the directory.

Raises:

  • WrennNotFoundError - If the path does not exist.

exists

async def exists(path: str) -> bool

Check whether a path exists inside the capsule.

Arguments:

  • path str - Absolute path to check.

Returns:

  • bool - True if the path exists.

make_dir

async def make_dir(path: str) -> FileEntry

Create a directory (with parents). Idempotent.

Arguments:

  • path str - Absolute path of the directory to create.

Returns:

  • FileEntry - The created (or already-existing) directory entry.

remove

async def remove(path: str) -> None

Remove a file or directory recursively.

Arguments:

  • path str - Absolute path to remove.

Raises:

  • WrennNotFoundError - If the path does not exist.

upload_stream

async def upload_stream(path: str, stream: AsyncIterator[bytes]) -> None

Stream a large file into the capsule.

Prefer this over write() when the file is too large to hold in memory.

Arguments:

  • path str - Absolute destination path inside the capsule.
  • stream AsyncIterator[bytes] - Async iterable of byte chunks to upload.

download_stream

async def download_stream(path: str) -> AsyncIterator[bytes]

Stream a large file out of the capsule.

Prefer this over read_bytes() when the file is too large to hold in memory.

Arguments:

  • path str - Absolute path to the file inside the capsule.

Yields:

  • bytes - Successive byte chunks of the file.

Raises:

  • WrennNotFoundError - If the path does not exist.

wrenn.code_interpreter.models

ExecutionError Objects

@dataclass
class ExecutionError()

Error raised during code execution.

Attributes:

  • name - Exception class name (e.g. "NameError").
  • value - Exception message.
  • traceback - Full traceback string.

Logs Objects

@dataclass
class Logs()

Captured stdout/stderr streams.

Each element in the list is one chunk of text as it arrived from the kernel.

Result Objects

@dataclass
class Result()

A single rich output from code execution.

Jupyter cells can produce multiple outputs — one execute_result (the expression value) and zero or more display_data messages (from plt.show(), display(), etc.). Each becomes a Result.

Known MIME types are unpacked into named attributes; anything else lands in the extra attribute.

text

text/plain representation.

html

text/html representation.

markdown

text/markdown representation.

svg

image/svg+xml representation.

png

image/png — base64-encoded.

jpeg

image/jpeg — base64-encoded.

pdf

application/pdf — base64-encoded.

latex

text/latex representation.

json

application/json representation.

javascript

application/javascript representation.

extra

MIME types not covered by the named fields above.

is_main_result

True when this came from an execute_result message (i.e. the value of the last expression in the cell). False for display_data outputs.

from_bundle

@classmethod
def from_bundle(cls,
                bundle: dict[str, str],
                *,
                is_main_result: bool = False) -> Result

Build a Result from a Jupyter MIME bundle dict.

formats

def formats() -> list[str]

Return names of non-None MIME-type fields.

Execution Objects

@dataclass
class Execution()

Complete result of a run_code call.

Attributes:

  • results - All rich outputs produced by the cell — charts, tables, images, expression values, etc.
  • logs - Captured stdout/stderr text.
  • error - Populated when the cell raised an exception.
  • execution_count - Jupyter execution counter (the [N] number).

text

@property
def text() -> str | None

Convenience — text/plain of the main execute_result, or None if the cell had no expression value.

wrenn.code_interpreter.async_capsule

AsyncCapsule Objects

class AsyncCapsule(BaseAsyncCapsule)

Async code interpreter capsule with run_code support.

Uses the code-runner-beta template by default:

    from wrenn.code_interpreter import AsyncCapsule

    capsule = await AsyncCapsule.create()
    result = await capsule.run_code("print('hello')")

create

@classmethod
async def create(cls,
                 template: str | None = None,
                 vcpus: int | None = None,
                 memory_mb: int | None = None,
                 timeout: int | None = None,
                 *,
                 wait: bool = False,
                 api_key: str | None = None,
                 base_url: str | None = None) -> AsyncCapsule

Create a new async code interpreter capsule.

Arguments:

  • template str | None - Template to boot from. Defaults to "code-runner-beta".
  • vcpus int | None - Number of virtual CPUs.
  • memory_mb int | None - Memory in MiB.
  • timeout int | None - Inactivity TTL in seconds before auto-pause.
  • wait bool - Await until the capsule reaches running status.
  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • AsyncCapsule - A new async code interpreter capsule instance.

run_code

async def run_code(
        code: str,
        language: str = "python",
        timeout: float = 30,
        jupyter_timeout: float = 30,
        on_result: Callable[[Result], Any] | None = None,
        on_stdout: Callable[[str], Any] | None = None,
        on_stderr: Callable[[str], Any] | None = None,
        on_error: Callable[[ExecutionError], Any] | None = None) -> Execution

Execute code in a persistent Jupyter kernel (async).

Arguments:

  • code - Code string to execute.
  • language - Execution backend language. Currently only "python".
  • timeout - Maximum seconds to wait for execution to complete.
  • jupyter_timeout - Maximum seconds to wait for Jupyter to become available.
  • on_result - Called for each rich output (charts, images, expression values).
  • on_stdout - Called for each stdout chunk.
  • on_stderr - Called for each stderr chunk.
  • on_error - Called when the cell raises an exception.

Returns:

  • Execution - The execution outcome, with .results, .logs, .error, and a convenience .text property.

wrenn.code_interpreter

wrenn.code_interpreter.capsule

Capsule Objects

class Capsule(BaseCapsule)

Code interpreter capsule with run_code support.

Uses the code-runner-beta template by default:

    from wrenn.code_interpreter import Capsule

    capsule = Capsule()
    result = capsule.run_code("print('hello')")
    print(result.logs.stdout)  # ["hello\n"]

__init__

def __init__(template: str | None = None,
             vcpus: int | None = None,
             memory_mb: int | None = None,
             timeout: int | None = None,
             *,
             api_key: str | None = None,
             base_url: str | None = None,
             **kwargs) -> None

Create a code interpreter capsule.

Arguments:

  • template str | None - Template to boot from. Defaults to "code-runner-beta".
  • vcpus int | None - Number of virtual CPUs.
  • memory_mb int | None - Memory in MiB.
  • timeout int | None - Inactivity TTL in seconds before auto-pause.
  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

create

@classmethod
def create(cls,
           template: str | None = None,
           vcpus: int | None = None,
           memory_mb: int | None = None,
           timeout: int | None = None,
           *,
           wait: bool = False,
           api_key: str | None = None,
           base_url: str | None = None) -> Capsule

Create a new code interpreter capsule.

Arguments:

  • template str | None - Template to boot from. Defaults to "code-runner-beta".
  • vcpus int | None - Number of virtual CPUs.
  • memory_mb int | None - Memory in MiB.
  • timeout int | None - Inactivity TTL in seconds before auto-pause.
  • wait bool - Block until the capsule reaches running status.
  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • Capsule - A new code interpreter capsule instance.

run_code

def run_code(
        code: str,
        language: str = "python",
        timeout: float = 30,
        jupyter_timeout: float = 30,
        on_result: Callable[[Result], Any] | None = None,
        on_stdout: Callable[[str], Any] | None = None,
        on_stderr: Callable[[str], Any] | None = None,
        on_error: Callable[[ExecutionError], Any] | None = None) -> Execution

Execute code in a persistent Jupyter kernel.

Variables, imports, and function definitions survive across calls.

Arguments:

  • code - Code string to execute.
  • language - Execution backend language. Currently only "python".
  • timeout - Maximum seconds to wait for execution to complete.
  • jupyter_timeout - Maximum seconds to wait for Jupyter to become available.
  • on_result - Called for each rich output (charts, images, expression values).
  • on_stdout - Called for each stdout chunk.
  • on_stderr - Called for each stderr chunk.
  • on_error - Called when the cell raises an exception.

Returns:

  • Execution - The execution outcome, with .results, .logs, .error, and a convenience .text property.

wrenn.exceptions

WrennError Objects

class WrennError(Exception)

Base exception for all Wrenn SDK errors.

All SDK exceptions inherit from this class, so you can catch WrennError to handle any API error generically.

Attributes:

  • code str - Machine-readable error code from the API (e.g. "not_found").
  • message str - Human-readable error description.
  • status_code int - HTTP status code of the response.

__init__

def __init__(code: str, message: str, status_code: int) -> None

Initialize a WrennError.

Arguments:

  • code str - Machine-readable error code.
  • message str - Human-readable error description.
  • status_code int - HTTP status code of the response.

WrennValidationError Objects

class WrennValidationError(WrennError)

400 — Invalid request parameters.

WrennAuthenticationError Objects

class WrennAuthenticationError(WrennError)

401 — Invalid or missing authentication.

WrennForbiddenError Objects

class WrennForbiddenError(WrennError)

403 — Authenticated but not authorized.

WrennNotFoundError Objects

class WrennNotFoundError(WrennError)

404 — Resource not found.

WrennConflictError Objects

class WrennConflictError(WrennError)

409 — State conflict (e.g. invalid_state).

WrennHostHasCapsulesError Objects

class WrennHostHasCapsulesError(WrennConflictError)

409 — Host still has running capsules.

Attributes:

  • capsule_ids list[str] - IDs of the capsules still running on the host.

__init__

def __init__(code: str, message: str, status_code: int,
             capsule_ids: list[str]) -> None

Initialize a WrennHostHasCapsulesError.

Arguments:

  • code str - Machine-readable error code.
  • message str - Human-readable error description.
  • status_code int - HTTP status code of the response.
  • capsule_ids list[str] - IDs of capsules still on the host.

WrennHostUnavailableError Objects

class WrennHostUnavailableError(WrennError)

503 — No suitable host available.

WrennAgentError Objects

class WrennAgentError(WrennError)

502 — Host agent returned an error.

WrennInternalError Objects

class WrennInternalError(WrennError)

500 — Unexpected server error.

wrenn.async_capsule

AsyncCapsule Objects

class AsyncCapsule()

Async Wrenn capsule with an E2B-compatible interface.

Create via classmethod:

    capsule = await AsyncCapsule.create(template="minimal")

Use as an async context manager:

    async with await AsyncCapsule.create() as capsule:
        await capsule.commands.run("echo hello")

capsule_id

@property
def capsule_id() -> str

The capsule's unique identifier.

Returns:

  • str - Capsule ID assigned by the Wrenn API.

info

@property
def info() -> CapsuleModel | None

Cached capsule metadata from the last API call.

Returns:

  • CapsuleModel | None - The last-fetched capsule model, or None if the capsule was connected without an initial fetch.

create

@classmethod
async def create(cls,
                 template: str | None = None,
                 vcpus: int | None = None,
                 memory_mb: int | None = None,
                 timeout: int | None = None,
                 *,
                 wait: bool = False,
                 api_key: str | None = None,
                 base_url: str | None = None) -> AsyncCapsule

Create a new capsule.

Arguments:

  • template str | None - Template name to boot from.
  • vcpus int | None - Number of virtual CPUs.
  • memory_mb int | None - Memory in MiB.
  • timeout int | None - Inactivity TTL in seconds before auto-pause.
  • wait bool - Await until the capsule reaches running status.
  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • AsyncCapsule - A new capsule instance.

connect

@classmethod
async def connect(cls,
                  capsule_id: str,
                  *,
                  api_key: str | None = None,
                  base_url: str | None = None) -> AsyncCapsule

Connect to an existing capsule, resuming it if paused.

Arguments:

  • capsule_id str - ID of the capsule to connect to.
  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • AsyncCapsule - A capsule instance bound to the existing capsule.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

ping

async def ping() -> None

Reset the capsule inactivity timer.

Call this to prevent the capsule from being auto-paused when the inactivity TTL is set.

wait_ready

async def wait_ready(timeout: float = 30, interval: float = 0.5) -> None

Wait until the capsule status is running.

Arguments:

  • timeout float - Maximum seconds to wait. Defaults to 30.
  • interval float - Polling interval in seconds. Defaults to 0.5.

Raises:

  • TimeoutError - If the capsule does not reach running state within timeout seconds.
  • RuntimeError - If the capsule enters an error, stopped, or paused state while waiting.

is_running

async def is_running() -> bool

Check whether the capsule is currently running.

Makes a live API call to fetch current status.

Returns:

  • bool - True if the capsule status is running.

list

@classmethod
async def list(cls,
               *,
               api_key: str | None = None,
               base_url: str | None = None) -> list[CapsuleModel]

List all capsules belonging to the team.

Arguments:

  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • list[CapsuleModel] - All capsules for the authenticated team.

pty

@asynccontextmanager
async def pty(cmd: str = "/bin/bash",
              args: list[str] | None = None,
              cols: int = 80,
              rows: int = 24,
              envs: dict[str, str] | None = None,
              cwd: str | None = None) -> AsyncIterator[AsyncPtySession]

Open an async interactive PTY session backed by a WebSocket.

Use as an async context manager and iterate asynchronously over PtyEvent objects:

    async with capsule.pty() as term:
        await term.write(b"echo hello\n")
        async for event in term:
            if event.type == "output":
                print(event.data.decode())

Arguments:

  • cmd str - Command to run inside the PTY. Defaults to "/bin/bash".
  • args list[str] | None - Additional arguments for cmd.
  • cols int - Initial terminal column count. Defaults to 80.
  • rows int - Initial terminal row count. Defaults to 24.
  • envs dict[str, str] | None - Additional environment variables to inject into the process.
  • cwd str | None - Working directory for the process.

Yields:

  • AsyncPtySession - An interactive async PTY session.

pty_connect

@asynccontextmanager
async def pty_connect(tag: str) -> AsyncIterator[AsyncPtySession]

Reconnect to an existing PTY session by tag.

Arguments:

  • tag str - Session tag returned in the started PTY event.

Yields:

  • AsyncPtySession - The reconnected async PTY session.

get_url

def get_url(port: int) -> str

Get the proxy URL for a port exposed inside this capsule.

Arguments:

  • port int - Port number to proxy.

Returns:

  • str - A wss:// (or ws://) URL that proxies to the given port inside the capsule.

create_snapshot

async def create_snapshot(name: str | None = None,
                          overwrite: bool = False) -> Template

Create a snapshot template from this capsule's current state.

Arguments:

  • name str | None - Name for the snapshot template. Auto-generated if not provided.
  • overwrite bool - If True, overwrite an existing template with the same name. Defaults to False.

Returns:

  • Template - The created snapshot template.

wrenn.pty

PtySession Objects

class PtySession()

Interactive PTY session backed by a WebSocket.

Use as a context manager and iterate over events:

with sb.pty(cmd="/bin/bash") as term:
    term.write(b"ls -la\n")
    for event in term:
        if event.type == "output":
            sys.stdout.buffer.write(event.data)
        elif event.type == "exit":
            break

tag

@property
def tag() -> str | None

Session tag. Available after the started event.

pid

@property
def pid() -> int | None

Process PID. Available after the started event.

write

def write(data: bytes) -> None

Send raw bytes to the PTY stdin.

Arguments:

  • data - Raw bytes to send. Base64-encoded internally.

resize

def resize(cols: int, rows: int) -> None

Resize the PTY terminal.

Arguments:

  • cols - New column count. Must be > 0.
  • rows - New row count. Must be > 0.

Raises:

  • ValueError - If cols or rows is 0.

kill

def kill() -> None

Send SIGKILL to the PTY process.

AsyncPtySession Objects

class AsyncPtySession()

Async interactive PTY session backed by a WebSocket.

Use as an async context manager and iterate asynchronously over events:

async with sb.pty(cmd="/bin/bash") as term:
    await term.write(b"ls -la\n")
    async for event in term:
        if event.type == "output":
            sys.stdout.buffer.write(event.data)
        elif event.type == "exit":
            break

tag

@property
def tag() -> str | None

Session tag. Available after the started event.

pid

@property
def pid() -> int | None

Process PID. Available after the started event.

write

async def write(data: bytes) -> None

Send raw bytes to the PTY stdin.

Arguments:

  • data - Raw bytes to send. Base64-encoded internally.

resize

async def resize(cols: int, rows: int) -> None

Resize the PTY terminal.

Arguments:

  • cols - New column count. Must be > 0.
  • rows - New row count. Must be > 0.

Raises:

  • ValueError - If cols or rows is 0.

kill

async def kill() -> None

Send SIGKILL to the PTY process.

wrenn.models._generated

Peaks Objects

class Peaks(BaseModel)

Maximum values over the last 30 days.

Series Objects

class Series(BaseModel)

Parallel arrays for chart rendering.

Encoding Objects

class Encoding(StrEnum)

Output encoding. "base64" when stdout/stderr contain binary data.

Type2 Objects

class Type2(StrEnum)

Host type. Regular hosts are shared; BYOC hosts belong to a team.

wrenn.models

wrenn.capsule

Capsule Objects

class Capsule()

A Wrenn capsule (sandbox) with an e2b-compatible interface.

Create directly:

capsule = Capsule(api_key="wrn_...")
capsule = Capsule(template="minimal")  # reads WRENN_API_KEY env

Or via the classmethod:

capsule = Capsule.create(template="minimal")

Use as a context manager for automatic cleanup:

with Capsule() as capsule:
    capsule.commands.run("echo hello")

__init__

def __init__(template: str | None = None,
             vcpus: int | None = None,
             memory_mb: int | None = None,
             timeout: int | None = None,
             *,
             wait: bool = False,
             api_key: str | None = None,
             base_url: str | None = None,
             _capsule_id: str | None = None,
             _client: WrennClient | None = None,
             _info: CapsuleModel | None = None) -> None

Create and start a new capsule.

Arguments:

  • template str | None - Template name to boot from. Defaults to the server-side default ("minimal").
  • vcpus int | None - Number of virtual CPUs. Defaults to the server-side default.
  • memory_mb int | None - Memory in MiB. Defaults to the server-side default.
  • timeout int | None - Inactivity TTL in seconds before the capsule is auto-paused. 0 disables auto-pause.
  • wait bool - If True, block until the capsule status is running before returning.
  • api_key str | None - Wrenn API key (wrn_...). Falls back to the WRENN_API_KEY environment variable.
  • base_url str | None - Wrenn API base URL. Falls back to WRENN_BASE_URL or the default production endpoint.

capsule_id

@property
def capsule_id() -> str

The capsule's unique identifier.

Returns:

  • str - Capsule ID assigned by the Wrenn API.

info

@property
def info() -> CapsuleModel | None

Cached capsule metadata from the last API call.

Returns:

  • CapsuleModel | None - The last-fetched capsule model, or None if the capsule was connected without an initial fetch.

create

@classmethod
def create(cls,
           template: str | None = None,
           vcpus: int | None = None,
           memory_mb: int | None = None,
           timeout: int | None = None,
           *,
           wait: bool = False,
           api_key: str | None = None,
           base_url: str | None = None) -> Capsule

Create a new capsule.

Equivalent to calling Capsule(...) directly.

Arguments:

  • template str | None - Template name to boot from.
  • vcpus int | None - Number of virtual CPUs.
  • memory_mb int | None - Memory in MiB.
  • timeout int | None - Inactivity TTL in seconds before auto-pause.
  • wait bool - Block until the capsule reaches running status.
  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • Capsule - A new capsule instance.

connect

@classmethod
def connect(cls,
            capsule_id: str,
            *,
            api_key: str | None = None,
            base_url: str | None = None) -> Capsule

Connect to an existing capsule, resuming it if paused.

Arguments:

  • capsule_id str - ID of the capsule to connect to.
  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • Capsule - A capsule instance bound to the existing capsule.

Raises:

  • WrennNotFoundError - If no capsule with the given ID exists.

ping

def ping() -> None

Reset the capsule inactivity timer.

Call this to prevent the capsule from being auto-paused when the inactivity TTL is set.

wait_ready

def wait_ready(timeout: float = 30, interval: float = 0.5) -> None

Block until the capsule status is running.

Arguments:

  • timeout float - Maximum seconds to wait. Defaults to 30.
  • interval float - Polling interval in seconds. Defaults to 0.5.

Raises:

  • TimeoutError - If the capsule does not reach running state within timeout seconds.
  • RuntimeError - If the capsule enters an error, stopped, or paused state while waiting.

is_running

def is_running() -> bool

Check whether the capsule is currently running.

Makes a live API call to fetch current status.

Returns:

  • bool - True if the capsule status is running.

list

@classmethod
def list(cls,
         *,
         api_key: str | None = None,
         base_url: str | None = None) -> list[CapsuleModel]

List all capsules belonging to the team.

Arguments:

  • api_key str | None - Wrenn API key. Falls back to WRENN_API_KEY env var.
  • base_url str | None - API base URL override.

Returns:

  • list[CapsuleModel] - All capsules for the authenticated team.

pty

@contextmanager
def pty(cmd: str = "/bin/bash",
        args: list[str] | None = None,
        cols: int = 80,
        rows: int = 24,
        envs: dict[str, str] | None = None,
        cwd: str | None = None) -> Iterator[PtySession]

Open an interactive PTY session backed by a WebSocket.

Use as a context manager and iterate over PtyEvent objects:

with capsule.pty() as term:
    term.write(b"echo hello\n")
    for event in term:
        if event.type == "output":
            print(event.data.decode())

Arguments:

  • cmd str - Command to run inside the PTY. Defaults to "/bin/bash".
  • args list[str] | None - Additional arguments for cmd.
  • cols int - Initial terminal column count. Defaults to 80.
  • rows int - Initial terminal row count. Defaults to 24.
  • envs dict[str, str] | None - Additional environment variables to inject into the process.
  • cwd str | None - Working directory for the process.

Yields:

  • PtySession - An interactive PTY session.

pty_connect

@contextmanager
def pty_connect(tag: str) -> Iterator[PtySession]

Reconnect to an existing PTY session by tag.

Arguments:

  • tag str - Session tag returned in the started PTY event.

Yields:

  • PtySession - The reconnected PTY session.

get_url

def get_url(port: int) -> str

Get the proxy URL for a port exposed inside this capsule.

Arguments:

  • port int - Port number to proxy.

Returns:

  • str - A wss:// (or ws://) URL that proxies to the given port inside the capsule.

create_snapshot

def create_snapshot(name: str | None = None,
                    overwrite: bool = False) -> Template

Create a snapshot template from this capsule's current state.

Arguments:

  • name str | None - Name for the snapshot template. Auto-generated if not provided.
  • overwrite bool - If True, overwrite an existing template with the same name. Defaults to False.

Returns:

  • Template - The created snapshot template.

wrenn._config

ConnectionConfig Objects

@dataclass(frozen=True)
class ConnectionConfig()

Resolved credentials and base URL for Wrenn API calls.

wrenn._git._auth

embed_credentials

def embed_credentials(url: str, username: str, password: str) -> str

Embed HTTP(S) credentials into a git URL.

Arguments:

  • url - Git repository URL.
  • username - Username for authentication.
  • password - Password or personal access token.

Returns:

URL with username:password@ embedded in the netloc.

Raises:

  • ValueError - If the URL scheme is not http or https.

strip_credentials

def strip_credentials(url: str) -> str

Remove embedded credentials from a git URL.

Arguments:

  • url - Git repository URL, possibly with credentials.

Returns:

URL with credentials removed. Non-HTTP(S) URLs are returned unchanged.

is_auth_error

def is_auth_error(stderr: str) -> bool

Check whether git stderr indicates an authentication failure.

Arguments:

  • stderr - Combined stderr output from a git command.

Returns:

True if any known auth-failure pattern is found.

build_credential_approve_cmd

def build_credential_approve_cmd(username: str,
                                 password: str,
                                 host: str = "github.com",
                                 protocol: str = "https") -> str

Build a shell command that pipes credentials into git credential approve.

Arguments:

  • username - Git username.
  • password - Password or personal access token.
  • host - Target host. Defaults to "github.com".
  • protocol - Protocol. Defaults to "https".

Returns:

A shell command string safe to pass to commands.run().

wrenn._git._cmd

Pure functions that build git argument lists and parse git output.

No I/O, no network, no imports from wrenn. Every build_* function returns a list[str] suitable for shlex.join(). Every parse_* function takes raw stdout and returns a typed structure.

FileStatus Objects

@dataclass
class FileStatus()

A single entry from git status --porcelain=v1.

Attributes:

  • path str - File path relative to the repository root.
  • index_status str - Index (staged) status character.
  • work_tree_status str - Working-tree status character.
  • renamed_from str | None - Original path when status is a rename.

staged

@property
def staged() -> bool

Whether the change is staged in the index.

status

@property
def status() -> str

Normalized human-readable status label.

GitStatus Objects

@dataclass
class GitStatus()

Parsed output of git status --porcelain=v1 --branch.

Attributes:

  • branch str | None - Current branch name, or None if detached.
  • upstream str | None - Upstream tracking branch.
  • ahead int - Commits ahead of upstream.
  • behind int - Commits behind upstream.
  • detached bool - Whether HEAD is detached.
  • files list[FileStatus] - Per-file status entries.

is_clean

@property
def is_clean() -> bool

True when there are no changed or untracked files.

has_staged

@property
def has_staged() -> bool

True when at least one file has staged changes.

has_untracked

@property
def has_untracked() -> bool

True when at least one file is untracked.

has_conflicts

@property
def has_conflicts() -> bool

True when at least one file has merge conflicts.

GitBranch Objects

@dataclass
class GitBranch()

A single branch entry.

Attributes:

  • name str - Branch name (short ref).
  • is_current bool - Whether this is the checked-out branch.

build_clone

def build_clone(url: str,
                dest: str | None = None,
                *,
                branch: str | None = None,
                depth: int | None = None) -> list[str]

Build git clone arguments.

build_init

def build_init(path: str = ".",
               *,
               bare: bool = False,
               initial_branch: str | None = None) -> list[str]

Build git init arguments.

build_add

def build_add(paths: list[str] | None = None,
              *,
              all: bool = False) -> list[str]

Build git add arguments.

build_commit

def build_commit(message: str,
                 *,
                 allow_empty: bool = False,
                 author_name: str | None = None,
                 author_email: str | None = None) -> list[str]

Build git commit arguments.

build_push

def build_push(remote: str = "origin",
               branch: str | None = None,
               *,
               force: bool = False,
               set_upstream: bool = False) -> list[str]

Build git push arguments.

build_pull

def build_pull(remote: str = "origin",
               branch: str | None = None,
               *,
               rebase: bool = False,
               ff_only: bool = False) -> list[str]

Build git pull arguments.

build_status

def build_status() -> list[str]

Build git status arguments for porcelain parsing.

build_branches

def build_branches() -> list[str]

Build git branch arguments for structured parsing.

build_create_branch

def build_create_branch(name: str,
                        *,
                        start_point: str | None = None) -> list[str]

Build git checkout -b arguments.

build_checkout

def build_checkout(name: str) -> list[str]

Build git checkout arguments.

build_delete_branch

def build_delete_branch(name: str, *, force: bool = False) -> list[str]

Build git branch -d/-D arguments.

build_remote_add

def build_remote_add(name: str, url: str, *, fetch: bool = False) -> list[str]

Build git remote add arguments.

build_remote_get_url

def build_remote_get_url(name: str = "origin") -> list[str]

Build git remote get-url arguments.

build_remote_set_url

def build_remote_set_url(name: str, url: str) -> list[str]

Build git remote set-url arguments.

build_reset

def build_reset(*,
                mode: str | None = None,
                ref: str | None = None,
                paths: list[str] | None = None) -> list[str]

Build git reset arguments.

Arguments:

  • mode - Reset mode (soft, mixed, hard, merge, keep).
  • ref - Commit, branch, or ref to reset to.
  • paths - Paths to reset (mutually exclusive with mode).

build_restore

def build_restore(paths: list[str],
                  *,
                  staged: bool = False,
                  worktree: bool = False,
                  source: str | None = None) -> list[str]

Build git restore arguments.

Arguments:

  • paths - Paths to restore.
  • staged - Restore the index (unstage).
  • worktree - Restore working-tree files.
  • source - Commit or ref to restore from.

build_config_set

def build_config_set(key: str,
                     value: str,
                     *,
                     scope: str = "local",
                     repo_path: str | None = None) -> list[str]

Build git config set arguments.

build_config_get

def build_config_get(key: str,
                     *,
                     scope: str = "local",
                     repo_path: str | None = None) -> list[str]

Build git config --get arguments.

build_has_upstream

def build_has_upstream() -> list[str]

Build arguments to check if current branch has upstream tracking.

parse_status

def parse_status(stdout: str) -> GitStatus

Parse git status --porcelain=v1 --branch output.

Arguments:

  • stdout - Raw stdout from the git status command.

Returns:

Parsed GitStatus.

parse_branches

def parse_branches(stdout: str) -> list[GitBranch]

Parse git branch --format=%(refname:short)\t%(HEAD) output.

Arguments:

  • stdout - Raw stdout from the git branch command.

Returns:

List of GitBranch.

wrenn._git.exceptions

GitError Objects

class GitError(Exception)

Base exception for all git operations inside a capsule.

Not a subclass of WrennError because git errors originate from a process exit code, not an HTTP response.

Attributes:

  • message str - Human-readable error description.
  • stderr str - Raw stderr output from the git process.
  • exit_code int - Process exit code.

GitCommandError Objects

class GitCommandError(GitError)

A git command exited with a non-zero exit code.

GitAuthError Objects

class GitAuthError(GitError)

Authentication failed when communicating with a remote.

wrenn._git

Git operations inside a Wrenn capsule.

Provides Git (sync) and AsyncGit (async) interfaces accessed via capsule.git. All operations execute the real git binary inside the capsule through Commands.

Git Objects

class Git()

Sync git interface. Accessed via capsule.git.

Executes the real git binary inside the capsule through Commands.run. Methods raise GitCommandError (or GitAuthError) on non-zero exit codes.

clone

def clone(url: str,
          dest: str | None = None,
          *,
          branch: str | None = None,
          depth: int | None = None,
          username: str | None = None,
          password: str | None = None,
          dangerously_store_credentials: bool = False,
          cwd: str | None = None,
          envs: dict[str, str] | None = None,
          timeout: int | None = 300) -> CommandResult

Clone a remote repository into the capsule.

Arguments:

  • url - Remote repository URL.
  • dest - Destination path. Defaults to the repository name derived from the URL.
  • branch - Branch or tag to check out.
  • depth - Create a shallow clone with this many commits.
  • username - Username for HTTP(S) authentication.
  • password - Password or token for HTTP(S) authentication.
  • dangerously_store_credentials - If True, leave credentials embedded in the remote URL after cloning.
  • cwd - Working directory for the command.
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds. Defaults to 300.

Returns:

Command result with stdout, stderr, exit_code, and duration.

Raises:

  • GitAuthError - If the remote rejected authentication.
  • GitCommandError - If clone failed for another reason.
  • ValueError - If password is provided without username.

init

def init(path: str = ".",
         *,
         bare: bool = False,
         initial_branch: str | None = None,
         cwd: str | None = None,
         envs: dict[str, str] | None = None,
         timeout: int | None = 30) -> CommandResult

Initialize a new git repository.

Arguments:

  • path - Destination path for the repository.
  • bare - Create a bare repository.
  • initial_branch - Name for the initial branch (e.g. "main").
  • cwd - Working directory for the command.
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If init failed.

add

def add(paths: list[str] | None = None,
        *,
        all: bool = False,
        cwd: str | None = None,
        envs: dict[str, str] | None = None,
        timeout: int | None = 30) -> CommandResult

Stage files for commit.

Arguments:

  • paths - Specific files to stage. If None, stages the current directory (or all with all=True).
  • all - Stage all changes including untracked files.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If add failed.

commit

def commit(message: str,
           *,
           allow_empty: bool = False,
           author_name: str | None = None,
           author_email: str | None = None,
           cwd: str | None = None,
           envs: dict[str, str] | None = None,
           timeout: int | None = 30) -> CommandResult

Create a commit.

Arguments:

  • message - Commit message.
  • allow_empty - Allow creating a commit with no changes.
  • author_name - Override the commit author name.
  • author_email - Override the commit author email.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If commit failed.

push

def push(remote: str = "origin",
         branch: str | None = None,
         *,
         force: bool = False,
         set_upstream: bool = False,
         username: str | None = None,
         password: str | None = None,
         cwd: str | None = None,
         envs: dict[str, str] | None = None,
         timeout: int | None = 60) -> CommandResult

Push commits to a remote.

Arguments:

  • remote - Remote name. Defaults to "origin".
  • branch - Branch to push. Defaults to the current branch.
  • force - Force-push.
  • set_upstream - Set upstream tracking reference.
  • username - Username for HTTP(S) authentication.
  • password - Password or token for HTTP(S) authentication.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitAuthError - If authentication failed.
  • GitCommandError - If push failed.

pull

def pull(remote: str = "origin",
         branch: str | None = None,
         *,
         rebase: bool = False,
         ff_only: bool = False,
         username: str | None = None,
         password: str | None = None,
         cwd: str | None = None,
         envs: dict[str, str] | None = None,
         timeout: int | None = 60) -> CommandResult

Pull changes from a remote.

Arguments:

  • remote - Remote name. Defaults to "origin".
  • branch - Branch to pull.
  • rebase - Rebase instead of merge.
  • ff_only - Only allow fast-forward merges.
  • username - Username for HTTP(S) authentication.
  • password - Password or token for HTTP(S) authentication.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitAuthError - If authentication failed.
  • GitCommandError - If pull failed.

status

def status(*,
           cwd: str | None = None,
           envs: dict[str, str] | None = None,
           timeout: int | None = 30) -> GitStatus

Get repository status.

Arguments:

  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Parsed GitStatus with branch info and file changes.

Raises:

  • GitCommandError - If the command failed.

branches

def branches(*,
             cwd: str | None = None,
             envs: dict[str, str] | None = None,
             timeout: int | None = 30) -> list[GitBranch]

List local branches.

Arguments:

  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

List of GitBranch.

Raises:

  • GitCommandError - If the command failed.

create_branch

def create_branch(name: str,
                  *,
                  start_point: str | None = None,
                  cwd: str | None = None,
                  envs: dict[str, str] | None = None,
                  timeout: int | None = 30) -> CommandResult

Create and check out a new branch.

Arguments:

  • name - Branch name.
  • start_point - Commit or ref to branch from.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If the command failed.

checkout_branch

def checkout_branch(name: str,
                    *,
                    cwd: str | None = None,
                    envs: dict[str, str] | None = None,
                    timeout: int | None = 30) -> CommandResult

Check out an existing branch.

Arguments:

  • name - Branch name.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If the command failed.

delete_branch

def delete_branch(name: str,
                  *,
                  force: bool = False,
                  cwd: str | None = None,
                  envs: dict[str, str] | None = None,
                  timeout: int | None = 30) -> CommandResult

Delete a branch.

Arguments:

  • name - Branch name.
  • force - Force-delete with -D.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If the command failed.

remote_add

def remote_add(name: str,
               url: str,
               *,
               fetch: bool = False,
               cwd: str | None = None,
               envs: dict[str, str] | None = None,
               timeout: int | None = 30) -> CommandResult

Add a remote.

Arguments:

  • name - Remote name (e.g. "origin").
  • url - Remote URL.
  • fetch - Fetch after adding.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If the command failed.

remote_get

def remote_get(name: str = "origin",
               *,
               cwd: str | None = None,
               envs: dict[str, str] | None = None,
               timeout: int | None = 30) -> str | None

Get the URL of a remote.

Returns None if the remote does not exist rather than raising.

Arguments:

  • name - Remote name. Defaults to "origin".
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Remote URL or None.

reset

def reset(*,
          mode: str | None = None,
          ref: str | None = None,
          paths: list[str] | None = None,
          cwd: str | None = None,
          envs: dict[str, str] | None = None,
          timeout: int | None = 30) -> CommandResult

Reset the current HEAD.

Arguments:

  • mode - Reset mode (soft, mixed, hard, merge, keep).
  • ref - Commit, branch, or ref to reset to.
  • paths - Paths to reset.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If the command failed.

restore

def restore(paths: list[str],
            *,
            staged: bool = False,
            worktree: bool = False,
            source: str | None = None,
            cwd: str | None = None,
            envs: dict[str, str] | None = None,
            timeout: int | None = 30) -> CommandResult

Restore working-tree files or unstage changes.

Arguments:

  • paths - Paths to restore.
  • staged - Restore the index (unstage).
  • worktree - Restore working-tree files.
  • source - Commit or ref to restore from.
  • cwd - Working directory (repository root).
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If the command failed.

set_config

def set_config(key: str,
               value: str,
               *,
               scope: str = "local",
               cwd: str | None = None,
               envs: dict[str, str] | None = None,
               timeout: int | None = 30) -> CommandResult

Set a git config value.

Arguments:

  • key - Config key (e.g. "user.name").
  • value - Config value.
  • scope - Config scope: "local", "global", or "system".
  • cwd - Working directory (repository root). Required when scope is "local".
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Command result.

Raises:

  • GitCommandError - If the command failed.

get_config

def get_config(key: str,
               *,
               scope: str = "local",
               cwd: str | None = None,
               envs: dict[str, str] | None = None,
               timeout: int | None = 30) -> str | None

Get a git config value.

Returns None if the key is not set rather than raising.

Arguments:

  • key - Config key (e.g. "user.name").
  • scope - Config scope: "local", "global", or "system".
  • cwd - Working directory (repository root). Required when scope is "local".
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Returns:

Config value or None.

configure_user

def configure_user(name: str,
                   email: str,
                   *,
                   scope: str = "global",
                   cwd: str | None = None,
                   envs: dict[str, str] | None = None,
                   timeout: int | None = 30) -> None

Configure git user name and email.

Arguments:

  • name - Git user name.
  • email - Git user email.
  • scope - Config scope. Defaults to "global".
  • cwd - Working directory (repository root). Required when scope is "local".
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Raises:

  • ValueError - If name or email is empty.
  • GitCommandError - If a config command failed.

dangerously_authenticate

def dangerously_authenticate(username: str,
                             password: str,
                             host: str = "github.com",
                             protocol: str = "https",
                             *,
                             cwd: str | None = None,
                             envs: dict[str, str] | None = None,
                             timeout: int | None = 30) -> None

Persist git credentials via the credential store.

Warning:

Credentials are written in plain text to the capsule filesystem and are accessible to any process running inside the capsule. Prefer the per-operation username/password parameters on clone, push, and pull instead.

Arguments:

  • username - Git username.
  • password - Password or personal access token.
  • host - Target host. Defaults to "github.com".
  • protocol - Protocol. Defaults to "https".
  • cwd - Working directory.
  • envs - Extra environment variables.
  • timeout - Command timeout in seconds.

Raises:

  • ValueError - If username or password is empty.
  • GitCommandError - If a command failed.

AsyncGit Objects

class AsyncGit()

Async git interface. Accessed via capsule.git.

Async mirror of Git. See that class for full method documentation.

clone

async def clone(url: str,
                dest: str | None = None,
                *,
                branch: str | None = None,
                depth: int | None = None,
                username: str | None = None,
                password: str | None = None,
                dangerously_store_credentials: bool = False,
                cwd: str | None = None,
                envs: dict[str, str] | None = None,
                timeout: int | None = 300) -> CommandResult

Clone a remote repository into the capsule.

init

async def init(path: str = ".",
               *,
               bare: bool = False,
               initial_branch: str | None = None,
               cwd: str | None = None,
               envs: dict[str, str] | None = None,
               timeout: int | None = 30) -> CommandResult

Initialize a new git repository.

add

async def add(paths: list[str] | None = None,
              *,
              all: bool = False,
              cwd: str | None = None,
              envs: dict[str, str] | None = None,
              timeout: int | None = 30) -> CommandResult

Stage files for commit.

commit

async def commit(message: str,
                 *,
                 allow_empty: bool = False,
                 author_name: str | None = None,
                 author_email: str | None = None,
                 cwd: str | None = None,
                 envs: dict[str, str] | None = None,
                 timeout: int | None = 30) -> CommandResult

Create a commit.

push

async def push(remote: str = "origin",
               branch: str | None = None,
               *,
               force: bool = False,
               set_upstream: bool = False,
               username: str | None = None,
               password: str | None = None,
               cwd: str | None = None,
               envs: dict[str, str] | None = None,
               timeout: int | None = 60) -> CommandResult

Push commits to a remote.

pull

async def pull(remote: str = "origin",
               branch: str | None = None,
               *,
               rebase: bool = False,
               ff_only: bool = False,
               username: str | None = None,
               password: str | None = None,
               cwd: str | None = None,
               envs: dict[str, str] | None = None,
               timeout: int | None = 60) -> CommandResult

Pull changes from a remote.

status

async def status(*,
                 cwd: str | None = None,
                 envs: dict[str, str] | None = None,
                 timeout: int | None = 30) -> GitStatus

Get repository status.

branches

async def branches(*,
                   cwd: str | None = None,
                   envs: dict[str, str] | None = None,
                   timeout: int | None = 30) -> list[GitBranch]

List local branches.

create_branch

async def create_branch(name: str,
                        *,
                        start_point: str | None = None,
                        cwd: str | None = None,
                        envs: dict[str, str] | None = None,
                        timeout: int | None = 30) -> CommandResult

Create and check out a new branch.

checkout_branch

async def checkout_branch(name: str,
                          *,
                          cwd: str | None = None,
                          envs: dict[str, str] | None = None,
                          timeout: int | None = 30) -> CommandResult

Check out an existing branch.

delete_branch

async def delete_branch(name: str,
                        *,
                        force: bool = False,
                        cwd: str | None = None,
                        envs: dict[str, str] | None = None,
                        timeout: int | None = 30) -> CommandResult

Delete a branch.

remote_add

async def remote_add(name: str,
                     url: str,
                     *,
                     fetch: bool = False,
                     cwd: str | None = None,
                     envs: dict[str, str] | None = None,
                     timeout: int | None = 30) -> CommandResult

Add a remote.

remote_get

async def remote_get(name: str = "origin",
                     *,
                     cwd: str | None = None,
                     envs: dict[str, str] | None = None,
                     timeout: int | None = 30) -> str | None

Get the URL of a remote. Returns None if not found.

reset

async def reset(*,
                mode: str | None = None,
                ref: str | None = None,
                paths: list[str] | None = None,
                cwd: str | None = None,
                envs: dict[str, str] | None = None,
                timeout: int | None = 30) -> CommandResult

Reset the current HEAD.

restore

async def restore(paths: list[str],
                  *,
                  staged: bool = False,
                  worktree: bool = False,
                  source: str | None = None,
                  cwd: str | None = None,
                  envs: dict[str, str] | None = None,
                  timeout: int | None = 30) -> CommandResult

Restore working-tree files or unstage changes.

set_config

async def set_config(key: str,
                     value: str,
                     *,
                     scope: str = "local",
                     cwd: str | None = None,
                     envs: dict[str, str] | None = None,
                     timeout: int | None = 30) -> CommandResult

Set a git config value.

get_config

async def get_config(key: str,
                     *,
                     scope: str = "local",
                     cwd: str | None = None,
                     envs: dict[str, str] | None = None,
                     timeout: int | None = 30) -> str | None

Get a git config value. Returns None if not set.

configure_user

async def configure_user(name: str,
                         email: str,
                         *,
                         scope: str = "global",
                         cwd: str | None = None,
                         envs: dict[str, str] | None = None,
                         timeout: int | None = 30) -> None

Configure git user name and email.

dangerously_authenticate

async def dangerously_authenticate(username: str,
                                   password: str,
                                   host: str = "github.com",
                                   protocol: str = "https",
                                   *,
                                   cwd: str | None = None,
                                   envs: dict[str, str] | None = None,
                                   timeout: int | None = 30) -> None

Persist git credentials via the credential store.

Warning:

Credentials are written in plain text to the capsule filesystem and are accessible to any process running inside the capsule. Prefer the per-operation username/password parameters on clone(), push(), and pull() instead.