diff --git a/envd-rs/src/rpc/process_service.rs b/envd-rs/src/rpc/process_service.rs index 7e14b85..2682d84 100644 --- a/envd-rs/src/rpc/process_service.rs +++ b/envd-rs/src/rpc/process_service.rs @@ -193,16 +193,26 @@ impl Process for ProcessServiceImpl { yield Ok(make_start_response(pid)); loop { - match data_rx.recv().await { - Ok(ev) => yield Ok(make_data_start_response(ev)), - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + tokio::select! { + biased; + data = data_rx.recv() => { + match data { + Ok(ev) => yield Ok(make_data_start_response(ev)), + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + end = end_rx.recv() => { + while let Ok(ev) = data_rx.try_recv() { + yield Ok(make_data_start_response(ev)); + } + if let Ok(end) = end { + yield Ok(make_end_start_response(end)); + } + break; + } } } - - if let Ok(end) = end_rx.recv().await { - yield Ok(make_end_start_response(end)); - } }; Ok((Box::pin(stream), ctx)) @@ -240,24 +250,37 @@ impl Process for ProcessServiceImpl { }); loop { - match data_rx.recv().await { - Ok(ev) => { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_data_event(ev)), - ..Default::default() - }); + tokio::select! { + biased; + data = data_rx.recv() => { + match data { + Ok(ev) => { + yield Ok(ConnectResponse { + event: buffa::MessageField::some(make_data_event(ev)), + ..Default::default() + }); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + end = end_rx.recv() => { + while let Ok(ev) = data_rx.try_recv() { + yield Ok(ConnectResponse { + event: buffa::MessageField::some(make_data_event(ev)), + ..Default::default() + }); + } + if let Ok(end) = end { + yield Ok(ConnectResponse { + event: buffa::MessageField::some(make_end_event(end)), + ..Default::default() + }); + } + break; } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } - - if let Ok(end) = end_rx.recv().await { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_end_event(end)), - ..Default::default() - }); - } }; Ok((Box::pin(stream), ctx))