feat(api): real-time SSE event stream for sandbox lifecycle

In-process broker fans out sandbox state events (created/paused/running/
destroyed) to connected SSE clients, filtered by team. Backend publishes
through the channels Publisher; an SSE relay subscribes to Redis Pub/Sub
and dispatches to subscribers. Browser auth uses short-lived tickets
issued via /v1/events/token; SDKs use header auth. Admin routes get a
parallel stream that sees all teams. Frontend dashboard and admin
capsule pages subscribe to push state changes instead of polling.

Sandbox event publishing moved out of AuditLogger into the service layer
so callbacks from the host agent and direct state changes share one
path.
This commit is contained in:
2026-05-18 14:05:06 +06:00
parent 62bede5dae
commit b9cb3998f8
17 changed files with 816 additions and 59 deletions

View File

@ -0,0 +1,110 @@
import { auth } from '$lib/auth.svelte';
import type { Capsule } from '$lib/api/capsules';
/** Envelope delivered on the SSE stream; parsed from the JSON `data` field of each message. */
export type SSEEvent = {
/** Event name, e.g. `capsule.paused`. */
event: string;
timestamp: string;
team_id: string;
actor: { type: string; id?: string; name?: string };
resource: { id: string; type: string };
/** Capsule state attached to the event, when the sender included it. */
sandbox?: Capsule;
};
/** Callback invoked with every successfully parsed stream event. */
export type SSEEventHandler = (event: SSEEvent) => void;
/** Handle returned by `connectEventStream`. */
export type EventStreamConnection = {
/** Closes the underlying EventSource and cancels any pending reconnect timer. */
close: () => void;
};
/**
 * Exchanges the current auth token for an SSE ticket via POST.
 *
 * @param admin - request the ticket from the admin endpoint instead of the team one.
 * @returns the ticket, or `null` when there is no auth token, the request fails,
 *   the server answers with a non-2xx status, or the response body does not
 *   contain a non-empty `ticket` string.
 */
async function fetchTicket(admin: boolean): Promise<string | null> {
	const token = auth.token;
	if (!token) return null;
	const path = admin ? '/api/v1/admin/events/token' : '/api/v1/events/token';
	try {
		const res = await fetch(path, {
			method: 'POST',
			headers: {
				'Content-Type': 'application/json',
				Authorization: `Bearer ${token}`
			}
		});
		if (!res.ok) return null;
		// The body is untrusted JSON: narrow it before honouring the declared return type.
		const data: unknown = await res.json();
		if (typeof data !== 'object' || data === null) return null;
		const ticket = (data as { ticket?: unknown }).ticket;
		return typeof ticket === 'string' && ticket.length > 0 ? ticket : null;
	} catch {
		// Network failure or a body that is not valid JSON.
		return null;
	}
}
/**
 * Connects to the SSE event stream and returns a handle to close it.
 *
 * Authentication uses a ticket obtained via POST so the JWT never appears in a
 * URL. A fresh ticket is fetched for every connection attempt: on error the
 * EventSource is closed and a new attempt is scheduled with exponential backoff
 * (1s, doubling up to 30s, reset once a connection opens). The same backoff is
 * applied when the ticket itself cannot be obtained, so a transient failure of
 * the token endpoint does not leave the stream disconnected forever.
 *
 * @param onEvent - called with each parsed event; exceptions it throws are
 *   logged and do not affect the stream.
 * @param opts - `admin: true` selects the admin token and stream endpoints.
 * @returns a handle whose `close()` stops the stream and any pending retry.
 */
export function connectEventStream(
	onEvent: SSEEventHandler,
	opts?: { admin?: boolean }
): EventStreamConnection {
	const INITIAL_BACKOFF_MS = 1000;
	const MAX_BACKOFF_MS = 30000;
	const EVENT_NAMES = [
		'capsule.created',
		'capsule.running',
		'capsule.paused',
		'capsule.destroyed',
		'template.snapshot.created',
		'template.snapshot.deleted',
		'host.up',
		'host.down'
	] as const;

	const isAdmin = opts?.admin ?? false;
	const basePath = isAdmin ? '/api/v1/admin/events/stream' : '/api/v1/events/stream';

	let closed = false;
	let eventSource: EventSource | null = null;
	let reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
	let backoff = INITIAL_BACKOFF_MS;

	/** Schedules the next connection attempt and grows the backoff. */
	function scheduleReconnect(): void {
		if (closed) return;
		if (reconnectTimeout) clearTimeout(reconnectTimeout);
		reconnectTimeout = setTimeout(() => {
			reconnectTimeout = null;
			void connect();
		}, backoff);
		backoff = Math.min(backoff * 2, MAX_BACKOFF_MS);
	}

	async function connect(): Promise<void> {
		if (closed) return;
		const ticket = await fetchTicket(isAdmin);
		// close() may have been called while the ticket request was in flight.
		if (closed) return;
		if (!ticket) {
			// Previously this returned for good; retry instead so the stream recovers.
			scheduleReconnect();
			return;
		}

		const source = new EventSource(`${basePath}?ticket=${encodeURIComponent(ticket)}`);
		eventSource = source;
		source.onopen = () => {
			backoff = INITIAL_BACKOFF_MS;
		};
		source.onerror = () => {
			// Close explicitly: the next attempt goes through connect() with a new ticket.
			source.close();
			if (eventSource !== source) return;
			eventSource = null;
			scheduleReconnect();
		};
		for (const name of EVENT_NAMES) {
			source.addEventListener(name, handleEvent);
		}
	}

	function handleEvent(e: MessageEvent): void {
		let data: SSEEvent;
		try {
			data = JSON.parse(e.data);
		} catch {
			// Ignore malformed messages.
			return;
		}
		try {
			onEvent(data);
		} catch (err) {
			// A failing consumer must not be mistaken for a malformed message.
			console.error('SSE event handler failed', err);
		}
	}

	function close(): void {
		closed = true;
		eventSource?.close();
		eventSource = null;
		if (reconnectTimeout) {
			clearTimeout(reconnectTimeout);
			reconnectTimeout = null;
		}
	}

	void connect();
	return { close };
}

