forked from wrenn/wrenn
feat: immediate sandbox reconciliation on host reconnect
When a host transitions from unreachable → online via heartbeat, trigger ReconcileHost in a background goroutine so "missing" sandboxes are resolved instantly instead of waiting up to 60s for the next monitor tick.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@ -1,6 +1,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -21,10 +22,11 @@ type hostHandler struct {
|
|||||||
svc *service.HostService
|
svc *service.HostService
|
||||||
queries *db.Queries
|
queries *db.Queries
|
||||||
audit *audit.AuditLogger
|
audit *audit.AuditLogger
|
||||||
|
monitor *HostMonitor
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHostHandler(svc *service.HostService, queries *db.Queries, al *audit.AuditLogger) *hostHandler {
|
func newHostHandler(svc *service.HostService, queries *db.Queries, al *audit.AuditLogger, monitor *HostMonitor) *hostHandler {
|
||||||
return &hostHandler{svc: svc, queries: queries, audit: al}
|
return &hostHandler{svc: svc, queries: queries, audit: al, monitor: monitor}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request/response types.
|
// Request/response types.
|
||||||
@ -426,9 +428,12 @@ func (h *hostHandler) Heartbeat(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log marked_up if the host just recovered from unreachable.
|
// If the host just recovered from unreachable, log it and trigger immediate
|
||||||
|
// reconciliation so "missing" sandboxes are resolved without waiting for the
|
||||||
|
// next monitor tick.
|
||||||
if prevHost.Status == "unreachable" {
|
if prevHost.Status == "unreachable" {
|
||||||
h.audit.LogHostMarkedUp(r.Context(), prevHost.TeamID, hc.HostID)
|
h.audit.LogHostMarkedUp(r.Context(), prevHost.TeamID, hc.HostID)
|
||||||
|
go h.monitor.ReconcileHost(context.Background(), hc.HostID)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
|||||||
@ -77,6 +77,21 @@ func (m *HostMonitor) run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReconcileHost triggers immediate active reconciliation for a single host.
|
||||||
|
// Called when a host transitions from unreachable → online so sandboxes marked
|
||||||
|
// "missing" are resolved without waiting for the next monitor tick.
|
||||||
|
func (m *HostMonitor) ReconcileHost(ctx context.Context, hostID pgtype.UUID) {
|
||||||
|
host, err := m.db.GetHost(ctx, hostID)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("host monitor: reconcile-on-connect: failed to get host", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if host.Status != "online" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.checkHost(ctx, host)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *HostMonitor) checkHost(ctx context.Context, host db.Host) {
|
func (m *HostMonitor) checkHost(ctx context.Context, host db.Host) {
|
||||||
// --- Passive phase: check heartbeat staleness ---
|
// --- Passive phase: check heartbeat staleness ---
|
||||||
|
|
||||||
|
|||||||
@ -50,6 +50,7 @@ func New(
|
|||||||
mailer email.Mailer,
|
mailer email.Mailer,
|
||||||
extensions []cpextension.Extension,
|
extensions []cpextension.Extension,
|
||||||
sctx cpextension.ServerContext,
|
sctx cpextension.ServerContext,
|
||||||
|
monitor *HostMonitor,
|
||||||
version string,
|
version string,
|
||||||
) *Server {
|
) *Server {
|
||||||
r := chi.NewRouter()
|
r := chi.NewRouter()
|
||||||
@ -95,7 +96,7 @@ func New(
|
|||||||
authH := newAuthHandler(queries, pgPool, jwtSecret, mailer, rdb, oauthRedirectURL)
|
authH := newAuthHandler(queries, pgPool, jwtSecret, mailer, rdb, oauthRedirectURL)
|
||||||
oauthH := newOAuthHandler(queries, pgPool, jwtSecret, oauthRegistry, oauthRedirectURL)
|
oauthH := newOAuthHandler(queries, pgPool, jwtSecret, oauthRegistry, oauthRedirectURL)
|
||||||
apiKeys := newAPIKeyHandler(apiKeySvc, al)
|
apiKeys := newAPIKeyHandler(apiKeySvc, al)
|
||||||
hostH := newHostHandler(hostSvc, queries, al)
|
hostH := newHostHandler(hostSvc, queries, al, monitor)
|
||||||
teamH := newTeamHandler(teamSvc, al, mailer)
|
teamH := newTeamHandler(teamSvc, al, mailer)
|
||||||
usersH := newUsersHandler(queries, userSvc, al)
|
usersH := newUsersHandler(queries, userSvc, al)
|
||||||
auditH := newAuditHandler(auditSvc)
|
auditH := newAuditHandler(auditSvc)
|
||||||
|
|||||||
@ -177,8 +177,13 @@ func Run(opts ...Option) {
|
|||||||
Config: cfg,
|
Config: cfg,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Host monitor (passive + active reconciliation every 60s).
|
||||||
|
// Created before API server so the heartbeat handler can trigger immediate
|
||||||
|
// reconciliation when a host recovers from unreachable.
|
||||||
|
monitor := api.NewHostMonitor(queries, hostPool, al, 60*time.Second)
|
||||||
|
|
||||||
// API server.
|
// API server.
|
||||||
srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc, mailer, o.extensions, sctx, o.version)
|
srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc, mailer, o.extensions, sctx, monitor, o.version)
|
||||||
|
|
||||||
// Start template build workers (2 concurrent).
|
// Start template build workers (2 concurrent).
|
||||||
stopBuildWorkers := srv.BuildSvc.StartWorkers(ctx, 2)
|
stopBuildWorkers := srv.BuildSvc.StartWorkers(ctx, 2)
|
||||||
@ -191,9 +196,7 @@ func Run(opts ...Option) {
|
|||||||
sandboxEventConsumer := api.NewSandboxEventConsumer(rdb, queries, al)
|
sandboxEventConsumer := api.NewSandboxEventConsumer(rdb, queries, al)
|
||||||
sandboxEventConsumer.Start(ctx)
|
sandboxEventConsumer.Start(ctx)
|
||||||
|
|
||||||
// Start host monitor (passive + active reconciliation every 60s).
|
// Start host monitor loop.
|
||||||
// Reduced from 15s since async events handle the normal case.
|
|
||||||
monitor := api.NewHostMonitor(queries, hostPool, al, 60*time.Second)
|
|
||||||
monitor.Start(ctx)
|
monitor.Start(ctx)
|
||||||
|
|
||||||
// Hard-delete accounts that have been soft-deleted for more than 15 days (runs every 24h).
|
// Hard-delete accounts that have been soft-deleted for more than 15 days (runs every 24h).
|
||||||
|
|||||||
Reference in New Issue
Block a user