forked from wrenn/wrenn
fix: harden pause flow with connection isolation and UFFD event handling
Restructure pause to: block new operations (StatusPausing), drain proxy connections with 5s grace, force-close remaining via context cancellation, drop page cache, inflate balloon, then freeze vCPUs. Previously connections could arrive during the pause window and API operations weren't blocked. Handle UFFD_EVENT_REMOVE/UNMAP/REMAP/FORK gracefully instead of crashing the UFFD server. These events fire during balloon deflation on snapshot restore, killing the page fault handler and preventing VM boot. Also adds ConnTracker.ForceClose() with cancellable context propagated through the proxy handler, so lingering proxy connections are actively terminated rather than left dangling.
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
package sandbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -17,6 +18,20 @@ type ConnTracker struct {
|
||||
// goroutine to exit, preventing goroutine leaks on repeated pause failures.
|
||||
cancelMu sync.Mutex
|
||||
cancelDrain chan struct{}
|
||||
|
||||
// ctx is cancelled by ForceClose to abort all in-flight proxy requests.
|
||||
// Initialized lazily on first Acquire; replaced by Reset after a failed
|
||||
// pause so new connections get a fresh, non-cancelled context.
|
||||
ctxMu sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// ensureCtx lazily initializes the cancellable context.
|
||||
func (t *ConnTracker) ensureCtx() {
|
||||
if t.ctx == nil {
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire registers one in-flight connection. Returns false if the tracker
|
||||
@ -35,6 +50,16 @@ func (t *ConnTracker) Acquire() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Context returns a context that is cancelled when ForceClose is called.
|
||||
// Proxy handlers should derive their request context from this so that
|
||||
// force-close during pause aborts in-flight proxied requests.
|
||||
func (t *ConnTracker) Context() context.Context {
|
||||
t.ctxMu.Lock()
|
||||
defer t.ctxMu.Unlock()
|
||||
t.ensureCtx()
|
||||
return t.ctx
|
||||
}
|
||||
|
||||
// Release marks one connection as complete. Must be called exactly once
|
||||
// per successful Acquire.
|
||||
func (t *ConnTracker) Release() {
|
||||
@ -65,9 +90,33 @@ func (t *ConnTracker) Drain(timeout time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
// ForceClose cancels all in-flight proxy connections by cancelling the
|
||||
// shared context. Connections whose request context derives from Context()
|
||||
// will see their requests aborted, causing the proxy handler to return
|
||||
// and call Release(). Waits briefly for connections to actually release.
|
||||
func (t *ConnTracker) ForceClose() {
|
||||
t.ctxMu.Lock()
|
||||
if t.cancel != nil {
|
||||
t.cancel()
|
||||
}
|
||||
t.ctxMu.Unlock()
|
||||
|
||||
// Wait briefly for force-closed connections to call Release().
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
t.wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
// Reset re-enables the tracker after a failed drain. This allows the
|
||||
// sandbox to accept proxy connections again if the pause operation fails
|
||||
// and the VM is resumed. It also cancels any lingering Drain goroutine.
|
||||
// and the VM is resumed. It also cancels any lingering Drain goroutine
|
||||
// and creates a fresh context for new connections.
|
||||
func (t *ConnTracker) Reset() {
|
||||
t.cancelMu.Lock()
|
||||
if t.cancelDrain != nil {
|
||||
@ -81,5 +130,10 @@ func (t *ConnTracker) Reset() {
|
||||
}
|
||||
t.cancelMu.Unlock()
|
||||
|
||||
// Replace the cancelled context with a fresh one.
|
||||
t.ctxMu.Lock()
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
t.ctxMu.Unlock()
|
||||
|
||||
t.draining.Store(false)
|
||||
}
|
||||
|
||||
@ -377,28 +377,36 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
|
||||
}
|
||||
|
||||
// Mark sandbox as pausing to block new exec/file/PTY operations.
|
||||
sb.Status = models.StatusPausing
|
||||
|
||||
// restoreRunning reverts state if any pre-freeze step fails.
|
||||
restoreRunning := func() {
|
||||
_ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
|
||||
sb.connTracker.Reset()
|
||||
sb.Status = models.StatusRunning
|
||||
}
|
||||
|
||||
// Stop the metrics sampler goroutine before tearing down any resources
|
||||
// it reads (dm device, Firecracker PID). Without this, the sampler
|
||||
// leaks on every successful pause.
|
||||
m.stopSampler(sb)
|
||||
|
||||
// Step 0: Drain in-flight proxy connections before freezing vCPUs.
|
||||
// Stale TCP state from mid-flight connections causes issues on restore.
|
||||
sb.connTracker.Drain(2 * time.Second)
|
||||
slog.Debug("pause: proxy connections drained", "id", sandboxID)
|
||||
// ── Step 1: Isolate from external traffic ─────────────────────────
|
||||
// Drain in-flight proxy connections (grace period for clean shutdown).
|
||||
sb.connTracker.Drain(5 * time.Second)
|
||||
// Force-close any connections that didn't finish during grace period.
|
||||
sb.connTracker.ForceClose()
|
||||
slog.Debug("pause: external connections closed", "id", sandboxID)
|
||||
|
||||
// Step 0b: Close host-side idle connections to envd. Done before
|
||||
// PrepareSnapshot so FIN packets propagate to the guest during the
|
||||
// PrepareSnapshot window (no extra sleep needed).
|
||||
// Close host-side idle connections to envd so FIN packets propagate
|
||||
// to the guest kernel before snapshot.
|
||||
sb.client.CloseIdleConnections()
|
||||
slog.Debug("pause: envd client idle connections closed", "id", sandboxID)
|
||||
|
||||
// Step 0c: Signal envd to quiesce (stop port scanner/forwarder, mark
|
||||
// connections for post-restore cleanup). Also drops page cache which
|
||||
// can take significant time on large-memory VMs (20GB+). The timeout
|
||||
// also gives time for the FINs from Step 0b to be processed by the
|
||||
// guest kernel.
|
||||
// Best-effort: a failure is logged but does not abort the pause.
|
||||
// ── Step 2: Drop page cache ──────────────────────────────────────
|
||||
// Signal envd to quiesce: drops page cache, stops port subsystem,
|
||||
// marks connections for post-restore cleanup. Page cache drop can
|
||||
// take significant time on large-memory VMs (20GB+).
|
||||
func() {
|
||||
prepCtx, prepCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer prepCancel()
|
||||
@ -409,11 +417,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
}
|
||||
}()
|
||||
|
||||
// Step 0d: Inflate balloon to reclaim free guest memory before snapshot.
|
||||
// Freed pages become zero from FC's perspective, so ProcessMemfile can
|
||||
// skip them → dramatically smaller memfile (e.g. 20GB → 1GB).
|
||||
// Best-effort: balloon may not be available (e.g. snapshot-restored VMs
|
||||
// from before balloon was configured).
|
||||
// ── Step 3: Inflate balloon to reclaim free guest memory ─────────
|
||||
// Freed pages become zero from FC's perspective, so ProcessMemfile
|
||||
// skips them → dramatically smaller memfile (e.g. 20GB → 1GB).
|
||||
func() {
|
||||
memUsed, err := readEnvdMemUsed(sb.client)
|
||||
if err != nil {
|
||||
@ -421,7 +427,6 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
return
|
||||
}
|
||||
usedMiB := int(memUsed / (1024 * 1024))
|
||||
// Leave 2x used memory + 128MB headroom for kernel/envd.
|
||||
keepMiB := max(usedMiB*2, 256) + 128
|
||||
inflateMiB := sb.MemoryMB - keepMiB
|
||||
if inflateMiB <= 0 {
|
||||
@ -434,18 +439,15 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
slog.Debug("pause: balloon inflate failed (non-fatal)", "id", sandboxID, "error", err)
|
||||
return
|
||||
}
|
||||
// Give guest kernel time to process balloon requests and release pages.
|
||||
time.Sleep(2 * time.Second)
|
||||
slog.Info("pause: balloon inflated", "id", sandboxID, "inflate_mib", inflateMiB, "guest_used_mib", usedMiB)
|
||||
}()
|
||||
|
||||
// ── Step 4: Freeze vCPUs ─────────────────────────────────────────
|
||||
pauseStart := time.Now()
|
||||
|
||||
// Step 1: Pause the VM (freeze vCPUs).
|
||||
if err := m.vm.Pause(ctx, sandboxID); err != nil {
|
||||
// Deflate balloon before returning so sandbox is usable.
|
||||
_ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
|
||||
sb.connTracker.Reset()
|
||||
restoreRunning()
|
||||
return fmt.Errorf("pause VM: %w", err)
|
||||
}
|
||||
slog.Debug("pause: VM paused", "id", sandboxID, "elapsed", time.Since(pauseStart))
|
||||
@ -460,13 +462,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
|
||||
// resumeOnError unpauses the VM so the sandbox stays usable when a
|
||||
// post-freeze step fails. If the resume itself fails, the sandbox is
|
||||
// frozen and unrecoverable — destroy it to avoid a zombie that reports
|
||||
// "running" but can't execute anything.
|
||||
// frozen and unrecoverable — destroy it to avoid a zombie.
|
||||
resumeOnError := func() {
|
||||
sb.connTracker.Reset()
|
||||
// Use a fresh context — the caller's ctx may already be cancelled
|
||||
// (e.g. CP-side ResponseHeaderTimeout fired), which would make the
|
||||
// resume fail immediately and destroy a perfectly resumable VM.
|
||||
// Use a fresh context — the caller's ctx may already be cancelled.
|
||||
resumeCtx, resumeCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer resumeCancel()
|
||||
if err := m.vm.Resume(resumeCtx, sandboxID); err != nil {
|
||||
@ -478,7 +476,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
if m.onDestroy != nil {
|
||||
m.onDestroy(sandboxID)
|
||||
}
|
||||
return
|
||||
}
|
||||
restoreRunning()
|
||||
}
|
||||
|
||||
// Step 2: Take VM state snapshot (snapfile + memfile).
|
||||
|
||||
Reference in New Issue
Block a user