1
0
forked from wrenn/wrenn

fix: subscribe to process channels before spawning threads to prevent event loss

Fast-exiting processes (e.g. echo) sent data/end events before
start() subscribed to the broadcast channels, causing the stream
to hang indefinitely and the exec RPC to time out with 502.

Move channel subscription into spawn_process, before reader/waiter
threads start, and return pre-subscribed receivers via SpawnedProcess.
This commit is contained in:
2026-05-09 17:28:37 +06:00
parent d1d316f35c
commit 522e1c5e90
3 changed files with 28 additions and 16 deletions

View File

@ -222,8 +222,8 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
&user, &user,
&state.defaults.env_vars, &state.defaults.env_vars,
) { ) {
Ok(handle) => { Ok(spawned) => {
tracing::info!(pid = handle.pid, cmd, "initial command spawned"); tracing::info!(pid = spawned.handle.pid, cmd, "initial command spawned");
} }
Err(e) => { Err(e) => {
tracing::error!(error = %e, cmd, "failed to spawn initial command"); tracing::error!(error = %e, cmd, "failed to spawn initial command");

View File

@ -128,6 +128,12 @@ impl ProcessHandle {
} }
} }
pub struct SpawnedProcess {
pub handle: Arc<ProcessHandle>,
pub data_rx: broadcast::Receiver<DataEvent>,
pub end_rx: broadcast::Receiver<EndEvent>,
}
pub fn spawn_process( pub fn spawn_process(
cmd_str: &str, cmd_str: &str,
args: &[String], args: &[String],
@ -138,7 +144,7 @@ pub fn spawn_process(
tag: Option<String>, tag: Option<String>,
user: &nix::unistd::User, user: &nix::unistd::User,
default_env_vars: &dashmap::DashMap<String, String>, default_env_vars: &dashmap::DashMap<String, String>,
) -> Result<Arc<ProcessHandle>, ConnectError> { ) -> Result<SpawnedProcess, ConnectError> {
let mut env: Vec<(String, String)> = Vec::new(); let mut env: Vec<(String, String)> = Vec::new();
env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default())); env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default()));
let home = user.dir.to_string_lossy().to_string(); let home = user.dir.to_string_lossy().to_string();
@ -248,6 +254,9 @@ pub fn spawn_process(
pty_master: Mutex::new(Some(master_file)), pty_master: Mutex::new(Some(master_file)),
}); });
let data_rx = handle.subscribe_data();
let end_rx = handle.subscribe_end();
let data_tx_clone = data_tx.clone(); let data_tx_clone = data_tx.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
let mut master = master_clone; let mut master = master_clone;
@ -287,7 +296,7 @@ pub fn spawn_process(
}); });
tracing::info!(pid, cmd = cmd_str, "process started (pty)"); tracing::info!(pid, cmd = cmd_str, "process started (pty)");
Ok(handle) Ok(SpawnedProcess { handle, data_rx, end_rx })
} else { } else {
let mut command = std::process::Command::new("/bin/sh"); let mut command = std::process::Command::new("/bin/sh");
command command
@ -331,6 +340,9 @@ pub fn spawn_process(
pty_master: Mutex::new(None), pty_master: Mutex::new(None),
}); });
let data_rx = handle.subscribe_data();
let end_rx = handle.subscribe_end();
if let Some(mut out) = stdout { if let Some(mut out) = stdout {
let tx = data_tx.clone(); let tx = data_tx.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
@ -386,7 +398,7 @@ pub fn spawn_process(
}); });
tracing::info!(pid, cmd = cmd_str, "process started (pipe)"); tracing::info!(pid, cmd = cmd_str, "process started (pipe)");
Ok(handle) Ok(SpawnedProcess { handle, data_rx, end_rx })
} }
} }

View File

@ -66,7 +66,7 @@ impl ProcessServiceImpl {
fn spawn_from_request( fn spawn_from_request(
&self, &self,
request: &StartRequestView<'_>, request: &StartRequestView<'_>,
) -> Result<Arc<ProcessHandle>, ConnectError> { ) -> Result<process_handler::SpawnedProcess, ConnectError> {
let proc_config = request.process.as_option().ok_or_else(|| { let proc_config = request.process.as_option().ok_or_else(|| {
ConnectError::new(ErrorCode::InvalidArgument, "process config required") ConnectError::new(ErrorCode::InvalidArgument, "process config required")
})?; })?;
@ -117,7 +117,7 @@ impl ProcessServiceImpl {
"process.Start request" "process.Start request"
); );
let handle = process_handler::spawn_process( let spawned = process_handler::spawn_process(
cmd, cmd,
&args, &args,
&envs, &envs,
@ -129,17 +129,17 @@ impl ProcessServiceImpl {
&self.state.defaults.env_vars, &self.state.defaults.env_vars,
)?; )?;
self.processes.insert(handle.pid, Arc::clone(&handle)); self.processes.insert(spawned.handle.pid, Arc::clone(&spawned.handle));
let processes = self.processes.clone(); let processes = self.processes.clone();
let pid = handle.pid; let pid = spawned.handle.pid;
let mut end_rx = handle.subscribe_end(); let mut cleanup_end_rx = spawned.handle.subscribe_end();
tokio::spawn(async move { tokio::spawn(async move {
let _ = end_rx.recv().await; let _ = cleanup_end_rx.recv().await;
processes.remove(&pid); processes.remove(&pid);
}); });
Ok(handle) Ok(spawned)
} }
} }
@ -183,11 +183,11 @@ impl Process for ProcessServiceImpl {
), ),
ConnectError, ConnectError,
> { > {
let handle = self.spawn_from_request(&request)?; let spawned = self.spawn_from_request(&request)?;
let pid = handle.pid; let pid = spawned.handle.pid;
let mut data_rx = handle.subscribe_data(); let mut data_rx = spawned.data_rx;
let mut end_rx = handle.subscribe_end(); let mut end_rx = spawned.end_rx;
let stream = async_stream::stream! { let stream = async_stream::stream! {
yield Ok(make_start_response(pid)); yield Ok(make_start_response(pid));