All checks were successful
ci/woodpecker/push/unit Pipeline was successful
Co-authored-by: Tasnim Kabir Sadik <tksadik92@gmail.com> Reviewed-on: #13
409 lines
13 KiB
Python
409 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from wrenn import Capsule, CommandResult
|
|
from wrenn.commands import CommandHandle, ProcessInfo
|
|
from wrenn.models import Capsule as CapsuleModel, FileEntry, Status
|
|
|
|
pytestmark = pytest.mark.integration
|
|
|
|
_env_loaded = False
|
|
|
|
|
|
def _ensure_env() -> None:
|
|
global _env_loaded
|
|
if _env_loaded:
|
|
return
|
|
_env_loaded = True
|
|
env_file = Path(__file__).resolve().parent.parent / ".env"
|
|
if not env_file.exists():
|
|
return
|
|
for line in env_file.read_text().splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, _, value = line.partition("=")
|
|
key, value = key.strip(), value.strip().strip("\"'")
|
|
if key and key not in os.environ:
|
|
os.environ[key] = value
|
|
|
|
|
|
class TestCapsuleLifecycle:
|
|
"""Each test manages its own capsule to test create/destroy paths."""
|
|
|
|
def setup_method(self):
|
|
_ensure_env()
|
|
|
|
def test_create_and_destroy(self):
|
|
capsule = Capsule()
|
|
capsule_id = capsule.capsule_id
|
|
try:
|
|
assert capsule_id
|
|
assert capsule.info is not None
|
|
finally:
|
|
capsule.destroy(wait=True)
|
|
|
|
info = Capsule.get_info(capsule_id)
|
|
assert info.status in (Status.stopped, Status.missing)
|
|
|
|
def test_create_with_wait(self):
|
|
capsule = Capsule(wait=True)
|
|
try:
|
|
assert capsule.info is not None
|
|
assert capsule.info.status == Status.running
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
def test_context_manager_destroys(self):
|
|
with Capsule(wait=True) as capsule:
|
|
capsule_id = capsule.capsule_id
|
|
assert capsule.is_running()
|
|
|
|
info = Capsule.get_info(capsule_id)
|
|
assert info.status in (Status.stopping, Status.stopped, Status.missing)
|
|
|
|
def test_get_info(self):
|
|
capsule = Capsule(wait=True)
|
|
try:
|
|
info = capsule.get_info()
|
|
assert isinstance(info, CapsuleModel)
|
|
assert info.id == capsule.capsule_id
|
|
assert info.status == Status.running
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
def test_pause_and_resume(self):
|
|
capsule = Capsule(wait=True)
|
|
try:
|
|
paused = capsule.pause(wait=True)
|
|
assert paused.status == Status.paused
|
|
assert not capsule.is_running()
|
|
|
|
resumed = capsule.resume(wait=True)
|
|
assert resumed.status == Status.running
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
def test_static_destroy(self):
|
|
capsule = Capsule(wait=True)
|
|
capsule_id = capsule.capsule_id
|
|
try:
|
|
Capsule.destroy(capsule_id, wait=True)
|
|
except Exception:
|
|
capsule.destroy()
|
|
raise
|
|
|
|
info = Capsule.get_info(capsule_id)
|
|
assert info.status in (Status.stopped, Status.missing)
|
|
|
|
def test_connect_to_existing(self):
|
|
capsule = Capsule(wait=True)
|
|
try:
|
|
connected = Capsule.connect(capsule.capsule_id)
|
|
assert connected.capsule_id == capsule.capsule_id
|
|
assert connected.info is not None
|
|
assert connected.info.status == Status.running
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
def test_connect_resumes_paused(self):
|
|
capsule = Capsule(wait=True)
|
|
try:
|
|
capsule.pause()
|
|
connected = Capsule.connect(capsule.capsule_id)
|
|
assert connected.info is not None
|
|
assert connected.info.status == Status.running
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
def test_list_capsules(self):
|
|
capsule = Capsule(wait=True)
|
|
try:
|
|
capsules = Capsule.list()
|
|
assert isinstance(capsules, list)
|
|
ids = [c.id for c in capsules]
|
|
assert capsule.capsule_id in ids
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
def test_wait_ready(self):
|
|
capsule = Capsule()
|
|
try:
|
|
capsule.wait_ready(timeout=60)
|
|
assert capsule.is_running()
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
def test_ping(self):
|
|
capsule = Capsule(wait=True)
|
|
try:
|
|
capsule.ping()
|
|
finally:
|
|
capsule.destroy()
|
|
|
|
|
|
class TestCommands:
|
|
"""Shared capsule for command execution tests."""
|
|
|
|
capsule: Capsule
|
|
|
|
@classmethod
|
|
def setup_class(cls):
|
|
_ensure_env()
|
|
cls.capsule = Capsule(wait=True)
|
|
|
|
@classmethod
|
|
def teardown_class(cls):
|
|
try:
|
|
cls.capsule.destroy()
|
|
except Exception:
|
|
pass
|
|
|
|
def test_run_foreground(self):
|
|
result = self.capsule.commands.run("echo hello")
|
|
assert isinstance(result, CommandResult)
|
|
assert result.exit_code == 0
|
|
assert "hello" in result.stdout
|
|
|
|
def test_run_stderr(self):
|
|
result = self.capsule.commands.run("echo error >&2")
|
|
assert "error" in result.stderr
|
|
|
|
def test_run_exit_code(self):
|
|
result = self.capsule.commands.run("exit 42")
|
|
assert result.exit_code == 42
|
|
|
|
def test_run_with_envs(self):
|
|
result = self.capsule.commands.run("export MY_VAR=test_value && echo $MY_VAR")
|
|
assert "test_value" in result.stdout
|
|
|
|
def test_run_with_cwd(self):
|
|
result = self.capsule.commands.run("cd /tmp && pwd")
|
|
assert result.stdout.strip() == "/tmp"
|
|
|
|
def test_run_multiline_output(self):
|
|
result = self.capsule.commands.run("echo -e 'line1\\nline2\\nline3'")
|
|
assert result.exit_code == 0
|
|
lines = result.stdout.strip().splitlines()
|
|
assert len(lines) == 3
|
|
|
|
def test_run_background(self):
|
|
handle = self.capsule.commands.run("sleep 30", background=True, tag="bg-test")
|
|
assert isinstance(handle, CommandHandle)
|
|
assert handle.pid > 0
|
|
assert handle.tag == "bg-test"
|
|
assert handle.capsule_id == self.capsule.capsule_id
|
|
|
|
self.capsule.commands.kill(handle.pid)
|
|
|
|
def test_list_processes(self):
|
|
handle = self.capsule.commands.run("sleep 30", background=True, tag="list-test")
|
|
try:
|
|
time.sleep(0.5)
|
|
processes = self.capsule.commands.list()
|
|
assert isinstance(processes, list)
|
|
pids = [p.pid for p in processes]
|
|
assert handle.pid in pids
|
|
|
|
proc = next(p for p in processes if p.pid == handle.pid)
|
|
assert isinstance(proc, ProcessInfo)
|
|
finally:
|
|
self.capsule.commands.kill(handle.pid)
|
|
|
|
def test_kill_process(self):
|
|
handle = self.capsule.commands.run("sleep 30", background=True)
|
|
self.capsule.commands.kill(handle.pid)
|
|
# Registry prune runs asynchronously after the process end event,
|
|
# so poll rather than asserting on a zero-delay list().
|
|
deadline = time.monotonic() + 5
|
|
while time.monotonic() < deadline:
|
|
if handle.pid not in [p.pid for p in self.capsule.commands.list()]:
|
|
break
|
|
time.sleep(0.2)
|
|
assert handle.pid not in [p.pid for p in self.capsule.commands.list()]
|
|
|
|
def test_run_duration_ms(self):
|
|
result = self.capsule.commands.run("sleep 1")
|
|
assert result.duration_ms is None or result.duration_ms >= 900
|
|
|
|
|
|
class TestFiles:
|
|
"""Shared capsule for filesystem tests."""
|
|
|
|
capsule: Capsule
|
|
|
|
@classmethod
|
|
def setup_class(cls):
|
|
_ensure_env()
|
|
cls.capsule = Capsule(wait=True)
|
|
|
|
@classmethod
|
|
def teardown_class(cls):
|
|
try:
|
|
cls.capsule.destroy()
|
|
except Exception:
|
|
pass
|
|
|
|
def test_write_and_read(self):
|
|
self.capsule.files.write("/tmp/test.txt", "hello world")
|
|
content = self.capsule.files.read("/tmp/test.txt")
|
|
assert content == "hello world"
|
|
|
|
def test_write_and_read_bytes(self):
|
|
data = b"\x00\x01\x02\xff"
|
|
self.capsule.files.write("/tmp/test.bin", data)
|
|
result = self.capsule.files.read_bytes("/tmp/test.bin")
|
|
assert result == data
|
|
|
|
def test_list_directory(self):
|
|
self.capsule.files.write("/tmp/listdir/a.txt", "a")
|
|
self.capsule.files.write("/tmp/listdir/b.txt", "b")
|
|
entries = self.capsule.files.list("/tmp/listdir")
|
|
assert isinstance(entries, list)
|
|
names = [e.name for e in entries]
|
|
assert "a.txt" in names
|
|
assert "b.txt" in names
|
|
|
|
def test_exists(self):
|
|
self.capsule.files.write("/tmp/exists_test.txt", "x")
|
|
assert self.capsule.files.exists("/tmp/exists_test.txt")
|
|
assert not self.capsule.files.exists("/tmp/does_not_exist_xyz.txt")
|
|
|
|
def test_make_dir(self):
|
|
entry = self.capsule.files.make_dir("/tmp/newdir")
|
|
assert isinstance(entry, FileEntry)
|
|
assert self.capsule.files.exists("/tmp/newdir")
|
|
|
|
def test_make_dir_idempotent(self):
|
|
self.capsule.files.make_dir("/tmp/idempotent_dir")
|
|
entry = self.capsule.files.make_dir("/tmp/idempotent_dir")
|
|
assert isinstance(entry, FileEntry)
|
|
|
|
def test_remove_file(self):
|
|
self.capsule.files.write("/tmp/to_remove.txt", "delete me")
|
|
assert self.capsule.files.exists("/tmp/to_remove.txt")
|
|
self.capsule.files.remove("/tmp/to_remove.txt")
|
|
assert not self.capsule.files.exists("/tmp/to_remove.txt")
|
|
|
|
def test_remove_directory(self):
|
|
self.capsule.files.make_dir("/tmp/dir_to_remove")
|
|
self.capsule.files.write("/tmp/dir_to_remove/child.txt", "data")
|
|
self.capsule.files.remove("/tmp/dir_to_remove")
|
|
assert not self.capsule.files.exists("/tmp/dir_to_remove")
|
|
|
|
def test_write_creates_parent_dirs(self):
|
|
self.capsule.files.write("/tmp/deep/nested/dir/file.txt", "nested")
|
|
content = self.capsule.files.read("/tmp/deep/nested/dir/file.txt")
|
|
assert content == "nested"
|
|
|
|
def test_list_with_depth(self):
|
|
self.capsule.files.write("/tmp/depth_test/a/b.txt", "deep")
|
|
entries_shallow = self.capsule.files.list("/tmp/depth_test", depth=1)
|
|
entries_deep = self.capsule.files.list("/tmp/depth_test", depth=2)
|
|
assert len(entries_deep) >= len(entries_shallow)
|
|
|
|
def test_overwrite_file(self):
|
|
self.capsule.files.write("/tmp/overwrite.txt", "original")
|
|
self.capsule.files.write("/tmp/overwrite.txt", "updated")
|
|
content = self.capsule.files.read("/tmp/overwrite.txt")
|
|
assert content == "updated"
|
|
|
|
def test_upload_and_download_stream(self):
|
|
chunks = [b"chunk1", b"chunk2", b"chunk3"]
|
|
self.capsule.files.upload_stream("/tmp/streamed.bin", iter(chunks))
|
|
downloaded = b"".join(self.capsule.files.download_stream("/tmp/streamed.bin"))
|
|
assert downloaded == b"chunk1chunk2chunk3"
|
|
|
|
|
|
class TestGit:
|
|
"""Shared capsule for git operation tests.
|
|
|
|
Initializes a repo at /home/wrenn-user (default cwd) since the exec API
|
|
does not support the cwd parameter.
|
|
"""
|
|
|
|
capsule: Capsule
|
|
|
|
@classmethod
|
|
def setup_class(cls):
|
|
_ensure_env()
|
|
cls.capsule = Capsule(wait=True)
|
|
cls.capsule.git.init(".", initial_branch="main")
|
|
cls.capsule.git.configure_user("Test User", "test@example.com")
|
|
|
|
@classmethod
|
|
def teardown_class(cls):
|
|
try:
|
|
cls.capsule.destroy()
|
|
except Exception:
|
|
pass
|
|
|
|
def test_init_created_repo(self):
|
|
assert self.capsule.files.exists("/home/wrenn-user/.git")
|
|
|
|
def test_status_clean(self):
|
|
status = self.capsule.git.status()
|
|
assert status.branch == "main"
|
|
|
|
def test_add_and_commit(self):
|
|
self.capsule.files.write("/home/wrenn-user/hello.txt", "hello git")
|
|
self.capsule.git.add(all=True)
|
|
result = self.capsule.git.commit("initial commit")
|
|
assert result.exit_code == 0
|
|
|
|
def test_status_after_commit(self):
|
|
status = self.capsule.git.status()
|
|
assert status.is_clean
|
|
|
|
def test_status_with_changes(self):
|
|
self.capsule.files.write("/home/wrenn-user/dirty.txt", "uncommitted")
|
|
try:
|
|
status = self.capsule.git.status()
|
|
assert not status.is_clean
|
|
paths = [f.path for f in status.files]
|
|
assert "dirty.txt" in paths
|
|
finally:
|
|
self.capsule.files.remove("/home/wrenn-user/dirty.txt")
|
|
|
|
def test_branches(self):
|
|
branches = self.capsule.git.branches()
|
|
assert len(branches) >= 1
|
|
names = [b.name for b in branches]
|
|
assert "main" in names
|
|
current = [b for b in branches if b.is_current]
|
|
assert len(current) == 1
|
|
|
|
def test_create_and_checkout_branch(self):
|
|
self.capsule.git.create_branch("feature-1")
|
|
branches = self.capsule.git.branches()
|
|
names = [b.name for b in branches]
|
|
assert "feature-1" in names
|
|
|
|
current = [b for b in branches if b.is_current]
|
|
assert current[0].name == "feature-1"
|
|
|
|
self.capsule.git.checkout_branch("main")
|
|
|
|
def test_delete_branch(self):
|
|
self.capsule.git.create_branch("to-delete")
|
|
self.capsule.git.checkout_branch("main")
|
|
self.capsule.git.delete_branch("to-delete")
|
|
|
|
branches = self.capsule.git.branches()
|
|
names = [b.name for b in branches]
|
|
assert "to-delete" not in names
|
|
|
|
def test_set_and_get_config(self):
|
|
self.capsule.git.set_config("test.key", "test-value")
|
|
value = self.capsule.git.get_config("test.key")
|
|
assert value == "test-value"
|
|
|
|
def test_get_config_missing_returns_none(self):
|
|
value = self.capsule.git.get_config("nonexistent.key")
|
|
assert value is None
|