diff --git a/src/_shared/async-queue.ts b/src/_shared/async-queue.ts new file mode 100644 index 0000000..5da5de2 --- /dev/null +++ b/src/_shared/async-queue.ts @@ -0,0 +1,62 @@ +export class AsyncQueue implements AsyncIterableIterator { + private readonly values: T[] = []; + private readonly waiters: Array<{ + reject: (reason?: unknown) => void; + resolve: (value: IteratorResult) => void; + }> = []; + private closed = false; + private error: unknown; + + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + } + + next(): Promise> { + if (this.values.length) { + return Promise.resolve({ done: false, value: this.values.shift() as T }); + } + if (this.error) return Promise.reject(this.error); + if (this.closed) return Promise.resolve({ done: true, value: undefined }); + + return new Promise((resolve, reject) => { + this.waiters.push({ reject, resolve }); + }); + } + + return(): Promise> { + this.end(); + return Promise.resolve({ done: true, value: undefined }); + } + + throw(error?: unknown): Promise> { + this.fail(error); + return Promise.reject(error); + } + + push(value: T): void { + if (this.closed) return; + const waiter = this.waiters.shift(); + if (waiter) { + waiter.resolve({ done: false, value }); + return; + } + this.values.push(value); + } + + end(): void { + if (this.closed) return; + this.closed = true; + for (const waiter of this.waiters.splice(0)) { + waiter.resolve({ done: true, value: undefined }); + } + } + + fail(error: unknown): void { + if (this.closed) return; + this.closed = true; + this.error = error; + for (const waiter of this.waiters.splice(0)) { + waiter.reject(error); + } + } +} diff --git a/src/capsule.ts b/src/capsule.ts index 195ddd4..dce0485 100644 --- a/src/capsule.ts +++ b/src/capsule.ts @@ -4,8 +4,11 @@ import { type OperationQueryParams, WrennClient, } from "./client.js"; +import { CommandManager } from "./commands.js"; import type { ClientConfig } from "./config.js"; import { TimeoutError, WrennError } from "./exceptions.js"; +import { FileManager } from "./files.js"; +import { PtyManager } from "./pty.js"; export type CapsuleInfo = OperationJsonResponse<"getCapsule", 200>; export type CapsuleMetrics = OperationJsonResponse<"getCapsuleMetrics", 200>; @@ -88,6 +91,9 @@ function delay(ms: number, signal?: AbortSignal): Promise { export class Capsule { readonly id: string; readonly client: WrennClient; + readonly commands: CommandManager; + readonly files: FileManager; + readonly pty: PtyManager; /** * Wraps an existing capsule ID without fetching or creating remote resources. @@ -98,6 +104,9 @@ export class Capsule { constructor(id: string, opts?: ClientConfig) { this.id = id; this.client = new WrennClient(opts); + this.commands = new CommandManager(id, this.client); + this.files = new FileManager(id, this.client); + this.pty = new PtyManager(id, this.client); } static create(opts?: CapsuleCreateOptions): Promise; diff --git a/src/client.ts b/src/client.ts index 550dab6..6cdb35d 100644 --- a/src/client.ts +++ b/src/client.ts @@ -78,7 +78,7 @@ function createFileFormData(input: FileUploadInput): FormData { const formData = new FormData(); formData.append("path", input.path); if (typeof input.file === "string") { - formData.append("file", input.file); + formData.append("file", new Blob([input.file]), input.filename ?? "file"); } else { if (input.filename) { formData.append("file", input.file, input.filename); diff --git a/src/commands.ts b/src/commands.ts new file mode 100644 index 0000000..ff467f2 --- /dev/null +++ b/src/commands.ts @@ -0,0 +1,139 @@ +import { AsyncQueue } from "./_shared/async-queue.js"; +import type { WsConnection } from "./_shared/websocket.js"; +import type { + OperationJsonBody, + OperationJsonResponse, + WrennClient, +} from "./client.js"; + +export type CommandResult = OperationJsonResponse<"execCommand", 200>; +export type BackgroundProcess = OperationJsonResponse<"execCommand", 202>; +export type ProcessList = OperationJsonResponse<"listProcesses", 200>; + +export interface CommandOptions { + args?: string[]; + timeoutSec?: number; +} + +export interface BackgroundCommandOptions extends CommandOptions { + tag?: string; + envs?: Record; + cwd?: string; +} + +export interface CommandStreamOptions { + args?: string[]; + timeoutMs?: number; +} + +export interface CommandStreamEvent { + type: string; + [key: string]: unknown; +} + +const DEFAULT_COMMAND_TIMEOUT_SEC = 30; + +function commandBody( + cmd: string, + background: boolean, + opts?: BackgroundCommandOptions, +): OperationJsonBody<"execCommand"> { + const body: OperationJsonBody<"execCommand"> = { + background, + cmd, + timeout_sec: opts?.timeoutSec ?? DEFAULT_COMMAND_TIMEOUT_SEC, + }; + if (opts?.args) body.args = opts.args; + if (opts?.tag) body.tag = opts.tag; + if (opts?.envs) body.envs = opts.envs; + if (opts?.cwd) body.cwd = opts.cwd; + return body; +} + +function isTerminalEvent(event: CommandStreamEvent): boolean { + return event.type === "exit" || event.type === "error"; +} + +/** User-friendly command execution API bound to one capsule. */ +export class CommandManager { + constructor( + private readonly capsuleId: string, + private readonly client: WrennClient, + ) {} + + exec(cmd: string, opts?: CommandOptions): Promise { + return this.client.capsules.exec( + this.capsuleId, + commandBody(cmd, false, opts), + ) as Promise; + } + + start( + cmd: string, + opts?: BackgroundCommandOptions, + ): Promise { + return this.client.capsules.exec( + this.capsuleId, + commandBody(cmd, true, opts), + ) as Promise; + } + + list(): Promise { + return this.client.capsules.listProcesses(this.capsuleId); + } + + kill(selector: string, signal?: "SIGKILL" | "SIGTERM"): Promise { + const params = signal ? { signal } : undefined; + return this.client.capsules.killProcess(this.capsuleId, selector, params); + } + + async *stream( + cmd: string, + opts?: CommandStreamOptions, + ): AsyncGenerator { + const queue = new AsyncQueue(); + let connection: WsConnection | undefined; + const socketOpts: Parameters[1] = { + onClose: () => queue.end(), + onError: (error) => queue.fail(error), + onMessage: (message) => { + const event = message as CommandStreamEvent; + queue.push(event); + if (isTerminalEvent(event)) queue.end(); + }, + }; + if (opts?.timeoutMs !== undefined) socketOpts.timeoutMs = opts.timeoutMs; + + connection = await this.client.capsules.execStream( + this.capsuleId, + socketOpts, + ); + + const start: CommandStreamEvent = { cmd, type: "start" }; + if (opts?.args) start.args = opts.args; + connection.send(start); + + try { + yield* queue; + } finally { + if (!connection.isClosed) connection.close(); + } + } + + streamProcess( + selector: string, + opts?: Pick, + ): Promise { + const socketOpts: Parameters< + typeof this.client.capsules.connectProcess + >[2] = { + onMessage: () => undefined, + }; + if (opts?.timeoutMs !== undefined) socketOpts.timeoutMs = opts.timeoutMs; + return this.client.capsules.connectProcess( + this.capsuleId, + selector, + socketOpts, + ); + } +} diff --git a/src/files.ts b/src/files.ts new file mode 100644 index 0000000..7a0d75c --- /dev/null +++ b/src/files.ts @@ -0,0 +1,80 @@ +import type { OperationJsonResponse, WrennClient } from "./client.js"; + +export type FileList = OperationJsonResponse<"listDir", 200>; +export type MakeDirectoryResult = OperationJsonResponse<"makeDir", 200>; + +export interface ListFilesOptions { + depth?: number; +} + +export type FileContent = Blob | string | Uint8Array | ArrayBuffer; + +async function streamToBuffer( + stream: ReadableStream, +): Promise { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(Buffer.from(chunk)); + } + return Buffer.concat(chunks); +} + +function uploadContent(content: FileContent): Blob | string { + if (typeof content === "string" || content instanceof Blob) return content; + if (content instanceof ArrayBuffer) return new Blob([content]); + const buffer = new ArrayBuffer(content.byteLength); + new Uint8Array(buffer).set(content); + return new Blob([buffer]); +} + +/** User-friendly file API bound to one capsule. */ +export class FileManager { + constructor( + private readonly capsuleId: string, + private readonly client: WrennClient, + ) {} + + async read(path: string): Promise { + return streamToBuffer( + await this.client.files.download(this.capsuleId, { path }), + ); + } + + write(path: string, content: FileContent): Promise { + return this.client.files.upload(this.capsuleId, { + file: uploadContent(content), + path, + }); + } + + list(path: string, opts?: ListFilesOptions): Promise { + return this.client.files.list(this.capsuleId, { + depth: opts?.depth ?? 1, + path, + }); + } + + mkdir(path: string): Promise { + return this.client.files.mkdir(this.capsuleId, { path }); + } + + remove(path: string): Promise { + return this.client.files.remove(this.capsuleId, { path }); + } + + async *downloadStream(path: string): AsyncGenerator { + const stream = await this.client.files.streamDownload(this.capsuleId, { + path, + }); + for await (const chunk of stream) { + yield Buffer.from(chunk); + } + } + + uploadStream(path: string, content: FileContent): Promise { + return this.client.files.streamUpload(this.capsuleId, { + file: uploadContent(content), + path, + }); + } +} diff --git a/src/index.ts b/src/index.ts index 021015f..0395fcb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,6 +31,16 @@ export { UsersResource, WrennClient, } from "./client.js"; +export type { + BackgroundCommandOptions, + BackgroundProcess, + CommandOptions, + CommandResult, + CommandStreamEvent, + CommandStreamOptions, + ProcessList, +} from "./commands.js"; +export { CommandManager } from "./commands.js"; export type { ClientConfig, ResolvedClientConfig } from "./config.js"; export { DEFAULT_BASE_URL, @@ -53,3 +63,12 @@ export { throwErrorFromResponse, WrennError, } from "./exceptions.js"; +export type { + FileContent, + FileList, + ListFilesOptions, + MakeDirectoryResult, +} from "./files.js"; +export { FileManager } from "./files.js"; +export type { PtyEvent, PtyStartOptions } from "./pty.js"; +export { PtyManager, PtySession } from "./pty.js"; diff --git a/src/pty.ts b/src/pty.ts new file mode 100644 index 0000000..b7b094f --- /dev/null +++ b/src/pty.ts @@ -0,0 +1,98 @@ +import { AsyncQueue } from "./_shared/async-queue.js"; +import type { WsConnection } from "./_shared/websocket.js"; +import type { WrennClient } from "./client.js"; + +export interface PtyStartOptions { + cmd?: string; + args?: string[]; + cols?: number; + rows?: number; + envs?: Record; + cwd?: string; + user?: string; + timeoutMs?: number; +} + +export interface PtyEvent { + type: string; + [key: string]: unknown; +} + +export class PtySession { + readonly events: AsyncIterableIterator; + + constructor( + private readonly connection: WsConnection, + queue: AsyncQueue, + ) { + this.events = queue; + } + + input(data: string | Uint8Array): void { + this.connection.send({ + data: Buffer.from(data).toString("base64"), + type: "input", + }); + } + + resize(cols: number, rows: number): void { + this.connection.send({ cols, rows, type: "resize" }); + } + + kill(): void { + this.connection.send({ type: "kill" }); + } + + close(): void { + this.connection.close(); + } +} + +/** Interactive terminal API bound to one capsule. */ +export class PtyManager { + constructor( + private readonly capsuleId: string, + private readonly client: WrennClient, + ) {} + + async start(opts?: PtyStartOptions): Promise { + const session = await this.open(opts?.timeoutMs); + const message: PtyEvent = { type: "start" }; + if (opts?.cmd) message.cmd = opts.cmd; + if (opts?.args) message.args = opts.args; + if (opts?.cols) message.cols = opts.cols; + if (opts?.rows) message.rows = opts.rows; + if (opts?.envs) message.envs = opts.envs; + if (opts?.cwd) message.cwd = opts.cwd; + if (opts?.user) message.user = opts.user; + session.connection.send(message); + return session.value; + } + + async connect( + tag: string, + opts?: Pick, + ): Promise { + const session = await this.open(opts?.timeoutMs); + session.connection.send({ tag, type: "connect" }); + return session.value; + } + + private async open(timeoutMs?: number): Promise<{ + connection: WsConnection; + value: PtySession; + }> { + const queue = new AsyncQueue(); + const socketOpts: Parameters[1] = { + onClose: () => queue.end(), + onError: (error) => queue.fail(error), + onMessage: (message) => queue.push(message as PtyEvent), + }; + if (timeoutMs !== undefined) socketOpts.timeoutMs = timeoutMs; + const connection = await this.client.capsules.ptySession( + this.capsuleId, + socketOpts, + ); + return { connection, value: new PtySession(connection, queue) }; + } +} diff --git a/tests/client.test.ts b/tests/client.test.ts index 169dbf9..8fc8ea4 100644 --- a/tests/client.test.ts +++ b/tests/client.test.ts @@ -323,6 +323,9 @@ describe("WrennClient", () => { url: "https://api.example.com/v1/capsules/cap_1/files/write", }); expect(calls.at(-1)?.init.body).toBeInstanceOf(FormData); + expect((calls.at(-1)?.init.body as FormData).get("file")).toBeInstanceOf( + Blob, + ); await client.files.download("cap_1", {} as never); expectLastCall(calls, { body: {}, diff --git a/tests/commands.test.ts b/tests/commands.test.ts new file mode 100644 index 0000000..8f8efe4 --- /dev/null +++ b/tests/commands.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, it, vi } from "vitest"; + +import { Capsule } from "../src/capsule.js"; + +describe("CommandManager", () => { + it("executes foreground commands through the capsule client", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + const exec = vi.spyOn(capsule.client.capsules, "exec").mockResolvedValue({ + exit_code: 0, + stdout: "ok\n", + }); + + await expect( + capsule.commands.exec("node", { + args: ["--version"], + timeoutSec: 5, + }), + ).resolves.toMatchObject({ stdout: "ok\n" }); + + expect(exec).toHaveBeenCalledWith("cap_1", { + args: ["--version"], + background: false, + cmd: "node", + timeout_sec: 5, + }); + }); + + it("starts, lists, and kills background processes", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + const exec = vi.spyOn(capsule.client.capsules, "exec").mockResolvedValue({ + pid: 123, + tag: "worker", + }); + const listProcesses = vi + .spyOn(capsule.client.capsules, "listProcesses") + .mockResolvedValue({ processes: [{ pid: 123, tag: "worker" }] }); + const killProcess = vi + .spyOn(capsule.client.capsules, "killProcess") + .mockResolvedValue(undefined); + + await expect( + capsule.commands.start("sleep", { + args: ["60"], + cwd: "/tmp", + envs: { A: "1" }, + tag: "worker", + }), + ).resolves.toMatchObject({ tag: "worker" }); + await expect(capsule.commands.list()).resolves.toMatchObject({ + processes: [{ tag: "worker" }], + }); + await expect( + capsule.commands.kill("worker", "SIGTERM"), + ).resolves.toBeUndefined(); + + expect(exec).toHaveBeenCalledWith("cap_1", { + args: ["60"], + background: true, + cmd: "sleep", + cwd: "/tmp", + envs: { A: "1" }, + tag: "worker", + timeout_sec: 30, + }); + expect(listProcesses).toHaveBeenCalledWith("cap_1"); + expect(killProcess).toHaveBeenCalledWith("cap_1", "worker", { + signal: "SIGTERM", + }); + }); + + it("streams command events over the exec WebSocket", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + const sent: unknown[] = []; + let onMessage: ((message: unknown) => void) | undefined; + vi.spyOn(capsule.client.capsules, "execStream").mockImplementation( + async (_id, opts) => { + onMessage = opts.onMessage; + return { + close: vi.fn(), + get isClosed() { + return false; + }, + send: (message: unknown) => sent.push(message), + } as never; + }, + ); + + const events = capsule.commands.stream("printf", { args: ["hello"] }); + const first = events.next(); + await vi.waitFor(() => expect(sent).toHaveLength(1)); + expect(sent).toEqual([{ args: ["hello"], cmd: "printf", type: "start" }]); + + onMessage?.({ data: "hello", type: "stdout" }); + await expect(first).resolves.toEqual({ + done: false, + value: { data: "hello", type: "stdout" }, + }); + + const done = events.next(); + onMessage?.({ exit_code: 0, type: "exit" }); + await expect(done).resolves.toEqual({ + done: false, + value: { exit_code: 0, type: "exit" }, + }); + await expect(events.next()).resolves.toEqual({ + done: true, + value: undefined, + }); + }); +}); diff --git a/tests/files.test.ts b/tests/files.test.ts new file mode 100644 index 0000000..e5e2a71 --- /dev/null +++ b/tests/files.test.ts @@ -0,0 +1,92 @@ +import { describe, expect, it, vi } from "vitest"; + +import { Capsule } from "../src/capsule.js"; + +function streamFrom(text: string): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.enqueue(Buffer.from(text)); + controller.close(); + }, + }); +} + +describe("FileManager", () => { + it("reads and writes files through the capsule file client", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + const download = vi + .spyOn(capsule.client.files, "download") + .mockResolvedValue(streamFrom("hello")); + const upload = vi + .spyOn(capsule.client.files, "upload") + .mockResolvedValue(undefined); + + await expect(capsule.files.read("/tmp/a.txt")).resolves.toEqual( + Buffer.from("hello"), + ); + await expect( + capsule.files.write("/tmp/a.txt", "hello"), + ).resolves.toBeUndefined(); + + expect(download).toHaveBeenCalledWith("cap_1", { path: "/tmp/a.txt" }); + expect(upload).toHaveBeenCalledWith("cap_1", { + file: "hello", + path: "/tmp/a.txt", + }); + }); + + it("maps directory operations", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + const list = vi.spyOn(capsule.client.files, "list").mockResolvedValue({ + entries: [{ name: "a.txt", path: "/tmp/a.txt", type: "file" }], + }); + const mkdir = vi.spyOn(capsule.client.files, "mkdir").mockResolvedValue({ + entry: { path: "/tmp/new", type: "directory" }, + }); + const remove = vi + .spyOn(capsule.client.files, "remove") + .mockResolvedValue(undefined); + + await expect( + capsule.files.list("/tmp", { depth: 2 }), + ).resolves.toMatchObject({ + entries: [{ name: "a.txt" }], + }); + await expect(capsule.files.mkdir("/tmp/new")).resolves.toMatchObject({ + entry: { type: "directory" }, + }); + await expect(capsule.files.remove("/tmp/a.txt")).resolves.toBeUndefined(); + + expect(list).toHaveBeenCalledWith("cap_1", { depth: 2, path: "/tmp" }); + expect(mkdir).toHaveBeenCalledWith("cap_1", { path: "/tmp/new" }); + expect(remove).toHaveBeenCalledWith("cap_1", { path: "/tmp/a.txt" }); + }); + + it("streams downloads as Buffer chunks and uploads streaming content", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + vi.spyOn(capsule.client.files, "streamDownload").mockResolvedValue( + streamFrom("hello"), + ); + const streamUpload = vi + .spyOn(capsule.client.files, "streamUpload") + .mockResolvedValue(undefined); + + const chunks: Buffer[] = []; + for await (const chunk of capsule.files.downloadStream("/tmp/a.txt")) { + chunks.push(chunk); + } + await capsule.files.uploadStream("/tmp/a.txt", Buffer.from("hello")); + + expect(Buffer.concat(chunks).toString()).toBe("hello"); + expect(streamUpload).toHaveBeenCalledWith("cap_1", { + file: expect.any(Blob), + path: "/tmp/a.txt", + }); + }); +}); diff --git a/tests/integration/capsule-features.integration.test.ts b/tests/integration/capsule-features.integration.test.ts new file mode 100644 index 0000000..12316cc --- /dev/null +++ b/tests/integration/capsule-features.integration.test.ts @@ -0,0 +1,180 @@ +import { describe, expect, it } from "vitest"; + +import { Capsule, type CapsuleCreateOptions } from "../../src/capsule.js"; +import type { CommandStreamEvent } from "../../src/commands.js"; +import { DEFAULT_BASE_URL } from "../../src/config.js"; +import type { PtyEvent } from "../../src/pty.js"; + +const baseUrl = process.env.WRENN_BASE_URL ?? DEFAULT_BASE_URL; +const apiKey = process.env.WRENN_API_KEY; +const template = process.env.WRENN_TEST_TEMPLATE ?? "minimal"; +const waitTimeoutMs = Number(process.env.WRENN_TEST_WAIT_TIMEOUT_MS ?? 120_000); +const testTimeoutMs = waitTimeoutMs + 45_000; +const describeWithApiKey = apiKey ? describe : describe.skip; + +const clientOpts = { apiKey, baseUrl } satisfies CapsuleCreateOptions; + +async function withLiveCapsule( + fn: (capsule: Capsule) => Promise, +): Promise { + let capsule: Capsule | undefined; + try { + capsule = await Capsule.create(template, { + ...clientOpts, + timeout_sec: 120, + }); + await capsule.waitForReady({ + intervalMs: 2_000, + timeoutMs: waitTimeoutMs, + }); + await fn(capsule); + } finally { + if (capsule) { + await capsule.destroy().catch(() => undefined); + } + } +} + +async function collectCommandEvents( + events: AsyncGenerator, +): Promise { + const collected: CommandStreamEvent[] = []; + for await (const event of events) { + collected.push(event); + if (event.type === "exit" || event.type === "error") break; + } + return collected; +} + +async function nextPtyEvent( + events: AsyncIterableIterator, +): Promise { + const timeout = new Promise((_, reject) => { + setTimeout( + () => reject(new Error("Timed out waiting for PTY event")), + 15_000, + ); + }); + const result = await Promise.race([events.next(), timeout]); + if (result.done) throw new Error("PTY closed before next event"); + return result.value; +} + +describeWithApiKey("Phase 4 live integration", () => { + it( + "executes foreground, streaming, and background commands", + async () => { + await withLiveCapsule(async (capsule) => { + const result = await capsule.commands.exec("printf", { + args: ["phase4-command"], + timeoutSec: 10, + }); + expect(result.exit_code).toBe(0); + expect(result.stdout).toContain("phase4-command"); + + const streamEvents = await collectCommandEvents( + capsule.commands.stream("printf", { args: ["phase4-stream"] }), + ); + expect(streamEvents.some((event) => event.type === "stdout")).toBe( + true, + ); + expect(streamEvents.at(-1)).toMatchObject({ type: "exit" }); + + const tag = `phase4-${Date.now()}`; + const process = await capsule.commands.start("sleep", { + args: ["60"], + tag, + }); + expect(process.tag).toBe(tag); + + const processes = await capsule.commands.list(); + expect(processes.processes?.some((entry) => entry.tag === tag)).toBe( + true, + ); + + await expect( + capsule.commands.kill(tag, "SIGTERM"), + ).resolves.toBeUndefined(); + }); + }, + testTimeoutMs, + ); + + it( + "performs file read/write/list/remove and streaming transfers", + async () => { + await withLiveCapsule(async (capsule) => { + const dir = `/tmp/wrenn-phase4-${Date.now()}`; + const file = `${dir}/hello.txt`; + const streamFile = `${dir}/stream.txt`; + + await capsule.files.mkdir(dir); + await capsule.files.write(file, "phase4-file"); + + const content = await capsule.files.read(file); + expect(content.toString()).toBe("phase4-file"); + + const listing = await capsule.files.list(dir, { depth: 1 }); + expect( + listing.entries?.some((entry) => entry.name === "hello.txt"), + ).toBe(true); + + await capsule.files.uploadStream( + streamFile, + Buffer.from("phase4-stream-file"), + ); + const chunks: Buffer[] = []; + for await (const chunk of capsule.files.downloadStream(streamFile)) { + chunks.push(chunk); + } + expect(Buffer.concat(chunks).toString()).toBe("phase4-stream-file"); + + await expect(capsule.files.remove(file)).resolves.toBeUndefined(); + await expect(capsule.files.remove(streamFile)).resolves.toBeUndefined(); + await expect(capsule.files.remove(dir)).resolves.toBeUndefined(); + }); + }, + testTimeoutMs, + ); + + it( + "starts and controls an interactive PTY session", + async () => { + await withLiveCapsule(async (capsule) => { + const session = await capsule.pty.start({ + cmd: "/bin/sh", + cols: 80, + rows: 24, + }); + + const started = await nextPtyEvent(session.events); + expect(started.type).toBe("started"); + + session.resize(100, 30); + session.input("printf phase4-pty\\n\nexit\n"); + + const events: PtyEvent[] = []; + for (let i = 0; i < 10; i += 1) { + const event = await nextPtyEvent(session.events); + events.push(event); + if (event.type === "exit" || event.type === "error") break; + } + + const output = events + .filter( + (event) => + event.type === "output" && typeof event.data === "string", + ) + .map((event) => + Buffer.from(event.data as string, "base64").toString(), + ) + .join(""); + + expect(output).toContain("phase4-pty"); + expect(events.at(-1)).toMatchObject({ type: "exit" }); + session.close(); + }); + }, + testTimeoutMs, + ); +}); diff --git a/tests/pty.test.ts b/tests/pty.test.ts new file mode 100644 index 0000000..cd029a0 --- /dev/null +++ b/tests/pty.test.ts @@ -0,0 +1,71 @@ +import { describe, expect, it, vi } from "vitest"; + +import { Capsule } from "../src/capsule.js"; + +describe("PtyManager", () => { + it("starts a PTY, sends controls, and yields events", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + const sent: unknown[] = []; + let onMessage: ((message: unknown) => void) | undefined; + vi.spyOn(capsule.client.capsules, "ptySession").mockImplementation( + async (_id, opts) => { + onMessage = opts.onMessage; + return { + close: vi.fn(), + get isClosed() { + return false; + }, + send: (message: unknown) => sent.push(message), + } as never; + }, + ); + + const session = await capsule.pty.start({ + cmd: "/bin/sh", + cols: 100, + rows: 30, + }); + expect(sent).toEqual([ + { cmd: "/bin/sh", cols: 100, rows: 30, type: "start" }, + ]); + + session.input("ls\n"); + session.resize(120, 40); + session.kill(); + expect(sent.slice(1)).toEqual([ + { data: Buffer.from("ls\n").toString("base64"), type: "input" }, + { cols: 120, rows: 40, type: "resize" }, + { type: "kill" }, + ]); + + const event = session.events.next(); + onMessage?.({ data: Buffer.from("ok").toString("base64"), type: "output" }); + await expect(event).resolves.toEqual({ + done: false, + value: { + data: Buffer.from("ok").toString("base64"), + type: "output", + }, + }); + }); + + it("connects to an existing PTY tag", async () => { + const capsule = new Capsule("cap_1", { + baseUrl: "https://api.example.com", + }); + const sent: unknown[] = []; + vi.spyOn(capsule.client.capsules, "ptySession").mockResolvedValue({ + close: vi.fn(), + get isClosed() { + return false; + }, + send: (message: unknown) => sent.push(message), + } as never); + + await capsule.pty.connect("pty-tag"); + + expect(sent).toEqual([{ tag: "pty-tag", type: "connect" }]); + }); +});