diff --git a/internal/api/handlers_pty.go b/internal/api/handlers_pty.go
index f23954d..6799ffa 100644
--- a/internal/api/handlers_pty.go
+++ b/internal/api/handlers_pty.go
@@ -418,6 +418,13 @@ func runPtyLoop(
 		}
 	}()
 
+	// When any pump cancels the context, close the websocket to unblock
+	// the reader goroutine stuck in ReadMessage.
+	go func() {
+		<-ctx.Done()
+		ws.conn.Close()
+	}()
+
 	wg.Wait()
 }
 
diff --git a/internal/hostagent/proxy.go b/internal/hostagent/proxy.go
index d7c875f..d95306f 100644
--- a/internal/hostagent/proxy.go
+++ b/internal/hostagent/proxy.go
@@ -135,6 +135,20 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	defer tracker.Release()
 
+	// Derive request context from the tracker's context so ForceClose()
+	// during pause aborts this proxied request.
+	trackerCtx := tracker.Context()
+	reqCtx, reqCancel := context.WithCancel(r.Context())
+	defer reqCancel()
+	go func() {
+		select {
+		case <-trackerCtx.Done():
+			reqCancel()
+		case <-reqCtx.Done():
+		}
+	}()
+	r = r.WithContext(reqCtx)
+
 	proxy := h.getOrCreateProxy(sandboxID, port, fmt.Sprintf("%s:%d", hostIP, portNum))
 	proxy.ServeHTTP(w, r)
 }
diff --git a/internal/models/sandbox.go b/internal/models/sandbox.go
index 8228679..ab79867 100644
--- a/internal/models/sandbox.go
+++ b/internal/models/sandbox.go
@@ -11,6 +11,7 @@ type SandboxStatus string
 const (
 	StatusPending SandboxStatus = "pending"
 	StatusRunning SandboxStatus = "running"
+	StatusPausing SandboxStatus = "pausing"
 	StatusPaused  SandboxStatus = "paused"
 	StatusStopped SandboxStatus = "stopped"
 	StatusError   SandboxStatus = "error"
diff --git a/internal/sandbox/conntracker.go b/internal/sandbox/conntracker.go
index b46a39f..4e7c839 100644
--- a/internal/sandbox/conntracker.go
+++ b/internal/sandbox/conntracker.go
@@ -1,6 +1,7 @@
 package sandbox
 
 import (
+	"context"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -17,6 +18,20 @@ type ConnTracker struct {
 	// goroutine to exit, preventing goroutine leaks on repeated pause failures.
 	cancelMu    sync.Mutex
 	cancelDrain chan struct{}
+
+	// ctx is cancelled by ForceClose to abort all in-flight proxy requests.
+	// Initialized lazily on the first Context() call; replaced by Reset after
+	// a failed pause so new connections get a fresh, non-cancelled context.
+	ctxMu  sync.Mutex
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+// ensureCtx lazily initializes the cancellable context.
+func (t *ConnTracker) ensureCtx() {
+	if t.ctx == nil {
+		t.ctx, t.cancel = context.WithCancel(context.Background())
+	}
 }
 
 // Acquire registers one in-flight connection. Returns false if the tracker
@@ -35,6 +50,16 @@
 	return true
 }
 
+// Context returns a context that is cancelled when ForceClose is called.
+// Proxy handlers should derive their request context from this so that
+// force-close during pause aborts in-flight proxied requests.
+func (t *ConnTracker) Context() context.Context {
+	t.ctxMu.Lock()
+	defer t.ctxMu.Unlock()
+	t.ensureCtx()
+	return t.ctx
+}
+
 // Release marks one connection as complete. Must be called exactly once
 // per successful Acquire.
 func (t *ConnTracker) Release() {
@@ -65,9 +90,33 @@
 	}
 }
 
+// ForceClose cancels all in-flight proxy connections by cancelling the
+// shared context. Connections whose request context derives from Context()
+// will see their requests aborted, causing the proxy handler to return
+// and call Release(). Waits briefly for connections to actually release.
+func (t *ConnTracker) ForceClose() {
+	t.ctxMu.Lock()
+	if t.cancel != nil {
+		t.cancel()
+	}
+	t.ctxMu.Unlock()
+
+	// Wait briefly for force-closed connections to call Release().
+	done := make(chan struct{})
+	go func() {
+		t.wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(2 * time.Second):
+	}
+}
+
 // Reset re-enables the tracker after a failed drain. This allows the
 // sandbox to accept proxy connections again if the pause operation fails
-// and the VM is resumed. It also cancels any lingering Drain goroutine.
+// and the VM is resumed. It also cancels any lingering Drain goroutine
+// and creates a fresh context for new connections.
 func (t *ConnTracker) Reset() {
 	t.cancelMu.Lock()
 	if t.cancelDrain != nil {
@@ -81,5 +130,10 @@
 	}
 	t.cancelMu.Unlock()
 
+	// Replace the cancelled context with a fresh one.
+	t.ctxMu.Lock()
+	t.ctx, t.cancel = context.WithCancel(context.Background())
+	t.ctxMu.Unlock()
+
 	t.draining.Store(false)
 }
diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go
index f994b11..3589fb1 100644
--- a/internal/sandbox/manager.go
+++ b/internal/sandbox/manager.go
@@ -377,28 +377,36 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 		return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
 	}
 
+	// Mark sandbox as pausing to block new exec/file/PTY operations.
+	sb.Status = models.StatusPausing
+
+	// restoreRunning reverts the sandbox to a usable running state after a failed pause step.
+	restoreRunning := func() {
+		_ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
+		sb.connTracker.Reset()
+		sb.Status = models.StatusRunning
+	}
+
 	// Stop the metrics sampler goroutine before tearing down any resources
 	// it reads (dm device, Firecracker PID). Without this, the sampler
 	// leaks on every successful pause.
 	m.stopSampler(sb)
 
-	// Step 0: Drain in-flight proxy connections before freezing vCPUs.
-	// Stale TCP state from mid-flight connections causes issues on restore.
-	sb.connTracker.Drain(2 * time.Second)
-	slog.Debug("pause: proxy connections drained", "id", sandboxID)
+	// ── Step 1: Isolate from external traffic ─────────────────────────
+	// Drain in-flight proxy connections (grace period for clean shutdown).
+	sb.connTracker.Drain(5 * time.Second)
+	// Force-close any connections that didn't finish during the grace period.
+	sb.connTracker.ForceClose()
+	slog.Debug("pause: external connections closed", "id", sandboxID)
 
-	// Step 0b: Close host-side idle connections to envd. Done before
-	// PrepareSnapshot so FIN packets propagate to the guest during the
-	// PrepareSnapshot window (no extra sleep needed).
+	// Close host-side idle connections to envd so FIN packets propagate
+	// to the guest kernel before snapshot.
 	sb.client.CloseIdleConnections()
-	slog.Debug("pause: envd client idle connections closed", "id", sandboxID)
 
-	// Step 0c: Signal envd to quiesce (stop port scanner/forwarder, mark
-	// connections for post-restore cleanup). Also drops page cache which
-	// can take significant time on large-memory VMs (20GB+). The timeout
-	// also gives time for the FINs from Step 0b to be processed by the
-	// guest kernel.
-	// Best-effort: a failure is logged but does not abort the pause.
+	// ── Step 2: Drop page cache ──────────────────────────────────────
+	// Signal envd to quiesce: drops page cache, stops port subsystem,
+	// marks connections for post-restore cleanup. Page cache drop can
+	// take significant time on large-memory VMs (20GB+).
 	func() {
 		prepCtx, prepCancel := context.WithTimeout(ctx, 30*time.Second)
 		defer prepCancel()
@@ -409,11 +417,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 		}
 	}()
 
-	// Step 0d: Inflate balloon to reclaim free guest memory before snapshot.
-	// Freed pages become zero from FC's perspective, so ProcessMemfile can
-	// skip them → dramatically smaller memfile (e.g. 20GB → 1GB).
-	// Best-effort: balloon may not be available (e.g. snapshot-restored VMs
-	// from before balloon was configured).
+	// ── Step 3: Inflate balloon to reclaim free guest memory ─────────
+	// Freed pages become zero from FC's perspective, so ProcessMemfile
+	// skips them → dramatically smaller memfile (e.g. 20GB → 1GB).
 	func() {
 		memUsed, err := readEnvdMemUsed(sb.client)
 		if err != nil {
@@ -421,7 +427,6 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 			return
 		}
 		usedMiB := int(memUsed / (1024 * 1024))
-		// Leave 2x used memory + 128MB headroom for kernel/envd.
 		keepMiB := max(usedMiB*2, 256) + 128
 		inflateMiB := sb.MemoryMB - keepMiB
 		if inflateMiB <= 0 {
@@ -434,18 +439,15 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 			slog.Debug("pause: balloon inflate failed (non-fatal)", "id", sandboxID, "error", err)
 			return
 		}
-		// Give guest kernel time to process balloon requests and release pages.
 		time.Sleep(2 * time.Second)
 		slog.Info("pause: balloon inflated", "id", sandboxID,
 			"inflate_mib", inflateMiB, "guest_used_mib", usedMiB)
 	}()
 
+	// ── Step 4: Freeze vCPUs ─────────────────────────────────────────
 	pauseStart := time.Now()
-	// Step 1: Pause the VM (freeze vCPUs).
 	if err := m.vm.Pause(ctx, sandboxID); err != nil {
-		// Deflate balloon before returning so sandbox is usable.
-		_ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
-		sb.connTracker.Reset()
+		restoreRunning()
 		return fmt.Errorf("pause VM: %w", err)
 	}
 	slog.Debug("pause: VM paused", "id", sandboxID, "elapsed", time.Since(pauseStart))
@@ -460,13 +462,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 
 	// resumeOnError unpauses the VM so the sandbox stays usable when a
 	// post-freeze step fails. If the resume itself fails, the sandbox is
-	// frozen and unrecoverable — destroy it to avoid a zombie that reports
-	// "running" but can't execute anything.
+	// frozen and unrecoverable — destroy it to avoid a zombie.
 	resumeOnError := func() {
-		sb.connTracker.Reset()
-		// Use a fresh context — the caller's ctx may already be cancelled
-		// (e.g. CP-side ResponseHeaderTimeout fired), which would make the
-		// resume fail immediately and destroy a perfectly resumable VM.
+		// Use a fresh context — the caller's ctx may already be cancelled.
 		resumeCtx, resumeCancel := context.WithTimeout(context.Background(), 30*time.Second)
 		defer resumeCancel()
 		if err := m.vm.Resume(resumeCtx, sandboxID); err != nil {
@@ -478,7 +476,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 			if m.onDestroy != nil {
 				m.onDestroy(sandboxID)
 			}
+			return
 		}
+		restoreRunning()
 	}
 
 	// Step 2: Take VM state snapshot (snapfile + memfile).
diff --git a/internal/uffd/fd.go b/internal/uffd/fd.go
index 492a520..8e0fba2 100644
--- a/internal/uffd/fd.go
+++ b/internal/uffd/fd.go
@@ -29,6 +29,10 @@ import (
 
 const (
 	UFFD_EVENT_PAGEFAULT      = C.UFFD_EVENT_PAGEFAULT
+	UFFD_EVENT_FORK           = C.UFFD_EVENT_FORK
+	UFFD_EVENT_REMAP          = C.UFFD_EVENT_REMAP
+	UFFD_EVENT_REMOVE         = C.UFFD_EVENT_REMOVE
+	UFFD_EVENT_UNMAP          = C.UFFD_EVENT_UNMAP
 	UFFD_PAGEFAULT_FLAG_WRITE = C.UFFD_PAGEFAULT_FLAG_WRITE
 	UFFDIO_COPY               = C.UFFDIO_COPY
 	UFFDIO_COPY_MODE_WP       = C.UFFDIO_COPY_MODE_WP
diff --git a/internal/uffd/server.go b/internal/uffd/server.go
index b838cbc..d7fd8d0 100644
--- a/internal/uffd/server.go
+++ b/internal/uffd/server.go
@@ -253,8 +253,17 @@ func (s *Server) serve(ctx context.Context, uffdFd fd, mapping *Mapping) error {
 		}
 
 		msg := *(*uffdMsg)(unsafe.Pointer(&buf[0]))
-		if getMsgEvent(&msg) != UFFD_EVENT_PAGEFAULT {
-			return fmt.Errorf("unexpected uffd event type: %d", getMsgEvent(&msg))
+		event := getMsgEvent(&msg)
+
+		switch event {
+		case UFFD_EVENT_PAGEFAULT:
+			// Handled below.
+		case UFFD_EVENT_REMOVE, UFFD_EVENT_UNMAP, UFFD_EVENT_REMAP, UFFD_EVENT_FORK:
+			// Non-fatal lifecycle events from the guest kernel (e.g. balloon
+			// deflation, mmap/munmap). No action needed — continue polling.
+			continue
+		default:
+			return fmt.Errorf("unexpected uffd event type: %d", event)
 		}
 
 		arg := getMsgArg(&msg)
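
For reviewers, a minimal, self-contained sketch of the force-close pattern this patch wires between ProxyHandler and ConnTracker. The types and names below (tracker, handle, forceClose) are stand-ins invented for illustration, not part of the patch: the pause path cancels a shared context, each in-flight handler derives its request context from that shared context and aborts when it fires, and a WaitGroup-based wait mirrors ForceClose's brief grace period.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// tracker is a stand-in for ConnTracker: a shared cancellable context plus a
// WaitGroup counting in-flight requests.
type tracker struct {
	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc
}

func newTracker() *tracker {
	ctx, cancel := context.WithCancel(context.Background())
	return &tracker{ctx: ctx, cancel: cancel}
}

// handle derives its request context from the tracker's context, so a
// force-close during pause aborts the request and the deferred release runs.
func (t *tracker) handle(w http.ResponseWriter, r *http.Request) {
	t.wg.Add(1)       // Acquire
	defer t.wg.Done() // Release

	reqCtx, cancel := context.WithCancel(r.Context())
	defer cancel()
	go func() {
		select {
		case <-t.ctx.Done(): // force-close fired
			cancel()
		case <-reqCtx.Done():
		}
	}()

	select {
	case <-time.After(10 * time.Second): // stands in for a slow upstream
		fmt.Fprintln(w, "done")
	case <-reqCtx.Done(): // aborted by force-close or client disconnect
	}
}

// forceClose mirrors ConnTracker.ForceClose: cancel the shared context, then
// wait briefly for in-flight handlers to release.
func (t *tracker) forceClose() {
	t.cancel()
	done := make(chan struct{})
	go func() { t.wg.Wait(); close(done) }()
	select {
	case <-done:
	case <-time.After(2 * time.Second):
	}
}

func main() {
	t := newTracker()
	go http.ListenAndServe(":8080", http.HandlerFunc(t.handle))

	time.Sleep(1 * time.Second) // let some requests start
	t.forceClose()              // pause path: abort anything still in flight
}
```

Deriving a fresh per-request context instead of cancelling r.Context() directly keeps the watcher goroutine from leaking once the request finishes on its own, which is the same shape as the proxy.go change above.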