feat(builds): real-time WebSocket+PTY template build console
The admin template build experience was poll-based: the frontend fetched
GET /v1/admin/builds/{id} every 3s and per-step logs only appeared after a
step finished. This replaces it with a live console.
Backend:
- recipe.Execute gains streaming callbacks (StepStartFunc, OutputChunkFunc)
and a StreamExecFunc. RUN steps now stream via a PTY so build tools emit
unbuffered, colorized output. execRunStreaming treats a stream that ends
without a terminal chunk as a failure.
- The build worker runs each RUN step through the host agent PtyAttach RPC
and publishes step-start/output/step-end/build-status events to a per-build
Redis pub/sub channel (wrenn:build:{id}).
- BuildBroker fans those events from Redis out to in-process WebSocket
subscribers, with lazy per-build subscriptions.
- New WS endpoint GET /v1/admin/builds/{id}/stream replays the completed-step
history from the DB log, then live-tails broker events.
run_as_root:
- The non-root build user is now injected as USER/WORKDIR steps prepended to
the persisted recipe by the create handler, instead of being hardcoded in
the pre-build phase. "Run as root" simply omits them, so wrenn-user is never
created in a root template. No build-level column is needed.
Frontend:
- New /admin/templates/builds/[id] route with a hybrid console: an xterm.js
terminal streaming live PTY output plus a structured step list (status,
exit code, timing).
- Build rows on the templates page navigate to the console; the inline
log-expand is removed. A "Run recipe as root" checkbox is added to the
create modal.
This commit is contained in:
@ -41,9 +41,34 @@ export type CreateBuildParams = {
|
||||
vcpus?: number;
|
||||
memory_mb?: number;
|
||||
skip_pre_post?: boolean;
|
||||
run_as_root?: boolean;
|
||||
archive?: File;
|
||||
};
|
||||
|
||||
// BuildStreamEvent is one message from the live build console WebSocket
|
||||
// (GET /v1/admin/builds/{id}/stream). It mirrors the backend event shape.
|
||||
export type BuildStreamEvent = {
|
||||
type: 'step-start' | 'output' | 'step-end' | 'build-status' | 'ping';
|
||||
step?: number;
|
||||
phase?: string;
|
||||
cmd?: string;
|
||||
data?: string; // base64-encoded PTY output bytes
|
||||
exit?: number;
|
||||
ok?: boolean;
|
||||
elapsed_ms?: number;
|
||||
status?: string;
|
||||
current_step?: number;
|
||||
total_steps?: number;
|
||||
error?: string;
|
||||
t?: number;
|
||||
};
|
||||
|
||||
// buildStreamUrl returns the WebSocket URL for a build's live console.
|
||||
export function buildStreamUrl(id: string): string {
|
||||
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
return `${proto}//${window.location.host}/api/v1/admin/builds/${id}/stream`;
|
||||
}
|
||||
|
||||
export async function createBuild(params: CreateBuildParams): Promise<ApiResult<Build>> {
|
||||
if (params.archive) {
|
||||
// Use multipart when an archive file is provided.
|
||||
|
||||
192
frontend/src/lib/build-console-ws.svelte.ts
Normal file
192
frontend/src/lib/build-console-ws.svelte.ts
Normal file
@ -0,0 +1,192 @@
|
||||
// build-console-ws.svelte.ts — WebSocket client for the live admin build
|
||||
// console. Connects to GET /v1/admin/builds/{id}/stream, which replays the
|
||||
// completed-step history then live-tails events. The client maps events to a
|
||||
// reactive step list and forwards raw PTY output to a terminal writer.
|
||||
|
||||
import { buildStreamUrl, type BuildStreamEvent } from '$lib/api/builds';
|
||||
|
||||
export type StepStatus = 'running' | 'success' | 'failed';
|
||||
|
||||
export type BuildStep = {
|
||||
step: number;
|
||||
phase: string;
|
||||
cmd: string;
|
||||
status: StepStatus;
|
||||
exit: number | null;
|
||||
elapsedMs: number | null;
|
||||
};
|
||||
|
||||
export type ConsoleConnState = 'connecting' | 'connected' | 'closed' | 'error';
|
||||
|
||||
const RECONNECT_DELAY = 1500;
|
||||
|
||||
// ANSI truecolor escapes matching the Wrenn palette.
|
||||
const dim = (s: string) => `\x1b[38;2;107;104;98m${s}\x1b[0m`; // text-tertiary
|
||||
const sage = (s: string) => `\x1b[38;2;137;167;133m${s}\x1b[0m`; // accent-mid
|
||||
const red = (s: string) => `\x1b[38;2;207;129;114m${s}\x1b[0m`; // red
|
||||
|
||||
// Binary-safe base64 decode for raw PTY bytes.
|
||||
function decodeBase64(b64: string): string {
|
||||
const bytes = Uint8Array.from(atob(b64), (c) => c.charCodeAt(0));
|
||||
return new TextDecoder().decode(bytes);
|
||||
}
|
||||
|
||||
function isTerminal(status: string): boolean {
|
||||
return status === 'success' || status === 'failed' || status === 'cancelled';
|
||||
}
|
||||
|
||||
/**
|
||||
* createBuildConsole wires a build's event WebSocket to reactive state.
|
||||
* Call connect() with a terminal write function once the terminal exists,
|
||||
* and disconnect() on teardown.
|
||||
*/
|
||||
export function createBuildConsole(buildId: string) {
|
||||
let connState = $state<ConsoleConnState>('connecting');
|
||||
let steps = $state<BuildStep[]>([]);
|
||||
let buildStatus = $state('');
|
||||
let currentStep = $state(0);
|
||||
let totalSteps = $state(0);
|
||||
let errorMessage = $state<string | null>(null);
|
||||
|
||||
let ws: WebSocket | null = null;
|
||||
let writeTerm: ((text: string) => void) | null = null;
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let disposed = false;
|
||||
|
||||
function upsertStep(step: number, patch: Partial<BuildStep>) {
|
||||
const idx = steps.findIndex((s) => s.step === step);
|
||||
if (idx === -1) {
|
||||
steps = [
|
||||
...steps,
|
||||
{
|
||||
step,
|
||||
phase: patch.phase ?? '',
|
||||
cmd: patch.cmd ?? '',
|
||||
status: patch.status ?? 'running',
|
||||
exit: patch.exit ?? null,
|
||||
elapsedMs: patch.elapsedMs ?? null
|
||||
}
|
||||
].sort((a, b) => a.step - b.step);
|
||||
} else {
|
||||
// Immutable replace so the reactive array re-renders the step list.
|
||||
steps = steps.map((s, i) => (i === idx ? { ...s, ...patch } : s));
|
||||
}
|
||||
}
|
||||
|
||||
function summaryLine(status: string): string {
|
||||
if (status === 'success') return `\r\n${sage('● build succeeded')}\r\n`;
|
||||
if (status === 'failed') return `\r\n${red('● build failed')}\r\n`;
|
||||
return `\r\n${dim('● build ' + status)}\r\n`;
|
||||
}
|
||||
|
||||
function handle(ev: BuildStreamEvent) {
|
||||
switch (ev.type) {
|
||||
case 'step-start':
|
||||
upsertStep(ev.step ?? 0, {
|
||||
phase: ev.phase,
|
||||
cmd: ev.cmd,
|
||||
status: 'running',
|
||||
exit: null,
|
||||
elapsedMs: null
|
||||
});
|
||||
writeTerm?.(`\r\n${sage('▸')} ${dim('step ' + ev.step)} ${ev.cmd ?? ''}\r\n`);
|
||||
break;
|
||||
case 'output':
|
||||
if (ev.data) writeTerm?.(decodeBase64(ev.data));
|
||||
break;
|
||||
case 'step-end': {
|
||||
const ok = ev.ok ?? false;
|
||||
upsertStep(ev.step ?? 0, {
|
||||
phase: ev.phase,
|
||||
cmd: ev.cmd,
|
||||
status: ok ? 'success' : 'failed',
|
||||
exit: ev.exit ?? 0,
|
||||
elapsedMs: ev.elapsed_ms ?? 0
|
||||
});
|
||||
if (typeof ev.step === 'number' && ev.step > currentStep) currentStep = ev.step;
|
||||
break;
|
||||
}
|
||||
case 'build-status':
|
||||
if (ev.status) buildStatus = ev.status;
|
||||
if (typeof ev.total_steps === 'number' && ev.total_steps > 0) totalSteps = ev.total_steps;
|
||||
if (typeof ev.current_step === 'number' && ev.current_step > currentStep) {
|
||||
currentStep = ev.current_step;
|
||||
}
|
||||
if (ev.error) errorMessage = ev.error;
|
||||
if (ev.status && isTerminal(ev.status)) writeTerm?.(summaryLine(ev.status));
|
||||
break;
|
||||
case 'ping':
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
function open() {
|
||||
connState = 'connecting';
|
||||
ws = new WebSocket(buildStreamUrl(buildId));
|
||||
|
||||
ws.onopen = () => {
|
||||
connState = 'connected';
|
||||
};
|
||||
|
||||
ws.onmessage = (e) => {
|
||||
try {
|
||||
handle(JSON.parse(e.data) as BuildStreamEvent);
|
||||
} catch {
|
||||
// ignore malformed frames
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
if (disposed) return;
|
||||
// A finished build closes cleanly; nothing more to stream.
|
||||
if (isTerminal(buildStatus)) {
|
||||
connState = 'closed';
|
||||
return;
|
||||
}
|
||||
// Unexpected drop mid-build: reconnect and resume from history.
|
||||
connState = 'connecting';
|
||||
writeTerm?.(`\r\n${dim('[reconnecting...]')}\r\n`);
|
||||
reconnectTimer = setTimeout(open, RECONNECT_DELAY);
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
if (!disposed) connState = 'error';
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
get connState() {
|
||||
return connState;
|
||||
},
|
||||
get steps() {
|
||||
return steps;
|
||||
},
|
||||
get buildStatus() {
|
||||
return buildStatus;
|
||||
},
|
||||
get currentStep() {
|
||||
return currentStep;
|
||||
},
|
||||
get totalSteps() {
|
||||
return totalSteps;
|
||||
},
|
||||
get errorMessage() {
|
||||
return errorMessage;
|
||||
},
|
||||
|
||||
/** connect opens the WebSocket; write receives terminal output. */
|
||||
connect(write: (text: string) => void) {
|
||||
if (disposed) return;
|
||||
writeTerm = write;
|
||||
open();
|
||||
},
|
||||
|
||||
/** disconnect tears down the WebSocket and cancels any reconnect. */
|
||||
disconnect() {
|
||||
disposed = true;
|
||||
if (reconnectTimer) clearTimeout(reconnectTimer);
|
||||
ws?.close();
|
||||
ws = null;
|
||||
}
|
||||
};
|
||||
}
|
||||
198
frontend/src/lib/components/BuildConsole.svelte
Normal file
198
frontend/src/lib/components/BuildConsole.svelte
Normal file
@ -0,0 +1,198 @@
|
||||
<script lang="ts">
|
||||
import { onMount, onDestroy, tick } from 'svelte';
|
||||
import type { Build } from '$lib/api/builds';
|
||||
import { createBuildConsole } from '$lib/build-console-ws.svelte';
|
||||
import BuildStepList from './BuildStepList.svelte';
|
||||
|
||||
type Props = {
|
||||
buildId: string;
|
||||
build: Build;
|
||||
onStatusChange?: (status: string) => void;
|
||||
};
|
||||
let { buildId, build, onStatusChange }: Props = $props();
|
||||
|
||||
const bc = createBuildConsole(buildId);
|
||||
|
||||
let containerRef = $state<HTMLDivElement>();
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
let term: any = null;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
let fitAddon: any = null;
|
||||
let resizeObserver: ResizeObserver | null = null;
|
||||
let fitDebounce: ReturnType<typeof setTimeout> | null = null;
|
||||
let alive = true;
|
||||
|
||||
const stepTotal = $derived(bc.totalSteps || build.total_steps);
|
||||
|
||||
const TERM_THEME = {
|
||||
background: '#0a0c0b',
|
||||
foreground: '#d0cdc6',
|
||||
cursor: '#0a0c0b',
|
||||
cursorAccent: '#0a0c0b',
|
||||
selectionBackground: 'rgba(94, 140, 88, 0.25)',
|
||||
selectionForeground: '#eae7e2',
|
||||
black: '#1a1e1c',
|
||||
red: '#cf8172',
|
||||
green: '#5e8c58',
|
||||
yellow: '#d4a73c',
|
||||
blue: '#5a9fd4',
|
||||
magenta: '#b07ab8',
|
||||
cyan: '#5aafb0',
|
||||
white: '#d0cdc6',
|
||||
brightBlack: '#454340',
|
||||
brightRed: '#e09585',
|
||||
brightGreen: '#89a785',
|
||||
brightYellow: '#e0c070',
|
||||
brightBlue: '#7ab8e0',
|
||||
brightMagenta: '#c898cf',
|
||||
brightCyan: '#7ac5c6',
|
||||
brightWhite: '#eae7e2'
|
||||
};
|
||||
|
||||
// Propagate live build status to the parent (drives the cancel button).
|
||||
$effect(() => {
|
||||
if (bc.buildStatus) onStatusChange?.(bc.buildStatus);
|
||||
});
|
||||
|
||||
function connLabel(state: string): string {
|
||||
switch (state) {
|
||||
case 'connected':
|
||||
return 'Live';
|
||||
case 'connecting':
|
||||
return 'Connecting';
|
||||
case 'closed':
|
||||
return 'Ended';
|
||||
default:
|
||||
return 'Disconnected';
|
||||
}
|
||||
}
|
||||
|
||||
onMount(async () => {
|
||||
const [{ Terminal }, { FitAddon }] = await Promise.all([
|
||||
import('@xterm/xterm'),
|
||||
import('@xterm/addon-fit')
|
||||
]);
|
||||
await import('@xterm/xterm/css/xterm.css');
|
||||
await tick();
|
||||
// The component may have been destroyed during the awaits above.
|
||||
if (!alive || !containerRef) return;
|
||||
|
||||
fitAddon = new FitAddon();
|
||||
term = new Terminal({
|
||||
disableStdin: true,
|
||||
cursorBlink: false,
|
||||
cursorStyle: 'underline',
|
||||
fontFamily: "'JetBrains Mono Variable', 'JetBrains Mono', monospace",
|
||||
fontSize: 13,
|
||||
lineHeight: 1.4,
|
||||
theme: TERM_THEME,
|
||||
scrollback: 10000,
|
||||
convertEol: true
|
||||
});
|
||||
term.loadAddon(fitAddon);
|
||||
term.open(containerRef);
|
||||
requestAnimationFrame(() => fitAddon?.fit());
|
||||
|
||||
resizeObserver = new ResizeObserver(() => {
|
||||
if (fitDebounce) clearTimeout(fitDebounce);
|
||||
fitDebounce = setTimeout(() => fitAddon?.fit(), 50);
|
||||
});
|
||||
resizeObserver.observe(containerRef);
|
||||
|
||||
bc.connect((text) => term?.write(text));
|
||||
});
|
||||
|
||||
onDestroy(() => {
|
||||
alive = false;
|
||||
if (fitDebounce) clearTimeout(fitDebounce);
|
||||
resizeObserver?.disconnect();
|
||||
bc.disconnect();
|
||||
term?.dispose();
|
||||
});
|
||||
</script>
|
||||
|
||||
<div class="flex min-h-0 flex-1 flex-col">
|
||||
<!-- Console toolbar -->
|
||||
<div
|
||||
class="flex items-center gap-3 border-b border-[var(--color-border)] bg-[var(--color-bg-1)] px-4 py-2"
|
||||
>
|
||||
<span class="font-mono text-label text-[var(--color-text-muted)]">{buildId}</span>
|
||||
<span class="font-mono text-label tabular-nums text-[var(--color-text-tertiary)]">
|
||||
{bc.currentStep}/{stepTotal}
|
||||
</span>
|
||||
<div class="flex-1"></div>
|
||||
{#if bc.connState === 'connected'}
|
||||
<span class="relative flex h-[7px] w-[7px]">
|
||||
<span
|
||||
class="animate-status-ping absolute inline-flex h-full w-full rounded-full bg-[var(--color-accent)]"
|
||||
></span>
|
||||
<span class="relative inline-flex h-[7px] w-[7px] rounded-full bg-[var(--color-accent)]"></span>
|
||||
</span>
|
||||
{:else if bc.connState === 'connecting'}
|
||||
<svg
|
||||
class="animate-spin text-[var(--color-text-tertiary)]"
|
||||
width="11"
|
||||
height="11"
|
||||
viewBox="0 0 24 24"
|
||||
fill="none"
|
||||
stroke="currentColor"
|
||||
stroke-width="2.5"
|
||||
aria-hidden="true"
|
||||
>
|
||||
<path d="M21 12a9 9 0 1 1-6.219-8.56" />
|
||||
</svg>
|
||||
{:else}
|
||||
<span
|
||||
class="h-[7px] w-[7px] rounded-full {bc.connState === 'error'
|
||||
? 'bg-[var(--color-red)]'
|
||||
: 'bg-[var(--color-text-muted)]'}"
|
||||
></span>
|
||||
{/if}
|
||||
<span class="text-label text-[var(--color-text-tertiary)]">{connLabel(bc.connState)}</span>
|
||||
</div>
|
||||
|
||||
<!-- Terminal + step list -->
|
||||
<div class="flex min-h-0 flex-1">
|
||||
<div class="relative min-w-0 flex-1 bg-[var(--color-bg-0)]">
|
||||
<div bind:this={containerRef} class="terminal-host absolute inset-0"></div>
|
||||
</div>
|
||||
<aside
|
||||
class="flex w-72 shrink-0 flex-col border-l border-[var(--color-border)] bg-[var(--color-bg-1)]"
|
||||
>
|
||||
<BuildStepList steps={bc.steps} />
|
||||
</aside>
|
||||
</div>
|
||||
|
||||
{#if bc.errorMessage}
|
||||
<div
|
||||
class="border-t border-[var(--color-red)]/30 bg-[var(--color-red)]/5 px-4 py-2 font-mono text-meta text-[var(--color-red)]"
|
||||
>
|
||||
{bc.errorMessage}
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
|
||||
<style>
|
||||
.terminal-host :global(.xterm) {
|
||||
padding: 12px 4px 12px 16px;
|
||||
height: 100%;
|
||||
}
|
||||
.terminal-host :global(.xterm-viewport),
|
||||
.terminal-host :global(.xterm-screen) {
|
||||
background-color: #0a0c0b !important;
|
||||
}
|
||||
.terminal-host :global(.xterm-viewport) {
|
||||
scrollbar-width: thin;
|
||||
scrollbar-color: rgba(94, 140, 88, 0.18) transparent;
|
||||
}
|
||||
.terminal-host :global(.xterm-viewport::-webkit-scrollbar) {
|
||||
width: 6px;
|
||||
}
|
||||
.terminal-host :global(.xterm-viewport::-webkit-scrollbar-thumb) {
|
||||
background: rgba(94, 140, 88, 0.18);
|
||||
border-radius: 3px;
|
||||
}
|
||||
.terminal-host :global(.xterm-viewport::-webkit-scrollbar-thumb:hover) {
|
||||
background: rgba(94, 140, 88, 0.32);
|
||||
}
|
||||
</style>
|
||||
126
frontend/src/lib/components/BuildStepList.svelte
Normal file
126
frontend/src/lib/components/BuildStepList.svelte
Normal file
@ -0,0 +1,126 @@
|
||||
<script lang="ts">
|
||||
import type { BuildStep } from '$lib/build-console-ws.svelte';
|
||||
|
||||
type Props = {
|
||||
steps: BuildStep[];
|
||||
};
|
||||
let { steps }: Props = $props();
|
||||
|
||||
// [keyword, rest] split of a recipe instruction.
|
||||
function splitInstruction(cmd: string): [string, string] {
|
||||
const idx = cmd.indexOf(' ');
|
||||
if (idx === -1) return [cmd.toUpperCase(), ''];
|
||||
return [cmd.slice(0, idx).toUpperCase(), cmd.slice(idx + 1)];
|
||||
}
|
||||
|
||||
function keywordColor(keyword: string): string {
|
||||
switch (keyword) {
|
||||
case 'RUN':
|
||||
return 'var(--color-blue)';
|
||||
case 'START':
|
||||
return 'var(--color-accent-bright)';
|
||||
case 'ENV':
|
||||
return 'var(--color-amber)';
|
||||
case 'USER':
|
||||
return 'var(--color-accent)';
|
||||
case 'COPY':
|
||||
return 'var(--color-text-bright)';
|
||||
case 'WORKDIR':
|
||||
return 'var(--color-text-tertiary)';
|
||||
default:
|
||||
return 'var(--color-text-muted)';
|
||||
}
|
||||
}
|
||||
|
||||
function formatMs(ms: number): string {
|
||||
if (ms < 1000) return `${ms}ms`;
|
||||
return `${(ms / 1000).toFixed(ms < 10000 ? 1 : 0)}s`;
|
||||
}
|
||||
</script>
|
||||
|
||||
<div class="flex items-center justify-between border-b border-[var(--color-border)] px-4 py-2.5">
|
||||
<span class="text-label font-semibold uppercase tracking-[0.06em] text-[var(--color-text-tertiary)]">
|
||||
Steps
|
||||
</span>
|
||||
{#if steps.length > 0}
|
||||
<span class="font-mono text-label tabular-nums text-[var(--color-text-muted)]">
|
||||
{steps.length}
|
||||
</span>
|
||||
{/if}
|
||||
</div>
|
||||
|
||||
<div class="min-h-0 flex-1 overflow-y-auto">
|
||||
{#each steps as s (s.step)}
|
||||
{@const [kw, rest] = splitInstruction(s.cmd)}
|
||||
<div class="border-b border-[var(--color-border)] px-4 py-2.5">
|
||||
<div class="flex items-center gap-2.5">
|
||||
{#if s.status === 'running'}
|
||||
<span class="relative flex h-2 w-2 shrink-0">
|
||||
<span
|
||||
class="animate-status-ping absolute inline-flex h-full w-full rounded-full bg-[var(--color-blue)] opacity-60"
|
||||
></span>
|
||||
<span class="relative inline-flex h-2 w-2 rounded-full bg-[var(--color-blue)]"></span>
|
||||
</span>
|
||||
{:else if s.status === 'success'}
|
||||
<svg
|
||||
class="shrink-0"
|
||||
width="13"
|
||||
height="13"
|
||||
viewBox="0 0 24 24"
|
||||
fill="none"
|
||||
stroke="var(--color-accent-bright)"
|
||||
stroke-width="2.5"
|
||||
stroke-linecap="round"
|
||||
stroke-linejoin="round"
|
||||
aria-hidden="true"
|
||||
>
|
||||
<polyline points="20 6 9 17 4 12" />
|
||||
</svg>
|
||||
{:else}
|
||||
<svg
|
||||
class="shrink-0"
|
||||
width="13"
|
||||
height="13"
|
||||
viewBox="0 0 24 24"
|
||||
fill="none"
|
||||
stroke="var(--color-red)"
|
||||
stroke-width="2.5"
|
||||
stroke-linecap="round"
|
||||
stroke-linejoin="round"
|
||||
aria-hidden="true"
|
||||
>
|
||||
<line x1="18" y1="6" x2="6" y2="18" />
|
||||
<line x1="6" y1="6" x2="18" y2="18" />
|
||||
</svg>
|
||||
{/if}
|
||||
<span class="font-mono text-label tabular-nums text-[var(--color-text-muted)]">
|
||||
{s.step}
|
||||
</span>
|
||||
<div class="flex-1"></div>
|
||||
{#if s.exit !== null && s.exit !== 0}
|
||||
<span
|
||||
class="rounded-full bg-[var(--color-red)]/10 px-1.5 py-0.5 font-mono text-label text-[var(--color-red)]"
|
||||
>
|
||||
exit {s.exit}
|
||||
</span>
|
||||
{/if}
|
||||
{#if s.elapsedMs !== null}
|
||||
<span class="font-mono text-label tabular-nums text-[var(--color-text-muted)]">
|
||||
{formatMs(s.elapsedMs)}
|
||||
</span>
|
||||
{/if}
|
||||
</div>
|
||||
<code class="mt-1.5 block truncate font-mono text-meta">
|
||||
<span style="color: {keywordColor(kw)}">{kw}</span>{#if rest}
|
||||
<span class="text-[var(--color-text-secondary)]">{rest}</span>
|
||||
{/if}
|
||||
</code>
|
||||
</div>
|
||||
{/each}
|
||||
|
||||
{#if steps.length === 0}
|
||||
<div class="px-4 py-8 text-center text-meta text-[var(--color-text-muted)]">
|
||||
Waiting for the first step...
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
@ -4,14 +4,13 @@
|
||||
import { toast } from '$lib/toast.svelte';
|
||||
import { auth } from '$lib/auth.svelte';
|
||||
import { formatDate, timeAgo } from '$lib/utils/format';
|
||||
import { goto } from '$app/navigation';
|
||||
import {
|
||||
listBuilds,
|
||||
createBuild,
|
||||
cancelBuild,
|
||||
listAdminTemplates,
|
||||
deleteAdminTemplate,
|
||||
type Build,
|
||||
type BuildLogEntry,
|
||||
type AdminTemplate
|
||||
} from '$lib/api/builds';
|
||||
import { listAdminTeams } from '$lib/api/team';
|
||||
@ -33,10 +32,6 @@
|
||||
let hasActiveBuilds = $derived(builds.some((b) => b.status === 'pending' || b.status === 'running'));
|
||||
let visibilityHandler: (() => void) | null = null;
|
||||
|
||||
// Build log expansion
|
||||
let expandedBuildId = $state<string | null>(null);
|
||||
let expandedSteps = $state<Set<number>>(new Set());
|
||||
|
||||
// Team name lookup
|
||||
let teamNames = $state<Map<string, string>>(new Map());
|
||||
|
||||
@ -55,14 +50,12 @@
|
||||
recipe: '',
|
||||
healthcheck: '',
|
||||
skip_pre_post: false,
|
||||
run_as_root: false,
|
||||
archive: null as File | null
|
||||
});
|
||||
let creating = $state(false);
|
||||
let createError = $state<string | null>(null);
|
||||
|
||||
// Cancel build state
|
||||
let cancelingBuildId = $state<string | null>(null);
|
||||
|
||||
// Stats
|
||||
let templateCount = $derived(templates.length);
|
||||
let snapshotCount = $derived(templates.filter((t) => t.type === 'snapshot').length);
|
||||
@ -153,17 +146,15 @@
|
||||
vcpus: createForm.vcpus,
|
||||
memory_mb: createForm.memory_mb,
|
||||
skip_pre_post: createForm.skip_pre_post,
|
||||
run_as_root: createForm.run_as_root,
|
||||
archive: createForm.archive || undefined
|
||||
});
|
||||
|
||||
if (result.ok) {
|
||||
showCreate = false;
|
||||
createForm = { name: '', base_template: 'minimal', vcpus: 1, memory_mb: 512, recipe: '', healthcheck: '', skip_pre_post: false, archive: null };
|
||||
builds = [result.data, ...builds];
|
||||
activeTab = 'builds';
|
||||
expandedBuildId = result.data.id;
|
||||
createForm = { name: '', base_template: 'minimal', vcpus: 1, memory_mb: 512, recipe: '', healthcheck: '', skip_pre_post: false, run_as_root: false, archive: null };
|
||||
toast.success('Build queued');
|
||||
startPolling();
|
||||
goto(`/admin/templates/builds/${result.data.id}`);
|
||||
} else {
|
||||
createError = result.error;
|
||||
}
|
||||
@ -186,36 +177,8 @@
|
||||
deleting = false;
|
||||
}
|
||||
|
||||
async function handleCancelBuild(buildId: string) {
|
||||
cancelingBuildId = buildId;
|
||||
const result = await cancelBuild(buildId);
|
||||
if (result.ok) {
|
||||
builds = builds.map((b) => b.id === buildId ? { ...b, status: 'cancelled' } : b);
|
||||
toast.success('Build cancelled');
|
||||
} else {
|
||||
toast.error(result.error ?? 'Failed to cancel build');
|
||||
}
|
||||
cancelingBuildId = null;
|
||||
}
|
||||
|
||||
function toggleBuildExpand(buildId: string) {
|
||||
if (expandedBuildId === buildId) {
|
||||
expandedBuildId = null;
|
||||
expandedSteps = new Set();
|
||||
} else {
|
||||
expandedBuildId = buildId;
|
||||
expandedSteps = new Set();
|
||||
}
|
||||
}
|
||||
|
||||
function toggleStepExpand(step: number) {
|
||||
const next = new Set(expandedSteps);
|
||||
if (next.has(step)) {
|
||||
next.delete(step);
|
||||
} else {
|
||||
next.add(step);
|
||||
}
|
||||
expandedSteps = next;
|
||||
function openBuild(buildId: string) {
|
||||
goto(`/admin/templates/builds/${buildId}`);
|
||||
}
|
||||
|
||||
function formatBytes(bytes: number): string {
|
||||
@ -245,25 +208,6 @@
|
||||
}
|
||||
}
|
||||
|
||||
// Returns [keyword, rest] from a recipe instruction string.
|
||||
function splitInstruction(cmd: string): [string, string] {
|
||||
const idx = cmd.indexOf(' ');
|
||||
if (idx === -1) return [cmd.toUpperCase(), ''];
|
||||
return [cmd.slice(0, idx).toUpperCase(), cmd.slice(idx + 1)];
|
||||
}
|
||||
|
||||
function keywordColor(keyword: string): string {
|
||||
switch (keyword) {
|
||||
case 'RUN': return 'var(--color-blue)';
|
||||
case 'START': return 'var(--color-accent-bright)';
|
||||
case 'ENV': return 'var(--color-amber)';
|
||||
case 'USER': return 'var(--color-accent)';
|
||||
case 'COPY': return 'var(--color-text-bright)';
|
||||
case 'WORKDIR': return 'var(--color-text-tertiary)';
|
||||
default: return 'var(--color-text-muted)';
|
||||
}
|
||||
}
|
||||
|
||||
onMount(() => {
|
||||
fetchTeamNames();
|
||||
fetchTemplates();
|
||||
@ -302,7 +246,7 @@
|
||||
</p>
|
||||
</div>
|
||||
<button
|
||||
onclick={() => { showCreate = true; createError = null; createForm = { name: '', base_template: 'minimal', vcpus: 1, memory_mb: 512, recipe: '', healthcheck: '', skip_pre_post: false, archive: null }; }}
|
||||
onclick={() => { showCreate = true; createError = null; createForm = { name: '', base_template: 'minimal', vcpus: 1, memory_mb: 512, recipe: '', healthcheck: '', skip_pre_post: false, run_as_root: false, archive: null }; }}
|
||||
class="group flex items-center gap-2.5 rounded-[var(--radius-button)] bg-[var(--color-accent)] px-5 py-2.5 text-ui font-semibold text-white shadow-sm transition-all duration-200 hover:shadow-[0_0_20px_var(--color-accent-glow-mid)] hover:brightness-115 hover:-translate-y-px active:translate-y-0"
|
||||
>
|
||||
<svg width="13" height="13" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round" class="transition-transform duration-200 group-hover:rotate-90"><line x1="12" y1="5" x2="12" y2="19"/><line x1="5" y1="12" x2="19" y2="12"/></svg>
|
||||
@ -453,7 +397,7 @@
|
||||
</p>
|
||||
{#if type === 'templates'}
|
||||
<button
|
||||
onclick={() => { showCreate = true; createError = null; createForm = { name: '', base_template: 'minimal', vcpus: 1, memory_mb: 512, recipe: '', healthcheck: '', skip_pre_post: false, archive: null }; }}
|
||||
onclick={() => { showCreate = true; createError = null; createForm = { name: '', base_template: 'minimal', vcpus: 1, memory_mb: 512, recipe: '', healthcheck: '', skip_pre_post: false, run_as_root: false, archive: null }; }}
|
||||
class="mt-6 flex items-center gap-2 rounded-[var(--radius-button)] border border-[var(--color-accent)]/30 bg-[var(--color-accent)]/10 px-4 py-2 text-ui font-medium text-[var(--color-accent-bright)] transition-all duration-200 hover:bg-[var(--color-accent)]/20 hover:border-[var(--color-accent)]/50"
|
||||
>
|
||||
<svg width="13" height="13" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round"><line x1="12" y1="5" x2="12" y2="19"/><line x1="5" y1="12" x2="19" y2="12"/></svg>
|
||||
@ -564,22 +508,22 @@
|
||||
<tbody>
|
||||
{#each builds as build, i (build.id)}
|
||||
<tr
|
||||
class="table-row-animate border-b border-[var(--color-border)] last:border-0 cursor-pointer transition-colors duration-200
|
||||
{expandedBuildId === build.id ? 'bg-[var(--color-bg-2)]' : 'hover:bg-[var(--color-bg-2)]'}
|
||||
class="build-row group table-row-animate border-b border-[var(--color-border)] last:border-0 cursor-pointer transition-colors duration-200 hover:bg-[var(--color-bg-2)]
|
||||
{build.status === 'running' ? 'build-row-active' : ''}"
|
||||
style="animation-delay: {i * 30}ms"
|
||||
onclick={() => toggleBuildExpand(build.id)}
|
||||
onclick={() => openBuild(build.id)}
|
||||
>
|
||||
<td class="px-5 py-3.5">
|
||||
<div class="flex items-center gap-2.5">
|
||||
<span class="font-mono text-meta text-[var(--color-text-primary)]">{build.id}</span>
|
||||
<svg
|
||||
width="12" height="12" viewBox="0 0 24 24" fill="none" stroke="currentColor"
|
||||
stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
|
||||
class="shrink-0 text-[var(--color-text-muted)] transition-transform duration-200 {expandedBuildId === build.id ? 'rotate-90' : ''}"
|
||||
aria-hidden="true"
|
||||
class="shrink-0 text-[var(--color-text-muted)] opacity-0 transition-opacity duration-150 group-hover:opacity-100"
|
||||
>
|
||||
<polyline points="9 18 15 12 9 6"/>
|
||||
</svg>
|
||||
<span class="font-mono text-meta text-[var(--color-text-primary)]">{build.id}</span>
|
||||
</div>
|
||||
</td>
|
||||
<td class="px-5 py-3.5">
|
||||
@ -631,132 +575,6 @@
|
||||
</span>
|
||||
</td>
|
||||
</tr>
|
||||
<!-- Expanded build logs -->
|
||||
{#if expandedBuildId === build.id}
|
||||
<tr>
|
||||
<td colspan="7" class="border-b border-[var(--color-border)] last:border-0">
|
||||
<div class="bg-[var(--color-bg-0)] px-6 py-5" style="animation: fadeUp 0.15s ease both">
|
||||
{#if build.status === 'pending' || build.status === 'running'}
|
||||
<div class="mb-4 flex justify-end">
|
||||
<button
|
||||
onclick={(e) => { e.stopPropagation(); handleCancelBuild(build.id); }}
|
||||
disabled={cancelingBuildId === build.id}
|
||||
class="flex items-center gap-1.5 rounded-[var(--radius-button)] border border-[var(--color-red)]/30 bg-[var(--color-red)]/8 px-3 py-1.5 text-meta text-[var(--color-red)] transition-colors duration-150 hover:bg-[var(--color-red)]/15 disabled:opacity-50"
|
||||
>
|
||||
{#if cancelingBuildId === build.id}
|
||||
<svg class="animate-spin" width="11" height="11" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2"><path d="M21 12a9 9 0 1 1-6.219-8.56"/></svg>
|
||||
{:else}
|
||||
<svg width="11" height="11" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round"><line x1="18" y1="6" x2="6" y2="18"/><line x1="6" y1="6" x2="18" y2="18"/></svg>
|
||||
{/if}
|
||||
Cancel build
|
||||
</button>
|
||||
</div>
|
||||
{/if}
|
||||
{#if build.error}
|
||||
<div class="mb-4 rounded-[var(--radius-input)] border border-[var(--color-red)]/30 bg-[var(--color-red)]/5 px-3 py-2 text-meta text-[var(--color-red)]">
|
||||
{build.error}
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
{#if build.logs && build.logs.length > 0}
|
||||
<div class="space-y-1">
|
||||
{#each build.logs as log, i (i)}
|
||||
{@const isInternal = log.phase === 'pre-build' || log.phase === 'post-build'}
|
||||
{@const recipeIdx = log.phase === 'recipe' ? build.logs.filter(l => l.phase === 'recipe' && l.step <= log.step).length : 0}
|
||||
{@const phaseLabel = isInternal ? (log.phase === 'pre-build' ? 'Pre-build' : 'Post-build') : `Step ${recipeIdx}`}
|
||||
{@const [kw, kwRest] = splitInstruction(log.cmd)}
|
||||
<div class="rounded-[var(--radius-input)] border border-[var(--color-border)] bg-[var(--color-bg-1)] overflow-hidden">
|
||||
<!-- Step header -->
|
||||
<button
|
||||
onclick={(e) => { e.stopPropagation(); toggleStepExpand(log.step); }}
|
||||
class="flex w-full items-center gap-3 px-3 py-2.5 text-left transition-colors duration-150 hover:bg-[var(--color-bg-2)]"
|
||||
>
|
||||
<!-- Status icon -->
|
||||
{#if log.ok}
|
||||
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="var(--color-accent-bright)" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round" class="shrink-0"><polyline points="20 6 9 17 4 12"/></svg>
|
||||
{:else}
|
||||
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="var(--color-red)" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round" class="shrink-0"><line x1="18" y1="6" x2="6" y2="18"/><line x1="6" y1="6" x2="18" y2="18"/></svg>
|
||||
{/if}
|
||||
<span class="shrink-0 text-label font-semibold text-[var(--color-text-tertiary)]">{phaseLabel}</span>
|
||||
<code class="flex-1 truncate font-mono text-meta"><span style="color: {keywordColor(kw)}">{kw}</span>{#if kwRest}{' '}<span class="text-[var(--color-text-secondary)]">{kwRest}</span>{/if}</code>
|
||||
<span class="shrink-0 font-mono text-label text-[var(--color-text-muted)]">{log.elapsed_ms}ms</span>
|
||||
{#if log.exit !== 0}
|
||||
<span class="shrink-0 rounded-full bg-[var(--color-red)]/10 px-1.5 py-0.5 font-mono text-label text-[var(--color-red)]">
|
||||
exit {log.exit}
|
||||
</span>
|
||||
{/if}
|
||||
<svg
|
||||
width="12" height="12" viewBox="0 0 24 24" fill="none" stroke="currentColor"
|
||||
stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
|
||||
class="shrink-0 text-[var(--color-text-muted)] transition-transform duration-200 {expandedSteps.has(log.step) ? 'rotate-90' : ''}"
|
||||
>
|
||||
<polyline points="9 18 15 12 9 6"/>
|
||||
</svg>
|
||||
</button>
|
||||
|
||||
<!-- Step output -->
|
||||
{#if expandedSteps.has(log.step)}
|
||||
<div class="border-t border-[var(--color-border)] bg-[var(--color-bg-0)] px-3 py-3" style="animation: fadeUp 0.12s ease both">
|
||||
{#if log.stdout}
|
||||
<div class="mb-2">
|
||||
<span class="text-label font-semibold uppercase tracking-[0.06em] text-[var(--color-text-tertiary)]">stdout</span>
|
||||
<pre class="mt-1 max-h-48 overflow-auto rounded-[var(--radius-input)] bg-[var(--color-bg-1)] px-3 py-2 font-mono text-meta leading-relaxed text-[var(--color-text-secondary)]">{log.stdout}</pre>
|
||||
</div>
|
||||
{/if}
|
||||
{#if log.stderr}
|
||||
<div>
|
||||
<span class="text-label font-semibold uppercase tracking-[0.06em] text-[var(--color-text-tertiary)]">stderr</span>
|
||||
<pre class="mt-1 max-h-48 overflow-auto rounded-[var(--radius-input)] bg-[var(--color-bg-1)] px-3 py-2 font-mono text-meta leading-relaxed text-[var(--color-red)]/80">{log.stderr}</pre>
|
||||
</div>
|
||||
{/if}
|
||||
{#if !log.stdout && !log.stderr}
|
||||
<span class="text-meta text-[var(--color-text-muted)]">No output</span>
|
||||
{/if}
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
{:else}
|
||||
<div class="flex items-center gap-2 text-meta text-[var(--color-text-muted)]">
|
||||
{#if build.status === 'pending' || build.status === 'running'}
|
||||
<svg class="animate-spin" width="13" height="13" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2"><path d="M21 12a9 9 0 1 1-6.219-8.56"/></svg>
|
||||
{build.status === 'pending' ? 'Waiting for worker…' : 'Running…'}
|
||||
{:else}
|
||||
No build logs recorded.
|
||||
{/if}
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
<!-- Recipe reference -->
|
||||
{#if build.recipe && build.recipe.length > 0}
|
||||
<div class="mt-4 border-t border-[var(--color-border)] pt-4">
|
||||
<div class="flex items-center gap-1.5">
|
||||
<span class="text-label font-semibold uppercase tracking-[0.06em] text-[var(--color-text-tertiary)]">Recipe</span>
|
||||
<CopyButton value={build.recipe.join('\n')} />
|
||||
</div>
|
||||
<div class="mt-2 rounded-[var(--radius-input)] bg-[var(--color-bg-1)] border border-[var(--color-border)] px-3 py-2">
|
||||
{#each build.recipe as cmd, i}
|
||||
{@const [kw, kwRest] = splitInstruction(cmd)}
|
||||
<div class="flex gap-2 py-0.5">
|
||||
<span class="shrink-0 font-mono text-label text-[var(--color-text-muted)] tabular-nums">{i + 1}.</span>
|
||||
<code class="font-mono text-meta"><span style="color: {keywordColor(kw)}">{kw}</span>{#if kwRest}{' '}<span class="text-[var(--color-text-secondary)]">{kwRest}</span>{/if}</code>
|
||||
</div>
|
||||
{/each}
|
||||
</div>
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
{#if build.healthcheck}
|
||||
<div class="mt-3">
|
||||
<span class="text-label font-semibold uppercase tracking-[0.06em] text-[var(--color-text-tertiary)]">Healthcheck</span>
|
||||
<code class="ml-2 font-mono text-meta text-[var(--color-text-secondary)]">{build.healthcheck}</code>
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
</td>
|
||||
</tr>
|
||||
{/if}
|
||||
{/each}
|
||||
</tbody>
|
||||
</table>
|
||||
@ -917,15 +735,32 @@
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<label class="flex cursor-pointer items-center gap-2.5">
|
||||
<input
|
||||
type="checkbox"
|
||||
bind:checked={createForm.skip_pre_post}
|
||||
disabled={creating}
|
||||
class="h-4 w-4 cursor-pointer rounded border border-[var(--color-border)] bg-[var(--color-bg-4)] accent-[var(--color-accent)]"
|
||||
/>
|
||||
<span class="text-ui text-[var(--color-text-secondary)]">Skip pre-build and post-build steps</span>
|
||||
</label>
|
||||
<div class="space-y-3">
|
||||
<label class="flex cursor-pointer items-center gap-2.5">
|
||||
<input
|
||||
type="checkbox"
|
||||
bind:checked={createForm.skip_pre_post}
|
||||
disabled={creating}
|
||||
class="h-4 w-4 cursor-pointer rounded border border-[var(--color-border)] bg-[var(--color-bg-4)] accent-[var(--color-accent)]"
|
||||
/>
|
||||
<span class="text-ui text-[var(--color-text-secondary)]">Skip pre-build and post-build steps</span>
|
||||
</label>
|
||||
|
||||
<label class="flex cursor-pointer items-start gap-2.5">
|
||||
<input
|
||||
type="checkbox"
|
||||
bind:checked={createForm.run_as_root}
|
||||
disabled={creating}
|
||||
class="mt-0.5 h-4 w-4 shrink-0 cursor-pointer rounded border border-[var(--color-border)] bg-[var(--color-bg-4)] accent-[var(--color-accent)]"
|
||||
/>
|
||||
<span class="text-ui text-[var(--color-text-secondary)]">
|
||||
Run recipe as root
|
||||
<span class="mt-0.5 block text-label text-[var(--color-text-muted)]">
|
||||
Default runs the recipe as an unprivileged build user. Enable only when root is required; no build user is created.
|
||||
</span>
|
||||
</span>
|
||||
</label>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="mt-6 flex justify-end gap-3">
|
||||
|
||||
158
internal/api/handlers_build_stream.go
Normal file
158
internal/api/handlers_build_stream.go
Normal file
@ -0,0 +1,158 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"git.omukk.dev/wrenn/wrenn/internal/recipe"
|
||||
"git.omukk.dev/wrenn/wrenn/pkg/db"
|
||||
"git.omukk.dev/wrenn/wrenn/pkg/id"
|
||||
"git.omukk.dev/wrenn/wrenn/pkg/service"
|
||||
)
|
||||
|
||||
// buildStreamKeepalive is the interval between server pings on an idle build
|
||||
// stream, preventing intermediaries from closing the WebSocket.
|
||||
const buildStreamKeepalive = 30 * time.Second
|
||||
|
||||
// buildStreamHandler serves the live admin build console WebSocket.
|
||||
type buildStreamHandler struct {
|
||||
db *db.Queries
|
||||
broker *service.BuildBroker
|
||||
}
|
||||
|
||||
func newBuildStreamHandler(db *db.Queries, broker *service.BuildBroker) *buildStreamHandler {
|
||||
return &buildStreamHandler{db: db, broker: broker}
|
||||
}
|
||||
|
||||
// Stream handles WS /v1/admin/builds/{id}/stream. On connect it replays the
|
||||
// completed-step history from the DB log, sends the current build status,
|
||||
// then live-tails events from the build broker until the build finishes or
|
||||
// the client disconnects. Admin auth is enforced by upstream middleware.
|
||||
func (h *buildStreamHandler) Stream(w http.ResponseWriter, r *http.Request) {
|
||||
buildIDStr := chi.URLParam(r, "id")
|
||||
buildID, err := id.ParseBuildID(buildIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid build ID")
|
||||
return
|
||||
}
|
||||
|
||||
build, err := h.db.GetTemplateBuild(r.Context(), buildID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "build not found")
|
||||
return
|
||||
}
|
||||
|
||||
conn, _, err := upgradeAndAuthenticate(w, r)
|
||||
if err != nil {
|
||||
slog.Error("build stream websocket upgrade/auth failed", "error", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
h.runStream(r.Context(), conn, build)
|
||||
}
|
||||
|
||||
func (h *buildStreamHandler) runStream(ctx context.Context, conn *websocket.Conn, build db.TemplateBuild) {
|
||||
ws := &wsWriter{conn: conn}
|
||||
buildIDStr := id.FormatBuildID(build.ID)
|
||||
|
||||
// Replay completed-step history from the DB log snapshot. lastStep is the
|
||||
// highest step number already delivered, used to dedup overlapping live
|
||||
// events for a step that finished between the DB read and the subscribe.
|
||||
lastStep := replayBuildHistory(ws, build)
|
||||
|
||||
ws.writeJSON(service.BuildStreamEvent{
|
||||
Type: "build-status",
|
||||
Status: build.Status,
|
||||
CurrentStep: build.CurrentStep,
|
||||
TotalSteps: build.TotalSteps,
|
||||
Error: build.Error,
|
||||
})
|
||||
|
||||
// A finished build has no live events to follow.
|
||||
if service.IsTerminalBuildStatus(build.Status) {
|
||||
return
|
||||
}
|
||||
|
||||
streamCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
events, release := h.broker.Subscribe(buildIDStr)
|
||||
defer release()
|
||||
|
||||
// Drain client reads so a disconnect cancels the stream. The client sends
|
||||
// nothing meaningful; any read error means the socket is gone.
|
||||
go func() {
|
||||
for {
|
||||
if _, _, err := conn.ReadMessage(); err != nil {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(buildStreamKeepalive)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-streamCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
ws.writeJSON(map[string]string{"type": "ping"})
|
||||
case ev, ok := <-events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// Skip step events already covered by the history replay.
|
||||
if ev.Type != "build-status" && ev.Step > 0 && ev.Step <= lastStep {
|
||||
continue
|
||||
}
|
||||
ws.writeJSON(ev)
|
||||
if ev.Type == "build-status" && service.IsTerminalBuildStatus(ev.Status) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// replayBuildHistory synthesizes step-start/output/step-end events from the
|
||||
// build's persisted log entries and writes them to the WebSocket. It returns
|
||||
// the highest step number replayed.
|
||||
func replayBuildHistory(ws *wsWriter, build db.TemplateBuild) int {
|
||||
if len(build.Logs) == 0 {
|
||||
return 0
|
||||
}
|
||||
var entries []recipe.BuildLogEntry
|
||||
if err := json.Unmarshal(build.Logs, &entries); err != nil {
|
||||
slog.Warn("build stream: bad log JSON", "build_id", id.FormatBuildID(build.ID), "error", err)
|
||||
return 0
|
||||
}
|
||||
|
||||
lastStep := 0
|
||||
for _, e := range entries {
|
||||
ws.writeJSON(service.BuildStreamEvent{Type: "step-start", Step: e.Step, Phase: e.Phase, Cmd: e.Cmd})
|
||||
if out := e.Stdout + e.Stderr; out != "" {
|
||||
ws.writeJSON(service.BuildStreamEvent{
|
||||
Type: "output",
|
||||
Step: e.Step,
|
||||
Data: base64.StdEncoding.EncodeToString([]byte(out)),
|
||||
})
|
||||
}
|
||||
ws.writeJSON(service.BuildStreamEvent{
|
||||
Type: "step-end", Step: e.Step, Phase: e.Phase, Cmd: e.Cmd,
|
||||
Exit: e.Exit, Ok: e.Ok, ElapsedMs: e.Elapsed,
|
||||
})
|
||||
if e.Step > lastStep {
|
||||
lastStep = e.Step
|
||||
}
|
||||
}
|
||||
return lastStep
|
||||
}
|
||||
@ -42,6 +42,7 @@ type createBuildRequest struct {
|
||||
VCPUs int32 `json:"vcpus"`
|
||||
MemoryMB int32 `json:"memory_mb"`
|
||||
SkipPrePost bool `json:"skip_pre_post"`
|
||||
RunAsRoot bool `json:"run_as_root"`
|
||||
}
|
||||
|
||||
type buildResponse struct {
|
||||
@ -181,6 +182,7 @@ func (h *buildHandler) Create(w http.ResponseWriter, r *http.Request) {
|
||||
VCPUs: req.VCPUs,
|
||||
MemoryMB: req.MemoryMB,
|
||||
SkipPrePost: req.SkipPrePost,
|
||||
RunAsRoot: req.RunAsRoot,
|
||||
Archive: archive,
|
||||
ArchiveName: archiveName,
|
||||
})
|
||||
|
||||
@ -2899,6 +2899,26 @@ paths:
|
||||
"204":
|
||||
description: Cancelled
|
||||
|
||||
/v1/admin/builds/{id}/stream:
|
||||
get:
|
||||
summary: Stream a build's live console (admin, WebSocket)
|
||||
description: >
|
||||
WebSocket endpoint. On connect, replays the completed-step history,
|
||||
then live-tails JSON events (step-start, output, step-end,
|
||||
build-status, ping) until the build finishes.
|
||||
operationId: adminStreamBuild
|
||||
tags: [admin]
|
||||
security:
|
||||
- sessionAuth: []
|
||||
parameters:
|
||||
- name: id
|
||||
in: path
|
||||
required: true
|
||||
schema: {type: string}
|
||||
responses:
|
||||
"101":
|
||||
description: WebSocket upgrade — streams build console events
|
||||
|
||||
/v1/admin/capsules:
|
||||
post:
|
||||
summary: Create a capsule on behalf of any team (admin)
|
||||
|
||||
@ -95,6 +95,7 @@ func New(
|
||||
statsSvc := &service.StatsService{DB: queries, Pool: pgPool}
|
||||
usageSvc := &service.UsageService{DB: queries}
|
||||
buildSvc := &service.BuildService{DB: queries, Redis: rdb, Pool: pool, Scheduler: sched}
|
||||
buildBroker := service.NewBuildBroker(rdb)
|
||||
|
||||
sandbox := newSandboxHandler(sandboxSvc, al)
|
||||
exec := newExecHandler(queries, pool)
|
||||
@ -115,6 +116,7 @@ func New(
|
||||
usageH := newUsageHandler(usageSvc)
|
||||
metricsH := newSandboxMetricsHandler(queries, pool)
|
||||
buildH := newBuildHandler(buildSvc, queries, pool, al)
|
||||
buildStreamH := newBuildStreamHandler(queries, buildBroker)
|
||||
channelH := newChannelHandler(channelSvc, al)
|
||||
ptyH := newPtyHandler(queries, pool)
|
||||
processH := newProcessHandler(queries, pool)
|
||||
@ -341,6 +343,15 @@ func New(
|
||||
r.Get("/capsules", adminCapsules.List)
|
||||
})
|
||||
|
||||
// Admin build console WebSocket — cookie + admin DB check before
|
||||
// upgrade, no CSRF (WS upgrade). Builds are platform-scoped, not
|
||||
// sandbox-scoped, so this sits outside the /capsules/{id} router.
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(requireSession(queries, sessionSvc))
|
||||
r.Use(requireAdmin(queries))
|
||||
r.Get("/builds/{id}/stream", buildStreamH.Stream)
|
||||
})
|
||||
|
||||
r.Route("/capsules/{id}", func(r chi.Router) {
|
||||
// Auth-required non-WS admin capsule routes.
|
||||
r.Group(func(r chi.Router) {
|
||||
|
||||
@ -41,6 +41,28 @@ type ExecFunc func(ctx context.Context, req *connect.Request[pb.ExecRequest]) (*
|
||||
// accumulated log entries. Used for per-step DB progress updates.
|
||||
type ProgressFunc func(step int, entries []BuildLogEntry)
|
||||
|
||||
// StepStartFunc is called immediately before a step begins executing.
|
||||
type StepStartFunc func(step int, phase string, st Step)
|
||||
|
||||
// OutputChunkFunc is called with each raw output chunk produced by a streaming
|
||||
// RUN step, as it arrives.
|
||||
type OutputChunkFunc func(step int, data []byte)
|
||||
|
||||
// PtyChunk is one event from a streaming PTY exec: either an output chunk
|
||||
// (Data set) or the terminal result (Done set, Exit/Err populated).
|
||||
type PtyChunk struct {
|
||||
Data []byte
|
||||
Done bool
|
||||
Exit int32
|
||||
Err error
|
||||
}
|
||||
|
||||
// StreamExecFunc runs shellCmd in a PTY inside sandboxID and returns a channel
|
||||
// of PtyChunk events. The channel is closed after a Done chunk (or an Err
|
||||
// chunk). It is the streaming counterpart of ExecFunc, used for RUN steps so
|
||||
// build output reaches the client live.
|
||||
type StreamExecFunc func(ctx context.Context, sandboxID, shellCmd string) (<-chan PtyChunk, error)
|
||||
|
||||
// Execute runs steps sequentially against sandboxID using execFn.
|
||||
//
|
||||
// - phase labels the log entries (e.g., "pre-build", "recipe", "post-build").
|
||||
@ -63,6 +85,9 @@ func Execute(
|
||||
defaultTimeout time.Duration,
|
||||
bctx *ExecContext,
|
||||
execFn ExecFunc,
|
||||
streamFn StreamExecFunc,
|
||||
onStepStart StepStartFunc,
|
||||
onChunk OutputChunkFunc,
|
||||
onProgress ProgressFunc,
|
||||
) (entries []BuildLogEntry, nextStep int, ok bool) {
|
||||
if defaultTimeout <= 0 {
|
||||
@ -73,6 +98,9 @@ func Execute(
|
||||
for _, st := range steps {
|
||||
step++
|
||||
slog.Info("executing build step", "phase", phase, "step", step, "instruction", st.Raw)
|
||||
if onStepStart != nil {
|
||||
onStepStart(step, phase, st)
|
||||
}
|
||||
|
||||
switch st.Kind {
|
||||
case KindENV:
|
||||
@ -120,7 +148,13 @@ func Execute(
|
||||
if st.Timeout > 0 {
|
||||
timeout = st.Timeout
|
||||
}
|
||||
entry, succeeded := execRun(ctx, st, sandboxID, phase, step, timeout, bctx, execFn)
|
||||
var entry BuildLogEntry
|
||||
var succeeded bool
|
||||
if streamFn != nil {
|
||||
entry, succeeded = execRunStreaming(ctx, st, sandboxID, phase, step, timeout, bctx, streamFn, onChunk)
|
||||
} else {
|
||||
entry, succeeded = execRun(ctx, st, sandboxID, phase, step, timeout, bctx, execFn)
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
if !succeeded {
|
||||
return entries, step, false
|
||||
@ -171,6 +205,66 @@ func execRun(
|
||||
return entry, entry.Ok
|
||||
}
|
||||
|
||||
// execRunStreaming runs a RUN step in a PTY via streamFn, forwarding each
|
||||
// output chunk to onChunk as it arrives. The merged PTY output is also
|
||||
// accumulated into the returned BuildLogEntry.Stdout for cold log viewing.
|
||||
// A PTY merges stdout and stderr onto one stream, so Stderr stays empty
|
||||
// unless the exec itself fails to start.
|
||||
func execRunStreaming(
|
||||
ctx context.Context,
|
||||
st Step,
|
||||
sandboxID, phase string,
|
||||
step int,
|
||||
timeout time.Duration,
|
||||
bctx *ExecContext,
|
||||
streamFn StreamExecFunc,
|
||||
onChunk OutputChunkFunc,
|
||||
) (BuildLogEntry, bool) {
|
||||
execCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
entry := BuildLogEntry{Step: step, Phase: phase, Cmd: st.Raw}
|
||||
|
||||
ch, err := streamFn(execCtx, sandboxID, bctx.WrappedCommand(st.Shell))
|
||||
if err != nil {
|
||||
entry.Stderr = fmt.Sprintf("exec error: %v", err)
|
||||
entry.Elapsed = time.Since(start).Milliseconds()
|
||||
return entry, false
|
||||
}
|
||||
|
||||
var out []byte
|
||||
gotDone := false
|
||||
for chunk := range ch {
|
||||
if chunk.Err != nil {
|
||||
entry.Stdout = string(out)
|
||||
entry.Stderr = fmt.Sprintf("exec error: %v", chunk.Err)
|
||||
entry.Elapsed = time.Since(start).Milliseconds()
|
||||
return entry, false
|
||||
}
|
||||
if chunk.Done {
|
||||
entry.Exit = chunk.Exit
|
||||
gotDone = true
|
||||
continue
|
||||
}
|
||||
out = append(out, chunk.Data...)
|
||||
if onChunk != nil {
|
||||
onChunk(step, chunk.Data)
|
||||
}
|
||||
}
|
||||
|
||||
entry.Stdout = string(out)
|
||||
entry.Elapsed = time.Since(start).Milliseconds()
|
||||
// A channel that closes without a Done chunk means the stream ended
|
||||
// early (cancelled/aborted). Treat that as a failure, never a success.
|
||||
if !gotDone {
|
||||
entry.Stderr = "exec error: build step stream ended without completion"
|
||||
return entry, false
|
||||
}
|
||||
entry.Ok = entry.Exit == 0
|
||||
return entry, entry.Ok
|
||||
}
|
||||
|
||||
// execUser creates a unix user (if not exists), grants passwordless sudo,
|
||||
// and updates bctx.User for subsequent steps.
|
||||
func execUser(
|
||||
|
||||
@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
@ -26,14 +27,17 @@ const (
|
||||
buildCommandTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// preBuildCmds run before the user recipe to prepare the build environment.
|
||||
// apt update runs as root first, then USER switches to wrenn-user for the recipe.
|
||||
// preBuildCmds run before the recipe to prepare the build environment, as
|
||||
// root. The build user (USER/WORKDIR) is not injected here — Create prepends
|
||||
// it to the persisted recipe instead, so "run as root" can omit it with no
|
||||
// build-level flag to track.
|
||||
var preBuildCmds = []string{
|
||||
"RUN apt update",
|
||||
"USER wrenn-user",
|
||||
"WORKDIR /home/wrenn-user",
|
||||
}
|
||||
|
||||
// buildUser is the non-root user a recipe runs as unless run_as_root is set.
|
||||
const buildUser = "wrenn-user"
|
||||
|
||||
// postBuildCmds run after the user recipe to clean up caches and reduce image size.
|
||||
var postBuildCmds = []string{
|
||||
"RUN apt clean",
|
||||
@ -47,6 +51,8 @@ type buildAgentClient interface {
|
||||
CreateSandbox(ctx context.Context, req *connect.Request[pb.CreateSandboxRequest]) (*connect.Response[pb.CreateSandboxResponse], error)
|
||||
DestroySandbox(ctx context.Context, req *connect.Request[pb.DestroySandboxRequest]) (*connect.Response[pb.DestroySandboxResponse], error)
|
||||
Exec(ctx context.Context, req *connect.Request[pb.ExecRequest]) (*connect.Response[pb.ExecResponse], error)
|
||||
PtyAttach(ctx context.Context, req *connect.Request[pb.PtyAttachRequest]) (*connect.ServerStreamForClient[pb.PtyAttachResponse], error)
|
||||
PtyKill(ctx context.Context, req *connect.Request[pb.PtyKillRequest]) (*connect.Response[pb.PtyKillResponse], error)
|
||||
WriteFile(ctx context.Context, req *connect.Request[pb.WriteFileRequest]) (*connect.Response[pb.WriteFileResponse], error)
|
||||
CreateSnapshot(ctx context.Context, req *connect.Request[pb.CreateSnapshotRequest]) (*connect.Response[pb.CreateSnapshotResponse], error)
|
||||
FlattenRootfs(ctx context.Context, req *connect.Request[pb.FlattenRootfsRequest]) (*connect.Response[pb.FlattenRootfsResponse], error)
|
||||
@ -73,6 +79,7 @@ type BuildCreateParams struct {
|
||||
VCPUs int32
|
||||
MemoryMB int32
|
||||
SkipPrePost bool
|
||||
RunAsRoot bool // Run the recipe as root instead of the non-root build user.
|
||||
Archive []byte // Optional tar/tar.gz/zip archive for COPY commands.
|
||||
ArchiveName string // Original filename (used to detect format).
|
||||
}
|
||||
@ -108,7 +115,19 @@ func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.Temp
|
||||
p.MemoryMB = 512
|
||||
}
|
||||
|
||||
recipeJSON, err := json.Marshal(p.Recipe)
|
||||
// Assemble the recipe. Unless run_as_root is set, the non-root build user
|
||||
// is prepended as USER + WORKDIR steps. Persisting it in the recipe means
|
||||
// "run as root" needs no build-level flag — it simply omits these steps,
|
||||
// so wrenn-user is never created in a root template.
|
||||
recipeLines := p.Recipe
|
||||
if !p.RunAsRoot {
|
||||
recipeLines = append([]string{
|
||||
"USER " + buildUser,
|
||||
"WORKDIR /home/" + buildUser,
|
||||
}, recipeLines...)
|
||||
}
|
||||
|
||||
recipeJSON, err := json.Marshal(recipeLines)
|
||||
if err != nil {
|
||||
return db.TemplateBuild{}, fmt.Errorf("marshal recipe: %w", err)
|
||||
}
|
||||
@ -130,7 +149,7 @@ func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.Temp
|
||||
Healthcheck: p.Healthcheck,
|
||||
Vcpus: p.VCPUs,
|
||||
MemoryMb: p.MemoryMB,
|
||||
TotalSteps: int32(len(p.Recipe) + defaultSteps),
|
||||
TotalSteps: int32(len(recipeLines) + defaultSteps),
|
||||
TemplateID: newTemplateID,
|
||||
TeamID: id.PlatformTeamID,
|
||||
SkipPrePost: p.SkipPrePost,
|
||||
@ -183,6 +202,7 @@ func (s *BuildService) Cancel(ctx context.Context, buildID pgtype.UUID) error {
|
||||
}); err != nil {
|
||||
return fmt.Errorf("update build status: %w", err)
|
||||
}
|
||||
s.publishStatus(ctx, buildID, "cancelled", 0, 0, "")
|
||||
|
||||
// If the build is currently running, signal its context.
|
||||
buildIDStr := id.FormatBuildID(buildID)
|
||||
@ -274,6 +294,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
log.Error("failed to update build status", "error", err)
|
||||
return
|
||||
}
|
||||
s.publishStatus(buildCtx, buildID, "running", 0, build.TotalSteps, "")
|
||||
|
||||
// Parse user recipe.
|
||||
var userRecipe []string
|
||||
@ -318,16 +339,35 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
}
|
||||
bctx := &recipe.ExecContext{EnvVars: envVars, User: "root"}
|
||||
|
||||
// Per-step progress callback for live UI updates.
|
||||
progressFn := func(currentStep int, allEntries []recipe.BuildLogEntry) {
|
||||
s.updateLogs(buildCtx, buildID, currentStep, allEntries)
|
||||
}
|
||||
streamFn := s.ptyStreamExec(agent)
|
||||
|
||||
runPhase := func(phase string, steps []recipe.Step, defaultTimeout time.Duration) bool {
|
||||
newEntries, nextStep, ok := recipe.Execute(buildCtx, phase, steps, sandboxIDStr, step, defaultTimeout, bctx, agent.Exec, func(currentStep int, phaseEntries []recipe.BuildLogEntry) {
|
||||
// Progress callback: combine prior logs with current phase entries.
|
||||
progressFn(currentStep, append(logs, phaseEntries...))
|
||||
})
|
||||
// step-start: published before each step begins.
|
||||
onStepStart := func(stepNum int, ph string, st recipe.Step) {
|
||||
publishBuildEvent(buildCtx, s.Redis, buildIDStr, BuildStreamEvent{
|
||||
Type: "step-start", Step: stepNum, Phase: ph, Cmd: st.Raw,
|
||||
})
|
||||
}
|
||||
// output: raw PTY bytes from a streaming RUN step, base64-encoded.
|
||||
onChunk := func(stepNum int, data []byte) {
|
||||
publishBuildEvent(buildCtx, s.Redis, buildIDStr, BuildStreamEvent{
|
||||
Type: "output", Step: stepNum, Data: base64.StdEncoding.EncodeToString(data),
|
||||
})
|
||||
}
|
||||
// onProgress: persist the DB log snapshot and publish step-end.
|
||||
onProgress := func(currentStep int, phaseEntries []recipe.BuildLogEntry) {
|
||||
s.updateLogs(buildCtx, buildID, currentStep, append(logs, phaseEntries...))
|
||||
if len(phaseEntries) > 0 {
|
||||
last := phaseEntries[len(phaseEntries)-1]
|
||||
publishBuildEvent(buildCtx, s.Redis, buildIDStr, BuildStreamEvent{
|
||||
Type: "step-end", Step: last.Step, Phase: last.Phase, Cmd: last.Cmd,
|
||||
Exit: last.Exit, Ok: last.Ok, ElapsedMs: last.Elapsed,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
newEntries, nextStep, ok := recipe.Execute(buildCtx, phase, steps, sandboxIDStr, step,
|
||||
defaultTimeout, bctx, agent.Exec, streamFn, onStepStart, onChunk, onProgress)
|
||||
logs = append(logs, newEntries...)
|
||||
step = nextStep
|
||||
s.updateLogs(buildCtx, buildID, step, logs)
|
||||
@ -350,15 +390,16 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
return ok
|
||||
}
|
||||
|
||||
// Phase 1: Pre-build (as root) — creates wrenn-user, updates apt.
|
||||
// Phase 1: Pre-build (as root) — apt update.
|
||||
if !build.SkipPrePost {
|
||||
if !runPhase("pre-build", preBuildSteps, 0) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2: User recipe — starts as wrenn-user (set by USER in pre-build)
|
||||
// or root if skip_pre_post.
|
||||
// Phase 2: Recipe — the persisted recipe. For non-root builds it begins
|
||||
// with the injected USER/WORKDIR steps that create and switch to the build
|
||||
// user; for run_as_root builds it runs as root throughout.
|
||||
if !runPhase("recipe", userRecipeSteps, buildCommandTimeout) {
|
||||
return
|
||||
}
|
||||
@ -559,6 +600,7 @@ func (s *BuildService) finalizeBuild(
|
||||
}); err != nil {
|
||||
log.Error("failed to mark build as success", "error", err)
|
||||
}
|
||||
s.publishStatus(ctx, buildID, "success", build.TotalSteps, build.TotalSteps, "")
|
||||
|
||||
log.Info("template build completed successfully", "name", build.Name)
|
||||
}
|
||||
@ -658,6 +700,91 @@ func (s *BuildService) failBuild(_ context.Context, buildID pgtype.UUID, errMsg
|
||||
}); err != nil {
|
||||
slog.Error("failed to update build error", "build_id", id.FormatBuildID(buildID), "error", err)
|
||||
}
|
||||
s.publishStatus(ctx, buildID, "failed", 0, 0, errMsg)
|
||||
}
|
||||
|
||||
// build PTY dimensions — wide enough for tools that adapt output to terminal
|
||||
// width (apt/pip progress bars).
|
||||
const (
|
||||
buildPtyCols = 120
|
||||
buildPtyRows = 40
|
||||
)
|
||||
|
||||
// publishStatus emits a build-status event to the build's live stream.
|
||||
func (s *BuildService) publishStatus(ctx context.Context, buildID pgtype.UUID, status string, currentStep, totalSteps int32, errMsg string) {
|
||||
publishBuildEvent(ctx, s.Redis, id.FormatBuildID(buildID), BuildStreamEvent{
|
||||
Type: "build-status",
|
||||
Status: status,
|
||||
CurrentStep: currentStep,
|
||||
TotalSteps: totalSteps,
|
||||
Error: errMsg,
|
||||
})
|
||||
}
|
||||
|
||||
// ptyStreamExec returns a recipe.StreamExecFunc that runs a shell command in a
|
||||
// PTY on the build sandbox via the host agent and streams its output. A PTY
|
||||
// makes build tools emit unbuffered, colorized output (apt/pip progress bars).
|
||||
func (s *BuildService) ptyStreamExec(agent buildAgentClient) recipe.StreamExecFunc {
|
||||
return func(ctx context.Context, sandboxID, shellCmd string) (<-chan recipe.PtyChunk, error) {
|
||||
tag := "build-" + id.NewPtyTag()
|
||||
stream, err := agent.PtyAttach(ctx, connect.NewRequest(&pb.PtyAttachRequest{
|
||||
SandboxId: sandboxID,
|
||||
Tag: tag,
|
||||
Cmd: "/bin/sh",
|
||||
Args: []string{"-c", shellCmd},
|
||||
Cols: buildPtyCols,
|
||||
Rows: buildPtyRows,
|
||||
}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ch := make(chan recipe.PtyChunk, 64)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
defer stream.Close()
|
||||
|
||||
gotExit := false
|
||||
for stream.Receive() {
|
||||
switch ev := stream.Msg().Event.(type) {
|
||||
case *pb.PtyAttachResponse_Output:
|
||||
select {
|
||||
case ch <- recipe.PtyChunk{Data: ev.Output.Data}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
case *pb.PtyAttachResponse_Exited:
|
||||
gotExit = true
|
||||
select {
|
||||
case ch <- recipe.PtyChunk{Done: true, Exit: ev.Exited.ExitCode}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if gotExit {
|
||||
return
|
||||
}
|
||||
// Stream ended with no exit event: timeout, cancellation, or error.
|
||||
// Kill the lingering guest process so it does not keep running.
|
||||
streamErr := stream.Err()
|
||||
if ctx.Err() != nil {
|
||||
killCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
_, _ = agent.PtyKill(killCtx, connect.NewRequest(&pb.PtyKillRequest{
|
||||
SandboxId: sandboxID, Tag: tag,
|
||||
}))
|
||||
cancel()
|
||||
if streamErr == nil {
|
||||
streamErr = ctx.Err()
|
||||
}
|
||||
}
|
||||
if streamErr == nil {
|
||||
streamErr = fmt.Errorf("pty stream ended without an exit event")
|
||||
}
|
||||
ch <- recipe.PtyChunk{Err: streamErr}
|
||||
}()
|
||||
return ch, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BuildService) destroySandbox(_ context.Context, agent buildAgentClient, sandboxIDStr string) {
|
||||
|
||||
143
pkg/service/build_broker.go
Normal file
143
pkg/service/build_broker.go
Normal file
@ -0,0 +1,143 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// buildSubBuffer is the per-subscriber channel buffer. A slow WebSocket
|
||||
// consumer that fills the buffer drops live events; it recovers the full
|
||||
// build state from the DB log on reconnect.
|
||||
const buildSubBuffer = 256
|
||||
|
||||
// buildBrokerReconnect is the backoff before re-subscribing to Redis after a
|
||||
// subscription error.
|
||||
const buildBrokerReconnect = 2 * time.Second
|
||||
|
||||
// BuildBroker fans build events out from per-build Redis pub/sub channels to
|
||||
// in-process WebSocket subscribers. A Redis subscription is started lazily for
|
||||
// a build when its first client connects and torn down when the last leaves.
|
||||
//
|
||||
// The build worker publishes via publishBuildEvent (Redis only); the broker is
|
||||
// purely the read/fan-out side. Decoupling through Redis means the worker and
|
||||
// the WebSocket handler need not run in the same process.
|
||||
type BuildBroker struct {
|
||||
rdb *redis.Client
|
||||
mu sync.Mutex
|
||||
builds map[string]*buildFanout
|
||||
}
|
||||
|
||||
type buildFanout struct {
|
||||
subs map[chan BuildStreamEvent]struct{}
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewBuildBroker creates a broker reading from the given Redis client.
|
||||
func NewBuildBroker(rdb *redis.Client) *BuildBroker {
|
||||
return &BuildBroker{rdb: rdb, builds: make(map[string]*buildFanout)}
|
||||
}
|
||||
|
||||
// Subscribe registers an in-process subscriber for buildID's event stream and
|
||||
// returns the receive channel plus a release function. The first subscriber
|
||||
// for a build starts its Redis subscription; the last to release stops it.
|
||||
// The release function is idempotent and closes the channel.
|
||||
func (b *BuildBroker) Subscribe(buildID string) (<-chan BuildStreamEvent, func()) {
|
||||
ch := make(chan BuildStreamEvent, buildSubBuffer)
|
||||
|
||||
b.mu.Lock()
|
||||
fan, ok := b.builds[buildID]
|
||||
if !ok {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fan = &buildFanout{subs: make(map[chan BuildStreamEvent]struct{}), cancel: cancel}
|
||||
b.builds[buildID] = fan
|
||||
go b.run(ctx, buildID)
|
||||
}
|
||||
fan.subs[ch] = struct{}{}
|
||||
b.mu.Unlock()
|
||||
|
||||
var once sync.Once
|
||||
release := func() {
|
||||
once.Do(func() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
fan, ok := b.builds[buildID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if _, present := fan.subs[ch]; !present {
|
||||
return
|
||||
}
|
||||
delete(fan.subs, ch)
|
||||
close(ch)
|
||||
if len(fan.subs) == 0 {
|
||||
fan.cancel()
|
||||
delete(b.builds, buildID)
|
||||
}
|
||||
})
|
||||
}
|
||||
return ch, release
|
||||
}
|
||||
|
||||
// run keeps a Redis subscription alive for buildID, reconnecting on error,
|
||||
// until the fanout's context is cancelled (last subscriber left).
|
||||
func (b *BuildBroker) run(ctx context.Context, buildID string) {
|
||||
for ctx.Err() == nil {
|
||||
b.subscribeOnce(ctx, buildID)
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(buildBrokerReconnect):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BuildBroker) subscribeOnce(ctx context.Context, buildID string) {
|
||||
sub := b.rdb.Subscribe(ctx, buildStreamChannel(buildID))
|
||||
defer sub.Close()
|
||||
|
||||
msgCh := sub.Channel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg, ok := <-msgCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
var ev BuildStreamEvent
|
||||
if err := json.Unmarshal([]byte(msg.Payload), &ev); err != nil {
|
||||
slog.Warn("build broker: bad event payload", "build_id", buildID, "error", err)
|
||||
continue
|
||||
}
|
||||
b.dispatch(buildID, ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch fans one event to every in-process subscriber. The send is
|
||||
// non-blocking; a full subscriber buffer drops the event. The mutex is held
|
||||
// for the whole dispatch so a concurrent release cannot close a channel
|
||||
// mid-send.
|
||||
func (b *BuildBroker) dispatch(buildID string, ev BuildStreamEvent) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
fan, ok := b.builds[buildID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
for ch := range fan.subs {
|
||||
select {
|
||||
case ch <- ev:
|
||||
default:
|
||||
slog.Debug("build broker: dropped event for slow consumer", "build_id", buildID)
|
||||
}
|
||||
}
|
||||
}
|
||||
72
pkg/service/build_stream.go
Normal file
72
pkg/service/build_stream.go
Normal file
@ -0,0 +1,72 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// buildStreamChannelPrefix is the Redis pub/sub channel prefix for live build
|
||||
// events. One channel per build: wrenn:build:{buildID}.
|
||||
const buildStreamChannelPrefix = "wrenn:build:"
|
||||
|
||||
func buildStreamChannel(buildID string) string {
|
||||
return buildStreamChannelPrefix + buildID
|
||||
}
|
||||
|
||||
// BuildStreamEvent is one event in a build's live stream. The same struct is
|
||||
// published to Redis by the build worker and forwarded verbatim to admin
|
||||
// WebSocket clients, so its JSON shape is the wire contract for both.
|
||||
//
|
||||
// Type discriminates the payload:
|
||||
// - "step-start": Step, Phase, Cmd set.
|
||||
// - "output": Step, Data (base64 PTY bytes) set.
|
||||
// - "step-end": Step, Phase, Cmd, Exit, Ok, ElapsedMs set.
|
||||
// - "build-status": Status, CurrentStep, TotalSteps, Error set.
|
||||
type BuildStreamEvent struct {
|
||||
Type string `json:"type"`
|
||||
Step int `json:"step,omitempty"`
|
||||
Phase string `json:"phase,omitempty"`
|
||||
Cmd string `json:"cmd,omitempty"`
|
||||
Data string `json:"data,omitempty"` // base64-encoded PTY output bytes
|
||||
Exit int32 `json:"exit,omitempty"`
|
||||
Ok bool `json:"ok,omitempty"`
|
||||
ElapsedMs int64 `json:"elapsed_ms,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
CurrentStep int32 `json:"current_step,omitempty"`
|
||||
TotalSteps int32 `json:"total_steps,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
T int64 `json:"t"` // unix milliseconds, set at publish time
|
||||
}
|
||||
|
||||
// IsTerminalBuildStatus reports whether a build status is final (the worker
|
||||
// will publish no further events for it).
|
||||
func IsTerminalBuildStatus(status string) bool {
|
||||
switch status {
|
||||
case "success", "failed", "cancelled":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// publishBuildEvent fire-and-forget publishes one event to a build's Redis
|
||||
// channel. A missing/closed Redis connection only drops live events; the WS
|
||||
// client always has the DB log history to fall back on.
|
||||
func publishBuildEvent(ctx context.Context, rdb *redis.Client, buildID string, ev BuildStreamEvent) {
|
||||
if rdb == nil {
|
||||
return
|
||||
}
|
||||
ev.T = time.Now().UnixMilli()
|
||||
payload, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
slog.Warn("build event marshal failed", "build_id", buildID, "error", err)
|
||||
return
|
||||
}
|
||||
if err := rdb.Publish(ctx, buildStreamChannel(buildID), payload).Err(); err != nil {
|
||||
slog.Debug("build event publish failed", "build_id", buildID, "error", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user