From aca43d51ebf8afed6ad62ade4dcb843586edcc5c Mon Sep 17 00:00:00 2001
From: pptx704
Date: Sat, 9 May 2026 18:11:15 +0600
Subject: [PATCH] fix: resolve process stream hangs, pause race, and PTY signal loss

- Cache the terminal EndEvent on ProcessHandle so connect() can detect
  already-exited processes instead of hanging forever on broadcast
  receivers that missed the event. Subscribe before checking the cache
  to close the TOCTOU window.
- Protect sb.Status writes in Pause with m.mu to prevent a data race
  with concurrent readers (AcquireProxyConn, Exec, etc.).
- Restart the metrics sampler in restoreRunning so a failed pause
  attempt doesn't permanently kill sandbox metrics collection.
- Return dequeued non-input messages from coalescePtyInput instead of
  dropping them, preventing silent loss of kill/resize signals during
  typing bursts.
---
 envd-rs/src/rpc/process_handler.rs | 77 ++++++++++++++++--------------
 envd-rs/src/rpc/process_service.rs | 54 ++++++++++++---------
 internal/api/handlers_pty.go       | 36 +++++++++-----
 internal/sandbox/manager.go        |  5 ++
 4 files changed, 101 insertions(+), 71 deletions(-)

diff --git a/envd-rs/src/rpc/process_handler.rs b/envd-rs/src/rpc/process_handler.rs
index 85dc27d..8c7e07b 100644
--- a/envd-rs/src/rpc/process_handler.rs
+++ b/envd-rs/src/rpc/process_handler.rs
@@ -37,6 +37,7 @@ pub struct ProcessHandle {
     data_tx: broadcast::Sender<DataEvent>,
     end_tx: broadcast::Sender<EndEvent>,
+    ended: Mutex<Option<EndEvent>>,
 
     stdin: Mutex<Option<ChildStdin>>,
     pty_master: Mutex<Option<File>>,
 }
@@ -51,6 +52,10 @@ impl ProcessHandle {
         self.end_tx.subscribe()
     }
 
+    pub fn cached_end(&self) -> Option<EndEvent> {
+        self.ended.lock().unwrap().clone()
+    }
+
     pub fn send_signal(&self, sig: Signal) -> Result<(), ConnectError> {
         signal::kill(Pid::from_raw(self.pid as i32), sig).map_err(|e| {
             ConnectError::new(ErrorCode::Internal, format!("error sending signal: {e}"))
@@ -250,6 +255,7 @@ pub fn spawn_process(
         pid,
         data_tx: data_tx.clone(),
         end_tx: end_tx.clone(),
+        ended: Mutex::new(None),
         stdin: Mutex::new(None),
         pty_master: Mutex::new(Some(master_file)),
     });
@@ -273,26 +279,25 @@ pub fn spawn_process(
     });
 
     let end_tx_clone = end_tx.clone();
+    let handle_for_waiter = Arc::clone(&handle);
     std::thread::spawn(move || {
         let mut child = child;
-        match child.wait() {
-            Ok(s) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: s.code().unwrap_or(-1),
-                    exited: s.code().is_some(),
-                    status: format!("{s}"),
-                    error: None,
-                });
-            }
-            Err(e) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: -1,
-                    exited: false,
-                    status: "error".into(),
-                    error: Some(e.to_string()),
-                });
-            }
-        }
+        let end_event = match child.wait() {
+            Ok(s) => EndEvent {
+                exit_code: s.code().unwrap_or(-1),
+                exited: s.code().is_some(),
+                status: format!("{s}"),
+                error: None,
+            },
+            Err(e) => EndEvent {
+                exit_code: -1,
+                exited: false,
+                status: "error".into(),
+                error: Some(e.to_string()),
+            },
+        };
+        *handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
+        let _ = end_tx_clone.send(end_event);
     });
 
     tracing::info!(pid, cmd = cmd_str, "process started (pty)");
@@ -336,6 +341,7 @@ pub fn spawn_process(
         pid,
         data_tx: data_tx.clone(),
         end_tx: end_tx.clone(),
+        ended: Mutex::new(None),
         stdin: Mutex::new(stdin),
         pty_master: Mutex::new(None),
     });
@@ -376,25 +382,24 @@ pub fn spawn_process(
     }
 
     let end_tx_clone = end_tx.clone();
+    let handle_for_waiter = Arc::clone(&handle);
     std::thread::spawn(move || {
-        match child.wait() {
-            Ok(s) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: s.code().unwrap_or(-1),
-                    exited: s.code().is_some(),
-                    status: format!("{s}"),
-                    error: None,
-                });
-            }
-            Err(e) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: -1,
-                    exited: false,
-                    status: "error".into(),
-                    error: Some(e.to_string()),
-                });
-            }
-        }
+        let end_event = match child.wait() {
+            Ok(s) => EndEvent {
+                exit_code: s.code().unwrap_or(-1),
+                exited: s.code().is_some(),
+                status: format!("{s}"),
+                error: None,
+            },
+            Err(e) => EndEvent {
+                exit_code: -1,
+                exited: false,
+                status: "error".into(),
+                error: Some(e.to_string()),
+            },
+        };
+        *handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
+        let _ = end_tx_clone.send(end_event);
     });
 
     tracing::info!(pid, cmd = cmd_str, "process started (pipe)");

diff --git a/envd-rs/src/rpc/process_service.rs b/envd-rs/src/rpc/process_service.rs
index 24e8049..3d53cd7 100644
--- a/envd-rs/src/rpc/process_service.rs
+++ b/envd-rs/src/rpc/process_service.rs
@@ -237,6 +237,7 @@ impl Process for ProcessServiceImpl {
 
         let mut data_rx = handle.subscribe_data();
         let mut end_rx = handle.subscribe_end();
+        let cached_end = handle.cached_end();
 
         let stream = async_stream::stream! {
             yield Ok(ConnectResponse {
@@ -249,36 +250,43 @@ impl Process for ProcessServiceImpl {
                 ..Default::default()
             });
 
-            loop {
-                tokio::select! {
-                    biased;
-                    data = data_rx.recv() => {
-                        match data {
-                            Ok(ev) => {
+            if let Some(end) = cached_end {
+                yield Ok(ConnectResponse {
+                    event: buffa::MessageField::some(make_end_event(end)),
+                    ..Default::default()
+                });
+            } else {
+                loop {
+                    tokio::select! {
+                        biased;
+                        data = data_rx.recv() => {
+                            match data {
+                                Ok(ev) => {
+                                    yield Ok(ConnectResponse {
+                                        event: buffa::MessageField::some(make_data_event(ev)),
+                                        ..Default::default()
+                                    });
+                                }
+                                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
+                                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+                            }
+                        }
+                        end = end_rx.recv() => {
+                            while let Ok(ev) = data_rx.try_recv() {
                                 yield Ok(ConnectResponse {
                                     event: buffa::MessageField::some(make_data_event(ev)),
                                     ..Default::default()
                                 });
                             }
-                            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
-                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+                            if let Ok(end) = end {
+                                yield Ok(ConnectResponse {
+                                    event: buffa::MessageField::some(make_end_event(end)),
+                                    ..Default::default()
+                                });
+                            }
+                            break;
                         }
                     }
-                    end = end_rx.recv() => {
-                        while let Ok(ev) = data_rx.try_recv() {
-                            yield Ok(ConnectResponse {
-                                event: buffa::MessageField::some(make_data_event(ev)),
-                                ..Default::default()
-                            });
-                        }
-                        if let Ok(end) = end {
-                            yield Ok(ConnectResponse {
-                                event: buffa::MessageField::some(make_end_event(end)),
-                                ..Default::default()
-                            });
-                        }
-                        break;
-                    }
                 }
             }
         };

diff --git a/internal/api/handlers_pty.go b/internal/api/handlers_pty.go
index 6799ffa..d0db965 100644
--- a/internal/api/handlers_pty.go
+++ b/internal/api/handlers_pty.go
@@ -350,9 +350,23 @@ func runPtyLoop(
 		defer wg.Done()
 		defer cancel()
 
-		for msg := range inputCh {
-			// Use a background context for unary RPCs so they complete
-			// even if the stream context is being cancelled.
+		// pending holds a non-input message dequeued during coalescing
+		// that must be processed on the next iteration.
+		var pending *wsPtyIn
+
+		for {
+			var msg wsPtyIn
+			if pending != nil {
+				msg = *pending
+				pending = nil
+			} else {
+				var ok bool
+				msg, ok = <-inputCh
+				if !ok {
+					break
+				}
+			}
+
 			rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second)
 
 			switch msg.Type {
@@ -364,7 +378,7 @@ func runPtyLoop(
 			}
 
 			// Coalesce: drain any queued input messages into a single RPC.
-			data = coalescePtyInput(inputCh, data)
+			data, pending = coalescePtyInput(inputCh, data)
 
 			if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{
 				SandboxId: sandboxID,
@@ -430,19 +444,17 @@ func runPtyLoop(
 
 // coalescePtyInput drains any immediately-available "input" messages from the
 // channel and appends their decoded data to buf, reducing RPC call volume
-// during bursts of fast typing.
-func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
+// during bursts of fast typing. Returns the coalesced buffer and any
+// non-input message that was dequeued (must be processed by the caller).
+func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) ([]byte, *wsPtyIn) {
 	for {
 		select {
 		case msg, ok := <-ch:
 			if !ok {
-				return buf
+				return buf, nil
 			}
 			if msg.Type != "input" {
-				// Non-input message — can't coalesce. Put-back isn't possible
-				// with channels, but resize/kill during a typing burst is rare
-				// enough that dropping one is acceptable.
-				return buf
+				return buf, &msg
 			}
 			data, err := base64.StdEncoding.DecodeString(msg.Data)
 			if err != nil {
@@ -450,7 +462,7 @@ func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
 				continue
 			}
 			buf = append(buf, data...)
 		default:
-			return buf
+			return buf, nil
 		}
 	}

diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go
index 3589fb1..5917396 100644
--- a/internal/sandbox/manager.go
+++ b/internal/sandbox/manager.go
@@ -378,13 +378,18 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 	}
 
 	// Mark sandbox as pausing to block new exec/file/PTY operations.
+	m.mu.Lock()
 	sb.Status = models.StatusPausing
+	m.mu.Unlock()
 
 	// restoreRunning reverts state if any pre-freeze step fails.
 	restoreRunning := func() {
 		_ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
 		sb.connTracker.Reset()
+		m.mu.Lock()
 		sb.Status = models.StatusRunning
+		m.mu.Unlock()
+		m.startSampler(sb)
 	}
 
 	// Stop the metrics sampler goroutine before tearing down any resources
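
Note for reviewers: Go channels have no un-receive, so once coalescePtyInput
pulls a non-input message off inputCh it cannot be put back. The patch threads
it out through a second return value that the caller stores in pending and
replays on the next loop iteration. Below is a minimal, self-contained sketch
of that put-back pattern; ptyMsg is a hypothetical stand-in for wsPtyIn, and
the base64 decoding done by the real handler is omitted:

    package main

    import "fmt"

    // ptyMsg stands in for wsPtyIn; Data holds raw bytes instead of base64.
    type ptyMsg struct {
    	Type string
    	Data []byte
    }

    // coalesce drains immediately-available "input" messages into buf.
    // A non-input message cannot be pushed back onto the channel, so it is
    // returned for the caller to process before reading the channel again.
    func coalesce(ch <-chan ptyMsg, buf []byte) ([]byte, *ptyMsg) {
    	for {
    		select {
    		case m, ok := <-ch:
    			if !ok {
    				return buf, nil // channel closed
    			}
    			if m.Type != "input" {
    				return buf, &m // hand back to the caller, don't drop
    			}
    			buf = append(buf, m.Data...)
    		default:
    			return buf, nil // nothing else queued right now
    		}
    	}
    }

    func main() {
    	ch := make(chan ptyMsg, 4)
    	ch <- ptyMsg{Type: "input", Data: []byte("bc")}
    	ch <- ptyMsg{Type: "resize"}
    	ch <- ptyMsg{Type: "input", Data: []byte("d")} // stays queued: resize stops the drain

    	buf, pending := coalesce(ch, []byte("a"))
    	fmt.Printf("coalesced=%q pending=%+v\n", buf, pending)
    	// coalesced="abc" pending=&{Type:resize Data:[]}
    }

The same ordering discipline underpins the connect() fix: subscribe_end() is
called before cached_end() is read, so a process that exits between the two
steps still delivers its EndEvent to the fresh receiver instead of falling
into the TOCTOU window the commit message describes.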