ci: add test pipeline, PyPI release workflow, and lint fixes

- Update Woodpecker to run unit and integration tests in parallel
- Add GitHub Actions workflow for PyPI trusted publishing on main
- Add license, classifiers, keywords, and URLs to pyproject.toml
- Fix ruff lint errors (unused imports, duplicate class name) and formatting
This commit is contained in:
2026-04-23 18:32:59 +06:00
parent ad64c85393
commit 68c7d0de42
18 changed files with 303 additions and 280 deletions

24
.github/workflows/release.yml vendored Normal file
View File

@ -0,0 +1,24 @@
name: Publish to PyPI
on:
push:
branches:
- main
jobs:
pypi-publish:
name: Upload release to PyPI
runs-on: ubuntu-latest
environment: pypi
permissions:
id-token: write
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v6
- name: Build package
run: uv build
- name: Publish package distributions to PyPI
uses: pypa/gh-action-pypi-publish@release/v1

View File

@ -1,46 +1,20 @@
when:
event: push
branch:
- main
- dev
variables:
- &python_image "ghcr.io/astral-sh/uv:python3.13-bookworm-slim"
- &uv_cache_dir "/root/.cache/uv"
steps:
- name: restore-cache
image: woodpeckerci/plugin-cache
settings:
restore: true
cache_key: "uv-{{ checksum \"uv.lock\" }}"
mount:
- /root/.cache/uv
- name: lint
image: *python_image
environment:
UV_CACHE_DIR: *uv_cache_dir
UV_FROZEN: 1
unit-tests:
image: ghcr.io/astral-sh/uv:python3.13-bookworm
commands:
- uv sync --no-install-project
- make lint
- uv sync --dev
- uv run pytest -m "not integration" -v
- name: test
image: *python_image
integration-tests:
image: ghcr.io/astral-sh/uv:python3.13-bookworm
environment:
UV_CACHE_DIR: *uv_cache_dir
UV_FROZEN: 1
WRENN_API_KEY:
from_secret: WRENN_API_KEY
commands:
- uv sync --no-install-project
- make test
- name: rebuild-cache
image: woodpeckerci/plugin-cache
when:
- status: [success]
settings:
rebuild: true
cache_key: "uv-{{ checksum \"uv.lock\" }}"
mount:
- /root/.cache/uv
- uv sync --dev
- uv run pytest -m integration -v

View File