View File

@ -0,0 +1,75 @@
import { connectEventStream, type SSEEvent, type EventStreamConnection } from '$lib/api/events';
import { auth } from '$lib/auth.svelte';
type SSEListener = (event: SSEEvent) => void;

/** One shared stream connection plus the listeners fed by it. */
type SSEChannel = {
	start: () => void;
	stop: () => void;
	subscribe: (fn: SSEListener) => () => void;
};

/**
 * Builds the state machine for a single stream. The team and admin streams
 * behave identically apart from the `admin` flag passed to `connectEventStream`.
 */
function createChannel(admin: boolean): SSEChannel {
	const listeners = new Set<SSEListener>();
	let connection: EventStreamConnection | null = null;
	let started = false;

	function dispatch(event: SSEEvent): void {
		for (const fn of listeners) {
			try {
				fn(event);
			} catch (err) {
				// Isolate listeners: one failing subscriber must not starve the others.
				console.error('SSE listener failed', err);
			}
		}
	}

	// Opens the connection lazily; does nothing while logged out or already connected.
	function ensureConnected(): void {
		if (connection || !auth.token) return;
		connection = connectEventStream(dispatch, { admin });
	}

	return {
		start() {
			if (started) return;
			started = true;
			ensureConnected();
		},
		stop() {
			started = false;
			connection?.close();
			connection = null;
			listeners.clear();
		},
		subscribe(fn) {
			listeners.add(fn);
			ensureConnected();
			return () => {
				listeners.delete(fn);
			};
		}
	};
}

const teamChannel = createChannel(false);
const adminChannel = createChannel(true);

/** Opens the team event stream if it is not already started. */
export function startSSE(): void {
	teamChannel.start();
}

/** Closes the team event stream and drops every registered listener. */
export function stopSSE(): void {
	teamChannel.stop();
}

/** Opens the admin event stream if it is not already started. */
export function startAdminSSE(): void {
	adminChannel.start();
}

/** Closes the admin event stream and drops every registered listener. */
export function stopAdminSSE(): void {
	adminChannel.stop();
}

/**
 * Registers a listener on the team stream, connecting if needed.
 * @returns a function that removes the listener.
 */
export function subscribeSSE(fn: SSEListener): () => void {
	return teamChannel.subscribe(fn);
}

/**
 * Registers a listener on the admin stream, connecting if needed.
 * @returns a function that removes the listener.
 */
export function subscribeAdminSSE(fn: SSEListener): () => void {
	return adminChannel.subscribe(fn);
}

View File

@ -1,6 +1,8 @@
<script lang="ts">
import { onMount } from 'svelte';
import AdminSidebar from '$lib/components/AdminSidebar.svelte';
import Toaster from '$lib/components/Toaster.svelte';
import { startAdminSSE, stopAdminSSE } from '$lib/sse.svelte';
let { children } = $props();
let collapsed = $state(
@ -8,6 +10,11 @@
? localStorage.getItem('wrenn_sidebar_collapsed') === 'true'
: false
);
onMount(() => {
startAdminSSE();
return () => stopAdminSSE();
});
</script>
<div class="flex h-screen overflow-hidden">

View File

@ -10,6 +10,8 @@
destroyAdminCapsule,
} from '$lib/api/admin-capsules';
import type { Capsule } from '$lib/api/capsules';
import { subscribeAdminSSE } from '$lib/sse.svelte';
import type { SSEEvent } from '$lib/api/events';
const REFRESH_INTERVAL = 15;
const SPIN_DURATION = 600;
@ -194,6 +196,29 @@
return `${Math.floor(seconds / 86400)}d ago`;
}
/**
 * Applies a pushed sandbox event to the capsule list.
 *
 * - `capsule.destroyed` removes the row.
 * - An event carrying a sandbox replaces the matching row, or, for
 *   `capsule.created`, prepends a new row and highlights it for 1.6s.
 * Events for other resource types are ignored.
 */
function handleSSEEvent(event: SSEEvent) {
	if (!event.resource || event.resource.type !== 'sandbox') return;
	const sandboxId = event.resource.id;

	if (event.event === 'capsule.destroyed') {
		capsules = capsules.filter((c) => c.id !== sandboxId);
		return;
	}
	if (!event.sandbox) return;

	const idx = capsules.findIndex((c) => c.id === sandboxId);
	if (idx >= 0) {
		capsules[idx] = event.sandbox;
		capsules = capsules;
	} else if (event.event === 'capsule.created') {
		capsules = [event.sandbox, ...capsules];
		newCapsuleId = sandboxId;
		setTimeout(() => {
			// Only clear our own highlight; a newer capsule may have taken it over.
			if (newCapsuleId === sandboxId) newCapsuleId = null;
		}, 1600);
	}
}
function handleVisibility() {
if (document.hidden) {
stopAutoRefresh();
@ -203,14 +228,18 @@
}
}
let unsubscribe: (() => void) | null = null;
onMount(() => {
fetchCapsules();
startAutoRefresh();
unsubscribe = subscribeAdminSSE(handleSSEEvent);
document.addEventListener('visibilitychange', handleVisibility);
});
onDestroy(() => {
stopAutoRefresh();
unsubscribe?.();
document.removeEventListener('visibilitychange', handleVisibility);
});
</script>

View File

