forked from wrenn/python-sdk
Modularized the integration tests
This commit is contained in:
0
tests/integration/__init__.py
Normal file
0
tests/integration/__init__.py
Normal file
90
tests/integration/conftest.py
Normal file
90
tests/integration/conftest.py
Normal file
@ -0,0 +1,90 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Generator
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from typing_extensions import AsyncGenerator
|
||||
|
||||
from wrenn.client import AsyncWrennClient, WrennClient
|
||||
|
||||
WRENN_API_KEY = os.environ.get("WRENN_API_KEY")
|
||||
WRENN_TOKEN = os.environ.get("WRENN_TOKEN")
|
||||
WRENN_BASE_URL = os.environ.get("WRENN_BASE_URL", "http://localhost:8080")
|
||||
WRENN_TEST_EMAIL = os.environ.get("WRENN_TEST_EMAIL")
|
||||
WRENN_TEST_PASSWORD = os.environ.get("WRENN_TEST_PASSWORD")
|
||||
|
||||
|
||||
def _has_auth() -> bool:
|
||||
return bool(WRENN_API_KEY or WRENN_TOKEN)
|
||||
|
||||
|
||||
requires_auth = pytest.mark.skipif(
|
||||
not _has_auth(),
|
||||
reason="Set WRENN_API_KEY or WRENN_TOKEN to run integration tests",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client() -> Generator[WrennClient, None, None]:
|
||||
with WrennClient(
|
||||
api_key=WRENN_API_KEY,
|
||||
token=WRENN_TOKEN,
|
||||
base_url=WRENN_BASE_URL,
|
||||
) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def async_client() -> AsyncGenerator[AsyncWrennClient, None]:
|
||||
async with AsyncWrennClient(
|
||||
api_key=WRENN_API_KEY, token=WRENN_TOKEN, base_url=WRENN_BASE_URL
|
||||
) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bearer_client() -> Generator[WrennClient, None, None]:
|
||||
if WRENN_TOKEN:
|
||||
with WrennClient(token=WRENN_TOKEN, base_url=WRENN_BASE_URL) as c:
|
||||
yield c
|
||||
elif WRENN_TEST_EMAIL and WRENN_TEST_PASSWORD:
|
||||
with WrennClient(api_key=WRENN_API_KEY, base_url=WRENN_BASE_URL) as c:
|
||||
resp = c.auth.login(WRENN_TEST_EMAIL, WRENN_TEST_PASSWORD)
|
||||
with WrennClient(token=resp.token, base_url=WRENN_BASE_URL) as c:
|
||||
yield c
|
||||
else:
|
||||
pytest.skip(
|
||||
"Set WRENN_TOKEN or WRENN_TEST_EMAIL+WRENN_TEST_PASSWORD for bearer-auth tests"
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def async_minimal_capsule(async_client: AsyncWrennClient):
|
||||
"""Provides a ready-to-use minimal capsule and cleans it up afterward."""
|
||||
cap = await async_client.capsules.create(template="minimal", timeout_sec=120)
|
||||
await cap.async_wait_ready(timeout=60, interval=1)
|
||||
yield cap
|
||||
await cap.async_destroy()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def async_python_capsule(async_client: AsyncWrennClient):
|
||||
"""Provides a ready-to-use Python interpreter capsule."""
|
||||
cap = await async_client.capsules.create(
|
||||
template="python-interpreter-v0-beta", timeout_sec=120
|
||||
)
|
||||
await cap.async_wait_ready(timeout=60, interval=1)
|
||||
yield cap
|
||||
await cap.async_destroy()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def minimal_capsule(
|
||||
client: WrennClient,
|
||||
) -> Generator[Any, None, None]: # Replace Any with your Capsule type
|
||||
"""Provides a ready-to-use minimal capsule and cleans it up afterward."""
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
yield cap
|
||||
78
tests/integration/test_async.py
Normal file
78
tests/integration/test_async.py
Normal file
@ -0,0 +1,78 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from wrenn.capsule import Capsule
|
||||
|
||||
from .conftest import requires_auth
|
||||
|
||||
# --- Tests ---
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestAsyncCapsuleLifecycle:
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_create_exec_destroy(self, async_minimal_capsule: Capsule):
|
||||
result = await async_minimal_capsule.async_exec("echo", args=["async_hello"])
|
||||
assert result.exit_code == 0
|
||||
assert "async_hello" in result.stdout
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_upload_download(self, async_minimal_capsule: Capsule):
|
||||
content = b"Async upload test"
|
||||
await async_minimal_capsule.async_upload("/tmp/async_test.txt", content)
|
||||
downloaded = await async_minimal_capsule.async_download("/tmp/async_test.txt")
|
||||
assert downloaded == content
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_run_code(self, async_python_capsule: Capsule):
|
||||
r = await async_python_capsule.async_run_code("42 * 2")
|
||||
assert r.text == "84"
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestAsyncFilesystem:
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_list_dir(self, async_minimal_capsule: Capsule):
|
||||
await async_minimal_capsule.async_mkdir("/tmp/async_ls_test")
|
||||
await async_minimal_capsule.async_upload("/tmp/async_ls_test/file.txt", b"data")
|
||||
entries = await async_minimal_capsule.async_list_dir("/tmp/async_ls_test")
|
||||
|
||||
assert isinstance(entries, list)
|
||||
assert any(e.name == "file.txt" for e in entries)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_mkdir(self, async_minimal_capsule: Capsule):
|
||||
entry = await async_minimal_capsule.async_mkdir("/tmp/async_mkdir_test")
|
||||
assert entry.type == "directory"
|
||||
assert entry.name == "async_mkdir_test"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_remove(self, async_minimal_capsule: Capsule):
|
||||
await async_minimal_capsule.async_upload("/tmp/async_rm.txt", b"bye")
|
||||
|
||||
entries = await async_minimal_capsule.async_list_dir("/tmp")
|
||||
assert any(e.name == "async_rm.txt" for e in entries)
|
||||
|
||||
await async_minimal_capsule.async_remove("/tmp/async_rm.txt")
|
||||
entries = await async_minimal_capsule.async_list_dir("/tmp")
|
||||
assert not any(e.name == "async_rm.txt" for e in entries)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_full_filesystem_roundtrip(
|
||||
self, async_minimal_capsule: Capsule
|
||||
):
|
||||
await async_minimal_capsule.async_mkdir("/tmp/async_rt")
|
||||
await async_minimal_capsule.async_upload(
|
||||
"/tmp/async_rt/file.txt", b"async content"
|
||||
)
|
||||
|
||||
entries = await async_minimal_capsule.async_list_dir("/tmp/async_rt")
|
||||
assert any(e.name == "file.txt" for e in entries)
|
||||
|
||||
data = await async_minimal_capsule.async_download("/tmp/async_rt/file.txt")
|
||||
assert data == b"async content"
|
||||
|
||||
await async_minimal_capsule.async_remove("/tmp/async_rt/file.txt")
|
||||
entries = await async_minimal_capsule.async_list_dir("/tmp/async_rt")
|
||||
assert not any(e.name == "file.txt" for e in entries)
|
||||
28
tests/integration/test_auth_apikeys.py
Normal file
28
tests/integration/test_auth_apikeys.py
Normal file
@ -0,0 +1,28 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from wrenn.client import WrennClient
|
||||
|
||||
from .conftest import requires_auth
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestSnapshots:
|
||||
def test_list_templates(self, client: WrennClient):
|
||||
templates = client.snapshots.list()
|
||||
assert isinstance(templates, list)
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestAPIKeys:
|
||||
def test_create_list_delete(self, bearer_client: WrennClient):
|
||||
key_resp = bearer_client.api_keys.create(name="integration-test-key")
|
||||
assert key_resp.name == "integration-test-key"
|
||||
assert key_resp.key is not None
|
||||
assert key_resp.id is not None
|
||||
|
||||
try:
|
||||
keys = bearer_client.api_keys.list()
|
||||
ids = [k.id for k in keys]
|
||||
assert key_resp.id in ids
|
||||
finally:
|
||||
bearer_client.api_keys.delete(key_resp.id)
|
||||
91
tests/integration/test_capsule_lifecycle.py
Normal file
91
tests/integration/test_capsule_lifecycle.py
Normal file
@ -0,0 +1,91 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from wrenn.capsule import Capsule
|
||||
from wrenn.client import WrennClient
|
||||
from wrenn.exceptions import WrennNotFoundError, WrennValidationError
|
||||
|
||||
from .conftest import requires_auth
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestCapsuleLifecycle:
|
||||
def test_create_exec_destroy(self, minimal_capsule: Capsule):
|
||||
result = minimal_capsule.exec("echo", args=["hello"])
|
||||
assert result.exit_code == 0
|
||||
assert "hello" in result.stdout
|
||||
|
||||
def test_exec_with_args(self, minimal_capsule: Capsule):
|
||||
result = minimal_capsule.exec("echo", args=["hello", "world"])
|
||||
assert result.exit_code == 0
|
||||
assert "hello world" in result.stdout
|
||||
|
||||
def test_exec_nonzero_exit(self, minimal_capsule: Capsule):
|
||||
result = minimal_capsule.exec("sh", args=["-c", "exit 42"])
|
||||
assert result.exit_code == 42
|
||||
|
||||
def test_exec_stderr(self, minimal_capsule: Capsule):
|
||||
result = minimal_capsule.exec("sh", args=["-c", "echo err>&2"])
|
||||
assert result.exit_code == 0
|
||||
assert "err" in result.stderr
|
||||
|
||||
def test_context_manager_cleanup(self, client: WrennClient):
|
||||
# This test explicitly requires manual management to verify the context manager
|
||||
cap = client.capsules.create(template="minimal", timeout_sec=120)
|
||||
cap_id = cap.id
|
||||
|
||||
with cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
|
||||
fetched = client.capsules.get(cap_id)
|
||||
assert fetched.status in ("stopped", "destroyed")
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestPauseResume:
|
||||
def test_pause_and_resume(self, minimal_capsule: Capsule):
|
||||
minimal_capsule.pause()
|
||||
assert minimal_capsule.status == "paused"
|
||||
|
||||
minimal_capsule.resume()
|
||||
minimal_capsule.wait_ready(timeout=60, interval=1)
|
||||
|
||||
result = minimal_capsule.exec("echo", args=["resumed"])
|
||||
assert result.exit_code == 0
|
||||
assert "resumed" in result.stdout
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestPing:
|
||||
def test_ping_resets_timer(self, minimal_capsule: Capsule):
|
||||
minimal_capsule.ping()
|
||||
result = minimal_capsule.exec("echo", args=["still_alive"])
|
||||
assert result.exit_code == 0
|
||||
assert "still_alive" in result.stdout
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestProxy:
|
||||
def test_get_url(self, minimal_capsule: Capsule):
|
||||
url = minimal_capsule.get_url(8888)
|
||||
assert minimal_capsule.id in url
|
||||
assert "8888" in url
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestListAndGet:
|
||||
def test_list_capsules(self, client: WrennClient, minimal_capsule: Capsule):
|
||||
# Require minimal_capsule to ensure one exists, use client to list
|
||||
boxes = client.capsules.list()
|
||||
ids = [b.id for b in boxes]
|
||||
assert minimal_capsule.id in ids
|
||||
|
||||
def test_get_existing_capsule(self, client: WrennClient, minimal_capsule: Capsule):
|
||||
fetched = client.capsules.get(minimal_capsule.id)
|
||||
assert fetched.id == minimal_capsule.id
|
||||
assert fetched.status == "running"
|
||||
|
||||
def test_get_nonexistent_capsule(self, client: WrennClient):
|
||||
with pytest.raises((WrennNotFoundError, WrennValidationError)):
|
||||
client.capsules.get("cl-nonexistent00000000000000000")
|
||||
133
tests/integration/test_filesystem.py
Normal file
133
tests/integration/test_filesystem.py
Normal file
@ -0,0 +1,133 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from wrenn.client import WrennClient
|
||||
|
||||
from .conftest import requires_auth
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestFileIO:
|
||||
def test_upload_and_download(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
content = b"Hello from integration test!"
|
||||
cap.upload("/tmp/test_file.txt", content)
|
||||
downloaded = cap.download("/tmp/test_file.txt")
|
||||
assert downloaded == content
|
||||
|
||||
def test_download_nonexistent_file(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
with pytest.raises(Exception):
|
||||
cap.download("/tmp/no_such_file_12345")
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestFilesystemListDir:
|
||||
def test_list_dir_root(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.mkdir("/tmp/ls_test_root")
|
||||
cap.upload("/tmp/ls_test_root/hello.txt", b"hello")
|
||||
entries = cap.list_dir("/tmp/ls_test_root")
|
||||
assert isinstance(entries, list)
|
||||
names = [e.name for e in entries]
|
||||
assert "hello.txt" in names
|
||||
|
||||
def test_list_dir_after_mkdir(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.mkdir("/tmp/fs_test_dir")
|
||||
entries = cap.list_dir("/tmp")
|
||||
names = [e.name for e in entries]
|
||||
assert "fs_test_dir" in names
|
||||
|
||||
def test_list_dir_file_metadata(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.upload("/tmp/meta_test.txt", b"hello world")
|
||||
entries = cap.list_dir("/tmp")
|
||||
match = [e for e in entries if e.name == "meta_test.txt"]
|
||||
assert len(match) == 1
|
||||
f = match[0]
|
||||
assert f.type == "file"
|
||||
assert f.size == 11
|
||||
assert f.permissions is not None
|
||||
assert f.owner is not None
|
||||
assert f.group is not None
|
||||
assert f.modified_at is not None
|
||||
|
||||
def test_list_dir_depth(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.mkdir("/tmp/depth_a/depth_b")
|
||||
cap.upload("/tmp/depth_a/depth_b/nested.txt", b"deep")
|
||||
entries = cap.list_dir("/tmp/depth_a", depth=2)
|
||||
paths = [e.path for e in entries]
|
||||
assert any("nested.txt" in p for p in paths)
|
||||
|
||||
def test_list_dir_empty_directory(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.mkdir("/tmp/empty_dir_test")
|
||||
entries = cap.list_dir("/tmp/empty_dir_test")
|
||||
assert entries == []
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestFilesystemMkdir:
|
||||
def test_mkdir_creates_directory(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
entry = cap.mkdir("/tmp/mkdir_test")
|
||||
assert entry.name == "mkdir_test"
|
||||
assert entry.type == "directory"
|
||||
assert entry.path == "/tmp/mkdir_test"
|
||||
|
||||
def test_mkdir_creates_parents(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
entry = cap.mkdir("/tmp/a/b/c/d")
|
||||
assert entry.type == "directory"
|
||||
|
||||
def test_mkdir_already_exists(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.mkdir("/tmp/exist_test")
|
||||
entry = cap.mkdir("/tmp/exist_test")
|
||||
assert entry.type == "directory"
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestFilesystemRemove:
|
||||
def test_remove_file(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.upload("/tmp/rm_test.txt", b"delete me")
|
||||
entries_before = cap.list_dir("/tmp")
|
||||
assert any(e.name == "rm_test.txt" for e in entries_before)
|
||||
cap.remove("/tmp/rm_test.txt")
|
||||
entries_after = cap.list_dir("/tmp")
|
||||
assert not any(e.name == "rm_test.txt" for e in entries_after)
|
||||
|
||||
def test_remove_directory(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
cap.mkdir("/tmp/rm_dir_test")
|
||||
cap.upload("/tmp/rm_dir_test/file.txt", b"inside")
|
||||
cap.remove("/tmp/rm_dir_test")
|
||||
entries = cap.list_dir("/tmp")
|
||||
assert not any(e.name == "rm_dir_test" for e in entries)
|
||||
|
||||
def test_upload_download_remove_roundtrip(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
content = b"round trip test data " * 100
|
||||
cap.upload("/tmp/rt.txt", content)
|
||||
downloaded = cap.download("/tmp/rt.txt")
|
||||
assert downloaded == content
|
||||
cap.remove("/tmp/rt.txt")
|
||||
with pytest.raises(Exception):
|
||||
cap.download("/tmp/rt.txt")
|
||||
77
tests/integration/test_pty.py
Normal file
77
tests/integration/test_pty.py
Normal file
@ -0,0 +1,77 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from wrenn.client import WrennClient
|
||||
from wrenn.pty import PtyEventType
|
||||
|
||||
from .conftest import requires_auth
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestPty:
|
||||
def test_pty_basic_output(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
with cap.pty(cmd="/bin/sh", cwd="/tmp") as term:
|
||||
term.write(b"echo pty_hello\n")
|
||||
output = b""
|
||||
for event in term:
|
||||
if event.type == PtyEventType.output:
|
||||
output += event.data
|
||||
elif event.type == PtyEventType.exit:
|
||||
break
|
||||
if b"pty_hello" in output:
|
||||
term.write(b"exit\n")
|
||||
assert b"pty_hello" in output
|
||||
|
||||
def test_pty_tag_and_pid(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
with cap.pty(cmd="/bin/sh") as term:
|
||||
started = False
|
||||
for event in term:
|
||||
if event.type == PtyEventType.started:
|
||||
started = True
|
||||
assert term.tag is not None
|
||||
assert term.pid is not None
|
||||
assert term.tag.startswith("pty-")
|
||||
elif event.type == PtyEventType.output:
|
||||
term.write(b"exit\n")
|
||||
elif event.type == PtyEventType.exit:
|
||||
break
|
||||
assert started
|
||||
|
||||
def test_pty_exit_on_command_exit(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
with cap.pty(cmd="/bin/echo", args=["immediate"]) as term:
|
||||
events = list(term)
|
||||
types = [e.type for e in events]
|
||||
assert PtyEventType.started in types
|
||||
assert PtyEventType.output in types or PtyEventType.exit in types
|
||||
|
||||
def test_pty_resize(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
with cap.pty(cmd="/bin/sh", cols=80, rows=24) as term:
|
||||
for event in term:
|
||||
if event.type == PtyEventType.started:
|
||||
term.resize(120, 40)
|
||||
term.write(b"exit\n")
|
||||
elif event.type == PtyEventType.exit:
|
||||
break
|
||||
|
||||
def test_pty_envs(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
with cap.pty(cmd="/bin/sh", envs={"MY_VAR": "hello_env"}) as term:
|
||||
output = b""
|
||||
for event in term:
|
||||
if event.type == PtyEventType.started:
|
||||
term.write(b"echo $MY_VAR\n")
|
||||
elif event.type == PtyEventType.output:
|
||||
output += event.data
|
||||
if b"hello_env" in output:
|
||||
term.write(b"exit\n")
|
||||
elif event.type == PtyEventType.exit:
|
||||
break
|
||||
assert b"hello_env" in output
|
||||
49
tests/integration/test_run_code.py
Normal file
49
tests/integration/test_run_code.py
Normal file
@ -0,0 +1,49 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from wrenn.client import WrennClient
|
||||
|
||||
from .conftest import requires_auth
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestRunCode:
|
||||
def test_basic_execution(self, client: WrennClient):
|
||||
with client.capsules.create(
|
||||
template="python-interpreter-v0-beta", timeout_sec=120
|
||||
) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
|
||||
r = cap.run_code("x = 42")
|
||||
assert r.error is None
|
||||
|
||||
r = cap.run_code("x * 2")
|
||||
assert r.text == "84"
|
||||
|
||||
def test_state_persists(self, client: WrennClient):
|
||||
with client.capsules.create(
|
||||
template="python-interpreter-v0-beta", timeout_sec=120
|
||||
) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
|
||||
cap.run_code("def greet(name): return f'hello {name}'")
|
||||
r = cap.run_code("greet('capsule')")
|
||||
assert "hello capsule" in (r.text or "")
|
||||
|
||||
def test_error_traceback(self, client: WrennClient):
|
||||
with client.capsules.create(
|
||||
template="python-interpreter-v0-beta", timeout_sec=120
|
||||
) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
|
||||
r = cap.run_code("1/0")
|
||||
assert r.error is not None
|
||||
assert "ZeroDivisionError" in r.error
|
||||
|
||||
def test_stdout_capture(self, client: WrennClient):
|
||||
with client.capsules.create(
|
||||
template="python-interpreter-v0-beta", timeout_sec=120
|
||||
) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
|
||||
r = cap.run_code("print('hello from kernel')")
|
||||
assert "hello from kernel" in r.stdout
|
||||
30
tests/integration/test_streaming.py
Normal file
30
tests/integration/test_streaming.py
Normal file
@ -0,0 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from wrenn.client import WrennClient
|
||||
|
||||
from .conftest import requires_auth
|
||||
|
||||
|
||||
@requires_auth
|
||||
class TestStreamUploadDownload:
|
||||
def test_stream_upload_and_download(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
chunks = [b"chunk0_", b"chunk1_", b"chunk2"]
|
||||
|
||||
def data_gen():
|
||||
yield from chunks
|
||||
|
||||
cap.stream_upload("/tmp/stream_test.bin", data_gen())
|
||||
downloaded = cap.download("/tmp/stream_test.bin")
|
||||
assert downloaded == b"chunk0_chunk1_chunk2"
|
||||
|
||||
def test_stream_download_large(self, client: WrennClient):
|
||||
with client.capsules.create(template="minimal", timeout_sec=120) as cap:
|
||||
cap.wait_ready(timeout=60, interval=1)
|
||||
content = b"x" * 65536 * 3
|
||||
cap.upload("/tmp/large.bin", content)
|
||||
collected = b""
|
||||
for chunk in cap.stream_download("/tmp/large.bin"):
|
||||
collected += chunk
|
||||
assert collected == content
|
||||
Reference in New Issue
Block a user