From d1d316f35cc96c2408c51e5c3c4eabfa57ec4db0 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Sat, 9 May 2026 16:36:33 +0600 Subject: [PATCH] fix: resolve exec 502 by terminating process streams on exit The start() and connect() streaming RPCs blocked forever in the data event loop because ProcessHandle retains a broadcast sender (needed for reconnection via connect()), preventing the channel from closing. Race data_rx against end_rx with tokio::select! so the stream terminates when the process exits. Remaining buffered data is drained before yielding the end event. --- envd-rs/src/rpc/process_service.rs | 69 ++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/envd-rs/src/rpc/process_service.rs b/envd-rs/src/rpc/process_service.rs index 7e14b85..2682d84 100644 --- a/envd-rs/src/rpc/process_service.rs +++ b/envd-rs/src/rpc/process_service.rs @@ -193,16 +193,26 @@ impl Process for ProcessServiceImpl { yield Ok(make_start_response(pid)); loop { - match data_rx.recv().await { - Ok(ev) => yield Ok(make_data_start_response(ev)), - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + tokio::select! { + biased; + data = data_rx.recv() => { + match data { + Ok(ev) => yield Ok(make_data_start_response(ev)), + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + end = end_rx.recv() => { + while let Ok(ev) = data_rx.try_recv() { + yield Ok(make_data_start_response(ev)); + } + if let Ok(end) = end { + yield Ok(make_end_start_response(end)); + } + break; + } } } - - if let Ok(end) = end_rx.recv().await { - yield Ok(make_end_start_response(end)); - } }; Ok((Box::pin(stream), ctx)) @@ -240,24 +250,37 @@ impl Process for ProcessServiceImpl { }); loop { - match data_rx.recv().await { - Ok(ev) => { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_data_event(ev)), - ..Default::default() - }); + tokio::select! { + biased; + data = data_rx.recv() => { + match data { + Ok(ev) => { + yield Ok(ConnectResponse { + event: buffa::MessageField::some(make_data_event(ev)), + ..Default::default() + }); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + end = end_rx.recv() => { + while let Ok(ev) = data_rx.try_recv() { + yield Ok(ConnectResponse { + event: buffa::MessageField::some(make_data_event(ev)), + ..Default::default() + }); + } + if let Ok(end) = end { + yield Ok(ConnectResponse { + event: buffa::MessageField::some(make_end_event(end)), + ..Default::default() + }); + } + break; } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } - - if let Ok(end) = end_rx.recv().await { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_end_event(end)), - ..Default::default() - }); - } }; Ok((Box::pin(stream), ctx))