1
0
forked from wrenn/wrenn

Add template build system with admin panel, async workers, and FlattenRootfs RPC

Introduces an end-to-end template building pipeline: admins submit a recipe
(list of shell commands) via the dashboard, a Redis-backed worker pool spins
up a sandbox, executes each command, and produces either a full snapshot
(with healthcheck) or an image-only template (rootfs flattened via a new
FlattenRootfs host-agent RPC). Build progress and per-step logs are persisted
to a new template_builds table and polled by the frontend.

Backend:
- New FlattenRootfs RPC (proto + host agent + sandbox manager)
- BuildService with Redis queue (BLPOP) and configurable worker pool (default 2)
- Admin-only REST endpoints: POST/GET /v1/admin/builds, GET /v1/admin/builds/{id}
- Migration for template_builds table with JSONB logs and recipe columns
- sqlc queries for build CRUD and progress updates

Frontend:
- /admin/templates page with Templates + Builds tabs
- Create Template dialog with recipe textarea, healthcheck, specs
- Build history with expandable per-step logs, status badges, progress bars
- Auto-polling every 3s for active builds
- AdminSidebar updated with Templates nav item
This commit is contained in:
2026-03-26 15:27:21 +06:00
parent 6898528096
commit 1ce62934b3
17 changed files with 2031 additions and 26 deletions

385
internal/service/build.go Normal file
View File

