From 3671af249884e8a0a6cdf3a9a3283c6d5eb0cd6f Mon Sep 17 00:00:00 2001 From: pptx704 Date: Sat, 16 May 2026 16:15:49 +0600 Subject: [PATCH] feat: immediate sandbox reconciliation on host reconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a host transitions from unreachable → online via heartbeat, trigger ReconcileHost in a background goroutine so "missing" sandboxes are resolved instantly instead of waiting up to 60s for the next monitor tick. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/api/handlers_hosts.go | 15 ++++++++++++--- internal/api/host_monitor.go | 15 +++++++++++++++ internal/api/server.go | 3 ++- pkg/cpserver/run.go | 11 +++++++---- 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/internal/api/handlers_hosts.go b/internal/api/handlers_hosts.go index 9536197..20b5ba6 100644 --- a/internal/api/handlers_hosts.go +++ b/internal/api/handlers_hosts.go @@ -1,6 +1,7 @@ package api import ( + "context" "errors" "log/slog" "net/http" @@ -21,10 +22,11 @@ type hostHandler struct { svc *service.HostService queries *db.Queries audit *audit.AuditLogger + monitor *HostMonitor } -func newHostHandler(svc *service.HostService, queries *db.Queries, al *audit.AuditLogger) *hostHandler { - return &hostHandler{svc: svc, queries: queries, audit: al} +func newHostHandler(svc *service.HostService, queries *db.Queries, al *audit.AuditLogger, monitor *HostMonitor) *hostHandler { + return &hostHandler{svc: svc, queries: queries, audit: al, monitor: monitor} } // Request/response types. @@ -426,9 +428,16 @@ func (h *hostHandler) Heartbeat(w http.ResponseWriter, r *http.Request) { return } - // Log marked_up if the host just recovered from unreachable. + // If the host just recovered from unreachable, log it and trigger immediate + // reconciliation so "missing" sandboxes are resolved without waiting for the + // next monitor tick. 
if prevHost.Status == "unreachable" { h.audit.LogHostMarkedUp(r.Context(), prevHost.TeamID, hc.HostID) + // Guard the detached call: a nil monitor would panic in a goroutine + // that no request-scoped recovery can catch. + if h.monitor != nil { + go h.monitor.ReconcileHost(context.Background(), hc.HostID) + } } w.WriteHeader(http.StatusNoContent) diff --git a/internal/api/host_monitor.go b/internal/api/host_monitor.go index c25dffb..06b3c28 100644 --- a/internal/api/host_monitor.go +++ b/internal/api/host_monitor.go @@ -77,6 +77,21 @@ func (m *HostMonitor) run(ctx context.Context) { } } +// ReconcileHost triggers immediate active reconciliation for a single host. +// Called when a host transitions from unreachable → online so sandboxes marked +// "missing" are resolved without waiting for the next monitor tick. +func (m *HostMonitor) ReconcileHost(ctx context.Context, hostID pgtype.UUID) { + host, err := m.db.GetHost(ctx, hostID) + if err != nil { + slog.Warn("host monitor: reconcile-on-connect: failed to get host", "host_id", hostID, "error", err) + return + } + if host.Status != "online" { + return + } + m.checkHost(ctx, host) +} + func (m *HostMonitor) checkHost(ctx context.Context, host db.Host) { // --- Passive phase: check heartbeat staleness --- diff --git a/internal/api/server.go b/internal/api/server.go index a4e3e3b..57759a6 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -50,6 +50,7 @@ func New( mailer email.Mailer, extensions []cpextension.Extension, sctx cpextension.ServerContext, + monitor *HostMonitor, version string, ) *Server { r := chi.NewRouter() @@ -95,7 +96,7 @@ func New( authH := newAuthHandler(queries, pgPool, jwtSecret, mailer, rdb, oauthRedirectURL) oauthH := newOAuthHandler(queries, pgPool, jwtSecret, oauthRegistry, oauthRedirectURL) apiKeys := newAPIKeyHandler(apiKeySvc, al) - hostH := newHostHandler(hostSvc, queries, al) + hostH := newHostHandler(hostSvc, queries, al, monitor) teamH := newTeamHandler(teamSvc, al, mailer) usersH := newUsersHandler(queries, userSvc, al) auditH := newAuditHandler(auditSvc) diff --git a/pkg/cpserver/run.go b/pkg/cpserver/run.go index 7678d47..7ef47d1 
100644 --- a/pkg/cpserver/run.go +++ b/pkg/cpserver/run.go @@ -177,8 +177,13 @@ func Run(opts ...Option) { Config: cfg, } + // Host monitor (passive + active reconciliation every 60s). + // Created before API server so the heartbeat handler can trigger immediate + // reconciliation when a host recovers from unreachable. + monitor := api.NewHostMonitor(queries, hostPool, al, 60*time.Second) + // API server. - srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc, mailer, o.extensions, sctx, o.version) + srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc, mailer, o.extensions, sctx, monitor, o.version) // Start template build workers (2 concurrent). stopBuildWorkers := srv.BuildSvc.StartWorkers(ctx, 2) @@ -191,9 +196,7 @@ func Run(opts ...Option) { sandboxEventConsumer := api.NewSandboxEventConsumer(rdb, queries, al) sandboxEventConsumer.Start(ctx) - // Start host monitor (passive + active reconciliation every 60s). - // Reduced from 15s since async events handle the normal case. - monitor := api.NewHostMonitor(queries, hostPool, al, 60*time.Second) + // Start host monitor loop. monitor.Start(ctx) // Hard-delete accounts that have been soft-deleted for more than 15 days (runs every 24h).