1
0
forked from wrenn/wrenn
Reviewed-on: wrenn/wrenn#40
This commit is contained in:
2026-05-02 22:56:00 +00:00
parent 4fcc19e91f
commit f5a23c1fa0
173 changed files with 7421 additions and 20521 deletions

View File

@ -54,6 +54,8 @@ func agentErrToHTTP(err error) (int, string, string) {
return http.StatusConflict, "conflict", err.Error()
case connect.CodePermissionDenied:
return http.StatusForbidden, "forbidden", err.Error()
case connect.CodeUnavailable:
return http.StatusServiceUnavailable, "no_hosts_available", "no servers available — try again later"
case connect.CodeUnimplemented:
return http.StatusNotImplemented, "agent_error", err.Error()
default:
@ -108,6 +110,9 @@ func serviceErrToHTTP(err error) (int, string, string) {
return http.StatusForbidden, "forbidden", "forbidden"
case strings.Contains(msg, "invalid or expired"):
return http.StatusUnauthorized, "unauthorized", "invalid or expired credentials"
case strings.Contains(msg, "no online") && strings.Contains(msg, "hosts available"),
strings.Contains(msg, "no host has sufficient resources"):
return http.StatusServiceUnavailable, "no_hosts_available", "no servers available — try again later"
case strings.Contains(msg, "invalid"):
return http.StatusBadRequest, "invalid_request", "invalid request"
default:

View File

@ -19,10 +19,11 @@ import (
// Client wraps the Connect RPC client for envd's Process and Filesystem services.
type Client struct {
hostIP string
base string
healthURL string
httpClient *http.Client
hostIP string
base string
healthURL string
httpClient *http.Client
streamingClient *http.Client
process genconnect.ProcessClient
filesystem genconnect.FilesystemClient
@ -32,29 +33,44 @@ type Client struct {
// New creates a Client for the envd instance at hostIP.
//
// Two HTTP clients back the Connect RPC clients: the Process service uses
// the streaming client (no overall request timeout) so long-lived RPCs such
// as PTY/exec streams are not cut off, while the Filesystem service and
// other short-lived calls use the timeout-bounded httpClient.
//
// NOTE(review): this block was reconstructed from a diff rendering that
// interleaved the old and new composite-literal fields (every key appeared
// twice, which cannot compile); the field set below is the post-change
// version.
func New(hostIP string) *Client {
	base := baseURL(hostIP)
	httpClient := newHTTPClient()
	streamingClient := newStreamingHTTPClient()
	return &Client{
		hostIP:          hostIP,
		base:            base,
		healthURL:       base + "/health",
		httpClient:      httpClient,
		streamingClient: streamingClient,
		process:         genconnect.NewProcessClient(streamingClient, base),
		filesystem:      genconnect.NewFilesystemClient(httpClient, base),
	}
}
// CloseIdleConnections drops idle keep-alive connections on both the unary
// and streaming transports. Call it before taking a VM snapshot so no stale
// TCP state is captured inside the guest.
func (c *Client) CloseIdleConnections() {
	for _, hc := range []*http.Client{c.httpClient, c.streamingClient} {
		hc.CloseIdleConnections()
	}
}
// BaseURL returns the HTTP base URL for reaching envd
// (computed once in New from the host IP).
func (c *Client) BaseURL() string {
	return c.base
}
// HTTPClient returns the http.Client with a 2-minute request timeout,
// suitable for short-lived envd calls (health, init, snapshot/prepare).
// Use this instead of http.DefaultClient when making direct HTTP calls to
// envd, to avoid sharing the global transport with proxy traffic. For
// requests that may run indefinitely (e.g. file streaming), use
// StreamingHTTPClient instead.
func (c *Client) HTTPClient() *http.Client {
	return c.httpClient
}
// StreamingHTTPClient returns the http.Client that carries no overall
// request timeout. Use it for streaming file transfers or any request that
// may legitimately run indefinitely; short-lived calls should prefer
// HTTPClient so they are bounded by its timeout.
func (c *Client) StreamingHTTPClient() *http.Client {
	return c.streamingClient
}
// ExecResult holds the output of a command execution.
type ExecResult struct {
Stdout []byte
@ -234,7 +250,7 @@ func (c *Client) WriteFile(ctx context.Context, path string, content []byte) err
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("write file %s: status %d: %s", path, resp.StatusCode, string(respBody))
}
@ -276,10 +292,9 @@ func (c *Client) ReadFile(ctx context.Context, path string) ([]byte, error) {
return data, nil
}
// PrepareSnapshot calls envd's POST /snapshot/prepare endpoint, which quiesces
// continuous goroutines (port scanner, forwarder) and forces a GC cycle before
// Firecracker takes a VM snapshot. This ensures the Go runtime's page allocator
// is in a consistent state when vCPUs are frozen.
// PrepareSnapshot calls envd's POST /snapshot/prepare endpoint, which stops
// the port scanner/forwarder and marks active connections for post-restore
// cleanup before Firecracker freezes vCPUs.
//
// Best-effort: the caller should log a warning on error but not abort the pause.
func (c *Client) PrepareSnapshot(ctx context.Context) error {

View File

@ -21,9 +21,27 @@ func baseURL(hostIP string) string {
// with envd RPC connections (PTY streams, exec, file ops).
func newHTTPClient() *http.Client {
return &http.Client{
Timeout: 2 * time.Minute,
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
},
}
}
// newStreamingHTTPClient returns an http.Client without an overall timeout,
// for long-lived streaming RPCs (PTY, exec stream) that can run indefinitely.
func newStreamingHTTPClient() *http.Client {
return &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"time"
@ -46,20 +45,15 @@ func (c *Client) FetchVersion(ctx context.Context) (string, error) {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("health check returned %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil || len(body) == 0 {
return "", nil // envd may not support version reporting yet
}
var data struct {
Version string `json:"version"`
}
if err := json.Unmarshal(body, &data); err != nil {
return "", nil // non-JSON response, old envd
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return "", fmt.Errorf("decode version response: %w", err)
}
return data.Version, nil
@ -78,7 +72,7 @@ func (c *Client) healthCheck(ctx context.Context) error {
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("health check returned %d", resp.StatusCode)
}

View File

@ -109,18 +109,11 @@ func (s *Server) ResumeSandbox(
req *connect.Request[pb.ResumeSandboxRequest],
) (*connect.Response[pb.ResumeSandboxResponse], error) {
msg := req.Msg
sb, err := s.mgr.Resume(ctx, msg.SandboxId, int(msg.TimeoutSec), msg.KernelVersion)
sb, err := s.mgr.Resume(ctx, msg.SandboxId, int(msg.TimeoutSec), msg.KernelVersion, msg.DefaultUser, msg.DefaultEnv)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
// Apply template defaults (user, env vars) if provided.
if msg.DefaultUser != "" || len(msg.DefaultEnv) > 0 {
if err := s.mgr.SetDefaults(ctx, sb.ID, msg.DefaultUser, msg.DefaultEnv); err != nil {
slog.Warn("failed to set sandbox defaults on resume", "sandbox", sb.ID, "error", err)
}
}
return connect.NewResponse(&pb.ResumeSandboxResponse{
SandboxId: sb.ID,
Status: string(sb.Status),
@ -459,7 +452,7 @@ func (s *Server) WriteFileStream(
}
httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType())
resp, err := client.HTTPClient().Do(httpReq)
resp, err := client.StreamingHTTPClient().Do(httpReq)
if err != nil {
pw.CloseWithError(err)
<-errCh
@ -504,7 +497,7 @@ func (s *Server) ReadFileStream(
return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err))
}
resp, err := client.HTTPClient().Do(httpReq)
resp, err := client.StreamingHTTPClient().Do(httpReq)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err))
}

View File

@ -95,11 +95,9 @@ type snapshotParent struct {
}
// maxDiffGenerations caps how many incremental diff generations we chain
// before falling back to a Full snapshot to collapse the chain. Firecracker
// snapshot/restore of a Go process (envd) accumulates runtime memory state
// drift; empirically, ~10 diff-based cycles corrupt the Go page allocator.
// A Full snapshot resets the generation counter and produces a clean base,
// preventing the crash.
// before falling back to a Full snapshot to collapse the chain. Long diff
// chains increase restore latency and snapshot directory size; a periodic
// Full snapshot resets the counter and produces a clean base.
const maxDiffGenerations = 8
// buildMetadata constructs the metadata map with version information.
@ -382,14 +380,19 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
m.stopSampler(sb)
// Step 0: Drain in-flight proxy connections before freezing vCPUs.
// This prevents Go runtime corruption inside the guest caused by stale
// TCP state from connections that were alive when the VM was snapshotted.
// Stale TCP state from mid-flight connections causes issues on restore.
sb.connTracker.Drain(2 * time.Second)
slog.Debug("pause: proxy connections drained", "id", sandboxID)
// Step 0b: Signal envd to quiesce continuous goroutines (port scanner,
// forwarder) and run GC before freezing vCPUs. This prevents Go runtime
// page allocator corruption ("bad summary data") on snapshot restore.
// Step 0b: Close host-side idle connections to envd. Done before
// PrepareSnapshot so FIN packets propagate to the guest during the
// PrepareSnapshot window (no extra sleep needed).
sb.client.CloseIdleConnections()
slog.Debug("pause: envd client idle connections closed", "id", sandboxID)
// Step 0c: Signal envd to quiesce (stop port scanner/forwarder, mark
// connections for post-restore cleanup). The 3s timeout also gives time
// for the FINs from Step 0b to be processed by the guest kernel.
// Best-effort: a failure is logged but does not abort the pause.
func() {
prepCtx, prepCancel := context.WithTimeout(ctx, 3*time.Second)
@ -397,7 +400,7 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
if err := sb.client.PrepareSnapshot(prepCtx); err != nil {
slog.Warn("pause: pre-snapshot quiesce failed (best-effort)", "id", sandboxID, "error", err)
} else {
slog.Debug("pause: envd goroutines quiesced", "id", sandboxID)
slog.Debug("pause: envd quiesced", "id", sandboxID)
}
}()
@ -577,6 +580,7 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
// Record which base template this CoW was built against.
if err := snapshot.WriteMeta(pauseDir, "", &snapshot.RootfsMeta{
BaseTemplate: sb.baseImagePath,
TemplateID: uuid.UUID(sb.TemplateID).String(),
}); err != nil {
warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir))
// VM and dm-snapshot are already gone — clean up remaining resources.
@ -617,7 +621,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
// Resume restores a paused sandbox from its snapshot using UFFD for
// lazy memory loading. The sandbox gets a new network slot.
func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int, kernelVersion string) (*models.Sandbox, error) {
// Optional defaultUser and defaultEnv are applied via a single PostInit
// call so that template defaults are set without an extra round-trip.
func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int, kernelVersion string, defaultUser string, defaultEnv map[string]string) (*models.Sandbox, error) {
pauseDir := layout.PauseSnapshotDir(m.cfg.WrennDir, sandboxID)
if _, err := os.Stat(pauseDir); err != nil {
return nil, fmt.Errorf("no snapshot found for sandbox %s", sandboxID)
@ -731,6 +737,7 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int,
// Restore VM from snapshot.
vmCfg := vm.VMConfig{
SandboxID: sandboxID,
TemplateID: meta.TemplateID,
KernelPath: m.resolveKernelPath(kernelVersion),
RootfsPath: dmDev.DevicePath,
VCPUs: 1, // Placeholder; overridden by snapshot.
@ -756,12 +763,17 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int,
return nil, fmt.Errorf("restore VM from snapshot: %w", err)
}
// Start prefetching all guest memory pages in the background.
// This runs concurrently with envd startup and eliminates on-demand
// page fault latency for subsequent RPC calls.
uffdServer.Prefetch()
// Wait for envd to be ready.
client := envdclient.New(slot.HostIP.String())
waitCtx, waitCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout)
defer waitCancel()
if err := client.WaitUntilReady(waitCtx); err != nil {
waitCancel()
warnErr("uffd server stop error", sandboxID, uffdServer.Stop())
source.Close()
warnErr("vm destroy error", sandboxID, m.vm.Destroy(context.Background(), sandboxID))
@ -772,9 +784,14 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int,
m.loops.Release(baseImagePath)
return nil, fmt.Errorf("wait for envd: %w", err)
}
waitCancel()
// Trigger envd to re-read MMDS so it picks up the new sandbox/template IDs.
if err := client.PostInit(waitCtx); err != nil {
// PostInit gets its own timeout — WaitUntilReady may have consumed most
// of EnvdTimeout, starving PostInit of time for RestoreAfterSnapshot.
initCtx, initCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout)
defer initCancel()
if err := client.PostInitWithDefaults(initCtx, defaultUser, defaultEnv); err != nil {
slog.Warn("post-init failed after resume, metadata files may be stale", "sandbox", sandboxID, "error", err)
}
@ -1188,12 +1205,15 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team
return nil, fmt.Errorf("restore VM from snapshot: %w", err)
}
// Start prefetching all guest memory pages in the background.
uffdServer.Prefetch()
// Wait for envd.
client := envdclient.New(slot.HostIP.String())
waitCtx, waitCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout)
defer waitCancel()
if err := client.WaitUntilReady(waitCtx); err != nil {
waitCancel()
warnErr("uffd server stop error", sandboxID, uffdServer.Stop())
source.Close()
warnErr("vm destroy error", sandboxID, m.vm.Destroy(context.Background(), sandboxID))
@ -1204,9 +1224,14 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team
m.loops.Release(baseRootfs)
return nil, fmt.Errorf("wait for envd: %w", err)
}
waitCancel()
// Trigger envd to re-read MMDS so it picks up the new sandbox/template IDs.
if err := client.PostInit(waitCtx); err != nil {
// PostInit gets its own timeout — WaitUntilReady may have consumed most
// of EnvdTimeout, starving PostInit of time for RestoreAfterSnapshot.
initCtx, initCancel := context.WithTimeout(ctx, m.cfg.EnvdTimeout)
defer initCancel()
if err := client.PostInit(initCtx); err != nil {
slog.Warn("post-init failed after template restore, metadata files may be stale", "sandbox", sandboxID, "error", err)
}

View File

@ -64,6 +64,7 @@ func MetaPath(baseDir, name string) string {
// RootfsMeta records which base template a CoW file was created against.
type RootfsMeta struct {
	// BaseTemplate identifies the base template image the CoW rootfs was
	// built on top of.
	BaseTemplate string `json:"base_template"`
	// TemplateID is the template UUID in string form. omitempty keeps the
	// field out of JSON when unset, so metadata written before this field
	// existed remains readable and comparable.
	TemplateID string `json:"template_id,omitempty"`
}
// WriteMeta writes rootfs metadata to the snapshot directory.

View File

@ -57,6 +57,17 @@ type Server struct {
// exitPipe signals the poll loop to stop.
exitR *os.File
exitW *os.File
// Set by handle() after Firecracker connects; read by Prefetch()
// after waiting on readyCh (which establishes happens-before).
uffdFd fd
mapping *Mapping
// Prefetch lifecycle: cancel stops the goroutine, prefetchDone is
// closed when it exits. Stop() drains prefetchDone before returning
// so the caller can safely close diff file handles.
prefetchCancel context.CancelFunc
prefetchDone chan struct{}
}
// NewServer creates a UFFD server that will listen on the given socket path
@ -113,10 +124,17 @@ func (s *Server) Ready() <-chan struct{} {
}
// Stop signals the UFFD poll loop to exit and waits for it to finish.
// Also cancels and waits for any running prefetch goroutine, so once Stop
// returns the caller can safely close diff file handles.
func (s *Server) Stop() error {
	// Cancel the prefetch goroutine first; nil if Prefetch was never called.
	if s.prefetchCancel != nil {
		s.prefetchCancel()
	}
	// Write a byte to the exit pipe to wake the poll loop.
	_, _ = s.exitW.Write([]byte{0})
	// Wait for the poll loop to acknowledge and exit.
	<-s.doneCh
	// Join the prefetch goroutine (prefetchDone is closed when it exits).
	if s.prefetchDone != nil {
		<-s.prefetchDone
	}
	// doneErr carries the poll loop's terminal error, if any.
	return s.doneErr
}
@ -172,6 +190,10 @@ func (s *Server) handle(ctx context.Context) error {
mapping := NewMapping(regions)
// Store for use by Prefetch().
s.uffdFd = uffdFd
s.mapping = mapping
slog.Info("uffd handler connected",
"regions", len(regions),
"fd", int(uffdFd),
@ -294,6 +316,66 @@ func (s *Server) faultPage(ctx context.Context, uffdFd fd, addr uintptr, offset
return nil
}
// Prefetch proactively loads all guest memory pages in the background.
// It iterates over every page in every UFFD region and copies it from the
// diff file into guest memory via UFFDIO_COPY. Pages already loaded by
// on-demand faults return nil from faultPage (EEXIST handled internally).
// This eliminates the per-request latency caused by lazy page faulting
// after snapshot restore.
//
// The goroutine blocks on readyCh before reading the uffd fd and mapping
// fields (establishes happens-before with handle()). It uses an internal
// context independent of the caller's RPC context so it survives after the
// create/resume RPC returns. Stop() cancels and joins the goroutine.
//
// NOTE(review): assumes Prefetch is called at most once per Server and
// before Stop; a second call would overwrite prefetchDone/prefetchCancel
// and leak the first goroutine — confirm callers uphold this.
func (s *Server) Prefetch() {
	// Internal context: decoupled from any RPC context (see doc comment).
	ctx, cancel := context.WithCancel(context.Background())
	s.prefetchCancel = cancel
	s.prefetchDone = make(chan struct{})
	go func() {
		// Signal completion to Stop() regardless of how we exit.
		defer close(s.prefetchDone)
		// Wait for Firecracker to connect and send the uffd fd.
		select {
		case <-s.readyCh:
		case <-ctx.Done():
			return
		}
		uffdFd := s.uffdFd
		mapping := s.mapping
		// Defensive: readyCh fired but no mapping was stored.
		if mapping == nil {
			return
		}
		var total, errored int
		for _, region := range mapping.Regions {
			pageSize := region.PageSize
			// Guard: a zero page size would make the inner loop spin forever.
			if pageSize == 0 {
				continue
			}
			for off := uintptr(0); off < region.Size; off += pageSize {
				// Check cancellation once per page so Stop() is not kept
				// waiting for the remainder of a large region.
				if ctx.Err() != nil {
					slog.Debug("uffd prefetch cancelled",
						"pages", total, "errors", errored)
					return
				}
				// Guest-physical page address and its offset in the diff file.
				addr := region.BaseHostVirtAddr + off
				memOffset := int64(off) + int64(region.Offset)
				// Per-page failures are counted, not fatal: the page can
				// still be served later by the on-demand fault path.
				if err := s.faultPage(ctx, uffdFd, addr, memOffset, pageSize); err != nil {
					errored++
				} else {
					total++
				}
			}
		}
		slog.Info("uffd prefetch complete",
			"pages", total, "errors", errored)
	}()
}
// DiffFileSource serves pages from a snapshot's compact diff file using
// the header's block mapping to resolve offsets.
type DiffFileSource struct {

View File

@ -90,7 +90,7 @@ func (c *VMConfig) applyDefaults() {
// kernelArgs builds the kernel command line for the VM.
func (c *VMConfig) kernelArgs() string {
// ip= format: <client-ip>::<gw-ip>:<netmask>:<hostname>:<iface>:<autoconf>
ipArg := fmt.Sprintf("ip=%s::%s:%s:sandbox:eth0:off",
ipArg := fmt.Sprintf("ip=%s::%s:%s:capsule:eth0:off",
c.GuestIP, c.GatewayIP, c.NetMask,
)