@ -14,6 +14,8 @@
snapshotAdminCapsule,
} from '$lib/api/admin-capsules';
import type { Capsule } from '$lib/api/capsules';
import { subscribeAdminSSE } from '$lib/sse.svelte';
import type { SSEEvent } from '$lib/api/events';
const capsuleId: string = $page.params.id ?? '';
const API_BASE = '/api/v1/admin/capsules';
@ -87,6 +89,18 @@
}
let pollTimer: ReturnType<typeof setInterval> | null = null;
let unsubscribe: (() => void) | null = null;
/**
 * Keeps the detail view in sync with pushed events for this capsule:
 * navigates back to the list when it is destroyed, otherwise adopts the
 * sandbox state carried by the event (if any).
 */
function handleSSEEvent(event: SSEEvent) {
	const concernsThisCapsule = event.resource?.id === capsuleId;
	if (!concernsThisCapsule) return;

	if (event.event === 'capsule.destroyed') {
		goto('/admin/capsules');
	} else if (event.sandbox) {
		capsule = event.sandbox;
	}
}
function startPolling() {
stopPolling();
@ -109,11 +123,13 @@
onMount(() => {
loadCapsule();
startPolling();
unsubscribe = subscribeAdminSSE(handleSSEEvent);
document.addEventListener('visibilitychange', handleVisibility);
});
onDestroy(() => {
stopPolling();
unsubscribe?.();
document.removeEventListener('visibilitychange', handleVisibility);
});
</script>

View File

@ -1,6 +1,8 @@
<script lang="ts">
import { onMount } from 'svelte';
import Sidebar from '$lib/components/Sidebar.svelte';
import Toaster from '$lib/components/Toaster.svelte';
import { startSSE, stopSSE } from '$lib/sse.svelte';
let { children } = $props();
let collapsed = $state(
@ -8,6 +10,11 @@
? localStorage.getItem('wrenn_sidebar_collapsed') === 'true'
: false
);
onMount(() => {
startSSE();
return () => stopSSE();
});
</script>
<div class="flex h-screen overflow-hidden">

View File

@ -13,6 +13,8 @@
resumeCapsule,
type Capsule
} from '$lib/api/capsules';
import { subscribeSSE } from '$lib/sse.svelte';
import type { SSEEvent } from '$lib/api/events';
const REFRESH_INTERVAL = 30;
const SPIN_DURATION = 600;
@ -256,6 +258,33 @@
return `${Math.round(sec / 3600)}h`;
}
/**
 * Applies a pushed sandbox event to the capsule list.
 *
 * - `capsule.destroyed` removes the row.
 * - An event carrying a sandbox patches the matching row in place, or, for
 *   `capsule.created`, prepends a new row and highlights it for 1.6s.
 * Events for other resource types are ignored.
 */
