diff --git a/.gitignore b/.gitignore index 4be2db8..bca25e0 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,7 @@ go.work.sum e2b/ .impeccable.md .gstack +.mcp.json ## Builds builds/ @@ -49,3 +50,5 @@ frontend/build/ internal/dashboard/static/* !internal/dashboard/static/.gitkeep.dual-graph/ .dual-graph/ +# Added by code-review-graph +.code-review-graph/ diff --git a/CLAUDE.md b/CLAUDE.md index 56fdbbc..d8f8e52 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -372,3 +372,42 @@ All values are CSS custom properties in `frontend/src/app.css`. 4. **Legible at speed.** Users scan dashboards in seconds. Strong typographic contrast (serif h1, mono IDs, sans body), consistent patterns, and predictable placement let users orientate instantly without reading everything. 5. **Craft signals trust.** For infrastructure that runs production code, the quality of the UI is a proxy for the quality of the product. Pixel-level decisions matter. Polish is not decoration — it's a trust signal. + + +## MCP Tools: code-review-graph + +**IMPORTANT: This project has a knowledge graph. ALWAYS use the +code-review-graph MCP tools BEFORE using Grep/Glob/Read to explore +the codebase.** The graph is faster, cheaper (fewer tokens), and gives +you structural context (callers, dependents, test coverage) that file +scanning cannot. + +### When to use graph tools FIRST + +- **Exploring code**: `semantic_search_nodes` or `query_graph` instead of Grep +- **Understanding impact**: `get_impact_radius` instead of manually tracing imports +- **Code review**: `detect_changes` + `get_review_context` instead of reading entire files +- **Finding relationships**: `query_graph` with callers_of/callees_of/imports_of/tests_for +- **Architecture questions**: `get_architecture_overview` + `list_communities` + +Fall back to Grep/Glob/Read **only** when the graph doesn't cover what you need. + +### Key Tools + +| Tool | Use when | +|------|----------| +| `detect_changes` | Reviewing code changes — gives risk-scored analysis | +| `get_review_context` | Need source snippets for review — token-efficient | +| `get_impact_radius` | Understanding blast radius of a change | +| `get_affected_flows` | Finding which execution paths are impacted | +| `query_graph` | Tracing callers, callees, imports, tests, dependencies | +| `semantic_search_nodes` | Finding functions/classes by name or keyword | +| `get_architecture_overview` | Understanding high-level codebase structure | +| `refactor_tool` | Planning renames, finding dead code | + +### Workflow + +1. The graph auto-updates on file changes (via hooks). +2. Use `detect_changes` for code review. +3. Use `get_affected_flows` to understand impact. +4. Use `query_graph` pattern="tests_for" to check coverage. 
diff --git a/VERSION_AGENT b/VERSION_AGENT index 17e51c3..d917d3e 100644 --- a/VERSION_AGENT +++ b/VERSION_AGENT @@ -1 +1 @@ -0.1.1 +0.1.2 diff --git a/VERSION_CP b/VERSION_CP index 845639e..9faa1b7 100644 --- a/VERSION_CP +++ b/VERSION_CP @@ -1 +1 @@ -0.1.4 +0.1.5 diff --git a/envd/VERSION b/envd/VERSION index 17e51c3..d917d3e 100644 --- a/envd/VERSION +++ b/envd/VERSION @@ -1 +1 @@ -0.1.1 +0.1.2 diff --git a/envd/internal/api/conntracker.go b/envd/internal/api/conntracker.go index 054f920..cc3750e 100644 --- a/envd/internal/api/conntracker.go +++ b/envd/internal/api/conntracker.go @@ -51,7 +51,7 @@ func (t *ServerConnTracker) Track(conn net.Conn, state http.ConnState) { // (with keep-alives disabled, the connection closes), RestoreAfterSnapshot // will close any that survived into the snapshot as zombie TCP sockets. // -// GC cycles are handled by PortSubsystem.Stop() which runs before this. +// GC is handled by PostSnapshotPrepare after this returns. func (t *ServerConnTracker) PrepareForSnapshot() { t.mu.Lock() defer t.mu.Unlock() diff --git a/envd/internal/api/init.go b/envd/internal/api/init.go index 68a1b86..ac4f8eb 100644 --- a/envd/internal/api/init.go +++ b/envd/internal/api/init.go @@ -150,15 +150,17 @@ func (a *API) PostInit(w http.ResponseWriter, r *http.Request) { host.PollForMMDSOpts(ctx, a.mmdsChan, a.defaults.EnvVars) }() - // Close zombie connections from before the snapshot and re-enable - // keep-alives. On first boot this is a no-op (no zombie connections). + // Safety net: if the health check's postRestoreRecovery didn't run yet + // (e.g. PostInit arrived before the first health check), re-enable GC + // here. On first boot needsRestore is false so CAS is a no-op. + if a.needsRestore.CompareAndSwap(true, false) { + a.postRestoreRecovery() + } + // RestoreAfterSnapshot is idempotent (clears preSnapshot set), and + // Start is a no-op if already running. if a.connTracker != nil { a.connTracker.RestoreAfterSnapshot() } - - // Start the port scanner and forwarder if they were stopped by a - // pre-snapshot prepare call. Start is a no-op if already running, - // so this is safe on first boot and only takes effect after restore. if a.portSubsystem != nil { a.portSubsystem.Start(a.rootCtx) } diff --git a/envd/internal/api/snapshot.go b/envd/internal/api/snapshot.go index 6d13381..0e84dec 100644 --- a/envd/internal/api/snapshot.go +++ b/envd/internal/api/snapshot.go @@ -5,6 +5,8 @@ package api import ( "net/http" + "runtime" + "runtime/debug" ) // PostSnapshotPrepare quiesces continuous goroutines (port scanner, forwarder), @@ -13,6 +15,14 @@ import ( // TCP state after snapshot restore. Keep-alives are disabled so the current // request's connection also closes after the response. // +// To prevent Go page allocator corruption, GOMAXPROCS is set to 1 after the +// final GC. With a single P, all goroutines (including any that allocate +// between now and the VM freeze) run sequentially. This eliminates concurrent +// page allocator access, so even if the freeze lands mid-allocation, the +// in-flight operation completes atomically on restore before any GC reads +// the summary tree. GOMAXPROCS is restored on the first health check after +// restore (see postRestoreRecovery). +// // Called by the host agent as a best-effort signal before vm.Pause(). 
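The GOMAXPROCS handling described in this doc comment is easier to see in isolation. Below is a minimal sketch of the prepare/restore pair under the same assumptions; the function names are illustrative and not part of envd's API:

```go
package main

import (
	"runtime"
	"runtime/debug"
)

// quiesceForSnapshot flushes garbage, then drops to a single P so any
// allocation that races with the VM freeze runs sequentially. It returns
// the previous GOMAXPROCS so the caller can restore it after resume.
func quiesceForSnapshot() int {
	// Two GC cycles, as in PostSnapshotPrepare above, help ensure spans are
	// fully swept before the freeze; FreeOSMemory returns reclaimed pages
	// to the OS.
	runtime.GC()
	runtime.GC()
	debug.FreeOSMemory()

	// With GOMAXPROCS=1 there is no concurrent page-allocator access, so a
	// freeze that lands mid-allocation leaves at most one in-flight update
	// to complete on restore.
	return runtime.GOMAXPROCS(1)
}

// resumeAfterSnapshot restores parallelism and runs a clean GC once the
// guest is running again and any in-flight allocation has completed.
func resumeAfterSnapshot(prev int) {
	if prev > 0 {
		runtime.GOMAXPROCS(prev)
	}
	runtime.GC()
	debug.FreeOSMemory()
}

func main() {
	prev := quiesceForSnapshot()
	// ... VM is paused and snapshotted here ...
	resumeAfterSnapshot(prev)
}
```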
func (a *API) PostSnapshotPrepare(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() @@ -27,6 +37,26 @@ func (a *API) PostSnapshotPrepare(w http.ResponseWriter, r *http.Request) { a.logger.Info().Msg("snapshot/prepare: idle connections closed, keep-alives disabled") } + // Send the response before the GC so HTTP buffer allocations happen + // while GOMAXPROCS is still at its normal value. w.Header().Set("Cache-Control", "no-store") w.WriteHeader(http.StatusNoContent) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + + // Final GC pass after all major allocations (connection cleanup, + // response write) are complete. + runtime.GC() + runtime.GC() + debug.FreeOSMemory() + + // Reduce to a single P so any post-GC allocations (HTTP server + // connection teardown) run sequentially — no concurrent page allocator + // access that could leave the summary tree inconsistent if the VM + // freezes mid-update. + a.prevGOMAXPROCS = runtime.GOMAXPROCS(1) + + a.needsRestore.Store(true) + a.logger.Info().Msg("snapshot/prepare: GOMAXPROCS=1, ready for freeze") } diff --git a/envd/internal/api/store.go b/envd/internal/api/store.go index 5365604..ba4d445 100644 --- a/envd/internal/api/store.go +++ b/envd/internal/api/store.go @@ -7,7 +7,10 @@ import ( "context" "encoding/json" "net/http" + "runtime" + "runtime/debug" "sync" + "sync/atomic" "github.com/rs/zerolog" @@ -48,6 +51,12 @@ type API struct { rootCtx context.Context portSubsystem *publicport.PortSubsystem connTracker *ServerConnTracker + + // needsRestore is set by PostSnapshotPrepare and cleared on the first + // health check or PostInit after restore. While set, GOMAXPROCS is 1 + // to prevent concurrent page allocator access during the freeze window. + needsRestore atomic.Bool + prevGOMAXPROCS int // GOMAXPROCS value before PrepareSnapshot reduced it to 1 } func New(l *zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host.MMDSOpts, isNotFC bool, rootCtx context.Context, portSubsystem *publicport.PortSubsystem, connTracker *ServerConnTracker, version string) *API { @@ -69,6 +78,14 @@ func New(l *zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host. func (a *API) GetHealth(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() + // On the first health check after snapshot restore, re-enable GC and + // clean up stale state. By this point, any goroutine that was mid- + // allocation when the VM was frozen has completed, so the page allocator + // summary tree is consistent and safe for GC to read. + if a.needsRestore.CompareAndSwap(true, false) { + a.postRestoreRecovery() + } + a.logger.Trace().Msg("Health check") w.Header().Set("Cache-Control", "no-store") @@ -79,6 +96,35 @@ func (a *API) GetHealth(w http.ResponseWriter, r *http.Request) { }) } +// postRestoreRecovery restores GOMAXPROCS, runs a clean GC cycle, closes +// zombie TCP connections from before the snapshot, re-enables HTTP keep-alives, +// and restarts the port subsystem. Called exactly once per restore cycle, +// guarded by a CAS on needsRestore in both GetHealth and PostInit. +func (a *API) postRestoreRecovery() { + // Restore parallelism first — any goroutine that was mid-allocation + // when the VM froze has already completed by the time a health check + // or PostInit request is being served, so the page allocator summary + // tree is consistent and safe for a full GC. 
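The needsRestore flag added in store.go is a compare-and-swap guard: both GetHealth and PostInit may observe a pending restore, but only the caller that wins the CAS runs the recovery body. A standalone sketch of the pattern, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// recoverOnce mirrors the needsRestore pattern: several request handlers may
// notice that post-restore recovery is pending, but only the goroutine that
// wins the CompareAndSwap runs it.
type recoverOnce struct {
	pending atomic.Bool
}

// arm marks recovery as pending (the snapshot-prepare side).
func (r *recoverOnce) arm() { r.pending.Store(true) }

// maybeRecover runs fn if and only if this caller flips pending from
// true to false; every other concurrent caller gets false.
func (r *recoverOnce) maybeRecover(fn func()) bool {
	if r.pending.CompareAndSwap(true, false) {
		fn()
		return true
	}
	return false
}

func main() {
	var r recoverOnce
	r.arm()

	var ran atomic.Int32
	var wg sync.WaitGroup
	// Simulate GetHealth and PostInit arriving concurrently after restore.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.maybeRecover(func() { ran.Add(1) })
		}()
	}
	wg.Wait()
	fmt.Println("recovery ran", ran.Load(), "time(s)") // always 1
}
```

sync.Once would not fit here because the flag is re-armed on every snapshot cycle; the CAS gives once-per-arm semantics rather than once-per-process.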
+ prev := a.prevGOMAXPROCS + if prev > 0 { + runtime.GOMAXPROCS(prev) + } + runtime.GC() + runtime.GC() + debug.FreeOSMemory() + a.logger.Info().Msg("restore: GOMAXPROCS restored, GC complete") + + if a.connTracker != nil { + a.connTracker.RestoreAfterSnapshot() + a.logger.Info().Msg("restore: zombie connections closed, keep-alives re-enabled") + } + + if a.portSubsystem != nil { + a.portSubsystem.Start(a.rootCtx) + a.logger.Info().Msg("restore: port subsystem restarted") + } +} + func (a *API) GetMetrics(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() diff --git a/envd/internal/port/subsystem.go b/envd/internal/port/subsystem.go index 094b2c4..e70a2db 100644 --- a/envd/internal/port/subsystem.go +++ b/envd/internal/port/subsystem.go @@ -5,8 +5,6 @@ package port import ( "context" - "runtime" - "runtime/debug" "sync" "time" @@ -72,9 +70,12 @@ func (p *PortSubsystem) Start(parentCtx context.Context) { }() } -// Stop quiesces the scanner and forwarder goroutines and forces a GC cycle -// to put the Go runtime's page allocator in a consistent state before snapshot. +// Stop quiesces the scanner and forwarder goroutines. // Blocks until both goroutines have exited. Safe to call if already stopped. +// +// GC is NOT run here — it is deferred to PostSnapshotPrepare so that the +// GC happens after all allocations (connection cleanup, HTTP response) are +// complete, minimizing the window where page allocator corruption can occur. func (p *PortSubsystem) Stop() { p.mu.Lock() if !p.running { @@ -90,12 +91,6 @@ func (p *PortSubsystem) Stop() { cancelFn() wg.Wait() - - // Force two GC cycles to ensure all spans are swept and the page - // allocator summary tree is fully consistent before the VM is frozen. - runtime.GC() - runtime.GC() - debug.FreeOSMemory() } // Restart stops the subsystem (if running) and starts it again with a fresh diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go index 82dba06..371d95b 100644 --- a/internal/sandbox/manager.go +++ b/internal/sandbox/manager.go @@ -768,12 +768,17 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int, return nil, fmt.Errorf("restore VM from snapshot: %w", err) } + // Start prefetching all guest memory pages in the background. + // This runs concurrently with envd startup and eliminates on-demand + // page fault latency for subsequent RPC calls. + uffdServer.Prefetch() + // Wait for envd to be ready. client := envdclient.New(slot.HostIP.String()) waitCtx, waitCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout) - defer waitCancel() if err := client.WaitUntilReady(waitCtx); err != nil { + waitCancel() warnErr("uffd server stop error", sandboxID, uffdServer.Stop()) source.Close() warnErr("vm destroy error", sandboxID, m.vm.Destroy(context.Background(), sandboxID)) @@ -784,9 +789,14 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int, m.loops.Release(baseImagePath) return nil, fmt.Errorf("wait for envd: %w", err) } + waitCancel() - // Trigger envd to re-read MMDS and apply template defaults in a single call. - if err := client.PostInitWithDefaults(waitCtx, defaultUser, defaultEnv); err != nil { + // PostInit gets its own timeout — WaitUntilReady may have consumed most + // of EnvdTimeout, starving PostInit of time for RestoreAfterSnapshot. 
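The manager.go change swaps a shared deadline for a fresh one per step. The demonstration below shows why: with a shared context, a slow first call starves the second. doStep is a stand-in for calls such as WaitUntilReady and PostInit, and the durations are invented for illustration:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// doStep simulates a call that takes d, or fails early if ctx expires.
func doStep(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	const envdTimeout = 100 * time.Millisecond

	// Shared-deadline version: the first step eats most of the budget and
	// the second step fails with "context deadline exceeded".
	shared, cancel := context.WithTimeout(context.Background(), envdTimeout)
	_ = doStep(shared, 90*time.Millisecond)
	fmt.Println("shared:", doStep(shared, 50*time.Millisecond))
	cancel()

	// Per-step version, as in Resume/createFromSnapshot above: each call
	// gets a fresh envdTimeout, so a slow WaitUntilReady no longer starves
	// PostInit.
	ctx1, cancel1 := context.WithTimeout(context.Background(), envdTimeout)
	err1 := doStep(ctx1, 90*time.Millisecond)
	cancel1()

	ctx2, cancel2 := context.WithTimeout(context.Background(), envdTimeout)
	err2 := doStep(ctx2, 50*time.Millisecond)
	cancel2()
	fmt.Println("per-step:", err1, err2) // <nil> <nil>
}
```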
+ initCtx, initCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout) + defer initCancel() + + if err := client.PostInitWithDefaults(initCtx, defaultUser, defaultEnv); err != nil { slog.Warn("post-init failed after resume, metadata files may be stale", "sandbox", sandboxID, "error", err) } @@ -1200,12 +1210,15 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team return nil, fmt.Errorf("restore VM from snapshot: %w", err) } + // Start prefetching all guest memory pages in the background. + uffdServer.Prefetch() + // Wait for envd. client := envdclient.New(slot.HostIP.String()) waitCtx, waitCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout) - defer waitCancel() if err := client.WaitUntilReady(waitCtx); err != nil { + waitCancel() warnErr("uffd server stop error", sandboxID, uffdServer.Stop()) source.Close() warnErr("vm destroy error", sandboxID, m.vm.Destroy(context.Background(), sandboxID)) @@ -1216,9 +1229,14 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team m.loops.Release(baseRootfs) return nil, fmt.Errorf("wait for envd: %w", err) } + waitCancel() - // Trigger envd to re-read MMDS so it picks up the new sandbox/template IDs. - if err := client.PostInit(waitCtx); err != nil { + // PostInit gets its own timeout — WaitUntilReady may have consumed most + // of EnvdTimeout, starving PostInit of time for RestoreAfterSnapshot. + initCtx, initCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout) + defer initCancel() + + if err := client.PostInit(initCtx); err != nil { slog.Warn("post-init failed after template restore, metadata files may be stale", "sandbox", sandboxID, "error", err) } diff --git a/internal/uffd/server.go b/internal/uffd/server.go index be53f53..b838cbc 100644 --- a/internal/uffd/server.go +++ b/internal/uffd/server.go @@ -57,6 +57,17 @@ type Server struct { // exitPipe signals the poll loop to stop. exitR *os.File exitW *os.File + + // Set by handle() after Firecracker connects; read by Prefetch() + // after waiting on readyCh (which establishes happens-before). + uffdFd fd + mapping *Mapping + + // Prefetch lifecycle: cancel stops the goroutine, prefetchDone is + // closed when it exits. Stop() drains prefetchDone before returning + // so the caller can safely close diff file handles. + prefetchCancel context.CancelFunc + prefetchDone chan struct{} } // NewServer creates a UFFD server that will listen on the given socket path @@ -113,10 +124,17 @@ func (s *Server) Ready() <-chan struct{} { } // Stop signals the UFFD poll loop to exit and waits for it to finish. +// Also cancels and waits for any running prefetch goroutine. func (s *Server) Stop() error { + if s.prefetchCancel != nil { + s.prefetchCancel() + } // Write a byte to the exit pipe to wake the poll loop. _, _ = s.exitW.Write([]byte{0}) <-s.doneCh + if s.prefetchDone != nil { + <-s.prefetchDone + } return s.doneErr } @@ -172,6 +190,10 @@ func (s *Server) handle(ctx context.Context) error { mapping := NewMapping(regions) + // Store for use by Prefetch(). + s.uffdFd = uffdFd + s.mapping = mapping + slog.Info("uffd handler connected", "regions", len(regions), "fd", int(uffdFd), @@ -294,6 +316,66 @@ func (s *Server) faultPage(ctx context.Context, uffdFd fd, addr uintptr, offset return nil } +// Prefetch proactively loads all guest memory pages in the background. +// It iterates over every page in every UFFD region and copies it from the +// diff file into guest memory via UFFDIO_COPY. 
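The prefetchCancel/prefetchDone pair follows the usual cancel-and-join idiom: Stop must not return until the prefetch goroutine has stopped touching the diff file handles. A generic sketch of that lifecycle, independent of the uffd types (the worker type and its fields are illustrative):

```go
package main

import (
	"context"
	"fmt"
)

// worker mirrors the Prefetch lifecycle: the goroutine waits for a readiness
// signal, does cancellable work, and closes done on exit so stop can join it
// before the caller tears down resources the goroutine still reads.
type worker struct {
	ready  chan struct{}
	done   chan struct{}
	cancel context.CancelFunc
}

func (w *worker) start(work func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(context.Background())
	w.cancel = cancel
	w.done = make(chan struct{})
	go func() {
		defer close(w.done)
		select {
		case <-w.ready: // fields written before close(ready) are now visible
		case <-ctx.Done():
			return
		}
		work(ctx)
	}()
}

func (w *worker) stop() {
	if w.cancel != nil {
		w.cancel()
	}
	if w.done != nil {
		<-w.done // join before closing file handles the worker may use
	}
}

func main() {
	w := &worker{ready: make(chan struct{})}
	w.start(func(ctx context.Context) { fmt.Println("prefetching") })
	close(w.ready)
	w.stop()
	fmt.Println("stopped cleanly")
}
```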
Pages already loaded by +// on-demand faults return nil from faultPage (EEXIST handled internally). +// This eliminates the per-request latency caused by lazy page faulting +// after snapshot restore. +// +// The goroutine blocks on readyCh before reading the uffd fd and mapping +// fields (establishes happens-before with handle()). It uses an internal +// context independent of the caller's RPC context so it survives after the +// create/resume RPC returns. Stop() cancels and joins the goroutine. +func (s *Server) Prefetch() { + ctx, cancel := context.WithCancel(context.Background()) + s.prefetchCancel = cancel + s.prefetchDone = make(chan struct{}) + + go func() { + defer close(s.prefetchDone) + + // Wait for Firecracker to connect and send the uffd fd. + select { + case <-s.readyCh: + case <-ctx.Done(): + return + } + + uffdFd := s.uffdFd + mapping := s.mapping + if mapping == nil { + return + } + + var total, errored int + for _, region := range mapping.Regions { + pageSize := region.PageSize + if pageSize == 0 { + continue + } + for off := uintptr(0); off < region.Size; off += pageSize { + if ctx.Err() != nil { + slog.Debug("uffd prefetch cancelled", + "pages", total, "errors", errored) + return + } + + addr := region.BaseHostVirtAddr + off + memOffset := int64(off) + int64(region.Offset) + + if err := s.faultPage(ctx, uffdFd, addr, memOffset, pageSize); err != nil { + errored++ + } else { + total++ + } + } + } + slog.Info("uffd prefetch complete", + "pages", total, "errors", errored) + }() +} + // DiffFileSource serves pages from a snapshot's compact diff file using // the header's block mapping to resolve offsets. type DiffFileSource struct { diff --git a/pkg/cpserver/run.go b/pkg/cpserver/run.go index 58ef7f1..e49b4e2 100644 --- a/pkg/cpserver/run.go +++ b/pkg/cpserver/run.go @@ -256,8 +256,10 @@ func Run(opts ...Option) { proxyWrapper := api.NewSandboxProxyWrapper(srv.Handler(), queries, hostPool) httpServer := &http.Server{ - Addr: cfg.ListenAddr, - Handler: proxyWrapper, + Addr: cfg.ListenAddr, + Handler: proxyWrapper, + ReadHeaderTimeout: 10 * time.Second, + IdleTimeout: 620 * time.Second, // > typical LB/Cloudflare upstream timeout } // Graceful shutdown on signal.
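The two timeouts added to the control-plane HTTP server serve different purposes: ReadHeaderTimeout bounds clients that never finish sending headers (slowloris), while IdleTimeout is deliberately set above the upstream load balancer's idle timeout so the LB, not the backend, closes a keep-alive connection first; if the backend closed first, the LB could reuse a connection that is already dead and surface intermittent 502s. A minimal standalone configuration under those assumptions, with a placeholder address and handler:

```go
package main

import (
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{
		Addr:    ":8080",
		Handler: http.NewServeMux(),

		// Bound how long a client may take to send request headers.
		// Bodies and handler execution are not limited by this setting.
		ReadHeaderTimeout: 10 * time.Second,

		// Keep idle keep-alive connections open longer than the load
		// balancer in front of this server keeps its upstream connections,
		// so the LB always closes first.
		IdleTimeout: 620 * time.Second,
	}
	_ = srv.ListenAndServe()
}
```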