forked from wrenn/wrenn
Add skip_pre_post build option, cancel endpoint, and recipe package
- skip_pre_post flag on builds bypasses apt update/clean pre/post steps for
faster iteration when the recipe handles its own environment setup
- POST /v1/admin/builds/{id}/cancel endpoint marks an in-progress build as
cancelled; UpdateBuildStatus now also sets completed_at for 'cancelled'
- internal/recipe: typed recipe parser and executor (RUN/ENV/COPY steps)
replacing the raw string slice approach in the build worker
- pre/post build commands prefixed with RUN to match recipe step format
This commit is contained in:
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
@ -14,6 +15,7 @@ import (
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/id"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/lifecycle"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/recipe"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/scheduler"
|
||||
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
|
||||
)
|
||||
@ -27,14 +29,14 @@ const (
|
||||
|
||||
// preBuildCmds run before the user recipe to prepare the build environment.
|
||||
var preBuildCmds = []string{
|
||||
"apt update",
|
||||
"RUN apt update",
|
||||
}
|
||||
|
||||
// postBuildCmds run after the user recipe to clean up caches and reduce image size.
|
||||
var postBuildCmds = []string{
|
||||
"apt clean",
|
||||
"apt autoremove -y",
|
||||
"rm -rf /var/lib/apt/lists/*",
|
||||
"RUN apt clean",
|
||||
"RUN apt autoremove -y",
|
||||
"RUN rm -rf /var/lib/apt/lists/*",
|
||||
}
|
||||
|
||||
// buildAgentClient is the subset of the host agent client used by the build worker.
|
||||
@ -46,24 +48,15 @@ type buildAgentClient interface {
|
||||
FlattenRootfs(ctx context.Context, req *connect.Request[pb.FlattenRootfsRequest]) (*connect.Response[pb.FlattenRootfsResponse], error)
|
||||
}
|
||||
|
||||
// BuildLogEntry represents a single entry in the build log JSONB array.
|
||||
type BuildLogEntry struct {
|
||||
Step int `json:"step"`
|
||||
Phase string `json:"phase"` // "pre-build", "recipe", or "post-build"
|
||||
Cmd string `json:"cmd"`
|
||||
Stdout string `json:"stdout"`
|
||||
Stderr string `json:"stderr"`
|
||||
Exit int32 `json:"exit"`
|
||||
Ok bool `json:"ok"`
|
||||
Elapsed int64 `json:"elapsed_ms"`
|
||||
}
|
||||
|
||||
// BuildService handles template build orchestration.
|
||||
type BuildService struct {
|
||||
DB *db.Queries
|
||||
Redis *redis.Client
|
||||
Pool *lifecycle.HostClientPool
|
||||
Scheduler scheduler.HostScheduler
|
||||
|
||||
mu sync.Mutex
|
||||
cancelMap map[string]context.CancelFunc // buildID → per-build cancel func
|
||||
}
|
||||
|
||||
// BuildCreateParams holds the parameters for creating a template build.
|
||||
@ -74,6 +67,7 @@ type BuildCreateParams struct {
|
||||
Healthcheck string
|
||||
VCPUs int32
|
||||
MemoryMB int32
|
||||
SkipPrePost bool
|
||||
}
|
||||
|
||||
// Create inserts a new build record and enqueues it to Redis.
|
||||
@ -97,6 +91,11 @@ func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.Temp
|
||||
buildIDStr := id.FormatBuildID(buildID)
|
||||
newTemplateID := id.NewTemplateID()
|
||||
|
||||
defaultSteps := len(preBuildCmds) + len(postBuildCmds)
|
||||
if p.SkipPrePost {
|
||||
defaultSteps = 0
|
||||
}
|
||||
|
||||
build, err := s.DB.InsertTemplateBuild(ctx, db.InsertTemplateBuildParams{
|
||||
ID: buildID,
|
||||
Name: p.Name,
|
||||
@ -105,9 +104,10 @@ func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.Temp
|
||||
Healthcheck: p.Healthcheck,
|
||||
Vcpus: p.VCPUs,
|
||||
MemoryMb: p.MemoryMB,
|
||||
TotalSteps: int32(len(p.Recipe) + len(preBuildCmds) + len(postBuildCmds)),
|
||||
TotalSteps: int32(len(p.Recipe) + defaultSteps),
|
||||
TemplateID: newTemplateID,
|
||||
TeamID: id.PlatformTeamID,
|
||||
SkipPrePost: p.SkipPrePost,
|
||||
})
|
||||
if err != nil {
|
||||
return db.TemplateBuild{}, fmt.Errorf("insert build: %w", err)
|
||||
@ -131,6 +131,40 @@ func (s *BuildService) List(ctx context.Context) ([]db.TemplateBuild, error) {
|
||||
return s.DB.ListTemplateBuilds(ctx)
|
||||
}
|
||||
|
||||
// Cancel cancels a pending or running build. For pending builds the status is
|
||||
// updated in the DB and the worker skips it when dequeued. For running builds
|
||||
// the per-build context is cancelled, which causes the current exec step to
|
||||
// abort; executeBuild then detects the cancellation and records the status.
|
||||
func (s *BuildService) Cancel(ctx context.Context, buildID pgtype.UUID) error {
|
||||
build, err := s.DB.GetTemplateBuild(ctx, buildID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get build: %w", err)
|
||||
}
|
||||
switch build.Status {
|
||||
case "success", "failed", "cancelled":
|
||||
return fmt.Errorf("build is already %s", build.Status)
|
||||
}
|
||||
|
||||
// Mark cancelled in DB first. This handles both pending builds (which haven't
|
||||
// been picked up yet) and acts as a flag for executeBuild to check on start.
|
||||
if _, err := s.DB.UpdateBuildStatus(ctx, db.UpdateBuildStatusParams{
|
||||
ID: buildID, Status: "cancelled",
|
||||
}); err != nil {
|
||||
return fmt.Errorf("update build status: %w", err)
|
||||
}
|
||||
|
||||
// If the build is currently running, signal its context.
|
||||
buildIDStr := id.FormatBuildID(buildID)
|
||||
s.mu.Lock()
|
||||
cancel, running := s.cancelMap[buildIDStr]
|
||||
s.mu.Unlock()
|
||||
if running {
|
||||
cancel()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartWorkers launches n goroutines that consume from the Redis build queue.
|
||||
// The returned cancel function stops all workers.
|
||||
func (s *BuildService) StartWorkers(ctx context.Context, n int) context.CancelFunc {
|
||||
@ -172,14 +206,38 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
return
|
||||
}
|
||||
|
||||
build, err := s.DB.GetTemplateBuild(ctx, buildID)
|
||||
// Create a per-build context so this build can be cancelled independently of
|
||||
// the worker. Register in cancelMap before fetching the build so that a
|
||||
// concurrent Cancel call can always find and signal it.
|
||||
buildCtx, buildCancel := context.WithCancel(ctx)
|
||||
defer buildCancel()
|
||||
|
||||
s.mu.Lock()
|
||||
if s.cancelMap == nil {
|
||||
s.cancelMap = make(map[string]context.CancelFunc)
|
||||
}
|
||||
s.cancelMap[buildIDStr] = buildCancel
|
||||
s.mu.Unlock()
|
||||
defer func() {
|
||||
s.mu.Lock()
|
||||
delete(s.cancelMap, buildIDStr)
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
|
||||
build, err := s.DB.GetTemplateBuild(buildCtx, buildID)
|
||||
if err != nil {
|
||||
log.Error("failed to fetch build", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Skip if already cancelled (Cancel was called before we dequeued).
|
||||
if build.Status == "cancelled" {
|
||||
log.Info("build already cancelled, skipping")
|
||||
return
|
||||
}
|
||||
|
||||
// Mark as running.
|
||||
if _, err := s.DB.UpdateBuildStatus(ctx, db.UpdateBuildStatusParams{
|
||||
if _, err := s.DB.UpdateBuildStatus(buildCtx, db.UpdateBuildStatusParams{
|
||||
ID: buildID, Status: "running",
|
||||
}); err != nil {
|
||||
log.Error("failed to update build status", "error", err)
|
||||
@ -187,22 +245,22 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
}
|
||||
|
||||
// Parse user recipe.
|
||||
var recipe []string
|
||||
if err := json.Unmarshal(build.Recipe, &recipe); err != nil {
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("invalid recipe JSON: %v", err))
|
||||
var userRecipe []string
|
||||
if err := json.Unmarshal(build.Recipe, &userRecipe); err != nil {
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("invalid recipe JSON: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Pick a platform host and create a sandbox.
|
||||
host, err := s.Scheduler.SelectHost(ctx, id.PlatformTeamID, false)
|
||||
host, err := s.Scheduler.SelectHost(buildCtx, id.PlatformTeamID, false)
|
||||
if err != nil {
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("no host available: %v", err))
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("no host available: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
agent, err := s.Pool.GetForHost(host)
|
||||
if err != nil {
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("agent client error: %v", err))
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("agent client error: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
@ -214,16 +272,16 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
baseTeamID := id.PlatformTeamID
|
||||
baseTemplateID := id.MinimalTemplateID
|
||||
if build.BaseTemplate != "minimal" {
|
||||
baseTmpl, err := s.DB.GetPlatformTemplateByName(ctx, build.BaseTemplate)
|
||||
baseTmpl, err := s.DB.GetPlatformTemplateByName(buildCtx, build.BaseTemplate)
|
||||
if err != nil {
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("base template %q not found: %v", build.BaseTemplate, err))
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("base template %q not found: %v", build.BaseTemplate, err))
|
||||
return
|
||||
}
|
||||
baseTeamID = baseTmpl.TeamID
|
||||
baseTemplateID = baseTmpl.ID
|
||||
}
|
||||
|
||||
resp, err := agent.CreateSandbox(ctx, connect.NewRequest(&pb.CreateSandboxRequest{
|
||||
resp, err := agent.CreateSandbox(buildCtx, connect.NewRequest(&pb.CreateSandboxRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
Template: build.BaseTemplate,
|
||||
TeamId: id.UUIDString(baseTeamID),
|
||||
@ -234,129 +292,121 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
DiskSizeMb: 5120, // 5 GB for template builds
|
||||
}))
|
||||
if err != nil {
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("create sandbox failed: %v", err))
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("create sandbox failed: %v", err))
|
||||
return
|
||||
}
|
||||
_ = resp
|
||||
|
||||
// Record sandbox/host association.
|
||||
_ = s.DB.UpdateBuildSandbox(ctx, db.UpdateBuildSandboxParams{
|
||||
_ = s.DB.UpdateBuildSandbox(buildCtx, db.UpdateBuildSandboxParams{
|
||||
ID: buildID,
|
||||
SandboxID: sandboxID,
|
||||
HostID: host.ID,
|
||||
})
|
||||
|
||||
// Parse recipe steps. preBuildCmds and postBuildCmds are hardcoded and always
|
||||
// valid; panic on error is appropriate here since it would be a programmer mistake.
|
||||
preBuildSteps, err := recipe.ParseRecipe(preBuildCmds)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid pre-build recipe: %v", err))
|
||||
}
|
||||
userRecipeSteps, err := recipe.ParseRecipe(userRecipe)
|
||||
if err != nil {
|
||||
s.destroySandbox(buildCtx, agent, sandboxIDStr)
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("recipe parse error: %v", err))
|
||||
return
|
||||
}
|
||||
postBuildSteps, err := recipe.ParseRecipe(postBuildCmds)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid post-build recipe: %v", err))
|
||||
}
|
||||
|
||||
// Execute build phases: pre-build → user recipe → post-build.
|
||||
var logs []BuildLogEntry
|
||||
// bctx carries working directory and env vars across all phases.
|
||||
var logs []recipe.BuildLogEntry
|
||||
step := 0
|
||||
bctx := &recipe.ExecContext{}
|
||||
|
||||
// Helper to run a list of commands in a given phase.
|
||||
// timeout=0 means no timeout (uses parent context).
|
||||
runPhase := func(phase string, cmds []string, timeout time.Duration) bool {
|
||||
for _, cmd := range cmds {
|
||||
step++
|
||||
log.Info("executing build step", "phase", phase, "step", step, "cmd", cmd)
|
||||
|
||||
execCtx := ctx
|
||||
var cancel context.CancelFunc
|
||||
// When no timeout is specified, use 10 minutes as a generous upper
|
||||
// bound. The host agent defaults TimeoutSec=0 to 30s, so we must
|
||||
// always send an explicit value.
|
||||
effectiveTimeout := timeout
|
||||
if effectiveTimeout <= 0 {
|
||||
effectiveTimeout = 10 * time.Minute
|
||||
}
|
||||
execCtx, cancel = context.WithTimeout(ctx, effectiveTimeout)
|
||||
timeoutSec := int32(effectiveTimeout.Seconds())
|
||||
|
||||
start := time.Now()
|
||||
execResp, err := agent.Exec(execCtx, connect.NewRequest(&pb.ExecRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
Cmd: "/bin/sh",
|
||||
Args: []string{"-c", cmd},
|
||||
TimeoutSec: timeoutSec,
|
||||
}))
|
||||
cancel()
|
||||
|
||||
entry := BuildLogEntry{
|
||||
Step: step,
|
||||
Phase: phase,
|
||||
Cmd: cmd,
|
||||
Elapsed: time.Since(start).Milliseconds(),
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
entry.Stderr = err.Error()
|
||||
entry.Ok = false
|
||||
logs = append(logs, entry)
|
||||
s.updateLogs(ctx, buildID, step, logs)
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("%s step %d failed: %v", phase, step, err))
|
||||
runPhase := func(phase string, steps []recipe.Step, defaultTimeout time.Duration) bool {
|
||||
newEntries, nextStep, ok := recipe.Execute(buildCtx, phase, steps, sandboxIDStr, step, defaultTimeout, bctx, agent.Exec)
|
||||
logs = append(logs, newEntries...)
|
||||
step = nextStep
|
||||
s.updateLogs(buildCtx, buildID, step, logs)
|
||||
if !ok {
|
||||
s.destroySandbox(buildCtx, agent, sandboxIDStr)
|
||||
// If the build was cancelled, status is already set — don't overwrite with "failed".
|
||||
if buildCtx.Err() != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
entry.Stdout = string(execResp.Msg.Stdout)
|
||||
entry.Stderr = string(execResp.Msg.Stderr)
|
||||
entry.Exit = execResp.Msg.ExitCode
|
||||
entry.Ok = execResp.Msg.ExitCode == 0
|
||||
logs = append(logs, entry)
|
||||
s.updateLogs(ctx, buildID, step, logs)
|
||||
|
||||
if execResp.Msg.ExitCode != 0 {
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("%s step %d failed with exit code %d", phase, step, execResp.Msg.ExitCode))
|
||||
return false
|
||||
last := newEntries[len(newEntries)-1]
|
||||
reason := last.Stderr
|
||||
if reason == "" {
|
||||
reason = fmt.Sprintf("exit code %d", last.Exit)
|
||||
}
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("%s step %d failed: %s", phase, step, reason))
|
||||
}
|
||||
return true
|
||||
return ok
|
||||
}
|
||||
|
||||
if !runPhase("pre-build", preBuildCmds, 0) {
|
||||
if !build.SkipPrePost {
|
||||
if !runPhase("pre-build", preBuildSteps, 0) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if !runPhase("recipe", userRecipeSteps, buildCommandTimeout) {
|
||||
return
|
||||
}
|
||||
if !runPhase("recipe", recipe, buildCommandTimeout) {
|
||||
return
|
||||
}
|
||||
if !runPhase("post-build", postBuildCmds, 0) {
|
||||
return
|
||||
if !build.SkipPrePost {
|
||||
if !runPhase("post-build", postBuildSteps, 0) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Healthcheck or direct snapshot.
|
||||
var sizeBytes int64
|
||||
if build.Healthcheck != "" {
|
||||
log.Info("running healthcheck", "cmd", build.Healthcheck)
|
||||
if err := s.waitForHealthcheck(ctx, agent, sandboxIDStr, build.Healthcheck); err != nil {
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("healthcheck failed: %v", err))
|
||||
if err := s.waitForHealthcheck(buildCtx, agent, sandboxIDStr, build.Healthcheck); err != nil {
|
||||
s.destroySandbox(buildCtx, agent, sandboxIDStr)
|
||||
if buildCtx.Err() != nil {
|
||||
return
|
||||
}
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("healthcheck failed: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Healthcheck passed → full snapshot (with memory/CPU state).
|
||||
log.Info("healthcheck passed, creating snapshot")
|
||||
snapResp, err := agent.CreateSnapshot(ctx, connect.NewRequest(&pb.CreateSnapshotRequest{
|
||||
snapResp, err := agent.CreateSnapshot(buildCtx, connect.NewRequest(&pb.CreateSnapshotRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
Name: build.Name,
|
||||
TeamId: id.UUIDString(build.TeamID),
|
||||
TemplateId: id.UUIDString(build.TemplateID),
|
||||
}))
|
||||
if err != nil {
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("create snapshot failed: %v", err))
|
||||
s.destroySandbox(buildCtx, agent, sandboxIDStr)
|
||||
if buildCtx.Err() != nil {
|
||||
return
|
||||
}
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("create snapshot failed: %v", err))
|
||||
return
|
||||
}
|
||||
sizeBytes = snapResp.Msg.SizeBytes
|
||||
} else {
|
||||
// No healthcheck → image-only template (rootfs only).
|
||||
log.Info("no healthcheck, flattening rootfs")
|
||||
flatResp, err := agent.FlattenRootfs(ctx, connect.NewRequest(&pb.FlattenRootfsRequest{
|
||||
flatResp, err := agent.FlattenRootfs(buildCtx, connect.NewRequest(&pb.FlattenRootfsRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
Name: build.Name,
|
||||
TeamId: id.UUIDString(build.TeamID),
|
||||
TemplateId: id.UUIDString(build.TemplateID),
|
||||
}))
|
||||
if err != nil {
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("flatten rootfs failed: %v", err))
|
||||
s.destroySandbox(buildCtx, agent, sandboxIDStr)
|
||||
if buildCtx.Err() != nil {
|
||||
return
|
||||
}
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("flatten rootfs failed: %v", err))
|
||||
return
|
||||
}
|
||||
sizeBytes = flatResp.Msg.SizeBytes
|
||||
@ -368,7 +418,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
templateType = "snapshot"
|
||||
}
|
||||
|
||||
if _, err := s.DB.InsertTemplate(ctx, db.InsertTemplateParams{
|
||||
if _, err := s.DB.InsertTemplate(buildCtx, db.InsertTemplateParams{
|
||||
ID: build.TemplateID,
|
||||
Name: build.Name,
|
||||
Type: templateType,
|
||||
@ -386,7 +436,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
// No additional destroy needed.
|
||||
|
||||
// Mark build as success.
|
||||
if _, err := s.DB.UpdateBuildStatus(ctx, db.UpdateBuildStatusParams{
|
||||
if _, err := s.DB.UpdateBuildStatus(buildCtx, db.UpdateBuildStatusParams{
|
||||
ID: buildID, Status: "success",
|
||||
}); err != nil {
|
||||
log.Error("failed to mark build as success", "error", err)
|
||||
@ -429,7 +479,7 @@ func (s *BuildService) waitForHealthcheck(ctx context.Context, agent buildAgentC
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BuildService) updateLogs(ctx context.Context, buildID pgtype.UUID, step int, logs []BuildLogEntry) {
|
||||
func (s *BuildService) updateLogs(ctx context.Context, buildID pgtype.UUID, step int, logs []recipe.BuildLogEntry) {
|
||||
logsJSON, err := json.Marshal(logs)
|
||||
if err != nil {
|
||||
slog.Warn("failed to marshal build logs", "error", err)
|
||||
|
||||
Reference in New Issue
Block a user