function handleSSEEvent(event: SSEEvent) {
	if (!event.resource || event.resource.type !== 'sandbox') return;
	const sandboxId = event.resource.id;

	if (event.event === 'capsule.destroyed') {
		capsules = capsules.filter((c) => c.id !== sandboxId);
		return;
	}
	if (!event.sandbox) return;

	const existing = capsules.find((c) => c.id === sandboxId);
	if (existing) {
		// Patch the existing object, writing only the fields whose value differs.
		for (const key of Object.keys(event.sandbox) as (keyof Capsule)[]) {
			if (existing[key] !== event.sandbox[key]) {
				(existing as any)[key] = event.sandbox[key];
			}
		}
		capsules = capsules;
	} else if (event.event === 'capsule.created') {
		capsules = [event.sandbox, ...capsules];
		newCapsuleId = sandboxId;
		setTimeout(() => {
			// Only clear our own highlight; a newer capsule may have taken it over.
			if (newCapsuleId === sandboxId) newCapsuleId = null;
		}, 1600);
	}
}
function handleClickOutside(event: MouseEvent) {
if (openMenuId && !(event.target as Element)?.closest('.status-menu-container')) {
openMenuId = null;
@ -274,9 +303,11 @@
onMount(() => {
fetchCapsules();
startAutoRefresh();
const unsubscribe = subscribeSSE(handleSSEEvent);
document.addEventListener('visibilitychange', handleVisibility);
return () => {
stopAutoRefresh();
unsubscribe();
document.removeEventListener('visibilitychange', handleVisibility);
};
});

View File

@ -3,6 +3,8 @@
import { page } from '$app/stores';
import { goto } from '$app/navigation';
import { getCapsule, pauseCapsule, resumeCapsule, type Capsule } from '$lib/api/capsules';
import { subscribeSSE } from '$lib/sse.svelte';
import type { SSEEvent } from '$lib/api/events';
import { toast } from '$lib/toast.svelte';
import SnapshotDialog from '$lib/components/SnapshotDialog.svelte';
import DestroyDialog from '$lib/components/DestroyDialog.svelte';
@ -21,6 +23,18 @@
let capsule = $state<Capsule | null>(null);
let capsuleLoading = $state(true);
let capsuleError = $state<string | null>(null);
let unsubscribeSSE: (() => void) | null = null;
/**
 * Keeps the detail view in sync with pushed events for this capsule:
 * navigates back to the list when it is destroyed, otherwise adopts the
 * sandbox state carried by the event (if any).
 */
function handleSSEEvent(event: SSEEvent) {
	const concernsThisCapsule = event.resource?.id === capsuleId;
	if (!concernsThisCapsule) return;

	if (event.event === 'capsule.destroyed') {
		goto('/dashboard/capsules');
	} else if (event.sandbox) {
		capsule = event.sandbox;
	}
}
// Lifecycle action state
let actionLoading = $state<string | null>(null);
@ -373,6 +387,8 @@
range = urlRange as MetricRange;
}
unsubscribeSSE = subscribeSSE(handleSSEEvent);
await loadCapsule();
if (!metricsAvailable) return;
@ -394,6 +410,8 @@
onDestroy(() => {
stopPolling();
unsubscribeSSE?.();
unsubscribeSSE = null;
if (visibilityHandler) document.removeEventListener('visibilitychange', visibilityHandler);
chartCpu?.destroy();
chartRam?.destroy();
@ -543,6 +561,14 @@
Pause
{/if}
</button>
<button
onclick={() => { showSnapshot = true; }}
disabled={actionLoading !== null}
class="flex items-center gap-1.5 rounded-[var(--radius-button)] border border-[var(--color-border)] bg-[var(--color-bg-3)] px-3 py-1.5 text-meta font-medium text-[var(--color-text-secondary)] transition-all duration-150 hover:bg-[var(--color-bg-4)] hover:text-[var(--color-text-primary)] disabled:opacity-50"
>
<svg width="12" height="12" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.75" stroke-linecap="round" stroke-linejoin="round"><path d="M14.5 4h-5L7 7H2v13a2 2 0 002 2h16a2 2 0 002-2V7h-5l-2.5-3z" /><circle cx="12" cy="15" r="3" /></svg>
Snapshot
</button>
{:else if capsule.status === 'paused'}
<button
onclick={handleResume}
@ -767,7 +793,7 @@
open={showSnapshot}
capsuleId={capsuleId}
onclose={() => { showSnapshot = false; }}
onsnapshot={() => { goto('/dashboard/capsules'); }}
onsnapshot={() => { loadCapsule(); }}
/>
<DestroyDialog

View File

@ -2,23 +2,27 @@ package api
import (
"encoding/json"
"log/slog"
"net/http"
"time"
"github.com/redis/go-redis/v9"
"git.omukk.dev/wrenn/wrenn/pkg/auth"
"git.omukk.dev/wrenn/wrenn/pkg/channels"
"git.omukk.dev/wrenn/wrenn/pkg/db"
"git.omukk.dev/wrenn/wrenn/pkg/id"
"git.omukk.dev/wrenn/wrenn/pkg/service"
)
type sandboxEventHandler struct {
db *db.Queries
rdb *redis.Client
db *db.Queries
rdb *redis.Client
eventPub *channels.Publisher
}
func newSandboxEventHandler(queries *db.Queries, rdb *redis.Client) *sandboxEventHandler {
return &sandboxEventHandler{db: queries, rdb: rdb}
func newSandboxEventHandler(queries *db.Queries, rdb *redis.Client, eventPub *channels.Publisher) *sandboxEventHandler {
return &sandboxEventHandler{db: queries, rdb: rdb, eventPub: eventPub}
}
type sandboxEventRequest struct {
@ -61,5 +65,21 @@ func (h *sandboxEventHandler) Handle(w http.ResponseWriter, r *http.Request) {
Timestamp: req.Timestamp,
})
// Look up sandbox to get teamID, then publish to SSE for immediate frontend update.
sandboxUUID, err := id.ParseSandboxID(req.SandboxID)
if err == nil {
if sb, dbErr := h.db.GetSandbox(r.Context(), sandboxUUID); dbErr == nil {
if sseEvt, ok := sandboxEventToSSE(service.SandboxStateEvent{
Event: req.Event, SandboxID: req.SandboxID,
TeamID: id.FormatTeamID(sb.TeamID), HostID: req.HostID,
Timestamp: req.Timestamp,
}); ok {
h.eventPub.Publish(r.Context(), sseEvt)
}
} else {
slog.Debug("sandbox event callback: sandbox lookup failed", "id", req.SandboxID, "error", dbErr)
}
}
w.WriteHeader(http.StatusNoContent)
}

View File

@ -0,0 +1,191 @@
package api
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sync"
"time"
"git.omukk.dev/wrenn/wrenn/pkg/auth"
"git.omukk.dev/wrenn/wrenn/pkg/id"
)
// sseTicket represents a short-lived opaque token that authenticates an SSE connection.
type sseTicket struct {
teamID string
isAdmin bool
expires time.Time
}
// SSETicketStore holds outstanding SSE tickets. Tickets are single-use and short-lived.
type SSETicketStore struct {
mu sync.Mutex
tickets map[string]sseTicket
}
// NewSSETicketStore creates a ticket store and starts a background cleanup goroutine.
func NewSSETicketStore() *SSETicketStore {
s := &SSETicketStore{tickets: make(map[string]sseTicket)}
go s.cleanup()
return s
}
func (s *SSETicketStore) cleanup() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
s.mu.Lock()
now := time.Now()
for k, t := range s.tickets {
if now.After(t.expires) {
delete(s.tickets, k)
}
}
s.mu.Unlock()
}
}
// Issue creates a new ticket valid for 30 seconds.
func (s *SSETicketStore) Issue(teamID string, isAdmin bool) (string, error) {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
return "", err
}
ticket := hex.EncodeToString(b)
s.mu.Lock()
s.tickets[ticket] = sseTicket{teamID: teamID, isAdmin: isAdmin, expires: time.Now().Add(30 * time.Second)}
s.mu.Unlock()
return ticket, nil
}
// Redeem validates and consumes a ticket (single-use).
func (s *SSETicketStore) Redeem(ticket string) (teamID string, isAdmin bool, ok bool) {
s.mu.Lock()
t, exists := s.tickets[ticket]
if exists {
delete(s.tickets, ticket)
}
s.mu.Unlock()
if !exists || time.Now().After(t.expires) {
return "", false, false
}
return t.teamID, t.isAdmin, true
}
// sseKeepaliveInterval is how often serveSSE writes a ": keepalive" comment
// frame to each open stream.
const sseKeepaliveInterval = 30 * time.Second
// sseHandler serves the SSE ticket and stream endpoints. It reads events from
// the broker and authenticates browser connections through the ticket store.
type sseHandler struct {
broker *SSEBroker
tickets *SSETicketStore
}
// newSSEHandler builds a handler around the given broker and ticket store.
func newSSEHandler(broker *SSEBroker, tickets *SSETicketStore) *sseHandler {
return &sseHandler{broker: broker, tickets: tickets}
}
// IssueToken handles POST /v1/events/token — exchanges a JWT/API key for a
// short-lived, single-use SSE ticket. The ticket is passed as ?ticket= on
// the EventSource URL, avoiding long-lived tokens in server logs.
func (h *sseHandler) IssueToken(w http.ResponseWriter, r *http.Request) {
	h.issueTicket(w, r, false)
}