@ -3,11 +3,24 @@ name = "wrenn"
version = "0.1.0"
description = "Python SDK for Wrenn"
readme = "README.md"
license = "MIT"
license-files = ["LICENSE"]
authors = [
{ name = "Rafeed M. Bhuiyan", email = "rafeed@omukk.dev" },
{ name = "Tasnim Kabir Sadik", email = "tksadik@omukk.dev" },
]
requires-python = ">=3.13"
keywords = ["wrenn"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules",
"Typing :: Typed",
]
dependencies = [
"email-validator>=2.3.0",
"httpx>=0.28.1",
@ -29,6 +42,10 @@ dev = [
"ruff>=0.15.10",
]
[project.urls]
Homepage = "https://wrenn.dev"
Repository = "https://github.com/wrennhq/python-sdk"
[tool.pytest.ini_options]
markers = [
"integration: integration tests (require live server)",

View File

@ -211,9 +211,7 @@ class Git:
if sanitized != clone_url:
repo_dir = dest or _derive_repo_dir(url)
if repo_dir:
repo_cwd = (
posixpath.join(cwd, repo_dir) if cwd else repo_dir
)
repo_cwd = posixpath.join(cwd, repo_dir) if cwd else repo_dir
strip_result = self._run(
build_remote_set_url("origin", sanitized),
cwd=repo_cwd,
@ -482,9 +480,7 @@ class Git:
Raises:
GitCommandError: If the command failed.
"""
result = self._run(
build_branches(), cwd=cwd, envs=envs, timeout=timeout
)
result = self._run(build_branches(), cwd=cwd, envs=envs, timeout=timeout)
_check_result(result, op="branches")
return parse_branches(result.stdout)
@ -697,9 +693,7 @@ class Git:
Raises:
GitCommandError: If the command failed.
"""
argv = build_restore(
paths, staged=staged, worktree=worktree, source=source
)
argv = build_restore(paths, staged=staged, worktree=worktree, source=source)
result = self._run(argv, cwd=cwd, envs=envs, timeout=timeout)
_check_result(result, op="restore")
return result
@ -798,8 +792,12 @@ class Git:
"""
if not name or not email:
raise ValueError("Both name and email are required.")
self.set_config("user.name", name, scope=scope, cwd=cwd, envs=envs, timeout=timeout)
self.set_config("user.email", email, scope=scope, cwd=cwd, envs=envs, timeout=timeout)
self.set_config(
"user.name", name, scope=scope, cwd=cwd, envs=envs, timeout=timeout
)
self.set_config(
"user.email", email, scope=scope, cwd=cwd, envs=envs, timeout=timeout
)
def dangerously_authenticate(
self,
@ -836,12 +834,14 @@ class Git:
GitCommandError: If a command failed.
"""
if not username or not password:
raise ValueError(
"Both username and password are required."
)
raise ValueError("Both username and password are required.")
self.set_config(
"credential.helper", "store",
scope="global", cwd=cwd, envs=envs, timeout=timeout,
"credential.helper",
"store",
scope="global",
cwd=cwd,
envs=envs,
timeout=timeout,
)
cmd = build_credential_approve_cmd(
username=username,
@ -880,7 +880,9 @@ class Git:
credential_url = embed_credentials(original_url, username, password)
self._run(
build_remote_set_url(remote, credential_url),
cwd=cwd, envs=envs, timeout=timeout,
cwd=cwd,
envs=envs,
timeout=timeout,
)
op_error: Exception | None = None
@ -895,7 +897,9 @@ class Git:
try:
self._run(
build_remote_set_url(remote, original_url),
cwd=cwd, envs=envs, timeout=timeout,
cwd=cwd,
envs=envs,
timeout=timeout,
)
except Exception as err:
restore_error = err
@ -988,9 +992,7 @@ class AsyncGit:
if sanitized != clone_url:
repo_dir = dest or _derive_repo_dir(url)
if repo_dir:
repo_cwd = (
posixpath.join(cwd, repo_dir) if cwd else repo_dir
)
repo_cwd = posixpath.join(cwd, repo_dir) if cwd else repo_dir
strip_result = await self._run(
build_remote_set_url("origin", sanitized),
cwd=repo_cwd,
@ -1072,10 +1074,13 @@ class AsyncGit:
) -> CommandResult:
"""Push commits to a remote."""
if username and password:
async def _op() -> CommandResult:
return await self._run(
build_push(remote, branch, force=force, set_upstream=set_upstream),
cwd=cwd, envs=envs, timeout=timeout,
cwd=cwd,
envs=envs,
timeout=timeout,
)
return await self._with_remote_credentials(
@ -1109,10 +1114,13 @@ class AsyncGit:
) -> CommandResult:
"""Pull changes from a remote."""
if username and password:
async def _op() -> CommandResult:
return await self._run(
build_pull(remote, branch, rebase=rebase, ff_only=ff_only),
cwd=cwd, envs=envs, timeout=timeout,
cwd=cwd,
envs=envs,
timeout=timeout,
)
return await self._with_remote_credentials(
@ -1141,9 +1149,7 @@ class AsyncGit:
timeout: int | None = 30,
) -> GitStatus:
"""Get repository status."""
result = await self._run(
build_status(), cwd=cwd, envs=envs, timeout=timeout
)
result = await self._run(build_status(), cwd=cwd, envs=envs, timeout=timeout)
_check_result(result, op="status")
return parse_status(result.stdout)
@ -1155,9 +1161,7 @@ class AsyncGit:
timeout: int | None = 30,
) -> list[GitBranch]:
"""List local branches."""
result = await self._run(
build_branches(), cwd=cwd, envs=envs, timeout=timeout
)
result = await self._run(build_branches(), cwd=cwd, envs=envs, timeout=timeout)
_check_result(result, op="branches")
return parse_branches(result.stdout)
@ -1270,9 +1274,7 @@ class AsyncGit:
timeout: int | None = 30,
) -> CommandResult:
"""Restore working-tree files or unstage changes."""
argv = build_restore(
paths, staged=staged, worktree=worktree, source=source
)
argv = build_restore(paths, staged=staged, worktree=worktree, source=source)
result = await self._run(argv, cwd=cwd, envs=envs, timeout=timeout)
_check_result(result, op="restore")
return result
@ -1325,8 +1327,12 @@ class AsyncGit:
"""Configure git user name and email."""
if not name or not email:
raise ValueError("Both name and email are required.")
await self.set_config("user.name", name, scope=scope, cwd=cwd, envs=envs, timeout=timeout)
await self.set_config("user.email", email, scope=scope, cwd=cwd, envs=envs, timeout=timeout)
await self.set_config(
"user.name", name, scope=scope, cwd=cwd, envs=envs, timeout=timeout
)
await self.set_config(
"user.email", email, scope=scope, cwd=cwd, envs=envs, timeout=timeout
)
async def dangerously_authenticate(
self,
@ -1348,12 +1354,14 @@ class AsyncGit:
parameters instead.
"""
if not username or not password:
raise ValueError(
"Both username and password are required."
)
raise ValueError("Both username and password are required.")
await self.set_config(
"credential.helper", "store",
scope="global", cwd=cwd, envs=envs, timeout=timeout,
"credential.helper",
"store",
scope="global",
cwd=cwd,
envs=envs,
timeout=timeout,
)
cmd = build_credential_approve_cmd(
username=username,
@ -1394,7 +1402,9 @@ class AsyncGit:
credential_url = embed_credentials(original_url, username, password)
await self._run(
build_remote_set_url(remote, credential_url),
cwd=cwd, envs=envs, timeout=timeout,
cwd=cwd,
envs=envs,
timeout=timeout,
)
op_error: Exception | None = None
@ -1409,7 +1419,9 @@ class AsyncGit:
try:
await self._run(
build_remote_set_url(remote, original_url),
cwd=cwd, envs=envs, timeout=timeout,
cwd=cwd,
envs=envs,
timeout=timeout,
)
except Exception as err:
restore_error = err

View File

@ -20,9 +20,7 @@ def embed_credentials(url: str, username: str, password: str) -> str:
"""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError(
"Only http(s) URLs support embedded credentials."
)
raise ValueError("Only http(s) URLs support embedded credentials.")
netloc = f"{username}:{password}@{parsed.hostname}"
if parsed.port:
netloc = f"{netloc}:{parsed.port}"
@ -93,12 +91,14 @@ def build_credential_approve_cmd(
raise ValueError("Credentials must not contain newline characters.")
target_host = host.strip() or "github.com"
target_protocol = protocol.strip() or "https"
credential_input = "\n".join([
f"protocol={target_protocol}",
f"host={target_host}",
f"username={username}",
f"password={password}",
"",
"",
])
credential_input = "\n".join(
[
f"protocol={target_protocol}",
f"host={target_host}",
f"username={username}",
f"password={password}",
"",
"",
]
)
return f"printf %s {shlex.quote(credential_input)} | git credential approve"

View File

@ -13,6 +13,7 @@ from dataclasses import dataclass, field
# ── Data types ─────────────────────────────────────────────────────
@dataclass
class FileStatus:
"""A single entry from ``git status --porcelain=v1``.
@ -96,6 +97,7 @@ class GitBranch:
# ── Argument builders ──────────────────────────────────────────────
def build_clone(
url: str,
dest: str | None = None,
@ -356,6 +358,7 @@ def build_has_upstream() -> list[str]:
# ── Parsers ────────────────────────────────────────────────────────
def parse_status(stdout: str) -> GitStatus:
"""Parse ``git status --porcelain=v1 --branch`` output.
@ -377,11 +380,13 @@ def parse_status(stdout: str) -> GitStatus:
for line in lines[1:]:
if line.startswith("?? "):
status.files.append(FileStatus(
path=line[3:],
index_status="?",
work_tree_status="?",
))
status.files.append(
FileStatus(
path=line[3:],
index_status="?",
work_tree_status="?",
)
)
continue
if len(line) < 4:
@ -394,12 +399,14 @@ def parse_status(stdout: str) -> GitStatus:
if " -> " in path:
renamed_from, path = path.split(" -> ", 1)
status.files.append(FileStatus(
path=path,
index_status=idx,
work_tree_status=wt,
renamed_from=renamed_from,
))
status.files.append(
FileStatus(
path=path,
index_status=idx,
work_tree_status=wt,
renamed_from=renamed_from,
)
)
return status
@ -427,6 +434,7 @@ def parse_branches(stdout: str) -> list[GitBranch]:
# ── Internal helpers ───────────────────────────────────────────────
def _resolve_scope_flag(scope: str) -> str:
"""Convert a scope name to a git config flag."""
scope = scope.strip().lower()
@ -436,16 +444,14 @@ def _resolve_scope_flag(scope: str) -> str:
return "--global"
if scope == "system":
return "--system"
raise ValueError(
"Git config scope must be one of: local, global, system."
)
raise ValueError("Git config scope must be one of: local, global, system.")
def _parse_branch_line(info: str, status: GitStatus) -> None:
"""Parse the ``## branch...upstream [ahead N, behind M]`` header."""
ahead_start = info.find(" [")
branch_part = info if ahead_start == -1 else info[:ahead_start]
ahead_part = None if ahead_start == -1 else info[ahead_start + 2:-1]
ahead_part = None if ahead_start == -1 else info[ahead_start + 2 : -1]
if branch_part.startswith("HEAD (detached at "):
status.detached = True
@ -457,10 +463,8 @@ def _parse_branch_line(info: str, status: GitStatus) -> None:
status.branch = local or None
status.upstream = remote or None
else:
name = (
branch_part
.replace("No commits yet on ", "")
.replace("Initial commit on ", "")
name = branch_part.replace("No commits yet on ", "").replace(
"Initial commit on ", ""
)
status.branch = name or None

View File

@ -13,9 +13,7 @@ class GitError(Exception):
exit_code (int): Process exit code.
"""
def __init__(
self, message: str, *, stderr: str = "", exit_code: int = -1
) -> None:
def __init__(self, message: str, *, stderr: str = "", exit_code: int = -1) -> None:
self.message = message
self.stderr = stderr
self.exit_code = exit_code

View File

@ -241,13 +241,9 @@ class AsyncCapsule:
self._info = info
return
if info.status in (Status.error, Status.stopped, Status.paused):
raise RuntimeError(
f"Capsule entered {info.status} state while waiting"
)
raise RuntimeError(f"Capsule entered {info.status} state while waiting")
await asyncio.sleep(interval)
raise TimeoutError(
f"Capsule {self._id} did not become ready within {timeout}s"
)
raise TimeoutError(f"Capsule {self._id} did not become ready within {timeout}s")
async def is_running(self) -> bool:
"""Check whether the capsule is currently running.

View File

@ -317,13 +317,9 @@ class Capsule:
self._info = info
return
if info.status in (Status.error, Status.stopped, Status.paused):
raise RuntimeError(
f"Capsule entered {info.status} state while waiting"
)
raise RuntimeError(f"Capsule entered {info.status} state while waiting")
time.sleep(interval)
raise TimeoutError(
f"Capsule {self._id} did not become ready within {timeout}s"
)
raise TimeoutError(f"Capsule {self._id} did not become ready within {timeout}s")
def is_running(self) -> bool:
"""Check whether the capsule is currently running.
@ -472,5 +468,3 @@ class Capsule:
self._client.close()
except Exception:
pass

View File

@ -17,7 +17,6 @@ from wrenn.code_interpreter.capsule import DEFAULT_TEMPLATE
from wrenn.code_interpreter.models import (
Execution,
ExecutionError,
Logs,
Result,
)
@ -215,9 +214,7 @@ class AsyncCapsule(BaseAsyncCapsule):
if time_left <= 0:
break
try:
data = await asyncio.wait_for(
ws.receive_json(), timeout=time_left
)
data = await asyncio.wait_for(ws.receive_json(), timeout=time_left)
except (asyncio.TimeoutError, Exception):
break
if not data:
@ -247,9 +244,7 @@ class AsyncCapsule(BaseAsyncCapsule):
result = Result.from_bundle(bundle, is_main_result=is_main)
execution.results.append(result)
if is_main:
execution.execution_count = content.get(
"execution_count"
)
execution.execution_count = content.get("execution_count")
if on_result is not None:
on_result(result)
elif msg_type == "error":
@ -261,10 +256,7 @@ class AsyncCapsule(BaseAsyncCapsule):
execution.error = err
if on_error is not None:
on_error(err)
elif (
msg_type == "status"
and content.get("execution_state") == "idle"
):
elif msg_type == "status" and content.get("execution_state") == "idle":
break
return execution

View File

@ -14,7 +14,6 @@ from wrenn.capsule import _build_proxy_url
from wrenn.code_interpreter.models import (
Execution,
ExecutionError,
Logs,
Result,
)
@ -271,9 +270,7 @@ class Capsule(BaseCapsule):
result = Result.from_bundle(bundle, is_main_result=is_main)
execution.results.append(result)
if is_main:
execution.execution_count = content.get(
"execution_count"
)
execution.execution_count = content.get("execution_count")
if on_result is not None:
on_result(result)
elif msg_type == "error":
@ -285,10 +282,7 @@ class Capsule(BaseCapsule):
execution.error = err
if on_error is not None:
on_error(err)
elif (
msg_type == "status"
and content.get("execution_state") == "idle"
):
elif msg_type == "status" and content.get("execution_state") == "idle":
break
return execution

View File

@ -197,9 +197,7 @@ class Commands:
if tag is not None:
payload["tag"] = tag
resp = self._http.post(
f"/v1/capsules/{self._capsule_id}/exec", json=payload
)
resp = self._http.post(f"/v1/capsules/{self._capsule_id}/exec", json=payload)
data = handle_response(resp)
if background:
@ -238,9 +236,7 @@ class Commands:
Raises:
WrennNotFoundError: If no process with the given PID exists.
"""
resp = self._http.delete(
f"/v1/capsules/{self._capsule_id}/processes/{pid}"
)
resp = self._http.delete(f"/v1/capsules/{self._capsule_id}/processes/{pid}")
handle_response(resp)
def connect(self, pid: int) -> Iterator[StreamEvent]:
@ -267,9 +263,7 @@ class Commands:
except httpx_ws.WebSocketDisconnect:
break
def stream(
self, cmd: str, args: list[str] | None = None
) -> Iterator[StreamEvent]:
def stream(self, cmd: str, args: list[str] | None = None) -> Iterator[StreamEvent]:
"""Execute a command via WebSocket, streaming output as events.
Args:
@ -400,9 +394,7 @@ class AsyncCommands:
list[ProcessInfo]: Running processes with their PID, tag, and
command information.
"""
resp = await self._http.get(
f"/v1/capsules/{self._capsule_id}/processes"
)
resp = await self._http.get(f"/v1/capsules/{self._capsule_id}/processes")
data = handle_response(resp)
return [
ProcessInfo(

View File

@ -24,7 +24,9 @@ def _read_env_file() -> dict[str, str]:
return result
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
def pytest_collection_modifyitems(
config: pytest.Config, items: list[pytest.Item]
) -> None:
env_vars = _read_env_file()
has_key = bool(os.environ.get("WRENN_API_KEY") or env_vars.get("WRENN_API_KEY"))
if has_key:

View File

@ -1,6 +1,5 @@
from __future__ import annotations
import pytest
import respx
from wrenn.capsule import Capsule, _build_proxy_url
@ -95,7 +94,9 @@ class TestCapsuleStaticMethods:
respx.get(f"{BASE}/v1/capsules/cl-1").respond(
200, json={"id": "cl-1", "status": "running"}
)
info = Capsule._static_get_info("cl-1", api_key="wrn_test1234567890abcdef12345678")
info = Capsule._static_get_info(
"cl-1", api_key="wrn_test1234567890abcdef12345678"
)
assert info.id == "cl-1"
@ -179,7 +180,6 @@ class TestExecutionModels:
class TestDeprecationWarnings:
def test_import_sandbox_from_wrenn_warns(self):
import importlib
import sys
import warnings

View File

@ -246,9 +246,7 @@ class TestAsyncClient:
@respx.mock
async def test_async_capsules_list(self, async_client):
async with async_client:
respx.get(f"{BASE}/v1/capsules").respond(
200, json=[{"id": "sb-1"}]
)
respx.get(f"{BASE}/v1/capsules").respond(200, json=[{"id": "sb-1"}])
boxes = await async_client.capsules.list()
assert len(boxes) == 1

View File

@ -305,9 +305,7 @@ class TestPtySessionIteration:
ws = MagicMock()
messages = [
json.dumps({"type": "started", "tag": "pty-abc12345", "pid": 1}),
json.dumps(
{"type": "output", "data": base64.b64encode(b"hello").decode()}
),
json.dumps({"type": "output", "data": base64.b64encode(b"hello").decode()}),
json.dumps({"type": "exit", "exit_code": 0}),
]
ws.receive_text.side_effect = messages
@ -455,9 +453,7 @@ class TestAsyncPtySession:
ws = AsyncMock()
messages = [
json.dumps({"type": "started", "tag": "pty-xyz", "pid": 5}),
json.dumps(
{"type": "output", "data": base64.b64encode(b"hi").decode()}
),
json.dumps({"type": "output", "data": base64.b64encode(b"hi").decode()}),
json.dumps({"type": "exit", "exit_code": 0}),
]
ws.receive_text.side_effect = messages

View File

@ -4,14 +4,12 @@ import json
import pytest
import respx
from httpx import Response
from wrenn._git import (
AsyncGit,
FileStatus,
Git,
GitAuthError,
GitBranch,
GitCommandError,
GitError,
GitStatus,
@ -120,9 +118,13 @@ class TestBuildClone:
depth=5,
)
assert args == [
"git", "clone",
"--branch", "dev", "--single-branch",
"--depth", "5",
"git",
"clone",
"--branch",
"dev",
"--single-branch",
"--depth",
"5",
"https://github.com/user/repo.git",
"/tmp/repo",
]
@ -212,7 +214,9 @@ class TestBuildStatus:
class TestBuildBranches:
def test_args(self):
assert build_branches() == [
"git", "branch", "--format=%(refname:short)\t%(HEAD)"
"git",
"branch",
"--format=%(refname:short)\t%(HEAD)",
]
@ -237,7 +241,13 @@ class TestBuildBranchOps:
class TestBuildRemote:
def test_add(self):
args = build_remote_add("origin", "https://example.com/repo.git")
assert args == ["git", "remote", "add", "origin", "https://example.com/repo.git"]
assert args == [
"git",
"remote",
"add",
"origin",
"https://example.com/repo.git",
]
def test_add_with_fetch(self):
args = build_remote_add("origin", "https://example.com/repo.git", fetch=True)
@ -248,7 +258,13 @@ class TestBuildRemote:
def test_set_url(self):
args = build_remote_set_url("origin", "https://new.url/repo.git")
assert args == ["git", "remote", "set-url", "origin", "https://new.url/repo.git"]
assert args == [
"git",
"remote",
"set-url",
"origin",
"https://new.url/repo.git",
]
class TestBuildReset:
@ -445,21 +461,27 @@ class TestStripCredentials:
class TestIsAuthError:
@pytest.mark.parametrize("msg", [
"fatal: Authentication failed for 'https://...'",
"fatal: could not read Username",
"remote: Invalid username or password",
"fatal: terminal prompts disabled",
"Permission denied (publickey)",
])
@pytest.mark.parametrize(
"msg",
[
"fatal: Authentication failed for 'https://...'",
"fatal: could not read Username",
"remote: Invalid username or password",
"fatal: terminal prompts disabled",
"Permission denied (publickey)",
],
)
def test_auth_patterns(self, msg):
assert is_auth_error(msg) is True
@pytest.mark.parametrize("msg", [
"fatal: repository 'https://...' not found",
"error: pathspec 'foo' did not match any file(s)",
"",
])
@pytest.mark.parametrize(
"msg",
[
"fatal: repository 'https://...' not found",
"error: pathspec 'foo' did not match any file(s)",
"",
],
)
def test_non_auth_patterns(self, msg):
assert is_auth_error(msg) is False
@ -495,7 +517,9 @@ class TestCheckResult:
def test_auth_failure(self):
result = CommandResult(
stdout="", stderr="fatal: Authentication failed for 'https://...'", exit_code=128
stdout="",
stderr="fatal: Authentication failed for 'https://...'",
exit_code=128,
)
with pytest.raises(GitAuthError) as exc_info:
_check_result(result, op="clone")
@ -570,21 +594,27 @@ class TestGitStatus:
assert s.is_clean is True
def test_has_staged(self):
s = GitStatus(files=[
FileStatus(path="a.py", index_status="M", work_tree_status=" "),
])
s = GitStatus(
files=[
FileStatus(path="a.py", index_status="M", work_tree_status=" "),
]
)
assert s.has_staged is True
def test_has_untracked(self):
s = GitStatus(files=[
FileStatus(path="a.py", index_status="?", work_tree_status="?"),
])
s = GitStatus(
files=[
FileStatus(path="a.py", index_status="?", work_tree_status="?"),
]
)
assert s.has_untracked is True
def test_has_conflicts(self):
s = GitStatus(files=[
FileStatus(path="a.py", index_status="U", work_tree_status="U"),
])
s = GitStatus(
files=[
FileStatus(path="a.py", index_status="U", work_tree_status="U"),
]
)
assert s.has_conflicts is True
@ -596,18 +626,22 @@ class TestGitStatus:
class TestGitInit:
@respx.mock
def test_init(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="Initialized empty Git repository in /repo/.git/\n"
))
respx.post(EXEC_URL).respond(
200,
json=_exec_response(
stdout="Initialized empty Git repository in /repo/.git/\n"
),
)
git = _make_git()
result = git.init("/repo")
assert result.exit_code == 0
@respx.mock
def test_init_failure(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="fatal: cannot mkdir /readonly", exit_code=128
))
respx.post(EXEC_URL).respond(
200,
json=_exec_response(stderr="fatal: cannot mkdir /readonly", exit_code=128),
)
git = _make_git()
with pytest.raises(GitCommandError):
git.init("/readonly")
@ -616,9 +650,9 @@ class TestGitInit:
class TestGitClone:
@respx.mock
def test_clone_basic(self):
route = respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="Cloning into 'repo'...\n"
))
route = respx.post(EXEC_URL).respond(
200, json=_exec_response(stderr="Cloning into 'repo'...\n")
)
git = _make_git()
result = git.clone("https://github.com/user/repo.git")
assert result.exit_code == 0
@ -627,10 +661,13 @@ class TestGitClone:
@respx.mock
def test_clone_auth_failure(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="fatal: Authentication failed for 'https://...'",
exit_code=128,
))
respx.post(EXEC_URL).respond(
200,
json=_exec_response(
stderr="fatal: Authentication failed for 'https://...'",
exit_code=128,
),
)
git = _make_git()
with pytest.raises(GitAuthError):
git.clone("https://github.com/private/repo.git")
@ -667,20 +704,23 @@ class TestGitAdd:
class TestGitCommit:
@respx.mock
def test_commit(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="[main abc1234] initial commit\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="[main abc1234] initial commit\n")
)
git = _make_git()
result = git.commit("initial commit", cwd="/repo")
assert result.exit_code == 0
@respx.mock
def test_commit_nothing_to_commit(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="nothing to commit, working tree clean\n",
stderr="",
exit_code=1,
))
respx.post(EXEC_URL).respond(
200,
json=_exec_response(
stdout="nothing to commit, working tree clean\n",
stderr="",
exit_code=1,
),
)
git = _make_git()
with pytest.raises(GitCommandError):
git.commit("empty", cwd="/repo")
@ -702,12 +742,15 @@ class TestGitPushPull:
assert result.exit_code == 0
class TestGitStatus:
class TestGitStatusCommand:
@respx.mock
def test_status(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="## main...origin/main [ahead 1]\n M file.py\n?? new.txt\n"
))
respx.post(EXEC_URL).respond(
200,
json=_exec_response(
stdout="## main...origin/main [ahead 1]\n M file.py\n?? new.txt\n"
),
)
git = _make_git()
status = git.status(cwd="/repo")
assert isinstance(status, GitStatus)
@ -719,9 +762,9 @@ class TestGitStatus:
class TestGitBranches:
@respx.mock
def test_branches(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="main\t*\ndev\t \n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="main\t*\ndev\t \n")
)
git = _make_git()
branches = git.branches(cwd="/repo")
assert len(branches) == 2
@ -730,27 +773,27 @@ class TestGitBranches:
@respx.mock
def test_create_branch(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="Switched to a new branch 'feat'\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stderr="Switched to a new branch 'feat'\n")
)
git = _make_git()
result = git.create_branch("feat", cwd="/repo")
assert result.exit_code == 0
@respx.mock
def test_checkout_branch(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="Switched to branch 'main'\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stderr="Switched to branch 'main'\n")
)
git = _make_git()
result = git.checkout_branch("main", cwd="/repo")
assert result.exit_code == 0
@respx.mock
def test_delete_branch(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="Deleted branch old (was abc1234).\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="Deleted branch old (was abc1234).\n")
)
git = _make_git()
result = git.delete_branch("old", cwd="/repo")
assert result.exit_code == 0
@ -766,18 +809,18 @@ class TestGitRemote:
@respx.mock
def test_remote_get(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="https://example.com/repo.git\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="https://example.com/repo.git\n")
)
git = _make_git()
url = git.remote_get("origin", cwd="/repo")
assert url == "https://example.com/repo.git"
@respx.mock
def test_remote_get_not_found(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="fatal: No such remote 'nope'", exit_code=2
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stderr="fatal: No such remote 'nope'", exit_code=2)
)
git = _make_git()
url = git.remote_get("nope", cwd="/repo")
assert url is None
@ -816,9 +859,7 @@ class TestGitConfig:
@respx.mock
def test_get_config_not_set(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="", exit_code=1
))
respx.post(EXEC_URL).respond(200, json=_exec_response(stderr="", exit_code=1))
git = _make_git()
val = git.get_config("nonexistent.key", scope="global")
assert val is None
@ -897,9 +938,9 @@ class TestAsyncGit:
@pytest.mark.asyncio
@respx.mock
async def test_async_init(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="Initialized empty Git repository\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="Initialized empty Git repository\n")
)
git = _make_async_git()
result = await git.init("/repo")
assert result.exit_code == 0
@ -907,9 +948,9 @@ class TestAsyncGit:
@pytest.mark.asyncio
@respx.mock
async def test_async_status(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="## main\n M file.py\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="## main\n M file.py\n")
)
git = _make_async_git()
status = await git.status(cwd="/repo")
assert isinstance(status, GitStatus)
@ -918,9 +959,10 @@ class TestAsyncGit:
@pytest.mark.asyncio
@respx.mock
async def test_async_clone_auth_error(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stderr="fatal: Authentication failed", exit_code=128
))
respx.post(EXEC_URL).respond(
200,
json=_exec_response(stderr="fatal: Authentication failed", exit_code=128),
)
git = _make_async_git()
with pytest.raises(GitAuthError):
await git.clone("https://github.com/private/repo.git")
@ -928,9 +970,9 @@ class TestAsyncGit:
@pytest.mark.asyncio
@respx.mock
async def test_async_commit(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="[main abc1234] test\n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="[main abc1234] test\n")
)
git = _make_async_git()
result = await git.commit("test", cwd="/repo")
assert result.exit_code == 0
@ -938,9 +980,9 @@ class TestAsyncGit:
@pytest.mark.asyncio
@respx.mock
async def test_async_branches(self):
respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="main\t*\ndev\t \n"
))
respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="main\t*\ndev\t \n")
)
git = _make_async_git()
branches = await git.branches(cwd="/repo")
assert len(branches) == 2
@ -957,9 +999,9 @@ class TestCommandPayloadWrapping:
@respx.mock
def test_simple_command(self):
route = respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="hello world\n"
))
route = respx.post(EXEC_URL).respond(
200, json=_exec_response(stdout="hello world\n")
)
git = _make_git()
git.init("/repo")
body = json.loads(route.calls[0].request.content)
@ -978,9 +1020,7 @@ class TestCommandPayloadWrapping:
client = WrennClient(api_key="wrn_test1234567890abcdef12345678")
commands = Commands(CAPSULE_ID, client.http)
route = respx.post(EXEC_URL).respond(200, json=_exec_response(
stdout="3\n"
))
route = respx.post(EXEC_URL).respond(200, json=_exec_response(stdout="3\n"))
commands.run("cat /etc/passwd | wc -l")
body = json.loads(route.calls[0].request.content)
assert body["cmd"] == "/bin/sh"
@ -1082,9 +1122,7 @@ class TestCommandPayloadWrapping:
client = WrennClient(api_key="wrn_test1234567890abcdef12345678")
commands = Commands(CAPSULE_ID, client.http)
route = respx.post(EXEC_URL).respond(200, json={
"pid": 42, "tag": "bg-1"
})
route = respx.post(EXEC_URL).respond(200, json={"pid": 42, "tag": "bg-1"})
commands.run("tail -f /var/log/syslog", background=True)
body = json.loads(route.calls[0].request.content)
assert body["cmd"] == "/bin/sh"

