diff --git a/cmd/host-agent/main.go b/cmd/host-agent/main.go index d49d9e0..ddfc82c 100644 --- a/cmd/host-agent/main.go +++ b/cmd/host-agent/main.go @@ -147,6 +147,11 @@ func main() { mgr := sandbox.New(cfg) + // Set up lifecycle event callback sender so autonomous events + // (auto-pause, auto-destroy) are pushed to the CP proactively. + cb := hostagent.NewCallbackSender(cpURL, credsFile, creds.HostID) + mgr.SetEventSender(hostagent.NewEventSender(cb)) + mgr.StartTTLReaper(ctx) // httpServer is declared here so the shutdown func can reference it. @@ -226,8 +231,9 @@ func main() { func() { doShutdown("host deleted from CP") }, - // onCredsRefreshed: hot-swap the TLS certificate after a JWT refresh. + // onCredsRefreshed: hot-swap the TLS certificate and update callback JWT. func(tf *hostagent.TokenFile) { + cb.UpdateJWT(tf.JWT) if tf.CertPEM == "" || tf.KeyPEM == "" { return } diff --git a/db/queries/sandboxes.sql b/db/queries/sandboxes.sql index 2bf5db7..ddd3ada 100644 --- a/db/queries/sandboxes.sql +++ b/db/queries/sandboxes.sql @@ -72,7 +72,7 @@ ORDER BY created_at DESC; UPDATE sandboxes SET status = 'missing', last_updated = NOW() -WHERE host_id = $1 AND status IN ('running', 'starting', 'pending'); +WHERE host_id = $1 AND status IN ('running', 'starting', 'pending', 'pausing', 'resuming', 'stopping'); -- name: UpdateSandboxMetadata :exec UPDATE sandboxes @@ -80,6 +80,30 @@ SET metadata = $2, last_updated = NOW() WHERE id = $1; +-- name: UpdateSandboxRunningIf :one +-- Conditionally transition a sandbox to running only if the current status +-- matches the expected value. Prevents races where a user destroys a sandbox +-- while the create/resume goroutine is still in-flight. +UPDATE sandboxes +SET status = 'running', + host_ip = $3, + guest_ip = $4, + started_at = $5, + last_active_at = $5, + last_updated = NOW() +WHERE id = $1 AND status = $2 +RETURNING *; + +-- name: UpdateSandboxStatusIf :one +-- Atomically update status only when the current status matches the expected value. +-- Prevents background goroutines from overwriting a status that has since changed +-- (e.g. user destroyed a sandbox while Create was in-flight). +UPDATE sandboxes +SET status = $3, + last_updated = NOW() +WHERE id = $1 AND status = $2 +RETURNING *; + -- name: BulkRestoreRunning :exec -- Called by the reconciler when a host comes back online and its sandboxes are -- confirmed alive. Restores only sandboxes that are in 'missing' state. diff --git a/frontend/src/lib/api/client.ts b/frontend/src/lib/api/client.ts index d6e6459..6afdc50 100644 --- a/frontend/src/lib/api/client.ts +++ b/frontend/src/lib/api/client.ts @@ -2,6 +2,19 @@ import { auth } from '$lib/auth.svelte'; export type ApiResult = { ok: true; data: T } | { ok: false; error: string }; +async function parseResponse(res: Response): Promise> { + if (res.status === 204 || res.status === 202) { + const text = await res.text(); + if (!text) return { ok: true, data: undefined as T }; + const data = JSON.parse(text); + return { ok: true, data: data as T }; + } + + const data = await res.json(); + if (!res.ok) return { ok: false, error: data?.error?.message ?? 'Something went wrong' }; + return { ok: true, data: data as T }; +} + export async function apiFetch(method: string, path: string, body?: unknown): Promise> { try { const headers: Record = { 'Content-Type': 'application/json' }; @@ -13,11 +26,7 @@ export async function apiFetch(method: string, path: string, body?: unknown): body: body ? JSON.stringify(body) : undefined }); - if (res.status === 204) return { ok: true, data: undefined as T }; - - const data = await res.json(); - if (!res.ok) return { ok: false, error: data?.error?.message ?? 'Something went wrong' }; - return { ok: true, data: data as T }; + return await parseResponse(res); } catch { return { ok: false, error: 'Unable to connect to the server' }; } @@ -34,11 +43,7 @@ export async function apiFetchMultipart(method: string, path: string, formDat body: formData }); - if (res.status === 204) return { ok: true, data: undefined as T }; - - const data = await res.json(); - if (!res.ok) return { ok: false, error: data?.error?.message ?? 'Something went wrong' }; - return { ok: true, data: data as T }; + return await parseResponse(res); } catch { return { ok: false, error: 'Unable to connect to the server' }; } diff --git a/frontend/src/routes/admin/capsules/+page.svelte b/frontend/src/routes/admin/capsules/+page.svelte index b0f1923..49bcec8 100644 --- a/frontend/src/routes/admin/capsules/+page.svelte +++ b/frontend/src/routes/admin/capsules/+page.svelte @@ -149,6 +149,8 @@ case 'running': return 'var(--color-accent)'; case 'paused': return 'var(--color-amber)'; case 'error': return 'var(--color-red)'; + case 'starting': case 'resuming': case 'pausing': case 'stopping': + return 'var(--color-blue)'; default: return 'var(--color-text-muted)'; } } @@ -158,6 +160,8 @@ case 'running': return 'rgba(94,140,88,0.12)'; case 'paused': return 'rgba(212,167,60,0.12)'; case 'error': return 'rgba(207,129,114,0.12)'; + case 'starting': case 'resuming': case 'pausing': case 'stopping': + return 'rgba(90,159,212,0.12)'; default: return 'rgba(255,255,255,0.05)'; } } @@ -167,6 +171,8 @@ case 'running': return 'rgba(94,140,88,0.3)'; case 'paused': return 'rgba(212,167,60,0.3)'; case 'error': return 'rgba(207,129,114,0.3)'; + case 'starting': case 'resuming': case 'pausing': case 'stopping': + return 'rgba(90,159,212,0.3)'; default: return 'rgba(255,255,255,0.08)'; } } @@ -418,7 +424,8 @@ {:else} {#each filteredCapsules as capsule, i (capsule.id)} - {@const stripeColor = capsule.status === 'running' ? 'bg-[var(--color-accent)]' : capsule.status === 'paused' ? 'bg-[var(--color-amber)]' : capsule.status === 'error' ? 'bg-[var(--color-red)]' : 'bg-[var(--color-text-muted)]'} + {@const isTransient = ['starting', 'resuming', 'pausing', 'stopping'].includes(capsule.status)} + {@const stripeColor = capsule.status === 'running' ? 'bg-[var(--color-accent)]' : capsule.status === 'paused' ? 'bg-[var(--color-amber)]' : capsule.status === 'error' ? 'bg-[var(--color-red)]' : isTransient ? 'bg-[var(--color-blue)]' : 'bg-[var(--color-text-muted)]'}
{:else if capsule.status === 'error'} + {:else if isTransient} + + + + {:else} {/if} diff --git a/frontend/src/routes/dashboard/capsules/+page.svelte b/frontend/src/routes/dashboard/capsules/+page.svelte index 97b2ad0..74a1703 100644 --- a/frontend/src/routes/dashboard/capsules/+page.svelte +++ b/frontend/src/routes/dashboard/capsules/+page.svelte @@ -470,7 +470,8 @@
{:else} {#each filteredCapsules as capsule, i (capsule.id)} - {@const stripeColor = capsule.status === 'running' ? 'bg-[var(--color-accent)]' : capsule.status === 'paused' ? 'bg-[var(--color-amber)]' : 'bg-[var(--color-text-muted)]'} + {@const isTransient = ['starting', 'resuming', 'pausing', 'stopping'].includes(capsule.status)} + {@const stripeColor = capsule.status === 'running' ? 'bg-[var(--color-accent)]' : capsule.status === 'paused' ? 'bg-[var(--color-amber)]' : isTransient ? 'bg-[var(--color-blue)]' : 'bg-[var(--color-text-muted)]'}
{:else if capsule.status === 'paused'} + {:else if isTransient} + + + + {:else} {/if} @@ -556,7 +562,7 @@ openMenuId = capsule.id; } }} - class="inline-flex items-center gap-1.5 rounded-[var(--radius-button)] border px-2.5 py-1 text-label font-semibold uppercase tracking-[0.04em] transition-colors duration-150 {capsule.status === 'running' ? 'border-[var(--color-accent)]/40 bg-[var(--color-accent-glow)] text-[var(--color-accent-mid)] hover:border-[var(--color-accent)]/70 hover:text-[var(--color-accent-bright)]' : capsule.status === 'paused' ? 'border-[var(--color-amber)]/30 bg-[var(--color-amber)]/5 text-[var(--color-amber)] hover:border-[var(--color-amber)]/60' : 'border-[var(--color-border)] bg-[var(--color-bg-2)] text-[var(--color-text-secondary)] hover:border-[var(--color-border-mid)] hover:text-[var(--color-text-primary)]'}" + class="inline-flex items-center gap-1.5 rounded-[var(--radius-button)] border px-2.5 py-1 text-label font-semibold uppercase tracking-[0.04em] transition-colors duration-150 {capsule.status === 'running' ? 'border-[var(--color-accent)]/40 bg-[var(--color-accent-glow)] text-[var(--color-accent-mid)] hover:border-[var(--color-accent)]/70 hover:text-[var(--color-accent-bright)]' : capsule.status === 'paused' ? 'border-[var(--color-amber)]/30 bg-[var(--color-amber)]/5 text-[var(--color-amber)] hover:border-[var(--color-amber)]/60' : isTransient ? 'border-[var(--color-blue)]/30 bg-[var(--color-blue)]/5 text-[var(--color-blue)]' : 'border-[var(--color-border)] bg-[var(--color-bg-2)] text-[var(--color-text-secondary)] hover:border-[var(--color-border-mid)] hover:text-[var(--color-text-primary)]'}" > {capsule.status} "}, + Count: 10, + Block: 5 * time.Second, + }).Result() + + if err != nil { + if err == redis.Nil || ctx.Err() != nil { + continue + } + slog.Warn("sandbox event consumer: xreadgroup error", "error", err) + time.Sleep(1 * time.Second) + continue + } + + for _, stream := range streams { + for _, msg := range stream.Messages { + c.handleMessage(ctx, msg) + } + } + } +} + +func (c *SandboxEventConsumer) handleMessage(ctx context.Context, msg redis.XMessage) { + // Use a non-cancellable context for XAck so shutdown doesn't leave + // messages permanently stuck in the pending entries list. + defer func() { + ackCtx, ackCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ackCancel() + if err := c.rdb.XAck(ackCtx, sandboxEventStream, sandboxEventGroup, msg.ID).Err(); err != nil { + slog.Warn("sandbox event consumer: xack failed", "id", msg.ID, "error", err) + } + }() + + payload, ok := msg.Values["payload"].(string) + if !ok { + slog.Warn("sandbox event consumer: message missing payload", "id", msg.ID) + return + } + + var event SandboxEvent + if err := json.Unmarshal([]byte(payload), &event); err != nil { + slog.Warn("sandbox event consumer: failed to unmarshal event", "id", msg.ID, "error", err) + return + } + + sandboxID, err := id.ParseSandboxID(event.SandboxID) + if err != nil { + slog.Warn("sandbox event consumer: invalid sandbox ID", "sandbox_id", event.SandboxID, "error", err) + return + } + + switch event.Event { + case SandboxEventStarted: + c.handleStarted(ctx, sandboxID, event, "starting") + case SandboxEventResumed: + c.handleStarted(ctx, sandboxID, event, "resuming") + case SandboxEventPaused: + c.handlePaused(ctx, sandboxID, event) + case SandboxEventStopped: + c.handleStopped(ctx, sandboxID, event) + case SandboxEventFailed: + c.handleFailed(ctx, sandboxID) + case SandboxEventAutoPaused: + c.handleAutoPaused(ctx, sandboxID, event) + default: + slog.Warn("sandbox event consumer: unknown event type", "event", event.Event) + } +} + +// handleStarted is a fallback writer for sandbox.started and sandbox.resumed +// events. The background goroutine in SandboxService is the primary writer; +// this only succeeds if the goroutine's conditional update was missed. +func (c *SandboxEventConsumer) handleStarted(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent, fromStatus string) { + now := time.Now() + if _, err := c.db.UpdateSandboxRunningIf(ctx, db.UpdateSandboxRunningIfParams{ + ID: sandboxID, + Status: fromStatus, + HostIp: event.HostIP, + StartedAt: pgtype.Timestamptz{ + Time: now, + Valid: true, + }, + }); err != nil { + return + } + + if len(event.Metadata) > 0 { + metaJSON, _ := json.Marshal(event.Metadata) + _ = c.db.UpdateSandboxMetadata(ctx, db.UpdateSandboxMetadataParams{ + ID: sandboxID, + Metadata: metaJSON, + }) + } +} + +func (c *SandboxEventConsumer) handlePaused(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) { + if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, + Status: "pausing", + Status_2: "paused", + }); err != nil && !errors.Is(err, pgx.ErrNoRows) { + slog.Warn("sandbox event consumer: failed to update sandbox to paused", "sandbox_id", event.SandboxID, "error", err) + } +} + +func (c *SandboxEventConsumer) handleStopped(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) { + if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, + Status: "stopping", + Status_2: "stopped", + }); err != nil && !errors.Is(err, pgx.ErrNoRows) { + slog.Warn("sandbox event consumer: failed to update sandbox to stopped", "sandbox_id", event.SandboxID, "error", err) + } +} + +// handleFailed is a no-op fallback — the background goroutine already +// performed the conditional DB update before publishing this event. +// We keep the case arm so unknown event types are flagged, but avoid +// an unconditional status write that could clobber concurrent operations. +func (c *SandboxEventConsumer) handleFailed(_ context.Context, _ pgtype.UUID) {} + +func (c *SandboxEventConsumer) handleAutoPaused(ctx context.Context, sandboxID pgtype.UUID, _ SandboxEvent) { + sb, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, + Status: "running", + Status_2: "paused", + }) + if err != nil { + return + } + c.audit.LogSandboxAutoPause(ctx, sb.TeamID, sandboxID) +} + +// PublishSandboxEvent writes a sandbox lifecycle event to the Redis stream. +// Used by both the SandboxService background goroutines and the callback endpoint. +func PublishSandboxEvent(ctx context.Context, rdb *redis.Client, event SandboxEvent) { + payload, err := json.Marshal(event) + if err != nil { + slog.Warn("sandbox event: failed to marshal", "event", event.Event, "error", err) + return + } + + if err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: sandboxEventStream, + MaxLen: 50000, + Approx: true, + Values: map[string]any{ + "payload": string(payload), + }, + }).Err(); err != nil { + slog.Warn("sandbox event: failed to publish", "event", event.Event, "error", err) + } +} diff --git a/internal/api/server.go b/internal/api/server.go index e59eecd..a4e3e3b 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -1,6 +1,7 @@ package api import ( + "context" _ "embed" "fmt" "net/http" @@ -63,6 +64,17 @@ func New( // Shared service layer. sandboxSvc := &service.SandboxService{DB: queries, Pool: pool, Scheduler: sched} + sandboxSvc.PublishEvent = func(ctx context.Context, event service.SandboxStateEvent) { + PublishSandboxEvent(ctx, rdb, SandboxEvent{ + Event: event.Event, + SandboxID: event.SandboxID, + HostID: event.HostID, + HostIP: event.HostIP, + Metadata: event.Metadata, + Error: event.Error, + Timestamp: event.Timestamp, + }) + } apiKeySvc := &service.APIKeyService{DB: queries} templateSvc := &service.TemplateService{DB: queries} hostSvc := &service.HostService{DB: queries, Redis: rdb, JWT: jwtSecret, Pool: pool, CA: ca} @@ -95,6 +107,7 @@ func New( ptyH := newPtyHandler(queries, pool, jwtSecret) processH := newProcessHandler(queries, pool, jwtSecret) adminCapsules := newAdminCapsuleHandler(sandboxSvc, queries, pool, al) + sandboxEvtH := newSandboxEventHandler(queries, rdb) meH := newMeHandler(queries, pgPool, rdb, jwtSecret, mailer, oauthRegistry, oauthRedirectURL, teamSvc) // Health check. @@ -221,8 +234,9 @@ func New( // Unauthenticated: refresh token exchange. r.Post("/auth/refresh", hostH.RefreshToken) - // Host-token-authenticated: heartbeat. + // Host-token-authenticated: heartbeat and lifecycle callbacks. r.With(requireHostToken(jwtSecret)).Post("/{id}/heartbeat", hostH.Heartbeat) + r.With(requireHostToken(jwtSecret)).Post("/sandbox-events", sandboxEvtH.Handle) // JWT-authenticated: host CRUD and tags. r.Group(func(r chi.Router) { diff --git a/internal/hostagent/callback.go b/internal/hostagent/callback.go new file mode 100644 index 0000000..55fb9a2 --- /dev/null +++ b/internal/hostagent/callback.go @@ -0,0 +1,129 @@ +package hostagent + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strings" + "sync" + "time" +) + +// CallbackEvent is the payload sent to the CP's sandbox event callback endpoint. +type CallbackEvent struct { + Event string `json:"event"` + SandboxID string `json:"sandbox_id"` + HostID string `json:"host_id"` + Timestamp int64 `json:"timestamp"` +} + +// CallbackSender sends sandbox lifecycle events to the CP via HTTP POST. +// Used for autonomous agent-side events (auto-pause, auto-destroy) that +// the CP cannot observe through its own RPC goroutines. +type CallbackSender struct { + cpURL string + hostID string + credFile string + client *http.Client + mu sync.RWMutex + jwt string +} + +// NewCallbackSender creates a callback sender. +func NewCallbackSender(cpURL, credFile, hostID string) *CallbackSender { + jwt := "" + if tf, err := LoadTokenFile(credFile); err == nil { + jwt = tf.JWT + } + return &CallbackSender{ + cpURL: strings.TrimRight(cpURL, "/"), + hostID: hostID, + credFile: credFile, + client: &http.Client{Timeout: 10 * time.Second}, + jwt: jwt, + } +} + +// UpdateJWT refreshes the JWT used for callback authentication. +// Called from the heartbeat's onCredsRefreshed hook. +func (s *CallbackSender) UpdateJWT(jwt string) { + s.mu.Lock() + s.jwt = jwt + s.mu.Unlock() +} + +func (s *CallbackSender) getJWT() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.jwt +} + +// Send sends a callback event to the CP synchronously with retries. +func (s *CallbackSender) Send(ctx context.Context, ev CallbackEvent) error { + ev.HostID = s.hostID + if ev.Timestamp == 0 { + ev.Timestamp = time.Now().Unix() + } + + body, err := json.Marshal(ev) + if err != nil { + return fmt.Errorf("marshal callback event: %w", err) + } + + url := s.cpURL + "/v1/hosts/sandbox-events" + + var lastErr error + for attempt := 0; attempt < 3; attempt++ { + if attempt > 0 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(attempt) * 500 * time.Millisecond): + } + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("create callback request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Host-Token", s.getJWT()) + + resp, err := s.client.Do(req) + if err != nil { + lastErr = err + continue + } + resp.Body.Close() + + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + if newCreds, refreshErr := RefreshCredentials(ctx, s.cpURL, s.credFile); refreshErr == nil { + s.UpdateJWT(newCreds.JWT) + } + lastErr = fmt.Errorf("callback auth failed: %d", resp.StatusCode) + continue + } + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + + lastErr = fmt.Errorf("callback failed: status %d", resp.StatusCode) + } + + return fmt.Errorf("callback failed after 3 attempts: %w", lastErr) +} + +// SendAsync sends a callback event in a background goroutine. +func (s *CallbackSender) SendAsync(ev CallbackEvent) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := s.Send(ctx, ev); err != nil { + slog.Warn("callback send failed (reconciler will catch it)", "event", ev.Event, "sandbox_id", ev.SandboxID, "error", err) + } + }() +} diff --git a/internal/hostagent/callback_adapter.go b/internal/hostagent/callback_adapter.go new file mode 100644 index 0000000..5cc7360 --- /dev/null +++ b/internal/hostagent/callback_adapter.go @@ -0,0 +1,22 @@ +package hostagent + +import ( + "git.omukk.dev/wrenn/wrenn/internal/sandbox" +) + +// callbackAdapter adapts CallbackSender to satisfy sandbox.EventSender. +type callbackAdapter struct { + sender *CallbackSender +} + +// NewEventSender wraps a CallbackSender as a sandbox.EventSender. +func NewEventSender(sender *CallbackSender) sandbox.EventSender { + return &callbackAdapter{sender: sender} +} + +func (a *callbackAdapter) SendAsync(event sandbox.LifecycleEvent) { + a.sender.SendAsync(CallbackEvent{ + Event: event.Event, + SandboxID: event.SandboxID, + }) +} diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go index 5917396..11fd5df 100644 --- a/internal/sandbox/manager.go +++ b/internal/sandbox/manager.go @@ -41,6 +41,17 @@ type Config struct { AgentVersion string // host agent version (injected via ldflags) } +// LifecycleEvent describes an autonomous state change initiated by the agent. +type LifecycleEvent struct { + Event string + SandboxID string +} + +// EventSender sends autonomous lifecycle events to the control plane. +type EventSender interface { + SendAsync(event LifecycleEvent) +} + // Manager orchestrates sandbox lifecycle: VM, network, filesystem, envd. type Manager struct { cfg Config @@ -57,6 +68,11 @@ type Manager struct { // onDestroy is called with the sandbox ID after cleanup completes. // Used by ProxyHandler to evict cached reverse proxies. onDestroy func(sandboxID string) + + // eventSender sends autonomous lifecycle events (auto-pause, auto-destroy) + // to the CP via HTTP callback. Optional — nil means events are only + // propagated through the HostMonitor reconciler. + eventSender EventSender } // SetOnDestroy registers a callback invoked after each sandbox is cleaned up. @@ -64,6 +80,11 @@ func (m *Manager) SetOnDestroy(fn func(sandboxID string)) { m.onDestroy = fn } +// SetEventSender registers the callback sender for autonomous lifecycle events. +func (m *Manager) SetEventSender(sender EventSender) { + m.eventSender = sender +} + // sandboxState holds the runtime state for a single sandbox. type sandboxState struct { models.Sandbox @@ -1681,6 +1702,13 @@ func (m *Manager) reapExpired(_ context.Context) { m.autoPausedMu.Lock() m.autoPausedIDs = append(m.autoPausedIDs, id) m.autoPausedMu.Unlock() + + if m.eventSender != nil { + m.eventSender.SendAsync(LifecycleEvent{ + Event: "sandbox.auto_paused", + SandboxID: id, + }) + } } } diff --git a/pkg/cpserver/run.go b/pkg/cpserver/run.go index e49b4e2..7678d47 100644 --- a/pkg/cpserver/run.go +++ b/pkg/cpserver/run.go @@ -187,8 +187,13 @@ func Run(opts ...Option) { // Start channel event dispatcher. channelDispatcher.Start(ctx) - // Start host monitor (passive + active reconciliation every 30s). - monitor := api.NewHostMonitor(queries, hostPool, al, 15*time.Second) + // Start sandbox event consumer (processes lifecycle events from Redis stream). + sandboxEventConsumer := api.NewSandboxEventConsumer(rdb, queries, al) + sandboxEventConsumer.Start(ctx) + + // Start host monitor (passive + active reconciliation every 60s). + // Reduced from 15s since async events handle the normal case. + monitor := api.NewHostMonitor(queries, hostPool, al, 60*time.Second) monitor.Start(ctx) // Hard-delete accounts that have been soft-deleted for more than 15 days (runs every 24h). diff --git a/pkg/db/sandboxes.sql.go b/pkg/db/sandboxes.sql.go index c48c9ab..9986e35 100644 --- a/pkg/db/sandboxes.sql.go +++ b/pkg/db/sandboxes.sql.go @@ -375,7 +375,7 @@ const markSandboxesMissingByHost = `-- name: MarkSandboxesMissingByHost :exec UPDATE sandboxes SET status = 'missing', last_updated = NOW() -WHERE host_id = $1 AND status IN ('running', 'starting', 'pending') +WHERE host_id = $1 AND status IN ('running', 'starting', 'pending', 'pausing', 'resuming', 'stopping') ` // Called when the host monitor marks a host unreachable. @@ -470,6 +470,61 @@ func (q *Queries) UpdateSandboxRunning(ctx context.Context, arg UpdateSandboxRun return i, err } +const updateSandboxRunningIf = `-- name: UpdateSandboxRunningIf :one +UPDATE sandboxes +SET status = 'running', + host_ip = $3, + guest_ip = $4, + started_at = $5, + last_active_at = $5, + last_updated = NOW() +WHERE id = $1 AND status = $2 +RETURNING id, team_id, host_id, template, status, vcpus, memory_mb, timeout_sec, disk_size_mb, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated, template_id, template_team_id, metadata +` + +type UpdateSandboxRunningIfParams struct { + ID pgtype.UUID `json:"id"` + Status string `json:"status"` + HostIp string `json:"host_ip"` + GuestIp string `json:"guest_ip"` + StartedAt pgtype.Timestamptz `json:"started_at"` +} + +// Conditionally transition a sandbox to running only if the current status +// matches the expected value. Prevents races where a user destroys a sandbox +// while the create/resume goroutine is still in-flight. +func (q *Queries) UpdateSandboxRunningIf(ctx context.Context, arg UpdateSandboxRunningIfParams) (Sandbox, error) { + row := q.db.QueryRow(ctx, updateSandboxRunningIf, + arg.ID, + arg.Status, + arg.HostIp, + arg.GuestIp, + arg.StartedAt, + ) + var i Sandbox + err := row.Scan( + &i.ID, + &i.TeamID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.DiskSizeMb, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + &i.TemplateID, + &i.TemplateTeamID, + &i.Metadata, + ) + return i, err +} + const updateSandboxStatus = `-- name: UpdateSandboxStatus :one UPDATE sandboxes SET status = $2, @@ -508,3 +563,46 @@ func (q *Queries) UpdateSandboxStatus(ctx context.Context, arg UpdateSandboxStat ) return i, err } + +const updateSandboxStatusIf = `-- name: UpdateSandboxStatusIf :one +UPDATE sandboxes +SET status = $3, + last_updated = NOW() +WHERE id = $1 AND status = $2 +RETURNING id, team_id, host_id, template, status, vcpus, memory_mb, timeout_sec, disk_size_mb, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated, template_id, template_team_id, metadata +` + +type UpdateSandboxStatusIfParams struct { + ID pgtype.UUID `json:"id"` + Status string `json:"status"` + Status_2 string `json:"status_2"` +} + +// Atomically update status only when the current status matches the expected value. +// Prevents background goroutines from overwriting a status that has since changed +// (e.g. user destroyed a sandbox while Create was in-flight). +func (q *Queries) UpdateSandboxStatusIf(ctx context.Context, arg UpdateSandboxStatusIfParams) (Sandbox, error) { + row := q.db.QueryRow(ctx, updateSandboxStatusIf, arg.ID, arg.Status, arg.Status_2) + var i Sandbox + err := row.Scan( + &i.ID, + &i.TeamID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.DiskSizeMb, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + &i.TemplateID, + &i.TemplateTeamID, + &i.Metadata, + ) + return i, err +} diff --git a/pkg/service/sandbox.go b/pkg/service/sandbox.go index aa736da..1c78f7c 100644 --- a/pkg/service/sandbox.go +++ b/pkg/service/sandbox.go @@ -18,12 +18,27 @@ import ( pb "git.omukk.dev/wrenn/wrenn/proto/hostagent/gen" ) +// SandboxEventPublisher writes sandbox lifecycle events to the Redis stream. +type SandboxEventPublisher func(ctx context.Context, event SandboxStateEvent) + +// SandboxStateEvent is the event payload published to the Redis stream. +type SandboxStateEvent struct { + Event string `json:"event"` + SandboxID string `json:"sandbox_id"` + HostID string `json:"host_id"` + HostIP string `json:"host_ip,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` + Error string `json:"error,omitempty"` + Timestamp int64 `json:"timestamp"` +} + // SandboxService provides sandbox lifecycle operations shared between the // REST API and the dashboard. type SandboxService struct { - DB *db.Queries - Pool *lifecycle.HostClientPool - Scheduler scheduler.HostScheduler + DB *db.Queries + Pool *lifecycle.HostClientPool + Scheduler scheduler.HostScheduler + PublishEvent SandboxEventPublisher } // SandboxCreateParams holds the parameters for creating a sandbox. @@ -53,6 +68,12 @@ func (s *SandboxService) agentForSandbox(ctx context.Context, sandboxID pgtype.U return agent, sb, nil } +func (s *SandboxService) publishEvent(ctx context.Context, event SandboxStateEvent) { + if s.PublishEvent != nil { + s.PublishEvent(ctx, event) + } +} + // hostagentClient is a local alias to avoid the full package path in signatures. type hostagentClient = interface { CreateSandbox(ctx context.Context, req *connect.Request[pb.CreateSandboxRequest]) (*connect.Response[pb.CreateSandboxResponse], error) @@ -64,8 +85,10 @@ type hostagentClient = interface { FlushSandboxMetrics(ctx context.Context, req *connect.Request[pb.FlushSandboxMetricsRequest]) (*connect.Response[pb.FlushSandboxMetricsResponse], error) } -// Create creates a new sandbox: picks a host via the scheduler, inserts a pending -// DB record, calls the host agent, and updates the record to running. +// Create creates a new sandbox asynchronously: picks a host, inserts a +// "starting" DB record, fires the agent RPC in a background goroutine, and +// returns the sandbox immediately. The background goroutine publishes a +// sandbox event to the Redis stream when the operation completes. func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db.Sandbox, error) { if p.Template == "" { p.Template = "minimal" @@ -96,11 +119,9 @@ func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db. templateTeamID = tmpl.TeamID templateID = tmpl.ID templateDefaultUser = tmpl.DefaultUser - // Parse default_env JSONB into a map. if len(tmpl.DefaultEnv) > 0 { _ = json.Unmarshal(tmpl.DefaultEnv, &templateDefaultEnv) } - // If the template is a snapshot, use its baked-in vcpus/memory. if tmpl.Type == "snapshot" { p.VCPUs = tmpl.Vcpus p.MemoryMB = tmpl.MemoryMb @@ -111,13 +132,11 @@ func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db. return db.Sandbox{}, fmt.Errorf("invalid request: team_id is required") } - // Determine whether this team uses BYOC hosts or platform hosts. team, err := s.DB.GetTeam(ctx, p.TeamID) if err != nil { return db.Sandbox{}, fmt.Errorf("team not found: %w", err) } - // Pick a host for this sandbox. host, err := s.Scheduler.SelectHost(ctx, p.TeamID, team.IsByoc, p.MemoryMB, p.DiskSizeMB) if err != nil { return db.Sandbox{}, fmt.Errorf("select host: %w", err) @@ -130,13 +149,14 @@ func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db. sandboxID := id.NewSandboxID() sandboxIDStr := id.FormatSandboxID(sandboxID) + hostIDStr := id.FormatHostID(host.ID) - if _, err := s.DB.InsertSandbox(ctx, db.InsertSandboxParams{ + sb, err := s.DB.InsertSandbox(ctx, db.InsertSandboxParams{ ID: sandboxID, TeamID: p.TeamID, HostID: host.ID, Template: p.Template, - Status: "pending", + Status: "starting", Vcpus: p.VCPUs, MemoryMb: p.MemoryMB, TimeoutSec: p.TimeoutSec, @@ -144,11 +164,26 @@ func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db. TemplateID: templateID, TemplateTeamID: templateTeamID, Metadata: []byte("{}"), - }); err != nil { + }) + if err != nil { return db.Sandbox{}, fmt.Errorf("insert sandbox: %w", err) } - resp, err := agent.CreateSandbox(ctx, connect.NewRequest(&pb.CreateSandboxRequest{ + go s.createInBackground(sandboxID, sandboxIDStr, hostIDStr, agent, p, templateTeamID, templateID, templateDefaultUser, templateDefaultEnv) + + return sb, nil +} + +func (s *SandboxService) createInBackground( + sandboxID pgtype.UUID, sandboxIDStr, hostIDStr string, + agent hostagentClient, p SandboxCreateParams, + templateTeamID, templateID pgtype.UUID, + defaultUser string, defaultEnv map[string]string, +) { + bgCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + resp, err := agent.CreateSandbox(bgCtx, connect.NewRequest(&pb.CreateSandboxRequest{ SandboxId: sandboxIDStr, Template: p.Template, TeamId: id.UUIDString(templateTeamID), @@ -157,45 +192,52 @@ func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db. MemoryMb: p.MemoryMB, TimeoutSec: p.TimeoutSec, DiskSizeMb: p.DiskSizeMB, - DefaultUser: templateDefaultUser, - DefaultEnv: templateDefaultEnv, + DefaultUser: defaultUser, + DefaultEnv: defaultEnv, })) if err != nil { - if _, dbErr := s.DB.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ - ID: sandboxID, Status: "error", + slog.Warn("background create failed", "sandbox_id", sandboxIDStr, "error", err) + errCtx, errCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer errCancel() + if _, dbErr := s.DB.UpdateSandboxStatusIf(errCtx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, Status: "starting", Status_2: "error", }); dbErr != nil { - slog.Warn("failed to update sandbox status to error", "id", sandboxIDStr, "error", dbErr) + slog.Warn("failed to update sandbox to error after create failure", "id", sandboxIDStr, "error", dbErr) } - return db.Sandbox{}, fmt.Errorf("agent create: %w", err) + s.publishEvent(errCtx, SandboxStateEvent{ + Event: "sandbox.failed", SandboxID: sandboxIDStr, HostID: hostIDStr, + Error: err.Error(), Timestamp: time.Now().Unix(), + }) + return } now := time.Now() - sb, err := s.DB.UpdateSandboxRunning(ctx, db.UpdateSandboxRunningParams{ - ID: sandboxID, - HostIp: resp.Msg.HostIp, - GuestIp: "", + if _, dbErr := s.DB.UpdateSandboxRunningIf(bgCtx, db.UpdateSandboxRunningIfParams{ + ID: sandboxID, + Status: "starting", + HostIp: resp.Msg.HostIp, StartedAt: pgtype.Timestamptz{ Time: now, Valid: true, }, - }) - if err != nil { - return db.Sandbox{}, fmt.Errorf("update sandbox running: %w", err) + }); dbErr != nil { + slog.Warn("failed to update sandbox running after create", "id", sandboxIDStr, "error", dbErr) } - // Store runtime metadata from the agent (envd/kernel/firecracker/agent versions). if meta := resp.Msg.Metadata; len(meta) > 0 { metaJSON, _ := json.Marshal(meta) - if err := s.DB.UpdateSandboxMetadata(ctx, db.UpdateSandboxMetadataParams{ - ID: sandboxID, - Metadata: metaJSON, + if err := s.DB.UpdateSandboxMetadata(bgCtx, db.UpdateSandboxMetadataParams{ + ID: sandboxID, Metadata: metaJSON, }); err != nil { slog.Warn("failed to store sandbox metadata", "id", sandboxIDStr, "error", err) } - sb.Metadata = metaJSON } - return sb, nil + s.publishEvent(bgCtx, SandboxStateEvent{ + Event: "sandbox.started", SandboxID: sandboxIDStr, HostID: hostIDStr, + HostIP: resp.Msg.HostIp, Metadata: resp.Msg.Metadata, + Timestamp: now.Unix(), + }) } // List returns active sandboxes (excludes stopped/error) belonging to the given team. @@ -208,7 +250,9 @@ func (s *SandboxService) Get(ctx context.Context, sandboxID, teamID pgtype.UUID) return s.DB.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: teamID}) } -// Pause snapshots and freezes a running sandbox to disk. +// Pause snapshots and freezes a running sandbox to disk asynchronously. +// Pre-marks the DB status as "pausing" and fires the agent RPC in a +// background goroutine. func (s *SandboxService) Pause(ctx context.Context, sandboxID, teamID pgtype.UUID) (db.Sandbox, error) { sb, err := s.DB.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: teamID}) if err != nil { @@ -224,25 +268,29 @@ func (s *SandboxService) Pause(ctx context.Context, sandboxID, teamID pgtype.UUI } sandboxIDStr := id.FormatSandboxID(sandboxID) + hostIDStr := id.FormatHostID(sb.HostID) - // Pre-mark as "paused" in DB before the RPC so the reconciler does not - // mark the sandbox "stopped" while the host agent processes the pause. - if _, err := s.DB.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ - ID: sandboxID, Status: "paused", - }); err != nil { - return db.Sandbox{}, fmt.Errorf("pre-mark paused: %w", err) + sb, err = s.DB.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, Status: "running", Status_2: "pausing", + }) + if err != nil { + return db.Sandbox{}, fmt.Errorf("sandbox status changed concurrently") } - // Flush all metrics tiers before pausing so data survives in DB. - s.flushAndPersistMetrics(ctx, agent, sandboxID, true) + go s.pauseInBackground(sandboxID, sandboxIDStr, hostIDStr, agent) - if _, err := agent.PauseSandbox(ctx, connect.NewRequest(&pb.PauseSandboxRequest{ + return sb, nil +} + +func (s *SandboxService) pauseInBackground(sandboxID pgtype.UUID, sandboxIDStr, hostIDStr string, agent hostagentClient) { + bgCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + s.flushAndPersistMetrics(bgCtx, agent, sandboxID, true) + + if _, err := agent.PauseSandbox(bgCtx, connect.NewRequest(&pb.PauseSandboxRequest{ SandboxId: sandboxIDStr, })); err != nil { - // Check if the agent still has this sandbox. If it was destroyed - // (e.g. frozen VM couldn't be resumed), mark as "error" instead of - // reverting to "running" — which would create a ghost record. - // Use a fresh context since the original ctx may already be expired. revertStatus := "running" pingCtx, pingCancel := context.WithTimeout(context.Background(), 10*time.Second) if _, pingErr := agent.PingSandbox(pingCtx, connect.NewRequest(&pb.PingSandboxRequest{ @@ -253,23 +301,37 @@ func (s *SandboxService) Pause(ctx context.Context, sandboxID, teamID pgtype.UUI } pingCancel() dbCtx, dbCancel := context.WithTimeout(context.Background(), 5*time.Second) - if _, dbErr := s.DB.UpdateSandboxStatus(dbCtx, db.UpdateSandboxStatusParams{ - ID: sandboxID, Status: revertStatus, + if _, dbErr := s.DB.UpdateSandboxStatusIf(dbCtx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, Status: "pausing", Status_2: revertStatus, }); dbErr != nil { slog.Warn("failed to revert sandbox status after pause error", "sandbox_id", sandboxIDStr, "error", dbErr) } dbCancel() - return db.Sandbox{}, fmt.Errorf("agent pause: %w", err) + + evtCtx, evtCancel := context.WithTimeout(context.Background(), 5*time.Second) + s.publishEvent(evtCtx, SandboxStateEvent{ + Event: "sandbox.failed", SandboxID: sandboxIDStr, HostID: hostIDStr, + Error: err.Error(), Timestamp: time.Now().Unix(), + }) + evtCancel() + return } - sb, err = s.DB.GetSandbox(ctx, sandboxID) - if err != nil { - return db.Sandbox{}, fmt.Errorf("get sandbox after pause: %w", err) + if _, err := s.DB.UpdateSandboxStatusIf(bgCtx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, Status: "pausing", Status_2: "paused", + }); err != nil { + slog.Warn("failed to update sandbox to paused", "sandbox_id", sandboxIDStr, "error", err) } - return sb, nil + + s.publishEvent(bgCtx, SandboxStateEvent{ + Event: "sandbox.paused", SandboxID: sandboxIDStr, HostID: hostIDStr, + Timestamp: time.Now().Unix(), + }) } -// Resume restores a paused sandbox from snapshot. +// Resume restores a paused sandbox from snapshot asynchronously. +// Pre-marks the DB status as "resuming" and fires the agent RPC in a +// background goroutine. func (s *SandboxService) Resume(ctx context.Context, sandboxID, teamID pgtype.UUID) (db.Sandbox, error) { sb, err := s.DB.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: teamID}) if err != nil { @@ -285,8 +347,8 @@ func (s *SandboxService) Resume(ctx context.Context, sandboxID, teamID pgtype.UU } sandboxIDStr := id.FormatSandboxID(sandboxID) + hostIDStr := id.FormatHostID(sb.HostID) - // Look up template defaults for resume. var resumeDefaultUser string var resumeDefaultEnv map[string]string if sb.TemplateID.Valid { @@ -299,7 +361,6 @@ func (s *SandboxService) Resume(ctx context.Context, sandboxID, teamID pgtype.UU } } - // Extract kernel version hint from existing sandbox metadata. var kernelVersion string if len(sb.Metadata) > 0 { var meta map[string]string @@ -308,52 +369,88 @@ func (s *SandboxService) Resume(ctx context.Context, sandboxID, teamID pgtype.UU } } - resp, err := agent.ResumeSandbox(ctx, connect.NewRequest(&pb.ResumeSandboxRequest{ - SandboxId: sandboxIDStr, - TimeoutSec: sb.TimeoutSec, - DefaultUser: resumeDefaultUser, - DefaultEnv: resumeDefaultEnv, - KernelVersion: kernelVersion, - })) - if err != nil { - return db.Sandbox{}, fmt.Errorf("agent resume: %w", err) - } - - now := time.Now() - sb, err = s.DB.UpdateSandboxRunning(ctx, db.UpdateSandboxRunningParams{ - ID: sandboxID, - HostIp: resp.Msg.HostIp, - GuestIp: "", - StartedAt: pgtype.Timestamptz{ - Time: now, - Valid: true, - }, + sb, err = s.DB.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, Status: "paused", Status_2: "resuming", }) if err != nil { - return db.Sandbox{}, fmt.Errorf("update status: %w", err) + return db.Sandbox{}, fmt.Errorf("sandbox status changed concurrently") } - // Update metadata with actual versions used after resume. - if meta := resp.Msg.Metadata; len(meta) > 0 { - metaJSON, _ := json.Marshal(meta) - if err := s.DB.UpdateSandboxMetadata(ctx, db.UpdateSandboxMetadataParams{ - ID: sandboxID, - Metadata: metaJSON, - }); err != nil { - slog.Warn("failed to update sandbox metadata after resume", "id", sandboxIDStr, "error", err) - } - sb.Metadata = metaJSON - } + go s.resumeInBackground(sandboxID, sandboxIDStr, hostIDStr, agent, sb.TimeoutSec, resumeDefaultUser, resumeDefaultEnv, kernelVersion) return sb, nil } -// Destroy stops a sandbox and marks it as stopped. +func (s *SandboxService) resumeInBackground( + sandboxID pgtype.UUID, sandboxIDStr, hostIDStr string, + agent hostagentClient, timeoutSec int32, + defaultUser string, defaultEnv map[string]string, kernelVersion string, +) { + bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + resp, err := agent.ResumeSandbox(bgCtx, connect.NewRequest(&pb.ResumeSandboxRequest{ + SandboxId: sandboxIDStr, + TimeoutSec: timeoutSec, + DefaultUser: defaultUser, + DefaultEnv: defaultEnv, + KernelVersion: kernelVersion, + })) + if err != nil { + slog.Warn("background resume failed", "sandbox_id", sandboxIDStr, "error", err) + errCtx, errCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer errCancel() + if _, dbErr := s.DB.UpdateSandboxStatusIf(errCtx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, Status: "resuming", Status_2: "paused", + }); dbErr != nil { + slog.Warn("failed to revert sandbox to paused after resume failure", "id", sandboxIDStr, "error", dbErr) + } + s.publishEvent(errCtx, SandboxStateEvent{ + Event: "sandbox.failed", SandboxID: sandboxIDStr, HostID: hostIDStr, + Error: err.Error(), Timestamp: time.Now().Unix(), + }) + return + } + + now := time.Now() + if _, dbErr := s.DB.UpdateSandboxRunningIf(bgCtx, db.UpdateSandboxRunningIfParams{ + ID: sandboxID, + Status: "resuming", + HostIp: resp.Msg.HostIp, + StartedAt: pgtype.Timestamptz{ + Time: now, + Valid: true, + }, + }); dbErr != nil { + slog.Warn("failed to update sandbox running after resume", "id", sandboxIDStr, "error", dbErr) + } + + if meta := resp.Msg.Metadata; len(meta) > 0 { + metaJSON, _ := json.Marshal(meta) + if err := s.DB.UpdateSandboxMetadata(bgCtx, db.UpdateSandboxMetadataParams{ + ID: sandboxID, Metadata: metaJSON, + }); err != nil { + slog.Warn("failed to update sandbox metadata after resume", "id", sandboxIDStr, "error", err) + } + } + + s.publishEvent(bgCtx, SandboxStateEvent{ + Event: "sandbox.resumed", SandboxID: sandboxIDStr, HostID: hostIDStr, + HostIP: resp.Msg.HostIp, Metadata: resp.Msg.Metadata, + Timestamp: now.Unix(), + }) +} + +// Destroy stops a sandbox asynchronously. Pre-marks the DB status as +// "stopping" and fires the agent RPC in a background goroutine. func (s *SandboxService) Destroy(ctx context.Context, sandboxID, teamID pgtype.UUID) error { sb, err := s.DB.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: teamID}) if err != nil { return fmt.Errorf("sandbox not found: %w", err) } + if sb.Status == "stopped" || sb.Status == "error" { + return nil + } agent, _, err := s.agentForSandbox(ctx, sandboxID) if err != nil { @@ -361,35 +458,53 @@ func (s *SandboxService) Destroy(ctx context.Context, sandboxID, teamID pgtype.U } sandboxIDStr := id.FormatSandboxID(sandboxID) + hostIDStr := id.FormatHostID(sb.HostID) + prevStatus := sb.Status - // If running, flush 24h tier metrics for analytics before destroying. - if sb.Status == "running" { - s.flushAndPersistMetrics(ctx, agent, sandboxID, false) + if _, err := s.DB.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ + ID: sandboxID, Status: "stopping", + }); err != nil { + return fmt.Errorf("pre-mark stopping: %w", err) } - // Destroy on host agent. A not-found response is fine — sandbox is already gone. - if _, err := agent.DestroySandbox(ctx, connect.NewRequest(&pb.DestroySandboxRequest{ + go s.destroyInBackground(sandboxID, sandboxIDStr, hostIDStr, agent, prevStatus) + + return nil +} + +func (s *SandboxService) destroyInBackground(sandboxID pgtype.UUID, sandboxIDStr, hostIDStr string, agent hostagentClient, prevStatus string) { + bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + if prevStatus == "running" || prevStatus == "pausing" { + s.flushAndPersistMetrics(bgCtx, agent, sandboxID, false) + } + + if _, err := agent.DestroySandbox(bgCtx, connect.NewRequest(&pb.DestroySandboxRequest{ SandboxId: sandboxIDStr, })); err != nil && connect.CodeOf(err) != connect.CodeNotFound { - return fmt.Errorf("agent destroy: %w", err) + slog.Warn("background destroy failed", "sandbox_id", sandboxIDStr, "error", err) } - // For a paused sandbox, only keep 24h tier; remove the finer-grained tiers. - if sb.Status == "paused" { - _ = s.DB.DeleteSandboxMetricPointsByTier(ctx, db.DeleteSandboxMetricPointsByTierParams{ + if prevStatus == "paused" { + _ = s.DB.DeleteSandboxMetricPointsByTier(bgCtx, db.DeleteSandboxMetricPointsByTierParams{ SandboxID: sandboxID, Tier: "10m", }) - _ = s.DB.DeleteSandboxMetricPointsByTier(ctx, db.DeleteSandboxMetricPointsByTierParams{ + _ = s.DB.DeleteSandboxMetricPointsByTier(bgCtx, db.DeleteSandboxMetricPointsByTierParams{ SandboxID: sandboxID, Tier: "2h", }) } - if _, err := s.DB.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ - ID: sandboxID, Status: "stopped", + if _, err := s.DB.UpdateSandboxStatusIf(bgCtx, db.UpdateSandboxStatusIfParams{ + ID: sandboxID, Status: "stopping", Status_2: "stopped", }); err != nil { - return fmt.Errorf("update status: %w", err) + slog.Warn("failed to update sandbox to stopped", "sandbox_id", sandboxIDStr, "error", err) } - return nil + + s.publishEvent(bgCtx, SandboxStateEvent{ + Event: "sandbox.stopped", SandboxID: sandboxIDStr, HostID: hostIDStr, + Timestamp: time.Now().Unix(), + }) } // flushAndPersistMetrics calls FlushSandboxMetrics on the agent and stores