v0.2.0 #14

Merged
pptx704 merged 59 commits from dev into main 2026-05-24 05:02:08 +00:00
49 changed files with 1807 additions and 11923 deletions
Showing only changes of commit ad64c85393 - Show all commits

View File

@ -215,6 +215,73 @@ for chunk in capsule.files.download_stream("/data/large.bin"):
process(chunk) process(chunk)
``` ```
### Git
Git operations are accessed via `capsule.git`. All commands execute the real `git` binary inside the capsule:
```python
# Initialize a repo
capsule.git.init("/app", initial_branch="main")
# Configure user
capsule.git.configure_user("Alice", "alice@example.com", cwd="/app")
# Stage and commit
capsule.git.add(all=True, cwd="/app")
capsule.git.commit("initial commit", cwd="/app")
# Check status
status = capsule.git.status(cwd="/app")
print(status.branch) # "main"
print(status.is_clean) # True
for f in status.files:
print(f.path, f.index_status, f.work_tree_status)
# Branches
branches = capsule.git.branches(cwd="/app")
capsule.git.create_branch("feature", cwd="/app")
capsule.git.checkout_branch("main", cwd="/app")
capsule.git.delete_branch("feature", cwd="/app")
```
#### Clone with Authentication
```python
# Clone a private repo (credentials are stripped from remote URL after clone)
capsule.git.clone(
"https://github.com/org/repo.git",
username="user",
password="ghp_token",
cwd="/app",
)
# Push/pull with inline credentials (temporarily embedded, then restored)
capsule.git.push("origin", "main", username="user", password="ghp_token", cwd="/app")
capsule.git.pull("origin", "main", username="user", password="ghp_token", cwd="/app")
```
#### Configuration and Remotes
```python
capsule.git.set_config("core.autocrlf", "false", cwd="/app")
value = capsule.git.get_config("user.name", cwd="/app") # str | None
capsule.git.remote_add("upstream", "https://github.com/org/repo.git", cwd="/app")
url = capsule.git.remote_get("origin", cwd="/app") # str | None
```
Git errors raise `GitCommandError` (or `GitAuthError` for authentication failures), both inheriting from `GitError`:
```python
from wrenn import GitCommandError, GitAuthError
try:
capsule.git.push("origin", "main", username="user", password="bad", cwd="/app")
except GitAuthError as e:
print(e.stderr)
print(e.exit_code)
```
### Interactive Terminal (PTY) ### Interactive Terminal (PTY)
```python ```python
@ -533,14 +600,24 @@ make test-integration
### Running Integration Tests ### Running Integration Tests
Integration tests require a live Wrenn server: Integration tests require a live Wrenn server. Set credentials via environment or a `.env` file at the project root:
```bash ```bash
# Option 1: environment variable
export WRENN_API_KEY="wrn_..." export WRENN_API_KEY="wrn_..."
export WRENN_BASE_URL="http://localhost:8080" # optional
# Option 2: .env file
echo 'WRENN_API_KEY=wrn_...' > .env
```
Then run:
```bash
make test-integration make test-integration
``` ```
Tests are automatically skipped when `WRENN_API_KEY` is not available.
## License ## License
MIT MIT

View File

