diff --git a/envd-rs/Cargo.lock b/envd-rs/Cargo.lock index ecafb78..2e173d6 100644 --- a/envd-rs/Cargo.lock +++ b/envd-rs/Cargo.lock @@ -514,7 +514,7 @@ dependencies = [ [[package]] name = "envd" -version = "0.1.2" +version = "0.2.0" dependencies = [ "async-stream", "axum", diff --git a/envd-rs/Cargo.toml b/envd-rs/Cargo.toml index eea979a..55947e3 100644 --- a/envd-rs/Cargo.toml +++ b/envd-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "envd" -version = "0.1.2" +version = "0.2.0" edition = "2024" rust-version = "1.88" diff --git a/envd-rs/src/host/mmds.rs b/envd-rs/src/host/mmds.rs index ff74201..e2bf5bb 100644 --- a/envd-rs/src/host/mmds.rs +++ b/envd-rs/src/host/mmds.rs @@ -13,7 +13,7 @@ pub struct MMDSOpts { pub sandbox_id: String, #[serde(rename = "envID")] pub template_id: String, - #[serde(rename = "address")] + #[serde(rename = "address", default)] pub logs_collector_address: String, #[serde(rename = "accessTokenHash", default)] pub access_token_hash: String, @@ -103,8 +103,15 @@ pub async fn poll_for_opts( env_vars.insert("WRENN_TEMPLATE_ID".into(), opts.template_id.clone()); let run_dir = std::path::Path::new(WRENN_RUN_DIR); - let _ = std::fs::write(run_dir.join(".WRENN_SANDBOX_ID"), &opts.sandbox_id); - let _ = std::fs::write(run_dir.join(".WRENN_TEMPLATE_ID"), &opts.template_id); + if let Err(e) = std::fs::create_dir_all(run_dir) { + tracing::error!(error = %e, "mmds: failed to create run dir"); + } + if let Err(e) = std::fs::write(run_dir.join(".WRENN_SANDBOX_ID"), &opts.sandbox_id) { + tracing::error!(error = %e, "mmds: failed to write .WRENN_SANDBOX_ID"); + } + if let Err(e) = std::fs::write(run_dir.join(".WRENN_TEMPLATE_ID"), &opts.template_id) { + tracing::error!(error = %e, "mmds: failed to write .WRENN_TEMPLATE_ID"); + } return Some(opts); } diff --git a/envd-rs/src/http/files.rs b/envd-rs/src/http/files.rs index df9206f..dfe1e54 100644 --- a/envd-rs/src/http/files.rs +++ b/envd-rs/src/http/files.rs @@ -95,7 +95,7 @@ pub async fn get_files( Err(e) => return json_error(StatusCode::UNAUTHORIZED, &e), }; - let home_dir = format!("/home/{}", user.name); + let home_dir = user.dir.to_string_lossy().to_string(); let resolved = match expand_and_resolve(path_str, &home_dir, state.defaults.workdir.as_deref()) { Ok(p) => p, @@ -246,7 +246,7 @@ pub async fn post_files( Err(e) => return json_error(StatusCode::UNAUTHORIZED, &e), }; - let home_dir = format!("/home/{}", user.name); + let home_dir = user.dir.to_string_lossy().to_string(); let uid = user.uid; let gid = user.gid; diff --git a/envd-rs/src/http/metrics.rs b/envd-rs/src/http/metrics.rs index b63dbda..da13452 100644 --- a/envd-rs/src/http/metrics.rs +++ b/envd-rs/src/http/metrics.rs @@ -22,10 +22,10 @@ pub struct Metrics { disk_total: u64, } -pub async fn get_metrics(State(_state): State>) -> impl IntoResponse { +pub async fn get_metrics(State(state): State>) -> impl IntoResponse { tracing::trace!("get metrics"); - match collect_metrics() { + match collect_metrics(&state) { Ok(m) => ( StatusCode::OK, [(header::CACHE_CONTROL, "no-store")], @@ -39,26 +39,12 @@ pub async fn get_metrics(State(_state): State>) -> impl IntoRespon } } -fn collect_metrics() -> Result { - use sysinfo::System; +fn collect_metrics(state: &AppState) -> Result { + let cpu_count = state.cpu_count(); + let cpu_used_pct_rounded = state.cpu_used_pct(); - let mut sys = System::new(); + let mut sys = sysinfo::System::new(); sys.refresh_memory(); - sys.refresh_cpu_all(); - - // sysinfo needs a small delay for accurate CPU — first call returns 0. - // In a real daemon this would be cached; for now, report instantaneous. - std::thread::sleep(std::time::Duration::from_millis(100)); - sys.refresh_cpu_all(); - - let cpu_count = sys.cpus().len() as u32; - let cpu_used_pct = sys.global_cpu_usage(); - let cpu_used_pct_rounded = if cpu_used_pct > 0.0 { - (cpu_used_pct * 100.0).round() / 100.0 - } else { - 0.0 - }; - let mem_total = sys.total_memory(); let mem_used = sys.used_memory(); let mem_total_mib = mem_total / 1024 / 1024; diff --git a/envd-rs/src/main.rs b/envd-rs/src/main.rs index 760cb93..587fc1a 100644 --- a/envd-rs/src/main.rs +++ b/envd-rs/src/main.rs @@ -196,7 +196,7 @@ fn spawn_initial_command(cmd: &str, state: &AppState) { } }; - let home = format!("/home/{}", user.name); + let home = user.dir.to_string_lossy().to_string(); let cwd = state .defaults .workdir diff --git a/envd-rs/src/rpc/filesystem_service.rs b/envd-rs/src/rpc/filesystem_service.rs index 8cf2b2c..1c73e93 100644 --- a/envd-rs/src/rpc/filesystem_service.rs +++ b/envd-rs/src/rpc/filesystem_service.rs @@ -36,7 +36,7 @@ impl FilesystemServiceImpl { ConnectError::new(ErrorCode::Unauthenticated, format!("invalid user: {e}")) })?; - let home_dir = format!("/home/{}", user.name); + let home_dir = user.dir.to_string_lossy().to_string(); let default_workdir = self.state.defaults.workdir.as_deref(); expand_and_resolve(path, &home_dir, default_workdir) diff --git a/envd-rs/src/rpc/process_handler.rs b/envd-rs/src/rpc/process_handler.rs index cf0287c..296c075 100644 --- a/envd-rs/src/rpc/process_handler.rs +++ b/envd-rs/src/rpc/process_handler.rs @@ -141,7 +141,7 @@ pub fn spawn_process( ) -> Result, ConnectError> { let mut env: Vec<(String, String)> = Vec::new(); env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default())); - let home = format!("/home/{}", user.name); + let home = user.dir.to_string_lossy().to_string(); env.push(("HOME".into(), home)); env.push(("USER".into(), user.name.clone())); env.push(("LOGNAME".into(), user.name.clone())); @@ -206,7 +206,9 @@ pub fn spawn_process( unsafe { use std::os::unix::io::AsRawFd; let slave_raw = slave_fd.as_raw_fd(); + let master_raw = master_fd.as_raw_fd(); command.pre_exec(move || { + libc::close(master_raw); nix::unistd::setsid() .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; libc::ioctl(slave_raw, libc::TIOCSCTTY, 0); diff --git a/envd-rs/src/rpc/process_service.rs b/envd-rs/src/rpc/process_service.rs index c69c646..92738b5 100644 --- a/envd-rs/src/rpc/process_service.rs +++ b/envd-rs/src/rpc/process_service.rs @@ -83,7 +83,7 @@ impl ProcessServiceImpl { .map(|(k, v)| (k.to_string(), v.to_string())) .collect(); - let home_dir = format!("/home/{}", user.name); + let home_dir = user.dir.to_string_lossy().to_string(); let cwd_str: &str = proc_config.cwd.unwrap_or(""); let cwd = expand_and_resolve(cwd_str, &home_dir, self.state.defaults.workdir.as_deref()) .map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))?; @@ -105,6 +105,17 @@ impl ProcessServiceImpl { let enable_stdin = request.stdin.unwrap_or(true); let tag = request.tag.map(|s| s.to_string()); + tracing::info!( + cmd = cmd, + has_pty = pty_opts.is_some(), + pty_size = ?pty_opts, + tag = ?tag, + stdin = enable_stdin, + cwd = effective_cwd, + user = %username, + "process.Start request" + ); + let handle = process_handler::spawn_process( cmd, &args, diff --git a/envd-rs/src/state.rs b/envd-rs/src/state.rs index d54ea38..aa1f4a2 100644 --- a/envd-rs/src/state.rs +++ b/envd-rs/src/state.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use crate::auth::token::SecureToken; @@ -17,6 +17,8 @@ pub struct AppState { pub access_token: SecureToken, pub conn_tracker: ConnTracker, pub port_subsystem: Option>, + pub cpu_used_pct: AtomicU32, + pub cpu_count: AtomicU32, } impl AppState { @@ -27,7 +29,7 @@ impl AppState { is_fc: bool, port_subsystem: Option>, ) -> Arc { - Arc::new(Self { + let state = Arc::new(Self { defaults, version, commit, @@ -37,6 +39,49 @@ impl AppState { access_token: SecureToken::new(), conn_tracker: ConnTracker::new(), port_subsystem, - }) + cpu_used_pct: AtomicU32::new(0), + cpu_count: AtomicU32::new(0), + }); + + let state_clone = Arc::clone(&state); + std::thread::spawn(move || { + cpu_sampler(state_clone); + }); + + state + } + + pub fn cpu_used_pct(&self) -> f32 { + f32::from_bits(self.cpu_used_pct.load(Ordering::Relaxed)) + } + + pub fn cpu_count(&self) -> u32 { + self.cpu_count.load(Ordering::Relaxed) + } +} + +fn cpu_sampler(state: Arc) { + use sysinfo::System; + + let mut sys = System::new(); + sys.refresh_cpu_all(); + + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + sys.refresh_cpu_all(); + + let pct = sys.global_cpu_usage(); + let rounded = if pct > 0.0 { + (pct * 100.0).round() / 100.0 + } else { + 0.0 + }; + + state + .cpu_used_pct + .store(rounded.to_bits(), Ordering::Relaxed); + state + .cpu_count + .store(sys.cpus().len() as u32, Ordering::Relaxed); } }