
fix: resolve process stream hangs, pause race, and PTY signal loss

- Cache terminal EndEvent on ProcessHandle so connect() can detect
  already-exited processes instead of hanging forever on broadcast
  receivers that missed the event. Subscribe before checking the cache
  to close the TOCTOU window (see the sketch after this list).

- Protect sb.Status writes in Pause with m.mu to prevent data race
  with concurrent readers (AcquireProxyConn, Exec, etc.).

- Restart metrics sampler in restoreRunning so a failed pause attempt
  doesn't permanently kill sandbox metrics collection.

- Return dequeued non-input messages from coalescePtyInput instead of
  dropping them, preventing silent loss of kill/resize signals during
  typing bursts.
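
The ordering in the first fix is the subtle part: the waiter thread caches the
end event before broadcasting it, and connect() subscribes before reading the
cache, so a reader either finds the event in the cache or receives it on its
subscription, never neither. Below is a minimal, self-contained sketch of that
ordering. It is written in Go for illustration only (the actual change lands in
the Rust diffs below), and every name in it is hypothetical:

// Sketch of the subscribe-before-check ordering. The waiter caches the
// end event BEFORE broadcasting; a reader subscribes BEFORE consulting
// the cache. With both orderings in place the reader cannot miss the
// event on both paths.
package main

import (
	"fmt"
	"sync"
)

type EndEvent struct{ ExitCode int }

type broadcaster struct {
	mu   sync.Mutex
	subs []chan EndEvent
}

func (b *broadcaster) subscribe() <-chan EndEvent {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan EndEvent, 1)
	b.subs = append(b.subs, ch)
	return ch
}

func (b *broadcaster) send(ev EndEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		ch <- ev
	}
}

type handle struct {
	endTx broadcaster
	mu    sync.Mutex
	ended *EndEvent // cached terminal event
}

func (h *handle) cachedEnd() *EndEvent {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.ended
}

// finish is the waiter's path on process exit: cache first, then broadcast.
func (h *handle) finish(ev EndEvent) {
	h.mu.Lock()
	h.ended = &ev
	h.mu.Unlock()
	h.endTx.send(ev)
}

func main() {
	h := &handle{}
	h.finish(EndEvent{ExitCode: 0}) // process exits before anyone connects

	// connect(): subscribe first, THEN consult the cache. Reversing these
	// two steps reopens the race: the process could finish after the cache
	// check but before the subscription, and the reader would block forever.
	endRx := h.endTx.subscribe()
	if end := h.cachedEnd(); end != nil {
		fmt.Println("already exited:", end.ExitCode) // no hang
		return
	}
	fmt.Println("live end event:", (<-endRx).ExitCode)
}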
2026-05-09 18:11:15 +06:00
parent 522e1c5e90
commit aca43d51eb
4 changed files with 101 additions and 71 deletions


@@ -37,6 +37,7 @@ pub struct ProcessHandle {
     data_tx: broadcast::Sender<DataEvent>,
     end_tx: broadcast::Sender<EndEvent>,
+    ended: Mutex<Option<EndEvent>>,
     stdin: Mutex<Option<std::process::ChildStdin>>,
     pty_master: Mutex<Option<std::fs::File>>,
@@ -51,6 +52,10 @@ impl ProcessHandle {
         self.end_tx.subscribe()
     }
 
+    pub fn cached_end(&self) -> Option<EndEvent> {
+        self.ended.lock().unwrap().clone()
+    }
+
     pub fn send_signal(&self, sig: Signal) -> Result<(), ConnectError> {
         signal::kill(Pid::from_raw(self.pid as i32), sig).map_err(|e| {
             ConnectError::new(ErrorCode::Internal, format!("error sending signal: {e}"))
@@ -250,6 +255,7 @@ pub fn spawn_process(
         pid,
         data_tx: data_tx.clone(),
         end_tx: end_tx.clone(),
+        ended: Mutex::new(None),
         stdin: Mutex::new(None),
         pty_master: Mutex::new(Some(master_file)),
     });
@@ -273,26 +279,25 @@ pub fn spawn_process(
     });
 
     let end_tx_clone = end_tx.clone();
+    let handle_for_waiter = Arc::clone(&handle);
     std::thread::spawn(move || {
         let mut child = child;
-        match child.wait() {
-            Ok(s) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: s.code().unwrap_or(-1),
-                    exited: s.code().is_some(),
-                    status: format!("{s}"),
-                    error: None,
-                });
-            }
-            Err(e) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: -1,
-                    exited: false,
-                    status: "error".into(),
-                    error: Some(e.to_string()),
-                });
-            }
-        }
+        let end_event = match child.wait() {
+            Ok(s) => EndEvent {
+                exit_code: s.code().unwrap_or(-1),
+                exited: s.code().is_some(),
+                status: format!("{s}"),
+                error: None,
+            },
+            Err(e) => EndEvent {
+                exit_code: -1,
+                exited: false,
+                status: "error".into(),
+                error: Some(e.to_string()),
+            },
+        };
+        *handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
+        let _ = end_tx_clone.send(end_event);
     });
 
     tracing::info!(pid, cmd = cmd_str, "process started (pty)");
@@ -336,6 +341,7 @@ pub fn spawn_process(
         pid,
         data_tx: data_tx.clone(),
         end_tx: end_tx.clone(),
+        ended: Mutex::new(None),
         stdin: Mutex::new(stdin),
         pty_master: Mutex::new(None),
     });
@@ -376,25 +382,24 @@ pub fn spawn_process(
     }
 
     let end_tx_clone = end_tx.clone();
+    let handle_for_waiter = Arc::clone(&handle);
     std::thread::spawn(move || {
-        match child.wait() {
-            Ok(s) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: s.code().unwrap_or(-1),
-                    exited: s.code().is_some(),
-                    status: format!("{s}"),
-                    error: None,
-                });
-            }
-            Err(e) => {
-                let _ = end_tx_clone.send(EndEvent {
-                    exit_code: -1,
-                    exited: false,
-                    status: "error".into(),
-                    error: Some(e.to_string()),
-                });
-            }
-        }
+        let end_event = match child.wait() {
+            Ok(s) => EndEvent {
+                exit_code: s.code().unwrap_or(-1),
+                exited: s.code().is_some(),
+                status: format!("{s}"),
+                error: None,
+            },
+            Err(e) => EndEvent {
+                exit_code: -1,
+                exited: false,
+                status: "error".into(),
+                error: Some(e.to_string()),
+            },
+        };
+        *handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
+        let _ = end_tx_clone.send(end_event);
     });
 
     tracing::info!(pid, cmd = cmd_str, "process started (pipe)");


@@ -237,6 +237,7 @@ impl Process for ProcessServiceImpl {
         let mut data_rx = handle.subscribe_data();
         let mut end_rx = handle.subscribe_end();
+        let cached_end = handle.cached_end();
 
         let stream = async_stream::stream! {
             yield Ok(ConnectResponse {
@@ -249,36 +250,43 @@ impl Process for ProcessServiceImpl {
                 ..Default::default()
             });
 
-            loop {
-                tokio::select! {
-                    biased;
-                    data = data_rx.recv() => {
-                        match data {
-                            Ok(ev) => {
-                                yield Ok(ConnectResponse {
-                                    event: buffa::MessageField::some(make_data_event(ev)),
-                                    ..Default::default()
-                                });
-                            }
-                            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
-                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
-                        }
-                    }
-                    end = end_rx.recv() => {
-                        while let Ok(ev) = data_rx.try_recv() {
-                            yield Ok(ConnectResponse {
-                                event: buffa::MessageField::some(make_data_event(ev)),
-                                ..Default::default()
-                            });
-                        }
-                        if let Ok(end) = end {
-                            yield Ok(ConnectResponse {
-                                event: buffa::MessageField::some(make_end_event(end)),
-                                ..Default::default()
-                            });
-                        }
-                        break;
-                    }
-                }
-            }
+            if let Some(end) = cached_end {
+                yield Ok(ConnectResponse {
+                    event: buffa::MessageField::some(make_end_event(end)),
+                    ..Default::default()
+                });
+            } else {
+                loop {
+                    tokio::select! {
+                        biased;
+                        data = data_rx.recv() => {
+                            match data {
+                                Ok(ev) => {
+                                    yield Ok(ConnectResponse {
+                                        event: buffa::MessageField::some(make_data_event(ev)),
+                                        ..Default::default()
+                                    });
+                                }
+                                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
+                                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+                            }
+                        }
+                        end = end_rx.recv() => {
+                            while let Ok(ev) = data_rx.try_recv() {
+                                yield Ok(ConnectResponse {
+                                    event: buffa::MessageField::some(make_data_event(ev)),
+                                    ..Default::default()
+                                });
+                            }
+                            if let Ok(end) = end {
+                                yield Ok(ConnectResponse {
+                                    event: buffa::MessageField::some(make_end_event(end)),
+                                    ..Default::default()
+                                });
+                            }
+                            break;
+                        }
+                    }
+                }
+            }
         };


@@ -350,9 +350,23 @@ func runPtyLoop(
    defer wg.Done()
    defer cancel()
 
-   for msg := range inputCh {
-       // Use a background context for unary RPCs so they complete
-       // even if the stream context is being cancelled.
+   // pending holds a non-input message dequeued during coalescing
+   // that must be processed on the next iteration.
+   var pending *wsPtyIn
+
+   for {
+       var msg wsPtyIn
+       if pending != nil {
+           msg = *pending
+           pending = nil
+       } else {
+           var ok bool
+           msg, ok = <-inputCh
+           if !ok {
+               break
+           }
+       }
+
        rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second)
 
        switch msg.Type {
@@ -364,7 +378,7 @@ func runPtyLoop(
        }
 
        // Coalesce: drain any queued input messages into a single RPC.
-       data = coalescePtyInput(inputCh, data)
+       data, pending = coalescePtyInput(inputCh, data)
 
        if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{
            SandboxId: sandboxID,
@@ -430,19 +444,17 @@ func runPtyLoop(
 // coalescePtyInput drains any immediately-available "input" messages from the
 // channel and appends their decoded data to buf, reducing RPC call volume
-// during bursts of fast typing.
-func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
+// during bursts of fast typing. Returns the coalesced buffer and any
+// non-input message that was dequeued (must be processed by the caller).
+func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) ([]byte, *wsPtyIn) {
    for {
        select {
        case msg, ok := <-ch:
            if !ok {
-               return buf
+               return buf, nil
            }
            if msg.Type != "input" {
-               // Non-input message — can't coalesce. Put-back isn't possible
-               // with channels, but resize/kill during a typing burst is rare
-               // enough that dropping one is acceptable.
-               return buf
+               return buf, &msg
            }
            data, err := base64.StdEncoding.DecodeString(msg.Data)
            if err != nil {
@@ -450,7 +462,7 @@ func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
            }
            buf = append(buf, data...)
        default:
-           return buf
+           return buf, nil
        }
    }
 }
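
For reference, here is a hedged usage sketch of the new coalescePtyInput
contract. wsPtyIn and the function body are restated from the diff above so the
example runs standalone; the decode-error path is trimmed to a continue, which
may differ from the real code:

// Self-contained demonstration: a kill message arriving mid-burst is now
// handed back instead of silently dropped.
package main

import (
	"encoding/base64"
	"fmt"
)

type wsPtyIn struct {
	Type string // "input", "resize", "kill", ...
	Data string // base64-encoded payload for "input" messages
}

func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) ([]byte, *wsPtyIn) {
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				return buf, nil
			}
			if msg.Type != "input" {
				return buf, &msg // hand back instead of dropping
			}
			data, err := base64.StdEncoding.DecodeString(msg.Data)
			if err != nil {
				continue
			}
			buf = append(buf, data...)
		default:
			return buf, nil
		}
	}
}

func enc(s string) string { return base64.StdEncoding.EncodeToString([]byte(s)) }

func main() {
	ch := make(chan wsPtyIn, 4)
	ch <- wsPtyIn{Type: "input", Data: enc("l")}
	ch <- wsPtyIn{Type: "input", Data: enc("s")}
	ch <- wsPtyIn{Type: "kill"} // previously dropped mid-burst
	ch <- wsPtyIn{Type: "input", Data: enc("\n")}

	buf, pending := coalescePtyInput(ch, nil)
	fmt.Printf("coalesced: %q\n", buf)      // "ls" (coalescing stops at the kill)
	fmt.Printf("pending:   %+v\n", pending) // &{Type:kill ...}
}

The kill message that would previously have vanished now comes back as pending,
and the loop in runPtyLoop processes it on its next iteration before reading
from inputCh again.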


@@ -378,13 +378,18 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
    }
 
    // Mark sandbox as pausing to block new exec/file/PTY operations.
+   m.mu.Lock()
    sb.Status = models.StatusPausing
+   m.mu.Unlock()
 
    // restoreRunning reverts state if any pre-freeze step fails.
    restoreRunning := func() {
        _ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
        sb.connTracker.Reset()
+       m.mu.Lock()
        sb.Status = models.StatusRunning
+       m.mu.Unlock()
+       m.startSampler(sb)
    }
 
    // Stop the metrics sampler goroutine before tearing down any resources
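
To make the Pause race concrete: the old code wrote sb.Status without holding
m.mu while readers such as AcquireProxyConn read it under the lock, which the
Go race detector flags. A minimal standalone sketch of the fixed shape, with
hypothetical types that only mirror the diff (run with `go run -race`):

package main

import (
	"fmt"
	"sync"
)

type Status string

const (
	StatusRunning Status = "running"
	StatusPausing Status = "pausing"
)

type Sandbox struct{ Status Status }

type Manager struct {
	mu sync.Mutex
	sb *Sandbox
}

// setStatus is the fixed write path: guarded by m.mu, matching the diff.
func (m *Manager) setStatus(s Status) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.sb.Status = s
}

// statusOK mimics a reader like AcquireProxyConn gating on status.
func (m *Manager) statusOK() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.sb.Status == StatusRunning
}

func main() {
	m := &Manager{sb: &Sandbox{Status: StatusRunning}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); m.setStatus(StatusPausing) }()
	go func() { defer wg.Done(); _ = m.statusOK() }()
	wg.Wait()
	fmt.Println("final status:", m.sb.Status)
	// With the unguarded write (m.sb.Status = StatusPausing, no lock),
	// -race reports this reader/writer pair as a data race.
}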