// IssueAdminToken handles POST /v1/admin/events/token — admin variant. The
// issued ticket is flagged as admin so AdminStream accepts it.
func (h *sseHandler) IssueAdminToken(w http.ResponseWriter, r *http.Request) {
	h.issueTicket(w, r, true)
}

// issueTicket is the shared implementation of both token endpoints: it issues
// a ticket for the authenticated caller's team and writes it as
// {"ticket": "..."}. It requires an auth context on the request
// (auth.MustFromContext).
func (h *sseHandler) issueTicket(w http.ResponseWriter, r *http.Request, isAdmin bool) {
	ac := auth.MustFromContext(r.Context())
	ticket, err := h.tickets.Issue(id.FormatTeamID(ac.TeamID), isAdmin)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "internal", "failed to issue ticket")
		return
	}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(map[string]string{"ticket": ticket}); err != nil {
		slog.Warn("sse token encode failed", "error", err)
	}
}
// Stream handles GET /v1/events/stream. A ?ticket= query parameter is redeemed
// when present; without one the request must already carry an auth context
// (header auth, used by SDK consumers). The stream is always team-scoped.
func (h *sseHandler) Stream(w http.ResponseWriter, r *http.Request) {
	if ticket := r.URL.Query().Get("ticket"); ticket != "" {
		teamID, _, valid := h.tickets.Redeem(ticket)
		if !valid {
			writeError(w, http.StatusUnauthorized, "unauthorized", "invalid or expired ticket")
			return
		}
		h.serveSSE(w, r, teamID, false)
		return
	}

	// No ticket: fall back to direct header auth for SDK consumers.
	ac, authed := auth.FromContext(r.Context())
	if !authed {
		writeError(w, http.StatusUnauthorized, "unauthorized", "ticket or API key required")
		return
	}
	h.serveSSE(w, r, id.FormatTeamID(ac.TeamID), false)
}
// AdminStream handles GET /v1/admin/events/stream. The only accepted
// credential is a ?ticket= that was issued with the admin flag.
func (h *sseHandler) AdminStream(w http.ResponseWriter, r *http.Request) {
	ticket := r.URL.Query().Get("ticket")
	if len(ticket) == 0 {
		writeError(w, http.StatusUnauthorized, "unauthorized", "ticket required")
		return
	}

	teamID, isAdmin, ok := h.tickets.Redeem(ticket)
	if ok && isAdmin {
		h.serveSSE(w, r, teamID, true)
		return
	}
	writeError(w, http.StatusUnauthorized, "unauthorized", "invalid or expired ticket")
}
// serveSSE streams broker events to the client until the request context is
// cancelled or a write to the client fails.
//
// It subscribes to the broker for teamID (admin subscribers receive every
// team's events), sends an initial "connected" event, then forwards each
// broker message as an SSE frame and emits a comment-only keepalive every
// sseKeepaliveInterval.
func (h *sseHandler) serveSSE(w http.ResponseWriter, r *http.Request, teamID string, isAdmin bool) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("X-Accel-Buffering", "no")

	subID, ch := h.broker.Subscribe(teamID, isAdmin)
	defer h.broker.Unsubscribe(subID)

	// send writes one frame and flushes it. It reports false when the write
	// failed, in which case the stream is abandoned instead of writing into a
	// dead connection until the context is cancelled.
	send := func(format string, args ...any) bool {
		if _, err := fmt.Fprintf(w, format, args...); err != nil {
			return false
		}
		flusher.Flush()
		return true
	}

	// Send initial connected event.
	if !send("event: connected\ndata: {\"message\":\"connected\"}\n\n") {
		return
	}

	keepalive := time.NewTicker(sseKeepaliveInterval)
	defer keepalive.Stop()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-ch:
			if !send("event: %s\ndata: %s\n\n", msg.EventType, msg.Data) {
				return
			}
		case <-keepalive.C:
			if !send(": keepalive\n\n") {
				return
			}
		}
	}
}

View File

@ -179,12 +179,11 @@ func (c *SandboxEventConsumer) handleStarted(ctx context.Context, sandboxID pgty
func (c *SandboxEventConsumer) handlePaused(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID,
Status: "pausing",
Status_2: "paused",
}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
slog.Warn("sandbox event consumer: failed to update sandbox to paused", "sandbox_id", event.SandboxID, "error", err)
ID: sandboxID, Status: "pausing", Status_2: "paused",
}); err != nil {
return
}
slog.Debug("sandbox event consumer: paused fallback applied", "sandbox_id", event.SandboxID)
}
func (c *SandboxEventConsumer) handleStopped(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
@ -220,16 +219,19 @@ func (c *SandboxEventConsumer) handleFailed(ctx context.Context, sandboxID pgtyp
}
}
func (c *SandboxEventConsumer) handleAutoPaused(ctx context.Context, sandboxID pgtype.UUID, _ SandboxEvent) {
sb, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID,
Status: "running",
Status_2: "paused",
})
if err != nil {
return
func (c *SandboxEventConsumer) handleAutoPaused(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
// Try each plausible pre-pause state. Shutdown-time PauseAll can race a
// user-initiated pause that already moved the DB to "pausing"; without
// the second attempt that row would stay stuck until the HostMonitor
// transient-grace period elapses (2 minutes).
for _, fromStatus := range []string{"running", "pausing"} {
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID, Status: fromStatus, Status_2: "paused",
}); err == nil {
slog.Debug("sandbox event consumer: auto-paused fallback applied", "sandbox_id", event.SandboxID, "from", fromStatus)
return
}
}
c.audit.LogSandboxAutoPause(ctx, sb.TeamID, sandboxID)
}
// PublishSandboxEvent writes a sandbox lifecycle event to the Redis stream.

