test: expand command/PTY/git coverage, fix WebSocket close handling
Some checks failed
ci/woodpecker/pr/check Pipeline failed

Tests:
- tests/test_commands.py: unit coverage for Commands/AsyncCommands —
  payload construction (cwd, envs, tag, timeout), background dispatch,
  base64 response decoding, stream-event parsing, stream/connect iterators.
- tests/test_integration_advanced.py: live tests for cwd/env handling,
  long-running commands (apt-get), PTY sessions, streaming exec,
  process connect, and git workflows including cloning wrennhq/wrenn.
- test_filesystem_pty.py: PTY ping/pong reply tests.
- test_integration.py: poll for async process-registry prune in
  test_kill_process instead of asserting on a zero-delay list().

Fixes:
- commands.py / pty.py: stream(), connect() and the PTY iterators only
  caught WebSocketDisconnect. The server closes exec/process streams
  abruptly, raising WebSocketNetworkError — a sibling under
  HTTPXWSException — which crashed connect() entirely. Both are now
  caught via _WS_CLOSED so abrupt closes end iteration cleanly.
- pty.py: reply to the server keepalive ping with a pong so idle PTY
  sessions stay open.
This commit is contained in:
2026-05-19 17:12:52 +06:00
parent 87cc16e9e2
commit fce514c49c
6 changed files with 1085 additions and 18 deletions

View File

@ -15,17 +15,6 @@ pytestmark = pytest.mark.integration
_env_loaded = False
def _wait_for_pid_dead(capsule: Capsule, pid: int, timeout: float = 5.0) -> bool:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
result = capsule.commands.run(f"ps -p {pid} -o stat= 2>/dev/null || true")
state = result.stdout.strip()
if not state or state.startswith("Z"):
return True
time.sleep(0.2)
return False
def _ensure_env() -> None:
global _env_loaded
if _env_loaded:
@ -229,7 +218,14 @@ class TestCommands:
def test_kill_process(self):
handle = self.capsule.commands.run("sleep 30", background=True)
self.capsule.commands.kill(handle.pid)
assert _wait_for_pid_dead(self.capsule, handle.pid)
# Registry prune runs asynchronously after the process end event,
# so poll rather than asserting on a zero-delay list().
deadline = time.monotonic() + 5
while time.monotonic() < deadline:
if handle.pid not in [p.pid for p in self.capsule.commands.list()]:
break
time.sleep(0.2)
assert handle.pid not in [p.pid for p in self.capsule.commands.list()]
def test_run_duration_ms(self):
result = self.capsule.commands.run("sleep 1")