@ -0,0 +1,385 @@
package service
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"connectrpc.com/connect"
"github.com/jackc/pgx/v5/pgtype"
"github.com/redis/go-redis/v9"
"git.omukk.dev/wrenn/sandbox/internal/db"
"git.omukk.dev/wrenn/sandbox/internal/id"
"git.omukk.dev/wrenn/sandbox/internal/lifecycle"
"git.omukk.dev/wrenn/sandbox/internal/scheduler"
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
)
const (
buildQueueKey = "wrenn:build_queue"
buildCommandTimeout = 30 * time.Second
healthcheckInterval = 1 * time.Second
healthcheckTimeout = 60 * time.Second
platformTeamID = "platform"
)
// buildAgentClient is the subset of the host agent client used by the build worker.
type buildAgentClient interface {
CreateSandbox(ctx context.Context, req *connect.Request[pb.CreateSandboxRequest]) (*connect.Response[pb.CreateSandboxResponse], error)
DestroySandbox(ctx context.Context, req *connect.Request[pb.DestroySandboxRequest]) (*connect.Response[pb.DestroySandboxResponse], error)
Exec(ctx context.Context, req *connect.Request[pb.ExecRequest]) (*connect.Response[pb.ExecResponse], error)
CreateSnapshot(ctx context.Context, req *connect.Request[pb.CreateSnapshotRequest]) (*connect.Response[pb.CreateSnapshotResponse], error)
FlattenRootfs(ctx context.Context, req *connect.Request[pb.FlattenRootfsRequest]) (*connect.Response[pb.FlattenRootfsResponse], error)
}
// BuildLogEntry represents a single entry in the build log JSONB array.
type BuildLogEntry struct {
Step int `json:"step"`
Cmd string `json:"cmd"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
Exit int32 `json:"exit"`
Ok bool `json:"ok"`
Elapsed int64 `json:"elapsed_ms"`
}
// BuildService handles template build orchestration.
type BuildService struct {
DB *db.Queries
Redis *redis.Client
Pool *lifecycle.HostClientPool
Scheduler scheduler.HostScheduler
}
// BuildCreateParams holds the parameters for creating a template build.
type BuildCreateParams struct {
Name string
BaseTemplate string
Recipe []string
Healthcheck string
VCPUs int32
MemoryMB int32
}
// Create inserts a new build record and enqueues it to Redis.
func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.TemplateBuild, error) {
if p.BaseTemplate == "" {
p.BaseTemplate = "minimal"
}
if p.VCPUs <= 0 {
p.VCPUs = 1
}
if p.MemoryMB <= 0 {
p.MemoryMB = 512
}
recipeJSON, err := json.Marshal(p.Recipe)
if err != nil {
return db.TemplateBuild{}, fmt.Errorf("marshal recipe: %w", err)
}
buildID := id.NewBuildID()
build, err := s.DB.InsertTemplateBuild(ctx, db.InsertTemplateBuildParams{
ID: buildID,
Name: p.Name,
BaseTemplate: p.BaseTemplate,
Recipe: recipeJSON,
Healthcheck: pgtype.Text{String: p.Healthcheck, Valid: p.Healthcheck != ""},
Vcpus: p.VCPUs,
MemoryMb: p.MemoryMB,
TotalSteps: int32(len(p.Recipe)),
})
if err != nil {
return db.TemplateBuild{}, fmt.Errorf("insert build: %w", err)
}
// Enqueue build ID to Redis for workers to pick up.
if err := s.Redis.RPush(ctx, buildQueueKey, buildID).Err(); err != nil {
return db.TemplateBuild{}, fmt.Errorf("enqueue build: %w", err)
}
return build, nil
}
// Get returns a single build by ID.
func (s *BuildService) Get(ctx context.Context, buildID string) (db.TemplateBuild, error) {
return s.DB.GetTemplateBuild(ctx, buildID)
}
// List returns all builds ordered by creation time.
func (s *BuildService) List(ctx context.Context) ([]db.TemplateBuild, error) {
return s.DB.ListTemplateBuilds(ctx)
}
// StartWorkers launches n goroutines that consume from the Redis build queue.
// The returned cancel function stops all workers.
func (s *BuildService) StartWorkers(ctx context.Context, n int) context.CancelFunc {
ctx, cancel := context.WithCancel(ctx)
for i := range n {
go s.worker(ctx, i)
}
slog.Info("build workers started", "count", n)
return cancel
}
func (s *BuildService) worker(ctx context.Context, workerID int) {
log := slog.With("worker", workerID)
for {
// BLPOP blocks until a build ID is available or context is cancelled.
result, err := s.Redis.BLPop(ctx, 0, buildQueueKey).Result()
if err != nil {
if ctx.Err() != nil {
log.Info("build worker shutting down")
return
}
log.Error("redis BLPOP error", "error", err)
time.Sleep(time.Second)
continue
}
// result[0] is the key, result[1] is the build ID.
buildID := result[1]
log.Info("picked up build", "build_id", buildID)
s.executeBuild(ctx, buildID)
}
}
func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
log := slog.With("build_id", buildID)
build, err := s.DB.GetTemplateBuild(ctx, buildID)
if err != nil {
log.Error("failed to fetch build", "error", err)
return
}
// Mark as running.
if _, err := s.DB.UpdateBuildStatus(ctx, db.UpdateBuildStatusParams{
ID: buildID, Status: "running",
}); err != nil {
log.Error("failed to update build status", "error", err)
return
}
// Parse recipe.
var recipe []string
if err := json.Unmarshal(build.Recipe, &recipe); err != nil {
s.failBuild(ctx, buildID, fmt.Sprintf("invalid recipe JSON: %v", err))
return
}
// Pick a platform host and create a sandbox.
host, err := s.Scheduler.SelectHost(ctx, platformTeamID, false)
if err != nil {
s.failBuild(ctx, buildID, fmt.Sprintf("no host available: %v", err))
return
}
agent, err := s.Pool.GetForHost(host)
if err != nil {
s.failBuild(ctx, buildID, fmt.Sprintf("agent client error: %v", err))
return
}
sandboxID := id.NewSandboxID()
log = log.With("sandbox_id", sandboxID, "host_id", host.ID)
resp, err := agent.CreateSandbox(ctx, connect.NewRequest(&pb.CreateSandboxRequest{
SandboxId: sandboxID,
Template: build.BaseTemplate,
Vcpus: build.Vcpus,
MemoryMb: build.MemoryMb,
TimeoutSec: 0, // no auto-pause for builds
}))
if err != nil {
s.failBuild(ctx, buildID, fmt.Sprintf("create sandbox failed: %v", err))
return
}
_ = resp
// Record sandbox/host association.
_ = s.DB.UpdateBuildSandbox(ctx, db.UpdateBuildSandboxParams{
ID: buildID,
SandboxID: pgtype.Text{String: sandboxID, Valid: true},
HostID: pgtype.Text{String: host.ID, Valid: true},
})
// Execute recipe commands.
var logs []BuildLogEntry
for i, cmd := range recipe {
log.Info("executing build step", "step", i+1, "cmd", cmd)
execCtx, cancel := context.WithTimeout(ctx, buildCommandTimeout)
start := time.Now()
execResp, err := agent.Exec(execCtx, connect.NewRequest(&pb.ExecRequest{
SandboxId: sandboxID,
Cmd: "/bin/sh",
Args: []string{"-c", cmd},
TimeoutSec: int32(buildCommandTimeout.Seconds()),
}))
cancel()
entry := BuildLogEntry{
Step: i + 1,
Cmd: cmd,
Elapsed: time.Since(start).Milliseconds(),
}
if err != nil {
entry.Stderr = err.Error()
entry.Ok = false
logs = append(logs, entry)
s.updateLogs(ctx, buildID, i+1, logs)
s.destroySandbox(ctx, agent, sandboxID)
s.failBuild(ctx, buildID, fmt.Sprintf("step %d exec error: %v", i+1, err))
return
}
entry.Stdout = string(execResp.Msg.Stdout)
entry.Stderr = string(execResp.Msg.Stderr)
entry.Exit = execResp.Msg.ExitCode
entry.Ok = execResp.Msg.ExitCode == 0
logs = append(logs, entry)
s.updateLogs(ctx, buildID, i+1, logs)
if execResp.Msg.ExitCode != 0 {
s.destroySandbox(ctx, agent, sandboxID)
s.failBuild(ctx, buildID, fmt.Sprintf("step %d failed with exit code %d", i+1, execResp.Msg.ExitCode))
return
}
}
// Healthcheck or direct snapshot.
if build.Healthcheck.Valid && build.Healthcheck.String != "" {
log.Info("running healthcheck", "cmd", build.Healthcheck.String)
if err := s.waitForHealthcheck(ctx, agent, sandboxID, build.Healthcheck.String); err != nil {
s.destroySandbox(ctx, agent, sandboxID)
s.failBuild(ctx, buildID, fmt.Sprintf("healthcheck failed: %v", err))
return
}
// Healthcheck passed → full snapshot (with memory/CPU state).
log.Info("healthcheck passed, creating snapshot")
if _, err := agent.CreateSnapshot(ctx, connect.NewRequest(&pb.CreateSnapshotRequest{
SandboxId: sandboxID,
Name: build.Name,
})); err != nil {
s.destroySandbox(ctx, agent, sandboxID)
s.failBuild(ctx, buildID, fmt.Sprintf("create snapshot failed: %v", err))
return
}
} else {
// No healthcheck → image-only template (rootfs only).
log.Info("no healthcheck, flattening rootfs")
if _, err := agent.FlattenRootfs(ctx, connect.NewRequest(&pb.FlattenRootfsRequest{
SandboxId: sandboxID,
Name: build.Name,
})); err != nil {
s.destroySandbox(ctx, agent, sandboxID)
s.failBuild(ctx, buildID, fmt.Sprintf("flatten rootfs failed: %v", err))
return
}
}
// Insert into templates table as a global (platform) template.
templateType := "base"
if build.Healthcheck.Valid && build.Healthcheck.String != "" {
templateType = "snapshot"
}
if _, err := s.DB.InsertTemplate(ctx, db.InsertTemplateParams{
Name: build.Name,
Type: templateType,
Vcpus: pgtype.Int4{Int32: build.Vcpus, Valid: true},
MemoryMb: pgtype.Int4{Int32: build.MemoryMb, Valid: true},
SizeBytes: 0, // Could query the host, but the template is created.
TeamID: platformTeamID,
}); err != nil {
log.Error("failed to insert template record", "error", err)
// Build succeeded on disk, just DB record failed — don't mark as failed.
}
// For CreateSnapshot, the sandbox is already destroyed by the snapshot process.
// For FlattenRootfs, the sandbox is already destroyed by the flatten process.
// No additional destroy needed.
// Mark build as success.
if _, err := s.DB.UpdateBuildStatus(ctx, db.UpdateBuildStatusParams{
ID: buildID, Status: "success",
}); err != nil {
log.Error("failed to mark build as success", "error", err)
}
log.Info("template build completed successfully", "name", build.Name)
}
func (s *BuildService) waitForHealthcheck(ctx context.Context, agent buildAgentClient, sandboxID, cmd string) error {
deadline := time.After(healthcheckTimeout)
ticker := time.NewTicker(healthcheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-deadline:
return fmt.Errorf("healthcheck timed out after %s", healthcheckTimeout)
case <-ticker.C:
execCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
resp, err := agent.Exec(execCtx, connect.NewRequest(&pb.ExecRequest{
SandboxId: sandboxID,
Cmd: "/bin/sh",
Args: []string{"-c", cmd},
TimeoutSec: 10,
}))
cancel()
if err != nil {
slog.Debug("healthcheck exec error (retrying)", "error", err)
continue
}
if resp.Msg.ExitCode == 0 {
return nil
}
slog.Debug("healthcheck failed (retrying)", "exit_code", resp.Msg.ExitCode)
}
}
}
func (s *BuildService) updateLogs(ctx context.Context, buildID string, step int, logs []BuildLogEntry) {
logsJSON, err := json.Marshal(logs)
if err != nil {
slog.Warn("failed to marshal build logs", "error", err)
return
}
if err := s.DB.UpdateBuildProgress(ctx, db.UpdateBuildProgressParams{
ID: buildID,
CurrentStep: int32(step),
Logs: logsJSON,
}); err != nil {
slog.Warn("failed to update build progress", "error", err)
}
}
func (s *BuildService) failBuild(ctx context.Context, buildID, errMsg string) {
slog.Error("build failed", "build_id", buildID, "error", errMsg)
if err := s.DB.UpdateBuildError(ctx, db.UpdateBuildErrorParams{
ID: buildID,
Error: pgtype.Text{String: errMsg, Valid: true},
}); err != nil {
slog.Error("failed to update build error", "build_id", buildID, "error", err)
}
}
func (s *BuildService) destroySandbox(ctx context.Context, agent buildAgentClient, sandboxID string) {
if _, err := agent.DestroySandbox(ctx, connect.NewRequest(&pb.DestroySandboxRequest{
SandboxId: sandboxID,
})); err != nil {
slog.Warn("failed to destroy build sandbox", "sandbox_id", sandboxID, "error", err)
}
}