1
0
forked from wrenn/wrenn
Files
wrenn-releases/internal/api/sandbox_event_consumer.go
Rafeed M. Bhuiyan 05ddf62399 v0.2.0 (#50)
Co-authored-by: Tasnim Kabir Sadik <tksadik@omukk.dev>

Reviewed-on: wrenn/wrenn#50
2026-05-24 21:10:37 +00:00

311 lines
9.5 KiB
Go

package api
import (
"context"
"encoding/json"
"errors"
"log/slog"
"strings"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/redis/go-redis/v9"
"git.omukk.dev/wrenn/wrenn/pkg/audit"
"git.omukk.dev/wrenn/wrenn/pkg/cpextension"
"git.omukk.dev/wrenn/wrenn/pkg/db"
"git.omukk.dev/wrenn/wrenn/pkg/events"
"git.omukk.dev/wrenn/wrenn/pkg/id"
)
// Redis stream and consumer-group identifiers used by SandboxEventConsumer.
const (
// unifiedEventStream is the Redis stream key the consumer group is created
// on, read from, and acked against.
unifiedEventStream = "wrenn:events"
// reconcilerConsumerGrp is the consumer group name used for group creation,
// reads, and acks.
reconcilerConsumerGrp = "wrenn-sandbox-reconciler-v1"
// reconcilerConsumer is the consumer name passed to XReadGroup within the
// group above.
reconcilerConsumer = "cp-0"
)
// SandboxEventConsumer reads capsule lifecycle events from the unified Redis
// stream and drives DB state reconciliation. Uses an independent consumer
// group so its cursor is separate from the channels dispatcher.
type SandboxEventConsumer struct {
// rdb is the Redis client used to create the consumer group, read the
// stream, and ack processed messages.
rdb *redis.Client
// db is the query handle used for the conditional sandbox status updates.
db *db.Queries
// audit writes the system audit row for failed create/resume events.
audit *audit.AuditLogger
// hooks are extension callbacks invoked for successful events that map to
// a canonical sandbox verb; may be empty.
hooks []cpextension.SandboxEventHook
}
// NewSandboxEventConsumer builds a SandboxEventConsumer from its dependencies:
// the Redis client, the DB query handle, the audit logger, and the extension
// hooks to notify. It does not start any work; call Start for that.
func NewSandboxEventConsumer(rdb *redis.Client, queries *db.Queries, al *audit.AuditLogger, hooks []cpextension.SandboxEventHook) *SandboxEventConsumer {
	consumer := new(SandboxEventConsumer)
	consumer.rdb = rdb
	consumer.db = queries
	consumer.audit = al
	consumer.hooks = hooks
	return consumer
}
// Start runs the consumer loop on its own goroutine and returns immediately.
// The loop stops once ctx is cancelled. The consumer group is created at "$",
// so stream history that predates the group is not replayed.
func (c *SandboxEventConsumer) Start(ctx context.Context) {
	go func() {
		c.run(ctx)
	}()
}
// run is the consumer loop. It makes sure the consumer group exists on the
// unified stream (creating the stream if needed), then repeatedly reads
// batches of up to 10 new messages, blocking up to 5 seconds per read, and
// passes each message to handleMessage. It returns when ctx is cancelled or
// when the consumer group cannot be created.
//
// NOTE(review): the loop only ever reads never-delivered entries (">"), so a
// message that handleMessage leaves un-acked stays in the group's pending list
// and is not read again by this loop. Confirm that something else reclaims
// pending entries, or add a pending-entry drain, if redelivery is required.
func (c *SandboxEventConsumer) run(ctx context.Context) {
	err := c.rdb.XGroupCreateMkStream(ctx, unifiedEventStream, reconcilerConsumerGrp, "$").Err()
	// Redis replies with a BUSYGROUP error when the group already exists. Match
	// on the error-code prefix rather than the full human-readable text, which
	// is not a stable contract.
	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
		slog.Error("sandbox event consumer: failed to create consumer group", "error", err)
		return
	}

	for {
		if ctx.Err() != nil {
			return
		}

		streams, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    reconcilerConsumerGrp,
			Consumer: reconcilerConsumer,
			Streams:  []string{unifiedEventStream, ">"},
			Count:    10,
			Block:    5 * time.Second,
		}).Result()
		if err != nil {
			// redis.Nil means the blocking read timed out with nothing new; a
			// cancelled ctx is picked up by the check at the top of the loop.
			if errors.Is(err, redis.Nil) || ctx.Err() != nil {
				continue
			}
			slog.Warn("sandbox event consumer: xreadgroup error", "error", err)
			// Back off before retrying, but wake up immediately on shutdown
			// instead of sleeping through the cancellation.
			select {
			case <-ctx.Done():
				return
			case <-time.After(1 * time.Second):
			}
			continue
		}

		for _, stream := range streams {
			for _, msg := range stream.Messages {
				c.handleMessage(ctx, msg)
			}
		}
	}
}
// handleMessage processes a single stream entry: it decodes the event, applies
// the matching DB reconciliation step, and then notifies extension hooks.
//
// The entry is acked on every exit path except one: when an extension hook
// returns an error, the ack is skipped and the entry is left pending in the
// consumer group. Malformed or irrelevant entries are acked and dropped.
func (c *SandboxEventConsumer) handleMessage(ctx context.Context, msg redis.XMessage) {
	// skipAck is only set when a hook fails; the deferred closure reads it on
	// the way out.
	skipAck := false
	defer func() {
		if skipAck {
			return
		}
		// Ack on a fresh context so a cancelled ctx does not prevent the ack.
		ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		ackErr := c.rdb.XAck(ackCtx, unifiedEventStream, reconcilerConsumerGrp, msg.ID).Err()
		if ackErr != nil {
			slog.Warn("sandbox event consumer: xack failed", "id", msg.ID, "error", ackErr)
		}
	}()

	raw, isString := msg.Values["payload"].(string)
	if !isString {
		slog.Warn("sandbox event consumer: message missing payload", "id", msg.ID)
		return
	}

	var event events.Event
	if decodeErr := json.Unmarshal([]byte(raw), &event); decodeErr != nil {
		slog.Warn("sandbox event consumer: failed to unmarshal event", "id", msg.ID, "error", decodeErr)
		return
	}

	// Sandbox reconciliation is driven by capsule.* verb events only; the
	// state-changed event is explicitly excluded.
	if event.Event == events.CapsuleStateChanged || !strings.HasPrefix(event.Event, "capsule.") {
		return
	}

	// Only system-actor events describe host-side state that still has to be
	// reflected in the DB. Events from any other actor (including the
	// capsule.create a handler publishes before the host reports back) are
	// mirrored by whoever produced them and are skipped here.
	if event.Actor.Type != events.ActorSystem {
		return
	}

	sandboxID, err := id.ParseSandboxID(event.Resource.ID)
	if err != nil {
		slog.Warn("sandbox event consumer: invalid sandbox ID", "sandbox_id", event.Resource.ID, "error", err)
		return
	}

	succeeded := event.Outcome == events.OutcomeSuccess

	switch event.Event {
	case events.CapsuleCreate, events.CapsuleResume:
		switch {
		case !succeeded:
			c.handleFailed(ctx, sandboxID, event)
		case event.Event == events.CapsuleCreate:
			c.handleStarted(ctx, sandboxID, event, "starting")
		default:
			c.handleStarted(ctx, sandboxID, event, "resuming")
		}
	case events.CapsulePause:
		if succeeded {
			c.handleAutoPaused(ctx, sandboxID)
		}
	case events.CapsuleDestroy:
		if succeeded {
			c.handleStopped(ctx, sandboxID)
		}
	}

	// Extension hooks (cloud billing, audit shipping, etc.) only see successful
	// events that map to a canonical verb.
	if !succeeded || len(c.hooks) == 0 {
		return
	}
	verb, known := canonicalSandboxVerb(event.Event)
	if !known {
		return
	}

	teamID, _ := id.ParseTeamID(event.TeamID)
	meta := make(map[string]any, len(event.Metadata))
	for key, value := range event.Metadata {
		meta[key] = value
	}
	hookEvent := cpextension.SandboxEvent{
		SandboxID:  sandboxID,
		TeamID:     teamID,
		Type:       verb,
		OccurredAt: parseEventTimestamp(event.Timestamp),
		Metadata:   meta,
	}

	// The first hook error stops dispatch and suppresses the ack. Hooks MUST be
	// idempotent: earlier hooks in the list have already run for this entry
	// and may see it again.
	for _, hook := range c.hooks {
		hookErr := hook.OnSandboxEvent(ctx, hookEvent)
		if hookErr == nil {
			continue
		}
		slog.Warn("sandbox event hook failed; leaving message un-acked", "id", msg.ID, "event", event.Event, "error", hookErr)
		skipAck = true
		return
	}
}
// canonicalSandboxVerb maps a capsule event name to the past-tense verb handed
// to extension hooks. The boolean is false, and the verb empty, for any event
// other than create, resume, pause, or destroy.
func canonicalSandboxVerb(event string) (string, bool) {
	var verb string
	switch event {
	case events.CapsuleCreate:
		verb = "created"
	case events.CapsuleResume:
		verb = "resumed"
	case events.CapsulePause:
		verb = "paused"
	case events.CapsuleDestroy:
		verb = "destroyed"
	default:
		return "", false
	}
	return verb, true
}
// parseEventTimestamp parses s as an RFC 3339 timestamp. If s is empty or does
// not parse, it falls back to the current time in UTC.
func parseEventTimestamp(s string) time.Time {
	if s != "" {
		if parsed, err := time.Parse(time.RFC3339, s); err == nil {
			return parsed
		}
	}
	return time.Now().UTC()
}
// handleStarted is a fallback writer for capsule.create.success and
// capsule.resume.success. The background goroutine in SandboxService is the
// primary writer; this only succeeds if the goroutine's conditional update
// was missed.
//
// It attempts a conditional update that moves the sandbox from fromStatus to
// running, recording the "host_ip" metadata value and the current time as the
// start time. A pgx.ErrNoRows result is treated as "nothing to do" (presumably
// the sandbox was no longer in fromStatus — verify against the query); any
// other error is logged rather than silently dropped.
func (c *SandboxEventConsumer) handleStarted(ctx context.Context, sandboxID pgtype.UUID, event events.Event, fromStatus string) {
	_, err := c.db.UpdateSandboxRunningIf(ctx, db.UpdateSandboxRunningIfParams{
		ID:     sandboxID,
		Status: fromStatus,
		HostIp: event.Metadata["host_ip"],
		StartedAt: pgtype.Timestamptz{
			Time:  time.Now(),
			Valid: true,
		},
	})
	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
		slog.Warn("sandbox event consumer: failed to update sandbox to running", "sandbox_id", id.FormatSandboxID(sandboxID), "from", fromStatus, "error", err)
	}
}
// handleAutoPaused reflects a successful system-side pause in the DB. It tries
// the conditional transitions running → paused and then pausing → paused,
// stopping at the first one that applies. A pgx.ErrNoRows result is treated as
// a normal miss (presumably the sandbox was not in that status — verify against
// the query); any other error is logged and the next transition is still tried.
func (c *SandboxEventConsumer) handleAutoPaused(ctx context.Context, sandboxID pgtype.UUID) {
	for _, fromStatus := range []string{"running", "pausing"} {
		_, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
			ID:       sandboxID,
			Status:   fromStatus,
			Status_2: "paused",
		})
		if err == nil {
			slog.Debug("sandbox event consumer: auto-paused fallback applied", "sandbox_id", id.FormatSandboxID(sandboxID), "from", fromStatus)
			return
		}
		if !errors.Is(err, pgx.ErrNoRows) {
			slog.Warn("sandbox event consumer: failed to update sandbox to paused", "sandbox_id", id.FormatSandboxID(sandboxID), "from", fromStatus, "error", err)
		}
	}
}
// handleStopped reflects a successful destroy in the DB by moving the sandbox
// to "stopped". Two conditional transitions are tried in order, and the second
// is skipped when the first applies. Only a failure of the second transition
// that is not pgx.ErrNoRows is logged.
func (c *SandboxEventConsumer) handleStopped(ctx context.Context, sandboxID pgtype.UUID) {
	markStoppedFrom := func(from string) error {
		_, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
			ID:       sandboxID,
			Status:   from,
			Status_2: "stopped",
		})
		return err
	}

	// stopping → stopped: a CP-initiated destroy finished. No audit row is
	// written here; the handler that issued the destroy already wrote one.
	if markStoppedFrom("stopping") == nil {
		return
	}

	// running → stopped: autonomous destroy (e.g. TTL destroy fallback).
	err := markStoppedFrom("running")
	if err == nil || errors.Is(err, pgx.ErrNoRows) {
		return
	}
	slog.Warn("sandbox event consumer: failed to update sandbox to stopped", "sandbox_id", id.FormatSandboxID(sandboxID), "error", err)
}
// handleFailed marks a sandbox as "error" when a verb event reports failure
// and writes a system audit row. The DB update is idempotent — the
// SandboxService background goroutine usually wrote "error" already on the
// fast-fail path, which settles in seconds and so never reaches the
// HostMonitor's transient-timeout reconciliation.
//
// The status update is attempted from each in-flight status in turn and stops
// at the first one that applies. pgx.ErrNoRows results are treated as normal
// misses; if the update never applied and some attempt failed with any other
// error, that error is logged once after the loop.
//
// audit.Log writes the row only — it does NOT republish an event, which would
// loop back into this consumer. Do not switch to LogSandboxCreateSystem here.
func (c *SandboxEventConsumer) handleFailed(ctx context.Context, sandboxID pgtype.UUID, event events.Event) {
	var updateErr error
	for _, fromStatus := range []string{"running", "starting", "pausing", "resuming", "snapshotting"} {
		_, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
			ID:       sandboxID,
			Status:   fromStatus,
			Status_2: "error",
		})
		if err == nil {
			// The transition applied; an earlier unexpected error no longer matters.
			updateErr = nil
			break
		}
		if !errors.Is(err, pgx.ErrNoRows) {
			updateErr = err
		}
	}
	if updateErr != nil {
		slog.Warn("sandbox event consumer: failed to update sandbox to error", "sandbox_id", id.FormatSandboxID(sandboxID), "error", updateErr)
	}

	// The HostMonitor transient-timeout reconciler emits failure events via
	// LogSandboxCreateSystem / LogSandboxResumeSystem, which already write
	// their own audit row before publishing — auditing again here would
	// double-count. Those helpers publish with reason="transient_timeout";
	// the un-audited fast-fail (createInBackground) and host-callback paths
	// do not, so only they need a row written here.
	reason := event.Metadata["reason"]
	if reason == "transient_timeout" {
		return
	}

	action := "create"
	if event.Event == events.CapsuleResume {
		action = "resume"
	}
	if reason == "" {
		reason = action + "_failed"
	}

	meta := map[string]any{"reason": reason}
	if event.Error != "" {
		meta["error"] = event.Error
	}

	// The parse error is deliberately ignored: the audit row is still written
	// with whatever team ID ParseTeamID returned, so the failure is not lost.
	teamID, _ := id.ParseTeamID(event.TeamID)
	c.audit.Log(ctx, audit.Entry{
		TeamID:       teamID,
		ActorType:    "system",
		ResourceType: "sandbox",
		ResourceID:   id.FormatSandboxID(sandboxID),
		Action:       action,
		Scope:        "team",
		Status:       "error",
		Metadata:     meta,
	})
}