Files
python-sdk/tests/test_capsule_features.py
pptx704 2b10fde45b
All checks were successful
ci/woodpecker/push/unit Pipeline was successful
v0.1.4 (#9)
## What's New?

- Updated the SDK to support v0.2.0
- Improved the test suite
- Minor bugfix
- No breaking changes

Co-authored-by: Tasnim Kabir Sadik <tksadik92@gmail.com>
Reviewed-on: #9
Co-authored-by: pptx704 <rafeed@omukk.dev>
Co-committed-by: pptx704 <rafeed@omukk.dev>
2026-05-20 21:01:21 +00:00

426 lines
15 KiB
Python

from __future__ import annotations
import httpx
import pytest
import respx
from wrenn.capsule import Capsule, _build_http_proxy_url, _build_proxy_url
from wrenn.code_runner.models import Execution, ExecutionError, Logs, Result
BASE = "https://app.wrenn.dev/api"
API_KEY = "wrn_test1234567890abcdef12345678"
class TestBuildProxyUrl:
def test_https_production(self):
url = _build_proxy_url("https://app.wrenn.dev/api", "cl-abc123", 8888)
assert url == "wss://8888-cl-abc123.app.wrenn.dev"
def test_http_localhost(self):
url = _build_proxy_url("http://localhost:8080", "cl-abc123", 3000)
assert url == "ws://3000-cl-abc123.localhost:8080"
def test_https_custom_port(self):
url = _build_proxy_url("https://api.example.com:9443", "sb-1", 8080)
assert url == "wss://8080-sb-1.api.example.com:9443"
def test_http_no_port(self):
url = _build_proxy_url("http://192.168.1.1", "sb-2", 5000)
assert url == "ws://5000-sb-2.192.168.1.1"
class TestBuildHttpProxyUrl:
"""``get_url`` returns an HTTP(S) URL; ``/api`` path on the base URL is
discarded — only the host is used to build the proxy subdomain."""
def test_https_production_strips_api_path(self):
url = _build_http_proxy_url("https://app.wrenn.dev/api", "cl-abc", 8080)
assert url == "https://8080-cl-abc.app.wrenn.dev"
def test_http_localhost_preserves_port(self):
url = _build_http_proxy_url("http://localhost:8080/api", "cl-abc", 3000)
assert url == "http://3000-cl-abc.localhost:8080"
def test_https_custom_port(self):
url = _build_http_proxy_url("https://api.example.com:9443", "sb-1", 80)
assert url == "https://80-sb-1.api.example.com:9443"
def test_proxy_domain_override_http(self):
url = _build_http_proxy_url(
"https://app.wrenn.dev/api", "cl-abc", 8080, "wrenn.dev"
)
assert url == "https://8080-cl-abc.wrenn.dev"
def test_proxy_domain_override_ws(self):
url = _build_proxy_url("https://app.wrenn.dev/api", "cl-abc", 8888, "wrenn.dev")
assert url == "wss://8888-cl-abc.wrenn.dev"
class TestCapsuleCreate:
@respx.mock
def test_capsule_constructor_creates(self):
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting", "template": "minimal"}
)
cap = Capsule(
template="minimal",
api_key="wrn_test1234567890abcdef12345678",
base_url=BASE,
)
assert cap.capsule_id == "cl-1"
assert hasattr(cap, "commands")
assert hasattr(cap, "files")
@respx.mock
def test_capsule_create_classmethod(self):
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-2", "status": "starting"}
)
cap = Capsule.create(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert cap.capsule_id == "cl-2"
@respx.mock
def test_capsule_context_manager_kills(self):
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting"}
)
kill_route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(202)
with Capsule(api_key="wrn_test1234567890abcdef12345678", base_url=BASE) as cap:
assert cap.capsule_id == "cl-1"
assert kill_route.called
@respx.mock
def test_capsule_env_var(self, monkeypatch):
monkeypatch.setenv("WRENN_API_KEY", "wrn_from_env_key")
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-3", "status": "starting"}
)
cap = Capsule(base_url=BASE)
assert cap.capsule_id == "cl-3"
class TestCapsuleStaticMethods:
@respx.mock
def test_static_destroy(self):
route = respx.delete(f"{BASE}/v1/capsules/cl-1").respond(202)
Capsule._static_destroy(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert route.called
@respx.mock
def test_static_pause(self):
respx.post(f"{BASE}/v1/capsules/cl-1/pause").respond(
202, json={"id": "cl-1", "status": "pausing"}
)
info = Capsule._static_pause(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert info.status.value == "pausing"
@respx.mock
def test_static_list(self):
respx.get(f"{BASE}/v1/capsules").respond(
200, json=[{"id": "cl-1", "status": "running"}]
)
items = Capsule.list(api_key="wrn_test1234567890abcdef12345678", base_url=BASE)
assert len(items) == 1
assert items[0].id == "cl-1"
@respx.mock
def test_static_get_info(self):
respx.get(f"{BASE}/v1/capsules/cl-1").respond(
200, json={"id": "cl-1", "status": "running"}
)
info = Capsule._static_get_info(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert info.id == "cl-1"
class TestCapsuleConnect:
@respx.mock
def test_connect_running(self):
respx.get(f"{BASE}/v1/capsules/cl-1").respond(
200, json={"id": "cl-1", "status": "running"}
)
cap = Capsule.connect(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert cap.capsule_id == "cl-1"
@respx.mock
def test_connect_paused_resumes(self):
get_route = respx.get(f"{BASE}/v1/capsules/cl-1")
get_route.side_effect = [
httpx.Response(200, json={"id": "cl-1", "status": "paused"}),
httpx.Response(200, json={"id": "cl-1", "status": "running"}),
]
respx.post(f"{BASE}/v1/capsules/cl-1/resume").respond(
202, json={"id": "cl-1", "status": "resuming"}
)
cap = Capsule.connect(
"cl-1", api_key="wrn_test1234567890abcdef12345678", base_url=BASE
)
assert cap.capsule_id == "cl-1"
class TestExecutionModels:
def test_execution_defaults(self):
e = Execution()
assert e.results == []
assert e.logs.stdout == []
assert e.logs.stderr == []
assert e.error is None
assert e.text is None
def test_result_from_bundle(self):
bundle = {"text/plain": "84", "image/png": "base64data"}
r = Result.from_bundle(bundle, is_main_result=True)
assert r.text == "84"
assert r.png == "base64data"
assert r.is_main_result is True
def test_result_from_bundle_preserves_text_plain(self):
# ``text/plain`` is the Jupyter repr — preserved verbatim now.
bundle = {"text/plain": "'hello'"}
r = Result.from_bundle(bundle)
assert r.text == "'hello'"
def test_result_from_bundle_extra_mimes(self):
bundle = {"text/plain": "x", "application/vnd.custom": "data"}
r = Result.from_bundle(bundle)
assert r.extra == {"application/vnd.custom": "data"}
def test_result_formats(self):
r = Result(text="hi", png="data")
assert "text" in r.formats()
assert "png" in r.formats()
assert "html" not in r.formats()
def test_execution_text_property(self):
e = Execution(
results=[
Result(text="chart", is_main_result=False),
Result(text="42", is_main_result=True),
]
)
assert e.text == "42"
def test_execution_error(self):
err = ExecutionError(
name="ZeroDivisionError",
value="division by zero",
traceback="Traceback ...\nZeroDivisionError: division by zero",
)
e = Execution(error=err)
assert e.error is not None
assert "ZeroDivisionError" in e.error.name
def test_logs(self):
logs = Logs(stdout=["hello\n", "world\n"], stderr=["warn\n"])
assert "".join(logs.stdout) == "hello\nworld\n"
assert "".join(logs.stderr) == "warn\n"
class TestGetUrlPublic:
"""``Capsule.get_url`` returns the HTTP proxy URL."""
@respx.mock
def test_sync_get_url_default_base(self):
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-99", "status": "starting"}
)
cap = Capsule(api_key=API_KEY, base_url=BASE)
assert cap.get_url(8080) == "https://8080-cl-99.wrenn.dev"
@respx.mock
def test_sync_get_url_localhost(self):
local_base = "http://localhost:8080/api"
respx.post(f"{local_base}/v1/capsules").respond(
202, json={"id": "cl-42", "status": "starting"}
)
cap = Capsule(api_key=API_KEY, base_url=local_base)
assert cap.get_url(3000) == "http://3000-cl-42.localhost:8080"
@pytest.mark.asyncio
@respx.mock
async def test_async_get_url(self):
from wrenn.async_capsule import AsyncCapsule
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-async", "status": "starting"}
)
cap = await AsyncCapsule.create(api_key=API_KEY, base_url=BASE)
assert cap.get_url(5000) == "https://5000-cl-async.wrenn.dev"
await cap._client.aclose()
class TestPtyConnect:
"""``pty_connect`` reconnects to an existing PTY session by tag."""
def _capsule(self):
with respx.mock:
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting"}
)
return Capsule(api_key=API_KEY, base_url=BASE)
def test_sync_pty_connect_sends_connect_frame(self):
from unittest.mock import MagicMock, patch
cap = self._capsule()
ws = MagicMock()
ctx = MagicMock()
ctx.__enter__.return_value = ws
ctx.__exit__.return_value = False
with patch("wrenn.capsule.httpx_ws.connect_ws", return_value=ctx):
with cap.pty_connect("tag-xyz") as session:
assert session is not None
# First send_text call must be a ``connect`` frame with the tag.
import json as _json
sent = ws.send_text.call_args_list[0].args[0]
payload = _json.loads(sent)
assert payload == {"type": "connect", "tag": "tag-xyz"}
@pytest.mark.asyncio
@respx.mock
async def test_async_pty_connect_sends_connect_frame(self):
from unittest.mock import AsyncMock, MagicMock, patch
from wrenn.async_capsule import AsyncCapsule
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting"}
)
cap = await AsyncCapsule.create(api_key=API_KEY, base_url=BASE)
ws = MagicMock()
ws.send_text = AsyncMock()
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=ws)
ctx.__aexit__ = AsyncMock(return_value=False)
with patch("wrenn.async_capsule.httpx_ws.aconnect_ws", return_value=ctx):
async with cap.pty_connect("tag-async") as session:
assert session is not None
import json as _json
sent = ws.send_text.call_args_list[0].args[0]
payload = _json.loads(sent)
assert payload == {"type": "connect", "tag": "tag-async"}
await cap._client.aclose()
class TestCreateSnapshot:
@respx.mock
def test_sync_create_snapshot_posts_capsule_id(self):
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting"}
)
snap_route = respx.post(f"{BASE}/v1/snapshots").respond(
201,
json={"name": "my-snap"},
)
cap = Capsule(api_key=API_KEY, base_url=BASE)
tpl = cap.create_snapshot(name="my-snap", overwrite=True)
import json as _json
req = snap_route.calls[0].request
body = _json.loads(req.content)
assert body["sandbox_id"] == "cl-1"
assert body["name"] == "my-snap"
assert req.url.params["overwrite"] == "true"
assert tpl.name == "my-snap"
@pytest.mark.asyncio
@respx.mock
async def test_async_create_snapshot(self):
from wrenn.async_capsule import AsyncCapsule
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting"}
)
respx.post(f"{BASE}/v1/snapshots").respond(
201,
json={"name": "auto-named"},
)
cap = await AsyncCapsule.create(api_key=API_KEY, base_url=BASE)
tpl = await cap.create_snapshot()
assert tpl.name == "auto-named"
await cap._client.aclose()
class TestUploadStreamChunked:
"""``upload_stream`` must declare ``Transfer-Encoding: chunked`` and
deliver the multipart body without buffering."""
@respx.mock
def test_sync_upload_stream_chunked(self):
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting"}
)
route = respx.post(f"{BASE}/v1/capsules/cl-1/files/stream/write").respond(
200, json={}
)
cap = Capsule(api_key=API_KEY, base_url=BASE)
def chunks():
yield b"hello "
yield b"world\n"
cap.files.upload_stream("/tmp/out.txt", chunks())
req = route.calls[0].request
assert req.headers["transfer-encoding"] == "chunked"
ct = req.headers["content-type"]
assert ct.startswith("multipart/form-data; boundary=")
body = bytes(req.content)
assert b'name="path"' in body
assert b"/tmp/out.txt" in body
assert b'name="file"' in body
assert b"hello world\n" in body
@pytest.mark.asyncio
@respx.mock
async def test_async_upload_stream_chunked(self):
from wrenn.async_capsule import AsyncCapsule
respx.post(f"{BASE}/v1/capsules").respond(
202, json={"id": "cl-1", "status": "starting"}
)
route = respx.post(f"{BASE}/v1/capsules/cl-1/files/stream/write").respond(
200, json={}
)
cap = await AsyncCapsule.create(api_key=API_KEY, base_url=BASE)
async def chunks():
yield b"abc"
yield b"def"
await cap.files.upload_stream("/tmp/out.bin", chunks())
req = route.calls[0].request
assert req.headers["transfer-encoding"] == "chunked"
body = bytes(req.content)
assert b"abcdef" in body
await cap._client.aclose()
class TestDeprecationWarnings:
def test_import_sandbox_from_wrenn_warns(self):
import sys
import warnings
# Clear cached attribute
if "Sandbox" in dir(sys.modules.get("wrenn", object())):
delattr(sys.modules["wrenn"], "Sandbox")
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
from wrenn import Sandbox
assert Sandbox is Capsule
fw = [x for x in w if issubclass(x.category, FutureWarning)]
assert len(fw) >= 1
assert "Sandbox" in str(fw[0].message)