## What's New? - Updated the SDK to support v0.2.0 - Improved the test suite - Minor bugfix - No breaking changes Co-authored-by: Tasnim Kabir Sadik <tksadik92@gmail.com> Reviewed-on: #9 Co-authored-by: pptx704 <rafeed@omukk.dev> Co-committed-by: pptx704 <rafeed@omukk.dev>
91 KiB
wrenn
wrenn.client
CapsulesResource Objects
class CapsulesResource()
Sync capsule control-plane operations.
create
def create(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout_sec: int | None = None) -> CapsuleModel
Create a new capsule.
Arguments:
templatestr | None - Template name to boot from.vcpusint | None - Number of virtual CPUs.memory_mbint | None - Memory in MiB.timeout_secint | None - Inactivity TTL in seconds before auto-pause.0disables auto-pause.
Returns:
CapsuleModel- The newly created capsule.
list
def list() -> list[CapsuleModel]
List all capsules for the authenticated team.
Returns:
list[CapsuleModel]- All capsules belonging to the team.
get
def get(id: str) -> CapsuleModel
Get a capsule by ID.
Arguments:
idstr - Capsule ID.
Returns:
CapsuleModel- Current state of the capsule.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
destroy
def destroy(id: str) -> None
Destroy a capsule permanently.
Arguments:
idstr - Capsule ID.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
pause
def pause(id: str) -> CapsuleModel
Pause a running capsule.
Arguments:
idstr - Capsule ID.
Returns:
CapsuleModel- Updated capsule state.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
resume
def resume(id: str) -> CapsuleModel
Resume a paused capsule.
Arguments:
idstr - Capsule ID.
Returns:
CapsuleModel- Updated capsule state.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
ping
def ping(id: str) -> None
Reset the inactivity timer for a capsule.
Arguments:
idstr - Capsule ID.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
AsyncCapsulesResource Objects
class AsyncCapsulesResource()
Async capsule control-plane operations.
create
async def create(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout_sec: int | None = None) -> CapsuleModel
Create a new capsule.
Arguments:
templatestr | None - Template name to boot from.vcpusint | None - Number of virtual CPUs.memory_mbint | None - Memory in MiB.timeout_secint | None - Inactivity TTL in seconds before auto-pause.0disables auto-pause.
Returns:
CapsuleModel- The newly created capsule.
list
async def list() -> list[CapsuleModel]
List all capsules for the authenticated team.
Returns:
list[CapsuleModel]- All capsules belonging to the team.
get
async def get(id: str) -> CapsuleModel
Get a capsule by ID.
Arguments:
idstr - Capsule ID.
Returns:
CapsuleModel- Current state of the capsule.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
destroy
async def destroy(id: str) -> None
Destroy a capsule permanently.
Arguments:
idstr - Capsule ID.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
pause
async def pause(id: str) -> CapsuleModel
Pause a running capsule.
Arguments:
idstr - Capsule ID.
Returns:
CapsuleModel- Updated capsule state.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
resume
async def resume(id: str) -> CapsuleModel
Resume a paused capsule.
Arguments:
idstr - Capsule ID.
Returns:
CapsuleModel- Updated capsule state.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
ping
async def ping(id: str) -> None
Reset the inactivity timer for a capsule.
Arguments:
idstr - Capsule ID.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
SnapshotsResource Objects
class SnapshotsResource()
Sync snapshot operations.
create
def create(capsule_id: str,
name: str | None = None,
overwrite: bool = False) -> Template
Create a snapshot template from a running capsule.
Arguments:
capsule_idstr - ID of the capsule to snapshot.namestr | None - Name for the snapshot template. Auto-generated if not provided.overwritebool - IfTrue, overwrite an existing template with the same name. Defaults toFalse.
Returns:
Template- The created snapshot template.
list
def list(type: str | None = None) -> list[Template]
List snapshot templates.
Arguments:
typestr | None - Filter by template type. Returns all templates if not provided.
Returns:
list[Template]- Matching snapshot templates.
delete
def delete(name: str) -> None
Delete a snapshot template by name.
Arguments:
namestr - Template name to delete.
Raises:
WrennNotFoundError- If no template with the given name exists.
AsyncSnapshotsResource Objects
class AsyncSnapshotsResource()
Async snapshot operations.
create
async def create(capsule_id: str,
name: str | None = None,
overwrite: bool = False) -> Template
Create a snapshot template from a running capsule.
Arguments:
capsule_idstr - ID of the capsule to snapshot.namestr | None - Name for the snapshot template. Auto-generated if not provided.overwritebool - IfTrue, overwrite an existing template with the same name. Defaults toFalse.
Returns:
Template- The created snapshot template.
list
async def list(type: str | None = None) -> list[Template]
List snapshot templates.
Arguments:
typestr | None - Filter by template type. Returns all templates if not provided.
Returns:
list[Template]- Matching snapshot templates.
delete
async def delete(name: str) -> None
Delete a snapshot template by name.
Arguments:
namestr - Template name to delete.
Raises:
WrennNotFoundError- If no template with the given name exists.
WrennClient Objects
class WrennClient()
Synchronous client for the Wrenn API.
Authenticates with an API key.
Arguments:
api_key- API key (wrn_...). Falls back toWRENN_API_KEYenv var.base_url- Wrenn API base URL. Falls back toWRENN_BASE_URLenv var.proxy_domain- Host suffix for capsule proxy URLs ({port}-{capsule_id}.<domain>). Falls back toWRENN_PROXY_DOMAINenv, thenwrenn.devwhenbase_urlis the defaultapp.wrenn.devhost, else thebase_urlhost.timeout- HTTP timeout. Acceptshttpx.Timeout, a float (seconds), orNonefor the default (30s read/write/pool, 10s connect).
http
@property
def http() -> httpx.Client
The underlying httpx.Client (for sub-objects that need direct access).
close
def close() -> None
Close the underlying HTTP connection pool.
AsyncWrennClient Objects
class AsyncWrennClient()
Asynchronous client for the Wrenn API.
Authenticates with an API key.
Arguments:
api_key- API key (wrn_...). Falls back toWRENN_API_KEYenv var.base_url- Wrenn API base URL. Falls back toWRENN_BASE_URLenv var.proxy_domain- Host suffix for capsule proxy URLs ({port}-{capsule_id}.<domain>). Falls back toWRENN_PROXY_DOMAINenv, thenwrenn.devwhenbase_urlis the defaultapp.wrenn.devhost, else thebase_urlhost.timeout- HTTP timeout. Acceptshttpx.Timeout, a float (seconds), orNonefor the default (30s read/write/pool, 10s connect).
http
@property
def http() -> httpx.AsyncClient
The underlying httpx.AsyncClient.
aclose
async def aclose() -> None
Close the underlying async HTTP connection pool.
wrenn.sandbox
wrenn.commands
CommandResult Objects
@dataclass
class CommandResult()
Result from a foreground command execution.
CommandHandle Objects
@dataclass
class CommandHandle()
Handle for a background process.
ProcessInfo Objects
@dataclass
class ProcessInfo()
Information about a running process.
StreamEvent Objects
class StreamEvent()
Base class for streaming exec events.
Commands Objects
class Commands()
Sync command execution interface. Accessed via capsule.commands.
run
def run(cmd: str,
*,
background: bool = False,
timeout: int | None = 30,
envs: dict[str, str] | None = None,
cwd: str | None = None,
tag: str | None = None) -> CommandResult | CommandHandle
Execute a shell command inside the capsule.
Arguments:
cmdstr - Shell command string to execute.backgroundbool - IfTrue, launch the process in the background and return a :class:CommandHandleimmediately. Defaults toFalse.timeoutint | None - Seconds before the foreground command times out. Ignored for background commands. Defaults to30.envsdict[str, str] | None - Additional environment variables to set for the process.cwdstr | None - Working directory for the process.tagstr | None - Optional label attached to background processes for later retrieval via :meth:connect.
Returns:
-
CommandResult- stdout, stderr, exit code, and duration for foreground commands (background=False). -
CommandHandle- PID and tag for background commands (background=True).
list
def list() -> list[ProcessInfo]
List all running background processes in the capsule.
Returns:
list[ProcessInfo]- Running processes with their PID, tag, and command information.
kill
def kill(pid: int) -> None
Send SIGKILL to a background process.
Arguments:
pidint - PID of the process to kill.
Raises:
WrennNotFoundError- If no process with the given PID exists.
connect
def connect(pid: int) -> Iterator[StreamEvent]
Connect to a running background process and stream its output.
Arguments:
pidint - PID of the background process to attach to.
Yields:
StreamEvent- Successive output events. Stops on :class:StreamExitEventor :class:StreamErrorEvent.
stream
def stream(cmd: str,
args: builtins.list[str] | None = None) -> Iterator[StreamEvent]
Execute a command via WebSocket, streaming output as events.
Arguments:
cmdstr - Command to execute.argslist[str] | None - Additional arguments for the command. When omitted, cmd is interpreted as a shell command string and executed via/bin/sh -c.
Yields:
StreamEvent- Successive events including :class:StreamStartEvent, :class:StreamStdoutEvent, :class:StreamStderrEvent, :class:StreamExitEvent, and :class:StreamErrorEvent.
AsyncCommands Objects
class AsyncCommands()
Async command execution interface. Accessed via capsule.commands.
run
async def run(cmd: str,
*,
background: bool = False,
timeout: int | None = 30,
envs: dict[str, str] | None = None,
cwd: str | None = None,
tag: str | None = None) -> CommandResult | CommandHandle
Execute a shell command inside the capsule.
Arguments:
cmdstr - Shell command string to execute.backgroundbool - IfTrue, launch the process in the background and return a :class:CommandHandleimmediately. Defaults toFalse.timeoutint | None - Seconds before the foreground command times out. Ignored for background commands. Defaults to30.envsdict[str, str] | None - Additional environment variables to set for the process.cwdstr | None - Working directory for the process.tagstr | None - Optional label attached to background processes for later retrieval via :meth:connect.
Returns:
-
CommandResult- stdout, stderr, exit code, and duration for foreground commands (background=False). -
CommandHandle- PID and tag for background commands (background=True).
list
async def list() -> list[ProcessInfo]
List all running background processes in the capsule.
Returns:
list[ProcessInfo]- Running processes with their PID, tag, and command information.
kill
async def kill(pid: int) -> None
Send SIGKILL to a background process.
Arguments:
pidint - PID of the process to kill.
Raises:
WrennNotFoundError- If no process with the given PID exists.
connect
async def connect(pid: int) -> AsyncIterator[StreamEvent]
Connect to a running background process and stream its output.
Arguments:
pidint - PID of the background process to attach to.
Yields:
StreamEvent- Successive output events. Stops on :class:StreamExitEventor :class:StreamErrorEvent.
stream
async def stream(
cmd: str,
args: builtins.list[str] | None = None) -> AsyncIterator[StreamEvent]
Execute a command via WebSocket, streaming output as events.
Arguments:
cmdstr - Command to execute.argslist[str] | None - Additional arguments for the command. When omitted, cmd is interpreted as a shell command string and executed via/bin/sh -c.
Yields:
StreamEvent- Successive events including :class:StreamStartEvent, :class:StreamStdoutEvent, :class:StreamStderrEvent, :class:StreamExitEvent, and :class:StreamErrorEvent.
wrenn.files
Files Objects
class Files()
Sync filesystem interface. Accessed via capsule.files.
read
def read(path: str) -> str
Read a file as a UTF-8 string.
Arguments:
pathstr - Absolute path to the file inside the capsule.
Returns:
str- File contents decoded as UTF-8.
Raises:
WrennNotFoundError- If the path does not exist.
read_bytes
def read_bytes(path: str) -> bytes
Read a file as raw bytes.
Arguments:
pathstr - Absolute path to the file inside the capsule.
Returns:
bytes- Raw file contents.
Raises:
WrennNotFoundError- If the path does not exist.
write
def write(path: str, data: str | bytes) -> None
Write data to a file inside the capsule.
Creates parent directories if they do not exist.
Arguments:
pathstr - Absolute destination path inside the capsule.datastr | bytes - Content to write. Strings are UTF-8 encoded.
list
def list(path: str, depth: int = 1) -> list[FileEntry]
List directory contents.
Arguments:
pathstr - Absolute path to the directory inside the capsule.depthint - Recursion depth.1lists only immediate children. Defaults to1.
Returns:
list[FileEntry]- Entries in the directory.
Raises:
WrennNotFoundError- If the path does not exist.
exists
def exists(path: str) -> bool
Check whether a path exists inside the capsule.
Arguments:
pathstr - Absolute path to check.
Returns:
bool-Trueif the path exists.
make_dir
def make_dir(path: str) -> FileEntry
Create a directory (with parents). Idempotent.
Arguments:
pathstr - Absolute path of the directory to create.
Returns:
FileEntry- The created (or already-existing) directory entry.
remove
def remove(path: str) -> None
Remove a file or directory recursively.
Arguments:
pathstr - Absolute path to remove.
Raises:
WrennNotFoundError- If the path does not exist.
upload_stream
def upload_stream(path: str, stream: Iterator[bytes]) -> None
Stream a large file into the capsule.
Prefer this over :meth:write when the file is too large to hold in
memory.
Arguments:
pathstr - Absolute destination path inside the capsule.streamIterator[bytes] - Iterable of byte chunks to upload.
download_stream
def download_stream(path: str) -> Iterator[bytes]
Stream a large file out of the capsule.
Prefer this over :meth:read_bytes when the file is too large to hold
in memory.
Arguments:
pathstr - Absolute path to the file inside the capsule.
Yields:
bytes- Successive byte chunks of the file.
Raises:
WrennNotFoundError- If the path does not exist.
AsyncFiles Objects
class AsyncFiles()
Async filesystem interface. Accessed via capsule.files.
read
async def read(path: str) -> str
Read a file as a UTF-8 string.
Arguments:
pathstr - Absolute path to the file inside the capsule.
Returns:
str- File contents decoded as UTF-8.
Raises:
WrennNotFoundError- If the path does not exist.
read_bytes
async def read_bytes(path: str) -> bytes
Read a file as raw bytes.
Arguments:
pathstr - Absolute path to the file inside the capsule.
Returns:
bytes- Raw file contents.
Raises:
WrennNotFoundError- If the path does not exist.
write
async def write(path: str, data: str | bytes) -> None
Write data to a file inside the capsule.
Creates parent directories if they do not exist.
Arguments:
pathstr - Absolute destination path inside the capsule.datastr | bytes - Content to write. Strings are UTF-8 encoded.
list
async def list(path: str, depth: int = 1) -> list[FileEntry]
List directory contents.
Arguments:
pathstr - Absolute path to the directory inside the capsule.depthint - Recursion depth.1lists only immediate children. Defaults to1.
Returns:
list[FileEntry]- Entries in the directory.
Raises:
WrennNotFoundError- If the path does not exist.
exists
async def exists(path: str) -> bool
Check whether a path exists inside the capsule.
Arguments:
pathstr - Absolute path to check.
Returns:
bool-Trueif the path exists.
make_dir
async def make_dir(path: str) -> FileEntry
Create a directory (with parents). Idempotent.
Arguments:
pathstr - Absolute path of the directory to create.
Returns:
FileEntry- The created (or already-existing) directory entry.
remove
async def remove(path: str) -> None
Remove a file or directory recursively.
Arguments:
pathstr - Absolute path to remove.
Raises:
WrennNotFoundError- If the path does not exist.
upload_stream
async def upload_stream(path: str, stream: AsyncIterator[bytes]) -> None
Stream a large file into the capsule.
Prefer this over :meth:write when the file is too large to hold in
memory.
Arguments:
pathstr - Absolute destination path inside the capsule.streamAsyncIterator[bytes] - Async iterable of byte chunks to upload.
download_stream
async def download_stream(path: str) -> AsyncIterator[bytes]
Stream a large file out of the capsule.
Prefer this over :meth:read_bytes when the file is too large to hold
in memory.
Arguments:
pathstr - Absolute path to the file inside the capsule.
Yields:
bytes- Successive byte chunks of the file.
Raises:
WrennNotFoundError- If the path does not exist.
wrenn.code_interpreter.models
Deprecated — use :mod:wrenn.code_runner.models.
wrenn.code_interpreter.async_capsule
Deprecated — use :mod:wrenn.code_runner.async_capsule.
wrenn.code_interpreter
Deprecated alias for :mod:wrenn.code_runner.
Importing from wrenn.code_interpreter emits a FutureWarning.
Use wrenn.code_runner instead.
wrenn.code_interpreter.capsule
Deprecated — use :mod:wrenn.code_runner.capsule.
wrenn.exceptions
WrennError Objects
class WrennError(Exception)
Base exception for all Wrenn SDK errors.
All SDK exceptions inherit from this class, so you can catch
WrennError to handle any API error generically.
Attributes:
codestr - Machine-readable error code from the API (e.g."not_found").messagestr - Human-readable error description.status_codeint - HTTP status code of the response.
__init__
def __init__(code: str, message: str, status_code: int) -> None
Initialize a WrennError.
Arguments:
codestr - Machine-readable error code.messagestr - Human-readable error description.status_codeint - HTTP status code of the response.
WrennValidationError Objects
class WrennValidationError(WrennError)
400 — Invalid request parameters.
WrennAuthenticationError Objects
class WrennAuthenticationError(WrennError)
401 — Invalid or missing authentication.
WrennForbiddenError Objects
class WrennForbiddenError(WrennError)
403 — Authenticated but not authorized.
WrennNotFoundError Objects
class WrennNotFoundError(WrennError)
404 — Resource not found.
WrennConflictError Objects
class WrennConflictError(WrennError)
409 — State conflict (e.g. invalid_state).
WrennHostHasCapsulesError Objects
class WrennHostHasCapsulesError(WrennConflictError)
409 — Host still has running capsules.
Attributes:
capsule_idslist[str] - IDs of the capsules still running on the host.
__init__
def __init__(code: str, message: str, status_code: int,
capsule_ids: list[str]) -> None
Initialize a WrennHostHasCapsulesError.
Arguments:
codestr - Machine-readable error code.messagestr - Human-readable error description.status_codeint - HTTP status code of the response.capsule_idslist[str] - IDs of capsules still on the host.
WrennHostUnavailableError Objects
class WrennHostUnavailableError(WrennError)
503 — No suitable host available.
WrennAgentError Objects
class WrennAgentError(WrennError)
502 — Host agent returned an error.
WrennInternalError Objects
class WrennInternalError(WrennError)
500 — Unexpected server error.
wrenn.async_capsule
AsyncCapsule Objects
class AsyncCapsule()
Async Wrenn capsule with e2b-compatible interface.
Create via classmethod::
capsule = await AsyncCapsule.create(template="minimal")
Use as async context manager::
async with await AsyncCapsule.create() as capsule: await capsule.commands.run("echo hello")
capsule_id
@property
def capsule_id() -> str
The capsule's unique identifier.
Returns:
str- Capsule ID assigned by the Wrenn API.
info
@property
def info() -> CapsuleModel | None
Cached capsule metadata from the last API call.
Returns:
CapsuleModel | None: The last-fetched capsule model, or None
if the capsule was connected without an initial fetch.
create
@classmethod
async def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> AsyncCapsule
Create a new capsule.
Arguments:
templatestr | None - Template name to boot from.vcpusint | None - Number of virtual CPUs.memory_mbint | None - Memory in MiB.timeoutint | None - Inactivity TTL in seconds before auto-pause.waitbool - Await until the capsule reachesrunningstatus.api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
AsyncCapsule- A new capsule instance.
connect
@classmethod
async def connect(cls,
capsule_id: str,
*,
api_key: str | None = None,
base_url: str | None = None) -> AsyncCapsule
Connect to an existing capsule, resuming it if paused.
Arguments:
capsule_idstr - ID of the capsule to connect to.api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
AsyncCapsule- A capsule instance bound to the existing capsule.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
ping
async def ping() -> None
Reset the capsule inactivity timer.
Call this to prevent the capsule from being auto-paused when the inactivity TTL is set.
wait_ready
async def wait_ready(timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None
Await until capsule status is running.
Raises:
TimeoutError- If capsule does not reachrunningwithintimeout.RuntimeError- If capsule enters error/stopped/missing while waiting.
is_running
async def is_running() -> bool
Check whether the capsule is currently running.
Makes a live API call to fetch current status.
Returns:
bool-Trueif the capsule status isrunning.
list
@classmethod
async def list(cls,
*,
api_key: str | None = None,
base_url: str | None = None) -> list[CapsuleModel]
List all capsules belonging to the team.
Arguments:
api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
list[CapsuleModel]- All capsules for the authenticated team.
pty
@asynccontextmanager
async def pty(cmd: str = "/bin/bash",
args: builtins.list[str] | None = None,
cols: int = 80,
rows: int = 24,
envs: dict[str, str] | None = None,
cwd: str | None = None) -> AsyncIterator[AsyncPtySession]
Open an async interactive PTY session backed by a WebSocket.
Use as an async context manager and async iterate over
:class:PtyEvent objects::
async with capsule.pty() as term: await term.write(b"echo hello\n") async for event in term: if event.type == "output": print(event.data.decode())
Arguments:
cmdstr - Command to run inside the PTY. Defaults to"/bin/bash".argslist[str] | None - Additional arguments forcmd.colsint - Initial terminal column count. Defaults to80.rowsint - Initial terminal row count. Defaults to24.envsdict[str, str] | None - Additional environment variables to inject into the process.cwdstr | None - Working directory for the process.
Yields:
AsyncPtySession- An interactive async PTY session.
pty_connect
@asynccontextmanager
async def pty_connect(tag: str) -> AsyncIterator[AsyncPtySession]
Reconnect to an existing PTY session by tag.
Arguments:
tagstr - Session tag returned in thestartedPTY event.
Yields:
AsyncPtySession- The reconnected async PTY session.
get_url
def get_url(port: int) -> str
Get the HTTP proxy URL for a port exposed inside this capsule.
Arguments:
portint - Port number to proxy.
Returns:
str- Ahttps://(orhttp://) URL that proxies HTTP requests to the given port inside the capsule. For raw WebSocket access, see the lower-level_build_proxy_urlhelper or thepty()API.
create_snapshot
async def create_snapshot(name: str | None = None,
overwrite: bool = False) -> Template
Create a snapshot template from this capsule's current state.
Arguments:
namestr | None - Name for the snapshot template. Auto-generated if not provided.overwritebool - IfTrue, overwrite an existing template with the same name. Defaults toFalse.
Returns:
Template- The created snapshot template.
wrenn.pty
PtySession Objects
class PtySession()
Interactive PTY session backed by a WebSocket.
Use as a context manager and iterate over events::
with sb.pty(cmd="/bin/bash") as term: term.write(b"ls -la\n") for event in term: if event.type == "output": sys.stdout.buffer.write(event.data) elif event.type == "exit": break
tag
@property
def tag() -> str | None
Session tag. Available after the started event.
pid
@property
def pid() -> int | None
Process PID. Available after the started event.
write
def write(data: bytes) -> None
Send raw bytes to the PTY stdin.
Arguments:
data- Raw bytes to send. Base64-encoded internally.
resize
def resize(cols: int, rows: int) -> None
Resize the PTY terminal.
Arguments:
cols- New column count. Must be > 0.rows- New row count. Must be > 0.
Raises:
ValueError- If cols or rows is 0.
kill
def kill() -> None
Send SIGKILL to the PTY process.
AsyncPtySession Objects
class AsyncPtySession()
Async interactive PTY session backed by a WebSocket.
Use as an async context manager and async iterate over events::
async with sb.pty(cmd="/bin/bash") as term: await term.write(b"ls -la\n") async for event in term: if event.type == "output": sys.stdout.buffer.write(event.data) elif event.type == "exit": break
tag
@property
def tag() -> str | None
Session tag. Available after the started event.
pid
@property
def pid() -> int | None
Process PID. Available after the started event.
write
async def write(data: bytes) -> None
Send raw bytes to the PTY stdin.
Arguments:
data- Raw bytes to send. Base64-encoded internally.
resize
async def resize(cols: int, rows: int) -> None
Resize the PTY terminal.
Arguments:
cols- New column count. Must be > 0.rows- New row count. Must be > 0.
Raises:
ValueError- If cols or rows is 0.
kill
async def kill() -> None
Send SIGKILL to the PTY process.
wrenn.models._generated
SessionResponse Objects
class SessionResponse(BaseModel)
Returned by login, activate, and switch-team. The actual auth credential is the wrenn_sid cookie set on the response. The body carries identity data the SPA needs to bootstrap.
Peaks Objects
class Peaks(BaseModel)
Maximum values over the last 30 days.
Series Objects
class Series(BaseModel)
Parallel arrays for chart rendering.
Encoding Objects
class Encoding(StrEnum)
Output encoding. "base64" when stdout/stderr contain binary data.
Type2 Objects
class Type2(StrEnum)
Host type. Regular hosts are shared; BYOC hosts belong to a team.
Outcome Objects
class Outcome(StrEnum)
Present for action events (capsule.* except state.changed, template.snapshot.*). Absent for host.up/down, capsule.state.changed, and the connected sentinel.
SSEEvent Objects
class SSEEvent(BaseModel)
Wire format of one SSE message body. The event name (event: line) is
the kind and the JSON below is the data: line.
wrenn.models
wrenn.capsule
Capsule Objects
class Capsule()
A Wrenn capsule (sandbox) with e2b-compatible interface.
Create directly::
capsule = Capsule(api_key="wrn_...") capsule = Capsule(template="minimal") # reads WRENN_API_KEY env
Or via classmethod::
capsule = Capsule.create(template="minimal")
Use as context manager for automatic cleanup::
with Capsule() as capsule: capsule.commands.run("echo hello")
__init__
def __init__(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None,
_capsule_id: str | None = None,
_client: WrennClient | None = None,
_info: CapsuleModel | None = None) -> None
Create and start a new capsule.
Arguments:
templatestr | None - Template name to boot from. Defaults to the server-side default ("minimal").vcpusint | None - Number of virtual CPUs. Defaults to the server-side default.memory_mbint | None - Memory in MiB. Defaults to the server-side default.timeoutint | None - Inactivity TTL in seconds before the capsule is auto-paused.0disables auto-pause.waitbool - IfTrue, block until the capsule status isrunningbefore returning.api_keystr | None - Wrenn API key (wrn_...). Falls back to theWRENN_API_KEYenvironment variable.base_urlstr | None - Wrenn API base URL. Falls back toWRENN_BASE_URLor the default production endpoint.
capsule_id
@property
def capsule_id() -> str
The capsule's unique identifier.
Returns:
str- Capsule ID assigned by the Wrenn API.
info
@property
def info() -> CapsuleModel | None
Cached capsule metadata from the last API call.
Returns:
CapsuleModel | None: The last-fetched capsule model, or None
if the capsule was connected without an initial fetch.
create
@classmethod
def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> Capsule
Create a new capsule.
Equivalent to calling Capsule(...) directly.
Arguments:
templatestr | None - Template name to boot from.vcpusint | None - Number of virtual CPUs.memory_mbint | None - Memory in MiB.timeoutint | None - Inactivity TTL in seconds before auto-pause.waitbool - Block until the capsule reachesrunningstatus.api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
Capsule- A new capsule instance.
connect
@classmethod
def connect(cls,
capsule_id: str,
*,
api_key: str | None = None,
base_url: str | None = None) -> Capsule
Connect to an existing capsule, resuming it if paused.
Arguments:
capsule_idstr - ID of the capsule to connect to.api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
Capsule- A capsule instance bound to the existing capsule.
Raises:
WrennNotFoundError- If no capsule with the given ID exists.
ping
def ping() -> None
Reset the capsule inactivity timer.
Call this to prevent the capsule from being auto-paused when the inactivity TTL is set.
wait_ready
def wait_ready(timeout: float = _DEFAULT_WAIT_TIMEOUT) -> None
Block until capsule status is running.
Raises:
TimeoutError- If capsule does not reachrunningwithintimeout.RuntimeError- If capsule enters error/stopped/missing while waiting.
is_running
def is_running() -> bool
Check whether the capsule is currently running.
Makes a live API call to fetch current status.
Returns:
bool-Trueif the capsule status isrunning.
list
@classmethod
def list(cls,
*,
api_key: str | None = None,
base_url: str | None = None) -> list[CapsuleModel]
List all capsules belonging to the team.
Arguments:
api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
list[CapsuleModel]- All capsules for the authenticated team.
pty
@contextmanager
def pty(cmd: str = "/bin/bash",
args: builtins.list[str] | None = None,
cols: int = 80,
rows: int = 24,
envs: dict[str, str] | None = None,
cwd: str | None = None) -> Iterator[PtySession]
Open an interactive PTY session backed by a WebSocket.
Use as a context manager and iterate over :class:PtyEvent objects::
with capsule.pty() as term: term.write(b"echo hello\n") for event in term: if event.type == "output": print(event.data.decode())
Arguments:
cmdstr - Command to run inside the PTY. Defaults to"/bin/bash".argslist[str] | None - Additional arguments forcmd.colsint - Initial terminal column count. Defaults to80.rowsint - Initial terminal row count. Defaults to24.envsdict[str, str] | None - Additional environment variables to inject into the process.cwdstr | None - Working directory for the process.
Yields:
PtySession- An interactive PTY session.
pty_connect
@contextmanager
def pty_connect(tag: str) -> Iterator[PtySession]
Reconnect to an existing PTY session by tag.
Arguments:
tagstr - Session tag returned in thestartedPTY event.
Yields:
PtySession- The reconnected PTY session.
get_url
def get_url(port: int) -> str
Get the HTTP proxy URL for a port exposed inside this capsule.
Arguments:
portint - Port number to proxy.
Returns:
str- Ahttps://(orhttp://) URL that proxies HTTP requests to the given port inside the capsule. For raw WebSocket access, see the lower-level_build_proxy_urlhelper or thepty()API.
create_snapshot
def create_snapshot(name: str | None = None,
overwrite: bool = False) -> Template
Create a snapshot template from this capsule's current state.
Arguments:
namestr | None - Name for the snapshot template. Auto-generated if not provided.overwritebool - IfTrue, overwrite an existing template with the same name. Defaults toFalse.
Returns:
Template- The created snapshot template.
wrenn.code_runner.models
ExecutionError Objects
@dataclass
class ExecutionError()
Error raised during code execution.
Attributes:
name- Exception class name (e.g."NameError").value- Exception message.traceback- Full traceback string.
Logs Objects
@dataclass
class Logs()
Captured stdout/stderr streams.
Each element in the list is one chunk of text as it arrived from the kernel.
Result Objects
@dataclass
class Result()
A single rich output from code execution.
Jupyter cells can produce multiple outputs — one execute_result
(the expression value) and zero or more display_data messages
(from plt.show(), display(), etc.). Each becomes a
Result.
Known MIME types are unpacked into named attributes; anything else
lands in :pyattr:extra.
text
text/plain representation.
html
text/html representation.
markdown
text/markdown representation.
svg
image/svg+xml representation.
png
image/png — base64-encoded.
jpeg
image/jpeg — base64-encoded.
gif
image/gif — base64-encoded.
application/pdf — base64-encoded.
latex
text/latex representation.
json
application/json representation.
javascript
application/javascript representation.
plotly
application/vnd.plotly.v1+json representation.
extra
MIME types not covered by the named fields above.
is_main_result
True when this came from an execute_result message
(i.e. the value of the last expression in the cell). False
for display_data outputs.
from_bundle
@classmethod
def from_bundle(cls,
bundle: dict[str, str],
*,
is_main_result: bool = False) -> Result
Build a Result from a Jupyter MIME bundle dict.
formats
def formats() -> list[str]
Return names of non-None MIME-type fields.
Execution Objects
@dataclass
class Execution()
Complete result of a run_code call.
Attributes:
results- All rich outputs produced by the cell — charts, tables, images, expression values, etc.logs- Captured stdout/stderr text.error- Populated when the cell raised an exception.execution_count- Jupyter execution counter (the[N]number).
timed_out
True when execution was cut short by the timeout parameter
(or by the kernel WebSocket dropping). Pairs with error of name
"Timeout" or "Disconnected".
text
@property
def text() -> str | None
Convenience — text/plain of the main execute_result,
or None if the cell had no expression value.
wrenn.code_runner.async_capsule
AsyncCapsule Objects
class AsyncCapsule(BaseAsyncCapsule)
Async code runner capsule with run_code support.
Uses code-runner-beta template and the wrenn Jupyter
kernelspec by default::
from wrenn.code_runner import AsyncCapsule
capsule = await AsyncCapsule.create() result = await capsule.run_code("print('hello')")
create
@classmethod
async def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
kernel: str | None = None,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> AsyncCapsule
Create a new async code runner capsule.
Arguments:
templatestr | None - Template to boot from. Defaults to"code-runner-beta".vcpusint | None - Number of virtual CPUs.memory_mbint | None - Memory in MiB.timeoutint | None - Inactivity TTL in seconds before auto-pause.kernelstr | None - Jupyter kernelspec name. Defaults to"wrenn".waitbool - Await until the capsule reachesrunningstatus.api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
AsyncCapsule- A new async code runner capsule instance.
run_code
async def run_code(
code: str,
language: str = "python",
timeout: float = 30,
jupyter_timeout: float = 30,
on_result: Callable[[Result], Any] | None = None,
on_stdout: Callable[[str], Any] | None = None,
on_stderr: Callable[[str], Any] | None = None,
on_error: Callable[[ExecutionError], Any] | None = None) -> Execution
Execute code in a persistent Jupyter kernel (async).
Variables, imports, and function definitions survive across calls.
Arguments:
code- Code string to execute.language- Execution backend language. Currently only"python"is supported; passing anything else raisesValueError. To target a non-Python kernel, setkernel=on the capsule constructor.timeout- Maximum seconds to wait for execution to complete.jupyter_timeout- Maximum seconds to wait for Jupyter to become available.on_result- Called for each rich output (charts, images, expression values).on_stdout- Called for each stdout chunk.on_stderr- Called for each stderr chunk.on_error- Called when the cell raises an exception.
Returns:
An :class:Execution with .results, .logs, .error,
and a convenience .text property.
wrenn.code_runner
Code runner — execute code in persistent Jupyter kernels.
Uses the code-runner-beta template and the wrenn Jupyter
kernelspec by default.
Example::
from wrenn.code_runner import Capsule
with Capsule(wait=True) as capsule: result = capsule.run_code("print('hello')") print(result.logs.stdout)
wrenn.code_runner.capsule
Capsule Objects
class Capsule(BaseCapsule)
Code runner capsule with run_code support.
Uses code-runner-beta template and the wrenn Jupyter
kernelspec by default::
from wrenn.code_runner import Capsule
capsule = Capsule() result = capsule.run_code("print('hello')") print(result.logs.stdout) # ["hello\n"]
__init__
def __init__(template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
kernel: str | None = None,
api_key: str | None = None,
base_url: str | None = None,
**kwargs) -> None
Create a code runner capsule.
Arguments:
templatestr | None - Template to boot from. Defaults to"code-runner-beta".vcpusint | None - Number of virtual CPUs.memory_mbint | None - Memory in MiB.timeoutint | None - Inactivity TTL in seconds before auto-pause.kernelstr | None - Jupyter kernelspec name. Defaults to"wrenn".api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
create
@classmethod
def create(cls,
template: str | None = None,
vcpus: int | None = None,
memory_mb: int | None = None,
timeout: int | None = None,
*,
kernel: str | None = None,
wait: bool = False,
api_key: str | None = None,
base_url: str | None = None) -> Capsule
Create a new code runner capsule.
Arguments:
templatestr | None - Template to boot from. Defaults to"code-runner-beta".vcpusint | None - Number of virtual CPUs.memory_mbint | None - Memory in MiB.timeoutint | None - Inactivity TTL in seconds before auto-pause.kernelstr | None - Jupyter kernelspec name. Defaults to"wrenn".waitbool - Block until the capsule reachesrunningstatus.api_keystr | None - Wrenn API key. Falls back toWRENN_API_KEYenv var.base_urlstr | None - API base URL override.
Returns:
Capsule- A new code runner capsule instance.
run_code
def run_code(
code: str,
language: str = "python",
timeout: float = 30,
jupyter_timeout: float = 30,
on_result: Callable[[Result], Any] | None = None,
on_stdout: Callable[[str], Any] | None = None,
on_stderr: Callable[[str], Any] | None = None,
on_error: Callable[[ExecutionError], Any] | None = None) -> Execution
Execute code in a persistent Jupyter kernel.
Variables, imports, and function definitions survive across calls.
Arguments:
code- Code string to execute.language- Execution backend language. Currently only"python"is supported; passing anything else raisesValueError. To target a non-Python kernel, setkernel=on the capsule constructor.timeout- Maximum seconds to wait for execution to complete.jupyter_timeout- Maximum seconds to wait for Jupyter to become available.on_result- Called for each rich output (charts, images, expression values).on_stdout- Called for each stdout chunk.on_stderr- Called for each stderr chunk.on_error- Called when the cell raises an exception.
Returns:
An :class:Execution with .results, .logs, .error,
and a convenience .text property.
wrenn.code_runner._protocol
Shared Jupyter protocol helpers used by both sync and async capsules.
Pure functions only — no I/O, no sync/async coupling.
build_execute_request
def build_execute_request(code: str) -> dict
Build a Jupyter execute_request message envelope.
Returns:
dict- A fully-formed Jupyter shell-channel message ready to be JSON-serialized over the kernel WebSocket. The caller is expected to readmsg["header"]["msg_id"]to correlate responses.
pick_kernel_id
def pick_kernel_id(kernels: list[dict], kernel_name: str) -> str | None
Return the ID of the first kernel matching kernel_name, else None.
apply_kernel_message
def apply_kernel_message(data: dict, msg_id: str, execution: Execution,
emit_error: Callable[[ExecutionError], None],
on_result: Callable[[Result], Any] | None,
on_stdout: Callable[[str], Any] | None,
on_stderr: Callable[[str], Any] | None) -> bool
Apply one Jupyter IOPub message to execution.
Returns True when the message marks idle (cell done); the caller
should stop reading further messages.
build_ws_url
def build_ws_url(base_url: str,
capsule_id: str,
kernel_id: str,
proxy_domain: str | None = None) -> str
Build the Jupyter kernel WebSocket URL for the given capsule.
wrenn._config
wrenn._git._auth
embed_credentials
def embed_credentials(url: str, username: str, password: str) -> str
Embed HTTP(S) credentials into a git URL.
Arguments:
url- Git repository URL.username- Username for authentication.password- Password or personal access token.
Returns:
URL with username:password@ embedded in the netloc.
Raises:
ValueError- If the URL scheme is nothttporhttps.
strip_credentials
def strip_credentials(url: str) -> str
Remove embedded credentials from a git URL.
Arguments:
url- Git repository URL, possibly with credentials.
Returns:
URL with credentials removed. Non-HTTP(S) URLs are returned unchanged.
is_auth_error
def is_auth_error(stderr: str) -> bool
Check whether git stderr indicates an authentication failure.
Arguments:
stderr- Combined stderr output from a git command.
Returns:
True if any known auth-failure pattern is found.
build_credential_approve_cmd
def build_credential_approve_cmd(username: str,
password: str,
host: str = "github.com",
protocol: str = "https") -> str
Build a shell command that pipes credentials into git credential approve.
Arguments:
username- Git username.password- Password or personal access token.host- Target host. Defaults to"github.com".protocol- Protocol. Defaults to"https".
Returns:
A shell command string safe to pass to commands.run().
wrenn._git._cmd
Pure functions that build git argument lists and parse git output.
No I/O, no network, no imports from wrenn. Every build_* function
returns a list[str] suitable for shlex.join(). Every parse_*
function takes raw stdout and returns a typed structure.
FileStatus Objects
@dataclass
class FileStatus()
A single entry from git status --porcelain=v1.
Attributes:
pathstr - File path relative to the repository root.index_statusstr - Index (staged) status character.work_tree_statusstr - Working-tree status character.renamed_fromstr | None - Original path when status is a rename.
staged
@property
def staged() -> bool
Whether the change is staged in the index.
status
@property
def status() -> str
Normalized human-readable status label.
GitStatus Objects
@dataclass
class GitStatus()
Parsed output of git status --porcelain=v1 --branch.
Attributes:
branchstr | None - Current branch name, orNoneif detached.upstreamstr | None - Upstream tracking branch.aheadint - Commits ahead of upstream.behindint - Commits behind upstream.detachedbool - Whether HEAD is detached.fileslist[FileStatus] - Per-file status entries.
is_clean
@property
def is_clean() -> bool
True when there are no changed or untracked files.
has_staged
@property
def has_staged() -> bool
True when at least one file has staged changes.
has_untracked
@property
def has_untracked() -> bool
True when at least one file is untracked.
has_conflicts
@property
def has_conflicts() -> bool
True when at least one file has merge conflicts.
GitBranch Objects
@dataclass
class GitBranch()
A single branch entry.
Attributes:
namestr - Branch name (short ref).is_currentbool - Whether this is the checked-out branch.
build_clone
def build_clone(url: str,
dest: str | None = None,
*,
branch: str | None = None,
depth: int | None = None) -> list[str]
Build git clone arguments.
build_init
def build_init(path: str = ".",
*,
bare: bool = False,
initial_branch: str | None = None) -> list[str]
Build git init arguments.
build_add
def build_add(paths: list[str] | None = None,
*,
all: bool = False) -> list[str]
Build git add arguments.
build_commit
def build_commit(message: str,
*,
allow_empty: bool = False,
author_name: str | None = None,
author_email: str | None = None) -> list[str]
Build git commit arguments.
build_push
def build_push(remote: str = "origin",
branch: str | None = None,
*,
force: bool = False,
set_upstream: bool = False) -> list[str]
Build git push arguments.
build_pull
def build_pull(remote: str = "origin",
branch: str | None = None,
*,
rebase: bool = False,
ff_only: bool = False) -> list[str]
Build git pull arguments.
build_status
def build_status() -> list[str]
Build git status arguments for porcelain parsing.
build_branches
def build_branches() -> list[str]
Build git branch arguments for structured parsing.
build_create_branch
def build_create_branch(name: str,
*,
start_point: str | None = None) -> list[str]
Build git checkout -b arguments.
build_checkout
def build_checkout(name: str) -> list[str]
Build git checkout arguments.
build_delete_branch
def build_delete_branch(name: str, *, force: bool = False) -> list[str]
Build git branch -d/-D arguments.
build_remote_add
def build_remote_add(name: str, url: str, *, fetch: bool = False) -> list[str]
Build git remote add arguments.
build_remote_get_url
def build_remote_get_url(name: str = "origin") -> list[str]
Build git remote get-url arguments.
build_remote_set_url
def build_remote_set_url(name: str, url: str) -> list[str]
Build git remote set-url arguments.
build_reset
def build_reset(*,
mode: str | None = None,
ref: str | None = None,
paths: list[str] | None = None) -> list[str]
Build git reset arguments.
Arguments:
mode- Reset mode (soft,mixed,hard,merge,keep).ref- Commit, branch, or ref to reset to.paths- Paths to reset (mutually exclusive withmode).
build_restore
def build_restore(paths: list[str],
*,
staged: bool = False,
worktree: bool = False,
source: str | None = None) -> list[str]
Build git restore arguments.
Arguments:
paths- Paths to restore.staged- Restore the index (unstage).worktree- Restore working-tree files.source- Commit or ref to restore from.
build_config_set
def build_config_set(key: str,
value: str,
*,
scope: str = "local",
repo_path: str | None = None) -> list[str]
Build git config set arguments.
build_config_get
def build_config_get(key: str,
*,
scope: str = "local",
repo_path: str | None = None) -> list[str]
Build git config --get arguments.
parse_status
def parse_status(stdout: str) -> GitStatus
Parse git status --porcelain=v1 --branch output.
Arguments:
stdout- Raw stdout from the git status command.
Returns:
Parsed :class:GitStatus.
parse_branches
def parse_branches(stdout: str) -> list[GitBranch]
Parse git branch --format=%(refname:short)\t%(HEAD) output.
Arguments:
stdout- Raw stdout from the git branch command.
Returns:
List of :class:GitBranch.
wrenn._git.exceptions
GitError Objects
class GitError(Exception)
Base exception for all git operations inside a capsule.
Not a subclass of :class:WrennError because git errors originate
from a process exit code, not an HTTP response.
Attributes:
messagestr - Human-readable error description.stderrstr - Raw stderr output from the git process.exit_codeint - Process exit code.
GitCommandError Objects
class GitCommandError(GitError)
A git command exited with a non-zero exit code.
GitAuthError Objects
class GitAuthError(GitError)
Authentication failed when communicating with a remote.
wrenn._git
Git operations inside a Wrenn capsule.
Provides :class:Git (sync) and :class:AsyncGit (async) interfaces
accessed via capsule.git. All operations execute the real git
binary inside the capsule through :class:~wrenn.commands.Commands.
Git Objects
class Git()
Sync git interface. Accessed via capsule.git.
Executes the real git binary inside the capsule through
:meth:Commands.run. Methods raise :class:GitCommandError (or
:class:GitAuthError) on non-zero exit codes.
clone
def clone(url: str,
dest: str | None = None,
*,
branch: str | None = None,
depth: int | None = None,
username: str | None = None,
password: str | None = None,
dangerously_store_credentials: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 300) -> CommandResult
Clone a remote repository into the capsule.
Arguments:
url- Remote repository URL.dest- Destination path. Defaults to the repository name derived from the URL.branch- Branch or tag to check out.depth- Create a shallow clone with this many commits.username- Username for HTTP(S) authentication.password- Password or token for HTTP(S) authentication.dangerously_store_credentials- IfTrue, leave credentials embedded in the remote URL after cloning.cwd- Working directory for the command.envs- Extra environment variables.timeout- Command timeout in seconds. Defaults to300.
Returns:
Command result with stdout, stderr, exit_code, and duration.
Raises:
GitAuthError- If the remote rejected authentication.GitCommandError- If clone failed for another reason.ValueError- If password is provided without username.
init
def init(path: str = ".",
*,
bare: bool = False,
initial_branch: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Initialize a new git repository.
Arguments:
path- Destination path for the repository.bare- Create a bare repository.initial_branch- Name for the initial branch (e.g."main").cwd- Working directory for the command.envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If init failed.
add
def add(paths: list[str] | None = None,
*,
all: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Stage files for commit.
Arguments:
paths- Specific files to stage. IfNone, stages the current directory (or all withall=True).all- Stage all changes including untracked files.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If add failed.
commit
def commit(message: str,
*,
allow_empty: bool = False,
author_name: str | None = None,
author_email: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Create a commit.
Arguments:
message- Commit message.allow_empty- Allow creating a commit with no changes.author_name- Override the commit author name.author_email- Override the commit author email.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If commit failed.
push
def push(remote: str = "origin",
branch: str | None = None,
*,
force: bool = False,
set_upstream: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
Push commits to a remote.
Arguments:
remote- Remote name. Defaults to"origin".branch- Branch to push. Defaults to the current branch.force- Force-push.set_upstream- Set upstream tracking reference.username- Username for HTTP(S) authentication.password- Password or token for HTTP(S) authentication.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitAuthError- If authentication failed.GitCommandError- If push failed.
pull
def pull(remote: str = "origin",
branch: str | None = None,
*,
rebase: bool = False,
ff_only: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
Pull changes from a remote.
Arguments:
remote- Remote name. Defaults to"origin".branch- Branch to pull.rebase- Rebase instead of merge.ff_only- Only allow fast-forward merges.username- Username for HTTP(S) authentication.password- Password or token for HTTP(S) authentication.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitAuthError- If authentication failed.GitCommandError- If pull failed.
status
def status(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> GitStatus
Get repository status.
Arguments:
cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Parsed :class:GitStatus with branch info and file changes.
Raises:
GitCommandError- If the command failed.
branches
def branches(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> list[GitBranch]
List local branches.
Arguments:
cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
List of :class:GitBranch.
Raises:
GitCommandError- If the command failed.
create_branch
def create_branch(name: str,
*,
start_point: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Create and check out a new branch.
Arguments:
name- Branch name.start_point- Commit or ref to branch from.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If the command failed.
checkout_branch
def checkout_branch(name: str,
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Check out an existing branch.
Arguments:
name- Branch name.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If the command failed.
delete_branch
def delete_branch(name: str,
*,
force: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Delete a branch.
Arguments:
name- Branch name.force- Force-delete with-D.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If the command failed.
remote_add
def remote_add(name: str,
url: str,
*,
fetch: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Add a remote.
Arguments:
name- Remote name (e.g."origin").url- Remote URL.fetch- Fetch after adding.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If the command failed.
remote_get
def remote_get(name: str = "origin",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
Get the URL of a remote.
Returns None if the remote does not exist rather than raising.
Arguments:
name- Remote name. Defaults to"origin".cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Remote URL or None.
reset
def reset(*,
mode: str | None = None,
ref: str | None = None,
paths: list[str] | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Reset the current HEAD.
Arguments:
mode- Reset mode (soft,mixed,hard,merge,keep).ref- Commit, branch, or ref to reset to.paths- Paths to reset.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If the command failed.
restore
def restore(paths: list[str],
*,
staged: bool = False,
worktree: bool = False,
source: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Restore working-tree files or unstage changes.
Arguments:
paths- Paths to restore.staged- Restore the index (unstage).worktree- Restore working-tree files.source- Commit or ref to restore from.cwd- Working directory (repository root).envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If the command failed.
set_config
def set_config(key: str,
value: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Set a git config value.
Arguments:
key- Config key (e.g."user.name").value- Config value.scope- Config scope:"local","global", or"system".cwd- Working directory (repository root). Required when scope is"local".envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Command result.
Raises:
GitCommandError- If the command failed.
get_config
def get_config(key: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
Get a git config value.
Returns None if the key is not set rather than raising.
Arguments:
key- Config key (e.g."user.name").scope- Config scope:"local","global", or"system".cwd- Working directory (repository root). Required when scope is"local".envs- Extra environment variables.timeout- Command timeout in seconds.
Returns:
Config value or None.
configure_user
def configure_user(name: str,
email: str,
*,
scope: str = "global",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
Configure git user name and email.
Arguments:
name- Git user name.email- Git user email.scope- Config scope. Defaults to"global".cwd- Working directory (repository root). Required when scope is"local".envs- Extra environment variables.timeout- Command timeout in seconds.
Raises:
ValueError- If name or email is empty.GitCommandError- If a config command failed.
dangerously_authenticate
def dangerously_authenticate(username: str,
password: str,
host: str = "github.com",
protocol: str = "https",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
Persist git credentials via the credential store.
.. warning::
Credentials are written in plain text to the capsule
filesystem and are accessible to any process running inside
the capsule. Prefer per-operation username/password
parameters on :meth:clone, :meth:push, and :meth:pull
instead.
Arguments:
username- Git username.password- Password or personal access token.host- Target host. Defaults to"github.com".protocol- Protocol. Defaults to"https".cwd- Working directory.envs- Extra environment variables.timeout- Command timeout in seconds.
Raises:
ValueError- If username or password is empty.GitCommandError- If a command failed.
AsyncGit Objects
class AsyncGit()
Async git interface. Accessed via capsule.git.
Async mirror of :class:Git. See that class for full method
documentation.
clone
async def clone(url: str,
dest: str | None = None,
*,
branch: str | None = None,
depth: int | None = None,
username: str | None = None,
password: str | None = None,
dangerously_store_credentials: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 300) -> CommandResult
Clone a remote repository into the capsule.
init
async def init(path: str = ".",
*,
bare: bool = False,
initial_branch: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Initialize a new git repository.
add
async def add(paths: list[str] | None = None,
*,
all: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Stage files for commit.
commit
async def commit(message: str,
*,
allow_empty: bool = False,
author_name: str | None = None,
author_email: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Create a commit.
push
async def push(remote: str = "origin",
branch: str | None = None,
*,
force: bool = False,
set_upstream: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
Push commits to a remote.
pull
async def pull(remote: str = "origin",
branch: str | None = None,
*,
rebase: bool = False,
ff_only: bool = False,
username: str | None = None,
password: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 60) -> CommandResult
Pull changes from a remote.
status
async def status(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> GitStatus
Get repository status.
branches
async def branches(*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> list[GitBranch]
List local branches.
create_branch
async def create_branch(name: str,
*,
start_point: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Create and check out a new branch.
checkout_branch
async def checkout_branch(name: str,
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Check out an existing branch.
delete_branch
async def delete_branch(name: str,
*,
force: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Delete a branch.
remote_add
async def remote_add(name: str,
url: str,
*,
fetch: bool = False,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Add a remote.
remote_get
async def remote_get(name: str = "origin",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
Get the URL of a remote. Returns None if not found.
reset
async def reset(*,
mode: str | None = None,
ref: str | None = None,
paths: list[str] | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Reset the current HEAD.
restore
async def restore(paths: list[str],
*,
staged: bool = False,
worktree: bool = False,
source: str | None = None,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Restore working-tree files or unstage changes.
set_config
async def set_config(key: str,
value: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> CommandResult
Set a git config value.
get_config
async def get_config(key: str,
*,
scope: str = "local",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> str | None
Get a git config value. Returns None if not set.
configure_user
async def configure_user(name: str,
email: str,
*,
scope: str = "global",
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
Configure git user name and email.
dangerously_authenticate
async def dangerously_authenticate(username: str,
password: str,
host: str = "github.com",
protocol: str = "https",
*,
cwd: str | None = None,
envs: dict[str, str] | None = None,
timeout: int | None = 30) -> None
Persist git credentials via the credential store.
.. warning::
Credentials are written in plain text to the capsule
filesystem. Prefer per-operation username/password
parameters instead.