1
0
forked from wrenn/wrenn
Reviewed-on: wrenn/wrenn#38
Co-authored-by: pptx704 <rafeed@omukk.dev>
Co-committed-by: pptx704 <rafeed@omukk.dev>
This commit is contained in:
2026-05-01 09:01:08 +00:00
committed by Rafeed M. Bhuiyan
parent 52ad21c339
commit 4fcc19e91f
23 changed files with 437 additions and 117 deletions

View File

@ -1 +1 @@
0.1.0 0.1.1

View File

@ -1 +1 @@
0.1.3 0.1.4

View File

@ -148,7 +148,13 @@ func main() {
slog.Info("host registered", "host_id", creds.HostID) slog.Info("host registered", "host_id", creds.HostID)
// httpServer is declared here so the shutdown func can reference it. // httpServer is declared here so the shutdown func can reference it.
httpServer := &http.Server{Addr: listenAddr} // ReadTimeout/WriteTimeout are intentionally omitted — they would kill
// long-lived Connect RPC streams and WebSocket proxy connections.
httpServer := &http.Server{
Addr: listenAddr,
ReadHeaderTimeout: 10 * time.Second,
IdleTimeout: 620 * time.Second, // > typical LB upstream timeout (600s)
}
// mTLS is mandatory — refuse to start without a valid certificate. // mTLS is mandatory — refuse to start without a valid certificate.
var certStore hostagent.CertStore var certStore hostagent.CertStore
@ -193,6 +199,7 @@ func main() {
path, handler := hostagentv1connect.NewHostAgentServiceHandler(srv) path, handler := hostagentv1connect.NewHostAgentServiceHandler(srv)
proxyHandler := hostagent.NewProxyHandler(mgr) proxyHandler := hostagent.NewProxyHandler(mgr)
mgr.SetOnDestroy(proxyHandler.EvictProxy)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle(path, handler) mux.Handle(path, handler)

View File

@ -1 +1 @@
0.1.0 0.1.1

View File

@ -446,7 +446,9 @@ func (p *Handler) Wait() {
err := p.cmd.Wait() err := p.cmd.Wait()
if p.tty != nil {
p.tty.Close() p.tty.Close()
}
var errMsg *string var errMsg *string

View File

@ -8,7 +8,6 @@ import (
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
"path"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
@ -74,7 +73,7 @@ func NewSandboxProxyWrapper(inner http.Handler, queries *db.Queries, pool *lifec
inner: inner, inner: inner,
db: queries, db: queries,
pool: pool, pool: pool,
transport: pool.Transport(), transport: pool.NewProxyTransport(),
cache: make(map[pgtype.UUID]proxyCacheEntry), cache: make(map[pgtype.UUID]proxyCacheEntry),
} }
} }
@ -167,14 +166,29 @@ func (h *SandboxProxyWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request)
return return
} }
// The host agent's proxy adds a /proxy/{id}/{port} prefix to Location
// headers for path-based routing. For subdomain routing the browser is at
// {port}-{id}.domain, so we strip the prefix back out.
agentProxyPrefix := "/proxy/" + sandboxIDStr + "/" + port
proxy := &httputil.ReverseProxy{ proxy := &httputil.ReverseProxy{
Transport: h.transport, Transport: h.transport,
Director: func(req *http.Request) { Director: func(req *http.Request) {
req.URL.Scheme = agentURL.Scheme req.URL.Scheme = agentURL.Scheme
req.URL.Host = agentURL.Host req.URL.Host = agentURL.Host
req.URL.Path = path.Join("/proxy", sandboxIDStr, port, path.Clean("/"+req.URL.Path)) // Use string concatenation instead of path.Join to preserve trailing
// slashes. path.Join strips them, causing redirect loops for directory
// listings in apps like python http.server and Jupyter.
req.URL.Path = "/proxy/" + sandboxIDStr + "/" + port + req.URL.Path
req.Host = agentURL.Host req.Host = agentURL.Host
}, },
ModifyResponse: func(resp *http.Response) error {
if loc := resp.Header.Get("Location"); loc != "" {
loc = strings.TrimPrefix(loc, agentProxyPrefix)
resp.Header.Set("Location", loc)
}
return nil
},
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
slog.Debug("sandbox proxy error", slog.Debug("sandbox proxy error",
"sandbox_id", sandboxIDStr, "sandbox_id", sandboxIDStr,

View File

@ -404,10 +404,10 @@ func (h *meHandler) ConnectProvider(w http.ResponseWriter, r *http.Request) {
return return
} }
mac := computeHMAC(h.jwtSecret, state) mac := computeHMAC(h.jwtSecret, state+":"+"login")
http.SetCookie(w, &http.Cookie{ http.SetCookie(w, &http.Cookie{
Name: "oauth_state", Name: "oauth_state",
Value: state + ":" + mac, Value: state + ":" + mac + ":" + "login",
Path: "/", Path: "/",
MaxAge: 600, MaxAge: 600,
HttpOnly: true, HttpOnly: true,

View File

@ -311,10 +311,17 @@ func runPtyLoop(
} }
}() }()
// Input pump: read from WebSocket, dispatch to host agent. // Input pump: decouple WebSocket reads from RPC dispatch.
// Reader goroutine drains the WebSocket into a buffered channel;
// sender goroutine dispatches RPCs at its own pace. This prevents
// slow RPCs from stalling WebSocket reads and causing proxy timeouts.
inputCh := make(chan wsPtyIn, 64)
// Reader: drain WebSocket as fast as possible.
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
defer close(inputCh)
defer cancel() defer cancel()
for { for {
@ -328,6 +335,22 @@ func runPtyLoop(
continue continue
} }
select {
case inputCh <- msg:
default:
// Buffer full — drop frame to keep reader unblocked.
slog.Debug("pty input buffer full, dropping frame", "type", msg.Type)
}
}
}()
// Sender: dispatch RPCs from channel, coalescing consecutive input messages.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
for msg := range inputCh {
// Use a background context for unary RPCs so they complete // Use a background context for unary RPCs so they complete
// even if the stream context is being cancelled. // even if the stream context is being cancelled.
rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second) rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second)
@ -339,6 +362,10 @@ func runPtyLoop(
rpcCancel() rpcCancel()
continue continue
} }
// Coalesce: drain any queued input messages into a single RPC.
data = coalescePtyInput(inputCh, data)
if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{ if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{
SandboxId: sandboxID, SandboxId: sandboxID,
Tag: tag, Tag: tag,
@ -394,6 +421,33 @@ func runPtyLoop(
wg.Wait() wg.Wait()
} }
// coalescePtyInput drains any immediately-available "input" messages from the
// channel and appends their decoded data to buf, reducing RPC call volume
// during bursts of fast typing.
func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
for {
select {
case msg, ok := <-ch:
if !ok {
return buf
}
if msg.Type != "input" {
// Non-input message — can't coalesce. Put-back isn't possible
// with channels, but resize/kill during a typing burst is rare
// enough that dropping one is acceptable.
return buf
}
data, err := base64.StdEncoding.DecodeString(msg.Data)
if err != nil {
continue
}
buf = append(buf, data...)
default:
return buf
}
}
}
// newPtyTag returns a PTY session tag: "pty-" + 8 random hex chars. // newPtyTag returns a PTY session tag: "pty-" + 8 random hex chars.
func newPtyTag() string { func newPtyTag() string {
return "pty-" + id.NewPtyTag() return "pty-" + id.NewPtyTag()

View File

@ -3,8 +3,6 @@ package api
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"strings"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -14,11 +12,6 @@ import (
"git.omukk.dev/wrenn/wrenn/pkg/id" "git.omukk.dev/wrenn/wrenn/pkg/id"
) )
// isWebSocketUpgrade returns true if the request is a WebSocket upgrade.
func isWebSocketUpgrade(r *http.Request) bool {
return strings.EqualFold(r.Header.Get("Upgrade"), "websocket")
}
// ctxKeyAdminWS is a context key for flagging admin WS routes. // ctxKeyAdminWS is a context key for flagging admin WS routes.
type ctxKeyAdminWS struct{} type ctxKeyAdminWS struct{}

View File

@ -15,7 +15,6 @@ func injectPlatformTeam() func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if _, ok := auth.FromContext(r.Context()); !ok { if _, ok := auth.FromContext(r.Context()); !ok {
// No auth context yet (WS upgrade); handler will inject platform team after WS auth.
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
return return
} }
@ -27,23 +26,24 @@ func injectPlatformTeam() func(http.Handler) http.Handler {
} }
} }
// markAdminWS flags the request context as an admin WebSocket route.
// Applied to admin WS endpoints that sit outside the requireJWT/requireAdmin
// middleware group. Handlers use isAdminWSRoute(ctx) to pick wsAuthenticateAdmin.
func markAdminWS(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r.WithContext(setAdminWSFlag(r.Context())))
})
}
// requireAdmin validates that the authenticated user is a platform admin. // requireAdmin validates that the authenticated user is a platform admin.
// Must run after requireJWT (depends on AuthContext being present). // Must run after requireJWT (depends on AuthContext being present).
// Re-validates against the DB — the JWT is_admin claim is for UI only; // Re-validates against the DB — the JWT is_admin claim is for UI only;
// the DB is the source of truth for admin access. // the DB is the source of truth for admin access.
// WebSocket upgrade requests without auth context are passed through —
// admin WS handlers verify admin status after upgrade via wsAuthenticateAdmin.
func requireAdmin(queries *db.Queries) func(http.Handler) http.Handler { func requireAdmin(queries *db.Queries) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ac, ok := auth.FromContext(r.Context()) ac, ok := auth.FromContext(r.Context())
if !ok { if !ok {
if isWebSocketUpgrade(r) {
ctx := r.Context()
ctx = setAdminWSFlag(ctx)
next.ServeHTTP(w, r.WithContext(ctx))
return
}
writeError(w, http.StatusUnauthorized, "unauthorized", "authentication required") writeError(w, http.StatusUnauthorized, "unauthorized", "authentication required")
return return
} }

View File

@ -85,15 +85,61 @@ func requireAPIKeyOrJWT(queries *db.Queries, jwtSecret []byte) func(http.Handler
return return
} }
// WebSocket upgrade requests may not carry auth headers (browsers
// cannot set custom headers on WS connections). Pass through —
// the WS handler authenticates via the first message after upgrade.
if isWebSocketUpgrade(r) {
next.ServeHTTP(w, r)
return
}
writeError(w, http.StatusUnauthorized, "unauthorized", "X-API-Key or Authorization: Bearer <token> required") writeError(w, http.StatusUnauthorized, "unauthorized", "X-API-Key or Authorization: Bearer <token> required")
}) })
} }
} }
// optionalAPIKeyOrJWT is like requireAPIKeyOrJWT but does not reject
// unauthenticated requests. It injects auth context when valid credentials
// are present (supporting SDK clients that set X-API-Key on WebSocket
// upgrades) and passes through otherwise so the handler can authenticate
// after the WebSocket upgrade via the first message.
func optionalAPIKeyOrJWT(queries *db.Queries, jwtSecret []byte) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Try API key.
if key := r.Header.Get("X-API-Key"); key != "" {
hash := auth.HashAPIKey(key)
row, err := queries.GetAPIKeyByHash(r.Context(), hash)
if err == nil {
if err := queries.UpdateAPIKeyLastUsed(r.Context(), row.ID); err != nil {
slog.Warn("failed to update api key last_used", "key_id", id.FormatAPIKeyID(row.ID), "error", err)
}
ctx := auth.WithAuthContext(r.Context(), auth.AuthContext{
TeamID: row.TeamID,
APIKeyID: row.ID,
APIKeyName: row.Name,
})
next.ServeHTTP(w, r.WithContext(ctx))
return
}
}
// Try JWT bearer token.
if header := r.Header.Get("Authorization"); strings.HasPrefix(header, "Bearer ") {
tokenStr := strings.TrimPrefix(header, "Bearer ")
if claims, err := auth.VerifyJWT(jwtSecret, tokenStr); err == nil {
if teamID, err := id.ParseTeamID(claims.TeamID); err == nil {
if userID, err := id.ParseUserID(claims.Subject); err == nil {
if user, err := queries.GetUserByID(r.Context(), userID); err == nil && user.Status == "active" {
ctx := auth.WithAuthContext(r.Context(), auth.AuthContext{
TeamID: teamID,
UserID: userID,
Email: claims.Email,
Name: claims.Name,
Role: claims.Role,
})
next.ServeHTTP(w, r.WithContext(ctx))
return
}
}
}
}
}
// No valid credentials — pass through for handler to authenticate.
next.ServeHTTP(w, r)
})
}
}

