forked from wrenn/wrenn
fix: sandbox network responsiveness under port-binding apps
Running port-binding applications (Jupyter, http.server, NextJS) inside
sandboxes caused severe PTY sluggishness and proxy navigation errors.
Root cause: the CP sandbox proxy and Connect RPC pool shared a single
HTTP transport. Heavy proxy traffic (Jupyter WebSocket, REST polling)
interfered with PTY RPC streams via HTTP/2 flow control contention.
Transport isolation (main fix):
- Add dedicated proxy transport on CP (NewProxyTransport) with HTTP/2
disabled, separate from the RPC pool transport
- Add dedicated proxy transport on host agent, replacing
http.DefaultTransport
- Add dedicated envdclient transport with tuned connection pooling
- Replace http.DefaultClient in file streaming RPCs with per-sandbox
envd client
Proxy path rewriting (navigation fix):
- Add ModifyResponse to rewrite Location headers with /proxy/{id}/{port}
prefix, handling both root-relative and absolute-URL redirects
- Strip prefix back out in CP subdomain proxy for correct browser
behavior
- Replace path.Join with string concat in CP Director to preserve
trailing slashes (prevents redirect loops on directory listings)
Proxy resilience:
- Add dial retry with linear backoff (3 attempts) to handle socat
startup delay when ports are first detected
- Cache ReverseProxy instances per sandbox+port+host in sync.Map
- Add EvictProxy callback wired into sandbox Manager.Destroy
Buffer and server hardening:
- Increase PTY and exec stream channel buffers from 16 to 256
- Add ReadHeaderTimeout (10s) and IdleTimeout (620s) to host agent
HTTP server
Network tuning:
- Set TAP device TxQueueLen to 5000 (up from default 1000)
- Add Firecracker tx_rate_limiter (200 MB/s sustained, 100 MB burst)
to prevent guest traffic from saturating the TAP
This commit is contained in:
@ -148,7 +148,13 @@ func main() {
|
|||||||
slog.Info("host registered", "host_id", creds.HostID)
|
slog.Info("host registered", "host_id", creds.HostID)
|
||||||
|
|
||||||
// httpServer is declared here so the shutdown func can reference it.
|
// httpServer is declared here so the shutdown func can reference it.
|
||||||
httpServer := &http.Server{Addr: listenAddr}
|
// ReadTimeout/WriteTimeout are intentionally omitted — they would kill
|
||||||
|
// long-lived Connect RPC streams and WebSocket proxy connections.
|
||||||
|
httpServer := &http.Server{
|
||||||
|
Addr: listenAddr,
|
||||||
|
ReadHeaderTimeout: 10 * time.Second,
|
||||||
|
IdleTimeout: 620 * time.Second, // > typical LB upstream timeout (600s)
|
||||||
|
}
|
||||||
|
|
||||||
// mTLS is mandatory — refuse to start without a valid certificate.
|
// mTLS is mandatory — refuse to start without a valid certificate.
|
||||||
var certStore hostagent.CertStore
|
var certStore hostagent.CertStore
|
||||||
@ -193,6 +199,7 @@ func main() {
|
|||||||
path, handler := hostagentv1connect.NewHostAgentServiceHandler(srv)
|
path, handler := hostagentv1connect.NewHostAgentServiceHandler(srv)
|
||||||
|
|
||||||
proxyHandler := hostagent.NewProxyHandler(mgr)
|
proxyHandler := hostagent.NewProxyHandler(mgr)
|
||||||
|
mgr.SetOnDestroy(proxyHandler.EvictProxy)
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.Handle(path, handler)
|
mux.Handle(path, handler)
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -74,7 +73,7 @@ func NewSandboxProxyWrapper(inner http.Handler, queries *db.Queries, pool *lifec
|
|||||||
inner: inner,
|
inner: inner,
|
||||||
db: queries,
|
db: queries,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
transport: pool.Transport(),
|
transport: pool.NewProxyTransport(),
|
||||||
cache: make(map[pgtype.UUID]proxyCacheEntry),
|
cache: make(map[pgtype.UUID]proxyCacheEntry),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -167,14 +166,29 @@ func (h *SandboxProxyWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The host agent's proxy adds a /proxy/{id}/{port} prefix to Location
|
||||||
|
// headers for path-based routing. For subdomain routing the browser is at
|
||||||
|
// {port}-{id}.domain, so we strip the prefix back out.
|
||||||
|
agentProxyPrefix := "/proxy/" + sandboxIDStr + "/" + port
|
||||||
|
|
||||||
proxy := &httputil.ReverseProxy{
|
proxy := &httputil.ReverseProxy{
|
||||||
Transport: h.transport,
|
Transport: h.transport,
|
||||||
Director: func(req *http.Request) {
|
Director: func(req *http.Request) {
|
||||||
req.URL.Scheme = agentURL.Scheme
|
req.URL.Scheme = agentURL.Scheme
|
||||||
req.URL.Host = agentURL.Host
|
req.URL.Host = agentURL.Host
|
||||||
req.URL.Path = path.Join("/proxy", sandboxIDStr, port, path.Clean("/"+req.URL.Path))
|
// Use string concatenation instead of path.Join to preserve trailing
|
||||||
|
// slashes. path.Join strips them, causing redirect loops for directory
|
||||||
|
// listings in apps like python http.server and Jupyter.
|
||||||
|
req.URL.Path = "/proxy/" + sandboxIDStr + "/" + port + req.URL.Path
|
||||||
req.Host = agentURL.Host
|
req.Host = agentURL.Host
|
||||||
},
|
},
|
||||||
|
ModifyResponse: func(resp *http.Response) error {
|
||||||
|
if loc := resp.Header.Get("Location"); loc != "" {
|
||||||
|
loc = strings.TrimPrefix(loc, agentProxyPrefix)
|
||||||
|
resp.Header.Set("Location", loc)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
||||||
slog.Debug("sandbox proxy error",
|
slog.Debug("sandbox proxy error",
|
||||||
"sandbox_id", sandboxIDStr,
|
"sandbox_id", sandboxIDStr,
|
||||||
|
|||||||
@ -48,6 +48,13 @@ func (c *Client) BaseURL() string {
|
|||||||
return c.base
|
return c.base
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HTTPClient returns the underlying http.Client used for envd requests.
|
||||||
|
// Use this instead of http.DefaultClient when making direct HTTP calls to envd
|
||||||
|
// (e.g. file streaming) to avoid sharing the global transport with proxy traffic.
|
||||||
|
func (c *Client) HTTPClient() *http.Client {
|
||||||
|
return c.httpClient
|
||||||
|
}
|
||||||
|
|
||||||
// ExecResult holds the output of a command execution.
|
// ExecResult holds the output of a command execution.
|
||||||
type ExecResult struct {
|
type ExecResult struct {
|
||||||
Stdout []byte
|
Stdout []byte
|
||||||
@ -142,7 +149,7 @@ func (c *Client) ExecStream(ctx context.Context, cmd string, args ...string) (<-
|
|||||||
return nil, fmt.Errorf("start process: %w", err)
|
return nil, fmt.Errorf("start process: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan ExecStreamEvent, 16)
|
ch := make(chan ExecStreamEvent, 256)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|||||||
@ -2,7 +2,9 @@ package envdclient
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// envdPort is the default port envd listens on inside the guest.
|
// envdPort is the default port envd listens on inside the guest.
|
||||||
@ -13,9 +15,19 @@ func baseURL(hostIP string) string {
|
|||||||
return fmt.Sprintf("http://%s:%d", hostIP, envdPort)
|
return fmt.Sprintf("http://%s:%d", hostIP, envdPort)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHTTPClient returns an http.Client suitable for talking to envd.
|
// newHTTPClient returns an http.Client with a dedicated transport for talking
|
||||||
// No special transport is needed — envd is reachable via the host IP
|
// to envd. The transport is intentionally separate from http.DefaultTransport
|
||||||
// through the veth/TAP network path.
|
// so that proxy traffic to user services inside the sandbox cannot interfere
|
||||||
|
// with envd RPC connections (PTY streams, exec, file ops).
|
||||||
func newHTTPClient() *http.Client {
|
func newHTTPClient() *http.Client {
|
||||||
return &http.Client{}
|
return &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
MaxIdleConnsPerHost: 10,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -162,7 +162,7 @@ type eventProvider interface {
|
|||||||
// drainPtyStream reads events from either a Start or Connect stream and maps
|
// drainPtyStream reads events from either a Start or Connect stream and maps
|
||||||
// them into PtyEvent values on a channel.
|
// them into PtyEvent values on a channel.
|
||||||
func drainPtyStream(ctx context.Context, stream eventProvider, expectStart bool) <-chan PtyEvent {
|
func drainPtyStream(ctx context.Context, stream eventProvider, expectStart bool) <-chan PtyEvent {
|
||||||
ch := make(chan PtyEvent, 16)
|
ch := make(chan PtyEvent, 256)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|||||||
@ -1,16 +1,28 @@
|
|||||||
package hostagent
|
package hostagent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.omukk.dev/wrenn/wrenn/internal/sandbox"
|
"git.omukk.dev/wrenn/wrenn/internal/sandbox"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// proxyDialAttempts is the number of connection attempts for the proxy
|
||||||
|
// transport. Retries handle the delay between a process binding to a port
|
||||||
|
// inside the guest and socat/Go-proxy starting to forward on the TAP IP.
|
||||||
|
proxyDialAttempts = 3
|
||||||
|
)
|
||||||
|
|
||||||
// ProxyHandler reverse-proxies HTTP requests to services running inside
|
// ProxyHandler reverse-proxies HTTP requests to services running inside
|
||||||
// sandboxes. It handles requests of the form:
|
// sandboxes. It handles requests of the form:
|
||||||
//
|
//
|
||||||
@ -21,16 +33,75 @@ import (
|
|||||||
type ProxyHandler struct {
|
type ProxyHandler struct {
|
||||||
mgr *sandbox.Manager
|
mgr *sandbox.Manager
|
||||||
transport http.RoundTripper
|
transport http.RoundTripper
|
||||||
|
|
||||||
|
// proxies caches ReverseProxy instances per sandbox+port to avoid
|
||||||
|
// per-request allocation under high-frequency REST polling.
|
||||||
|
proxies sync.Map // key: "sandboxID/port" → *httputil.ReverseProxy
|
||||||
|
}
|
||||||
|
|
||||||
|
// newProxyTransport returns an HTTP transport dedicated to proxying user
|
||||||
|
// traffic into sandboxes. It is intentionally separate from the envdclient
|
||||||
|
// transport and http.DefaultTransport to prevent proxy traffic from
|
||||||
|
// interfering with Connect RPC streams (PTY, exec).
|
||||||
|
func newProxyTransport() http.RoundTripper {
|
||||||
|
dialer := &net.Dialer{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
KeepAlive: 20 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &http.Transport{
|
||||||
|
ForceAttemptHTTP2: false, // HTTP/1.1 only — avoids HTTP/2 HOL blocking
|
||||||
|
MaxIdleConnsPerHost: 20,
|
||||||
|
MaxIdleConns: 100,
|
||||||
|
IdleConnTimeout: 120 * time.Second,
|
||||||
|
DisableCompression: true,
|
||||||
|
// Retry with linear backoff to handle the delay between a process
|
||||||
|
// binding inside the guest and the port forwarder making it reachable.
|
||||||
|
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
|
var conn net.Conn
|
||||||
|
var err error
|
||||||
|
for attempt := range proxyDialAttempts {
|
||||||
|
conn, err = dialer.DialContext(ctx, network, addr)
|
||||||
|
if err == nil {
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
// Don't sleep on the last attempt.
|
||||||
|
if attempt < proxyDialAttempts-1 {
|
||||||
|
backoff := time.Duration(100*(attempt+1)) * time.Millisecond
|
||||||
|
select {
|
||||||
|
case <-time.After(backoff):
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewProxyHandler creates a new sandbox proxy handler.
|
// NewProxyHandler creates a new sandbox proxy handler.
|
||||||
func NewProxyHandler(mgr *sandbox.Manager) *ProxyHandler {
|
func NewProxyHandler(mgr *sandbox.Manager) *ProxyHandler {
|
||||||
return &ProxyHandler{
|
return &ProxyHandler{
|
||||||
mgr: mgr,
|
mgr: mgr,
|
||||||
transport: http.DefaultTransport,
|
transport: newProxyTransport(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EvictProxy removes cached reverse proxy instances for a sandbox.
|
||||||
|
// Call this when a sandbox is destroyed.
|
||||||
|
func (h *ProxyHandler) EvictProxy(sandboxID string) {
|
||||||
|
h.proxies.Range(func(key, _ any) bool {
|
||||||
|
if k, ok := key.(string); ok && strings.HasPrefix(k, sandboxID+"/") {
|
||||||
|
h.proxies.Delete(key)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// ServeHTTP implements http.Handler.
|
// ServeHTTP implements http.Handler.
|
||||||
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
// Expected path: /proxy/{sandbox_id}/{port}/...
|
// Expected path: /proxy/{sandbox_id}/{port}/...
|
||||||
@ -49,10 +120,6 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
sandboxID := parts[0]
|
sandboxID := parts[0]
|
||||||
port := parts[1]
|
port := parts[1]
|
||||||
remainder := ""
|
|
||||||
if len(parts) == 3 {
|
|
||||||
remainder = parts[2]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate port is a number in the valid range.
|
// Validate port is a number in the valid range.
|
||||||
portNum, err := strconv.Atoi(port)
|
portNum, err := strconv.Atoi(port)
|
||||||
@ -68,22 +135,61 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
defer tracker.Release()
|
defer tracker.Release()
|
||||||
|
|
||||||
targetHost := fmt.Sprintf("%s:%d", hostIP, portNum)
|
proxy := h.getOrCreateProxy(sandboxID, port, fmt.Sprintf("%s:%d", hostIP, portNum))
|
||||||
|
proxy.ServeHTTP(w, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getOrCreateProxy returns a cached ReverseProxy for the given sandbox+port+host,
|
||||||
|
// creating one if it doesn't exist. The targetHost is included in the key so
|
||||||
|
// that an IP change after pause/resume naturally misses the old entry.
|
||||||
|
func (h *ProxyHandler) getOrCreateProxy(sandboxID, port, targetHost string) *httputil.ReverseProxy {
|
||||||
|
cacheKey := sandboxID + "/" + port + "/" + targetHost
|
||||||
|
|
||||||
|
if v, ok := h.proxies.Load(cacheKey); ok {
|
||||||
|
return v.(*httputil.ReverseProxy)
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyPrefix := "/proxy/" + sandboxID + "/" + port
|
||||||
|
|
||||||
proxy := &httputil.ReverseProxy{
|
proxy := &httputil.ReverseProxy{
|
||||||
Transport: h.transport,
|
Transport: h.transport,
|
||||||
Director: func(req *http.Request) {
|
Director: func(req *http.Request) {
|
||||||
|
// Extract remainder from the original path: /proxy/{id}/{port}/{remainder}
|
||||||
|
remainder := ""
|
||||||
|
if trimmed := strings.TrimPrefix(req.URL.Path, proxyPrefix); trimmed != req.URL.Path {
|
||||||
|
remainder = strings.TrimPrefix(trimmed, "/")
|
||||||
|
}
|
||||||
|
|
||||||
req.URL.Scheme = "http"
|
req.URL.Scheme = "http"
|
||||||
req.URL.Host = targetHost
|
req.URL.Host = targetHost
|
||||||
req.URL.Path = "/" + remainder
|
req.URL.Path = "/" + remainder
|
||||||
req.URL.RawQuery = r.URL.RawQuery
|
|
||||||
req.Host = targetHost
|
req.Host = targetHost
|
||||||
},
|
},
|
||||||
|
// Rewrite redirect Location headers so they include the /proxy/{id}/{port}
|
||||||
|
// prefix. Handles both root-relative (/path) and absolute-URL redirects
|
||||||
|
// (http://internal-ip:port/path) that would otherwise leak internal IPs
|
||||||
|
// or break directory navigation.
|
||||||
|
ModifyResponse: func(resp *http.Response) error {
|
||||||
|
loc := resp.Header.Get("Location")
|
||||||
|
if loc == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(loc, "/") {
|
||||||
|
resp.Header.Set("Location", proxyPrefix+loc)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Rewrite absolute URLs pointing to the internal target host.
|
||||||
|
if u, err := url.Parse(loc); err == nil && u.Host == targetHost {
|
||||||
|
resp.Header.Set("Location", proxyPrefix+u.RequestURI())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
||||||
slog.Debug("proxy error", "sandbox_id", sandboxID, "port", port, "error", err)
|
slog.Debug("proxy error", "sandbox_id", sandboxID, "port", port, "error", err)
|
||||||
http.Error(w, "proxy error: "+err.Error(), http.StatusBadGateway)
|
http.Error(w, "proxy error: "+err.Error(), http.StatusBadGateway)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy.ServeHTTP(w, r)
|
actual, _ := h.proxies.LoadOrStore(cacheKey, proxy)
|
||||||
|
return actual.(*httputil.ReverseProxy)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -459,7 +459,7 @@ func (s *Server) WriteFileStream(
|
|||||||
}
|
}
|
||||||
httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType())
|
httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType())
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(httpReq)
|
resp, err := client.HTTPClient().Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pw.CloseWithError(err)
|
pw.CloseWithError(err)
|
||||||
<-errCh
|
<-errCh
|
||||||
@ -504,7 +504,7 @@ func (s *Server) ReadFileStream(
|
|||||||
return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err))
|
return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(httpReq)
|
resp, err := client.HTTPClient().Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err))
|
return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -269,6 +269,7 @@ func CreateNetwork(slot *Slot) error {
|
|||||||
// Create TAP device inside namespace.
|
// Create TAP device inside namespace.
|
||||||
tapAttrs := netlink.NewLinkAttrs()
|
tapAttrs := netlink.NewLinkAttrs()
|
||||||
tapAttrs.Name = tapName
|
tapAttrs.Name = tapName
|
||||||
|
tapAttrs.TxQLen = 5000 // Up from default 1000 to reduce drops under bursty traffic.
|
||||||
tap := &netlink.Tuntap{
|
tap := &netlink.Tuntap{
|
||||||
LinkAttrs: tapAttrs,
|
LinkAttrs: tapAttrs,
|
||||||
Mode: netlink.TUNTAP_MODE_TAP,
|
Mode: netlink.TUNTAP_MODE_TAP,
|
||||||
|
|||||||
@ -53,6 +53,15 @@ type Manager struct {
|
|||||||
|
|
||||||
autoPausedMu sync.Mutex
|
autoPausedMu sync.Mutex
|
||||||
autoPausedIDs []string
|
autoPausedIDs []string
|
||||||
|
|
||||||
|
// onDestroy is called with the sandbox ID after cleanup completes.
|
||||||
|
// Used by ProxyHandler to evict cached reverse proxies.
|
||||||
|
onDestroy func(sandboxID string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetOnDestroy registers a callback invoked after each sandbox is cleaned up.
|
||||||
|
func (m *Manager) SetOnDestroy(fn func(sandboxID string)) {
|
||||||
|
m.onDestroy = fn
|
||||||
}
|
}
|
||||||
|
|
||||||
// sandboxState holds the runtime state for a single sandbox.
|
// sandboxState holds the runtime state for a single sandbox.
|
||||||
@ -314,6 +323,10 @@ func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
|
|||||||
slog.Warn("snapshot cleanup error", "id", sandboxID, "error", err)
|
slog.Warn("snapshot cleanup error", "id", sandboxID, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if m.onDestroy != nil {
|
||||||
|
m.onDestroy(sandboxID)
|
||||||
|
}
|
||||||
|
|
||||||
slog.Info("sandbox destroyed", "id", sandboxID)
|
slog.Info("sandbox destroyed", "id", sandboxID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -84,11 +84,21 @@ func (c *fcClient) setRootfsDrive(ctx context.Context, driveID, path string, rea
|
|||||||
}
|
}
|
||||||
|
|
||||||
// setNetworkInterface configures a network interface attached to a TAP device.
|
// setNetworkInterface configures a network interface attached to a TAP device.
|
||||||
|
// A tx_rate_limiter caps sustained guest→host throughput to prevent user
|
||||||
|
// application traffic from completely saturating the TAP device and starving
|
||||||
|
// envd control traffic (PTY, exec, file ops).
|
||||||
func (c *fcClient) setNetworkInterface(ctx context.Context, ifaceID, tapName, macAddr string) error {
|
func (c *fcClient) setNetworkInterface(ctx context.Context, ifaceID, tapName, macAddr string) error {
|
||||||
return c.do(ctx, http.MethodPut, "/network-interfaces/"+ifaceID, map[string]any{
|
return c.do(ctx, http.MethodPut, "/network-interfaces/"+ifaceID, map[string]any{
|
||||||
"iface_id": ifaceID,
|
"iface_id": ifaceID,
|
||||||
"host_dev_name": tapName,
|
"host_dev_name": tapName,
|
||||||
"guest_mac": macAddr,
|
"guest_mac": macAddr,
|
||||||
|
"tx_rate_limiter": map[string]any{
|
||||||
|
"bandwidth": map[string]any{
|
||||||
|
"size": 209715200, // 200 MB/s sustained
|
||||||
|
"refill_time": 1000, // refill period: 1 second
|
||||||
|
"one_time_burst": 104857600, // 100 MB initial burst
|
||||||
|
},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package lifecycle
|
|||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@ -115,6 +116,34 @@ func (p *HostClientPool) ResolveAddr(addr string) string {
|
|||||||
return p.ensureScheme(addr)
|
return p.ensureScheme(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewProxyTransport returns a new http.RoundTripper configured for proxying
|
||||||
|
// user traffic to sandbox services. It is intentionally separate from the RPC
|
||||||
|
// transport returned by Transport() so that heavy proxy traffic (Jupyter
|
||||||
|
// WebSocket, REST API polling) cannot interfere with Connect RPC streams (PTY,
|
||||||
|
// exec) via HTTP/2 flow control or connection pool contention.
|
||||||
|
func (p *HostClientPool) NewProxyTransport() http.RoundTripper {
|
||||||
|
t := &http.Transport{
|
||||||
|
ForceAttemptHTTP2: false, // HTTP/1.1 only — avoids HTTP/2 HOL blocking
|
||||||
|
MaxIdleConnsPerHost: 20,
|
||||||
|
MaxIdleConns: 100,
|
||||||
|
IdleConnTimeout: 120 * time.Second,
|
||||||
|
DisableCompression: true,
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
KeepAlive: 20 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the pool uses TLS, the proxy transport must too.
|
||||||
|
if p.httpClient.Transport != nil {
|
||||||
|
if ht, ok := p.httpClient.Transport.(*http.Transport); ok && ht.TLSClientConfig != nil {
|
||||||
|
t.TLSClientConfig = ht.TLSClientConfig.Clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
// EnsureScheme adds "http://" if the address has no scheme.
|
// EnsureScheme adds "http://" if the address has no scheme.
|
||||||
// Deprecated: use pool.ResolveAddr which respects the pool's TLS setting.
|
// Deprecated: use pool.ResolveAddr which respects the pool's TLS setting.
|
||||||
func EnsureScheme(addr string) string {
|
func EnsureScheme(addr string) string {
|
||||||
|
|||||||
Reference in New Issue
Block a user