View File

@ -179,9 +179,7 @@ class TestCommands:
assert result.exit_code == 42
def test_run_with_envs(self):
result = self.capsule.commands.run(
"export MY_VAR=test_value && echo $MY_VAR"
)
result = self.capsule.commands.run("export MY_VAR=test_value && echo $MY_VAR")
assert "test_value" in result.stdout
def test_run_with_cwd(self):
@ -195,9 +193,7 @@ class TestCommands:
assert len(lines) == 3
def test_run_background(self):
handle = self.capsule.commands.run(
"sleep 30", background=True, tag="bg-test"
)
handle = self.capsule.commands.run("sleep 30", background=True, tag="bg-test")
assert isinstance(handle, CommandHandle)
assert handle.pid > 0
assert handle.tag == "bg-test"
@ -206,9 +202,7 @@ class TestCommands:
self.capsule.commands.kill(handle.pid)
def test_list_processes(self):
handle = self.capsule.commands.run(
"sleep 30", background=True, tag="list-test"
)
handle = self.capsule.commands.run("sleep 30", background=True, tag="list-test")
try:
time.sleep(0.5)
processes = self.capsule.commands.list()
@ -222,9 +216,7 @@ class TestCommands:
self.capsule.commands.kill(handle.pid)
def test_kill_process(self):
handle = self.capsule.commands.run(
"sleep 30", background=True
)
handle = self.capsule.commands.run("sleep 30", background=True)
self.capsule.commands.kill(handle.pid)
time.sleep(0.5)