View File

@ -22,13 +22,6 @@ func requireJWT(secret []byte, queries *db.Queries) func(http.Handler) http.Hand
tokenStr = strings.TrimPrefix(header, "Bearer ") tokenStr = strings.TrimPrefix(header, "Bearer ")
} }
if tokenStr == "" { if tokenStr == "" {
// WebSocket upgrade requests may not have an Authorization header
// (browsers cannot set custom headers on WS connections). Let them
// through — the handler authenticates via the first WS message.
if isWebSocketUpgrade(r) {
next.ServeHTTP(w, r)
return
}
writeError(w, http.StatusUnauthorized, "unauthorized", "Authorization: Bearer <token> required") writeError(w, http.StatusUnauthorized, "unauthorized", "Authorization: Bearer <token> required")
return return
} }

View File

@ -2,7 +2,7 @@ openapi: "3.1.0"
info: info:
title: Wrenn API title: Wrenn API
description: MicroVM-based code execution platform API. description: MicroVM-based code execution platform API.
version: "0.1.3" version: "0.1.4"
servers: servers:
- url: http://localhost:8080 - url: http://localhost:8080

View File

@ -161,20 +161,23 @@ func New(
r.With(requireJWT(jwtSecret, queries)).Get("/v1/users/search", usersH.Search) r.With(requireJWT(jwtSecret, queries)).Get("/v1/users/search", usersH.Search)
// Capsule lifecycle: accepts API key or JWT bearer token. // Capsule lifecycle: accepts API key or JWT bearer token.
// WebSocket upgrade requests without auth headers are passed through by
// requireAPIKeyOrJWT — the WS handlers authenticate via first message.
r.Route("/v1/capsules", func(r chi.Router) { r.Route("/v1/capsules", func(r chi.Router) {
// Auth-required routes.
r.Group(func(r chi.Router) {
r.Use(requireAPIKeyOrJWT(queries, jwtSecret)) r.Use(requireAPIKeyOrJWT(queries, jwtSecret))
r.Post("/", sandbox.Create) r.Post("/", sandbox.Create)
r.Get("/", sandbox.List) r.Get("/", sandbox.List)
r.Get("/stats", statsH.GetStats) r.Get("/stats", statsH.GetStats)
r.Get("/usage", usageH.GetUsage) r.Get("/usage", usageH.GetUsage)
})
r.Route("/{id}", func(r chi.Router) { r.Route("/{id}", func(r chi.Router) {
// Auth-required non-WS routes.
r.Group(func(r chi.Router) {
r.Use(requireAPIKeyOrJWT(queries, jwtSecret))
r.Get("/", sandbox.Get) r.Get("/", sandbox.Get)
r.Delete("/", sandbox.Destroy) r.Delete("/", sandbox.Destroy)
r.Post("/exec", exec.Exec) r.Post("/exec", exec.Exec)
r.Get("/exec/stream", execStream.ExecStream)
r.Post("/ping", sandbox.Ping) r.Post("/ping", sandbox.Ping)
r.Post("/pause", sandbox.Pause) r.Post("/pause", sandbox.Pause)
r.Post("/resume", sandbox.Resume) r.Post("/resume", sandbox.Resume)
@ -186,12 +189,21 @@ func New(
r.Post("/files/mkdir", fsH.MakeDir) r.Post("/files/mkdir", fsH.MakeDir)
r.Post("/files/remove", fsH.Remove) r.Post("/files/remove", fsH.Remove)
r.Get("/metrics", metricsH.GetMetrics) r.Get("/metrics", metricsH.GetMetrics)
r.Get("/pty", ptyH.PtySession)
r.Get("/processes", processH.ListProcesses) r.Get("/processes", processH.ListProcesses)
r.Delete("/processes/{selector}", processH.KillProcess) r.Delete("/processes/{selector}", processH.KillProcess)
})
// WebSocket endpoints — handlers authenticate after upgrade.
// optionalAPIKeyOrJWT injects auth context from headers when
// present (SDK clients) but does not reject when absent (browsers).
r.Group(func(r chi.Router) {
r.Use(optionalAPIKeyOrJWT(queries, jwtSecret))
r.Get("/exec/stream", execStream.ExecStream)
r.Get("/pty", ptyH.PtySession)
r.Get("/processes/{selector}/stream", processH.ConnectProcess) r.Get("/processes/{selector}/stream", processH.ConnectProcess)
}) })
}) })
})
// Snapshot / template management: accepts API key or JWT bearer token. // Snapshot / template management: accepts API key or JWT bearer token.
r.Route("/v1/snapshots", func(r chi.Router) { r.Route("/v1/snapshots", func(r chi.Router) {
@ -248,6 +260,8 @@ func New(
// Platform admin routes — require JWT + DB-validated admin status. // Platform admin routes — require JWT + DB-validated admin status.
r.Route("/v1/admin", func(r chi.Router) { r.Route("/v1/admin", func(r chi.Router) {
// Auth-required admin routes (non-capsule + capsule list/create).
r.Group(func(r chi.Router) {
r.Use(requireJWT(jwtSecret, queries)) r.Use(requireJWT(jwtSecret, queries))
r.Use(requireAdmin(queries)) r.Use(requireAdmin(queries))
r.Get("/teams", teamH.AdminListTeams) r.Get("/teams", teamH.AdminListTeams)
@ -264,25 +278,39 @@ func New(
r.Post("/builds/{id}/cancel", buildH.Cancel) r.Post("/builds/{id}/cancel", buildH.Cancel)
r.Post("/capsules", adminCapsules.Create) r.Post("/capsules", adminCapsules.Create)
r.Get("/capsules", adminCapsules.List) r.Get("/capsules", adminCapsules.List)
})
r.Route("/capsules/{id}", func(r chi.Router) { r.Route("/capsules/{id}", func(r chi.Router) {
// Auth-required non-WS admin capsule routes.
r.Group(func(r chi.Router) {
r.Use(requireJWT(jwtSecret, queries))
r.Use(requireAdmin(queries))
r.Use(injectPlatformTeam()) r.Use(injectPlatformTeam())
r.Get("/", adminCapsules.Get) r.Get("/", adminCapsules.Get)
r.Delete("/", adminCapsules.Destroy) r.Delete("/", adminCapsules.Destroy)
r.Post("/snapshot", adminCapsules.Snapshot) r.Post("/snapshot", adminCapsules.Snapshot)
r.Post("/exec", exec.Exec) r.Post("/exec", exec.Exec)
r.Get("/exec/stream", execStream.ExecStream)
r.Post("/files/write", files.Upload) r.Post("/files/write", files.Upload)
r.Post("/files/read", files.Download) r.Post("/files/read", files.Download)
r.Post("/files/list", fsH.ListDir) r.Post("/files/list", fsH.ListDir)
r.Post("/files/mkdir", fsH.MakeDir) r.Post("/files/mkdir", fsH.MakeDir)
r.Post("/files/remove", fsH.Remove) r.Post("/files/remove", fsH.Remove)
r.Get("/metrics", metricsH.GetMetrics) r.Get("/metrics", metricsH.GetMetrics)
r.Get("/pty", ptyH.PtySession)
r.Get("/processes", processH.ListProcesses) r.Get("/processes", processH.ListProcesses)
r.Delete("/processes/{selector}", processH.KillProcess) r.Delete("/processes/{selector}", processH.KillProcess)
})
// Admin WebSocket endpoints — handlers authenticate after upgrade
// via wsAuthenticateAdmin. markAdminWS sets the context flag so
// handlers know to use admin auth instead of regular auth.
r.Group(func(r chi.Router) {
r.Use(markAdminWS)
r.Get("/exec/stream", execStream.ExecStream)
r.Get("/pty", ptyH.PtySession)
r.Get("/processes/{selector}/stream", processH.ConnectProcess) r.Get("/processes/{selector}/stream", processH.ConnectProcess)
}) })
}) })
})
// Let extensions register their routes after all core routes. // Let extensions register their routes after all core routes.
for _, ext := range extensions { for _, ext := range extensions {

View File

@ -48,6 +48,13 @@ func (c *Client) BaseURL() string {
return c.base return c.base
} }
// HTTPClient returns the underlying http.Client used for envd requests.
// Use this instead of http.DefaultClient when making direct HTTP calls to envd
// (e.g. file streaming) to avoid sharing the global transport with proxy traffic.
func (c *Client) HTTPClient() *http.Client {
return c.httpClient
}
// ExecResult holds the output of a command execution. // ExecResult holds the output of a command execution.
type ExecResult struct { type ExecResult struct {
Stdout []byte Stdout []byte
@ -142,7 +149,7 @@ func (c *Client) ExecStream(ctx context.Context, cmd string, args ...string) (<-
return nil, fmt.Errorf("start process: %w", err) return nil, fmt.Errorf("start process: %w", err)
} }
ch := make(chan ExecStreamEvent, 16) ch := make(chan ExecStreamEvent, 256)
go func() { go func() {
defer close(ch) defer close(ch)
defer stream.Close() defer stream.Close()

View File

@ -2,7 +2,9 @@ package envdclient
import ( import (
"fmt" "fmt"
"net"
"net/http" "net/http"
"time"
) )
// envdPort is the default port envd listens on inside the guest. // envdPort is the default port envd listens on inside the guest.
@ -13,9 +15,19 @@ func baseURL(hostIP string) string {
return fmt.Sprintf("http://%s:%d", hostIP, envdPort) return fmt.Sprintf("http://%s:%d", hostIP, envdPort)
} }
// newHTTPClient returns an http.Client suitable for talking to envd. // newHTTPClient returns an http.Client with a dedicated transport for talking
// No special transport is needed — envd is reachable via the host IP // to envd. The transport is intentionally separate from http.DefaultTransport
// through the veth/TAP network path. // so that proxy traffic to user services inside the sandbox cannot interfere
// with envd RPC connections (PTY streams, exec, file ops).
func newHTTPClient() *http.Client { func newHTTPClient() *http.Client {
return &http.Client{} return &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
},
}
} }

View File

@ -162,7 +162,7 @@ type eventProvider interface {
// drainPtyStream reads events from either a Start or Connect stream and maps // drainPtyStream reads events from either a Start or Connect stream and maps
// them into PtyEvent values on a channel. // them into PtyEvent values on a channel.
func drainPtyStream(ctx context.Context, stream eventProvider, expectStart bool) <-chan PtyEvent { func drainPtyStream(ctx context.Context, stream eventProvider, expectStart bool) <-chan PtyEvent {
ch := make(chan PtyEvent, 16) ch := make(chan PtyEvent, 256)
go func() { go func() {
defer close(ch) defer close(ch)
defer stream.Close() defer stream.Close()

View File

@ -1,16 +1,28 @@
package hostagent package hostagent
import ( import (
"context"
"fmt" "fmt"
"log/slog" "log/slog"
"net"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"time"
"git.omukk.dev/wrenn/wrenn/internal/sandbox" "git.omukk.dev/wrenn/wrenn/internal/sandbox"
) )
const (
// proxyDialAttempts is the number of connection attempts for the proxy
// transport. Retries handle the delay between a process binding to a port
// inside the guest and socat/Go-proxy starting to forward on the TAP IP.
proxyDialAttempts = 3
)
// ProxyHandler reverse-proxies HTTP requests to services running inside // ProxyHandler reverse-proxies HTTP requests to services running inside
// sandboxes. It handles requests of the form: // sandboxes. It handles requests of the form:
// //
@ -21,16 +33,75 @@ import (
type ProxyHandler struct { type ProxyHandler struct {
mgr *sandbox.Manager mgr *sandbox.Manager
transport http.RoundTripper transport http.RoundTripper
// proxies caches ReverseProxy instances per sandbox+port to avoid
// per-request allocation under high-frequency REST polling.
proxies sync.Map // key: "sandboxID/port" → *httputil.ReverseProxy
}
// newProxyTransport returns an HTTP transport dedicated to proxying user
// traffic into sandboxes. It is intentionally separate from the envdclient
// transport and http.DefaultTransport to prevent proxy traffic from
// interfering with Connect RPC streams (PTY, exec).
func newProxyTransport() http.RoundTripper {
dialer := &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 20 * time.Second,
}
return &http.Transport{
ForceAttemptHTTP2: false, // HTTP/1.1 only — avoids HTTP/2 HOL blocking
MaxIdleConnsPerHost: 20,
MaxIdleConns: 100,
IdleConnTimeout: 120 * time.Second,
DisableCompression: true,
// Retry with linear backoff to handle the delay between a process
// binding inside the guest and the port forwarder making it reachable.
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
var conn net.Conn
var err error
for attempt := range proxyDialAttempts {
conn, err = dialer.DialContext(ctx, network, addr)
if err == nil {
return conn, nil
}
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Don't sleep on the last attempt.
if attempt < proxyDialAttempts-1 {
backoff := time.Duration(100*(attempt+1)) * time.Millisecond
select {
case <-time.After(backoff):
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
return nil, err
},
}
} }
// NewProxyHandler creates a new sandbox proxy handler. // NewProxyHandler creates a new sandbox proxy handler.
func NewProxyHandler(mgr *sandbox.Manager) *ProxyHandler { func NewProxyHandler(mgr *sandbox.Manager) *ProxyHandler {
return &ProxyHandler{ return &ProxyHandler{
mgr: mgr, mgr: mgr,
transport: http.DefaultTransport, transport: newProxyTransport(),
} }
} }
// EvictProxy removes cached reverse proxy instances for a sandbox.
// Call this when a sandbox is destroyed.
func (h *ProxyHandler) EvictProxy(sandboxID string) {
h.proxies.Range(func(key, _ any) bool {
if k, ok := key.(string); ok && strings.HasPrefix(k, sandboxID+"/") {
h.proxies.Delete(key)
}
return true
})
}
// ServeHTTP implements http.Handler. // ServeHTTP implements http.Handler.
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Expected path: /proxy/{sandbox_id}/{port}/... // Expected path: /proxy/{sandbox_id}/{port}/...
@ -49,10 +120,6 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sandboxID := parts[0] sandboxID := parts[0]
port := parts[1] port := parts[1]
remainder := ""
if len(parts) == 3 {
remainder = parts[2]
}
// Validate port is a number in the valid range. // Validate port is a number in the valid range.
portNum, err := strconv.Atoi(port) portNum, err := strconv.Atoi(port)
@ -68,22 +135,61 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
defer tracker.Release() defer tracker.Release()
targetHost := fmt.Sprintf("%s:%d", hostIP, portNum) proxy := h.getOrCreateProxy(sandboxID, port, fmt.Sprintf("%s:%d", hostIP, portNum))
proxy.ServeHTTP(w, r)
}
// getOrCreateProxy returns a cached ReverseProxy for the given sandbox+port+host,
// creating one if it doesn't exist. The targetHost is included in the key so
// that an IP change after pause/resume naturally misses the old entry.
func (h *ProxyHandler) getOrCreateProxy(sandboxID, port, targetHost string) *httputil.ReverseProxy {
cacheKey := sandboxID + "/" + port + "/" + targetHost
if v, ok := h.proxies.Load(cacheKey); ok {
return v.(*httputil.ReverseProxy)
}
proxyPrefix := "/proxy/" + sandboxID + "/" + port
proxy := &httputil.ReverseProxy{ proxy := &httputil.ReverseProxy{
Transport: h.transport, Transport: h.transport,
Director: func(req *http.Request) { Director: func(req *http.Request) {
// Extract remainder from the original path: /proxy/{id}/{port}/{remainder}
remainder := ""
if trimmed := strings.TrimPrefix(req.URL.Path, proxyPrefix); trimmed != req.URL.Path {
remainder = strings.TrimPrefix(trimmed, "/")
}
req.URL.Scheme = "http" req.URL.Scheme = "http"
req.URL.Host = targetHost req.URL.Host = targetHost
req.URL.Path = "/" + remainder req.URL.Path = "/" + remainder
req.URL.RawQuery = r.URL.RawQuery
req.Host = targetHost req.Host = targetHost
}, },
// Rewrite redirect Location headers so they include the /proxy/{id}/{port}
// prefix. Handles both root-relative (/path) and absolute-URL redirects
// (http://internal-ip:port/path) that would otherwise leak internal IPs
// or break directory navigation.
ModifyResponse: func(resp *http.Response) error {
loc := resp.Header.Get("Location")
if loc == "" {
return nil
}
if strings.HasPrefix(loc, "/") {
resp.Header.Set("Location", proxyPrefix+loc)
return nil
}
// Rewrite absolute URLs pointing to the internal target host.
if u, err := url.Parse(loc); err == nil && u.Host == targetHost {
resp.Header.Set("Location", proxyPrefix+u.RequestURI())
}
return nil
},
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
slog.Debug("proxy error", "sandbox_id", sandboxID, "port", port, "error", err) slog.Debug("proxy error", "sandbox_id", sandboxID, "port", port, "error", err)
http.Error(w, "proxy error: "+err.Error(), http.StatusBadGateway) http.Error(w, "proxy error: "+err.Error(), http.StatusBadGateway)
}, },
} }
proxy.ServeHTTP(w, r) actual, _ := h.proxies.LoadOrStore(cacheKey, proxy)
return actual.(*httputil.ReverseProxy)
} }

View File

@ -459,7 +459,7 @@ func (s *Server) WriteFileStream(
} }
httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType()) httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType())
resp, err := http.DefaultClient.Do(httpReq) resp, err := client.HTTPClient().Do(httpReq)
if err != nil { if err != nil {
pw.CloseWithError(err) pw.CloseWithError(err)
<-errCh <-errCh
@ -504,7 +504,7 @@ func (s *Server) ReadFileStream(
return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err)) return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err))
} }
resp, err := http.DefaultClient.Do(httpReq) resp, err := client.HTTPClient().Do(httpReq)
if err != nil { if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err)) return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err))
} }

View File

@ -269,6 +269,7 @@ func CreateNetwork(slot *Slot) error {
// Create TAP device inside namespace. // Create TAP device inside namespace.
tapAttrs := netlink.NewLinkAttrs() tapAttrs := netlink.NewLinkAttrs()
tapAttrs.Name = tapName tapAttrs.Name = tapName
tapAttrs.TxQLen = 5000 // Up from default 1000 to reduce drops under bursty traffic.
tap := &netlink.Tuntap{ tap := &netlink.Tuntap{
LinkAttrs: tapAttrs, LinkAttrs: tapAttrs,
Mode: netlink.TUNTAP_MODE_TAP, Mode: netlink.TUNTAP_MODE_TAP,

View File

@ -53,6 +53,15 @@ type Manager struct {
autoPausedMu sync.Mutex autoPausedMu sync.Mutex
autoPausedIDs []string autoPausedIDs []string
// onDestroy is called with the sandbox ID after cleanup completes.
// Used by ProxyHandler to evict cached reverse proxies.
onDestroy func(sandboxID string)
}
// SetOnDestroy registers a callback invoked after each sandbox is cleaned up.
func (m *Manager) SetOnDestroy(fn func(sandboxID string)) {
m.onDestroy = fn
} }
// sandboxState holds the runtime state for a single sandbox. // sandboxState holds the runtime state for a single sandbox.
@ -314,6 +323,10 @@ func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
slog.Warn("snapshot cleanup error", "id", sandboxID, "error", err) slog.Warn("snapshot cleanup error", "id", sandboxID, "error", err)
} }
if m.onDestroy != nil {
m.onDestroy(sandboxID)
}
slog.Info("sandbox destroyed", "id", sandboxID) slog.Info("sandbox destroyed", "id", sandboxID)
return nil return nil
} }
@ -363,6 +376,11 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status) return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
} }
// Stop the metrics sampler goroutine before tearing down any resources
// it reads (dm device, Firecracker PID). Without this, the sampler
// leaks on every successful pause.
m.stopSampler(sb)
// Step 0: Drain in-flight proxy connections before freezing vCPUs. // Step 0: Drain in-flight proxy connections before freezing vCPUs.
// This prevents Go runtime corruption inside the guest caused by stale // This prevents Go runtime corruption inside the guest caused by stale
// TCP state from connections that were alive when the VM was snapshotted. // TCP state from connections that were alive when the VM was snapshotted.

