1
0
forked from wrenn/wrenn
Files
wrenn-releases/internal/api/sandbox_event_consumer.go
pptx704 eaa6b8576d feat(vm): replace Firecracker with Cloud Hypervisor
Migrate the entire VM layer from Firecracker to Cloud Hypervisor (CH).
CH provides native snapshot/restore via its HTTP API, eliminating the
need for custom UFFD handling, memfile processing, and snapshot header
management that Firecracker required.

Key changes:
- Remove fc.go, jailer.go (FC process management)
- Remove internal/uffd/ package (userfaultfd lazy page loading)
- Remove snapshot/header.go, mapping.go, memfile.go (FC snapshot format)
- Add ch.go (CH HTTP API client over Unix socket)
- Add process.go (CH process lifecycle with unshare+netns)
- Add chversion.go (CH version detection)
- Refactor sandbox manager: remove UFFD socket tracking, snapshot
  parent/diff chaining, FC-specific balloon logic; add crash watcher
- Simplify snapshot/local.go to CH's native snapshot format
- Update VM config: FirecrackerBin → VMMBin, new CH-specific fields
- Update envdclient, devicemapper, network for CH compatibility
2026-05-17 01:33:12 +06:00

257 lines
8.1 KiB
Go

package api
import (
"context"
"encoding/json"
"errors"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/redis/go-redis/v9"
"git.omukk.dev/wrenn/wrenn/pkg/audit"
"git.omukk.dev/wrenn/wrenn/pkg/db"
"git.omukk.dev/wrenn/wrenn/pkg/id"
)
const (
sandboxEventStream = "wrenn:sandbox-events"
sandboxEventGroup = "wrenn-sandbox-events-v1"
sandboxEventConsumer = "cp-0"
)
// SandboxEvent is the canonical event payload published to the Redis stream
// by both the CP background goroutines (for explicit lifecycle ops) and
// the agent callback endpoint (for autonomous events like auto-pause).
type SandboxEvent struct {
Event string `json:"event"`
SandboxID string `json:"sandbox_id"`
HostID string `json:"host_id"`
HostIP string `json:"host_ip,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Error string `json:"error,omitempty"`
Timestamp int64 `json:"timestamp"`
}
// Sandbox event type constants.
const (
SandboxEventStarted = "sandbox.started"
SandboxEventPaused = "sandbox.paused"
SandboxEventResumed = "sandbox.resumed"
SandboxEventStopped = "sandbox.stopped"
SandboxEventFailed = "sandbox.failed"
SandboxEventError = "sandbox.error"
SandboxEventAutoPaused = "sandbox.auto_paused"
)
// SandboxEventConsumer reads sandbox lifecycle events from the Redis stream
// and updates database state accordingly. It follows the same XREADGROUP
// pattern as pkg/channels/dispatcher.go.
type SandboxEventConsumer struct {
rdb *redis.Client
db *db.Queries
audit *audit.AuditLogger
}
// NewSandboxEventConsumer creates a consumer.
func NewSandboxEventConsumer(rdb *redis.Client, queries *db.Queries, al *audit.AuditLogger) *SandboxEventConsumer {
return &SandboxEventConsumer{rdb: rdb, db: queries, audit: al}
}
// Start launches the consumer goroutine.
func (c *SandboxEventConsumer) Start(ctx context.Context) {
go c.run(ctx)
}
func (c *SandboxEventConsumer) run(ctx context.Context) {
err := c.rdb.XGroupCreateMkStream(ctx, sandboxEventStream, sandboxEventGroup, "$").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
slog.Error("sandbox event consumer: failed to create consumer group", "error", err)
return
}
for {
select {
case <-ctx.Done():
return
default:
}
streams, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: sandboxEventGroup,
Consumer: sandboxEventConsumer,
Streams: []string{sandboxEventStream, ">"},
Count: 10,
Block: 5 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil || ctx.Err() != nil {
continue
}
slog.Warn("sandbox event consumer: xreadgroup error", "error", err)
time.Sleep(1 * time.Second)
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
c.handleMessage(ctx, msg)
}
}
}
}
func (c *SandboxEventConsumer) handleMessage(ctx context.Context, msg redis.XMessage) {
// Use a non-cancellable context for XAck so shutdown doesn't leave
// messages permanently stuck in the pending entries list.
defer func() {
ackCtx, ackCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer ackCancel()
if err := c.rdb.XAck(ackCtx, sandboxEventStream, sandboxEventGroup, msg.ID).Err(); err != nil {
slog.Warn("sandbox event consumer: xack failed", "id", msg.ID, "error", err)
}
}()
payload, ok := msg.Values["payload"].(string)
if !ok {
slog.Warn("sandbox event consumer: message missing payload", "id", msg.ID)
return
}
var event SandboxEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
slog.Warn("sandbox event consumer: failed to unmarshal event", "id", msg.ID, "error", err)
return
}
sandboxID, err := id.ParseSandboxID(event.SandboxID)
if err != nil {
slog.Warn("sandbox event consumer: invalid sandbox ID", "sandbox_id", event.SandboxID, "error", err)
return
}
switch event.Event {
case SandboxEventStarted:
c.handleStarted(ctx, sandboxID, event, "starting")
case SandboxEventResumed:
c.handleStarted(ctx, sandboxID, event, "resuming")
case SandboxEventPaused:
c.handlePaused(ctx, sandboxID, event)
case SandboxEventStopped:
c.handleStopped(ctx, sandboxID, event)
case SandboxEventFailed, SandboxEventError:
c.handleFailed(ctx, sandboxID)
case SandboxEventAutoPaused:
c.handleAutoPaused(ctx, sandboxID, event)
default:
slog.Warn("sandbox event consumer: unknown event type", "event", event.Event)
}
}
// handleStarted is a fallback writer for sandbox.started and sandbox.resumed
// events. The background goroutine in SandboxService is the primary writer;
// this only succeeds if the goroutine's conditional update was missed.
func (c *SandboxEventConsumer) handleStarted(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent, fromStatus string) {
now := time.Now()
if _, err := c.db.UpdateSandboxRunningIf(ctx, db.UpdateSandboxRunningIfParams{
ID: sandboxID,
Status: fromStatus,
HostIp: event.HostIP,
StartedAt: pgtype.Timestamptz{
Time: now,
Valid: true,
},
}); err != nil {
return
}
if len(event.Metadata) > 0 {
metaJSON, _ := json.Marshal(event.Metadata)
_ = c.db.UpdateSandboxMetadata(ctx, db.UpdateSandboxMetadataParams{
ID: sandboxID,
Metadata: metaJSON,
})
}
}
func (c *SandboxEventConsumer) handlePaused(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID,
Status: "pausing",
Status_2: "paused",
}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
slog.Warn("sandbox event consumer: failed to update sandbox to paused", "sandbox_id", event.SandboxID, "error", err)
}
}
func (c *SandboxEventConsumer) handleStopped(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
// Try stopping → stopped (CP-initiated destroy completed).
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID,
Status: "stopping",
Status_2: "stopped",
}); err == nil {
return
}
// Try running → stopped (autonomous destroy, e.g. TTL auto-destroy).
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID,
Status: "running",
Status_2: "stopped",
}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
slog.Warn("sandbox event consumer: failed to update sandbox to stopped", "sandbox_id", event.SandboxID, "error", err)
}
}
// handleFailed marks a sandbox as "error" when the host agent reports a crash
// or the CP's background goroutine publishes a failure. Uses conditional update
// to avoid clobbering concurrent operations.
func (c *SandboxEventConsumer) handleFailed(ctx context.Context, sandboxID pgtype.UUID) {
// Try running → error (VM crash pushed by host agent).
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID, Status: "running", Status_2: "error",
}); err == nil {
return
}
// Try starting → error (create failed).
_, _ = c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID, Status: "starting", Status_2: "error",
})
}
func (c *SandboxEventConsumer) handleAutoPaused(ctx context.Context, sandboxID pgtype.UUID, _ SandboxEvent) {
sb, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
ID: sandboxID,
Status: "running",
Status_2: "paused",
})
if err != nil {
return
}
c.audit.LogSandboxAutoPause(ctx, sb.TeamID, sandboxID)
}
// PublishSandboxEvent writes a sandbox lifecycle event to the Redis stream.
// Used by both the SandboxService background goroutines and the callback endpoint.
func PublishSandboxEvent(ctx context.Context, rdb *redis.Client, event SandboxEvent) {
payload, err := json.Marshal(event)
if err != nil {
slog.Warn("sandbox event: failed to marshal", "event", event.Event, "error", err)
return
}
if err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: sandboxEventStream,
MaxLen: 50000,
Approx: true,
Values: map[string]any{
"payload": string(payload),
},
}).Err(); err != nil {
slog.Warn("sandbox event: failed to publish", "event", event.Event, "error", err)
}
}