forked from wrenn/wrenn
Migrate the entire VM layer from Firecracker to Cloud Hypervisor (CH). CH provides native snapshot/restore via its HTTP API, eliminating the need for custom UFFD handling, memfile processing, and snapshot header management that Firecracker required. Key changes: - Remove fc.go, jailer.go (FC process management) - Remove internal/uffd/ package (userfaultfd lazy page loading) - Remove snapshot/header.go, mapping.go, memfile.go (FC snapshot format) - Add ch.go (CH HTTP API client over Unix socket) - Add process.go (CH process lifecycle with unshare+netns) - Add chversion.go (CH version detection) - Refactor sandbox manager: remove UFFD socket tracking, snapshot parent/diff chaining, FC-specific balloon logic; add crash watcher - Simplify snapshot/local.go to CH's native snapshot format - Update VM config: FirecrackerBin → VMMBin, new CH-specific fields - Update envdclient, devicemapper, network for CH compatibility
257 lines
8.1 KiB
Go
257 lines
8.1 KiB
Go
package api
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgtype"
	"github.com/redis/go-redis/v9"

	"git.omukk.dev/wrenn/wrenn/pkg/audit"
	"git.omukk.dev/wrenn/wrenn/pkg/db"
	"git.omukk.dev/wrenn/wrenn/pkg/id"
)
|
|
|
|
const (
	// sandboxEventStream is the Redis stream that all sandbox lifecycle
	// events are published to and consumed from.
	sandboxEventStream = "wrenn:sandbox-events"
	// sandboxEventGroup is the XREADGROUP consumer-group name (the "-v1"
	// suffix suggests it is versioned; confirm before renaming).
	sandboxEventGroup = "wrenn-sandbox-events-v1"
	// sandboxEventConsumer is this process's consumer name within the group.
	// NOTE(review): "cp-0" looks like it assumes a single control-plane
	// instance — verify before running multiple CPs against one group.
	sandboxEventConsumer = "cp-0"
)
|
|
|
|
// SandboxEvent is the canonical event payload published to the Redis stream
// by both the CP background goroutines (for explicit lifecycle ops) and
// the agent callback endpoint (for autonomous events like auto-pause).
type SandboxEvent struct {
	// Event is one of the SandboxEvent* constants below.
	Event string `json:"event"`
	// SandboxID is the string form of the sandbox's ID; parsed with
	// id.ParseSandboxID on the consumer side.
	SandboxID string `json:"sandbox_id"`
	// HostID identifies the host the sandbox runs on.
	HostID string `json:"host_id"`
	// HostIP, when present, is written to the sandbox row on
	// started/resumed events.
	HostIP string `json:"host_ip,omitempty"`
	// Metadata, when non-empty, replaces the sandbox's stored metadata on
	// started/resumed events.
	Metadata map[string]string `json:"metadata,omitempty"`
	// Error optionally carries a failure description for failed/error events.
	Error string `json:"error,omitempty"`
	// Timestamp is when the event was emitted; unit (seconds vs millis)
	// is set by the publisher — TODO confirm against callers.
	Timestamp int64 `json:"timestamp"`
}
|
|
|
|
// Sandbox event type constants.
const (
	SandboxEventStarted = "sandbox.started" // sandbox finished starting
	SandboxEventPaused  = "sandbox.paused"  // CP-initiated pause completed
	SandboxEventResumed = "sandbox.resumed" // resume from pause completed
	SandboxEventStopped = "sandbox.stopped" // sandbox destroyed (CP- or host-initiated)
	SandboxEventFailed  = "sandbox.failed"  // lifecycle operation failed
	SandboxEventError   = "sandbox.error"   // host agent reported a crash/error
	// SandboxEventAutoPaused is emitted by the host agent when it pauses a
	// running sandbox on its own (no CP-initiated "pausing" state).
	SandboxEventAutoPaused = "sandbox.auto_paused"
)
|
|
|
|
// SandboxEventConsumer reads sandbox lifecycle events from the Redis stream
// and updates database state accordingly. It follows the same XREADGROUP
// pattern as pkg/channels/dispatcher.go.
type SandboxEventConsumer struct {
	rdb   *redis.Client      // source of stream messages (XReadGroup/XAck)
	db    *db.Queries        // conditional sandbox status updates
	audit *audit.AuditLogger // audit trail for autonomous auto-pause events
}
|
|
|
|
// NewSandboxEventConsumer creates a consumer.
|
|
func NewSandboxEventConsumer(rdb *redis.Client, queries *db.Queries, al *audit.AuditLogger) *SandboxEventConsumer {
|
|
return &SandboxEventConsumer{rdb: rdb, db: queries, audit: al}
|
|
}
|
|
|
|
// Start launches the consumer goroutine. It returns immediately; the
// goroutine exits when ctx is cancelled (see run's top-of-loop select).
func (c *SandboxEventConsumer) Start(ctx context.Context) {
	go c.run(ctx)
}
|
|
|
|
func (c *SandboxEventConsumer) run(ctx context.Context) {
|
|
err := c.rdb.XGroupCreateMkStream(ctx, sandboxEventStream, sandboxEventGroup, "$").Err()
|
|
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
|
slog.Error("sandbox event consumer: failed to create consumer group", "error", err)
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
streams, err := c.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
|
|
Group: sandboxEventGroup,
|
|
Consumer: sandboxEventConsumer,
|
|
Streams: []string{sandboxEventStream, ">"},
|
|
Count: 10,
|
|
Block: 5 * time.Second,
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
if err == redis.Nil || ctx.Err() != nil {
|
|
continue
|
|
}
|
|
slog.Warn("sandbox event consumer: xreadgroup error", "error", err)
|
|
time.Sleep(1 * time.Second)
|
|
continue
|
|
}
|
|
|
|
for _, stream := range streams {
|
|
for _, msg := range stream.Messages {
|
|
c.handleMessage(ctx, msg)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *SandboxEventConsumer) handleMessage(ctx context.Context, msg redis.XMessage) {
|
|
// Use a non-cancellable context for XAck so shutdown doesn't leave
|
|
// messages permanently stuck in the pending entries list.
|
|
defer func() {
|
|
ackCtx, ackCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer ackCancel()
|
|
if err := c.rdb.XAck(ackCtx, sandboxEventStream, sandboxEventGroup, msg.ID).Err(); err != nil {
|
|
slog.Warn("sandbox event consumer: xack failed", "id", msg.ID, "error", err)
|
|
}
|
|
}()
|
|
|
|
payload, ok := msg.Values["payload"].(string)
|
|
if !ok {
|
|
slog.Warn("sandbox event consumer: message missing payload", "id", msg.ID)
|
|
return
|
|
}
|
|
|
|
var event SandboxEvent
|
|
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
|
slog.Warn("sandbox event consumer: failed to unmarshal event", "id", msg.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
sandboxID, err := id.ParseSandboxID(event.SandboxID)
|
|
if err != nil {
|
|
slog.Warn("sandbox event consumer: invalid sandbox ID", "sandbox_id", event.SandboxID, "error", err)
|
|
return
|
|
}
|
|
|
|
switch event.Event {
|
|
case SandboxEventStarted:
|
|
c.handleStarted(ctx, sandboxID, event, "starting")
|
|
case SandboxEventResumed:
|
|
c.handleStarted(ctx, sandboxID, event, "resuming")
|
|
case SandboxEventPaused:
|
|
c.handlePaused(ctx, sandboxID, event)
|
|
case SandboxEventStopped:
|
|
c.handleStopped(ctx, sandboxID, event)
|
|
case SandboxEventFailed, SandboxEventError:
|
|
c.handleFailed(ctx, sandboxID)
|
|
case SandboxEventAutoPaused:
|
|
c.handleAutoPaused(ctx, sandboxID, event)
|
|
default:
|
|
slog.Warn("sandbox event consumer: unknown event type", "event", event.Event)
|
|
}
|
|
}
|
|
|
|
// handleStarted is a fallback writer for sandbox.started and sandbox.resumed
|
|
// events. The background goroutine in SandboxService is the primary writer;
|
|
// this only succeeds if the goroutine's conditional update was missed.
|
|
func (c *SandboxEventConsumer) handleStarted(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent, fromStatus string) {
|
|
now := time.Now()
|
|
if _, err := c.db.UpdateSandboxRunningIf(ctx, db.UpdateSandboxRunningIfParams{
|
|
ID: sandboxID,
|
|
Status: fromStatus,
|
|
HostIp: event.HostIP,
|
|
StartedAt: pgtype.Timestamptz{
|
|
Time: now,
|
|
Valid: true,
|
|
},
|
|
}); err != nil {
|
|
return
|
|
}
|
|
|
|
if len(event.Metadata) > 0 {
|
|
metaJSON, _ := json.Marshal(event.Metadata)
|
|
_ = c.db.UpdateSandboxMetadata(ctx, db.UpdateSandboxMetadataParams{
|
|
ID: sandboxID,
|
|
Metadata: metaJSON,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (c *SandboxEventConsumer) handlePaused(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
|
|
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
|
ID: sandboxID,
|
|
Status: "pausing",
|
|
Status_2: "paused",
|
|
}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
|
slog.Warn("sandbox event consumer: failed to update sandbox to paused", "sandbox_id", event.SandboxID, "error", err)
|
|
}
|
|
}
|
|
|
|
func (c *SandboxEventConsumer) handleStopped(ctx context.Context, sandboxID pgtype.UUID, event SandboxEvent) {
|
|
// Try stopping → stopped (CP-initiated destroy completed).
|
|
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
|
ID: sandboxID,
|
|
Status: "stopping",
|
|
Status_2: "stopped",
|
|
}); err == nil {
|
|
return
|
|
}
|
|
// Try running → stopped (autonomous destroy, e.g. TTL auto-destroy).
|
|
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
|
ID: sandboxID,
|
|
Status: "running",
|
|
Status_2: "stopped",
|
|
}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
|
slog.Warn("sandbox event consumer: failed to update sandbox to stopped", "sandbox_id", event.SandboxID, "error", err)
|
|
}
|
|
}
|
|
|
|
// handleFailed marks a sandbox as "error" when the host agent reports a crash
|
|
// or the CP's background goroutine publishes a failure. Uses conditional update
|
|
// to avoid clobbering concurrent operations.
|
|
func (c *SandboxEventConsumer) handleFailed(ctx context.Context, sandboxID pgtype.UUID) {
|
|
// Try running → error (VM crash pushed by host agent).
|
|
if _, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
|
ID: sandboxID, Status: "running", Status_2: "error",
|
|
}); err == nil {
|
|
return
|
|
}
|
|
// Try starting → error (create failed).
|
|
_, _ = c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
|
ID: sandboxID, Status: "starting", Status_2: "error",
|
|
})
|
|
}
|
|
|
|
func (c *SandboxEventConsumer) handleAutoPaused(ctx context.Context, sandboxID pgtype.UUID, _ SandboxEvent) {
|
|
sb, err := c.db.UpdateSandboxStatusIf(ctx, db.UpdateSandboxStatusIfParams{
|
|
ID: sandboxID,
|
|
Status: "running",
|
|
Status_2: "paused",
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
c.audit.LogSandboxAutoPause(ctx, sb.TeamID, sandboxID)
|
|
}
|
|
|
|
// PublishSandboxEvent writes a sandbox lifecycle event to the Redis stream.
|
|
// Used by both the SandboxService background goroutines and the callback endpoint.
|
|
func PublishSandboxEvent(ctx context.Context, rdb *redis.Client, event SandboxEvent) {
|
|
payload, err := json.Marshal(event)
|
|
if err != nil {
|
|
slog.Warn("sandbox event: failed to marshal", "event", event.Event, "error", err)
|
|
return
|
|
}
|
|
|
|
if err := rdb.XAdd(ctx, &redis.XAddArgs{
|
|
Stream: sandboxEventStream,
|
|
MaxLen: 50000,
|
|
Approx: true,
|
|
Values: map[string]any{
|
|
"payload": string(payload),
|
|
},
|
|
}).Err(); err != nil {
|
|
slog.Warn("sandbox event: failed to publish", "event", event.Event, "error", err)
|
|
}
|
|
}
|