View File

@ -84,11 +84,21 @@ func (c *fcClient) setRootfsDrive(ctx context.Context, driveID, path string, rea
} }
// setNetworkInterface configures a network interface attached to a TAP device. // setNetworkInterface configures a network interface attached to a TAP device.
// A tx_rate_limiter caps sustained guest→host throughput to prevent user
// application traffic from completely saturating the TAP device and starving
// envd control traffic (PTY, exec, file ops).
func (c *fcClient) setNetworkInterface(ctx context.Context, ifaceID, tapName, macAddr string) error { func (c *fcClient) setNetworkInterface(ctx context.Context, ifaceID, tapName, macAddr string) error {
return c.do(ctx, http.MethodPut, "/network-interfaces/"+ifaceID, map[string]any{ return c.do(ctx, http.MethodPut, "/network-interfaces/"+ifaceID, map[string]any{
"iface_id": ifaceID, "iface_id": ifaceID,
"host_dev_name": tapName, "host_dev_name": tapName,
"guest_mac": macAddr, "guest_mac": macAddr,
"tx_rate_limiter": map[string]any{
"bandwidth": map[string]any{
"size": 209715200, // 200 MB/s sustained
"refill_time": 1000, // refill period: 1 second
"one_time_burst": 104857600, // 100 MB initial burst
},
},
}) })
} }

