Replace synchronous RPC-based CP-host communication for sandbox lifecycle operations (Create, Pause, Resume, Destroy) with an async pattern. CP handlers now return 202 Accepted immediately, fire agent RPCs in background goroutines, and publish state events to a Redis Stream. A background consumer processes events as a fallback writer. Agent-side auto-pause events are pushed to the CP via HTTP callback (POST /v1/hosts/sandbox-events), keeping Redis internal to the CP. All DB status transitions use conditional updates (UpdateSandboxStatusIf, UpdateSandboxRunningIf) to prevent race conditions between concurrent operations and background goroutines. The HostMonitor reconciler is kept at 60s as a safety net, extended to handle transient statuses (starting, pausing, resuming, stopping). Frontend updated to handle 202 responses with empty bodies and render transient statuses with blue indicators.
294 lines
9.4 KiB
Go
294 lines
9.4 KiB
Go
package cpserver
|
|
|
|
import (
	"context"
	"errors"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgtype"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/redis/go-redis/v9"

	"git.omukk.dev/wrenn/wrenn/internal/api"
	"git.omukk.dev/wrenn/wrenn/internal/email"
	"git.omukk.dev/wrenn/wrenn/pkg/audit"
	"git.omukk.dev/wrenn/wrenn/pkg/auth"
	"git.omukk.dev/wrenn/wrenn/pkg/auth/oauth"
	"git.omukk.dev/wrenn/wrenn/pkg/channels"
	"git.omukk.dev/wrenn/wrenn/pkg/config"
	"git.omukk.dev/wrenn/wrenn/pkg/db"
	"git.omukk.dev/wrenn/wrenn/pkg/id"
	"git.omukk.dev/wrenn/wrenn/pkg/lifecycle"
	"git.omukk.dev/wrenn/wrenn/pkg/logging"
	"git.omukk.dev/wrenn/wrenn/pkg/scheduler"
)
|
|
|
|
// Run initializes and starts the control plane server. It blocks until a
|
|
// SIGINT or SIGTERM signal is received, then shuts down gracefully.
|
|
//
|
|
// Extensions registered via WithExtensions get to add routes and start
|
|
// background workers after the core server is fully initialized.
|
|
func Run(opts ...Option) {
|
|
o := &options{
|
|
version: "dev",
|
|
commit: "unknown",
|
|
}
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
|
|
cfg := config.Load()
|
|
cleanupLog := logging.Setup(filepath.Join(cfg.WrennDir, "logs"), "control-plane")
|
|
defer cleanupLog()
|
|
|
|
if len(cfg.JWTSecret) < 32 {
|
|
slog.Error("JWT_SECRET must be at least 32 characters")
|
|
os.Exit(1)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Database connection pool.
|
|
pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
|
|
if err != nil {
|
|
slog.Error("failed to connect to database", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
defer pool.Close()
|
|
|
|
if err := pool.Ping(ctx); err != nil {
|
|
slog.Error("failed to ping database", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
slog.Info("connected to database")
|
|
|
|
queries := db.New(pool)
|
|
|
|
// Redis client.
|
|
redisOpts, err := redis.ParseURL(cfg.RedisURL)
|
|
if err != nil {
|
|
slog.Error("failed to parse REDIS_URL", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
rdb := redis.NewClient(redisOpts)
|
|
defer rdb.Close()
|
|
|
|
if err := rdb.Ping(ctx).Err(); err != nil {
|
|
slog.Error("failed to ping redis", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
slog.Info("connected to redis")
|
|
|
|
// mTLS is mandatory — parse internal CA for CP↔agent communication.
|
|
if cfg.CACert == "" || cfg.CAKey == "" {
|
|
slog.Error("WRENN_CA_CERT and WRENN_CA_KEY are required — mTLS is mandatory for CP↔agent communication")
|
|
os.Exit(1)
|
|
}
|
|
ca, err := auth.ParseCA(cfg.CACert, cfg.CAKey)
|
|
if err != nil {
|
|
slog.Error("failed to parse mTLS CA from environment", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
slog.Info("mTLS enabled: CA loaded")
|
|
|
|
// Host client pool — manages Connect RPC clients to host agents.
|
|
cpCertStore, err := auth.NewCPCertStore(ca)
|
|
if err != nil {
|
|
slog.Error("failed to issue CP client certificate", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
// Renew the CP client certificate periodically so it never expires
|
|
// while the control plane is running (TTL = 24h, renewal = every 12h).
|
|
go func() {
|
|
ticker := time.NewTicker(auth.CPCertRenewInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if err := cpCertStore.Refresh(); err != nil {
|
|
slog.Error("failed to renew CP client certificate", "error", err)
|
|
} else {
|
|
slog.Info("CP client certificate renewed")
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
hostPool := lifecycle.NewHostClientPoolTLS(auth.CPClientTLSConfig(ca, cpCertStore))
|
|
slog.Info("host client pool: mTLS enabled")
|
|
|
|
// Scheduler — picks a host for each new sandbox (least-loaded, bottleneck-first).
|
|
hostScheduler := scheduler.NewLeastLoadedScheduler(queries)
|
|
|
|
// OAuth provider registry.
|
|
oauthRegistry := oauth.NewRegistry()
|
|
if cfg.OAuthGitHubClientID != "" && cfg.OAuthGitHubClientSecret != "" {
|
|
if cfg.CPPublicURL == "" {
|
|
slog.Error("CP_PUBLIC_URL must be set when OAuth providers are configured")
|
|
os.Exit(1)
|
|
}
|
|
callbackURL := strings.TrimRight(cfg.CPPublicURL, "/") + "/auth/oauth/github/callback"
|
|
ghProvider := oauth.NewGitHubProvider(cfg.OAuthGitHubClientID, cfg.OAuthGitHubClientSecret, callbackURL)
|
|
oauthRegistry.Register(ghProvider)
|
|
slog.Info("registered OAuth provider", "provider", "github")
|
|
}
|
|
|
|
// Channels: publisher, service, dispatcher.
|
|
if len(cfg.EncryptionKeyHex) != 64 {
|
|
slog.Error("WRENN_ENCRYPTION_KEY must be a hex-encoded 32-byte key (64 hex chars)")
|
|
os.Exit(1)
|
|
}
|
|
channelPub := channels.NewPublisher(rdb)
|
|
channelSvc := &channels.Service{DB: queries, EncKey: cfg.EncryptionKey}
|
|
channelDispatcher := channels.NewDispatcher(rdb, queries, cfg.EncryptionKey)
|
|
|
|
// Shared audit logger with event publishing.
|
|
al := audit.NewWithPublisher(queries, channelPub)
|
|
|
|
// Transactional email (no-op if SMTP_HOST is not set).
|
|
mailer := email.New(email.Config{
|
|
Host: cfg.SMTPHost,
|
|
Port: cfg.SMTPPort,
|
|
Username: cfg.SMTPUsername,
|
|
Password: cfg.SMTPPassword,
|
|
FromEmail: cfg.SMTPFromEmail,
|
|
})
|
|
|
|
// Build the server context that extensions receive.
|
|
sctx := ServerContext{
|
|
Queries: queries,
|
|
PgPool: pool,
|
|
Redis: rdb,
|
|
HostPool: hostPool,
|
|
Scheduler: hostScheduler,
|
|
CA: ca,
|
|
Audit: al,
|
|
Mailer: mailer,
|
|
JWTSecret: []byte(cfg.JWTSecret),
|
|
Config: cfg,
|
|
}
|
|
|
|
// API server.
|
|
srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc, mailer, o.extensions, sctx, o.version)
|
|
|
|
// Start template build workers (2 concurrent).
|
|
stopBuildWorkers := srv.BuildSvc.StartWorkers(ctx, 2)
|
|
defer stopBuildWorkers()
|
|
|
|
// Start channel event dispatcher.
|
|
channelDispatcher.Start(ctx)
|
|
|
|
// Start sandbox event consumer (processes lifecycle events from Redis stream).
|
|
sandboxEventConsumer := api.NewSandboxEventConsumer(rdb, queries, al)
|
|
sandboxEventConsumer.Start(ctx)
|
|
|
|
// Start host monitor (passive + active reconciliation every 60s).
|
|
// Reduced from 15s since async events handle the normal case.
|
|
monitor := api.NewHostMonitor(queries, hostPool, al, 60*time.Second)
|
|
monitor.Start(ctx)
|
|
|
|
// Hard-delete accounts that have been soft-deleted for more than 15 days (runs every 24h).
|
|
// Audit logs referencing deleted users are anonymized before the user row is removed.
|
|
// A notification email is sent to the user before their data is permanently removed.
|
|
go func() {
|
|
ticker := time.NewTicker(24 * time.Hour)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
expired, err := queries.ListExpiredSoftDeletedUsers(ctx)
|
|
if err != nil {
|
|
slog.Error("account cleanup: failed to list expired users", "error", err)
|
|
continue
|
|
}
|
|
var deleted int
|
|
for _, row := range expired {
|
|
prefixedID := id.FormatUserID(row.ID)
|
|
if err := queries.AnonymizeAuditLogsByUserID(ctx, pgtype.Text{String: prefixedID, Valid: true}); err != nil {
|
|
slog.Error("account cleanup: failed to anonymize audit logs, skipping delete", "user_id", prefixedID, "error", err)
|
|
continue
|
|
}
|
|
if err := queries.HardDeleteUser(ctx, row.ID); err != nil {
|
|
slog.Error("account cleanup: failed to hard-delete user", "user_id", prefixedID, "error", err)
|
|
continue
|
|
}
|
|
if err := mailer.Send(ctx, row.Email, "Your Wrenn account has been deleted", email.EmailData{
|
|
Message: "Your Wrenn account and all associated data have been permanently deleted. " +
|
|
"This action was taken automatically because your account was scheduled for deletion more than 15 days ago.\n\n" +
|
|
"If you believe this was done in error, please contact support.",
|
|
Closing: "Thank you for using Wrenn.",
|
|
}); err != nil {
|
|
slog.Warn("account cleanup: failed to send deletion notification", "email", row.Email, "error", err)
|
|
}
|
|
deleted++
|
|
}
|
|
if len(expired) > 0 {
|
|
slog.Info("account cleanup: processed expired users", "total", len(expired), "deleted", deleted)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start metrics sampler (records per-team sandbox stats every 10s).
|
|
sampler := api.NewMetricsSampler(queries, 10*time.Second)
|
|
sampler.Start(ctx)
|
|
|
|
// Start daily usage rollup (pre-computes CPU-minutes and RAM-MB-minutes).
|
|
rollup := api.NewDailyUsageRollup(queries, time.Hour)
|
|
rollup.Start(ctx)
|
|
|
|
// Start extension background workers.
|
|
for _, ext := range o.extensions {
|
|
for _, worker := range ext.BackgroundWorkers(sctx) {
|
|
worker(ctx)
|
|
}
|
|
}
|
|
|
|
// Wrap the API handler with the sandbox proxy so that requests with
|
|
// {port}-{sandbox_id}.{domain} Host headers are routed to the sandbox's
|
|
// host agent. All other requests pass through to the normal API router.
|
|
proxyWrapper := api.NewSandboxProxyWrapper(srv.Handler(), queries, hostPool)
|
|
|
|
httpServer := &http.Server{
|
|
Addr: cfg.ListenAddr,
|
|
Handler: proxyWrapper,
|
|
ReadHeaderTimeout: 10 * time.Second,
|
|
IdleTimeout: 620 * time.Second, // > typical LB/Cloudflare upstream timeout
|
|
}
|
|
|
|
// Graceful shutdown on signal.
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() {
|
|
sig := <-sigCh
|
|
slog.Info("received signal, shutting down", "signal", sig)
|
|
cancel()
|
|
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer shutdownCancel()
|
|
|
|
if err := httpServer.Shutdown(shutdownCtx); err != nil {
|
|
slog.Error("http server shutdown error", "error", err)
|
|
}
|
|
}()
|
|
|
|
slog.Info("control plane starting", "addr", cfg.ListenAddr, "version", o.version, "commit", o.commit)
|
|
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
slog.Error("http server error", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
slog.Info("control plane stopped")
|
|
}
|