1
0
forked from wrenn/wrenn

fix: resolve exec 502 by terminating process streams on exit

The start() and connect() streaming RPCs blocked forever in the data
event loop because ProcessHandle retains a broadcast sender (needed for
reconnection via connect()), preventing the channel from closing.

Race data_rx against end_rx with tokio::select! so the stream terminates
when the process exits. Remaining buffered data is drained before
yielding the end event.
This commit is contained in:
2026-05-09 16:36:33 +06:00
parent 2af8412cdc
commit d1d316f35c

View File

@ -193,16 +193,26 @@ impl Process for ProcessServiceImpl {
yield Ok(make_start_response(pid)); yield Ok(make_start_response(pid));
loop { loop {
match data_rx.recv().await { tokio::select! {
biased;
data = data_rx.recv() => {
match data {
Ok(ev) => yield Ok(make_data_start_response(ev)), Ok(ev) => yield Ok(make_data_start_response(ev)),
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break, Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
} }
} }
end = end_rx.recv() => {
if let Ok(end) = end_rx.recv().await { while let Ok(ev) = data_rx.try_recv() {
yield Ok(make_data_start_response(ev));
}
if let Ok(end) = end {
yield Ok(make_end_start_response(end)); yield Ok(make_end_start_response(end));
} }
break;
}
}
}
}; };
Ok((Box::pin(stream), ctx)) Ok((Box::pin(stream), ctx))
@ -240,7 +250,10 @@ impl Process for ProcessServiceImpl {
}); });
loop { loop {
match data_rx.recv().await { tokio::select! {
biased;
data = data_rx.recv() => {
match data {
Ok(ev) => { Ok(ev) => {
yield Ok(ConnectResponse { yield Ok(ConnectResponse {
event: buffa::MessageField::some(make_data_event(ev)), event: buffa::MessageField::some(make_data_event(ev)),
@ -251,13 +264,23 @@ impl Process for ProcessServiceImpl {
Err(tokio::sync::broadcast::error::RecvError::Closed) => break, Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
} }
} }
end = end_rx.recv() => {
if let Ok(end) = end_rx.recv().await { while let Ok(ev) = data_rx.try_recv() {
yield Ok(ConnectResponse {
event: buffa::MessageField::some(make_data_event(ev)),
..Default::default()
});
}
if let Ok(end) = end {
yield Ok(ConnectResponse { yield Ok(ConnectResponse {
event: buffa::MessageField::some(make_end_event(end)), event: buffa::MessageField::some(make_end_event(end)),
..Default::default() ..Default::default()
}); });
} }
break;
}
}
}
}; };
Ok((Box::pin(stream), ctx)) Ok((Box::pin(stream), ctx))