@ -4,8 +4,8 @@ version = "0.1.0"
description = "Python SDK for Wrenn" description = "Python SDK for Wrenn"
readme = "README.md" readme = "README.md"
authors = [ authors = [
{ name = "Rafeed M. Bhuiyan", email = "rafeed@omukk.dev" } { name = "Rafeed M. Bhuiyan", email = "rafeed@omukk.dev" },
{ name = "Tasnim Kabir Sadik", email = "tksadik@omukk.dev" } { name = "Tasnim Kabir Sadik", email = "tksadik@omukk.dev" },
] ]
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [

View File

@ -1,3 +1,13 @@
from wrenn._git import (
AsyncGit,
FileStatus,
Git,
GitAuthError,
GitBranch,
GitCommandError,
GitError,
GitStatus,
)
from wrenn.async_capsule import AsyncCapsule from wrenn.async_capsule import AsyncCapsule
from wrenn.capsule import Capsule from wrenn.capsule import Capsule
from wrenn.client import AsyncWrennClient, WrennClient from wrenn.client import AsyncWrennClient, WrennClient
@ -32,12 +42,20 @@ __version__ = "0.1.0"
__all__ = [ __all__ = [
"__version__", "__version__",
"AsyncCapsule", "AsyncCapsule",
"AsyncGit",
"AsyncPtySession", "AsyncPtySession",
"AsyncWrennClient", "AsyncWrennClient",
"Capsule", "Capsule",
"CommandHandle", "CommandHandle",
"CommandResult", "CommandResult",
"FileEntry", "FileEntry",
"FileStatus",
"Git",
"GitAuthError",
"GitBranch",
"GitCommandError",
"GitError",
"GitStatus",
"ProcessInfo", "ProcessInfo",
"PtyEvent", "PtyEvent",
"PtyEventType", "PtyEventType",

1423
src/wrenn/_git/__init__.py Normal file

File diff suppressed because it is too large Load Diff

104
src/wrenn/_git/_auth.py Normal file
View File

@ -0,0 +1,104 @@
from __future__ import annotations
import shlex
from urllib.parse import urlparse, urlunparse
def embed_credentials(url: str, username: str, password: str) -> str:
"""Embed HTTP(S) credentials into a git URL.
Args:
url: Git repository URL.
username: Username for authentication.
password: Password or personal access token.
Returns:
URL with ``username:password@`` embedded in the netloc.
Raises:
ValueError: If the URL scheme is not ``http`` or ``https``.
"""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError(
"Only http(s) URLs support embedded credentials."
)
netloc = f"{username}:{password}@{parsed.hostname}"
if parsed.port:
netloc = f"{netloc}:{parsed.port}"
return urlunparse(parsed._replace(netloc=netloc))
def strip_credentials(url: str) -> str:
"""Remove embedded credentials from a git URL.
Args:
url: Git repository URL, possibly with credentials.
Returns:
URL with credentials removed. Non-HTTP(S) URLs are returned
unchanged.
"""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return url
if not parsed.username and not parsed.password:
return url
host = parsed.hostname or ""
if parsed.port:
host = f"{host}:{parsed.port}"
return urlunparse(parsed._replace(netloc=host))
def is_auth_error(stderr: str) -> bool:
"""Check whether git stderr indicates an authentication failure.
Args:
stderr: Combined stderr output from a git command.
Returns:
``True`` if any known auth-failure pattern is found.
"""
lower = stderr.lower()
patterns = (
"authentication failed",
"terminal prompts disabled",
"could not read username",
"invalid username or password",
"access denied",
"permission denied",
"not authorized",
)
return any(p in lower for p in patterns)
def build_credential_approve_cmd(
username: str,
password: str,
host: str = "github.com",
protocol: str = "https",
) -> str:
"""Build a shell command that pipes credentials into ``git credential approve``.
Args:
username: Git username.
password: Password or personal access token.
host: Target host. Defaults to ``"github.com"``.
protocol: Protocol. Defaults to ``"https"``.
Returns:
A shell command string safe to pass to ``commands.run()``.
"""
if "\n" in username or "\n" in password:
raise ValueError("Credentials must not contain newline characters.")
target_host = host.strip() or "github.com"
target_protocol = protocol.strip() or "https"
credential_input = "\n".join([
f"protocol={target_protocol}",
f"host={target_host}",
f"username={username}",
f"password={password}",
"",
"",
])
return f"printf %s {shlex.quote(credential_input)} | git credential approve"

495
src/wrenn/_git/_cmd.py Normal file
View File

@ -0,0 +1,495 @@
"""Pure functions that build git argument lists and parse git output.
No I/O, no network, no imports from ``wrenn``. Every ``build_*`` function
returns a ``list[str]`` suitable for ``shlex.join()``. Every ``parse_*``
function takes raw stdout and returns a typed structure.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
# ── Data types ─────────────────────────────────────────────────────
@dataclass
class FileStatus:
"""A single entry from ``git status --porcelain=v1``.
Attributes:
path (str): File path relative to the repository root.
index_status (str): Index (staged) status character.
work_tree_status (str): Working-tree status character.
renamed_from (str | None): Original path when status is a rename.
"""
path: str
index_status: str
work_tree_status: str
renamed_from: str | None = None
@property
def staged(self) -> bool:
"""Whether the change is staged in the index."""
return self.index_status not in (" ", "?")
@property
def status(self) -> str:
"""Normalized human-readable status label."""
return _derive_status(self.index_status, self.work_tree_status)
@dataclass
class GitStatus:
"""Parsed output of ``git status --porcelain=v1 --branch``.
Attributes:
branch (str | None): Current branch name, or ``None`` if detached.
upstream (str | None): Upstream tracking branch.
ahead (int): Commits ahead of upstream.
behind (int): Commits behind upstream.
detached (bool): Whether HEAD is detached.
files (list[FileStatus]): Per-file status entries.
"""
branch: str | None = None
upstream: str | None = None
ahead: int = 0
behind: int = 0
detached: bool = False
files: list[FileStatus] = field(default_factory=list)
@property
def is_clean(self) -> bool:
"""``True`` when there are no changed or untracked files."""
return len(self.files) == 0
@property
def has_staged(self) -> bool:
"""``True`` when at least one file has staged changes."""
return any(f.staged for f in self.files)
@property
def has_untracked(self) -> bool:
"""``True`` when at least one file is untracked."""
return any(f.status == "untracked" for f in self.files)
@property
def has_conflicts(self) -> bool:
"""``True`` when at least one file has merge conflicts."""
return any(f.status == "conflict" for f in self.files)
@dataclass
class GitBranch:
"""A single branch entry.
Attributes:
name (str): Branch name (short ref).
is_current (bool): Whether this is the checked-out branch.
"""
name: str
is_current: bool = False
# ── Argument builders ──────────────────────────────────────────────
def build_clone(
url: str,
dest: str | None = None,
*,
branch: str | None = None,
depth: int | None = None,
) -> list[str]:
"""Build ``git clone`` arguments."""
args = ["git", "clone"]
if branch:
args.extend(["--branch", branch, "--single-branch"])
if depth is not None:
args.extend(["--depth", str(depth)])
args.append(url)
if dest:
args.append(dest)
return args
def build_init(
path: str = ".",
*,
bare: bool = False,
initial_branch: str | None = None,
) -> list[str]:
"""Build ``git init`` arguments."""
args = ["git", "init"]
if initial_branch:
args.extend(["--initial-branch", initial_branch])
if bare:
args.append("--bare")
args.append(path)
return args
def build_add(
paths: list[str] | None = None,
*,
all: bool = False,
) -> list[str]:
"""Build ``git add`` arguments."""
args = ["git", "add"]
if not paths:
args.append("-A" if all else ".")
else:
args.append("--")
args.extend(paths)
return args
def build_commit(
message: str,
*,
allow_empty: bool = False,
author_name: str | None = None,
author_email: str | None = None,
) -> list[str]:
"""Build ``git commit`` arguments."""
args = ["git"]
if author_name:
args.extend(["-c", f"user.name={author_name}"])
if author_email:
args.extend(["-c", f"user.email={author_email}"])
args.extend(["commit", "-m", message])
if allow_empty:
args.append("--allow-empty")
return args
def build_push(
remote: str = "origin",
branch: str | None = None,
*,
force: bool = False,
set_upstream: bool = False,
) -> list[str]:
"""Build ``git push`` arguments."""
args = ["git", "push"]
if force:
args.append("--force")
if set_upstream:
args.append("--set-upstream")
args.append(remote)
if branch:
args.append(branch)
return args
def build_pull(
remote: str = "origin",
branch: str | None = None,
*,
rebase: bool = False,
ff_only: bool = False,
) -> list[str]:
"""Build ``git pull`` arguments."""
args = ["git", "pull"]
if rebase:
args.append("--rebase")
if ff_only:
args.append("--ff-only")
args.append(remote)
if branch:
args.append(branch)
return args
def build_status() -> list[str]:
"""Build ``git status`` arguments for porcelain parsing."""
return ["git", "status", "--porcelain=v1", "--branch"]
def build_branches() -> list[str]:
"""Build ``git branch`` arguments for structured parsing."""
return ["git", "branch", "--format=%(refname:short)\t%(HEAD)"]
def build_create_branch(
name: str,
*,
start_point: str | None = None,
) -> list[str]:
"""Build ``git checkout -b`` arguments."""
args = ["git", "checkout", "-b", name]
if start_point:
args.append(start_point)
return args
def build_checkout(name: str) -> list[str]:
"""Build ``git checkout`` arguments."""
return ["git", "checkout", name]
def build_delete_branch(
name: str,
*,
force: bool = False,
) -> list[str]:
"""Build ``git branch -d/-D`` arguments."""
return ["git", "branch", "-D" if force else "-d", name]
def build_remote_add(name: str, url: str, *, fetch: bool = False) -> list[str]:
"""Build ``git remote add`` arguments."""
args = ["git", "remote", "add"]
if fetch:
args.append("-f")
args.extend([name, url])
return args
def build_remote_get_url(name: str = "origin") -> list[str]:
"""Build ``git remote get-url`` arguments."""
return ["git", "remote", "get-url", name]
def build_remote_set_url(name: str, url: str) -> list[str]:
"""Build ``git remote set-url`` arguments."""
return ["git", "remote", "set-url", name, url]
def build_reset(
*,
mode: str | None = None,
ref: str | None = None,
paths: list[str] | None = None,
) -> list[str]:
"""Build ``git reset`` arguments.
Args:
mode: Reset mode (``soft``, ``mixed``, ``hard``, ``merge``, ``keep``).
ref: Commit, branch, or ref to reset to.
paths: Paths to reset (mutually exclusive with ``mode``).
"""
_ALLOWED_MODES = {"soft", "mixed", "hard", "merge", "keep"}
if mode and mode not in _ALLOWED_MODES:
raise ValueError(
f"Reset mode must be one of {', '.join(sorted(_ALLOWED_MODES))}."
)
args = ["git", "reset"]
if mode:
args.append(f"--{mode}")
if ref:
args.append(ref)
if paths:
args.append("--")
args.extend(paths)
return args
def build_restore(
paths: list[str],
*,
staged: bool = False,
worktree: bool = False,
source: str | None = None,
) -> list[str]:
"""Build ``git restore`` arguments.
Args:
paths: Paths to restore.
staged: Restore the index (unstage).
worktree: Restore working-tree files.
source: Commit or ref to restore from.
"""
if not paths:
raise ValueError("At least one path is required.")
if not staged and not worktree:
worktree = True
args = ["git", "restore"]
if worktree:
args.append("--worktree")
if staged:
args.append("--staged")
if source:
args.extend(["--source", source])
args.append("--")
args.extend(paths)
return args
def build_config_set(
key: str,
value: str,
*,
scope: str = "local",
repo_path: str | None = None,
) -> list[str]:
"""Build ``git config`` set arguments."""
scope_flag = _resolve_scope_flag(scope)
args = ["git"]
if scope == "local" and repo_path:
args.extend(["-C", repo_path])
args.extend(["config", scope_flag, key, value])
return args
def build_config_get(
key: str,
*,
scope: str = "local",
repo_path: str | None = None,
) -> list[str]:
"""Build ``git config --get`` arguments."""
scope_flag = _resolve_scope_flag(scope)
args = ["git"]
if scope == "local" and repo_path:
args.extend(["-C", repo_path])
args.extend(["config", scope_flag, "--get", key])
return args
def build_has_upstream() -> list[str]:
"""Build arguments to check if current branch has upstream tracking."""
return ["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"]
# ── Parsers ────────────────────────────────────────────────────────
def parse_status(stdout: str) -> GitStatus:
"""Parse ``git status --porcelain=v1 --branch`` output.
Args:
stdout: Raw stdout from the git status command.
Returns:
Parsed :class:`GitStatus`.
"""
lines = [line for line in stdout.split("\n") if line.rstrip()]
if not lines:
return GitStatus()
status = GitStatus()
branch_line = lines[0]
if branch_line.startswith("## "):
_parse_branch_line(branch_line[3:], status)
for line in lines[1:]:
if line.startswith("?? "):
status.files.append(FileStatus(
path=line[3:],
index_status="?",
work_tree_status="?",
))
continue
if len(line) < 4:
continue
idx = line[0]
wt = line[1]
path = line[3:]
renamed_from = None
if " -> " in path:
renamed_from, path = path.split(" -> ", 1)
status.files.append(FileStatus(
path=path,
index_status=idx,
work_tree_status=wt,
renamed_from=renamed_from,
))
return status
def parse_branches(stdout: str) -> list[GitBranch]:
"""Parse ``git branch --format=%(refname:short)\\t%(HEAD)`` output.
Args:
stdout: Raw stdout from the git branch command.
Returns:
List of :class:`GitBranch`.
"""
branches: list[GitBranch] = []
for line in stdout.split("\n"):
line = line.strip()
if not line:
continue
parts = line.split("\t")
name = parts[0]
is_current = len(parts) > 1 and parts[1] == "*"
branches.append(GitBranch(name=name, is_current=is_current))
return branches
# ── Internal helpers ───────────────────────────────────────────────
def _resolve_scope_flag(scope: str) -> str:
"""Convert a scope name to a git config flag."""
scope = scope.strip().lower()
if scope == "local":
return "--local"
if scope == "global":
return "--global"
if scope == "system":
return "--system"
raise ValueError(
"Git config scope must be one of: local, global, system."
)
def _parse_branch_line(info: str, status: GitStatus) -> None:
"""Parse the ``## branch...upstream [ahead N, behind M]`` header."""
ahead_start = info.find(" [")
branch_part = info if ahead_start == -1 else info[:ahead_start]
ahead_part = None if ahead_start == -1 else info[ahead_start + 2:-1]
if branch_part.startswith("HEAD (detached at "):
status.detached = True
status.branch = branch_part[18:].rstrip(")")
elif "detached" in branch_part or branch_part.startswith("HEAD"):
status.detached = True
elif "..." in branch_part:
local, remote = branch_part.split("...", 1)
status.branch = local or None
status.upstream = remote or None
else:
name = (
branch_part
.replace("No commits yet on ", "")
.replace("Initial commit on ", "")
)
status.branch = name or None
if ahead_part:
m = re.search(r"ahead (\d+)", ahead_part)
if m:
status.ahead = int(m.group(1))
m = re.search(r"behind (\d+)", ahead_part)
if m:
status.behind = int(m.group(1))
def _derive_status(index_status: str, work_tree_status: str) -> str:
"""Derive a normalized status label from porcelain XY characters."""
chars = {index_status, work_tree_status}
if "U" in chars:
return "conflict"
if "R" in chars:
return "renamed"
if "C" in chars:
return "copied"
if "D" in chars:
return "deleted"
if "A" in chars:
return "added"
if "M" in chars:
return "modified"
if "T" in chars:
return "typechange"
if "?" in chars:
return "untracked"
return "unknown"

View File

@ -0,0 +1,30 @@
from __future__ import annotations
class GitError(Exception):
"""Base exception for all git operations inside a capsule.
Not a subclass of :class:`WrennError` because git errors originate
from a process exit code, not an HTTP response.
Attributes:
message (str): Human-readable error description.
stderr (str): Raw stderr output from the git process.
exit_code (int): Process exit code.
"""
def __init__(
self, message: str, *, stderr: str = "", exit_code: int = -1
) -> None:
self.message = message
self.stderr = stderr
self.exit_code = exit_code
super().__init__(message)
class GitCommandError(GitError):
"""A git command exited with a non-zero exit code."""
class GitAuthError(GitError):
"""Authentication failed when communicating with a remote."""

View File

@ -7,6 +7,7 @@ from contextlib import asynccontextmanager
import httpx_ws import httpx_ws
from wrenn._git import AsyncGit
from wrenn.capsule import _DualMethod, _build_proxy_url from wrenn.capsule import _DualMethod, _build_proxy_url
from wrenn.client import AsyncWrennClient from wrenn.client import AsyncWrennClient
from wrenn.commands import AsyncCommands from wrenn.commands import AsyncCommands
@ -42,6 +43,7 @@ class AsyncCapsule:
self.commands = AsyncCommands(_capsule_id, _client.http) self.commands = AsyncCommands(_capsule_id, _client.http)
self.files = AsyncFiles(_capsule_id, _client.http) self.files = AsyncFiles(_capsule_id, _client.http)
self.git = AsyncGit(_capsule_id, _client.http)
# ── Properties ────────────────────────────────────────────── # ── Properties ──────────────────────────────────────────────

View File

@ -8,6 +8,7 @@ from typing import Any
import httpx import httpx
import httpx_ws import httpx_ws
from wrenn._git import Git
from wrenn.client import WrennClient from wrenn.client import WrennClient
from wrenn.commands import Commands from wrenn.commands import Commands
from wrenn.files import Files from wrenn.files import Files
@ -111,6 +112,7 @@ class Capsule:
self.commands = Commands(self._id, self._client.http) self.commands = Commands(self._id, self._client.http)
self.files = Files(self._id, self._client.http) self.files = Files(self._id, self._client.http)
self.git = Git(self._id, self._client.http)
if wait: if wait:
self.wait_ready() self.wait_ready()

View File

@ -183,7 +183,11 @@ class Commands:
CommandHandle: PID and tag for background commands CommandHandle: PID and tag for background commands
(``background=True``). (``background=True``).
""" """
payload: dict = {"cmd": cmd, "background": background} payload: dict = {
"cmd": "/bin/sh",
"args": ["-c", cmd],
"background": background,
}
if timeout is not None and not background: if timeout is not None and not background:
payload["timeout_sec"] = timeout payload["timeout_sec"] = timeout
if envs is not None: if envs is not None:
@ -271,6 +275,8 @@ class Commands:
Args: Args:
cmd (str): Command to execute. cmd (str): Command to execute.
args (list[str] | None): Additional arguments for the command. args (list[str] | None): Additional arguments for the command.
When omitted, *cmd* is interpreted as a shell command
string and executed via ``/bin/sh -c``.
Yields: Yields:
StreamEvent: Successive events including :class:`StreamStartEvent`, StreamEvent: Successive events including :class:`StreamStartEvent`,
@ -281,9 +287,10 @@ class Commands:
f"/v1/capsules/{self._capsule_id}/exec/stream", f"/v1/capsules/{self._capsule_id}/exec/stream",
self._http, self._http,
) as ws: ) as ws:
start_msg: dict = {"type": "start", "cmd": cmd}
if args: if args:
start_msg["args"] = args start_msg: dict = {"type": "start", "cmd": cmd, "args": args}
else:
start_msg = {"type": "start", "cmd": "/bin/sh", "args": ["-c", cmd]}
ws.send_text(json.dumps(start_msg)) ws.send_text(json.dumps(start_msg))
while True: while True:
try: try:
@ -359,7 +366,11 @@ class AsyncCommands:
CommandHandle: PID and tag for background commands CommandHandle: PID and tag for background commands
(``background=True``). (``background=True``).
""" """
payload: dict = {"cmd": cmd, "background": background} payload: dict = {
"cmd": "/bin/sh",
"args": ["-c", cmd],
"background": background,
}
if timeout is not None and not background: if timeout is not None and not background:
payload["timeout_sec"] = timeout payload["timeout_sec"] = timeout
if envs is not None: if envs is not None:
@ -449,6 +460,8 @@ class AsyncCommands:
Args: Args:
cmd (str): Command to execute. cmd (str): Command to execute.
args (list[str] | None): Additional arguments for the command. args (list[str] | None): Additional arguments for the command.
When omitted, *cmd* is interpreted as a shell command
string and executed via ``/bin/sh -c``.
Yields: Yields:
StreamEvent: Successive events including :class:`StreamStartEvent`, StreamEvent: Successive events including :class:`StreamStartEvent`,
@ -459,9 +472,10 @@ class AsyncCommands:
f"/v1/capsules/{self._capsule_id}/exec/stream", f"/v1/capsules/{self._capsule_id}/exec/stream",
self._http, self._http,
) as ws: ) as ws:
start_msg: dict = {"type": "start", "cmd": cmd}
if args: if args:
start_msg["args"] = args start_msg: dict = {"type": "start", "cmd": cmd, "args": args}
else:
start_msg = {"type": "start", "cmd": "/bin/sh", "args": ["-c", cmd]}
await ws.send_text(json.dumps(start_msg)) await ws.send_text(json.dumps(start_msg))
try: try:
while True: while True:

