1
0
forked from wrenn/wrenn

Add pre-pause proxy connection drain and sandbox proxy caching

Introduce ConnTracker (atomic.Bool + WaitGroup) to track in-flight proxy
connections per sandbox. Before pausing a VM, the manager drains active
connections with a 2s grace period, preventing Go runtime corruption
inside the guest caused by stale TCP state surviving Firecracker
snapshot/restore.

Also add:
- AcquireProxyConn on Manager for atomic lookup + connection tracking
- Proxy cache (120s TTL) on CP SandboxProxyWrapper with single-query
  DB lookup (GetSandboxProxyTarget) to avoid two round-trips
- Reset() on ConnTracker to re-enable connections if pause fails
This commit is contained in:
2026-04-01 15:09:44 +06:00
parent 377e856c8f
commit 2b4c5e0176
7 changed files with 253 additions and 54 deletions

View File

@ -1,6 +1,8 @@
package api
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
@ -9,6 +11,8 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v5/pgtype"
@ -18,10 +22,45 @@ import (
"git.omukk.dev/wrenn/sandbox/internal/lifecycle"
)
// Sentinel errors returned by proxyTarget, used to map to HTTP status codes
// without relying on error message text.
var (
// errProxySandboxNotFound is returned when the GetSandboxProxyTarget query
// returns an error. ServeHTTP answers it with 404 Not Found.
errProxySandboxNotFound = errors.New("sandbox not found")
// errProxyNoHostAddress is returned when the lookup succeeds but the
// HostAddress field is empty. ServeHTTP has no dedicated case for it, so it
// falls through to the default 503 Service Unavailable.
errProxyNoHostAddress = errors.New("host agent has no address")
)
// proxyCacheTTL is how long a resolved agent URL stays usable in the proxy
// cache. proxyTarget stamps each new entry with time.Now().Add(proxyCacheTTL)
// and re-queries the DB once that instant has passed.
const proxyCacheTTL = 120 * time.Second
// sandboxHostPattern matches hostnames like "49999-cl-abcd1234.localhost" or
// "49999-cl-abcd1234.example.com". Captures: port, sandbox ID.
//
// Group 1 is the leading run of decimal digits (the port). Group 2 is the
// sandbox ID: the literal "cl-" followed by one or more characters from
// [0-9a-z]. The match is anchored at the start of the host and must end in a
// literal dot; whatever follows the dot (the domain) is not constrained.
var sandboxHostPattern = regexp.MustCompile(`^(\d+)-(cl-[0-9a-z]+)\.`)
// errProxySandboxNotRunning is the error reported for a sandbox that was found
// but whose status is something other than "running". The status travels with
// the error value, so the HTTP layer can echo it back without parsing the
// message text.
type errProxySandboxNotRunning struct {
	status string
}

// Compile-time proof that the value type (not only the pointer) is an error;
// callers match it by value with errors.As.
var _ error = errProxySandboxNotRunning{}

// Error renders the human-readable message, embedding the recorded status.
func (e errProxySandboxNotRunning) Error() string {
	const format = "sandbox is not running (status: %s)"
	return fmt.Sprintf(format, e.status)
}
// proxyCacheEntry caches the resolved agent URL for a (sandbox, team) pair.
// The *httputil.ReverseProxy is built per-request (cheap) so the Director closure
// can capture the correct port without the cache key needing to include it.
type proxyCacheEntry struct {
// agentURL is the parsed host agent address that proxyTarget hands back to
// its caller on a cache hit.
agentURL *url.URL
// expiresAt is the instant from which the entry is no longer served;
// proxyTarget only uses the entry while time.Now() is before it.
expiresAt time.Time
}
// proxyCacheKey identifies one (sandbox, team) pair in the proxy cache. It is
// a plain fixed-size byte array, so it is comparable and usable as a map key
// without building a string.
type proxyCacheKey [32]byte

