From bd986101538e44dbd57889f0e3b0fde532dcf3a7 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Sat, 25 Apr 2026 04:21:55 +0600 Subject: [PATCH] fix: sandbox network responsiveness under port-binding apps Running port-binding applications (Jupyter, http.server, NextJS) inside sandboxes caused severe PTY sluggishness and proxy navigation errors. Root cause: the CP sandbox proxy and Connect RPC pool shared a single HTTP transport. Heavy proxy traffic (Jupyter WebSocket, REST polling) interfered with PTY RPC streams via HTTP/2 flow control contention. Transport isolation (main fix): - Add dedicated proxy transport on CP (NewProxyTransport) with HTTP/2 disabled, separate from the RPC pool transport - Add dedicated proxy transport on host agent, replacing http.DefaultTransport - Add dedicated envdclient transport with tuned connection pooling - Replace http.DefaultClient in file streaming RPCs with per-sandbox envd client Proxy path rewriting (navigation fix): - Add ModifyResponse to rewrite Location headers with /proxy/{id}/{port} prefix, handling both root-relative and absolute-URL redirects - Strip prefix back out in CP subdomain proxy for correct browser behavior - Replace path.Join with string concat in CP Director to preserve trailing slashes (prevents redirect loops on directory listings) Proxy resilience: - Add dial retry with linear backoff (3 attempts) to handle socat startup delay when ports are first detected - Cache ReverseProxy instances per sandbox+port+host in sync.Map - Add EvictProxy callback wired into sandbox Manager.Destroy Buffer and server hardening: - Increase PTY and exec stream channel buffers from 16 to 256 - Add ReadHeaderTimeout (10s) and IdleTimeout (620s) to host agent HTTP server Network tuning: - Set TAP device TxQueueLen to 5000 (up from default 1000) - Add Firecracker tx_rate_limiter (200 MB/s sustained, 100 MB burst) to prevent guest traffic from saturating the TAP --- cmd/host-agent/main.go | 9 +- internal/api/handler_sandbox_proxy.go | 20 ++++- internal/envdclient/client.go | 9 +- internal/envdclient/dialer.go | 20 ++++- internal/envdclient/pty.go | 2 +- internal/hostagent/proxy.go | 122 ++++++++++++++++++++++++-- internal/hostagent/server.go | 4 +- internal/network/setup.go | 1 + internal/sandbox/manager.go | 13 +++ internal/vm/fc.go | 10 +++ pkg/lifecycle/hostpool.go | 29 ++++++ 11 files changed, 219 insertions(+), 20 deletions(-) diff --git a/cmd/host-agent/main.go b/cmd/host-agent/main.go index 5896c2c..89d65da 100644 --- a/cmd/host-agent/main.go +++ b/cmd/host-agent/main.go @@ -148,7 +148,13 @@ func main() { slog.Info("host registered", "host_id", creds.HostID) // httpServer is declared here so the shutdown func can reference it. - httpServer := &http.Server{Addr: listenAddr} + // ReadTimeout/WriteTimeout are intentionally omitted — they would kill + // long-lived Connect RPC streams and WebSocket proxy connections. + httpServer := &http.Server{ + Addr: listenAddr, + ReadHeaderTimeout: 10 * time.Second, + IdleTimeout: 620 * time.Second, // > typical LB upstream timeout (600s) + } // mTLS is mandatory — refuse to start without a valid certificate. var certStore hostagent.CertStore @@ -193,6 +199,7 @@ func main() { path, handler := hostagentv1connect.NewHostAgentServiceHandler(srv) proxyHandler := hostagent.NewProxyHandler(mgr) + mgr.SetOnDestroy(proxyHandler.EvictProxy) mux := http.NewServeMux() mux.Handle(path, handler) diff --git a/internal/api/handler_sandbox_proxy.go b/internal/api/handler_sandbox_proxy.go index 5e3754d..523513c 100644 --- a/internal/api/handler_sandbox_proxy.go +++ b/internal/api/handler_sandbox_proxy.go @@ -8,7 +8,6 @@ import ( "net/http" "net/http/httputil" "net/url" - "path" "regexp" "strconv" "strings" @@ -74,7 +73,7 @@ func NewSandboxProxyWrapper(inner http.Handler, queries *db.Queries, pool *lifec inner: inner, db: queries, pool: pool, - transport: pool.Transport(), + transport: pool.NewProxyTransport(), cache: make(map[pgtype.UUID]proxyCacheEntry), } } @@ -167,14 +166,29 @@ func (h *SandboxProxyWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) return } + // The host agent's proxy adds a /proxy/{id}/{port} prefix to Location + // headers for path-based routing. For subdomain routing the browser is at + // {port}-{id}.domain, so we strip the prefix back out. + agentProxyPrefix := "/proxy/" + sandboxIDStr + "/" + port + proxy := &httputil.ReverseProxy{ Transport: h.transport, Director: func(req *http.Request) { req.URL.Scheme = agentURL.Scheme req.URL.Host = agentURL.Host - req.URL.Path = path.Join("/proxy", sandboxIDStr, port, path.Clean("/"+req.URL.Path)) + // Use string concatenation instead of path.Join to preserve trailing + // slashes. path.Join strips them, causing redirect loops for directory + // listings in apps like python http.server and Jupyter. + req.URL.Path = "/proxy/" + sandboxIDStr + "/" + port + req.URL.Path req.Host = agentURL.Host }, + ModifyResponse: func(resp *http.Response) error { + if loc := resp.Header.Get("Location"); loc != "" { + loc = strings.TrimPrefix(loc, agentProxyPrefix) + resp.Header.Set("Location", loc) + } + return nil + }, ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { slog.Debug("sandbox proxy error", "sandbox_id", sandboxIDStr, diff --git a/internal/envdclient/client.go b/internal/envdclient/client.go index 03994b2..294a37e 100644 --- a/internal/envdclient/client.go +++ b/internal/envdclient/client.go @@ -48,6 +48,13 @@ func (c *Client) BaseURL() string { return c.base } +// HTTPClient returns the underlying http.Client used for envd requests. +// Use this instead of http.DefaultClient when making direct HTTP calls to envd +// (e.g. file streaming) to avoid sharing the global transport with proxy traffic. +func (c *Client) HTTPClient() *http.Client { + return c.httpClient +} + // ExecResult holds the output of a command execution. type ExecResult struct { Stdout []byte @@ -142,7 +149,7 @@ func (c *Client) ExecStream(ctx context.Context, cmd string, args ...string) (<- return nil, fmt.Errorf("start process: %w", err) } - ch := make(chan ExecStreamEvent, 16) + ch := make(chan ExecStreamEvent, 256) go func() { defer close(ch) defer stream.Close() diff --git a/internal/envdclient/dialer.go b/internal/envdclient/dialer.go index ea6492d..1813ceb 100644 --- a/internal/envdclient/dialer.go +++ b/internal/envdclient/dialer.go @@ -2,7 +2,9 @@ package envdclient import ( "fmt" + "net" "net/http" + "time" ) // envdPort is the default port envd listens on inside the guest. @@ -13,9 +15,19 @@ func baseURL(hostIP string) string { return fmt.Sprintf("http://%s:%d", hostIP, envdPort) } -// newHTTPClient returns an http.Client suitable for talking to envd. -// No special transport is needed — envd is reachable via the host IP -// through the veth/TAP network path. +// newHTTPClient returns an http.Client with a dedicated transport for talking +// to envd. The transport is intentionally separate from http.DefaultTransport +// so that proxy traffic to user services inside the sandbox cannot interfere +// with envd RPC connections (PTY streams, exec, file ops). func newHTTPClient() *http.Client { - return &http.Client{} + return &http.Client{ + Transport: &http.Transport{ + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + }, + } } diff --git a/internal/envdclient/pty.go b/internal/envdclient/pty.go index 7a625fb..f94a1b0 100644 --- a/internal/envdclient/pty.go +++ b/internal/envdclient/pty.go @@ -162,7 +162,7 @@ type eventProvider interface { // drainPtyStream reads events from either a Start or Connect stream and maps // them into PtyEvent values on a channel. func drainPtyStream(ctx context.Context, stream eventProvider, expectStart bool) <-chan PtyEvent { - ch := make(chan PtyEvent, 16) + ch := make(chan PtyEvent, 256) go func() { defer close(ch) defer stream.Close() diff --git a/internal/hostagent/proxy.go b/internal/hostagent/proxy.go index 7a5097d..d7c875f 100644 --- a/internal/hostagent/proxy.go +++ b/internal/hostagent/proxy.go @@ -1,16 +1,28 @@ package hostagent import ( + "context" "fmt" "log/slog" + "net" "net/http" "net/http/httputil" + "net/url" "strconv" "strings" + "sync" + "time" "git.omukk.dev/wrenn/wrenn/internal/sandbox" ) +const ( + // proxyDialAttempts is the number of connection attempts for the proxy + // transport. Retries handle the delay between a process binding to a port + // inside the guest and socat/Go-proxy starting to forward on the TAP IP. + proxyDialAttempts = 3 +) + // ProxyHandler reverse-proxies HTTP requests to services running inside // sandboxes. It handles requests of the form: // @@ -21,16 +33,75 @@ import ( type ProxyHandler struct { mgr *sandbox.Manager transport http.RoundTripper + + // proxies caches ReverseProxy instances per sandbox+port to avoid + // per-request allocation under high-frequency REST polling. + proxies sync.Map // key: "sandboxID/port" → *httputil.ReverseProxy +} + +// newProxyTransport returns an HTTP transport dedicated to proxying user +// traffic into sandboxes. It is intentionally separate from the envdclient +// transport and http.DefaultTransport to prevent proxy traffic from +// interfering with Connect RPC streams (PTY, exec). +func newProxyTransport() http.RoundTripper { + dialer := &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 20 * time.Second, + } + + return &http.Transport{ + ForceAttemptHTTP2: false, // HTTP/1.1 only — avoids HTTP/2 HOL blocking + MaxIdleConnsPerHost: 20, + MaxIdleConns: 100, + IdleConnTimeout: 120 * time.Second, + DisableCompression: true, + // Retry with linear backoff to handle the delay between a process + // binding inside the guest and the port forwarder making it reachable. + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + var conn net.Conn + var err error + for attempt := range proxyDialAttempts { + conn, err = dialer.DialContext(ctx, network, addr) + if err == nil { + return conn, nil + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + // Don't sleep on the last attempt. + if attempt < proxyDialAttempts-1 { + backoff := time.Duration(100*(attempt+1)) * time.Millisecond + select { + case <-time.After(backoff): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + } + return nil, err + }, + } } // NewProxyHandler creates a new sandbox proxy handler. func NewProxyHandler(mgr *sandbox.Manager) *ProxyHandler { return &ProxyHandler{ mgr: mgr, - transport: http.DefaultTransport, + transport: newProxyTransport(), } } +// EvictProxy removes cached reverse proxy instances for a sandbox. +// Call this when a sandbox is destroyed. +func (h *ProxyHandler) EvictProxy(sandboxID string) { + h.proxies.Range(func(key, _ any) bool { + if k, ok := key.(string); ok && strings.HasPrefix(k, sandboxID+"/") { + h.proxies.Delete(key) + } + return true + }) +} + // ServeHTTP implements http.Handler. func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Expected path: /proxy/{sandbox_id}/{port}/... @@ -49,10 +120,6 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { sandboxID := parts[0] port := parts[1] - remainder := "" - if len(parts) == 3 { - remainder = parts[2] - } // Validate port is a number in the valid range. portNum, err := strconv.Atoi(port) @@ -68,22 +135,61 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer tracker.Release() - targetHost := fmt.Sprintf("%s:%d", hostIP, portNum) + proxy := h.getOrCreateProxy(sandboxID, port, fmt.Sprintf("%s:%d", hostIP, portNum)) + proxy.ServeHTTP(w, r) +} + +// getOrCreateProxy returns a cached ReverseProxy for the given sandbox+port+host, +// creating one if it doesn't exist. The targetHost is included in the key so +// that an IP change after pause/resume naturally misses the old entry. +func (h *ProxyHandler) getOrCreateProxy(sandboxID, port, targetHost string) *httputil.ReverseProxy { + cacheKey := sandboxID + "/" + port + "/" + targetHost + + if v, ok := h.proxies.Load(cacheKey); ok { + return v.(*httputil.ReverseProxy) + } + + proxyPrefix := "/proxy/" + sandboxID + "/" + port proxy := &httputil.ReverseProxy{ Transport: h.transport, Director: func(req *http.Request) { + // Extract remainder from the original path: /proxy/{id}/{port}/{remainder} + remainder := "" + if trimmed := strings.TrimPrefix(req.URL.Path, proxyPrefix); trimmed != req.URL.Path { + remainder = strings.TrimPrefix(trimmed, "/") + } + req.URL.Scheme = "http" req.URL.Host = targetHost req.URL.Path = "/" + remainder - req.URL.RawQuery = r.URL.RawQuery req.Host = targetHost }, + // Rewrite redirect Location headers so they include the /proxy/{id}/{port} + // prefix. Handles both root-relative (/path) and absolute-URL redirects + // (http://internal-ip:port/path) that would otherwise leak internal IPs + // or break directory navigation. + ModifyResponse: func(resp *http.Response) error { + loc := resp.Header.Get("Location") + if loc == "" { + return nil + } + if strings.HasPrefix(loc, "/") { + resp.Header.Set("Location", proxyPrefix+loc) + return nil + } + // Rewrite absolute URLs pointing to the internal target host. + if u, err := url.Parse(loc); err == nil && u.Host == targetHost { + resp.Header.Set("Location", proxyPrefix+u.RequestURI()) + } + return nil + }, ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { slog.Debug("proxy error", "sandbox_id", sandboxID, "port", port, "error", err) http.Error(w, "proxy error: "+err.Error(), http.StatusBadGateway) }, } - proxy.ServeHTTP(w, r) + actual, _ := h.proxies.LoadOrStore(cacheKey, proxy) + return actual.(*httputil.ReverseProxy) } diff --git a/internal/hostagent/server.go b/internal/hostagent/server.go index 663d2cb..e15ef0b 100644 --- a/internal/hostagent/server.go +++ b/internal/hostagent/server.go @@ -459,7 +459,7 @@ func (s *Server) WriteFileStream( } httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType()) - resp, err := http.DefaultClient.Do(httpReq) + resp, err := client.HTTPClient().Do(httpReq) if err != nil { pw.CloseWithError(err) <-errCh @@ -504,7 +504,7 @@ func (s *Server) ReadFileStream( return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err)) } - resp, err := http.DefaultClient.Do(httpReq) + resp, err := client.HTTPClient().Do(httpReq) if err != nil { return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err)) } diff --git a/internal/network/setup.go b/internal/network/setup.go index 3874c79..d68da89 100644 --- a/internal/network/setup.go +++ b/internal/network/setup.go @@ -269,6 +269,7 @@ func CreateNetwork(slot *Slot) error { // Create TAP device inside namespace. tapAttrs := netlink.NewLinkAttrs() tapAttrs.Name = tapName + tapAttrs.TxQLen = 5000 // Up from default 1000 to reduce drops under bursty traffic. tap := &netlink.Tuntap{ LinkAttrs: tapAttrs, Mode: netlink.TUNTAP_MODE_TAP, diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go index a7ff69d..daa1dba 100644 --- a/internal/sandbox/manager.go +++ b/internal/sandbox/manager.go @@ -53,6 +53,15 @@ type Manager struct { autoPausedMu sync.Mutex autoPausedIDs []string + + // onDestroy is called with the sandbox ID after cleanup completes. + // Used by ProxyHandler to evict cached reverse proxies. + onDestroy func(sandboxID string) +} + +// SetOnDestroy registers a callback invoked after each sandbox is cleaned up. +func (m *Manager) SetOnDestroy(fn func(sandboxID string)) { + m.onDestroy = fn } // sandboxState holds the runtime state for a single sandbox. @@ -314,6 +323,10 @@ func (m *Manager) Destroy(ctx context.Context, sandboxID string) error { slog.Warn("snapshot cleanup error", "id", sandboxID, "error", err) } + if m.onDestroy != nil { + m.onDestroy(sandboxID) + } + slog.Info("sandbox destroyed", "id", sandboxID) return nil } diff --git a/internal/vm/fc.go b/internal/vm/fc.go index 3d0f246..5a131a4 100644 --- a/internal/vm/fc.go +++ b/internal/vm/fc.go @@ -84,11 +84,21 @@ func (c *fcClient) setRootfsDrive(ctx context.Context, driveID, path string, rea } // setNetworkInterface configures a network interface attached to a TAP device. +// A tx_rate_limiter caps sustained guest→host throughput to prevent user +// application traffic from completely saturating the TAP device and starving +// envd control traffic (PTY, exec, file ops). func (c *fcClient) setNetworkInterface(ctx context.Context, ifaceID, tapName, macAddr string) error { return c.do(ctx, http.MethodPut, "/network-interfaces/"+ifaceID, map[string]any{ "iface_id": ifaceID, "host_dev_name": tapName, "guest_mac": macAddr, + "tx_rate_limiter": map[string]any{ + "bandwidth": map[string]any{ + "size": 209715200, // 200 MB/s sustained + "refill_time": 1000, // refill period: 1 second + "one_time_burst": 104857600, // 100 MB initial burst + }, + }, }) } diff --git a/pkg/lifecycle/hostpool.go b/pkg/lifecycle/hostpool.go index 3931d7b..48ed6c9 100644 --- a/pkg/lifecycle/hostpool.go +++ b/pkg/lifecycle/hostpool.go @@ -3,6 +3,7 @@ package lifecycle import ( "crypto/tls" "fmt" + "net" "net/http" "strings" "sync" @@ -115,6 +116,34 @@ func (p *HostClientPool) ResolveAddr(addr string) string { return p.ensureScheme(addr) } +// NewProxyTransport returns a new http.RoundTripper configured for proxying +// user traffic to sandbox services. It is intentionally separate from the RPC +// transport returned by Transport() so that heavy proxy traffic (Jupyter +// WebSocket, REST API polling) cannot interfere with Connect RPC streams (PTY, +// exec) via HTTP/2 flow control or connection pool contention. +func (p *HostClientPool) NewProxyTransport() http.RoundTripper { + t := &http.Transport{ + ForceAttemptHTTP2: false, // HTTP/1.1 only — avoids HTTP/2 HOL blocking + MaxIdleConnsPerHost: 20, + MaxIdleConns: 100, + IdleConnTimeout: 120 * time.Second, + DisableCompression: true, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 20 * time.Second, + }).DialContext, + } + + // If the pool uses TLS, the proxy transport must too. + if p.httpClient.Transport != nil { + if ht, ok := p.httpClient.Transport.(*http.Transport); ok && ht.TLSClientConfig != nil { + t.TLSClientConfig = ht.TLSClientConfig.Clone() + } + } + + return t +} + // EnsureScheme adds "http://" if the address has no scheme. // Deprecated: use pool.ResolveAddr which respects the pool's TLS setting. func EnsureScheme(addr string) string {