35
tests/conftest.py Normal file
View File

@ -0,0 +1,35 @@
from __future__ import annotations
import os
from pathlib import Path
import pytest
ENV_FILE = Path(__file__).resolve().parent.parent / ".env"
def _read_env_file() -> dict[str, str]:
result: dict[str, str] = {}
if not ENV_FILE.exists():
return result
for line in ENV_FILE.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip().strip("\"'")
if key:
result[key] = value
return result
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
env_vars = _read_env_file()
has_key = bool(os.environ.get("WRENN_API_KEY") or env_vars.get("WRENN_API_KEY"))
if has_key:
return
skip = pytest.mark.skip(reason="WRENN_API_KEY not set")
for item in items:
if "integration" in item.keywords:
item.add_marker(skip)

1099
tests/test_git.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,568 +1,413 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import Generator import time
from pathlib import Path
import pytest import pytest
from wrenn.client import AsyncWrennClient, WrennClient from wrenn import Capsule, CommandResult
from wrenn.exceptions import WrennNotFoundError, WrennValidationError from wrenn.commands import CommandHandle, ProcessInfo
from wrenn.pty import PtyEventType from wrenn.models import Capsule as CapsuleModel, FileEntry, Status
WRENN_API_KEY = os.environ.get("WRENN_API_KEY") pytestmark = pytest.mark.integration
WRENN_TOKEN = os.environ.get("WRENN_TOKEN")
WRENN_BASE_URL = os.environ.get("WRENN_BASE_URL", "http://localhost:8080") _env_loaded = False
WRENN_TEST_EMAIL = os.environ.get("WRENN_TEST_EMAIL")
WRENN_TEST_PASSWORD = os.environ.get("WRENN_TEST_PASSWORD")
def _has_auth() -> bool: def _ensure_env() -> None:
return bool(WRENN_API_KEY or WRENN_TOKEN) global _env_loaded
if _env_loaded:
return
_env_loaded = True
env_file = Path(__file__).resolve().parent.parent / ".env"
if not env_file.exists():
return
for line in env_file.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
key, value = key.strip(), value.strip().strip("\"'")
if key and key not in os.environ:
os.environ[key] = value
requires_auth = pytest.mark.skipif(
not _has_auth(),
reason="Set WRENN_API_KEY or WRENN_TOKEN to run integration tests",
)
@pytest.fixture
def client() -> Generator[WrennClient, None, None]:
with WrennClient(
api_key=WRENN_API_KEY,
token=WRENN_TOKEN,
base_url=WRENN_BASE_URL,
) as c:
yield c
@pytest.fixture
def async_client() -> AsyncWrennClient:
return AsyncWrennClient(
api_key=WRENN_API_KEY,
token=WRENN_TOKEN,
base_url=WRENN_BASE_URL,
)
@pytest.fixture
def bearer_client() -> Generator[WrennClient, None, None]:
if WRENN_TOKEN:
with WrennClient(token=WRENN_TOKEN, base_url=WRENN_BASE_URL) as c:
yield c
elif WRENN_TEST_EMAIL and WRENN_TEST_PASSWORD:
with WrennClient(
api_key=WRENN_API_KEY, token=WRENN_TOKEN, base_url=WRENN_BASE_URL
) as c:
resp = c.auth.login(WRENN_TEST_EMAIL, WRENN_TEST_PASSWORD)
with WrennClient(token=resp.token, base_url=WRENN_BASE_URL) as c:
yield c
else:
pytest.skip(
"Set WRENN_TOKEN or WRENN_TEST_EMAIL+WRENN_TEST_PASSWORD for bearer-auth tests"
)
@requires_auth
class TestCapsuleLifecycle: class TestCapsuleLifecycle:
def test_create_exec_destroy(self, client): """Each test manages its own capsule to test create/destroy paths."""
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
result = cap.exec("echo", args=["hello"])
assert result.exit_code == 0
assert "hello" in result.stdout
def test_exec_with_args(self, client): def setup_method(self):
with client.capsules.create(template="minimal", timeout_sec=120) as cap: _ensure_env()
cap.wait_ready(timeout=60, interval=1)
result = cap.exec("echo", args=["hello", "world"])
assert result.exit_code == 0
assert "hello world" in result.stdout
def test_exec_nonzero_exit(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
result = cap.exec("sh", args=["-c", "exit 42"])
assert result.exit_code == 42
def test_exec_stderr(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
result = cap.exec("sh", args=["-c", "echo err>&2"])
assert result.exit_code == 0
assert "err" in result.stderr
def test_context_manager_cleanup(self, client):
cap = client.capsules.create(template="minimal", timeout_sec=120)
cap_id = cap.id
with cap:
cap.wait_ready(timeout=60, interval=1)
fetched = client.capsules.get(cap_id)
assert fetched.status in ("stopped", "destroyed")
@requires_auth
class TestFileIO:
def test_upload_and_download(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
content = b"Hello from integration test!"
cap.upload("/tmp/test_file.txt", content)
downloaded = cap.download("/tmp/test_file.txt")
assert downloaded == content
def test_download_nonexistent_file(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with pytest.raises(Exception):
cap.download("/tmp/no_such_file_12345")
@requires_auth
class TestPauseResume:
def test_pause_and_resume(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.pause()
assert cap.status == "paused"
cap.resume()
cap.wait_ready(timeout=60, interval=1)
result = cap.exec("echo", args=["resumed"])
assert result.exit_code == 0
assert "resumed" in result.stdout
@requires_auth
class TestPing:
def test_ping_resets_timer(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.ping()
result = cap.exec("echo", args=["still_alive"])
assert result.exit_code == 0
assert "still_alive" in result.stdout
@requires_auth
class TestProxy:
def test_get_url(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
url = cap.get_url(8888)
assert cap.id in url
assert "8888" in url
@requires_auth
class TestListAndGet:
def test_list_capsules(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
boxes = client.capsules.list()
ids = [b.id for b in boxes]
assert cap.id in ids
def test_get_existing_capsule(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
fetched = client.capsules.get(cap.id)
assert fetched.id == cap.id
assert fetched.status == "running"
def test_get_nonexistent_capsule(self, client):
with pytest.raises((WrennNotFoundError, WrennValidationError)):
client.capsules.get("cl-nonexistent00000000000000000")
@requires_auth
class TestSnapshots:
def test_list_templates(self, client):
templates = client.snapshots.list()
assert isinstance(templates, list)
@requires_auth
class TestAPIKeys:
def test_create_list_delete(self, bearer_client):
key_resp = bearer_client.api_keys.create(name="integration-test-key")
assert key_resp.name == "integration-test-key"
assert key_resp.key is not None
assert key_resp.id is not None
def test_create_and_destroy(self):
capsule = Capsule()
capsule_id = capsule.capsule_id
try: try:
keys = bearer_client.api_keys.list() assert capsule_id
ids = [k.id for k in keys] assert capsule.info is not None
assert key_resp.id in ids
finally: finally:
bearer_client.api_keys.delete(key_resp.id) capsule.destroy()
info = Capsule.get_info(capsule_id)
assert info.status in (Status.stopped, Status.missing)
def test_create_with_wait(self):
capsule = Capsule(wait=True)
try:
assert capsule.info is not None
assert capsule.info.status == Status.running
finally:
capsule.destroy()
def test_context_manager_destroys(self):
with Capsule(wait=True) as capsule:
capsule_id = capsule.capsule_id
assert capsule.is_running()
info = Capsule.get_info(capsule_id)
assert info.status in (Status.stopped, Status.missing)
def test_get_info(self):
capsule = Capsule(wait=True)
try:
info = capsule.get_info()
assert isinstance(info, CapsuleModel)
assert info.id == capsule.capsule_id
assert info.status == Status.running
finally:
capsule.destroy()
def test_pause_and_resume(self):
capsule = Capsule(wait=True)
try:
paused = capsule.pause()
assert paused.status == Status.paused
assert not capsule.is_running()
resumed = capsule.resume()
assert resumed.status == Status.running
finally:
capsule.destroy()
def test_static_destroy(self):
capsule = Capsule(wait=True)
capsule_id = capsule.capsule_id
try:
Capsule.destroy(capsule_id)
except Exception:
capsule.destroy()
raise
info = Capsule.get_info(capsule_id)
assert info.status in (Status.stopped, Status.missing)
def test_connect_to_existing(self):
capsule = Capsule(wait=True)
try:
connected = Capsule.connect(capsule.capsule_id)
assert connected.capsule_id == capsule.capsule_id
assert connected.info is not None
assert connected.info.status == Status.running
finally:
capsule.destroy()
def test_connect_resumes_paused(self):
capsule = Capsule(wait=True)
try:
capsule.pause()
connected = Capsule.connect(capsule.capsule_id)
assert connected.info is not None
assert connected.info.status == Status.running
finally:
capsule.destroy()
def test_list_capsules(self):
capsule = Capsule(wait=True)
try:
capsules = Capsule.list()
assert isinstance(capsules, list)
ids = [c.id for c in capsules]
assert capsule.capsule_id in ids
finally:
capsule.destroy()
def test_wait_ready(self):
capsule = Capsule()
try:
capsule.wait_ready(timeout=60)
assert capsule.is_running()
finally:
capsule.destroy()
def test_ping(self):
capsule = Capsule(wait=True)
try:
capsule.ping()
finally:
capsule.destroy()
@requires_auth class TestCommands:
class TestRunCode: """Shared capsule for command execution tests."""
def test_basic_execution(self, client):
with client.capsules.create(
template="python-interpreter-v0-beta", timeout_sec=120
) as cap:
cap.wait_ready(timeout=60, interval=1)
r = cap.run_code("x = 42") capsule: Capsule
assert r.error is None
r = cap.run_code("x * 2") @classmethod
assert r.text == "84" def setup_class(cls):
_ensure_env()
cls.capsule = Capsule(wait=True)
def test_state_persists(self, client): @classmethod
with client.capsules.create( def teardown_class(cls):
template="python-interpreter-v0-beta", timeout_sec=120 try:
) as cap: cls.capsule.destroy()
cap.wait_ready(timeout=60, interval=1) except Exception:
pass
cap.run_code("def greet(name): return f'hello {name}'") def test_run_foreground(self):
r = cap.run_code("greet('capsule')") result = self.capsule.commands.run("echo hello")
assert "hello capsule" in (r.text or "") assert isinstance(result, CommandResult)
assert result.exit_code == 0
assert "hello" in result.stdout
def test_error_traceback(self, client): def test_run_stderr(self):
with client.capsules.create( result = self.capsule.commands.run("echo error >&2")
template="python-interpreter-v0-beta", timeout_sec=120 assert "error" in result.stderr
) as cap:
cap.wait_ready(timeout=60, interval=1)
r = cap.run_code("1/0") def test_run_exit_code(self):
assert r.error is not None result = self.capsule.commands.run("exit 42")
assert "ZeroDivisionError" in r.error assert result.exit_code == 42
def test_stdout_capture(self, client): def test_run_with_envs(self):
with client.capsules.create( result = self.capsule.commands.run(
template="python-interpreter-v0-beta", timeout_sec=120 "export MY_VAR=test_value && echo $MY_VAR"
) as cap: )
cap.wait_ready(timeout=60, interval=1) assert "test_value" in result.stdout
r = cap.run_code("print('hello from kernel')") def test_run_with_cwd(self):
assert "hello from kernel" in r.stdout result = self.capsule.commands.run("cd /tmp && pwd")
assert result.stdout.strip() == "/tmp"
def test_run_multiline_output(self):
result = self.capsule.commands.run("echo -e 'line1\\nline2\\nline3'")
assert result.exit_code == 0
lines = result.stdout.strip().splitlines()
assert len(lines) == 3
def test_run_background(self):
handle = self.capsule.commands.run(
"sleep 30", background=True, tag="bg-test"
)
assert isinstance(handle, CommandHandle)
assert handle.pid > 0
assert handle.tag == "bg-test"
assert handle.capsule_id == self.capsule.capsule_id
self.capsule.commands.kill(handle.pid)
def test_list_processes(self):
handle = self.capsule.commands.run(
"sleep 30", background=True, tag="list-test"
)
try:
time.sleep(0.5)
processes = self.capsule.commands.list()
assert isinstance(processes, list)
pids = [p.pid for p in processes]
assert handle.pid in pids
proc = next(p for p in processes if p.pid == handle.pid)
assert isinstance(proc, ProcessInfo)
finally:
self.capsule.commands.kill(handle.pid)
def test_kill_process(self):
handle = self.capsule.commands.run(
"sleep 30", background=True
)
self.capsule.commands.kill(handle.pid)
time.sleep(0.5)
processes = self.capsule.commands.list()
pids = [p.pid for p in processes]
assert handle.pid not in pids
def test_run_duration_ms(self):
result = self.capsule.commands.run("sleep 1")
assert result.duration_ms is None or result.duration_ms >= 900
@requires_auth class TestFiles:
class TestAsyncCapsuleLifecycle: """Shared capsule for filesystem tests."""
@pytest.mark.asyncio
async def test_async_create_exec_destroy(self, async_client):
async with async_client:
cap = await async_client.capsules.create(
template="minimal", timeout_sec=120
)
try:
await cap.async_wait_ready(timeout=60, interval=1)
result = await cap.async_exec("echo", args=["async_hello"])
assert result.exit_code == 0
assert "async_hello" in result.stdout
finally:
await cap.async_destroy()
@pytest.mark.asyncio capsule: Capsule
async def test_async_upload_download(self, async_client):
async with async_client:
cap = await async_client.capsules.create(
template="minimal", timeout_sec=120
)
try:
await cap.async_wait_ready(timeout=60, interval=1)
content = b"Async upload test"
await cap.async_upload("/tmp/async_test.txt", content)
downloaded = await cap.async_download("/tmp/async_test.txt")
assert downloaded == content
finally:
await cap.async_destroy()
@pytest.mark.asyncio @classmethod
async def test_async_run_code(self, async_client): def setup_class(cls):
async with async_client: _ensure_env()
cap = await async_client.capsules.create( cls.capsule = Capsule(wait=True)
template="python-interpreter-v0-beta", timeout_sec=120
) @classmethod
try: def teardown_class(cls):
await cap.async_wait_ready(timeout=60, interval=1) try:
r = await cap.async_run_code("42 * 2") cls.capsule.destroy()
assert r.text == "84" except Exception:
finally: pass
await cap.async_destroy()
def test_write_and_read(self):
self.capsule.files.write("/tmp/test.txt", "hello world")
content = self.capsule.files.read("/tmp/test.txt")
assert content == "hello world"
def test_write_and_read_bytes(self):
data = b"\x00\x01\x02\xff"
self.capsule.files.write("/tmp/test.bin", data)
result = self.capsule.files.read_bytes("/tmp/test.bin")
assert result == data
def test_list_directory(self):
self.capsule.files.write("/tmp/listdir/a.txt", "a")
self.capsule.files.write("/tmp/listdir/b.txt", "b")
entries = self.capsule.files.list("/tmp/listdir")
assert isinstance(entries, list)
names = [e.name for e in entries]
assert "a.txt" in names
assert "b.txt" in names
def test_exists(self):
self.capsule.files.write("/tmp/exists_test.txt", "x")
assert self.capsule.files.exists("/tmp/exists_test.txt")
assert not self.capsule.files.exists("/tmp/does_not_exist_xyz.txt")
def test_make_dir(self):
entry = self.capsule.files.make_dir("/tmp/newdir")
assert isinstance(entry, FileEntry)
assert self.capsule.files.exists("/tmp/newdir")
def test_make_dir_idempotent(self):
self.capsule.files.make_dir("/tmp/idempotent_dir")
entry = self.capsule.files.make_dir("/tmp/idempotent_dir")
assert isinstance(entry, FileEntry)
def test_remove_file(self):
self.capsule.files.write("/tmp/to_remove.txt", "delete me")
assert self.capsule.files.exists("/tmp/to_remove.txt")
self.capsule.files.remove("/tmp/to_remove.txt")
assert not self.capsule.files.exists("/tmp/to_remove.txt")
def test_remove_directory(self):
self.capsule.files.make_dir("/tmp/dir_to_remove")
self.capsule.files.write("/tmp/dir_to_remove/child.txt", "data")
self.capsule.files.remove("/tmp/dir_to_remove")
assert not self.capsule.files.exists("/tmp/dir_to_remove")
def test_write_creates_parent_dirs(self):
self.capsule.files.write("/tmp/deep/nested/dir/file.txt", "nested")
content = self.capsule.files.read("/tmp/deep/nested/dir/file.txt")
assert content == "nested"
def test_list_with_depth(self):
self.capsule.files.write("/tmp/depth_test/a/b.txt", "deep")
entries_shallow = self.capsule.files.list("/tmp/depth_test", depth=1)
entries_deep = self.capsule.files.list("/tmp/depth_test", depth=2)
assert len(entries_deep) >= len(entries_shallow)
def test_overwrite_file(self):
self.capsule.files.write("/tmp/overwrite.txt", "original")
self.capsule.files.write("/tmp/overwrite.txt", "updated")
content = self.capsule.files.read("/tmp/overwrite.txt")
assert content == "updated"
def test_upload_and_download_stream(self):
chunks = [b"chunk1", b"chunk2", b"chunk3"]
self.capsule.files.upload_stream("/tmp/streamed.bin", iter(chunks))
downloaded = b"".join(self.capsule.files.download_stream("/tmp/streamed.bin"))
assert downloaded == b"chunk1chunk2chunk3"
@requires_auth class TestGit:
class TestFilesystemListDir: """Shared capsule for git operation tests.
def test_list_dir_root(self, client: WrennClient):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.mkdir("/tmp/ls_test_root")
cap.upload("/tmp/ls_test_root/hello.txt", b"hello")
entries = cap.list_dir("/tmp/ls_test_root")
assert isinstance(entries, list)
names = [e.name for e in entries]
assert "hello.txt" in names
def test_list_dir_after_mkdir(self, client): Initializes a repo at /root (default cwd) since the exec API
with client.capsules.create(template="minimal", timeout_sec=120) as cap: does not support the cwd parameter.
cap.wait_ready(timeout=60, interval=1) """
cap.mkdir("/tmp/fs_test_dir")
entries = cap.list_dir("/tmp")
names = [e.name for e in entries]
assert "fs_test_dir" in names
def test_list_dir_file_metadata(self, client): capsule: Capsule
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
cap.upload("/tmp/meta_test.txt", b"hello world")
entries = cap.list_dir("/tmp")
match = [e for e in entries if e.name == "meta_test.txt"]
assert len(match) == 1
f = match[0]
assert f.type == "file"
assert f.size == 11
assert f.permissions is not None
assert f.owner is not None
assert f.group is not None
assert f.modified_at is not None
def test_list_dir_depth(self, client): @classmethod
with client.capsules.create(template="minimal", timeout_sec=120) as cap: def setup_class(cls):
cap.wait_ready(timeout=60, interval=1) _ensure_env()
cap.mkdir("/tmp/depth_a/depth_b") cls.capsule = Capsule(wait=True)
cap.upload("/tmp/depth_a/depth_b/nested.txt", b"deep") cls.capsule.git.init(".", initial_branch="main")
entries = cap.list_dir("/tmp/depth_a", depth=2) cls.capsule.git.configure_user("Test User", "test@example.com")
paths = [e.path for e in entries]
assert any("nested.txt" in p for p in paths)
def test_list_dir_empty_directory(self, client): @classmethod
with client.capsules.create(template="minimal", timeout_sec=120) as cap: def teardown_class(cls):
cap.wait_ready(timeout=60, interval=1) try:
cap.mkdir("/tmp/empty_dir_test") cls.capsule.destroy()
entries = cap.list_dir("/tmp/empty_dir_test") except Exception:
assert entries == [] pass
def test_init_created_repo(self):
assert self.capsule.files.exists("/root/.git")
@requires_auth def test_status_clean(self):
class TestFilesystemMkdir: status = self.capsule.git.status()
def test_mkdir_creates_directory(self, client): assert status.branch == "main"
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
entry = cap.mkdir("/tmp/mkdir_test")
assert entry.name == "mkdir_test"
assert entry.type == "directory"
assert entry.path == "/tmp/mkdir_test"
def test_mkdir_creates_parents(self, client): def test_add_and_commit(self):
with client.capsules.create(template="minimal", timeout_sec=120) as cap: self.capsule.files.write("/root/hello.txt", "hello git")
cap.wait_ready(timeout=60, interval=1) self.capsule.git.add(all=True)
entry = cap.mkdir("/tmp/a/b/c/d") result = self.capsule.git.commit("initial commit")
assert entry.type == "directory" assert result.exit_code == 0
def test_mkdir_already_exists(self, client: WrennClient): def test_status_after_commit(self):
with client.capsules.create(template="minimal", timeout_sec=120) as cap: status = self.capsule.git.status()
cap.wait_ready(timeout=60, interval=1) assert status.is_clean
cap.mkdir("/tmp/exist_test")
entry = cap.mkdir("/tmp/exist_test")
assert entry.type == "directory"
def test_status_with_changes(self):
self.capsule.files.write("/root/dirty.txt", "uncommitted")
try:
status = self.capsule.git.status()
assert not status.is_clean
paths = [f.path for f in status.files]
assert "dirty.txt" in paths
finally:
self.capsule.files.remove("/root/dirty.txt")
@requires_auth def test_branches(self):
class TestFilesystemRemove: branches = self.capsule.git.branches()
def test_remove_file(self, client): assert len(branches) >= 1
with client.capsules.create(template="minimal", timeout_sec=120) as cap: names = [b.name for b in branches]
cap.wait_ready(timeout=60, interval=1) assert "main" in names
cap.upload("/tmp/rm_test.txt", b"delete me") current = [b for b in branches if b.is_current]
entries_before = cap.list_dir("/tmp") assert len(current) == 1
assert any(e.name == "rm_test.txt" for e in entries_before)
cap.remove("/tmp/rm_test.txt")
entries_after = cap.list_dir("/tmp")
assert not any(e.name == "rm_test.txt" for e in entries_after)
def test_remove_directory(self, client): def test_create_and_checkout_branch(self):
with client.capsules.create(template="minimal", timeout_sec=120) as cap: self.capsule.git.create_branch("feature-1")
cap.wait_ready(timeout=60, interval=1) branches = self.capsule.git.branches()
cap.mkdir("/tmp/rm_dir_test") names = [b.name for b in branches]
cap.upload("/tmp/rm_dir_test/file.txt", b"inside") assert "feature-1" in names
cap.remove("/tmp/rm_dir_test")
entries = cap.list_dir("/tmp")
assert not any(e.name == "rm_dir_test" for e in entries)
def test_upload_download_remove_roundtrip(self, client): current = [b for b in branches if b.is_current]
with client.capsules.create(template="minimal", timeout_sec=120) as cap: assert current[0].name == "feature-1"
cap.wait_ready(timeout=60, interval=1)
content = b"round trip test data " * 100
cap.upload("/tmp/rt.txt", content)
downloaded = cap.download("/tmp/rt.txt")
assert downloaded == content
cap.remove("/tmp/rt.txt")
with pytest.raises(Exception):
cap.download("/tmp/rt.txt")
self.capsule.git.checkout_branch("main")
@requires_auth def test_delete_branch(self):
class TestStreamUploadDownload: self.capsule.git.create_branch("to-delete")
def test_stream_upload_and_download(self, client: WrennClient): self.capsule.git.checkout_branch("main")
with client.capsules.create(template="minimal", timeout_sec=120) as cap: self.capsule.git.delete_branch("to-delete")
cap.wait_ready(timeout=60, interval=1)
chunks = [b"chunk0_", b"chunk1_", b"chunk2"]
def data_gen(): branches = self.capsule.git.branches()
yield from chunks names = [b.name for b in branches]
assert "to-delete" not in names
cap.stream_upload("/tmp/stream_test.bin", data_gen()) def test_set_and_get_config(self):
downloaded = cap.download("/tmp/stream_test.bin") self.capsule.git.set_config("test.key", "test-value")
assert downloaded == b"chunk0_chunk1_chunk2" value = self.capsule.git.get_config("test.key")
assert value == "test-value"
def test_stream_download_large(self, client): def test_get_config_missing_returns_none(self):
with client.capsules.create(template="minimal", timeout_sec=120) as cap: value = self.capsule.git.get_config("nonexistent.key")
cap.wait_ready(timeout=60, interval=1) assert value is None
content = b"x" * 65536 * 3
cap.upload("/tmp/large.bin", content)
collected = b""
for chunk in cap.stream_download("/tmp/large.bin"):
collected += chunk
assert collected == content
@requires_auth
class TestPty:
def test_pty_basic_output(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh", cwd="/tmp") as term:
term.write(b"echo pty_hello\n")
output = b""
for event in term:
if event.type == PtyEventType.output:
output += event.data
elif event.type == PtyEventType.exit:
break
if b"pty_hello" in output:
term.write(b"exit\n")
assert b"pty_hello" in output
def test_pty_tag_and_pid(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh") as term:
started = False
for event in term:
if event.type == PtyEventType.started:
started = True
assert term.tag is not None
assert term.pid is not None
assert term.tag.startswith("pty-")
elif event.type == PtyEventType.output:
term.write(b"exit\n")
elif event.type == PtyEventType.exit:
break
assert started
def test_pty_exit_on_command_exit(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/echo", args=["immediate"]) as term:
events = list(term)
types = [e.type for e in events]
assert PtyEventType.started in types
assert PtyEventType.output in types or PtyEventType.exit in types
def test_pty_resize(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh", cols=80, rows=24) as term:
for event in term:
if event.type == PtyEventType.started:
term.resize(120, 40)
term.write(b"exit\n")
elif event.type == PtyEventType.exit:
break
def test_pty_envs(self, client):
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
cap.wait_ready(timeout=60, interval=1)
with cap.pty(cmd="/bin/sh", envs={"MY_VAR": "hello_env"}) as term:
output = b""
for event in term:
if event.type == PtyEventType.started:
term.write(b"echo $MY_VAR\n")
elif event.type == PtyEventType.output:
output += event.data
if b"hello_env" in output:
term.write(b"exit\n")
elif event.type == PtyEventType.exit:
break
assert b"hello_env" in output
@requires_auth
class TestAsyncFilesystem:
@pytest.mark.asyncio
async def test_async_list_dir(self, async_client):
async with async_client:
cap = await async_client.capsules.create(
template="minimal", timeout_sec=120
)
try:
await cap.async_wait_ready(timeout=60, interval=1)
await cap.async_mkdir("/tmp/async_ls_test")
await cap.async_upload("/tmp/async_ls_test/file.txt", b"data")
entries = await cap.async_list_dir("/tmp/async_ls_test")
assert isinstance(entries, list)
assert any(e.name == "file.txt" for e in entries)
finally:
await cap.async_destroy()
@pytest.mark.asyncio
async def test_async_mkdir(self, async_client):
async with async_client:
cap = await async_client.capsules.create(
template="minimal", timeout_sec=120
)
try:
await cap.async_wait_ready(timeout=60, interval=1)
entry = await cap.async_mkdir("/tmp/async_mkdir_test")
assert entry.type == "directory"
assert entry.name == "async_mkdir_test"
finally:
await cap.async_destroy()
@pytest.mark.asyncio
async def test_async_remove(self, async_client):
async with async_client:
cap = await async_client.capsules.create(
template="minimal", timeout_sec=120
)
try:
await cap.async_wait_ready(timeout=60, interval=1)
await cap.async_upload("/tmp/async_rm.txt", b"bye")
entries = await cap.async_list_dir("/tmp")
assert any(e.name == "async_rm.txt" for e in entries)
await cap.async_remove("/tmp/async_rm.txt")
entries = await cap.async_list_dir("/tmp")
assert not any(e.name == "async_rm.txt" for e in entries)
finally:
await cap.async_destroy()
@pytest.mark.asyncio
async def test_async_full_filesystem_roundtrip(self, async_client):
async with async_client:
cap = await async_client.capsules.create(
template="minimal", timeout_sec=120
)
try:
await cap.async_wait_ready(timeout=60, interval=1)
await cap.async_mkdir("/tmp/async_rt")
await cap.async_upload("/tmp/async_rt/file.txt", b"async content")
entries = await cap.async_list_dir("/tmp/async_rt")
assert any(e.name == "file.txt" for e in entries)
data = await cap.async_download("/tmp/async_rt/file.txt")
assert data == b"async content"
await cap.async_remove("/tmp/async_rt/file.txt")
entries = await cap.async_list_dir("/tmp/async_rt")
assert not any(e.name == "file.txt" for e in entries)
finally:
await cap.async_destroy()