forked from wrenn/wrenn
feat: async sandbox lifecycle with Redis Stream events
Replace synchronous RPC-based CP-host communication for sandbox lifecycle operations (Create, Pause, Resume, Destroy) with an async pattern. CP handlers now return 202 Accepted immediately, fire agent RPCs in background goroutines, and publish state events to a Redis Stream. A background consumer processes events as a fallback writer. Agent-side auto-pause events are pushed to the CP via HTTP callback (POST /v1/hosts/sandbox-events), keeping Redis internal to the CP. All DB status transitions use conditional updates (UpdateSandboxStatusIf, UpdateSandboxRunningIf) to prevent race conditions between concurrent operations and background goroutines. The HostMonitor reconciler is kept at 60s as a safety net, extended to handle transient statuses (starting, pausing, resuming, stopping). Frontend updated to handle 202 responses with empty bodies and render transient statuses with blue indicators.
This commit is contained in:
@ -57,7 +57,7 @@ func (h *adminCapsuleHandler) Create(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
ac.TeamID = id.PlatformTeamID
|
||||
h.audit.LogSandboxCreate(r.Context(), ac, sb.ID, sb.Template)
|
||||
writeJSON(w, http.StatusCreated, sandboxToResponse(sb))
|
||||
writeJSON(w, http.StatusAccepted, sandboxToResponse(sb))
|
||||
}
|
||||
|
||||
// List handles GET /v1/admin/capsules.
|
||||
@ -113,7 +113,7 @@ func (h *adminCapsuleHandler) Destroy(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
h.audit.LogSandboxDestroy(r.Context(), ac, sandboxID)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
type adminSnapshotRequest struct {
|
||||
|
||||
@ -108,7 +108,7 @@ func (h *sandboxHandler) Create(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
h.audit.LogSandboxCreate(r.Context(), ac, sb.ID, sb.Template)
|
||||
writeJSON(w, http.StatusCreated, sandboxToResponse(sb))
|
||||
writeJSON(w, http.StatusAccepted, sandboxToResponse(sb))
|
||||
}
|
||||
|
||||
// List handles GET /v1/capsules.
|
||||
@ -167,7 +167,7 @@ func (h *sandboxHandler) Pause(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
h.audit.LogSandboxPause(r.Context(), ac, sandboxID)
|
||||
writeJSON(w, http.StatusOK, sandboxToResponse(sb))
|
||||
writeJSON(w, http.StatusAccepted, sandboxToResponse(sb))
|
||||
}
|
||||
|
||||
// Resume handles POST /v1/capsules/{id}/resume.
|
||||
@ -189,7 +189,7 @@ func (h *sandboxHandler) Resume(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
h.audit.LogSandboxResume(r.Context(), ac, sandboxID)
|
||||
writeJSON(w, http.StatusOK, sandboxToResponse(sb))
|
||||
writeJSON(w, http.StatusAccepted, sandboxToResponse(sb))
|
||||
}
|
||||
|
||||
// Ping handles POST /v1/capsules/{id}/ping.
|
||||
@ -230,5 +230,5 @@ func (h *sandboxHandler) Destroy(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
h.audit.LogSandboxDestroy(r.Context(), ac, sandboxID)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
65
internal/api/handlers_sandbox_events.go
Normal file
65
internal/api/handlers_sandbox_events.go
Normal file
@ -0,0 +1,65 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"git.omukk.dev/wrenn/wrenn/pkg/auth"
|
||||
"git.omukk.dev/wrenn/wrenn/pkg/db"
|
||||
"git.omukk.dev/wrenn/wrenn/pkg/id"
|
||||
)
|
||||
|
||||
type sandboxEventHandler struct {
|
||||
db *db.Queries
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func newSandboxEventHandler(queries *db.Queries, rdb *redis.Client) *sandboxEventHandler {
|
||||
return &sandboxEventHandler{db: queries, rdb: rdb}
|
||||
}
|
||||
|
||||
// sandboxEventRequest is the JSON payload a host agent POSTs to report a
// sandbox lifecycle transition.
type sandboxEventRequest struct {
	// Event is the lifecycle event name (e.g. "sandbox.auto_paused").
	Event string `json:"event"`
	// SandboxID is the formatted sandbox identifier.
	SandboxID string `json:"sandbox_id"`
	// HostID must match the authenticated host making the callback.
	HostID string `json:"host_id"`
	// Timestamp is unix seconds; zero means "not provided" and is
	// replaced with receipt time by the handler.
	Timestamp int64 `json:"timestamp"`
}
|
||||
|
||||
// Handle receives lifecycle event callbacks from host agents and publishes
|
||||
// them to the internal Redis stream for the SandboxEventConsumer to process.
|
||||
func (h *sandboxEventHandler) Handle(w http.ResponseWriter, r *http.Request) {
|
||||
var req sandboxEventRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.Event == "" || req.SandboxID == "" || req.HostID == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "event, sandbox_id, and host_id are required")
|
||||
return
|
||||
}
|
||||
|
||||
// Validate that the calling host matches the host_id in the payload.
|
||||
hc := auth.MustHostFromContext(r.Context())
|
||||
callerHostID := id.FormatHostID(hc.HostID)
|
||||
if callerHostID != req.HostID {
|
||||
writeError(w, http.StatusForbidden, "forbidden", "host_id does not match authenticated host")
|
||||
return
|
||||
}
|
||||
|
||||
if req.Timestamp == 0 {
|
||||
req.Timestamp = time.Now().Unix()
|
||||
}
|
||||
|
||||
PublishSandboxEvent(r.Context(), h.rdb, SandboxEvent{
|
||||
Event: req.Event,
|
||||
SandboxID: req.SandboxID,
|
||||
HostID: req.HostID,
|
||||
Timestamp: req.Timestamp,
|
||||
})
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
@ -213,4 +213,49 @@ func (m *HostMonitor) checkHost(ctx context.Context, host db.Host) {
|
||||
slog.Warn("host monitor: failed to mark stopped", "host_id", id.FormatHostID(host.ID), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Reconcile transient statuses (starting, resuming, pausing, stopping) ---
|
||||
// These represent in-flight operations. If the sandbox is no longer alive on
|
||||
// the host, infer the final state based on the transient status.
|
||||
|
||||
transientSandboxes, err := m.db.ListSandboxesByHostAndStatus(ctx, db.ListSandboxesByHostAndStatusParams{
|
||||
HostID: host.ID,
|
||||
Column2: []string{"starting", "resuming", "pausing", "stopping"},
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("host monitor: failed to list transient sandboxes", "host_id", id.FormatHostID(host.ID), "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, sb := range transientSandboxes {
|
||||
sbIDStr := id.FormatSandboxID(sb.ID)
|
||||
if _, ok := alive[sbIDStr]; ok {
|
||||
// Sandbox is alive on host — the background goroutine should
|
||||
// finalize the transition. For starting/resuming, if the sandbox
|
||||
// is alive it means creation/resume succeeded.
|
||||
if sb.Status == "starting" || sb.Status == "resuming" {
|
||||
if _, err := m.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
||||
ID: sb.ID, Status: sb.Status, Status_2: "running",
|
||||
}); err == nil {
|
||||
slog.Info("host monitor: promoted transient sandbox to running", "sandbox_id", sbIDStr, "from", sb.Status)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Sandbox is not alive on host — infer final state.
|
||||
var finalStatus string
|
||||
switch sb.Status {
|
||||
case "starting", "resuming":
|
||||
finalStatus = "error"
|
||||
case "pausing":
|
||||
finalStatus = "paused"
|
||||
case "stopping":
|
||||
finalStatus = "stopped"
|
||||
}
|
||||
if _, err := m.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
||||
ID: sb.ID, Status: sb.Status, Status_2: finalStatus,
|
||||
}); err == nil {
|
||||
slog.Info("host monitor: resolved transient sandbox", "sandbox_id", sbIDStr, "from", sb.Status, "to", finalStatus)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
236
internal/api/sandbox_event_consumer.go
Normal file
236
internal/api/sandbox_event_consumer.go
Normal file
@ -0,0 +1,236 @@
|
||||
package api
|
||||
|
||||
import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgtype"
	"github.com/redis/go-redis/v9"

	"git.omukk.dev/wrenn/wrenn/pkg/audit"
	"git.omukk.dev/wrenn/wrenn/pkg/db"
	"git.omukk.dev/wrenn/wrenn/pkg/id"
)
|
||||
|
||||
// Redis stream wiring for sandbox lifecycle events.
const (
	// sandboxEventStream is the stream key written by both the CP
	// background goroutines and the agent callback endpoint.
	sandboxEventStream = "wrenn:sandbox-events"
	// sandboxEventGroup is the consumer group read by SandboxEventConsumer.
	sandboxEventGroup = "wrenn-sandbox-events-v1"
	// sandboxEventConsumer is this process's consumer name in the group.
	// NOTE(review): the fixed name assumes a single CP instance — confirm
	// before running multiple control-plane replicas.
	sandboxEventConsumer = "cp-0"
)
|
||||
|
||||
// SandboxEvent is the canonical event payload published to the Redis stream
// by both the CP background goroutines (for explicit lifecycle ops) and
// the agent callback endpoint (for autonomous events like auto-pause).
//
// Field order matters for the JSON wire encoding; do not reorder.
type SandboxEvent struct {
	// Event is one of the SandboxEvent* constants.
	Event string `json:"event"`
	// SandboxID is the formatted sandbox identifier.
	SandboxID string `json:"sandbox_id"`
	// HostID is the formatted host identifier.
	HostID string `json:"host_id"`
	// HostIP, when present, is persisted by the started/resumed fallback writer.
	HostIP string `json:"host_ip,omitempty"`
	// Metadata carries optional key/value pairs persisted on start.
	Metadata map[string]string `json:"metadata,omitempty"`
	// Error, when present, describes a failure (see SandboxEventFailed).
	Error string `json:"error,omitempty"`
	// Timestamp is unix seconds at which the event occurred.
	Timestamp int64 `json:"timestamp"`
}
|
||||
|
||||
// Sandbox event type constants. started/paused/resumed/stopped/failed are
// published by the CP background goroutines after explicit lifecycle ops;
// auto_paused originates from host agents via the HTTP callback endpoint.
const (
	SandboxEventStarted = "sandbox.started"
	SandboxEventPaused  = "sandbox.paused"
	SandboxEventResumed = "sandbox.resumed"
	SandboxEventStopped = "sandbox.stopped"
	SandboxEventFailed  = "sandbox.failed"
	// SandboxEventAutoPaused reports an agent-initiated pause not triggered
	// by a CP API call.
	SandboxEventAutoPaused = "sandbox.auto_paused"
)
|
||||
|
||||
// SandboxEventConsumer reads sandbox lifecycle events from the Redis stream
|
||||
// and updates database state accordingly. It follows the same XREADGROUP
|
||||
// pattern as pkg/channels/dispatcher.go.
|
||||
type SandboxEventConsumer struct {
|
||||
rdb *redis.Client
|
||||
db *db.Queries
|
||||
audit *audit.AuditLogger
|
||||
}
|
||||
|
||||
// NewSandboxEventConsumer creates a consumer.
|
||||
func NewSandboxEventConsumer(rdb *redis.Client, queries *db.Queries, al *audit.AuditLogger) *SandboxEventConsumer {
|
||||
return &SandboxEventConsumer{rdb: rdb, db: queries, audit: al}
|
||||
}
|
||||
|
||||
// Start launches the consumer goroutine.
|
||||
func (c *SandboxEventConsumer) Start(ctx context.Context) {
|
||||
go c.run(ctx)
|
||||
}
|
||||
|
||||
func (c *SandboxEventConsumer) run(ctx context.Context) {
|
||||
err := c.rdb.XGroupCreateMkStream(ctx, sandboxEventStream, sandboxEventGroup, "$").Err()
|
||||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||||
slog.Error("sandbox event consumer: failed to create consumer group", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
streams, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: sandboxEventGroup,
|
||||
Consumer: sandboxEventConsumer,
|
||||
Streams: []string{sandboxEventStream, ">"},
|
||||
Count: 10,
|
||||
Block: 5 * time.Second,
|
||||
}).Result()
|
||||
|
||||
if err != nil {
|
||||
if err == redis.Nil || ctx.Err() != nil {
|
||||
continue
|
||||
}
|
||||
slog.Warn("sandbox event consumer: xreadgroup error", "error", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, msg := range stream.Messages {
|
||||
c.handleMessage(ctx, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleMessage decodes one stream entry and dispatches it to the
// per-event handler. The message is always acknowledged — even when the
// payload is missing or malformed — because redelivering a bad message
// would loop forever; the ack runs in a defer so it fires on every early
// return below.
func (c *SandboxEventConsumer) handleMessage(ctx context.Context, msg redis.XMessage) {
	// Use a non-cancellable context for XAck so shutdown doesn't leave
	// messages permanently stuck in the pending entries list.
	defer func() {
		ackCtx, ackCancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer ackCancel()
		if err := c.rdb.XAck(ackCtx, sandboxEventStream, sandboxEventGroup, msg.ID).Err(); err != nil {
			slog.Warn("sandbox event consumer: xack failed", "id", msg.ID, "error", err)
		}
	}()

	// Events are stored as a single JSON blob under the "payload" field
	// (see PublishSandboxEvent).
	payload, ok := msg.Values["payload"].(string)
	if !ok {
		slog.Warn("sandbox event consumer: message missing payload", "id", msg.ID)
		return
	}

	var event SandboxEvent
	if err := json.Unmarshal([]byte(payload), &event); err != nil {
		slog.Warn("sandbox event consumer: failed to unmarshal event", "id", msg.ID, "error", err)
		return
	}

	sandboxID, err := id.ParseSandboxID(event.SandboxID)
	if err != nil {
		slog.Warn("sandbox event consumer: invalid sandbox ID", "sandbox_id", event.SandboxID, "error", err)
		return
	}

	// started and resumed share one fallback writer; only the expected
	// prior status ("starting" vs "resuming") differs.
	switch event.Event {
	case SandboxEventStarted:
		c.handleStarted(ctx, sandboxID, event, "starting")
	case SandboxEventResumed:
		c.handleStarted(ctx, sandboxID, event, "resuming")
	case SandboxEventPaused:
		c.handlePaused(ctx, sandboxID, event)
	case SandboxEventStopped:
		c.handleStopped(ctx, sandboxID, event)
	case SandboxEventFailed:
		c.handleFailed(ctx, sandboxID)
	case SandboxEventAutoPaused:
		c.handleAutoPaused(ctx, sandboxID, event)
	default:
		slog.Warn("sandbox event consumer: unknown event type", "event", event.Event)
	}
}
|
||||
|
||||
// handleStarted is a fallback writer for sandbox.started and sandbox.resumed
|
||||
// events. The background goroutine in SandboxService is the primary writer;
|
||||
// this only succeeds if the goroutine's conditional update was missed.
|
||||
func (c *SandboxEventConsumer) handleStarted(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent, fromStatus string) {
|
||||
now := time.Now()
|
||||
if _, err := c.db.UpdateSandboxRunningIf(ctx, db.UpdateSandboxRunningIfParams{
|
||||
ID: sandboxID,
|
||||
Status: fromStatus,
|
||||
HostIp: event.HostIP,
|
||||
StartedAt: pgtype.Timestamptz{
|
||||
Time: now,
|
||||
Valid: true,
|
||||
},
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(event.Metadata) > 0 {
|
||||
metaJSON, _ := json.Marshal(event.Metadata)
|
||||
_ = c.db.UpdateSandboxMetadata(ctx, db.UpdateSandboxMetadataParams{
|
||||
ID: sandboxID,
|
||||
Metadata: metaJSON,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (c *SandboxEventConsumer) handlePaused(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
|
||||
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
||||
ID: sandboxID,
|
||||
Status: "pausing",
|
||||
Status_2: "paused",
|
||||
}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
slog.Warn("sandbox event consumer: failed to update sandbox to paused", "sandbox_id", event.SandboxID, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *SandboxEventConsumer) handleStopped(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
|
||||
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
||||
ID: sandboxID,
|
||||
Status: "stopping",
|
||||
Status_2: "stopped",
|
||||
}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
slog.Warn("sandbox event consumer: failed to update sandbox to stopped", "sandbox_id", event.SandboxID, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// handleFailed is a no-op fallback — the background goroutine already
// performed the conditional DB update before publishing this event.
// We keep the case arm so unknown event types are flagged, but avoid
// an unconditional status write that could clobber concurrent operations.
// Both parameters are intentionally unused.
func (c *SandboxEventConsumer) handleFailed(_ context.Context, _ pgtype.UUID) {}
|
||||
|
||||
func (c *SandboxEventConsumer) handleAutoPaused(ctx context.Context, sandboxID pgtype.UUID, _ SandboxEvent) {
|
||||
sb, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
||||
ID: sandboxID,
|
||||
Status: "running",
|
||||
Status_2: "paused",
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c.audit.LogSandboxAutoPause(ctx, sb.TeamID, sandboxID)
|
||||
}
|
||||
|
||||
// PublishSandboxEvent writes a sandbox lifecycle event to the Redis stream.
|
||||
// Used by both the SandboxService background goroutines and the callback endpoint.
|
||||
func PublishSandboxEvent(ctx context.Context, rdb *redis.Client, event SandboxEvent) {
|
||||
payload, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
slog.Warn("sandbox event: failed to marshal", "event", event.Event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := rdb.XAdd(ctx, &redis.XAddArgs{
|
||||
Stream: sandboxEventStream,
|
||||
MaxLen: 50000,
|
||||
Approx: true,
|
||||
Values: map[string]any{
|
||||
"payload": string(payload),
|
||||
},
|
||||
}).Err(); err != nil {
|
||||
slog.Warn("sandbox event: failed to publish", "event", event.Event, "error", err)
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@ -63,6 +64,17 @@ func New(
|
||||
|
||||
// Shared service layer.
|
||||
sandboxSvc := &service.SandboxService{DB: queries, Pool: pool, Scheduler: sched}
|
||||
sandboxSvc.PublishEvent = func(ctx context.Context, event service.SandboxStateEvent) {
|
||||
PublishSandboxEvent(ctx, rdb, SandboxEvent{
|
||||
Event: event.Event,
|
||||
SandboxID: event.SandboxID,
|
||||
HostID: event.HostID,
|
||||
HostIP: event.HostIP,
|
||||
Metadata: event.Metadata,
|
||||
Error: event.Error,
|
||||
Timestamp: event.Timestamp,
|
||||
})
|
||||
}
|
||||
apiKeySvc := &service.APIKeyService{DB: queries}
|
||||
templateSvc := &service.TemplateService{DB: queries}
|
||||
hostSvc := &service.HostService{DB: queries, Redis: rdb, JWT: jwtSecret, Pool: pool, CA: ca}
|
||||
@ -95,6 +107,7 @@ func New(
|
||||
ptyH := newPtyHandler(queries, pool, jwtSecret)
|
||||
processH := newProcessHandler(queries, pool, jwtSecret)
|
||||
adminCapsules := newAdminCapsuleHandler(sandboxSvc, queries, pool, al)
|
||||
sandboxEvtH := newSandboxEventHandler(queries, rdb)
|
||||
meH := newMeHandler(queries, pgPool, rdb, jwtSecret, mailer, oauthRegistry, oauthRedirectURL, teamSvc)
|
||||
|
||||
// Health check.
|
||||
@ -221,8 +234,9 @@ func New(
|
||||
// Unauthenticated: refresh token exchange.
|
||||
r.Post("/auth/refresh", hostH.RefreshToken)
|
||||
|
||||
// Host-token-authenticated: heartbeat.
|
||||
// Host-token-authenticated: heartbeat and lifecycle callbacks.
|
||||
r.With(requireHostToken(jwtSecret)).Post("/{id}/heartbeat", hostH.Heartbeat)
|
||||
r.With(requireHostToken(jwtSecret)).Post("/sandbox-events", sandboxEvtH.Handle)
|
||||
|
||||
// JWT-authenticated: host CRUD and tags.
|
||||
r.Group(func(r chi.Router) {
|
||||
|
||||
Reference in New Issue
Block a user