forked from wrenn/wrenn
Switch database IDs from TEXT to native UUID
Consolidate 16 migrations into one with UUID columns for all entity
IDs. TEXT is kept only for polymorphic fields (audit_logs.actor_id,
resource_id) and template names. The id package now generates UUIDs
via google/uuid, with Format*/Parse* helpers for the prefixed wire
format (sb-{uuid}, usr-{uuid}, etc.). Auth context, services, and
handlers pass pgtype.UUID internally; conversion to/from prefixed
strings happens at API and RPC boundaries. Adds PlatformTeamID
(all-zeros UUID) for shared resources.
This commit is contained in:
@ -19,11 +19,10 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
buildQueueKey = "wrenn:build_queue"
|
||||
buildCommandTimeout = 30 * time.Second
|
||||
healthcheckInterval = 1 * time.Second
|
||||
healthcheckTimeout = 60 * time.Second
|
||||
platformTeamID = "platform"
|
||||
buildQueueKey = "wrenn:build_queue"
|
||||
buildCommandTimeout = 30 * time.Second
|
||||
healthcheckInterval = 1 * time.Second
|
||||
healthcheckTimeout = 60 * time.Second
|
||||
)
|
||||
|
||||
// buildAgentClient is the subset of the host agent client used by the build worker.
|
||||
@ -82,13 +81,14 @@ func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.Temp
|
||||
}
|
||||
|
||||
buildID := id.NewBuildID()
|
||||
buildIDStr := id.FormatBuildID(buildID)
|
||||
|
||||
build, err := s.DB.InsertTemplateBuild(ctx, db.InsertTemplateBuildParams{
|
||||
ID: buildID,
|
||||
Name: p.Name,
|
||||
BaseTemplate: p.BaseTemplate,
|
||||
Recipe: recipeJSON,
|
||||
Healthcheck: pgtype.Text{String: p.Healthcheck, Valid: p.Healthcheck != ""},
|
||||
Healthcheck: p.Healthcheck,
|
||||
Vcpus: p.VCPUs,
|
||||
MemoryMb: p.MemoryMB,
|
||||
TotalSteps: int32(len(p.Recipe)),
|
||||
@ -97,8 +97,8 @@ func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.Temp
|
||||
return db.TemplateBuild{}, fmt.Errorf("insert build: %w", err)
|
||||
}
|
||||
|
||||
// Enqueue build ID to Redis for workers to pick up.
|
||||
if err := s.Redis.RPush(ctx, buildQueueKey, buildID).Err(); err != nil {
|
||||
// Enqueue build ID (as formatted string) to Redis for workers to pick up.
|
||||
if err := s.Redis.RPush(ctx, buildQueueKey, buildIDStr).Err(); err != nil {
|
||||
return db.TemplateBuild{}, fmt.Errorf("enqueue build: %w", err)
|
||||
}
|
||||
|
||||
@ -106,7 +106,7 @@ func (s *BuildService) Create(ctx context.Context, p BuildCreateParams) (db.Temp
|
||||
}
|
||||
|
||||
// Get returns a single build by ID.
|
||||
func (s *BuildService) Get(ctx context.Context, buildID string) (db.TemplateBuild, error) {
|
||||
func (s *BuildService) Get(ctx context.Context, buildID pgtype.UUID) (db.TemplateBuild, error) {
|
||||
return s.DB.GetTemplateBuild(ctx, buildID)
|
||||
}
|
||||
|
||||
@ -140,15 +140,21 @@ func (s *BuildService) worker(ctx context.Context, workerID int) {
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
// result[0] is the key, result[1] is the build ID.
|
||||
buildID := result[1]
|
||||
log.Info("picked up build", "build_id", buildID)
|
||||
s.executeBuild(ctx, buildID)
|
||||
// result[0] is the key, result[1] is the build ID (formatted string).
|
||||
buildIDStr := result[1]
|
||||
log.Info("picked up build", "build_id", buildIDStr)
|
||||
s.executeBuild(ctx, buildIDStr)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
log := slog.With("build_id", buildID)
|
||||
func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
log := slog.With("build_id", buildIDStr)
|
||||
|
||||
buildID, err := id.ParseBuildID(buildIDStr)
|
||||
if err != nil {
|
||||
log.Error("invalid build ID from queue", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
build, err := s.DB.GetTemplateBuild(ctx, buildID)
|
||||
if err != nil {
|
||||
@ -172,7 +178,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
}
|
||||
|
||||
// Pick a platform host and create a sandbox.
|
||||
host, err := s.Scheduler.SelectHost(ctx, platformTeamID, false)
|
||||
host, err := s.Scheduler.SelectHost(ctx, id.PlatformTeamID, false)
|
||||
if err != nil {
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("no host available: %v", err))
|
||||
return
|
||||
@ -185,10 +191,11 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
}
|
||||
|
||||
sandboxID := id.NewSandboxID()
|
||||
log = log.With("sandbox_id", sandboxID, "host_id", host.ID)
|
||||
sandboxIDStr := id.FormatSandboxID(sandboxID)
|
||||
log = log.With("sandbox_id", sandboxIDStr, "host_id", id.FormatHostID(host.ID))
|
||||
|
||||
resp, err := agent.CreateSandbox(ctx, connect.NewRequest(&pb.CreateSandboxRequest{
|
||||
SandboxId: sandboxID,
|
||||
SandboxId: sandboxIDStr,
|
||||
Template: build.BaseTemplate,
|
||||
Vcpus: build.Vcpus,
|
||||
MemoryMb: build.MemoryMb,
|
||||
@ -203,8 +210,8 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
// Record sandbox/host association.
|
||||
_ = s.DB.UpdateBuildSandbox(ctx, db.UpdateBuildSandboxParams{
|
||||
ID: buildID,
|
||||
SandboxID: pgtype.Text{String: sandboxID, Valid: true},
|
||||
HostID: pgtype.Text{String: host.ID, Valid: true},
|
||||
SandboxID: sandboxID,
|
||||
HostID: host.ID,
|
||||
})
|
||||
|
||||
// Execute recipe commands.
|
||||
@ -216,7 +223,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
start := time.Now()
|
||||
|
||||
execResp, err := agent.Exec(execCtx, connect.NewRequest(&pb.ExecRequest{
|
||||
SandboxId: sandboxID,
|
||||
SandboxId: sandboxIDStr,
|
||||
Cmd: "/bin/sh",
|
||||
Args: []string{"-c", cmd},
|
||||
TimeoutSec: int32(buildCommandTimeout.Seconds()),
|
||||
@ -234,7 +241,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
entry.Ok = false
|
||||
logs = append(logs, entry)
|
||||
s.updateLogs(ctx, buildID, i+1, logs)
|
||||
s.destroySandbox(ctx, agent, sandboxID)
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("step %d exec error: %v", i+1, err))
|
||||
return
|
||||
}
|
||||
@ -248,7 +255,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
s.updateLogs(ctx, buildID, i+1, logs)
|
||||
|
||||
if execResp.Msg.ExitCode != 0 {
|
||||
s.destroySandbox(ctx, agent, sandboxID)
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("step %d failed with exit code %d", i+1, execResp.Msg.ExitCode))
|
||||
return
|
||||
}
|
||||
@ -256,10 +263,10 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
|
||||
// Healthcheck or direct snapshot.
|
||||
var sizeBytes int64
|
||||
if build.Healthcheck.Valid && build.Healthcheck.String != "" {
|
||||
log.Info("running healthcheck", "cmd", build.Healthcheck.String)
|
||||
if err := s.waitForHealthcheck(ctx, agent, sandboxID, build.Healthcheck.String); err != nil {
|
||||
s.destroySandbox(ctx, agent, sandboxID)
|
||||
if build.Healthcheck != "" {
|
||||
log.Info("running healthcheck", "cmd", build.Healthcheck)
|
||||
if err := s.waitForHealthcheck(ctx, agent, sandboxIDStr, build.Healthcheck); err != nil {
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("healthcheck failed: %v", err))
|
||||
return
|
||||
}
|
||||
@ -267,11 +274,11 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
// Healthcheck passed → full snapshot (with memory/CPU state).
|
||||
log.Info("healthcheck passed, creating snapshot")
|
||||
snapResp, err := agent.CreateSnapshot(ctx, connect.NewRequest(&pb.CreateSnapshotRequest{
|
||||
SandboxId: sandboxID,
|
||||
SandboxId: sandboxIDStr,
|
||||
Name: build.Name,
|
||||
}))
|
||||
if err != nil {
|
||||
s.destroySandbox(ctx, agent, sandboxID)
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("create snapshot failed: %v", err))
|
||||
return
|
||||
}
|
||||
@ -280,11 +287,11 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
// No healthcheck → image-only template (rootfs only).
|
||||
log.Info("no healthcheck, flattening rootfs")
|
||||
flatResp, err := agent.FlattenRootfs(ctx, connect.NewRequest(&pb.FlattenRootfsRequest{
|
||||
SandboxId: sandboxID,
|
||||
SandboxId: sandboxIDStr,
|
||||
Name: build.Name,
|
||||
}))
|
||||
if err != nil {
|
||||
s.destroySandbox(ctx, agent, sandboxID)
|
||||
s.destroySandbox(ctx, agent, sandboxIDStr)
|
||||
s.failBuild(ctx, buildID, fmt.Sprintf("flatten rootfs failed: %v", err))
|
||||
return
|
||||
}
|
||||
@ -293,17 +300,17 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
|
||||
// Insert into templates table as a global (platform) template.
|
||||
templateType := "base"
|
||||
if build.Healthcheck.Valid && build.Healthcheck.String != "" {
|
||||
if build.Healthcheck != "" {
|
||||
templateType = "snapshot"
|
||||
}
|
||||
|
||||
if _, err := s.DB.InsertTemplate(ctx, db.InsertTemplateParams{
|
||||
Name: build.Name,
|
||||
Type: templateType,
|
||||
Vcpus: pgtype.Int4{Int32: build.Vcpus, Valid: true},
|
||||
MemoryMb: pgtype.Int4{Int32: build.MemoryMb, Valid: true},
|
||||
Vcpus: build.Vcpus,
|
||||
MemoryMb: build.MemoryMb,
|
||||
SizeBytes: sizeBytes,
|
||||
TeamID: platformTeamID,
|
||||
TeamID: id.PlatformTeamID,
|
||||
}); err != nil {
|
||||
log.Error("failed to insert template record", "error", err)
|
||||
// Build succeeded on disk, just DB record failed — don't mark as failed.
|
||||
@ -323,7 +330,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildID string) {
|
||||
log.Info("template build completed successfully", "name", build.Name)
|
||||
}
|
||||
|
||||
func (s *BuildService) waitForHealthcheck(ctx context.Context, agent buildAgentClient, sandboxID, cmd string) error {
|
||||
func (s *BuildService) waitForHealthcheck(ctx context.Context, agent buildAgentClient, sandboxIDStr, cmd string) error {
|
||||
deadline := time.NewTimer(healthcheckTimeout)
|
||||
defer deadline.Stop()
|
||||
ticker := time.NewTicker(healthcheckInterval)
|
||||
@ -338,7 +345,7 @@ func (s *BuildService) waitForHealthcheck(ctx context.Context, agent buildAgentC
|
||||
case <-ticker.C:
|
||||
execCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
resp, err := agent.Exec(execCtx, connect.NewRequest(&pb.ExecRequest{
|
||||
SandboxId: sandboxID,
|
||||
SandboxId: sandboxIDStr,
|
||||
Cmd: "/bin/sh",
|
||||
Args: []string{"-c", cmd},
|
||||
TimeoutSec: 10,
|
||||
@ -357,7 +364,7 @@ func (s *BuildService) waitForHealthcheck(ctx context.Context, agent buildAgentC
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BuildService) updateLogs(ctx context.Context, buildID string, step int, logs []BuildLogEntry) {
|
||||
func (s *BuildService) updateLogs(ctx context.Context, buildID pgtype.UUID, step int, logs []BuildLogEntry) {
|
||||
logsJSON, err := json.Marshal(logs)
|
||||
if err != nil {
|
||||
slog.Warn("failed to marshal build logs", "error", err)
|
||||
@ -372,26 +379,26 @@ func (s *BuildService) updateLogs(ctx context.Context, buildID string, step int,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BuildService) failBuild(_ context.Context, buildID, errMsg string) {
|
||||
slog.Error("build failed", "build_id", buildID, "error", errMsg)
|
||||
func (s *BuildService) failBuild(_ context.Context, buildID pgtype.UUID, errMsg string) {
|
||||
slog.Error("build failed", "build_id", id.FormatBuildID(buildID), "error", errMsg)
|
||||
// Use a detached context so DB writes survive parent context cancellation (e.g. shutdown).
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := s.DB.UpdateBuildError(ctx, db.UpdateBuildErrorParams{
|
||||
ID: buildID,
|
||||
Error: pgtype.Text{String: errMsg, Valid: true},
|
||||
Error: errMsg,
|
||||
}); err != nil {
|
||||
slog.Error("failed to update build error", "build_id", buildID, "error", err)
|
||||
slog.Error("failed to update build error", "build_id", id.FormatBuildID(buildID), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BuildService) destroySandbox(_ context.Context, agent buildAgentClient, sandboxID string) {
|
||||
func (s *BuildService) destroySandbox(_ context.Context, agent buildAgentClient, sandboxIDStr string) {
|
||||
// Use a detached context so cleanup succeeds even during shutdown.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
if _, err := agent.DestroySandbox(ctx, connect.NewRequest(&pb.DestroySandboxRequest{
|
||||
SandboxId: sandboxID,
|
||||
SandboxId: sandboxIDStr,
|
||||
})); err != nil {
|
||||
slog.Warn("failed to destroy build sandbox", "sandbox_id", sandboxID, "error", err)
|
||||
slog.Warn("failed to destroy build sandbox", "sandbox_id", sandboxIDStr, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user