forked from wrenn/wrenn
fix: prevent sandbox halt after resume by fixing HTTP/2 HOL blocking and adding timeouts
Disable HTTP/2 on both the host agent server and the CP→agent transport — multiplexing caused head-of-line (HOL) blocking when a slow sandbox RPC stalled the shared connection. Add ResponseHeaderTimeout to the envd HTTP clients. Merge SetDefaults into Resume's PostInit call to eliminate an extra round-trip that could hang on a stale connection.
This commit is contained in:
@ -154,6 +154,11 @@ func main() {
|
|||||||
Addr: listenAddr,
|
Addr: listenAddr,
|
||||||
ReadHeaderTimeout: 10 * time.Second,
|
ReadHeaderTimeout: 10 * time.Second,
|
||||||
IdleTimeout: 620 * time.Second, // > typical LB upstream timeout (600s)
|
IdleTimeout: 620 * time.Second, // > typical LB upstream timeout (600s)
|
||||||
|
// Disable HTTP/2: empty non-nil map prevents Go from registering
|
||||||
|
// the h2 ALPN token. Connect RPC works over HTTP/1.1; HTTP/2
|
||||||
|
// multiplexing causes HOL blocking when a slow sandbox RPC stalls
|
||||||
|
// the shared connection.
|
||||||
|
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
|
||||||
}
|
}
|
||||||
|
|
||||||
// mTLS is mandatory — refuse to start without a valid certificate.
|
// mTLS is mandatory — refuse to start without a valid certificate.
|
||||||
|
|||||||
@ -23,8 +23,9 @@ func newHTTPClient() *http.Client {
|
|||||||
return &http.Client{
|
return &http.Client{
|
||||||
Timeout: 2 * time.Minute,
|
Timeout: 2 * time.Minute,
|
||||||
Transport: &http.Transport{
|
Transport: &http.Transport{
|
||||||
MaxIdleConnsPerHost: 10,
|
MaxIdleConnsPerHost: 10,
|
||||||
IdleConnTimeout: 90 * time.Second,
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 30 * time.Second,
|
||||||
DialContext: (&net.Dialer{
|
DialContext: (&net.Dialer{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
KeepAlive: 30 * time.Second,
|
KeepAlive: 30 * time.Second,
|
||||||
@ -38,8 +39,9 @@ func newHTTPClient() *http.Client {
|
|||||||
func newStreamingHTTPClient() *http.Client {
|
func newStreamingHTTPClient() *http.Client {
|
||||||
return &http.Client{
|
return &http.Client{
|
||||||
Transport: &http.Transport{
|
Transport: &http.Transport{
|
||||||
MaxIdleConnsPerHost: 10,
|
MaxIdleConnsPerHost: 10,
|
||||||
IdleConnTimeout: 90 * time.Second,
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 30 * time.Second,
|
||||||
DialContext: (&net.Dialer{
|
DialContext: (&net.Dialer{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
KeepAlive: 30 * time.Second,
|
KeepAlive: 30 * time.Second,
|
||||||
|
|||||||
@ -109,18 +109,11 @@ func (s *Server) ResumeSandbox(
|
|||||||
req *connect.Request[pb.ResumeSandboxRequest],
|
req *connect.Request[pb.ResumeSandboxRequest],
|
||||||
) (*connect.Response[pb.ResumeSandboxResponse], error) {
|
) (*connect.Response[pb.ResumeSandboxResponse], error) {
|
||||||
msg := req.Msg
|
msg := req.Msg
|
||||||
sb, err := s.mgr.Resume(ctx, msg.SandboxId, int(msg.TimeoutSec), msg.KernelVersion)
|
sb, err := s.mgr.Resume(ctx, msg.SandboxId, int(msg.TimeoutSec), msg.KernelVersion, msg.DefaultUser, msg.DefaultEnv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, connect.NewError(connect.CodeInternal, err)
|
return nil, connect.NewError(connect.CodeInternal, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply template defaults (user, env vars) if provided.
|
|
||||||
if msg.DefaultUser != "" || len(msg.DefaultEnv) > 0 {
|
|
||||||
if err := s.mgr.SetDefaults(ctx, sb.ID, msg.DefaultUser, msg.DefaultEnv); err != nil {
|
|
||||||
slog.Warn("failed to set sandbox defaults on resume", "sandbox", sb.ID, "error", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return connect.NewResponse(&pb.ResumeSandboxResponse{
|
return connect.NewResponse(&pb.ResumeSandboxResponse{
|
||||||
SandboxId: sb.ID,
|
SandboxId: sb.ID,
|
||||||
Status: string(sb.Status),
|
Status: string(sb.Status),
|
||||||
|
|||||||
@ -626,7 +626,9 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
|||||||
|
|
||||||
// Resume restores a paused sandbox from its snapshot using UFFD for
|
// Resume restores a paused sandbox from its snapshot using UFFD for
|
||||||
// lazy memory loading. The sandbox gets a new network slot.
|
// lazy memory loading. The sandbox gets a new network slot.
|
||||||
func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int, kernelVersion string) (*models.Sandbox, error) {
|
// Optional defaultUser and defaultEnv are applied via a single PostInit
|
||||||
|
// call so that template defaults are set without an extra round-trip.
|
||||||
|
func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int, kernelVersion string, defaultUser string, defaultEnv map[string]string) (*models.Sandbox, error) {
|
||||||
pauseDir := layout.PauseSnapshotDir(m.cfg.WrennDir, sandboxID)
|
pauseDir := layout.PauseSnapshotDir(m.cfg.WrennDir, sandboxID)
|
||||||
if _, err := os.Stat(pauseDir); err != nil {
|
if _, err := os.Stat(pauseDir); err != nil {
|
||||||
return nil, fmt.Errorf("no snapshot found for sandbox %s", sandboxID)
|
return nil, fmt.Errorf("no snapshot found for sandbox %s", sandboxID)
|
||||||
@ -783,8 +785,8 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int,
|
|||||||
return nil, fmt.Errorf("wait for envd: %w", err)
|
return nil, fmt.Errorf("wait for envd: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trigger envd to re-read MMDS so it picks up the new sandbox/template IDs.
|
// Trigger envd to re-read MMDS and apply template defaults in a single call.
|
||||||
if err := client.PostInit(waitCtx); err != nil {
|
if err := client.PostInitWithDefaults(waitCtx, defaultUser, defaultEnv); err != nil {
|
||||||
slog.Warn("post-init failed after resume, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
slog.Warn("post-init failed after resume, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -39,7 +39,19 @@ func NewHostClientPool() *HostClientPool {
|
|||||||
// (use auth.CPClientTLSConfig to construct it).
|
// (use auth.CPClientTLSConfig to construct it).
|
||||||
func NewHostClientPoolTLS(tlsCfg *tls.Config) *HostClientPool {
|
func NewHostClientPoolTLS(tlsCfg *tls.Config) *HostClientPool {
|
||||||
transport := &http.Transport{
|
transport := &http.Transport{
|
||||||
TLSClientConfig: tlsCfg,
|
TLSClientConfig: tlsCfg,
|
||||||
|
ForceAttemptHTTP2: false,
|
||||||
|
// Empty non-nil map disables HTTP/2 ALPN negotiation, forcing HTTP/1.1.
|
||||||
|
// Connect RPC works over HTTP/1.1; HTTP/2 multiplexing causes HOL
|
||||||
|
// blocking when a single slow sandbox RPC stalls the shared connection.
|
||||||
|
TLSNextProto: make(map[string]func(authority string, c *tls.Conn) http.RoundTripper),
|
||||||
|
MaxIdleConnsPerHost: 20,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 45 * time.Second,
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
}
|
}
|
||||||
return &HostClientPool{
|
return &HostClientPool{
|
||||||
clients: make(map[string]hostagentv1connect.HostAgentServiceClient),
|
clients: make(map[string]hostagentv1connect.HostAgentServiceClient),
|
||||||
|
|||||||
Reference in New Issue
Block a user