From 74f85ce4e960e34007e5c5cd20379cc9fab8f3e3 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Sun, 17 May 2026 02:11:48 +0600 Subject: [PATCH] refactor: polish control plane and host agent code - Decompose executeBuild (318 lines) into provisionBuildSandbox and finalizeBuild helpers for readability - Extract cleanupPauseFailure in sandbox manager to unify 3 inconsistent inline teardown paths (also fixes CoW file leak on rename failure) - Remove unused ctx parameter from startProcess/startProcessForRestore - Add missing MASQUERADE rollback entry in CreateNetwork for symmetry - Consolidate duplicate writeJSON for UTF-8/base64 exec response --- internal/api/handlers_exec.go | 20 +--- internal/network/setup.go | 3 + internal/sandbox/manager.go | 59 ++++------ internal/vm/manager.go | 4 +- internal/vm/process.go | 4 +- pkg/service/build.go | 204 ++++++++++++++++++---------------- 6 files changed, 146 insertions(+), 148 deletions(-) diff --git a/internal/api/handlers_exec.go b/internal/api/handlers_exec.go index 7a94388..28fd0e1 100644 --- a/internal/api/handlers_exec.go +++ b/internal/api/handlers_exec.go @@ -130,30 +130,22 @@ func (h *execHandler) Exec(w http.ResponseWriter, r *http.Request) { updateLastActive(h.db, sandboxID, sandboxIDStr) - // Use base64 encoding if output contains non-UTF-8 bytes. 
stdout := resp.Msg.Stdout stderr := resp.Msg.Stderr - encoding := "utf-8" + encoding := "utf-8" + stdoutStr, stderrStr := string(stdout), string(stderr) if !utf8.Valid(stdout) || !utf8.Valid(stderr) { encoding = "base64" - writeJSON(w, http.StatusOK, execResponse{ - SandboxID: sandboxIDStr, - Cmd: req.Cmd, - Stdout: base64.StdEncoding.EncodeToString(stdout), - Stderr: base64.StdEncoding.EncodeToString(stderr), - ExitCode: resp.Msg.ExitCode, - DurationMs: duration.Milliseconds(), - Encoding: encoding, - }) - return + stdoutStr = base64.StdEncoding.EncodeToString(stdout) + stderrStr = base64.StdEncoding.EncodeToString(stderr) } writeJSON(w, http.StatusOK, execResponse{ SandboxID: sandboxIDStr, Cmd: req.Cmd, - Stdout: string(stdout), - Stderr: string(stderr), + Stdout: stdoutStr, + Stderr: stderrStr, ExitCode: resp.Msg.ExitCode, DurationMs: duration.Milliseconds(), Encoding: encoding, diff --git a/internal/network/setup.go b/internal/network/setup.go index 23ab7e3..5edc0f1 100644 --- a/internal/network/setup.go +++ b/internal/network/setup.go @@ -430,6 +430,9 @@ func CreateNetwork(slot *Slot) error { rollback() return fmt.Errorf("add masquerade rule: %w", err) } + rollbacks = append(rollbacks, func() { + _ = iptablesHost("-t", "nat", "-D", "POSTROUTING", "-s", fmt.Sprintf("%s/32", slot.VpeerIP.String()), "-o", defaultIface, "-j", "MASQUERADE") + }) slog.Info("network created", "ns", slot.NamespaceID, diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go index 9698e64..df7750b 100644 --- a/internal/sandbox/manager.go +++ b/internal/sandbox/manager.go @@ -359,6 +359,25 @@ func (m *Manager) cleanup(ctx context.Context, sb *sandboxState) { } } +// cleanupPauseFailure is best-effort teardown when a pause operation fails +// after the VM has already been destroyed. It releases all resources and removes +// the sandbox from the in-memory map. 
+func (m *Manager) cleanupPauseFailure(sb *sandboxState, sandboxID string, pauseDir string) { + warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir)) + warnErr("network cleanup error during pause", sandboxID, network.RemoveNetwork(sb.slot)) + m.slots.Release(sb.SlotIndex) + if sb.dmDevice != nil { + warnErr("dm-snapshot remove error during pause", sandboxID, devicemapper.RemoveSnapshot(context.Background(), sb.dmDevice)) + os.Remove(sb.dmDevice.CowPath) + } + if sb.baseImagePath != "" { + m.loops.Release(sb.baseImagePath) + } + m.mu.Lock() + delete(m.boxes, sandboxID) + m.mu.Unlock() +} + // Pause takes a snapshot of a running sandbox, then destroys all resources. // The sandbox's snapshot files are stored at SnapshotsDir/{sandboxID}/. // After this call, the sandbox is no longer running but can be resumed. @@ -513,45 +532,21 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error { slog.Warn("pause: failed to remove old snapshot dir", "id", sandboxID, "error", err) } if err := os.Rename(tmpPauseDir, pauseDir); err != nil { - warnErr("network cleanup error during pause", sandboxID, network.RemoveNetwork(sb.slot)) - m.slots.Release(sb.SlotIndex) - if sb.dmDevice != nil { - warnErr("dm-snapshot remove error during pause", sandboxID, devicemapper.RemoveSnapshot(context.Background(), sb.dmDevice)) - os.Remove(sb.dmDevice.CowPath) - } - if sb.baseImagePath != "" { - m.loops.Release(sb.baseImagePath) - } - m.mu.Lock() - delete(m.boxes, sandboxID) - m.mu.Unlock() + m.cleanupPauseFailure(sb, sandboxID, tmpPauseDir) return fmt.Errorf("rename snapshot dir: %w", err) } // ── Step 7: Remove dm-snapshot and save CoW ────────────────────── if sb.dmDevice != nil { if err := devicemapper.RemoveSnapshot(ctx, sb.dmDevice); err != nil { - warnErr("network cleanup error during pause", sandboxID, network.RemoveNetwork(sb.slot)) - m.slots.Release(sb.SlotIndex) - warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir)) - m.mu.Lock() -
delete(m.boxes, sandboxID) - m.mu.Unlock() + m.cleanupPauseFailure(sb, sandboxID, pauseDir) return fmt.Errorf("remove dm-snapshot: %w", err) } snapshotCow := snapshot.CowPath(pauseDir, "") if err := os.Rename(sb.dmDevice.CowPath, snapshotCow); err != nil { - warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir)) - warnErr("network cleanup error during pause", sandboxID, network.RemoveNetwork(sb.slot)) - m.slots.Release(sb.SlotIndex) os.Remove(sb.dmDevice.CowPath) - if sb.baseImagePath != "" { - m.loops.Release(sb.baseImagePath) - } - m.mu.Lock() - delete(m.boxes, sandboxID) - m.mu.Unlock() + m.cleanupPauseFailure(sb, sandboxID, pauseDir) return fmt.Errorf("move cow file: %w", err) } @@ -561,15 +556,7 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error { VCPUs: sb.VCPUs, MemoryMB: sb.MemoryMB, }); err != nil { - warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir)) - warnErr("network cleanup error during pause", sandboxID, network.RemoveNetwork(sb.slot)) - m.slots.Release(sb.SlotIndex) - if sb.baseImagePath != "" { - m.loops.Release(sb.baseImagePath) - } - m.mu.Lock() - delete(m.boxes, sandboxID) - m.mu.Unlock() + m.cleanupPauseFailure(sb, sandboxID, pauseDir) return fmt.Errorf("write rootfs meta: %w", err) } } diff --git a/internal/vm/manager.go b/internal/vm/manager.go index 3233a49..019d83e 100644 --- a/internal/vm/manager.go +++ b/internal/vm/manager.go @@ -47,7 +47,7 @@ func (m *Manager) Create(ctx context.Context, cfg VMConfig) (*VM, error) { ) // Step 1: Launch the Cloud Hypervisor process. - proc, err := startProcess(ctx, &cfg) + proc, err := startProcess(&cfg) if err != nil { return nil, fmt.Errorf("start process: %w", err) } @@ -220,7 +220,7 @@ func (m *Manager) CreateFromSnapshot(ctx context.Context, cfg VMConfig, snapshot ) // Step 1: Launch bare CH process (no --restore). 
- proc, err := startProcessForRestore(ctx, &cfg) + proc, err := startProcessForRestore(&cfg) if err != nil { return nil, fmt.Errorf("start process: %w", err) } diff --git a/internal/vm/process.go b/internal/vm/process.go index da0d6e2..ed05f4b 100644 --- a/internal/vm/process.go +++ b/internal/vm/process.go @@ -30,7 +30,7 @@ type process struct { // 4. symlink kernel and rootfs into SandboxDir // 5. ip netns exec : enters the network namespace where TAP is configured // 6. exec cloud-hypervisor with the API socket path -func startProcess(ctx context.Context, cfg *VMConfig) (*process, error) { +func startProcess(cfg *VMConfig) (*process, error) { script := buildStartScript(cfg) return launchScript(script, cfg) } @@ -38,7 +38,7 @@ func startProcess(ctx context.Context, cfg *VMConfig) (*process, error) { // startProcessForRestore launches a bare Cloud Hypervisor process (no --restore). // The restore is performed via the API after the socket is ready, which allows // passing memory_restore_mode=OnDemand for UFFD lazy paging. -func startProcessForRestore(ctx context.Context, cfg *VMConfig) (*process, error) { +func startProcessForRestore(cfg *VMConfig) (*process, error) { script := buildRestoreScript(cfg) return launchScript(script, cfg) } diff --git a/pkg/service/build.go b/pkg/service/build.go index dee639c..5d37692 100644 --- a/pkg/service/build.go +++ b/pkg/service/build.go @@ -282,69 +282,11 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) { return } - // Pick a platform host and create a sandbox. 
- host, err := s.Scheduler.SelectHost(buildCtx, id.PlatformTeamID, false, build.MemoryMb, 5120) + agent, sandboxIDStr, sandboxMetadata, err := s.provisionBuildSandbox(buildCtx, buildID, buildIDStr, build, log) if err != nil { - s.failBuild(buildCtx, buildID, fmt.Sprintf("no host available: %v", err)) return } - - agent, err := s.Pool.GetForHost(host) - if err != nil { - s.failBuild(buildCtx, buildID, fmt.Sprintf("agent client error: %v", err)) - return - } - - sandboxID := id.NewSandboxID() - sandboxIDStr := id.FormatSandboxID(sandboxID) - log = log.With("sandbox_id", sandboxIDStr, "host_id", id.FormatHostID(host.ID)) - - // Resolve the base template to UUIDs. "minimal" is the zero sentinel. - baseTeamID := id.PlatformTeamID - baseTemplateID := id.MinimalTemplateID - if build.BaseTemplate != "minimal" { - baseTmpl, err := s.DB.GetPlatformTemplateByName(buildCtx, build.BaseTemplate) - if err != nil { - s.failBuild(buildCtx, buildID, fmt.Sprintf("base template %q not found: %v", build.BaseTemplate, err)) - return - } - baseTeamID = baseTmpl.TeamID - baseTemplateID = baseTmpl.ID - } - - resp, err := agent.CreateSandbox(buildCtx, connect.NewRequest(&pb.CreateSandboxRequest{ - SandboxId: sandboxIDStr, - Template: build.BaseTemplate, - TeamId: id.UUIDString(baseTeamID), - TemplateId: id.UUIDString(baseTemplateID), - Vcpus: build.Vcpus, - MemoryMb: build.MemoryMb, - TimeoutSec: 0, // no auto-pause for builds - DiskSizeMb: 5120, // 5 GB for template builds - })) - if err != nil { - s.failBuild(buildCtx, buildID, fmt.Sprintf("create sandbox failed: %v", err)) - return - } - // Capture sandbox metadata (envd/kernel/vmm/agent versions). - sandboxMetadata := resp.Msg.Metadata - - // Record sandbox/host association. - _ = s.DB.UpdateBuildSandbox(buildCtx, db.UpdateBuildSandboxParams{ - ID: buildID, - SandboxID: sandboxID, - HostID: host.ID, - }) - - // Upload and extract build archive if provided. 
- archive := s.takeArchive(buildIDStr) - if len(archive) > 0 { - if err := s.uploadAndExtractArchive(buildCtx, agent, sandboxIDStr, archive, buildIDStr); err != nil { - s.destroySandbox(buildCtx, agent, sandboxIDStr) - s.failBuild(buildCtx, buildID, fmt.Sprintf("archive upload failed: %v", err)) - return - } - } + log = log.With("sandbox_id", sandboxIDStr) // Parse recipe steps. preBuildCmds and postBuildCmds are hardcoded and always // valid; panic on error is appropriate here since it would be a programmer mistake. @@ -435,81 +377,162 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) { } } - // Healthcheck or direct snapshot. + // Finalize: healthcheck/snapshot/flatten → persist template → mark success. + s.finalizeBuild(buildCtx, buildID, build, agent, sandboxIDStr, templateDefaultUser, templateDefaultEnv, sandboxMetadata, log) +} + +// provisionBuildSandbox picks a host, creates a sandbox, and uploads the build +// archive. On failure it calls failBuild and returns an error. 
+func (s *BuildService) provisionBuildSandbox( + ctx context.Context, + buildID pgtype.UUID, + buildIDStr string, + build db.TemplateBuild, + log *slog.Logger, +) (buildAgentClient, string, map[string]string, error) { + host, err := s.Scheduler.SelectHost(ctx, id.PlatformTeamID, false, build.MemoryMb, 5120) + if err != nil { + s.failBuild(ctx, buildID, fmt.Sprintf("no host available: %v", err)) + return nil, "", nil, err + } + + agent, err := s.Pool.GetForHost(host) + if err != nil { + s.failBuild(ctx, buildID, fmt.Sprintf("agent client error: %v", err)) + return nil, "", nil, err + } + + sandboxID := id.NewSandboxID() + sandboxIDStr := id.FormatSandboxID(sandboxID) + log.Info("provisioning build sandbox", "sandbox_id", sandboxIDStr, "host_id", id.FormatHostID(host.ID)) + + baseTeamID := id.PlatformTeamID + baseTemplateID := id.MinimalTemplateID + if build.BaseTemplate != "minimal" { + baseTmpl, err := s.DB.GetPlatformTemplateByName(ctx, build.BaseTemplate) + if err != nil { + s.failBuild(ctx, buildID, fmt.Sprintf("base template %q not found: %v", build.BaseTemplate, err)) + return nil, "", nil, err + } + baseTeamID = baseTmpl.TeamID + baseTemplateID = baseTmpl.ID + } + + resp, err := agent.CreateSandbox(ctx, connect.NewRequest(&pb.CreateSandboxRequest{ + SandboxId: sandboxIDStr, + Template: build.BaseTemplate, + TeamId: id.UUIDString(baseTeamID), + TemplateId: id.UUIDString(baseTemplateID), + Vcpus: build.Vcpus, + MemoryMb: build.MemoryMb, + TimeoutSec: 0, + DiskSizeMb: 5120, + })) + if err != nil { + s.failBuild(ctx, buildID, fmt.Sprintf("create sandbox failed: %v", err)) + return nil, "", nil, err + } + sandboxMetadata := resp.Msg.Metadata + + _ = s.DB.UpdateBuildSandbox(ctx, db.UpdateBuildSandboxParams{ + ID: buildID, + SandboxID: sandboxID, + HostID: host.ID, + }) + + archive := s.takeArchive(buildIDStr) + if len(archive) > 0 { + if err := s.uploadAndExtractArchive(ctx, agent, sandboxIDStr, archive, buildIDStr); err != nil { + s.destroySandbox(ctx, agent, 
sandboxIDStr) + s.failBuild(ctx, buildID, fmt.Sprintf("archive upload failed: %v", err)) + return nil, "", nil, err + } + } + + return agent, sandboxIDStr, sandboxMetadata, nil +} + +// finalizeBuild handles the healthcheck/snapshot/flatten step and persists the +// template record. Called after all recipe phases complete successfully. +func (s *BuildService) finalizeBuild( + ctx context.Context, + buildID pgtype.UUID, + build db.TemplateBuild, + agent buildAgentClient, + sandboxIDStr string, + defaultUser string, + defaultEnv map[string]string, + sandboxMetadata map[string]string, + log *slog.Logger, +) { var sizeBytes int64 if build.Healthcheck != "" { hc, err := recipe.ParseHealthcheck(build.Healthcheck) if err != nil { - s.destroySandbox(buildCtx, agent, sandboxIDStr) - s.failBuild(buildCtx, buildID, fmt.Sprintf("invalid healthcheck: %v", err)) + s.destroySandbox(ctx, agent, sandboxIDStr) + s.failBuild(ctx, buildID, fmt.Sprintf("invalid healthcheck: %v", err)) return } log.Info("running healthcheck", "cmd", hc.Cmd, "interval", hc.Interval, "timeout", hc.Timeout, "start_period", hc.StartPeriod, "retries", hc.Retries) - if err := s.waitForHealthcheck(buildCtx, agent, sandboxIDStr, hc, templateDefaultUser); err != nil { - s.destroySandbox(buildCtx, agent, sandboxIDStr) - if buildCtx.Err() != nil { + if err := s.waitForHealthcheck(ctx, agent, sandboxIDStr, hc, defaultUser); err != nil { + s.destroySandbox(ctx, agent, sandboxIDStr) + if ctx.Err() != nil { return } - s.failBuild(buildCtx, buildID, fmt.Sprintf("healthcheck failed: %v", err)) + s.failBuild(ctx, buildID, fmt.Sprintf("healthcheck failed: %v", err)) return } - // Healthcheck passed → full snapshot (with memory/CPU state). 
log.Info("healthcheck passed, creating snapshot") - snapResp, err := agent.CreateSnapshot(buildCtx, connect.NewRequest(&pb.CreateSnapshotRequest{ + snapResp, err := agent.CreateSnapshot(ctx, connect.NewRequest(&pb.CreateSnapshotRequest{ SandboxId: sandboxIDStr, Name: build.Name, TeamId: id.UUIDString(build.TeamID), TemplateId: id.UUIDString(build.TemplateID), })) if err != nil { - s.destroySandbox(buildCtx, agent, sandboxIDStr) - if buildCtx.Err() != nil { + s.destroySandbox(ctx, agent, sandboxIDStr) + if ctx.Err() != nil { return } - s.failBuild(buildCtx, buildID, fmt.Sprintf("create snapshot failed: %v", err)) + s.failBuild(ctx, buildID, fmt.Sprintf("create snapshot failed: %v", err)) return } sizeBytes = snapResp.Msg.SizeBytes } else { - // No healthcheck → image-only template (rootfs only). log.Info("no healthcheck, flattening rootfs") - flatResp, err := agent.FlattenRootfs(buildCtx, connect.NewRequest(&pb.FlattenRootfsRequest{ + flatResp, err := agent.FlattenRootfs(ctx, connect.NewRequest(&pb.FlattenRootfsRequest{ SandboxId: sandboxIDStr, Name: build.Name, TeamId: id.UUIDString(build.TeamID), TemplateId: id.UUIDString(build.TemplateID), })) if err != nil { - s.destroySandbox(buildCtx, agent, sandboxIDStr) - if buildCtx.Err() != nil { + s.destroySandbox(ctx, agent, sandboxIDStr) + if ctx.Err() != nil { return } - s.failBuild(buildCtx, buildID, fmt.Sprintf("flatten rootfs failed: %v", err)) + s.failBuild(ctx, buildID, fmt.Sprintf("flatten rootfs failed: %v", err)) return } sizeBytes = flatResp.Msg.SizeBytes } - // Insert into templates table as a global (platform) template. templateType := "base" if build.Healthcheck != "" { templateType = "snapshot" } - // Serialize env vars for DB storage. - defaultEnvJSON, err := json.Marshal(templateDefaultEnv) + defaultEnvJSON, err := json.Marshal(defaultEnv) if err != nil { defaultEnvJSON = []byte("{}") } - - // Serialize sandbox metadata for DB storage. 
metadataJSON, err := json.Marshal(sandboxMetadata) if err != nil || len(sandboxMetadata) == 0 { metadataJSON = []byte("{}") } - if _, err := s.DB.InsertTemplate(buildCtx, db.InsertTemplateParams{ + if _, err := s.DB.InsertTemplate(ctx, db.InsertTemplateParams{ ID: build.TemplateID, Name: build.Name, Type: templateType, @@ -517,28 +540,21 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) { MemoryMb: build.MemoryMb, SizeBytes: sizeBytes, TeamID: id.PlatformTeamID, - DefaultUser: templateDefaultUser, + DefaultUser: defaultUser, DefaultEnv: defaultEnvJSON, Metadata: metadataJSON, }); err != nil { log.Error("failed to insert template record", "error", err) - // Build succeeded on disk, just DB record failed — don't mark as failed. } - // Record defaults and metadata on the build record for inspection. - _ = s.DB.UpdateBuildDefaults(buildCtx, db.UpdateBuildDefaultsParams{ + _ = s.DB.UpdateBuildDefaults(ctx, db.UpdateBuildDefaultsParams{ ID: buildID, - DefaultUser: templateDefaultUser, + DefaultUser: defaultUser, DefaultEnv: defaultEnvJSON, Metadata: metadataJSON, }) - // For CreateSnapshot, the sandbox is already destroyed by the snapshot process. - // For FlattenRootfs, the sandbox is already destroyed by the flatten process. - // No additional destroy needed. - - // Mark build as success. - if _, err := s.DB.UpdateBuildStatus(buildCtx, db.UpdateBuildStatusParams{ + if _, err := s.DB.UpdateBuildStatus(ctx, db.UpdateBuildStatusParams{ ID: buildID, Status: "success", }); err != nil { log.Error("failed to mark build as success", "error", err)