From 962860ba7476ed57fd57142698d16514f4317033 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Mon, 13 Apr 2026 05:21:10 +0600 Subject: [PATCH] Pre-pause snapshot signal to prevent Go runtime crash on restore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit envd crashes with "fatal error: bad summary data" after Firecracker snapshot/restore because the page allocator radix tree is inconsistent when vCPUs are frozen mid-allocation. The port scanner goroutine allocates heavily every second, making it the primary trigger. Add POST /snapshot/prepare to envd — the host agent calls it before vm.Pause to quiesce continuous goroutines and force GC. On restore, PostInit restarts the port subsystem via the existing /init endpoint. - New PortSubsystem abstraction with Start/Stop/Restart lifecycle - Context-based goroutine cancellation (replaces irreversible channel close) - Context-aware Signal to prevent scanner/forwarder deadlock - Fix forwarder goroutine leak (was spinning forever on closed channel) - Kill socat children on stop to prevent orphans across snapshots - Fix double cmd.Wait panic (exec.Command instead of CommandContext) --- envd/internal/api/api.gen.go | 54 +++++++++++--- envd/internal/api/auth.go | 2 + envd/internal/api/download_test.go | 14 ++-- envd/internal/api/init.go | 7 ++ envd/internal/api/init_test.go | 2 +- envd/internal/api/snapshot.go | 25 +++++++ envd/internal/api/store.go | 25 +++++-- envd/internal/port/forward.go | 28 ++++++- envd/internal/port/scan.go | 24 +++--- envd/internal/port/scanSubscriber.go | 22 ++++-- envd/internal/port/subsystem.go | 106 +++++++++++++++++++++++++++ envd/main.go | 19 ++--- envd/spec/envd.yaml | 8 ++ internal/envdclient/client.go | 26 +++++++ internal/sandbox/manager.go | 14 ++++ 15 files changed, 317 insertions(+), 59 deletions(-) create mode 100644 envd/internal/api/snapshot.go create mode 100644 envd/internal/port/subsystem.go diff --git a/envd/internal/api/api.gen.go b/envd/internal/api/api.gen.go index 512747b..257326d 100644 --- a/envd/internal/api/api.gen.go +++ b/envd/internal/api/api.gen.go @@ -1,6 +1,6 @@ // Package api provides primitives to interact with the openapi HTTP API. // -// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.5.1 DO NOT EDIT. +// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.6.0 DO NOT EDIT. package api import ( @@ -23,6 +23,16 @@ const ( File EntryInfoType = "file" ) +// Valid indicates whether the value is a known member of the EntryInfoType enum. +func (e EntryInfoType) Valid() bool { + switch e { + case File: + return true + default: + return false + } +} + // EntryInfo defines model for EntryInfo. type EntryInfo struct { // Name Name of the file @@ -193,6 +203,9 @@ type ServerInterface interface { // Get the stats of the service // (GET /metrics) GetMetrics(w http.ResponseWriter, r *http.Request) + // Quiesce continuous goroutines before Firecracker snapshot + // (POST /snapshot/prepare) + PostSnapshotPrepare(w http.ResponseWriter, r *http.Request) } // Unimplemented server implementation that returns http.StatusNotImplemented for each endpoint. @@ -235,6 +248,12 @@ func (_ Unimplemented) GetMetrics(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotImplemented) } +// Quiesce continuous goroutines before Firecracker snapshot +// (POST /snapshot/prepare) +func (_ Unimplemented) PostSnapshotPrepare(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) +} + // ServerInterfaceWrapper converts contexts to parameters. type ServerInterfaceWrapper struct { Handler ServerInterface @@ -280,7 +299,7 @@ func (siw *ServerInterfaceWrapper) GetFiles(w http.ResponseWriter, r *http.Reque // ------------- Optional query parameter "path" ------------- - err = runtime.BindQueryParameter("form", true, false, "path", r.URL.Query(), ¶ms.Path) + err = runtime.BindQueryParameterWithOptions("form", true, false, "path", r.URL.Query(), ¶ms.Path, runtime.BindQueryParameterOptions{Type: "string", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "path", Err: err}) return @@ -288,7 +307,7 @@ func (siw *ServerInterfaceWrapper) GetFiles(w http.ResponseWriter, r *http.Reque // ------------- Optional query parameter "username" ------------- - err = runtime.BindQueryParameter("form", true, false, "username", r.URL.Query(), ¶ms.Username) + err = runtime.BindQueryParameterWithOptions("form", true, false, "username", r.URL.Query(), ¶ms.Username, runtime.BindQueryParameterOptions{Type: "string", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "username", Err: err}) return @@ -296,7 +315,7 @@ func (siw *ServerInterfaceWrapper) GetFiles(w http.ResponseWriter, r *http.Reque // ------------- Optional query parameter "signature" ------------- - err = runtime.BindQueryParameter("form", true, false, "signature", r.URL.Query(), ¶ms.Signature) + err = runtime.BindQueryParameterWithOptions("form", true, false, "signature", r.URL.Query(), ¶ms.Signature, runtime.BindQueryParameterOptions{Type: "string", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "signature", Err: err}) return @@ -304,7 +323,7 @@ func (siw *ServerInterfaceWrapper) GetFiles(w http.ResponseWriter, r *http.Reque // ------------- Optional query parameter "signature_expiration" ------------- - err = runtime.BindQueryParameter("form", true, false, "signature_expiration", r.URL.Query(), ¶ms.SignatureExpiration) + err = runtime.BindQueryParameterWithOptions("form", true, false, "signature_expiration", r.URL.Query(), ¶ms.SignatureExpiration, runtime.BindQueryParameterOptions{Type: "integer", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "signature_expiration", Err: err}) return @@ -337,7 +356,7 @@ func (siw *ServerInterfaceWrapper) PostFiles(w http.ResponseWriter, r *http.Requ // ------------- Optional query parameter "path" ------------- - err = runtime.BindQueryParameter("form", true, false, "path", r.URL.Query(), ¶ms.Path) + err = runtime.BindQueryParameterWithOptions("form", true, false, "path", r.URL.Query(), ¶ms.Path, runtime.BindQueryParameterOptions{Type: "string", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "path", Err: err}) return @@ -345,7 +364,7 @@ func (siw *ServerInterfaceWrapper) PostFiles(w http.ResponseWriter, r *http.Requ // ------------- Optional query parameter "username" ------------- - err = runtime.BindQueryParameter("form", true, false, "username", r.URL.Query(), ¶ms.Username) + err = runtime.BindQueryParameterWithOptions("form", true, false, "username", r.URL.Query(), ¶ms.Username, runtime.BindQueryParameterOptions{Type: "string", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "username", Err: err}) return @@ -353,7 +372,7 @@ func (siw *ServerInterfaceWrapper) PostFiles(w http.ResponseWriter, r *http.Requ // ------------- Optional query parameter "signature" ------------- - err = runtime.BindQueryParameter("form", true, false, "signature", r.URL.Query(), ¶ms.Signature) + err = runtime.BindQueryParameterWithOptions("form", true, false, "signature", r.URL.Query(), ¶ms.Signature, runtime.BindQueryParameterOptions{Type: "string", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "signature", Err: err}) return @@ -361,7 +380,7 @@ func (siw *ServerInterfaceWrapper) PostFiles(w http.ResponseWriter, r *http.Requ // ------------- Optional query parameter "signature_expiration" ------------- - err = runtime.BindQueryParameter("form", true, false, "signature_expiration", r.URL.Query(), ¶ms.SignatureExpiration) + err = runtime.BindQueryParameterWithOptions("form", true, false, "signature_expiration", r.URL.Query(), ¶ms.SignatureExpiration, runtime.BindQueryParameterOptions{Type: "integer", Format: ""}) if err != nil { siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "signature_expiration", Err: err}) return @@ -432,6 +451,20 @@ func (siw *ServerInterfaceWrapper) GetMetrics(w http.ResponseWriter, r *http.Req handler.ServeHTTP(w, r) } +// PostSnapshotPrepare operation middleware +func (siw *ServerInterfaceWrapper) PostSnapshotPrepare(w http.ResponseWriter, r *http.Request) { + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.PostSnapshotPrepare(w, r) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + type UnescapedCookieParamError struct { ParamName string Err error @@ -563,6 +596,9 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl r.Group(func(r chi.Router) { r.Get(options.BaseURL+"/metrics", wrapper.GetMetrics) }) + r.Group(func(r chi.Router) { + r.Post(options.BaseURL+"/snapshot/prepare", wrapper.PostSnapshotPrepare) + }) return r } diff --git a/envd/internal/api/auth.go b/envd/internal/api/auth.go index b626f5a..fc6b97c 100644 --- a/envd/internal/api/auth.go +++ b/envd/internal/api/auth.go @@ -1,4 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk package api @@ -30,6 +31,7 @@ var authExcludedPaths = []string{ "GET/files", "POST/files", "POST/init", + "POST/snapshot/prepare", } func (a *API) WithAuthorization(handler http.Handler) http.Handler { diff --git a/envd/internal/api/download_test.go b/envd/internal/api/download_test.go index 235a613..d9ab79a 100644 --- a/envd/internal/api/download_test.go +++ b/envd/internal/api/download_test.go @@ -1,10 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk package api import ( "bytes" "compress/gzip" + "context" "io" "mime/multipart" "net/http" @@ -97,7 +99,7 @@ func TestGetFilesContentDisposition(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(&logger, defaults, nil, false, context.Background(), nil) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -146,7 +148,7 @@ func TestGetFilesContentDispositionWithNestedPath(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(&logger, defaults, nil, false, context.Background(), nil) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -189,7 +191,7 @@ func TestGetFiles_GzipEncoding_ExplicitIdentityOffWithRange(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(&logger, defaults, nil, false, context.Background(), nil) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -230,7 +232,7 @@ func TestGetFiles_GzipDownload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(&logger, defaults, nil, false, context.Background(), nil) req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) req.Header.Set("Accept-Encoding", "gzip") @@ -295,7 +297,7 @@ func TestPostFiles_GzipUpload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(&logger, defaults, nil, false, context.Background(), nil) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &gzBuf) req.Header.Set("Content-Type", mpWriter.FormDataContentType()) @@ -355,7 +357,7 @@ func TestGzipUploadThenGzipDownload(t *testing.T) { EnvVars: utils.NewMap[string, string](), User: currentUser.Username, } - api := New(&logger, defaults, nil, false) + api := New(&logger, defaults, nil, false, context.Background(), nil) uploadReq := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &gzBuf) uploadReq.Header.Set("Content-Type", mpWriter.FormDataContentType()) diff --git a/envd/internal/api/init.go b/envd/internal/api/init.go index 301400c..3b2be4b 100644 --- a/envd/internal/api/init.go +++ b/envd/internal/api/init.go @@ -150,6 +150,13 @@ func (a *API) PostInit(w http.ResponseWriter, r *http.Request) { host.PollForMMDSOpts(ctx, a.mmdsChan, a.defaults.EnvVars) }() + // Start the port scanner and forwarder if they were stopped by a + // pre-snapshot prepare call. Start is a no-op if already running, + // so this is safe on first boot and only takes effect after restore. + if a.portSubsystem != nil { + a.portSubsystem.Start(a.rootCtx) + } + w.Header().Set("Cache-Control", "no-store") w.Header().Set("Content-Type", "") diff --git a/envd/internal/api/init_test.go b/envd/internal/api/init_test.go index f3db361..979adad 100644 --- a/envd/internal/api/init_test.go +++ b/envd/internal/api/init_test.go @@ -79,7 +79,7 @@ func newTestAPI(accessToken *SecureToken, mmdsClient MMDSClient) *API { defaults := &execcontext.Defaults{ EnvVars: utils.NewMap[string, string](), } - api := New(&logger, defaults, nil, false) + api := New(&logger, defaults, nil, false, context.Background(), nil) if accessToken != nil { api.accessToken.TakeFrom(accessToken) } diff --git a/envd/internal/api/snapshot.go b/envd/internal/api/snapshot.go new file mode 100644 index 0000000..d9e2edd --- /dev/null +++ b/envd/internal/api/snapshot.go @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk + +package api + +import ( + "net/http" +) + +// PostSnapshotPrepare quiesces continuous goroutines (port scanner, forwarder) +// and forces a GC cycle before Firecracker takes a VM snapshot. This ensures +// the Go runtime's page allocator is in a consistent state when vCPUs are frozen. +// +// Called by the host agent as a best-effort signal before vm.Pause(). +func (a *API) PostSnapshotPrepare(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + if a.portSubsystem != nil { + a.portSubsystem.Stop() + a.logger.Info().Msg("snapshot/prepare: port subsystem quiesced") + } + + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusNoContent) +} diff --git a/envd/internal/api/store.go b/envd/internal/api/store.go index 088222a..ddb726a 100644 --- a/envd/internal/api/store.go +++ b/envd/internal/api/store.go @@ -1,4 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk package api @@ -12,6 +13,7 @@ import ( "git.omukk.dev/wrenn/sandbox/envd/internal/execcontext" "git.omukk.dev/wrenn/sandbox/envd/internal/host" + publicport "git.omukk.dev/wrenn/sandbox/envd/internal/port" "git.omukk.dev/wrenn/sandbox/envd/internal/utils" ) @@ -39,17 +41,24 @@ type API struct { lastSetTime *utils.AtomicMax initLock sync.Mutex + + // rootCtx is the parent context from main(), used to restart + // long-lived goroutines after snapshot restore. + rootCtx context.Context + portSubsystem *publicport.PortSubsystem } -func New(l *zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host.MMDSOpts, isNotFC bool) *API { +func New(l *zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host.MMDSOpts, isNotFC bool, rootCtx context.Context, portSubsystem *publicport.PortSubsystem) *API { return &API{ - logger: l, - defaults: defaults, - mmdsChan: mmdsChan, - isNotFC: isNotFC, - mmdsClient: &DefaultMMDSClient{}, - lastSetTime: utils.NewAtomicMax(), - accessToken: &SecureToken{}, + logger: l, + defaults: defaults, + mmdsChan: mmdsChan, + isNotFC: isNotFC, + mmdsClient: &DefaultMMDSClient{}, + lastSetTime: utils.NewAtomicMax(), + accessToken: &SecureToken{}, + rootCtx: rootCtx, + portSubsystem: portSubsystem, } } diff --git a/envd/internal/port/forward.go b/envd/internal/port/forward.go index bf516ff..cc71a41 100644 --- a/envd/internal/port/forward.go +++ b/envd/internal/port/forward.go @@ -1,4 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk // portf (port forward) periodaically scans opened TCP ports on the 127.0.0.1 (or localhost) // and launches `socat` process for every such port in the background. @@ -80,8 +81,16 @@ func (f *Forwarder) StartForwarding(ctx context.Context) { } for { - // procs is an array of currently opened ports. - if procs, ok := <-f.scannerSubscriber.Messages; ok { + select { + case <-ctx.Done(): + f.stopAllForwarding() + return + case procs, ok := <-f.scannerSubscriber.Messages: + if !ok { + f.stopAllForwarding() + return + } + // Now we are going to refresh all ports that are being forwarded in the `ports` map. Maybe add new ones // and maybe remove some. @@ -133,11 +142,22 @@ func (f *Forwarder) StartForwarding(ctx context.Context) { } } -func (f *Forwarder) startPortForwarding(ctx context.Context, p *PortToForward) { +func (f *Forwarder) stopAllForwarding() { + for _, p := range f.ports { + f.stopPortForwarding(p) + } + f.ports = make(map[string]*PortToForward) +} + +func (f *Forwarder) startPortForwarding(_ context.Context, p *PortToForward) { // https://unix.stackexchange.com/questions/311492/redirect-application-listening-on-localhost-to-listening-on-external-interface // socat -d -d TCP4-LISTEN:4000,bind=169.254.0.21,fork TCP4:localhost:4000 // reuseaddr is used to fix the "Address already in use" error when restarting socat quickly. - cmd := exec.CommandContext(ctx, + // + // We use exec.Command (not CommandContext) because stopAllForwarding kills + // socat via SIGKILL to the process group. CommandContext would also call + // cmd.Wait() on context cancellation, racing with the wait goroutine below. + cmd := exec.Command( "socat", "-d", "-d", "-d", fmt.Sprintf("TCP4-LISTEN:%v,bind=%s,reuseaddr,fork", p.port, f.sourceIP.To4()), fmt.Sprintf("TCP%d:localhost:%v", p.family, p.port), diff --git a/envd/internal/port/scan.go b/envd/internal/port/scan.go index 2b15523..878b361 100644 --- a/envd/internal/port/scan.go +++ b/envd/internal/port/scan.go @@ -1,8 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk package port import ( + "context" "sync" "time" @@ -10,8 +12,7 @@ import ( ) type Scanner struct { - scanExit chan struct{} - period time.Duration + period time.Duration // Plain mutex-protected map instead of concurrent-map. The concurrent-map // library's Items() spawns goroutines and uses a WaitGroup internally, @@ -20,15 +21,10 @@ type Scanner struct { subs map[string]*ScannerSubscriber } -func (s *Scanner) Destroy() { - close(s.scanExit) -} - func NewScanner(period time.Duration) *Scanner { return &Scanner{ - period: period, - subs: make(map[string]*ScannerSubscriber), - scanExit: make(chan struct{}), + period: period, + subs: make(map[string]*ScannerSubscriber), } } @@ -51,7 +47,8 @@ func (s *Scanner) Unsubscribe(sub *ScannerSubscriber) { } // ScanAndBroadcast starts scanning open TCP ports and broadcasts every open port to all subscribers. -func (s *Scanner) ScanAndBroadcast() { +// It exits when ctx is cancelled. +func (s *Scanner) ScanAndBroadcast(ctx context.Context) { for { // Read directly from /proc/net/tcp and /proc/net/tcp6 instead of // using gopsutil's net.Connections(), which walks /proc/{pid}/fd @@ -60,15 +57,14 @@ func (s *Scanner) ScanAndBroadcast() { s.mu.RLock() for _, sub := range s.subs { - sub.Signal(conns) + sub.Signal(ctx, conns) } s.mu.RUnlock() select { - case <-s.scanExit: + case <-ctx.Done(): return - default: - time.Sleep(s.period) + case <-time.After(s.period): } } } diff --git a/envd/internal/port/scanSubscriber.go b/envd/internal/port/scanSubscriber.go index bad9908..312f8d2 100644 --- a/envd/internal/port/scanSubscriber.go +++ b/envd/internal/port/scanSubscriber.go @@ -1,8 +1,11 @@ // SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk package port import ( + "context" + "github.com/rs/zerolog" ) @@ -33,19 +36,26 @@ func (ss *ScannerSubscriber) Destroy() { close(ss.Messages) } -func (ss *ScannerSubscriber) Signal(conns []ConnStat) { - // Filter isn't specified. Accept everything. +// Signal sends the (filtered) connection list to the subscriber. It respects +// ctx cancellation so the scanner goroutine is never stuck waiting for a +// consumer that has already exited. +func (ss *ScannerSubscriber) Signal(ctx context.Context, conns []ConnStat) { + var payload []ConnStat + if ss.filter == nil { - ss.Messages <- conns + payload = conns } else { filtered := []ConnStat{} for i := range conns { - // We need to access the list directly otherwise there will be implicit memory aliasing - // If the filter matched a connection, we will send it to a channel. if ss.filter.Match(&conns[i]) { filtered = append(filtered, conns[i]) } } - ss.Messages <- filtered + payload = filtered + } + + select { + case ss.Messages <- payload: + case <-ctx.Done(): } } diff --git a/envd/internal/port/subsystem.go b/envd/internal/port/subsystem.go new file mode 100644 index 0000000..094b2c4 --- /dev/null +++ b/envd/internal/port/subsystem.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: Apache-2.0 +// Modifications by M/S Omukk + +package port + +import ( + "context" + "runtime" + "runtime/debug" + "sync" + "time" + + "github.com/rs/zerolog" + + "git.omukk.dev/wrenn/sandbox/envd/internal/services/cgroups" +) + +// PortSubsystem owns the port scanner and forwarder lifecycle. +// It supports stop/restart across Firecracker snapshot/restore cycles. +type PortSubsystem struct { + logger *zerolog.Logger + cgroupManager cgroups.Manager + period time.Duration + + mu sync.Mutex + cancel context.CancelFunc + wg *sync.WaitGroup // per-cycle WaitGroup; nil when not running + running bool +} + +// NewPortSubsystem creates a new PortSubsystem. Call Start() to begin scanning. +func NewPortSubsystem(logger *zerolog.Logger, cgroupManager cgroups.Manager, period time.Duration) *PortSubsystem { + return &PortSubsystem{ + logger: logger, + cgroupManager: cgroupManager, + period: period, + } +} + +// Start creates a fresh scanner and forwarder, launching their goroutines. +// Safe to call multiple times; does nothing if already running. +func (p *PortSubsystem) Start(parentCtx context.Context) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.running { + return + } + + ctx, cancel := context.WithCancel(parentCtx) + p.cancel = cancel + p.running = true + + // Allocate a fresh WaitGroup for this lifecycle so a concurrent Stop + // on the previous cycle's WaitGroup cannot interfere. + wg := &sync.WaitGroup{} + p.wg = wg + + scanner := NewScanner(p.period) + forwarder := NewForwarder(p.logger, scanner, p.cgroupManager) + + wg.Add(2) + + go func() { + defer wg.Done() + forwarder.StartForwarding(ctx) + }() + + go func() { + defer wg.Done() + scanner.ScanAndBroadcast(ctx) + }() +} + +// Stop quiesces the scanner and forwarder goroutines and forces a GC cycle +// to put the Go runtime's page allocator in a consistent state before snapshot. +// Blocks until both goroutines have exited. Safe to call if already stopped. +func (p *PortSubsystem) Stop() { + p.mu.Lock() + if !p.running { + p.mu.Unlock() + return + } + cancelFn := p.cancel + wg := p.wg + p.cancel = nil + p.wg = nil + p.running = false + p.mu.Unlock() + + cancelFn() + wg.Wait() + + // Force two GC cycles to ensure all spans are swept and the page + // allocator summary tree is fully consistent before the VM is frozen. + runtime.GC() + runtime.GC() + debug.FreeOSMemory() +} + +// Restart stops the subsystem (if running) and starts it again with a fresh +// scanner and forwarder. Used after snapshot restore via PostInit. +func (p *PortSubsystem) Restart(parentCtx context.Context) { + p.Stop() + p.Start(parentCtx) +} diff --git a/envd/main.go b/envd/main.go index 751788d..5fb1813 100644 --- a/envd/main.go +++ b/envd/main.go @@ -190,7 +190,14 @@ func main() { processLogger := l.With().Str("logger", "process").Logger() processService := processRpc.Handle(m, &processLogger, defaults, cgroupManager) - service := api.New(&envLogger, defaults, mmdsChan, isNotFC) + // Port scanner and forwarder are managed by PortSubsystem, which + // supports stop/restart across Firecracker snapshot/restore cycles. + portLogger := l.With().Str("logger", "port-forwarder").Logger() + portSubsystem := publicport.NewPortSubsystem(&portLogger, cgroupManager, portScannerInterval) + portSubsystem.Start(ctx) + defer portSubsystem.Stop() + + service := api.New(&envLogger, defaults, mmdsChan, isNotFC, ctx, portSubsystem) handler := api.HandlerFromMux(service, m) middleware := authn.NewMiddleware(permissions.AuthenticateUsername) @@ -229,16 +236,6 @@ func main() { } } - // Bind all open ports on 127.0.0.1 and localhost to the eth0 interface - portScanner := publicport.NewScanner(portScannerInterval) - defer portScanner.Destroy() - - portLogger := l.With().Str("logger", "port-forwarder").Logger() - portForwarder := publicport.NewForwarder(&portLogger, portScanner, cgroupManager) - go portForwarder.StartForwarding(ctx) - - go portScanner.ScanAndBroadcast() - err := s.ListenAndServe() if err != nil { log.Fatalf("error starting server: %v", err) diff --git a/envd/spec/envd.yaml b/envd/spec/envd.yaml index b86d563..5160ab7 100644 --- a/envd/spec/envd.yaml +++ b/envd/spec/envd.yaml @@ -1,4 +1,5 @@ # SPDX-License-Identifier: Apache-2.0 +# Modifications by M/S Omukk openapi: 3.0.0 info: @@ -70,6 +71,13 @@ paths: "204": description: Env vars set, the time and metadata is synced with the host + /snapshot/prepare: + post: + summary: Quiesce continuous goroutines before Firecracker snapshot + responses: + "204": + description: Goroutines quiesced, safe to snapshot + /envs: get: summary: Get the environment variables diff --git a/internal/envdclient/client.go b/internal/envdclient/client.go index 278050e..03994b2 100644 --- a/internal/envdclient/client.go +++ b/internal/envdclient/client.go @@ -269,6 +269,32 @@ func (c *Client) ReadFile(ctx context.Context, path string) ([]byte, error) { return data, nil } +// PrepareSnapshot calls envd's POST /snapshot/prepare endpoint, which quiesces +// continuous goroutines (port scanner, forwarder) and forces a GC cycle before +// Firecracker takes a VM snapshot. This ensures the Go runtime's page allocator +// is in a consistent state when vCPUs are frozen. +// +// Best-effort: the caller should log a warning on error but not abort the pause. +func (c *Client) PrepareSnapshot(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.base+"/snapshot/prepare", nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("prepare snapshot: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("prepare snapshot: status %d: %s", resp.StatusCode, string(respBody)) + } + + return nil +} + // PostInit calls envd's POST /init endpoint, which triggers a re-read of // Firecracker MMDS metadata. This updates WRENN_SANDBOX_ID, WRENN_TEMPLATE_ID // env vars and the corresponding files under /run/wrenn/ inside the guest. diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go index b792406..206b5fb 100644 --- a/internal/sandbox/manager.go +++ b/internal/sandbox/manager.go @@ -327,6 +327,20 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error { sb.connTracker.Drain(2 * time.Second) slog.Debug("pause: proxy connections drained", "id", sandboxID) + // Step 0b: Signal envd to quiesce continuous goroutines (port scanner, + // forwarder) and run GC before freezing vCPUs. This prevents Go runtime + // page allocator corruption ("bad summary data") on snapshot restore. + // Best-effort: a failure is logged but does not abort the pause. + func() { + prepCtx, prepCancel := context.WithTimeout(ctx, 3*time.Second) + defer prepCancel() + if err := sb.client.PrepareSnapshot(prepCtx); err != nil { + slog.Warn("pause: pre-snapshot quiesce failed (best-effort)", "id", sandboxID, "error", err) + } else { + slog.Debug("pause: envd goroutines quiesced", "id", sandboxID) + } + }() + pauseStart := time.Now() // Step 1: Pause the VM (freeze vCPUs).