fix(sandbox): stop leaking paused sandboxes as stopped

Paused sandboxes were silently turning into 'stopped' rows in the CP
DB whenever the auto_paused callback failed delivery and the agent
later restarted. Snapshot dirs survived on disk but the agent had no
restore path, so HostMonitor's missing->stopped reconcile orphaned
them.

Agent
- Manager.RestorePausedSandboxes: scans WRENN_DIR/sandboxes/ on
  startup, picks newest-per-slot, reserves the slot, registers a
  Paused entry in m.boxes. Losers and corrupt metas are trashed.
- Manager.Destroy: re-insert + bail if a racing Pause completed
  between m.boxes delete and lifecycleMu acquisition, so an
  in-flight user Pause's snapshot is not wiped by a concurrent
  Destroy or by Shutdown's destroy loop. Distinguishes from
  legitimate destroy-of-paused via statusAtEntry.
- Manager.Shutdown: emit sandbox.stopped (SendAsync) after each
  non-paused Destroy so the CP flips DB out of running/error.
- ErrDraining + atomic.Bool guard: Create/Resume bounce with
  CodeUnavailable once Shutdown begins, preventing new lifecycle
  work from racing the destroy loop.
- resumeFromMeta: set sb.baseImagePath so cleanup's loops.Release
  pairs with the Acquire just performed (latent bug for any
  Resume->Destroy after the restore path lands).
- ListSandboxes: nil-guard HostIP so paused entries don't emit
  '<nil>' into proto.

Control plane
- HostMonitor: new reconcile block. When the agent reports a
  sandbox as paused but the DB row is stopped (legacy zombie),
  issue DestroySandbox to release the on-disk snapshot and slot.
  Gated on the agent reporting at least one paused sandbox, to avoid
  running an unbounded stopped-row query on every monitor tick.

Host agent main
- Call RestorePausedSandboxes after SetEventSender, before
  StartTTLReaper and before HTTP serve.
- Keep mgr.Shutdown BEFORE httpServer.Shutdown so main does not
  exit while pause work is still draining.
This commit is contained in:
2026-05-20 05:26:31 +06:00
parent 0e8ba30c7a
commit 76587e17a2
6 changed files with 351 additions and 1 deletions

View File

@ -161,6 +161,14 @@ func main() {
cb := hostagent.NewCallbackSender(cpURL, credsFile, creds.HostID)
mgr.SetEventSender(hostagent.NewEventSender(cb))
// Restore paused sandboxes from disk so ListSandboxes reports them as
// 'paused' immediately. Without this, the CP's HostMonitor would mark
// every paused-on-disk sandbox 'stopped' via the missing→stopped
// reconcile path on the first ListSandboxes after agent restart.
// Must run before HTTP server starts serving (an early Create would
// race the slot reservation).
mgr.RestorePausedSandboxes()
mgr.StartTTLReaper(ctx)
// httpServer is declared here so the shutdown func can reference it.
@ -211,6 +219,13 @@ func main() {
// process alive indefinitely — a second signal force-exits anyway.
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer shutdownCancel()
// Order matters: mgr.Shutdown FIRST so it runs to completion
// before httpServer.Shutdown unblocks main's Serve and lets the
// process exit. mgr.Shutdown internally flips a draining flag
// that rejects new Create/Resume RPCs with Unavailable so any
// in-flight HTTP handlers can't add sandboxes after PauseAll
// snapshotted state. User-initiated Pauses already running are
// awaited by PauseAll/Destroy's lifecycleMu serialization.
mgr.Shutdown(shutdownCtx)
sandbox.ShrinkMinimalImage(rootDir)
if err := httpServer.Shutdown(shutdownCtx); err != nil {

View File

@ -296,6 +296,54 @@ func (m *HostMonitor) checkHost(ctx context.Context, host db.Host) {
}
}
// --- Reconcile DB-stopped + agent-paused zombies ---
// A sandbox the agent reports as 'paused' but DB has as 'stopped' is an
// orphan from a previous bug where a successful pause's auto_paused
// callback was lost (e.g. CP unreachable during agent shutdown). With the
// agent-side fix (RestorePausedSandboxes), the snapshot survives across
// agent restarts and surfaces here. Authoritative direction: DB wins
// (user already saw 'stopped' and may have stopped tracking it).
// Issue Destroy so the on-disk snapshot dir is removed and the agent's
// slot reservation released.
//
// Gate: only run the DB query if the agent reports at least one paused
// sandbox. Otherwise we'd fetch every historically-stopped sandbox on
// this host every monitor tick — unbounded growth over a host's lifetime.
hasPaused := false
for _, status := range aliveStatus {
if status == "paused" {
hasPaused = true
break
}
}
if hasPaused {
stoppedSandboxes, err := m.db.ListSandboxesByHostAndStatus(ctx, db.ListSandboxesByHostAndStatusParams{
HostID: host.ID,
Column2: []string{"stopped"},
})
if err != nil {
slog.Warn("host monitor: failed to list stopped sandboxes", "host_id", id.FormatHostID(host.ID), "error", err)
} else {
for _, sb := range stoppedSandboxes {
sbIDStr := id.FormatSandboxID(sb.ID)
status, ok := aliveStatus[sbIDStr]
if !ok || status != "paused" {
continue
}
slog.Info("host monitor: destroying DB-stopped agent-paused zombie",
"host_id", id.FormatHostID(host.ID), "sandbox_id", sbIDStr)
if _, err := agent.DestroySandbox(ctx, connect.NewRequest(&pb.DestroySandboxRequest{
SandboxId: sbIDStr,
})); err != nil && connect.CodeOf(err) != connect.CodeNotFound {
slog.Warn("host monitor: zombie destroy failed",
"sandbox_id", sbIDStr, "error", err)
continue
}
m.audit.LogSandboxDestroySystem(ctx, sb.TeamID, sb.ID, "paused_zombie_cleanup", nil)
}
}
}
// --- Reconcile transient statuses (starting, resuming, pausing, stopping) ---
// These represent in-flight operations. If the sandbox is no longer alive on
// the host, infer the final state based on the transient status.

View File

@ -80,6 +80,9 @@ func (s *Server) CreateSandbox(
int(msg.Vcpus), int(msg.MemoryMb), int(msg.TimeoutSec), int(msg.DiskSizeMb),
msg.DefaultUser, msg.DefaultEnv)
if err != nil {
if errors.Is(err, sandbox.ErrDraining) {
return nil, connect.NewError(connect.CodeUnavailable, err)
}
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("create sandbox: %w", err))
}
@ -185,6 +188,8 @@ func mapSandboxError(err error) error {
return connect.NewError(connect.CodeNotFound, err)
case errors.Is(err, sandbox.ErrNotRunning), errors.Is(err, sandbox.ErrNotPaused):
return connect.NewError(connect.CodeFailedPrecondition, err)
case errors.Is(err, sandbox.ErrDraining):
return connect.NewError(connect.CodeUnavailable, err)
case errors.Is(err, sandbox.ErrInvalidRange):
return connect.NewError(connect.CodeInvalidArgument, err)
default:
@ -570,6 +575,14 @@ func (s *Server) ListSandboxes(
infos := make([]*pb.SandboxInfo, len(sandboxes))
for i, sb := range sandboxes {
// Paused / restored-paused sandboxes have no active network slot, so
// HostIP is nil — net.IP(nil).String() returns the literal "<nil>"
// which would leak into DB host_ip columns and SDK responses. Emit
// empty string instead.
hostIP := ""
if sb.HostIP != nil {
hostIP = sb.HostIP.String()
}
infos[i] = &pb.SandboxInfo{
SandboxId: sb.ID,
Status: string(sb.Status),
@ -577,7 +590,7 @@ func (s *Server) ListSandboxes(
TemplateId: uuid.UUID(sb.TemplateID).String(),
Vcpus: int32(sb.VCPUs),
MemoryMb: int32(sb.MemoryMB),
HostIp: sb.HostIP.String(),
HostIp: hostIP,
CreatedAtUnix: sb.CreatedAt.Unix(),
LastActiveAtUnix: sb.LastActiveAt.Unix(),
TimeoutSec: int32(sb.TimeoutSec),

View File

@ -110,6 +110,12 @@ type EventSender interface {
Send(ctx context.Context, event LifecycleEvent) error
}
// ErrDraining is returned by Create / Resume when the manager has begun
// shutdown. The agent process is about to pause every running sandbox and
// exit; admitting new lifecycle work would race the destroy loop and leave
// orphaned VMs after the process is gone.
var ErrDraining = errors.New("agent is draining for shutdown")
// Manager orchestrates sandbox lifecycle: VM, network, filesystem, envd.
type Manager struct {
cfg Config
@ -119,6 +125,10 @@ type Manager struct {
mu sync.RWMutex
boxes map[string]*sandboxState
stopCh chan struct{}
// draining is set at the start of Shutdown. Create and Resume check it
// (atomically, no lock needed) and refuse new work so the destroy loop
// can run to completion without racing fresh RPCs.
draining atomic.Bool
// creates tracks in-flight Create calls by sandbox ID. An entry exists
// only while Create is acquiring resources / booting the VM, before the
@ -232,6 +242,9 @@ func (m *Manager) Create(
defaultUser string,
defaultEnv map[string]string,
) (*models.Sandbox, error) {
if m.draining.Load() {
return nil, ErrDraining
}
if sandboxID == "" {
sandboxID = id.FormatSandboxID(id.NewSandboxID())
}
@ -457,7 +470,14 @@ func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
m.mu.Lock()
}
sb, ok := m.boxes[sandboxID]
// statusAtEntry distinguishes "user is destroying an already-paused
// sandbox" (legitimate cleanup → fall through) from "user is destroying
// a running sandbox that raced to Paused before we got lifecycleMu"
// (preserve snapshot → re-insert and bail). Captured under m.mu so it
// reflects the same generation as the boxes-map delete.
var statusAtEntry models.SandboxStatus
if ok {
statusAtEntry = sb.Status
delete(m.boxes, sandboxID)
}
m.mu.Unlock()
@ -466,6 +486,25 @@ func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
// Wait for any in-progress Pause to finish before tearing down resources.
sb.lifecycleMu.Lock()
defer sb.lifecycleMu.Unlock()
// Racing-Pause guard. Only fires when the sandbox was NOT paused at
// entry but became paused while we waited for lifecycleMu — i.e. a
// concurrent Pause completed under us. In that case the snapshot was
// just written to disk and destroying now would wipe a freshly-paused
// sandbox. Re-insert into m.boxes (releaseRuntime already cleared
// runtime refs; slot reservation retained for Resume) and return nil
// so the agent's view stays consistent with the on-disk state.
//
// A legitimate Destroy of an already-paused sandbox (statusAtEntry ==
// Paused) falls through to cleanup, which releases the slot and
// removes the snapshot dir — the user explicitly asked for deletion.
if statusAtEntry != models.StatusPaused && sb.Status == models.StatusPaused {
m.mu.Lock()
m.boxes[sandboxID] = sb
m.mu.Unlock()
slog.Info("destroy: racing pause completed, preserving snapshot", "id", sandboxID)
return nil
}
m.cleanup(ctx, sb)
}
@ -822,6 +861,10 @@ func (m *Manager) reapExpired(_ context.Context) {
// Starting/Resuming/Error) are destroyed to release VM / dm / loop / netns.
// Finally the shared loop registry is fully released.
func (m *Manager) Shutdown(ctx context.Context) {
// Flip draining BEFORE close(stopCh) so any Create/Resume already inside
// its handler-goroutine sees the flag on its next check. Subsequent RPC
// handlers that load the flag get ErrDraining and return immediately.
m.draining.Store(true)
close(m.stopCh)
// Cancel in-flight Create calls and wait for them to settle. A slow create
@ -866,6 +909,20 @@ func (m *Manager) Shutdown(ctx context.Context) {
slog.Info("shutdown: destroying sandbox", "id", sbID)
if err := m.Destroy(ctx, sbID); err != nil {
slog.Warn("shutdown destroy failed", "id", sbID, "error", err)
continue
}
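// Notify CP so the DB row flips off running/pausing/error to stopped.
// Async: a sync Send with CP unreachable can burn ~31s per sandbox
// (3 × 10s HTTP timeout + backoff) and blow the 5min shutdown budget.
// Best-effort — if the agent process exits before the goroutine's
// HTTP request lands, HostMonitor's missing-confirmed-dead reconcile
// catches it after the next agent restart (it sees the sandbox in DB
// as 'running'/'missing' but not present in ListSandboxes → stopped).
//
// NOTE(review): Destroy also returns nil when its racing-Pause guard
// re-inserts the sandbox and preserves the snapshot. In that case the
// sandbox is paused, not stopped, yet this event is still emitted —
// confirm whether the event should be skipped when the sandbox is
// still present in m.boxes as Paused after Destroy returns.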
if m.eventSender != nil {
m.eventSender.SendAsync(LifecycleEvent{
Event: "sandbox.stopped",
SandboxID: sbID,
})
}
}

View File

@ -423,6 +423,9 @@ func (m *Manager) releaseRuntime(sb *sandboxState, cow cowDisposition) {
// The remaining args (defaultUser, env, etc.) are forwarded to envd's /init
// so the resumed sandbox sees the same execution environment as before.
func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int, defaultUser, _ string, envVars map[string]string) (*models.Sandbox, error) {
if m.draining.Load() {
return nil, ErrDraining
}
sb, err := m.get(sandboxID)
if err != nil {
return nil, err
@ -546,6 +549,12 @@ func (m *Manager) resumeFromMeta(ctx context.Context, sb *sandboxState, meta *sn
sb.client.Store(client)
sb.dmDevice = dmDev
sb.sandboxDirOverride = meta.SandboxDir
// baseImagePath pairs the loop refcount we just Acquire'd with the
// matching Release inside cleanup() / releaseRuntime(). For a sandbox
// rehydrated from RestorePausedSandboxes this is the first time
// baseImagePath is populated — the restored entry intentionally leaves
// it empty so a Destroy-before-Resume cannot underflow the registry.
sb.baseImagePath = meta.BaseTemplate
sb.connTracker.Reset()
sb.HostIP = slot.HostIP
sb.RootfsPath = dmDev.DevicePath

View File

@ -0,0 +1,208 @@
package sandbox
import (
"fmt"
"log/slog"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/google/uuid"
"git.omukk.dev/wrenn/wrenn/internal/layout"
"git.omukk.dev/wrenn/wrenn/internal/models"
)
// RestorePausedSandboxes scans WRENN_DIR/sandboxes/ for paused-sandbox
// snapshots left behind by a previous agent instance and re-registers them
// in m.boxes as StatusPaused. Without this, ListSandboxes would not report
// these sandboxes, and the CP's HostMonitor would mark them stopped via
// the missing-confirmed-dead reconcile path — orphaning the on-disk
// snapshot dir and surfacing a leaked "stopped" sandbox to users.
//
// Restored sandboxes hold ONLY the slot reservation; VM / network / dm /
// loop refcount stay unowned until Resume rebuilds them. baseImagePath is
// deliberately NOT set on the in-memory entry so cleanup() does not call
// loops.Release on a loop that was never Acquire'd.
//
// Must be called once at agent startup, AFTER CleanupOrphanPauseDirs (so
// .staging-* / .trash-* dirs are gone) and BEFORE the HTTP server starts
// serving — otherwise an early Create RPC can race the slot reservation.
//
// Snapshot dirs that cannot be restored (unparseable meta, missing or
// invalid slot index, bad team/template UUID, slot reservation failure)
// and older snapshots that lose the per-slot election are renamed to
// {name}.trash-{ts} so a future CleanupOrphanPauseDirs sweeps them.
//
// This function never returns an error: every failure is logged and the
// affected snapshot is skipped, so startup does not fail because a single
// sandbox is unrecoverable.
func (m *Manager) RestorePausedSandboxes() {
	sandboxesDir := layout.SandboxesDir(m.cfg.WrennDir)
	entries, err := os.ReadDir(sandboxesDir)
	if err != nil {
		// A missing directory is the fresh-install case and is expected.
		// Anything else (permissions, I/O error) means paused sandboxes
		// will NOT be reported to the CP, so make that visible in logs
		// instead of silently skipping the restore.
		if !os.IsNotExist(err) {
			slog.Warn("restore: cannot read sandboxes dir, skipping paused sandbox restore",
				"dir", sandboxesDir, "error", err)
		}
		return
	}

	type candidate struct {
		sandboxID string
		snapDir   string
		meta      *snapshotMeta
		teamID    [16]byte
		templID   [16]byte
	}

	// Pass 1: parse every snapshot meta. Trash anything unreadable or
	// missing a usable slot index — those are crash artefacts, not
	// recoverable sandboxes.
	candidates := make([]candidate, 0, len(entries))
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		name := e.Name()
		// Skip CleanupOrphanPauseDirs's territory. If it ran before us
		// these are already gone; if not, leave them alone.
		if strings.Contains(name, ".staging-") || strings.Contains(name, ".trash-") {
			continue
		}

		snapDir := layout.PauseSnapshotDir(m.cfg.WrennDir, name)
		meta, err := readSnapshotMeta(snapDir)
		if err != nil {
			slog.Warn("restore: unreadable snapshot meta, trashing dir",
				"id", name, "error", err)
			trashCorruptDir(snapDir)
			continue
		}
		// Zero means the field was never written; a negative value can
		// only come from a corrupt meta. Neither is a reservable slot.
		if meta.SlotIndex <= 0 {
			slog.Warn("restore: snapshot has no slot_index, trashing dir",
				"id", name, "slot", meta.SlotIndex)
			trashCorruptDir(snapDir)
			continue
		}
		teamBytes, err := parsePlainUUID(meta.TeamID)
		if err != nil {
			slog.Warn("restore: bad team_id in snapshot meta", "id", name, "error", err)
			trashCorruptDir(snapDir)
			continue
		}
		templateBytes, err := parsePlainUUID(meta.TemplateID)
		if err != nil {
			slog.Warn("restore: bad template_id in snapshot meta", "id", name, "error", err)
			trashCorruptDir(snapDir)
			continue
		}

		candidates = append(candidates, candidate{
			sandboxID: name,
			snapDir:   snapDir,
			meta:      meta,
			teamID:    teamBytes,
			templID:   templateBytes,
		})
	}

	// Pass 2: bucket by slot index, pick the newest CreatedAt per slot.
	// Multiple candidates per slot happen when older paused-sandbox dirs
	// were left on disk by the pre-fix leak (DB row marked stopped but the
	// snapshot was never cleaned). The newest is the most likely live one;
	// older losers are trashed so CleanupOrphanPauseDirs sweeps them on
	// the next startup.
	bySlot := make(map[int][]candidate, len(candidates))
	for _, c := range candidates {
		bySlot[c.meta.SlotIndex] = append(bySlot[c.meta.SlotIndex], c)
	}

	restored := 0
	pruned := 0
	for slot, cands := range bySlot {
		// Newest first. sort.Slice is not stable, so break CreatedAt ties
		// on the sandbox ID to make the elected winner reproducible
		// across restarts instead of an accident of the sort algorithm.
		sort.Slice(cands, func(i, j int) bool {
			ci, cj := cands[i].meta.CreatedAt, cands[j].meta.CreatedAt
			if ci.After(cj) {
				return true
			}
			if cj.After(ci) {
				return false
			}
			return cands[i].sandboxID < cands[j].sandboxID
		})
		winner := cands[0]

		// Trash every loser. The host_monitor's zombie-cleanup path catches
		// the winner if its DB row says 'stopped' — but losers never enter
		// m.boxes and would otherwise sit on disk indefinitely.
		for _, stale := range cands[1:] {
			slog.Info("restore: pruning older snapshot for same slot",
				"id", stale.sandboxID, "slot", slot, "created", stale.meta.CreatedAt,
				"winner", winner.sandboxID, "winner_created", winner.meta.CreatedAt)
			trashCorruptDir(stale.snapDir)
			pruned++
		}

		if err := m.slots.Reserve(winner.meta.SlotIndex); err != nil {
			// Candidates are already de-duplicated per slot above, so a
			// failure here means the allocator rejected the index (e.g.
			// already reserved elsewhere or out of range). The snapshot
			// is unusable without its slot, so trash it.
			slog.Warn("restore: slot reservation failed, trashing dir",
				"id", winner.sandboxID, "slot", winner.meta.SlotIndex, "error", err)
			trashCorruptDir(winner.snapDir)
			pruned++
			continue
		}

		sb := &sandboxState{
			Sandbox: models.Sandbox{
				ID:             winner.sandboxID,
				Status:         models.StatusPaused,
				TemplateTeamID: winner.teamID,
				TemplateID:     winner.templID,
				VCPUs:          winner.meta.VCPUs,
				MemoryMB:       winner.meta.MemoryMB,
				TimeoutSec:     winner.meta.TimeoutSec,
				SlotIndex:      winner.meta.SlotIndex,
				CreatedAt:      winner.meta.CreatedAt,
				// LastActiveAt cosmetic only — TTL reaper ignores non-Running.
				LastActiveAt: winner.meta.CreatedAt,
			},
			// connTracker must be non-nil: resumeFromMeta calls Reset() on it
			// unconditionally during rehydration. A nil pointer would panic.
			connTracker: &ConnTracker{},
			// baseImagePath intentionally left empty — see function doc.
			// sandboxDirOverride intentionally left empty — resumeFromMeta
			// reads meta.SandboxDir from disk on the resume path.
		}

		m.mu.Lock()
		m.boxes[winner.sandboxID] = sb
		m.mu.Unlock()
		restored++

		slog.Info("restored paused sandbox", "id", winner.sandboxID,
			"slot", winner.meta.SlotIndex, "vcpus", winner.meta.VCPUs, "memory_mb", winner.meta.MemoryMB)
	}

	if restored > 0 || pruned > 0 {
		slog.Info("paused sandbox restore complete", "restored", restored, "pruned", pruned)
	}
}
// parsePlainUUID converts the textual UUID stored in a snapshot meta (the
// hyphenated form produced by id.UUIDString) into the raw 16-byte array
// that sandboxState keeps. An empty string is rejected up front with its
// own error; any other parse failure is returned as reported by
// uuid.Parse. On error the returned array is all zeroes.
func parsePlainUUID(s string) ([16]byte, error) {
	var zero [16]byte
	if len(s) == 0 {
		return zero, fmt.Errorf("empty uuid string")
	}
	parsed, parseErr := uuid.Parse(s)
	if parseErr != nil {
		return zero, parseErr
	}
	return [16]byte(parsed), nil
}
// trashCorruptDir moves a snapshot directory out of the way by renaming it
// to "<name>.trash-<unix-nanos>" inside the same parent directory, so that
// a later CleanupOrphanPauseDirs run can delete it. The operation is
// best-effort: a failed rename is only logged. Leaving the directory where
// it is remains safe (the restore pass will reject it again on the next
// startup), just untidy.
func trashCorruptDir(dir string) {
	stamp := time.Now().UnixNano()
	dst := filepath.Join(
		filepath.Dir(dir),
		fmt.Sprintf("%s.trash-%d", filepath.Base(dir), stamp),
	)

	renameErr := os.Rename(dir, dst)
	if renameErr == nil {
		return
	}
	slog.Warn("restore: failed to trash corrupt snapshot dir",
		"src", dir, "dst", dst, "error", renameErr)
}