View File

@ -17,6 +17,7 @@ import (
"git.omukk.dev/wrenn/wrenn/pkg/channels"
"git.omukk.dev/wrenn/wrenn/pkg/cpextension"
"git.omukk.dev/wrenn/wrenn/pkg/db"
"git.omukk.dev/wrenn/wrenn/pkg/events"
"git.omukk.dev/wrenn/wrenn/pkg/lifecycle"
"git.omukk.dev/wrenn/wrenn/pkg/scheduler"
"git.omukk.dev/wrenn/wrenn/pkg/service"
@ -29,6 +30,7 @@ var openapiYAML []byte
type Server struct {
router chi.Router
BuildSvc *service.BuildService
SSERelay *SSERelay
version string
}
@ -46,6 +48,7 @@ func New(
oauthRedirectURL string,
ca *auth.CA,
al *audit.AuditLogger,
eventPub *channels.Publisher,
channelSvc *channels.Service,
mailer email.Mailer,
extensions []cpextension.Extension,
@ -75,6 +78,9 @@ func New(
Error: event.Error,
Timestamp: event.Timestamp,
})
if sseEvent, ok := sandboxEventToSSE(event); ok {
eventPub.Publish(ctx, sseEvent)
}
}
apiKeySvc := &service.APIKeyService{DB: queries}
templateSvc := &service.TemplateService{DB: queries}
@ -92,7 +98,7 @@ func New(
files := newFilesHandler(queries, pool)
filesStream := newFilesStreamHandler(queries, pool)
fsH := newFSHandler(queries, pool)
snapshots := newSnapshotHandler(templateSvc, queries, pool, al)
snapshots := newSnapshotHandler(templateSvc, sandboxSvc, queries, pool, al)
authH := newAuthHandler(queries, pgPool, jwtSecret, mailer, rdb, oauthRedirectURL)
oauthH := newOAuthHandler(queries, pgPool, jwtSecret, oauthRegistry, oauthRedirectURL)
apiKeys := newAPIKeyHandler(apiKeySvc, al)
@ -108,9 +114,15 @@ func New(
ptyH := newPtyHandler(queries, pool, jwtSecret)
processH := newProcessHandler(queries, pool, jwtSecret)
adminCapsules := newAdminCapsuleHandler(sandboxSvc, queries, pool, al)
sandboxEvtH := newSandboxEventHandler(queries, rdb)
sandboxEvtH := newSandboxEventHandler(queries, rdb, eventPub)
meH := newMeHandler(queries, pgPool, rdb, jwtSecret, mailer, oauthRegistry, oauthRedirectURL, teamSvc)
// SSE real-time event streaming.
sseBroker := NewSSEBroker()
sseRelay := NewSSERelay(rdb, queries, sseBroker)
sseTickets := NewSSETicketStore()
sseH := newSSEHandler(sseBroker, sseTickets)
// Health check.
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
@ -270,11 +282,19 @@ func New(
})
})
// SSE event stream: ticket-based auth for browsers, header auth for SDKs.
r.With(optionalAPIKeyOrJWT(queries, jwtSecret)).Get("/v1/events/stream", sseH.Stream)
r.With(requireAPIKeyOrJWT(queries, jwtSecret)).Post("/v1/events/token", sseH.IssueToken)
// JWT-authenticated: audit log.
r.With(requireJWT(jwtSecret, queries)).Get("/v1/audit-logs", auditH.List)
// Platform admin routes — require JWT + DB-validated admin status.
r.Route("/v1/admin", func(r chi.Router) {
// Admin SSE event stream (sees all teams).
r.Get("/events/stream", sseH.AdminStream)
r.With(requireJWT(jwtSecret, queries), requireAdmin(queries)).Post("/events/token", sseH.IssueAdminToken)
// Auth-required admin routes (non-capsule + capsule list/create).
r.Group(func(r chi.Router) {
r.Use(requireJWT(jwtSecret, queries))
@ -333,7 +353,7 @@ func New(
ext.RegisterRoutes(r, sctx)
}
return &Server{router: r, BuildSvc: buildSvc, version: version}
return &Server{router: r, BuildSvc: buildSvc, SSERelay: sseRelay, version: version}
}
// Handler returns the HTTP handler.
@ -378,3 +398,29 @@ func serveDocs(w http.ResponseWriter, r *http.Request) {
</body>
</html>`)
}
// sandboxEventToSSE translates an internal sandbox lifecycle event into the
// SSE event consumed by the frontend. The second return value is false for
// lifecycle events that have no SSE counterpart and must not be broadcast.
// The resulting event is stamped with the current time and a system actor.
func sandboxEventToSSE(e service.SandboxStateEvent) (events.Event, bool) {
	sseTypeFor := map[string]string{
		"sandbox.started": events.CapsuleCreated,
		"sandbox.resumed": events.CapsuleRunning,
		"sandbox.paused":  events.CapsulePaused,
		"sandbox.stopped": events.CapsuleDestroyed,
	}

	eventType, known := sseTypeFor[e.Event]
	if !known {
		return events.Event{}, false
	}

	out := events.Event{
		Event:     eventType,
		Timestamp: events.Now(),
		TeamID:    e.TeamID,
		Actor:     events.Actor{Type: events.ActorSystem},
		Resource:  events.Resource{ID: e.SandboxID, Type: "sandbox"},
	}
	return out, true
}

View File

@ -0,0 +1,80 @@
package api
import (
"encoding/json"
"log/slog"
"sync"
"sync/atomic"
)
const sseChannelBuffer = 32
type sseMessage struct {
EventType string
Data json.RawMessage
}
type sseSubscriber struct {
teamID string
isAdmin bool
ch chan sseMessage
}
// SSEBroker is an in-process fan-out hub that dispatches events to connected
// SSE clients, filtering by team ownership.
type SSEBroker struct {
mu sync.RWMutex
nextID atomic.Uint64
subscribers map[uint64]*sseSubscriber
}
// NewSSEBroker constructs a broker with no subscribers.
func NewSSEBroker() *SSEBroker {
return &SSEBroker{
subscribers: make(map[uint64]*sseSubscriber),
}
}
// Subscribe registers a new SSE client. Returns a subscriber ID (for
// Unsubscribe) and a receive-only channel that delivers filtered events.
func (b *SSEBroker) Subscribe(teamID string, isAdmin bool) (uint64, <-chan sseMessage) {
id := b.nextID.Add(1)
ch := make(chan sseMessage, sseChannelBuffer)
sub := &sseSubscriber{teamID: teamID, isAdmin: isAdmin, ch: ch}
b.mu.Lock()
b.subscribers[id] = sub
b.mu.Unlock()
slog.Debug("sse: client subscribed", "sub_id", id, "team_id", teamID, "admin", isAdmin)
return id, ch
}
// Unsubscribe removes a client. The handler loop exits via context cancellation;
// the channel is not closed here to avoid send-on-closed-channel races with Dispatch.
func (b *SSEBroker) Unsubscribe(id uint64) {
b.mu.Lock()
delete(b.subscribers, id)
b.mu.Unlock()
slog.Debug("sse: client unsubscribed", "sub_id", id)
}
// Dispatch fans out an event to all matching subscribers. Admin subscribers
// receive all events; team subscribers only receive events for their team.
// Non-blocking: events are dropped for slow consumers.
func (b *SSEBroker) Dispatch(eventType string, teamID string, data json.RawMessage) {
b.mu.RLock()
defer b.mu.RUnlock()
msg := sseMessage{EventType: eventType, Data: data}
for id, sub := range b.subscribers {
if !sub.isAdmin && sub.teamID != teamID {
continue
}
select {
case sub.ch <- msg:
default:
slog.Warn("sse: dropped event for slow consumer", "sub_id", id, "event", eventType)
}
}
}

141
internal/api/sse_relay.go Normal file
View File

@ -0,0 +1,141 @@
package api
import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/redis/go-redis/v9"

	"git.omukk.dev/wrenn/wrenn/pkg/db"
	"git.omukk.dev/wrenn/wrenn/pkg/events"
	"git.omukk.dev/wrenn/wrenn/pkg/id"
)
const ssePubSubChannel = "wrenn:sse"
// sseEventPayload is the JSON envelope sent to SSE clients.
type sseEventPayload struct {
Event string `json:"event"`
Timestamp string `json:"timestamp"`
TeamID string `json:"team_id"`
Actor events.Actor `json:"actor"`
Resource events.Resource `json:"resource"`
Sandbox *sandboxResponse `json:"sandbox,omitempty"`
}
// SSERelay subscribes to the Redis Pub/Sub channel and dispatches hydrated
// events to the in-process broker. One instance per CP process.
type SSERelay struct {
rdb *redis.Client
db *db.Queries
broker *SSEBroker
}
// NewSSERelay constructs the relay.
func NewSSERelay(rdb *redis.Client, queries *db.Queries, broker *SSEBroker) *SSERelay {
return &SSERelay{rdb: rdb, db: queries, broker: broker}
}
// Start launches the Pub/Sub subscription loop in a background goroutine and
// returns immediately. The loop itself stops once ctx is cancelled.
func (r *SSERelay) Start(ctx context.Context) {
go r.run(ctx)
}
// run keeps a subscription alive until ctx is cancelled, waiting two seconds
// between the end of one subscription and the start of the next.
func (r *SSERelay) run(ctx context.Context) {
	const reconnectDelay = 2 * time.Second

	for ctx.Err() == nil {
		r.subscribe(ctx)

		// Backoff before reconnecting.
		select {
		case <-time.After(reconnectDelay):
		case <-ctx.Done():
			return
		}
	}
}
// subscribe opens one Pub/Sub subscription on ssePubSubChannel and hands every
// received message to handleMessage. It returns when ctx is cancelled or the
// message channel is closed; the subscription is closed on return.
func (r *SSERelay) subscribe(ctx context.Context) {
	sub := r.rdb.Subscribe(ctx, ssePubSubChannel)
	defer sub.Close()

	incoming := sub.Channel()
	for {
		select {
		case m, open := <-incoming:
			if !open {
				return
			}
			r.handleMessage(ctx, m)
		case <-ctx.Done():
			return
		}
	}
}
// handleMessage decodes one Pub/Sub message, attaches the current sandbox
// state for capsule events, and dispatches the resulting JSON payload to the
// broker. Messages that cannot be decoded or re-encoded are logged and dropped;
// a failed sandbox lookup only means the payload goes out without a sandbox.
func (r *SSERelay) handleMessage(ctx context.Context, msg *redis.Message) {
	var evt events.Event
	if err := json.Unmarshal([]byte(msg.Payload), &evt); err != nil {
		slog.Warn("sse relay: failed to unmarshal event", "error", err)
		return
	}

	// Hydrate sandbox state for capsule events.
	var sandbox *sandboxResponse
	if isCapsuleEvent(evt.Event) {
		hydrated, err := r.hydrateSandbox(ctx, evt.Resource.ID)
		if err == nil {
			sandbox = hydrated
		} else {
			slog.Debug("sse relay: sandbox hydration failed (may be deleted)", "sandbox_id", evt.Resource.ID, "error", err)
		}
	}

	encoded, err := json.Marshal(sseEventPayload{
		Event:     evt.Event,
		Timestamp: evt.Timestamp,
		TeamID:    evt.TeamID,
		Actor:     evt.Actor,
		Resource:  evt.Resource,
		Sandbox:   sandbox,
	})
	if err != nil {
		slog.Warn("sse relay: failed to marshal payload", "error", err)
		return
	}
	r.broker.Dispatch(evt.Event, evt.TeamID, encoded)
}
// hydrateSandbox loads the current state of the sandbox identified by
// sandboxIDStr, bounding the database lookup to five seconds.
//
// It returns (nil, nil) when the sandbox row no longer exists, so callers can
// send the event without a sandbox attached. An unparsable ID or any other
// database error is returned as an error.
func (r *SSERelay) hydrateSandbox(ctx context.Context, sandboxIDStr string) (*sandboxResponse, error) {
	// Validate the ID first so no timeout context is created for bad input.
	sandboxID, err := id.ParseSandboxID(sandboxIDStr)
	if err != nil {
		return nil, err
	}

	queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	sb, err := r.db.GetSandbox(queryCtx, sandboxID)
	if err != nil {
		// errors.Is also matches a wrapped ErrNoRows, consistent with the
		// sandbox event consumer.
		if errors.Is(err, pgx.ErrNoRows) {
			return nil, nil
		}
		return nil, err
	}
	resp := sandboxToResponse(sb)
	return &resp, nil
}
// isCapsuleEvent reports whether eventType is one of the four capsule
// lifecycle events (created, running, paused, destroyed).
func isCapsuleEvent(eventType string) bool {
	return eventType == events.CapsuleCreated ||
		eventType == events.CapsuleRunning ||
		eventType == events.CapsulePaused ||
		eventType == events.CapsuleDestroyed
}

View File

@ -173,24 +173,10 @@ func resolveHostTeamID(teamID pgtype.UUID) pgtype.UUID {
func (l *AuditLogger) LogSandboxCreate(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID, template string) {
l.Log(ctx, newEntry(ac, ac.TeamID, "team", "sandbox", id.FormatSandboxID(sandboxID), "create", "success", map[string]any{"template": template}))
l.publish(ctx, events.Event{
Event: events.CapsuleCreated,
Timestamp: events.Now(),
TeamID: id.FormatTeamID(ac.TeamID),
Actor: actorToEvent(ac),
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
})
}
func (l *AuditLogger) LogSandboxPause(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) {
l.Log(ctx, newEntry(ac, ac.TeamID, "team", "sandbox", id.FormatSandboxID(sandboxID), "pause", "success", nil))
l.publish(ctx, events.Event{
Event: events.CapsulePaused,
Timestamp: events.Now(),
TeamID: id.FormatTeamID(ac.TeamID),
Actor: actorToEvent(ac),
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
})
}
// LogSandboxAutoPause records a system-initiated auto-pause (TTL or host reconciler).
@ -200,35 +186,14 @@ func (l *AuditLogger) LogSandboxAutoPause(ctx context.Context, teamID, sandboxID
ResourceType: "sandbox", ResourceID: id.FormatSandboxID(sandboxID),
Action: "pause", Scope: "team", Status: "info",
})
l.publish(ctx, events.Event{
Event: events.CapsulePaused,
Timestamp: events.Now(),
TeamID: id.FormatTeamID(teamID),
Actor: systemActor(),
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
})
}
func (l *AuditLogger) LogSandboxResume(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) {
l.Log(ctx, newEntry(ac, ac.TeamID, "team", "sandbox", id.FormatSandboxID(sandboxID), "resume", "success", nil))
l.publish(ctx, events.Event{
Event: events.CapsuleRunning,
Timestamp: events.Now(),
TeamID: id.FormatTeamID(ac.TeamID),
Actor: actorToEvent(ac),
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
})
}
func (l *AuditLogger) LogSandboxDestroy(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) {
l.Log(ctx, newEntry(ac, ac.TeamID, "team", "sandbox", id.FormatSandboxID(sandboxID), "destroy", "warning", nil))
l.publish(ctx, events.Event{
Event: events.CapsuleDestroyed,
Timestamp: events.Now(),
TeamID: id.FormatTeamID(ac.TeamID),
Actor: actorToEvent(ac),
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
})
}
// --- Snapshot events (scope: team) ---

View File

@ -10,7 +10,10 @@ import (
"git.omukk.dev/wrenn/wrenn/pkg/events"
)
const streamKey = "wrenn:events"
const (
streamKey = "wrenn:events"
ssePubSubChannel = "wrenn:sse"
)
// Publisher pushes events onto the Redis stream for the dispatcher to consume.
type Publisher struct {
@ -41,4 +44,9 @@ func (p *Publisher) Publish(ctx context.Context, e events.Event) {
}).Err(); err != nil {
slog.Warn("channels: failed to publish event", "event", e.Event, "error", err)
}
// Fan-out to SSE clients via Pub/Sub (fire-and-forget).
if err := p.rdb.Publish(ctx, ssePubSubChannel, string(payload)).Err(); err != nil {
slog.Warn("channels: failed to publish SSE event", "event", e.Event, "error", err)
}
}

View File

@ -184,7 +184,7 @@ func Run(opts ...Option) {
monitor := api.NewHostMonitor(queries, hostPool, al, 5*time.Minute)
// API server.
srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc, mailer, o.extensions, sctx, monitor, o.version)
srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelPub, channelSvc, mailer, o.extensions, sctx, monitor, o.version)
// Start template build workers (2 concurrent).
stopBuildWorkers := srv.BuildSvc.StartWorkers(ctx, 2)
@ -197,6 +197,9 @@ func Run(opts ...Option) {
sandboxEventConsumer := api.NewSandboxEventConsumer(rdb, queries, al)
sandboxEventConsumer.Start(ctx)
// Start SSE relay (subscribes to Redis Pub/Sub, dispatches to connected clients).
srv.SSERelay.Start(ctx)
// Start host monitor loop.
monitor.Start(ctx)