forked from wrenn/wrenn
fix: resolve PTY failure, MMDS file writes, and metrics instability in envd-rs
Three bugs fixed:
1. PTY connections failed because the home directory was hardcoded as
/home/{username} instead of being read from /etc/passwd. For root,
this produced /home/root, which doesn't exist, so CWD validation
rejected every PTY Start request that had no explicit cwd. Fixed all
6 locations to use user.dir from nix::unistd::User.
2. MMDS polling silently failed to parse metadata because the
logs_collector_address field lacked #[serde(default)]. The host
agent only sends instanceID + envID, so the missing "address" field
caused every deserialization attempt to fail and .WRENN_SANDBOX_ID
and .WRENN_TEMPLATE_ID were never written. Also added error
logging and a create_dir_all call before the file writes.
3. Metrics CPU values were non-deterministic because a fresh
sysinfo::System was created per request, with a 100ms sleep
between the two CPU reads. Replaced this with a background thread that
samples CPU at fixed 1-second intervals via a persistent System instance,
matching gopsutil's internal caching behavior. The metrics endpoint
now reads cached atomic CPU values — no sleep in the request path,
and a consistent sampling window.
Also: close the master PTY fd in the child's pre_exec hook, add
process.Start request logging, and bump the version to 0.2.0.
This commit is contained in:
2
envd-rs/Cargo.lock
generated
2
envd-rs/Cargo.lock
generated
@ -514,7 +514,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "envd"
|
name = "envd"
|
||||||
version = "0.1.2"
|
version = "0.2.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-stream",
|
"async-stream",
|
||||||
"axum",
|
"axum",
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "envd"
|
name = "envd"
|
||||||
version = "0.1.2"
|
version = "0.2.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
rust-version = "1.88"
|
rust-version = "1.88"
|
||||||
|
|
||||||
|
|||||||
@ -13,7 +13,7 @@ pub struct MMDSOpts {
|
|||||||
pub sandbox_id: String,
|
pub sandbox_id: String,
|
||||||
#[serde(rename = "envID")]
|
#[serde(rename = "envID")]
|
||||||
pub template_id: String,
|
pub template_id: String,
|
||||||
#[serde(rename = "address")]
|
#[serde(rename = "address", default)]
|
||||||
pub logs_collector_address: String,
|
pub logs_collector_address: String,
|
||||||
#[serde(rename = "accessTokenHash", default)]
|
#[serde(rename = "accessTokenHash", default)]
|
||||||
pub access_token_hash: String,
|
pub access_token_hash: String,
|
||||||
@ -103,8 +103,15 @@ pub async fn poll_for_opts(
|
|||||||
env_vars.insert("WRENN_TEMPLATE_ID".into(), opts.template_id.clone());
|
env_vars.insert("WRENN_TEMPLATE_ID".into(), opts.template_id.clone());
|
||||||
|
|
||||||
let run_dir = std::path::Path::new(WRENN_RUN_DIR);
|
let run_dir = std::path::Path::new(WRENN_RUN_DIR);
|
||||||
let _ = std::fs::write(run_dir.join(".WRENN_SANDBOX_ID"), &opts.sandbox_id);
|
if let Err(e) = std::fs::create_dir_all(run_dir) {
|
||||||
let _ = std::fs::write(run_dir.join(".WRENN_TEMPLATE_ID"), &opts.template_id);
|
tracing::error!(error = %e, "mmds: failed to create run dir");
|
||||||
|
}
|
||||||
|
if let Err(e) = std::fs::write(run_dir.join(".WRENN_SANDBOX_ID"), &opts.sandbox_id) {
|
||||||
|
tracing::error!(error = %e, "mmds: failed to write .WRENN_SANDBOX_ID");
|
||||||
|
}
|
||||||
|
if let Err(e) = std::fs::write(run_dir.join(".WRENN_TEMPLATE_ID"), &opts.template_id) {
|
||||||
|
tracing::error!(error = %e, "mmds: failed to write .WRENN_TEMPLATE_ID");
|
||||||
|
}
|
||||||
|
|
||||||
return Some(opts);
|
return Some(opts);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -95,7 +95,7 @@ pub async fn get_files(
|
|||||||
Err(e) => return json_error(StatusCode::UNAUTHORIZED, &e),
|
Err(e) => return json_error(StatusCode::UNAUTHORIZED, &e),
|
||||||
};
|
};
|
||||||
|
|
||||||
let home_dir = format!("/home/{}", user.name);
|
let home_dir = user.dir.to_string_lossy().to_string();
|
||||||
let resolved = match expand_and_resolve(path_str, &home_dir, state.defaults.workdir.as_deref())
|
let resolved = match expand_and_resolve(path_str, &home_dir, state.defaults.workdir.as_deref())
|
||||||
{
|
{
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
@ -246,7 +246,7 @@ pub async fn post_files(
|
|||||||
Err(e) => return json_error(StatusCode::UNAUTHORIZED, &e),
|
Err(e) => return json_error(StatusCode::UNAUTHORIZED, &e),
|
||||||
};
|
};
|
||||||
|
|
||||||
let home_dir = format!("/home/{}", user.name);
|
let home_dir = user.dir.to_string_lossy().to_string();
|
||||||
let uid = user.uid;
|
let uid = user.uid;
|
||||||
let gid = user.gid;
|
let gid = user.gid;
|
||||||
|
|
||||||
|
|||||||
@ -22,10 +22,10 @@ pub struct Metrics {
|
|||||||
disk_total: u64,
|
disk_total: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_metrics(State(_state): State<Arc<AppState>>) -> impl IntoResponse {
|
pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
|
||||||
tracing::trace!("get metrics");
|
tracing::trace!("get metrics");
|
||||||
|
|
||||||
match collect_metrics() {
|
match collect_metrics(&state) {
|
||||||
Ok(m) => (
|
Ok(m) => (
|
||||||
StatusCode::OK,
|
StatusCode::OK,
|
||||||
[(header::CACHE_CONTROL, "no-store")],
|
[(header::CACHE_CONTROL, "no-store")],
|
||||||
@ -39,26 +39,12 @@ pub async fn get_metrics(State(_state): State<Arc<AppState>>) -> impl IntoRespon
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn collect_metrics() -> Result<Metrics, String> {
|
fn collect_metrics(state: &AppState) -> Result<Metrics, String> {
|
||||||
use sysinfo::System;
|
let cpu_count = state.cpu_count();
|
||||||
|
let cpu_used_pct_rounded = state.cpu_used_pct();
|
||||||
|
|
||||||
let mut sys = System::new();
|
let mut sys = sysinfo::System::new();
|
||||||
sys.refresh_memory();
|
sys.refresh_memory();
|
||||||
sys.refresh_cpu_all();
|
|
||||||
|
|
||||||
// sysinfo needs a small delay for accurate CPU — first call returns 0.
|
|
||||||
// In a real daemon this would be cached; for now, report instantaneous.
|
|
||||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
|
||||||
sys.refresh_cpu_all();
|
|
||||||
|
|
||||||
let cpu_count = sys.cpus().len() as u32;
|
|
||||||
let cpu_used_pct = sys.global_cpu_usage();
|
|
||||||
let cpu_used_pct_rounded = if cpu_used_pct > 0.0 {
|
|
||||||
(cpu_used_pct * 100.0).round() / 100.0
|
|
||||||
} else {
|
|
||||||
0.0
|
|
||||||
};
|
|
||||||
|
|
||||||
let mem_total = sys.total_memory();
|
let mem_total = sys.total_memory();
|
||||||
let mem_used = sys.used_memory();
|
let mem_used = sys.used_memory();
|
||||||
let mem_total_mib = mem_total / 1024 / 1024;
|
let mem_total_mib = mem_total / 1024 / 1024;
|
||||||
|
|||||||
@ -196,7 +196,7 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let home = format!("/home/{}", user.name);
|
let home = user.dir.to_string_lossy().to_string();
|
||||||
let cwd = state
|
let cwd = state
|
||||||
.defaults
|
.defaults
|
||||||
.workdir
|
.workdir
|
||||||
|
|||||||
@ -36,7 +36,7 @@ impl FilesystemServiceImpl {
|
|||||||
ConnectError::new(ErrorCode::Unauthenticated, format!("invalid user: {e}"))
|
ConnectError::new(ErrorCode::Unauthenticated, format!("invalid user: {e}"))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let home_dir = format!("/home/{}", user.name);
|
let home_dir = user.dir.to_string_lossy().to_string();
|
||||||
let default_workdir = self.state.defaults.workdir.as_deref();
|
let default_workdir = self.state.defaults.workdir.as_deref();
|
||||||
|
|
||||||
expand_and_resolve(path, &home_dir, default_workdir)
|
expand_and_resolve(path, &home_dir, default_workdir)
|
||||||
|
|||||||
@ -141,7 +141,7 @@ pub fn spawn_process(
|
|||||||
) -> Result<Arc<ProcessHandle>, ConnectError> {
|
) -> Result<Arc<ProcessHandle>, ConnectError> {
|
||||||
let mut env: Vec<(String, String)> = Vec::new();
|
let mut env: Vec<(String, String)> = Vec::new();
|
||||||
env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default()));
|
env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default()));
|
||||||
let home = format!("/home/{}", user.name);
|
let home = user.dir.to_string_lossy().to_string();
|
||||||
env.push(("HOME".into(), home));
|
env.push(("HOME".into(), home));
|
||||||
env.push(("USER".into(), user.name.clone()));
|
env.push(("USER".into(), user.name.clone()));
|
||||||
env.push(("LOGNAME".into(), user.name.clone()));
|
env.push(("LOGNAME".into(), user.name.clone()));
|
||||||
@ -206,7 +206,9 @@ pub fn spawn_process(
|
|||||||
unsafe {
|
unsafe {
|
||||||
use std::os::unix::io::AsRawFd;
|
use std::os::unix::io::AsRawFd;
|
||||||
let slave_raw = slave_fd.as_raw_fd();
|
let slave_raw = slave_fd.as_raw_fd();
|
||||||
|
let master_raw = master_fd.as_raw_fd();
|
||||||
command.pre_exec(move || {
|
command.pre_exec(move || {
|
||||||
|
libc::close(master_raw);
|
||||||
nix::unistd::setsid()
|
nix::unistd::setsid()
|
||||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||||
libc::ioctl(slave_raw, libc::TIOCSCTTY, 0);
|
libc::ioctl(slave_raw, libc::TIOCSCTTY, 0);
|
||||||
|
|||||||
@ -83,7 +83,7 @@ impl ProcessServiceImpl {
|
|||||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let home_dir = format!("/home/{}", user.name);
|
let home_dir = user.dir.to_string_lossy().to_string();
|
||||||
let cwd_str: &str = proc_config.cwd.unwrap_or("");
|
let cwd_str: &str = proc_config.cwd.unwrap_or("");
|
||||||
let cwd = expand_and_resolve(cwd_str, &home_dir, self.state.defaults.workdir.as_deref())
|
let cwd = expand_and_resolve(cwd_str, &home_dir, self.state.defaults.workdir.as_deref())
|
||||||
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))?;
|
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))?;
|
||||||
@ -105,6 +105,17 @@ impl ProcessServiceImpl {
|
|||||||
let enable_stdin = request.stdin.unwrap_or(true);
|
let enable_stdin = request.stdin.unwrap_or(true);
|
||||||
let tag = request.tag.map(|s| s.to_string());
|
let tag = request.tag.map(|s| s.to_string());
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
cmd = cmd,
|
||||||
|
has_pty = pty_opts.is_some(),
|
||||||
|
pty_size = ?pty_opts,
|
||||||
|
tag = ?tag,
|
||||||
|
stdin = enable_stdin,
|
||||||
|
cwd = effective_cwd,
|
||||||
|
user = %username,
|
||||||
|
"process.Start request"
|
||||||
|
);
|
||||||
|
|
||||||
let handle = process_handler::spawn_process(
|
let handle = process_handler::spawn_process(
|
||||||
cmd,
|
cmd,
|
||||||
&args,
|
&args,
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::auth::token::SecureToken;
|
use crate::auth::token::SecureToken;
|
||||||
@ -17,6 +17,8 @@ pub struct AppState {
|
|||||||
pub access_token: SecureToken,
|
pub access_token: SecureToken,
|
||||||
pub conn_tracker: ConnTracker,
|
pub conn_tracker: ConnTracker,
|
||||||
pub port_subsystem: Option<Arc<PortSubsystem>>,
|
pub port_subsystem: Option<Arc<PortSubsystem>>,
|
||||||
|
pub cpu_used_pct: AtomicU32,
|
||||||
|
pub cpu_count: AtomicU32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
@ -27,7 +29,7 @@ impl AppState {
|
|||||||
is_fc: bool,
|
is_fc: bool,
|
||||||
port_subsystem: Option<Arc<PortSubsystem>>,
|
port_subsystem: Option<Arc<PortSubsystem>>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
Arc::new(Self {
|
let state = Arc::new(Self {
|
||||||
defaults,
|
defaults,
|
||||||
version,
|
version,
|
||||||
commit,
|
commit,
|
||||||
@ -37,6 +39,49 @@ impl AppState {
|
|||||||
access_token: SecureToken::new(),
|
access_token: SecureToken::new(),
|
||||||
conn_tracker: ConnTracker::new(),
|
conn_tracker: ConnTracker::new(),
|
||||||
port_subsystem,
|
port_subsystem,
|
||||||
})
|
cpu_used_pct: AtomicU32::new(0),
|
||||||
|
cpu_count: AtomicU32::new(0),
|
||||||
|
});
|
||||||
|
|
||||||
|
let state_clone = Arc::clone(&state);
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
cpu_sampler(state_clone);
|
||||||
|
});
|
||||||
|
|
||||||
|
state
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cpu_used_pct(&self) -> f32 {
|
||||||
|
f32::from_bits(self.cpu_used_pct.load(Ordering::Relaxed))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cpu_count(&self) -> u32 {
|
||||||
|
self.cpu_count.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cpu_sampler(state: Arc<AppState>) {
|
||||||
|
use sysinfo::System;
|
||||||
|
|
||||||
|
let mut sys = System::new();
|
||||||
|
sys.refresh_cpu_all();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||||
|
sys.refresh_cpu_all();
|
||||||
|
|
||||||
|
let pct = sys.global_cpu_usage();
|
||||||
|
let rounded = if pct > 0.0 {
|
||||||
|
(pct * 100.0).round() / 100.0
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
};
|
||||||
|
|
||||||
|
state
|
||||||
|
.cpu_used_pct
|
||||||
|
.store(rounded.to_bits(), Ordering::Relaxed);
|
||||||
|
state
|
||||||
|
.cpu_count
|
||||||
|
.store(sys.cpus().len() as u32, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user