diff --git a/envd-rs/src/main.rs b/envd-rs/src/main.rs index 6a7bf42..9e33fec 100644 --- a/envd-rs/src/main.rs +++ b/envd-rs/src/main.rs @@ -222,8 +222,8 @@ fn spawn_initial_command(cmd: &str, state: &AppState) { &user, &state.defaults.env_vars, ) { - Ok(handle) => { - tracing::info!(pid = handle.pid, cmd, "initial command spawned"); + Ok(spawned) => { + tracing::info!(pid = spawned.handle.pid, cmd, "initial command spawned"); } Err(e) => { tracing::error!(error = %e, cmd, "failed to spawn initial command"); diff --git a/envd-rs/src/rpc/process_handler.rs b/envd-rs/src/rpc/process_handler.rs index 296c075..85dc27d 100644 --- a/envd-rs/src/rpc/process_handler.rs +++ b/envd-rs/src/rpc/process_handler.rs @@ -128,6 +128,12 @@ impl ProcessHandle { } } +pub struct SpawnedProcess { + pub handle: Arc, + pub data_rx: broadcast::Receiver, + pub end_rx: broadcast::Receiver, +} + pub fn spawn_process( cmd_str: &str, args: &[String], @@ -138,7 +144,7 @@ pub fn spawn_process( tag: Option, user: &nix::unistd::User, default_env_vars: &dashmap::DashMap, -) -> Result, ConnectError> { +) -> Result { let mut env: Vec<(String, String)> = Vec::new(); env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default())); let home = user.dir.to_string_lossy().to_string(); @@ -248,6 +254,9 @@ pub fn spawn_process( pty_master: Mutex::new(Some(master_file)), }); + let data_rx = handle.subscribe_data(); + let end_rx = handle.subscribe_end(); + let data_tx_clone = data_tx.clone(); std::thread::spawn(move || { let mut master = master_clone; @@ -287,7 +296,7 @@ pub fn spawn_process( }); tracing::info!(pid, cmd = cmd_str, "process started (pty)"); - Ok(handle) + Ok(SpawnedProcess { handle, data_rx, end_rx }) } else { let mut command = std::process::Command::new("/bin/sh"); command @@ -331,6 +340,9 @@ pub fn spawn_process( pty_master: Mutex::new(None), }); + let data_rx = handle.subscribe_data(); + let end_rx = handle.subscribe_end(); + if let Some(mut out) = stdout { let tx = data_tx.clone(); std::thread::spawn(move || { @@ -386,7 +398,7 @@ pub fn spawn_process( }); tracing::info!(pid, cmd = cmd_str, "process started (pipe)"); - Ok(handle) + Ok(SpawnedProcess { handle, data_rx, end_rx }) } } diff --git a/envd-rs/src/rpc/process_service.rs b/envd-rs/src/rpc/process_service.rs index 2682d84..24e8049 100644 --- a/envd-rs/src/rpc/process_service.rs +++ b/envd-rs/src/rpc/process_service.rs @@ -66,7 +66,7 @@ impl ProcessServiceImpl { fn spawn_from_request( &self, request: &StartRequestView<'_>, - ) -> Result, ConnectError> { + ) -> Result { let proc_config = request.process.as_option().ok_or_else(|| { ConnectError::new(ErrorCode::InvalidArgument, "process config required") })?; @@ -117,7 +117,7 @@ impl ProcessServiceImpl { "process.Start request" ); - let handle = process_handler::spawn_process( + let spawned = process_handler::spawn_process( cmd, &args, &envs, @@ -129,17 +129,17 @@ impl ProcessServiceImpl { &self.state.defaults.env_vars, )?; - self.processes.insert(handle.pid, Arc::clone(&handle)); + self.processes.insert(spawned.handle.pid, Arc::clone(&spawned.handle)); let processes = self.processes.clone(); - let pid = handle.pid; - let mut end_rx = handle.subscribe_end(); + let pid = spawned.handle.pid; + let mut cleanup_end_rx = spawned.handle.subscribe_end(); tokio::spawn(async move { - let _ = end_rx.recv().await; + let _ = cleanup_end_rx.recv().await; processes.remove(&pid); }); - Ok(handle) + Ok(spawned) } } @@ -183,11 +183,11 @@ impl Process for ProcessServiceImpl { ), ConnectError, > { - let handle = self.spawn_from_request(&request)?; - let pid = handle.pid; + let spawned = self.spawn_from_request(&request)?; + let pid = spawned.handle.pid; - let mut data_rx = handle.subscribe_data(); - let mut end_rx = handle.subscribe_end(); + let mut data_rx = spawned.data_rx; + let mut end_rx = spawned.end_rx; let stream = async_stream::stream! { yield Ok(make_start_response(pid));