View File

@ -3,6 +3,7 @@ package lifecycle
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
@ -115,6 +116,34 @@ func (p *HostClientPool) ResolveAddr(addr string) string {
return p.ensureScheme(addr) return p.ensureScheme(addr)
} }
// NewProxyTransport returns a new http.RoundTripper configured for proxying
// user traffic to sandbox services. It is intentionally separate from the RPC
// transport returned by Transport() so that heavy proxy traffic (Jupyter
// WebSocket, REST API polling) cannot interfere with Connect RPC streams (PTY,
// exec) via HTTP/2 flow control or connection pool contention.
func (p *HostClientPool) NewProxyTransport() http.RoundTripper {
t := &http.Transport{
ForceAttemptHTTP2: false, // HTTP/1.1 only — avoids HTTP/2 HOL blocking
MaxIdleConnsPerHost: 20,
MaxIdleConns: 100,
IdleConnTimeout: 120 * time.Second,
DisableCompression: true,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 20 * time.Second,
}).DialContext,
}
// If the pool uses TLS, the proxy transport must too.
if p.httpClient.Transport != nil {
if ht, ok := p.httpClient.Transport.(*http.Transport); ok && ht.TLSClientConfig != nil {
t.TLSClientConfig = ht.TLSClientConfig.Clone()
}
}
return t
}
// EnsureScheme adds "http://" if the address has no scheme. // EnsureScheme adds "http://" if the address has no scheme.
// Deprecated: use pool.ResolveAddr which respects the pool's TLS setting. // Deprecated: use pool.ResolveAddr which respects the pool's TLS setting.
func EnsureScheme(addr string) string { func EnsureScheme(addr string) string {