// makeProxyCacheKey packs the raw bytes of the two UUIDs back to back:
// the sandbox ID fills the first half of the key, the team ID the second.
func makeProxyCacheKey(sandboxID, teamID pgtype.UUID) proxyCacheKey {
	const split = 16 // boundary between the sandbox half and the team half

	key := proxyCacheKey{}
	sandboxHalf, teamHalf := key[:split], key[split:]
	copy(sandboxHalf, sandboxID.Bytes[:])
	copy(teamHalf, teamID.Bytes[:])
	return key
}
// SandboxProxyWrapper wraps an existing HTTP handler and intercepts requests
// whose Host header matches the {port}-{sandbox_id}.{domain} pattern. Matching
// requests are reverse-proxied through the host agent that owns the sandbox.
@ -34,6 +73,9 @@ type SandboxProxyWrapper struct {
db *db.Queries
pool *lifecycle.HostClientPool
transport http.RoundTripper
cacheMu sync.Mutex
cache map[proxyCacheKey]proxyCacheEntry
}
// NewSandboxProxyWrapper creates a new proxy wrapper.
@ -43,9 +85,63 @@ func NewSandboxProxyWrapper(inner http.Handler, queries *db.Queries, pool *lifec
db: queries,
pool: pool,
transport: pool.Transport(),
cache: make(map[proxyCacheKey]proxyCacheEntry),
}
}
// proxyTarget returns the host agent base URL for the sandbox identified by
// (sandboxID, teamID).
//
// A cached URL is returned while its entry has not yet reached expiresAt.
// Otherwise the sandbox is looked up with a single GetSandboxProxyTarget
// query, the host address is resolved through the client pool, and the result
// is cached for proxyCacheTTL.
//
// Errors:
//   - errProxySandboxNotFound when the DB query returns an error
//   - errProxySandboxNotRunning when the sandbox status is not "running"
//   - errProxyNoHostAddress when the lookup result has an empty HostAddress
//   - a wrapped url.Parse error when the resolved address is not a valid URL
//
// The returned *url.URL is the same pointer stored in the cache and may be
// handed to concurrent requests; callers must treat it as read-only.
//
// The *httputil.ReverseProxy is built by the caller so the Director closure
// captures the correct port without the cache key needing to include it.
func (h *SandboxProxyWrapper) proxyTarget(ctx context.Context, sandboxID, teamID pgtype.UUID) (*url.URL, error) {
	cacheKey := makeProxyCacheKey(sandboxID, teamID)

	h.cacheMu.Lock()
	entry, ok := h.cache[cacheKey]
	if ok && !time.Now().Before(entry.expiresAt) {
		// Drop the expired entry right away. If the refresh below fails
		// (sandbox deleted, paused, host without address) nothing else would
		// ever remove it: the only other eviction path is the 502 handler,
		// which such requests never reach. Without this the map keeps one
		// dead entry per sandbox that was ever proxied and later re-requested.
		delete(h.cache, cacheKey)
		ok = false
	}
	h.cacheMu.Unlock()

	if ok {
		return entry.agentURL, nil
	}

	// Cache miss or expired — query DB. The lock is not held across the query,
	// so concurrent misses for the same key may each hit the DB; the last
	// writer wins below.
	target, err := h.db.GetSandboxProxyTarget(ctx, db.GetSandboxProxyTargetParams{
		ID:     sandboxID,
		TeamID: teamID,
	})
	if err != nil {
		// NOTE(review): every query error is reported as "not found", which
		// presumably includes transient DB failures and context cancellation.
		// Telling those apart needs the driver's no-rows sentinel — confirm
		// which one the db package surfaces before changing this.
		return nil, errProxySandboxNotFound
	}
	if target.Status != "running" {
		return nil, errProxySandboxNotRunning{status: target.Status}
	}
	if target.HostAddress == "" {
		return nil, errProxyNoHostAddress
	}

	agentURL, err := url.Parse(h.pool.ResolveAddr(target.HostAddress))
	if err != nil {
		return nil, fmt.Errorf("invalid host agent address: %w", err)
	}

	h.cacheMu.Lock()
	h.cache[cacheKey] = proxyCacheEntry{
		agentURL:  agentURL,
		expiresAt: time.Now().Add(proxyCacheTTL),
	}
	h.cacheMu.Unlock()

	return agentURL, nil
}
// evictProxyCache forgets the cached agent URL for a (sandbox, team) pair.
// The reverse proxy's ErrorHandler calls it before answering 502, so that a
// sandbox which stopped or moved is resolved afresh on the next request.
// Evicting a pair that is not cached is a no-op.
func (h *SandboxProxyWrapper) evictProxyCache(sandboxID, teamID pgtype.UUID) {
	// Build the key first so the critical section is just the map delete.
	key := makeProxyCacheKey(sandboxID, teamID)

	h.cacheMu.Lock()
	defer h.cacheMu.Unlock()
	delete(h.cache, key)
}
func (h *SandboxProxyWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host := r.Host
// Strip port from Host header (e.g. "49999-cl-abcd1234.localhost:8000" → "49999-cl-abcd1234.localhost")
@ -82,51 +178,26 @@ func (h *SandboxProxyWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request)
return
}
ctx := r.Context()
// Look up sandbox and verify ownership.
sb, err := h.db.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{
ID: sandboxID,
TeamID: teamID,
})
agentURL, err := h.proxyTarget(r.Context(), sandboxID, teamID)
if err != nil {
http.Error(w, "sandbox not found", http.StatusNotFound)
return
}
if sb.Status != "running" {
http.Error(w, fmt.Sprintf("sandbox is not running (status: %s)", sb.Status), http.StatusConflict)
return
}
agentHost, err := h.db.GetHost(ctx, sb.HostID)
if err != nil {
http.Error(w, "host agent not found", http.StatusServiceUnavailable)
return
}
if agentHost.Address == "" {
http.Error(w, "host agent has no address", http.StatusServiceUnavailable)
return
}
agentAddr := h.pool.ResolveAddr(agentHost.Address)
upstreamPath := fmt.Sprintf("/proxy/%s/%s%s", sandboxIDStr, port, r.URL.Path)
target, err := url.Parse(agentAddr)
if err != nil {
http.Error(w, "invalid host agent address", http.StatusInternalServerError)
switch {
case errors.Is(err, errProxySandboxNotFound):
http.Error(w, err.Error(), http.StatusNotFound)
case errors.As(err, new(errProxySandboxNotRunning)):
http.Error(w, err.Error(), http.StatusConflict)
default:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
}
return
}
proxy := &httputil.ReverseProxy{
Transport: h.transport,
Director: func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = upstreamPath
req.URL.RawQuery = r.URL.RawQuery
req.Host = target.Host
req.URL.Scheme = agentURL.Scheme
req.URL.Host = agentURL.Host
req.URL.Path = "/proxy/" + sandboxIDStr + "/" + port + req.URL.Path
req.Host = agentURL.Host
},
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
slog.Debug("sandbox proxy error",
@ -134,10 +205,10 @@ func (h *SandboxProxyWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request)
"port", port,
"error", err,
)
h.evictProxyCache(sandboxID, teamID)
http.Error(w, "proxy error: "+err.Error(), http.StatusBadGateway)
},
}
proxy.ServeHTTP(w, r)
}