From a08e755e53637bcfccb68065214e5096b3735d63 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Sat, 20 Jun 2026 22:45:08 +0000 Subject: [PATCH] v0.2.1 (#55) Co-authored-by: Tasnim Kabir Sadik Reviewed-on: https://git.omukk.dev/wrenn/wrenn/pulls/55 Co-authored-by: pptx704 Co-committed-by: pptx704 --- .env.example | 15 + CLAUDE.md | 4 +- Makefile | 1 + README.md | 2 + VERSION_AGENT | 2 +- VERSION_CP | 2 +- cmd/host-agent/main.go | 51 +++ envd-rs/Cargo.lock | 2 +- envd-rs/Cargo.toml | 2 +- envd-rs/src/auth/middleware.rs | 7 +- envd-rs/src/auth/mod.rs | 4 +- envd-rs/src/auth/signing.rs | 69 +++- envd-rs/src/cgroups/mod.rs | 15 +- envd-rs/src/cmd/mod.rs | 5 + envd-rs/src/cmd/ports.rs | 164 +++++++++ envd-rs/src/config.rs | 5 + envd-rs/src/crypto/mod.rs | 2 +- envd-rs/src/crypto/sha256.rs | 19 +- envd-rs/src/crypto/sha512.rs | 25 +- envd-rs/src/execcontext.rs | 10 +- envd-rs/src/http/activity.rs | 37 ++ envd-rs/src/http/encoding.rs | 70 +++- envd-rs/src/http/envs.rs | 5 +- envd-rs/src/http/files.rs | 39 +-- envd-rs/src/http/init.rs | 38 +- envd-rs/src/http/mod.rs | 4 +- envd-rs/src/main.rs | 92 +++-- envd-rs/src/permissions/mod.rs | 2 +- envd-rs/src/permissions/path.rs | 30 +- envd-rs/src/port/conn.rs | 107 +++++- envd-rs/src/rpc/entry.rs | 4 +- envd-rs/src/rpc/filesystem_service.rs | 36 +- envd-rs/src/rpc/mod.rs | 8 +- envd-rs/src/rpc/pb.rs | 7 +- envd-rs/src/rpc/process_handler.rs | 141 ++++++-- envd-rs/src/rpc/process_service.rs | 326 +++++++++--------- envd-rs/src/state.rs | 93 ++++- envd-rs/src/util.rs | 10 +- frontend/src/routes/admin/hosts/+page.svelte | 17 +- .../routes/dashboard/capsules/+layout.svelte | 4 - .../routes/dashboard/capsules/+page.svelte | 4 + internal/api/handlers_exec_stream.go | 50 ++- internal/api/handlers_process.go | 18 +- internal/api/middleware.go | 4 + internal/api/sandbox_event_consumer.go | 29 +- internal/api/sse_relay.go | 27 ++ internal/envdclient/client.go | 163 +++++---- internal/envdclient/health.go | 36 ++ internal/envdclient/process.go | 48 +-- internal/hostagent/server.go | 84 ++--- internal/sandbox/activity_test.go | 111 ++++++ internal/sandbox/manager.go | 200 ++++++++++- internal/sandbox/restore.go | 2 +- 53 files changed, 1675 insertions(+), 577 deletions(-) create mode 100644 envd-rs/src/cmd/mod.rs create mode 100644 envd-rs/src/cmd/ports.rs create mode 100644 envd-rs/src/http/activity.rs create mode 100644 internal/sandbox/activity_test.go diff --git a/.env.example b/.env.example index 055f609a..f242f9cb 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,21 @@ WRENN_HOST_INTERFACE=eth0 WRENN_CP_URL=http://localhost:9725 WRENN_DEFAULT_ROOTFS_SIZE=5Gi WRENN_CH_BIN=/usr/local/bin/cloud-hypervisor +# Public domain sandboxes are served under; injected into envd so `envd ports` +# can build {port}-{sandbox_id}.{domain} URLs. +WRENN_PROXY_DOMAIN=wrenn.dev + +# Inactivity activity sampler (all optional; shown values are the defaults). +# The host polls each running sandbox's guest liveness and refreshes its +# inactivity TTL when it is doing real work, so a long-running but +# non-interactive job (build, download) is not auto-paused. A sandbox counts +# as busy when guest CPU ≥ threshold, or net/disk throughput ≥ the floor. +# Busy requires the threshold to hold for 2 consecutive samples (debounced), +# so isolated idle-noise spikes do not keep a sandbox alive. +WRENN_ACTIVITY_SAMPLE_INTERVAL=5s +WRENN_CPU_BUSY_THRESHOLD=5.0 +WRENN_NET_FLOOR_BPS=16384 +WRENN_DISK_FLOOR_BPS=32768 # Auth JWT_SECRET= diff --git a/CLAUDE.md b/CLAUDE.md index 4575423d..5c3e5c3d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -Wrenn Sandbox is a microVM-based code execution platform. Users create isolated sandboxes (Cloud Hypervisor microVMs), run code inside them, and get output back via SDKs. Think E2B but with persistent sandboxes, pool-based pricing, and a single-binary deployment story. +Wrenn is an open-source, self-hosted dev environment platform. Users spin up isolated sandboxes (Cloud Hypervisor microVMs), run code inside them, and get output back via SDKs. Fast boot, persistent state, and a single agent binary on each host you own. ## Build & Development Commands @@ -28,7 +28,7 @@ make dev-envd # envd in debug mode (port 49983) make check # fmt + vet + lint + test (CI order) make test # Unit tests: go test -race -v ./internal/... make test-integration # Integration tests (require host agent + Cloud Hypervisor) -make fmt # gofmt +make fmt # gofmt and rust fmt make vet # go vet make lint # golangci-lint diff --git a/Makefile b/Makefile index 8564ef2d..cff4cb88 100644 --- a/Makefile +++ b/Makefile @@ -106,6 +106,7 @@ sqlc: fmt: gofmt -w . + cargo fmt --manifest-path envd-rs/Cargo.toml lint: golangci-lint run ./... diff --git a/README.md b/README.md index d7325b2e..9892cb17 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Secure infrastructure for AI +Wrenn is an open-source self-hosted dev environment platform. Each capsule is a fully isolated virtual machine — booted in seconds, persistent across sessions. Run the control plane anywhere, deploy a single agent binary on each compute host. + ## Prerequisites - Linux host with `/dev/kvm` access (bare metal or nested virt) diff --git a/VERSION_AGENT b/VERSION_AGENT index 0ea3a944..0c62199f 100644 --- a/VERSION_AGENT +++ b/VERSION_AGENT @@ -1 +1 @@ -0.2.0 +0.2.1 diff --git a/VERSION_CP b/VERSION_CP index 0ea3a944..0c62199f 100644 --- a/VERSION_CP +++ b/VERSION_CP @@ -1 +1 @@ -0.2.0 +0.2.1 diff --git a/cmd/host-agent/main.go b/cmd/host-agent/main.go index e9a9655b..4ec7b665 100644 --- a/cmd/host-agent/main.go +++ b/cmd/host-agent/main.go @@ -148,6 +148,13 @@ func main() { VMMBin: chBin, VMMVersion: chVersion, AgentVersion: version, + ProxyDomain: envOrDefault("WRENN_PROXY_DOMAIN", "wrenn.dev"), + + // Activity sampler tuning (all optional; zero → sandbox package default). + ActivitySampleInterval: envDuration("WRENN_ACTIVITY_SAMPLE_INTERVAL"), + CPUBusyPct: envFloat32("WRENN_CPU_BUSY_THRESHOLD"), + NetFloorBps: envUint64("WRENN_NET_FLOOR_BPS"), + DiskFloorBps: envUint64("WRENN_DISK_FLOOR_BPS"), } // Remove any *.staging-* / *.trash-* directories left behind by a @@ -171,6 +178,7 @@ func main() { mgr.RestorePausedSandboxes() mgr.StartTTLReaper(ctx) + mgr.StartActivitySampler(ctx) // httpServer is declared here so the shutdown func can reference it. // ReadTimeout/WriteTimeout are intentionally omitted — they would kill @@ -311,6 +319,49 @@ func envOrDefault(key, def string) string { return def } +// envDuration parses an optional duration env var (e.g. "5s"). Empty or +// invalid → zero, letting the sandbox package apply its default. +func envDuration(key string) time.Duration { + v := os.Getenv(key) + if v == "" { + return 0 + } + d, err := time.ParseDuration(v) + if err != nil { + slog.Warn("invalid duration env var, using default", "key", key, "value", v) + return 0 + } + return d +} + +// envFloat32 parses an optional float env var. Empty or invalid → 0. +func envFloat32(key string) float32 { + v := os.Getenv(key) + if v == "" { + return 0 + } + f, err := strconv.ParseFloat(v, 32) + if err != nil { + slog.Warn("invalid float env var, using default", "key", key, "value", v) + return 0 + } + return float32(f) +} + +// envUint64 parses an optional unsigned-int env var. Empty or invalid → 0. +func envUint64(key string) uint64 { + v := os.Getenv(key) + if v == "" { + return 0 + } + n, err := strconv.ParseUint(v, 10, 64) + if err != nil { + slog.Warn("invalid uint env var, using default", "key", key, "value", v) + return 0 + } + return n +} + // checkPrivileges verifies the process has the required Linux capabilities. // Always reads CapEff — even for root — because a root process inside a // restricted container (e.g. docker --cap-drop=all) may not have all caps. diff --git a/envd-rs/Cargo.lock b/envd-rs/Cargo.lock index d48d2a39..effa7115 100644 --- a/envd-rs/Cargo.lock +++ b/envd-rs/Cargo.lock @@ -529,7 +529,7 @@ dependencies = [ [[package]] name = "envd" -version = "0.3.0" +version = "0.4.0" dependencies = [ "async-stream", "axum", diff --git a/envd-rs/Cargo.toml b/envd-rs/Cargo.toml index f741acfb..0adfd77f 100644 --- a/envd-rs/Cargo.toml +++ b/envd-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "envd" -version = "0.3.0" +version = "0.4.0" edition = "2024" rust-version = "1.95" diff --git a/envd-rs/src/auth/middleware.rs b/envd-rs/src/auth/middleware.rs index 918fb5ee..8583a2c7 100644 --- a/envd-rs/src/auth/middleware.rs +++ b/envd-rs/src/auth/middleware.rs @@ -14,6 +14,7 @@ const ACCESS_TOKEN_HEADER: &str = "x-access-token"; /// Format: "METHOD/path" const AUTH_EXCLUDED: &[&str] = &[ "GET/health", + "GET/activity", "GET/files", "POST/files", "POST/init", @@ -21,11 +22,7 @@ const AUTH_EXCLUDED: &[&str] = &[ ]; /// Axum middleware that checks X-Access-Token header. -pub async fn auth_layer( - request: Request, - next: Next, - access_token: Arc, -) -> Response { +pub async fn auth_layer(request: Request, next: Next, access_token: Arc) -> Response { if access_token.is_set() { let method = request.method().as_str(); let path = request.uri().path(); diff --git a/envd-rs/src/auth/mod.rs b/envd-rs/src/auth/mod.rs index 6a34efc8..7e4c9922 100644 --- a/envd-rs/src/auth/mod.rs +++ b/envd-rs/src/auth/mod.rs @@ -1,3 +1,3 @@ -pub mod token; -pub mod signing; pub mod middleware; +pub mod signing; +pub mod token; diff --git a/envd-rs/src/auth/signing.rs b/envd-rs/src/auth/signing.rs index 348d0c4b..c123821e 100644 --- a/envd-rs/src/auth/signing.rs +++ b/envd-rs/src/auth/signing.rs @@ -140,13 +140,32 @@ mod tests { #[test] fn validate_correct_header_token() { let token = test_token(b"secret"); - assert!(validate_signing(&token, Some("secret"), None, None, "root", "/f", READ_OPERATION).is_ok()); + assert!( + validate_signing( + &token, + Some("secret"), + None, + None, + "root", + "/f", + READ_OPERATION + ) + .is_ok() + ); } #[test] fn validate_wrong_header_token() { let token = test_token(b"secret"); - let result = validate_signing(&token, Some("wrong"), None, None, "root", "/f", READ_OPERATION); + let result = validate_signing( + &token, + Some("wrong"), + None, + None, + "root", + "/f", + READ_OPERATION, + ); assert!(result.is_err()); assert!(result.unwrap_err().contains("does not match")); } @@ -156,13 +175,32 @@ mod tests { let token = test_token(b"secret"); let exp = far_future(); let sig = generate_signature(&token, "/file", "root", READ_OPERATION, Some(exp)).unwrap(); - assert!(validate_signing(&token, None, Some(&sig), Some(exp), "root", "/file", READ_OPERATION).is_ok()); + assert!( + validate_signing( + &token, + None, + Some(&sig), + Some(exp), + "root", + "/file", + READ_OPERATION + ) + .is_ok() + ); } #[test] fn validate_invalid_signature() { let token = test_token(b"secret"); - let result = validate_signing(&token, None, Some("v1_bad"), Some(far_future()), "root", "/f", READ_OPERATION); + let result = validate_signing( + &token, + None, + Some("v1_bad"), + Some(far_future()), + "root", + "/f", + READ_OPERATION, + ); assert!(result.is_err()); assert!(result.unwrap_err().contains("invalid signature")); } @@ -172,7 +210,15 @@ mod tests { let token = test_token(b"secret"); let expired: i64 = 1_000_000; let sig = generate_signature(&token, "/f", "root", READ_OPERATION, Some(expired)).unwrap(); - let result = validate_signing(&token, None, Some(&sig), Some(expired), "root", "/f", READ_OPERATION); + let result = validate_signing( + &token, + None, + Some(&sig), + Some(expired), + "root", + "/f", + READ_OPERATION, + ); assert!(result.is_err()); assert!(result.unwrap_err().contains("expired")); } @@ -197,7 +243,18 @@ mod tests { fn validate_valid_signature_no_expiration() { let token = test_token(b"secret"); let sig = generate_signature(&token, "/file", "root", READ_OPERATION, None).unwrap(); - assert!(validate_signing(&token, None, Some(&sig), None, "root", "/file", READ_OPERATION).is_ok()); + assert!( + validate_signing( + &token, + None, + Some(&sig), + None, + "root", + "/file", + READ_OPERATION + ) + .is_ok() + ); } #[test] diff --git a/envd-rs/src/cgroups/mod.rs b/envd-rs/src/cgroups/mod.rs index 1ec9dab0..9a375c1b 100644 --- a/envd-rs/src/cgroups/mod.rs +++ b/envd-rs/src/cgroups/mod.rs @@ -19,20 +19,25 @@ pub struct Cgroup2Manager { } impl Cgroup2Manager { - pub fn new(root: &str, configs: &[(ProcessType, &str, &[(&str, &str)])]) -> Result { + pub fn new( + root: &str, + configs: &[(ProcessType, &str, &[(&str, &str)])], + ) -> Result { let mut fds = HashMap::new(); for (proc_type, sub_path, properties) in configs { let full_path = PathBuf::from(root).join(sub_path); - fs::create_dir_all(&full_path).map_err(|e| { - format!("failed to create cgroup {}: {e}", full_path.display()) - })?; + fs::create_dir_all(&full_path) + .map_err(|e| format!("failed to create cgroup {}: {e}", full_path.display()))?; for (name, value) in *properties { let prop_path = full_path.join(name); fs::write(&prop_path, value).map_err(|e| { - format!("failed to write cgroup property {}: {e}", prop_path.display()) + format!( + "failed to write cgroup property {}: {e}", + prop_path.display() + ) })?; } diff --git a/envd-rs/src/cmd/mod.rs b/envd-rs/src/cmd/mod.rs new file mode 100644 index 00000000..f2df95ee --- /dev/null +++ b/envd-rs/src/cmd/mod.rs @@ -0,0 +1,5 @@ +//! Client subcommands for the `envd` binary. These run as short-lived +//! invocations (e.g. `envd ports`) inside the guest, separate from the +//! long-running daemon, and exit when done. + +pub mod ports; diff --git a/envd-rs/src/cmd/ports.rs b/envd-rs/src/cmd/ports.rs new file mode 100644 index 00000000..041b7c28 --- /dev/null +++ b/envd-rs/src/cmd/ports.rs @@ -0,0 +1,164 @@ +//! `envd ports` — list the open ports inside the sandbox that are reachable +//! from outside, alongside the URL each is served at. +//! +//! Runs as a one-shot client (not the daemon): it scans `/proc/net/tcp[6]` +//! directly via the shared port helper and reads the sandbox identity that the +//! daemon recorded under /run/wrenn at /init time. It refuses to run outside a +//! wrenn sandbox. + +use std::fs; +use std::path::Path; + +use crate::config::{DEFAULT_PORT, DEFAULT_PROXY_DOMAIN, WRENN_RUN_DIR}; +use crate::port::conn::reachable_listening_ports; + +/// Arguments for the `envd ports` subcommand. +#[derive(clap::Args)] +pub struct PortsArgs { + /// Override the proxy domain used to build URLs (default: the domain + /// injected by the host, falling back to the built-in default). + #[arg(long)] + domain: Option, + + /// Emit JSON instead of a table. + #[arg(long)] + json: bool, +} + +#[derive(serde::Serialize)] +struct PortEntry { + port: u32, + url: String, +} + +/// Runs the subcommand and returns the desired process exit code. +pub fn run(args: &PortsArgs) -> i32 { + if !inside_sandbox() { + eprintln!("envd ports: not running inside a wrenn sandbox"); + return 1; + } + + let sandbox_id = read_identity("WRENN_SANDBOX_ID", ".WRENN_SANDBOX_ID"); + let domain = args + .domain + .clone() + .filter(|d| !d.is_empty()) + .or_else(|| read_identity("WRENN_PROXY_DOMAIN", ".WRENN_PROXY_DOMAIN")) + .unwrap_or_else(|| DEFAULT_PROXY_DOMAIN.to_string()); + + let entries: Vec = reachable_listening_ports(DEFAULT_PORT as u32) + .into_iter() + .map(|port| PortEntry { + url: build_url(port, sandbox_id.as_deref(), &domain), + port, + }) + .collect(); + + if args.json { + match serde_json::to_string_pretty(&entries) { + Ok(s) => println!("{s}"), + Err(e) => { + eprintln!("envd ports: failed to encode JSON: {e}"); + return 1; + } + } + return 0; + } + + if entries.is_empty() { + println!("No open ports."); + return 0; + } + + println!("{:<6} {}", "PORT", "URL"); + for e in &entries { + println!("{:<6} {}", e.port, e.url); + } + 0 +} + +/// A wrenn sandbox is identified by the marker the daemon writes at startup +/// (`/run/wrenn/.WRENN_SANDBOX`) and the `WRENN_SANDBOX` env var it exports +/// into spawned processes. Running `envd ports` on a normal host finds neither +/// and is refused. +fn inside_sandbox() -> bool { + if std::env::var("WRENN_SANDBOX").as_deref() == Ok("true") { + return true; + } + Path::new(WRENN_RUN_DIR).join(".WRENN_SANDBOX").exists() +} + +/// Reads an identity value from the environment, falling back to the matching +/// /run/wrenn file. Returns None when neither is set or both are blank. +fn read_identity(env_key: &str, file_name: &str) -> Option { + if let Ok(v) = std::env::var(env_key) { + let v = v.trim().to_string(); + if !v.is_empty() { + return Some(v); + } + } + match fs::read_to_string(Path::new(WRENN_RUN_DIR).join(file_name)) { + Ok(v) => { + let v = v.trim().to_string(); + if v.is_empty() { None } else { Some(v) } + } + Err(_) => None, + } +} + +/// Builds the externally-reachable URL for a port. With a known sandbox ID the +/// result is a working https URL; without it (identity not yet injected) the +/// sandbox-ID segment degrades to a `` placeholder so output is +/// still informative. +fn build_url(port: u32, sandbox_id: Option<&str>, domain: &str) -> String { + let id = sandbox_id.unwrap_or(""); + format!("https://{port}-{id}.{domain}") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn url_with_sandbox_id() { + assert_eq!( + build_url(8000, Some("cl-abcd1234"), "wrenn.dev"), + "https://8000-cl-abcd1234.wrenn.dev" + ); + } + + #[test] + fn url_without_sandbox_id_uses_placeholder() { + assert_eq!( + build_url(5173, None, "wrenn.dev"), + "https://5173-.wrenn.dev" + ); + } + + #[test] + fn url_honors_custom_domain() { + assert_eq!( + build_url(3000, Some("cl-deadbeef"), "sandbox.example.com"), + "https://3000-cl-deadbeef.sandbox.example.com" + ); + } + + #[test] + fn read_identity_prefers_env() { + // SAFETY: test-local env var, single-threaded test body. + unsafe { std::env::set_var("ENVD_PORTS_TEST_ID", " cl-fromenv ") }; + assert_eq!( + read_identity("ENVD_PORTS_TEST_ID", ".nonexistent-file"), + Some("cl-fromenv".to_string()) + ); + unsafe { std::env::remove_var("ENVD_PORTS_TEST_ID") }; + } + + #[test] + fn read_identity_none_when_unset() { + assert_eq!( + read_identity("ENVD_PORTS_TEST_UNSET", ".nonexistent-file"), + None + ); + } +} diff --git a/envd-rs/src/config.rs b/envd-rs/src/config.rs index be89725f..92367dfa 100644 --- a/envd-rs/src/config.rs +++ b/envd-rs/src/config.rs @@ -7,5 +7,10 @@ pub const PORT_SCANNER_INTERVAL: Duration = Duration::from_millis(1000); pub const DEFAULT_USER: &str = "root"; pub const WRENN_RUN_DIR: &str = "/run/wrenn"; +/// Fallback proxy domain used by `envd ports` to build URLs when the host has +/// not injected one via /init. Matches the host agent's WRENN_PROXY_DOMAIN +/// default. +pub const DEFAULT_PROXY_DOMAIN: &str = "wrenn.dev"; + pub const KILOBYTE: u64 = 1024; pub const MEGABYTE: u64 = 1024 * KILOBYTE; diff --git a/envd-rs/src/crypto/mod.rs b/envd-rs/src/crypto/mod.rs index 11785bc8..0324f203 100644 --- a/envd-rs/src/crypto/mod.rs +++ b/envd-rs/src/crypto/mod.rs @@ -1,3 +1,3 @@ +pub mod hmac_sha256; pub mod sha256; pub mod sha512; -pub mod hmac_sha256; diff --git a/envd-rs/src/crypto/sha256.rs b/envd-rs/src/crypto/sha256.rs index 353c3cbe..b47a5070 100644 --- a/envd-rs/src/crypto/sha256.rs +++ b/envd-rs/src/crypto/sha256.rs @@ -20,14 +20,22 @@ mod tests { const VECTORS: &[(&[u8], &str)] = &[ (b"", "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU"), (b"abc", "ungWv48Bz+pBQUDeXa4iI7ADYaOWF3qctBD/YfIAFa0"), - (b"abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq", "JI1qYdIGOLjlwCaTDD5gOaM85Flk/yFn9uzt1BnbBsE"), + ( + b"abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq", + "JI1qYdIGOLjlwCaTDD5gOaM85Flk/yFn9uzt1BnbBsE", + ), ]; #[test] fn known_answer_with_prefix() { for (input, expected_b64) in VECTORS { let result = hash(input); - assert_eq!(result, format!("$sha256${expected_b64}"), "input: {:?}", String::from_utf8_lossy(input)); + assert_eq!( + result, + format!("$sha256${expected_b64}"), + "input: {:?}", + String::from_utf8_lossy(input) + ); } } @@ -35,7 +43,12 @@ mod tests { fn known_answer_without_prefix() { for (input, expected_b64) in VECTORS { let result = hash_without_prefix(input); - assert_eq!(result, *expected_b64, "input: {:?}", String::from_utf8_lossy(input)); + assert_eq!( + result, + *expected_b64, + "input: {:?}", + String::from_utf8_lossy(input) + ); } } diff --git a/envd-rs/src/crypto/sha512.rs b/envd-rs/src/crypto/sha512.rs index 747ed111..0d9f5a35 100644 --- a/envd-rs/src/crypto/sha512.rs +++ b/envd-rs/src/crypto/sha512.rs @@ -15,9 +15,18 @@ mod tests { use super::*; const VECTORS: &[(&str, &str)] = &[ - ("", "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"), - ("abc", "ddaf35a193617abacc417349ae20413112e6fa4e89a97ea20a9eeee64b55d39a2192992a274fc1a836ba3c23a3feebbd454d4423643ce80e2a9ac94fa54ca49f"), - ("abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq", "204a8fc6dda82f0a0ced7beb8e08a41657c16ef468b228a8279be331a703c33596fd15c13b1b07f9aa1d3bea57789ca031ad85c7a71dd70354ec631238ca3445"), + ( + "", + "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e", + ), + ( + "abc", + "ddaf35a193617abacc417349ae20413112e6fa4e89a97ea20a9eeee64b55d39a2192992a274fc1a836ba3c23a3feebbd454d4423643ce80e2a9ac94fa54ca49f", + ), + ( + "abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq", + "204a8fc6dda82f0a0ced7beb8e08a41657c16ef468b228a8279be331a703c33596fd15c13b1b07f9aa1d3bea57789ca031ad85c7a71dd70354ec631238ca3445", + ), ]; #[test] @@ -30,7 +39,10 @@ mod tests { #[test] fn str_and_bytes_agree() { for (input, _) in VECTORS { - assert_eq!(hash_access_token(input), hash_access_token_bytes(input.as_bytes())); + assert_eq!( + hash_access_token(input), + hash_access_token_bytes(input.as_bytes()) + ); } } @@ -38,6 +50,9 @@ mod tests { fn output_is_lowercase_hex_128_chars() { let h = hash_access_token("anything"); assert_eq!(h.len(), 128); - assert!(h.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())); + assert!( + h.chars() + .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()) + ); } } diff --git a/envd-rs/src/execcontext.rs b/envd-rs/src/execcontext.rs index 6ad29e64..8874c931 100644 --- a/envd-rs/src/execcontext.rs +++ b/envd-rs/src/execcontext.rs @@ -62,7 +62,10 @@ mod tests { #[test] fn workdir_explicit_overrides_default() { - assert_eq!(resolve_default_workdir("/explicit", Some("/default")), "/explicit"); + assert_eq!( + resolve_default_workdir("/explicit", Some("/default")), + "/explicit" + ); } #[test] @@ -82,7 +85,10 @@ mod tests { #[test] fn username_explicit_returns_explicit() { - assert_eq!(resolve_default_username(Some("root"), "wrenn").unwrap(), "root"); + assert_eq!( + resolve_default_username(Some("root"), "wrenn").unwrap(), + "root" + ); } #[test] diff --git a/envd-rs/src/http/activity.rs b/envd-rs/src/http/activity.rs new file mode 100644 index 00000000..d917fde7 --- /dev/null +++ b/envd-rs/src/http/activity.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use axum::Json; +use axum::extract::State; +use axum::http::header; +use axum::response::IntoResponse; +use serde::Serialize; + +use crate::state::AppState; + +/// Liveness snapshot the host activity sampler polls to decide whether a +/// sandbox is doing real work. All fields are served straight from atomics +/// updated by the 1s sampler thread — no syscalls per request, so the host +/// can poll cheaply at a few-second cadence. +#[derive(Serialize)] +pub struct Activity { + cpu_count: u32, + cpu_used_pct: f32, + net_bps: u64, + disk_bps: u64, +} + +pub async fn get_activity(State(state): State>) -> impl IntoResponse { + tracing::trace!("get activity"); + + let body = Activity { + cpu_count: state.cpu_count(), + cpu_used_pct: state.cpu_used_pct(), + net_bps: state.net_bps(), + disk_bps: state.disk_bps(), + }; + + ( + [(header::CACHE_CONTROL, "no-store")], + Json(body), + ) +} diff --git a/envd-rs/src/http/encoding.rs b/envd-rs/src/http/encoding.rs index d573d047..4ddba25c 100644 --- a/envd-rs/src/http/encoding.rs +++ b/envd-rs/src/http/encoding.rs @@ -20,7 +20,10 @@ fn parse_encoding_with_quality(value: &str) -> EncodingWithQuality { let enc = value[..idx].trim(); for param in params.split(';') { let param = param.trim(); - if let Some(stripped) = param.strip_prefix("q=").or_else(|| param.strip_prefix("Q=")) { + if let Some(stripped) = param + .strip_prefix("q=") + .or_else(|| param.strip_prefix("Q=")) + { if let Ok(q) = stripped.parse::() { quality = q; } @@ -43,8 +46,10 @@ fn parse_accept_encoding_header(header: &str) -> (Vec, bool return (Vec::new(), false); } - let encodings: Vec = - header.split(',').map(|v| parse_encoding_with_quality(v)).collect(); + let encodings: Vec = header + .split(',') + .map(|v| parse_encoding_with_quality(v)) + .collect(); let mut identity_rejected = false; let mut identity_explicitly_accepted = false; @@ -97,7 +102,11 @@ pub fn parse_accept_encoding(r: &Request) -> Result<&'static str, String> } let (mut encodings, identity_rejected) = parse_accept_encoding_header(header); - encodings.sort_by(|a, b| b.quality.partial_cmp(&a.quality).unwrap_or(std::cmp::Ordering::Equal)); + encodings.sort_by(|a, b| { + b.quality + .partial_cmp(&a.quality) + .unwrap_or(std::cmp::Ordering::Equal) + }); for eq in &encodings { if eq.quality == 0.0 { @@ -121,7 +130,9 @@ pub fn parse_accept_encoding(r: &Request) -> Result<&'static str, String> return Ok(ENCODING_IDENTITY); } - Err(format!("no acceptable encoding found, supported: {SUPPORTED_ENCODINGS:?}")) + Err(format!( + "no acceptable encoding found, supported: {SUPPORTED_ENCODINGS:?}" + )) } pub fn parse_content_encoding(r: &Request) -> Result<&'static str, String> { @@ -143,7 +154,9 @@ pub fn parse_content_encoding(r: &Request) -> Result<&'static str, String> return Ok(ENCODING_GZIP); } - Err(format!("unsupported Content-Encoding: {header}, supported: {SUPPORTED_ENCODINGS:?}")) + Err(format!( + "unsupported Content-Encoding: {header}, supported: {SUPPORTED_ENCODINGS:?}" + )) } #[cfg(test)] @@ -236,17 +249,26 @@ mod tests { #[test] fn accept_encoding_no_header_returns_identity() { - assert_eq!(parse_accept_encoding(&req_no_headers()).unwrap(), "identity"); + assert_eq!( + parse_accept_encoding(&req_no_headers()).unwrap(), + "identity" + ); } #[test] fn accept_encoding_gzip() { - assert_eq!(parse_accept_encoding(&req_with_accept("gzip")).unwrap(), "gzip"); + assert_eq!( + parse_accept_encoding(&req_with_accept("gzip")).unwrap(), + "gzip" + ); } #[test] fn accept_encoding_identity_explicit() { - assert_eq!(parse_accept_encoding(&req_with_accept("identity")).unwrap(), "identity"); + assert_eq!( + parse_accept_encoding(&req_with_accept("identity")).unwrap(), + "identity" + ); } #[test] @@ -259,7 +281,10 @@ mod tests { #[test] fn accept_encoding_wildcard_returns_identity() { - assert_eq!(parse_accept_encoding(&req_with_accept("*")).unwrap(), "identity"); + assert_eq!( + parse_accept_encoding(&req_with_accept("*")).unwrap(), + "identity" + ); } #[test] @@ -277,7 +302,10 @@ mod tests { #[test] fn accept_encoding_unsupported_only_falls_to_identity() { - assert_eq!(parse_accept_encoding(&req_with_accept("br")).unwrap(), "identity"); + assert_eq!( + parse_accept_encoding(&req_with_accept("br")).unwrap(), + "identity" + ); } // is_identity_acceptable @@ -311,17 +339,26 @@ mod tests { #[test] fn content_encoding_empty_returns_identity() { - assert_eq!(parse_content_encoding(&req_no_headers()).unwrap(), "identity"); + assert_eq!( + parse_content_encoding(&req_no_headers()).unwrap(), + "identity" + ); } #[test] fn content_encoding_gzip() { - assert_eq!(parse_content_encoding(&req_with_content("gzip")).unwrap(), "gzip"); + assert_eq!( + parse_content_encoding(&req_with_content("gzip")).unwrap(), + "gzip" + ); } #[test] fn content_encoding_identity_explicit() { - assert_eq!(parse_content_encoding(&req_with_content("identity")).unwrap(), "identity"); + assert_eq!( + parse_content_encoding(&req_with_content("identity")).unwrap(), + "identity" + ); } #[test] @@ -331,6 +368,9 @@ mod tests { #[test] fn content_encoding_case_insensitive() { - assert_eq!(parse_content_encoding(&req_with_content("GZIP")).unwrap(), "gzip"); + assert_eq!( + parse_content_encoding(&req_with_content("GZIP")).unwrap(), + "gzip" + ); } } diff --git a/envd-rs/src/http/envs.rs b/envd-rs/src/http/envs.rs index 0d87ccc3..7de647ad 100644 --- a/envd-rs/src/http/envs.rs +++ b/envd-rs/src/http/envs.rs @@ -18,8 +18,5 @@ pub async fn get_envs(State(state): State>) -> impl IntoResponse { .map(|entry| (entry.key().clone(), entry.value().clone())) .collect(); - ( - [(header::CACHE_CONTROL, "no-store")], - Json(envs), - ) + ([(header::CACHE_CONTROL, "no-store")], Json(envs)) } diff --git a/envd-rs/src/http/files.rs b/envd-rs/src/http/files.rs index e0d7ab2c..b37770bc 100644 --- a/envd-rs/src/http/files.rs +++ b/envd-rs/src/http/files.rs @@ -72,13 +72,11 @@ pub async fn get_files( let header_token = extract_header_token(&req); let default_user = state.defaults.user(); - let username = match execcontext::resolve_default_username( - params.username.as_deref(), - &default_user, - ) { - Ok(u) => u.to_string(), - Err(e) => return json_error(StatusCode::BAD_REQUEST, e), - }; + let username = + match execcontext::resolve_default_username(params.username.as_deref(), &default_user) { + Ok(u) => u.to_string(), + Err(e) => return json_error(StatusCode::BAD_REQUEST, e), + }; if let Err(e) = validate_file_signing( &state, @@ -98,8 +96,7 @@ pub async fn get_files( let home_dir = user.dir.to_string_lossy().to_string(); let default_workdir = state.defaults.workdir(); - let resolved = match expand_and_resolve(path_str, &home_dir, default_workdir.as_deref()) - { + let resolved = match expand_and_resolve(path_str, &home_dir, default_workdir.as_deref()) { Ok(p) => p, Err(e) => return json_error(StatusCode::BAD_REQUEST, &e), }; @@ -177,8 +174,7 @@ pub async fn get_files( .unwrap_or("application/octet-stream"); if use_encoding == "gzip" { - let mut encoder = - flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); if let Err(e) = encoder.write_all(&file_data) { return json_error( StatusCode::INTERNAL_SERVER_ERROR, @@ -225,13 +221,11 @@ pub async fn post_files( let header_token = extract_header_token(&req); let default_user = state.defaults.user(); - let username = match execcontext::resolve_default_username( - params.username.as_deref(), - &default_user, - ) { - Ok(u) => u.to_string(), - Err(e) => return json_error(StatusCode::BAD_REQUEST, e), - }; + let username = + match execcontext::resolve_default_username(params.username.as_deref(), &default_user) { + Ok(u) => u.to_string(), + Err(e) => return json_error(StatusCode::BAD_REQUEST, e), + }; if let Err(e) = validate_file_signing( &state, @@ -283,10 +277,7 @@ pub async fn post_files( Err(e) => return json_error(StatusCode::BAD_REQUEST, &e), } } else { - let fname = field - .file_name() - .unwrap_or("upload") - .to_string(); + let fname = field.file_name().unwrap_or("upload").to_string(); match expand_and_resolve(&fname, &home_dir, default_workdir.as_deref()) { Ok(p) => p, Err(e) => return json_error(StatusCode::BAD_REQUEST, &e), @@ -382,7 +373,7 @@ fn process_file( return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("error getting file info: {e}"), - )) + )); } }; @@ -395,7 +386,7 @@ fn process_file( return Err(( StatusCode::INTERNAL_SERVER_ERROR, format!("error changing ownership: {e}"), - )) + )); } } } diff --git a/envd-rs/src/http/init.rs b/envd-rs/src/http/init.rs index 17dd3b01..79aafdbb 100644 --- a/envd-rs/src/http/init.rs +++ b/envd-rs/src/http/init.rs @@ -26,6 +26,9 @@ pub struct InitRequest { pub volume_mounts: Option>, pub sandbox_id: Option, pub template_id: Option, + /// Public proxy domain (e.g. "wrenn.dev"). Used by `envd ports` to build + /// the {port}-{sandbox_id}.{domain} URLs. + pub proxy_domain: Option, /// New lifecycle identifier for this resume. When it changes between /// /init calls, envd treats the call as a post-resume hook: port /// forwarder is restarted and NFS mounts are refreshed. @@ -183,14 +186,32 @@ pub async fn post_init( // SAFETY: envd is single-threaded at init time; no concurrent env reads. unsafe { std::env::set_var("WRENN_SANDBOX_ID", id) }; write_run_file(".WRENN_SANDBOX_ID", id); - state.defaults.env_vars.insert("WRENN_SANDBOX_ID".into(), id.clone()); + state + .defaults + .env_vars + .insert("WRENN_SANDBOX_ID".into(), id.clone()); } if let Some(ref id) = init_req.template_id { tracing::debug!(template_id = %id, "setting template ID from init request"); // SAFETY: envd is single-threaded at init time; no concurrent env reads. unsafe { std::env::set_var("WRENN_TEMPLATE_ID", id) }; write_run_file(".WRENN_TEMPLATE_ID", id); - state.defaults.env_vars.insert("WRENN_TEMPLATE_ID".into(), id.clone()); + state + .defaults + .env_vars + .insert("WRENN_TEMPLATE_ID".into(), id.clone()); + } + if let Some(ref domain) = init_req.proxy_domain { + if !domain.is_empty() { + tracing::debug!(proxy_domain = %domain, "setting proxy domain from init request"); + // SAFETY: envd is single-threaded at init time; no concurrent env reads. + unsafe { std::env::set_var("WRENN_PROXY_DOMAIN", domain) }; + write_run_file(".WRENN_PROXY_DOMAIN", domain); + state + .defaults + .env_vars + .insert("WRENN_PROXY_DOMAIN".into(), domain.clone()); + } } ( @@ -202,7 +223,10 @@ pub async fn post_init( async fn validate_init_access_token(state: &AppState, request_token: &str) -> Result<(), String> { // Fast path: matches existing token - if state.access_token.is_set() && !request_token.is_empty() && state.access_token.equals(request_token) { + if state.access_token.is_set() + && !request_token.is_empty() + && state.access_token.equals(request_token) + { return Ok(()); } @@ -241,10 +265,7 @@ async fn setup_hyperloop(address: &str, env_vars: &dashmap::DashMap Result { } Err(()) } - diff --git a/envd-rs/src/http/mod.rs b/envd-rs/src/http/mod.rs index 841d6f50..03bd0e18 100644 --- a/envd-rs/src/http/mod.rs +++ b/envd-rs/src/http/mod.rs @@ -1,3 +1,4 @@ +pub mod activity; pub mod encoding; pub mod envs; pub mod error; @@ -13,8 +14,8 @@ use std::time::Duration; use axum::Router; use axum::routing::{get, post}; -use http::header::{CACHE_CONTROL, HeaderName}; use http::Method; +use http::header::{CACHE_CONTROL, HeaderName}; use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer}; use crate::config::CORS_MAX_AGE; @@ -47,6 +48,7 @@ pub fn router(state: Arc) -> Router { Router::new() .route("/health", get(health::get_health)) + .route("/activity", get(activity::get_activity)) .route("/metrics", get(metrics::get_metrics)) .route("/envs", get(envs::get_envs)) .route("/init", post(init::post_init)) diff --git a/envd-rs/src/main.rs b/envd-rs/src/main.rs index 3b147fb7..0014670d 100644 --- a/envd-rs/src/main.rs +++ b/envd-rs/src/main.rs @@ -2,6 +2,7 @@ mod auth; mod cgroups; +mod cmd; mod config; mod conntracker; mod crypto; @@ -39,6 +40,10 @@ const COMMIT: &str = { #[derive(Parser)] #[command(name = "envd", about = "Wrenn guest agent daemon")] struct Cli { + /// Client subcommand. When omitted, envd runs as the guest daemon. + #[command(subcommand)] + command: Option, + #[arg(long, default_value_t = DEFAULT_PORT)] port: u16, @@ -55,6 +60,12 @@ struct Cli { cgroup_root: String, } +#[derive(clap::Subcommand)] +enum Commands { + /// List externally-reachable open ports and the URL each is served at. + Ports(cmd::ports::PortsArgs), +} + #[tokio::main] async fn main() { let cli = Cli::parse(); @@ -68,6 +79,11 @@ async fn main() { return; } + // Client subcommands are short-lived: run and exit before any daemon setup. + if let Some(Commands::Ports(args)) = &cli.command { + std::process::exit(cmd::ports::run(args)); + } + logging::init(true); if let Err(e) = fs::create_dir_all(WRENN_RUN_DIR) { @@ -85,36 +101,35 @@ async fn main() { } // Cgroup manager - let cgroup_manager: Arc = - match cgroups::Cgroup2Manager::new( - &cli.cgroup_root, - &[ - ( - cgroups::ProcessType::Pty, - "wrenn/pty", - &[] as &[(&str, &str)], - ), - ( - cgroups::ProcessType::User, - "wrenn/user", - &[] as &[(&str, &str)], - ), - ( - cgroups::ProcessType::Socat, - "wrenn/socat", - &[] as &[(&str, &str)], - ), - ], - ) { - Ok(m) => { - tracing::info!("cgroup2 manager initialized"); - Arc::new(m) - } - Err(e) => { - tracing::warn!(error = %e, "cgroup2 init failed, using noop"); - Arc::new(cgroups::NoopCgroupManager) - } - }; + let cgroup_manager: Arc = match cgroups::Cgroup2Manager::new( + &cli.cgroup_root, + &[ + ( + cgroups::ProcessType::Pty, + "wrenn/pty", + &[] as &[(&str, &str)], + ), + ( + cgroups::ProcessType::User, + "wrenn/user", + &[] as &[(&str, &str)], + ), + ( + cgroups::ProcessType::Socat, + "wrenn/socat", + &[] as &[(&str, &str)], + ), + ], + ) { + Ok(m) => { + tracing::info!("cgroup2 manager initialized"); + Arc::new(m) + } + Err(e) => { + tracing::warn!(error = %e, "cgroup2 init failed, using noop"); + Arc::new(cgroups::NoopCgroupManager) + } + }; // Port subsystem let port_subsystem = Arc::new(PortSubsystem::new(Arc::clone(&cgroup_manager))); @@ -138,8 +153,7 @@ async fn main() { // RPC services (Connect protocol — serves Connect + gRPC + gRPC-Web on same port) let connect_router = rpc::rpc_router(Arc::clone(&state)); - let app = http::router(Arc::clone(&state)) - .fallback_service(connect_router.into_axum_service()); + let app = http::router(Arc::clone(&state)).fallback_service(connect_router.into_axum_service()); // --cmd: spawn initial process if specified if !cli.start_cmd.is_empty() { @@ -151,7 +165,12 @@ async fn main() { } let addr = SocketAddr::from(([0, 0, 0, 0], cli.port)); - tracing::info!(port = cli.port, version = VERSION, commit = COMMIT, "envd starting"); + tracing::info!( + port = cli.port, + version = VERSION, + commit = COMMIT, + "envd starting" + ); let listener = TcpListener::bind(addr).await.expect("failed to bind"); @@ -186,9 +205,7 @@ fn spawn_initial_command(cmd: &str, state: &AppState) { let home = user.dir.to_string_lossy().to_string(); let default_workdir = state.defaults.workdir(); - let cwd = default_workdir - .as_deref() - .unwrap_or(&home); + let cwd = default_workdir.as_deref().unwrap_or(&home); match process_handler::spawn_process( cmd, @@ -235,8 +252,7 @@ fn memory_reclaimer(_state: Arc) { } else { let mut sys2 = sysinfo::System::new(); sys2.refresh_memory(); - let freed_mb = - sys2.available_memory().saturating_sub(available) / (1024 * 1024); + let freed_mb = sys2.available_memory().saturating_sub(available) / (1024 * 1024); tracing::info!(used_pct, freed_mb, "page cache dropped"); } } diff --git a/envd-rs/src/permissions/mod.rs b/envd-rs/src/permissions/mod.rs index 48ccce8a..8dfaf7fc 100644 --- a/envd-rs/src/permissions/mod.rs +++ b/envd-rs/src/permissions/mod.rs @@ -1,2 +1,2 @@ -pub mod user; pub mod path; +pub mod user; diff --git a/envd-rs/src/permissions/path.rs b/envd-rs/src/permissions/path.rs index 5bd25f11..68791faa 100644 --- a/envd-rs/src/permissions/path.rs +++ b/envd-rs/src/permissions/path.rs @@ -94,7 +94,10 @@ mod tests { #[test] fn tilde_slash_path() { - assert_eq!(expand_tilde("~/docs", "/home/user").unwrap(), "/home/user/docs"); + assert_eq!( + expand_tilde("~/docs", "/home/user").unwrap(), + "/home/user/docs" + ); } #[test] @@ -109,12 +112,18 @@ mod tests { #[test] fn tilde_relative_no_tilde() { - assert_eq!(expand_tilde("relative/path", "/home/u").unwrap(), "relative/path"); + assert_eq!( + expand_tilde("relative/path", "/home/u").unwrap(), + "relative/path" + ); } #[test] fn tilde_cmd_like() { - assert_eq!(expand_tilde("~/bin/myapp", "/home/user").unwrap(), "/home/user/bin/myapp"); + assert_eq!( + expand_tilde("~/bin/myapp", "/home/user").unwrap(), + "/home/user/bin/myapp" + ); } #[test] @@ -144,12 +153,18 @@ mod tests { #[test] fn resolve_absolute_passthrough() { - assert_eq!(expand_and_resolve("/abs/path", "/home", None).unwrap(), "/abs/path"); + assert_eq!( + expand_and_resolve("/abs/path", "/home", None).unwrap(), + "/abs/path" + ); } #[test] fn resolve_empty_uses_default() { - assert_eq!(expand_and_resolve("", "/home", Some("/default")).unwrap(), "/default"); + assert_eq!( + expand_and_resolve("", "/home", Some("/default")).unwrap(), + "/default" + ); } #[test] @@ -161,7 +176,10 @@ mod tests { #[test] fn resolve_tilde_expands() { - assert_eq!(expand_and_resolve("~/dir", "/home/u", None).unwrap(), "/home/u/dir"); + assert_eq!( + expand_and_resolve("~/dir", "/home/u", None).unwrap(), + "/home/u/dir" + ); } #[test] diff --git a/envd-rs/src/port/conn.rs b/envd-rs/src/port/conn.rs index 8534bc26..04f4a85a 100644 --- a/envd-rs/src/port/conn.rs +++ b/envd-rs/src/port/conn.rs @@ -37,6 +37,36 @@ pub fn read_tcp_connections() -> Vec { conns } +/// Returns the TCP ports in LISTEN state that are reachable from outside the +/// guest through the host proxy. A port qualifies when it is bound to a +/// wildcard address (`0.0.0.0`/`::`, directly reachable on the TAP interface) +/// or to loopback (`127.0.0.1`/`::1`, bridged to the TAP IP by the socat +/// forwarder). Ports bound to any other specific address are not routable from +/// the host and are excluded, as is `exclude_port` (envd's own control port). +/// The result is deduplicated and sorted ascending. +pub fn reachable_listening_ports(exclude_port: u32) -> Vec { + filter_reachable_ports(&read_tcp_connections(), exclude_port) +} + +fn filter_reachable_ports(conns: &[ConnStat], exclude_port: u32) -> Vec { + let mut ports: Vec = conns + .iter() + .filter(|c| c.status == "LISTEN") + .filter(|c| is_reachable_bind(&c.local_ip)) + .map(|c| c.local_port) + .filter(|p| *p != exclude_port) + .collect(); + ports.sort_unstable(); + ports.dedup(); + ports +} + +/// A bind address is reachable from the host when it is a wildcard (directly +/// routed via the TAP interface) or loopback (socat-forwarded to the TAP IP). +fn is_reachable_bind(ip: &str) -> bool { + matches!(ip, "0.0.0.0" | "::" | "127.0.0.1" | "::1") +} + fn parse_proc_net_tcp(path: &str, family: u32) -> io::Result> { let file = std::fs::File::open(path)?; let reader = io::BufReader::new(file); @@ -92,7 +122,10 @@ fn parse_hex_addr(s: &str, family: u32) -> Option<(String, u32)> { if ip_bytes.len() != 4 { return None; } - format!("{}.{}.{}.{}", ip_bytes[3], ip_bytes[2], ip_bytes[1], ip_bytes[0]) + format!( + "{}.{}.{}.{}", + ip_bytes[3], ip_bytes[2], ip_bytes[1], ip_bytes[0] + ) } else { if ip_bytes.len() != 16 { return None; @@ -257,4 +290,76 @@ mod tests { fn parse_nonexistent_file_errors() { assert!(parse_proc_net_tcp("/nonexistent/path", libc::AF_INET as u32).is_err()); } + + // reachable port filtering + + fn conn(ip: &str, port: u32, status: &str) -> ConnStat { + ConnStat { + local_ip: ip.to_string(), + local_port: port, + status: status.to_string(), + family: libc::AF_INET as u32, + inode: 0, + } + } + + #[test] + fn reachable_bind_accepts_wildcard_and_loopback() { + assert!(is_reachable_bind("0.0.0.0")); + assert!(is_reachable_bind("::")); + assert!(is_reachable_bind("127.0.0.1")); + assert!(is_reachable_bind("::1")); + } + + #[test] + fn reachable_bind_rejects_specific_address() { + assert!(!is_reachable_bind("192.168.1.5")); + assert!(!is_reachable_bind("169.254.0.21")); + assert!(!is_reachable_bind("10.0.0.1")); + } + + #[test] + fn filter_keeps_only_listen_state() { + let conns = vec![ + conn("0.0.0.0", 8000, "LISTEN"), + conn("0.0.0.0", 9000, "ESTABLISHED"), + ]; + assert_eq!(filter_reachable_ports(&conns, 49983), vec![8000]); + } + + #[test] + fn filter_excludes_unreachable_binds() { + let conns = vec![ + conn("127.0.0.1", 8000, "LISTEN"), + conn("169.254.0.21", 8001, "LISTEN"), // socat's own listener + conn("192.168.1.5", 8002, "LISTEN"), + ]; + assert_eq!(filter_reachable_ports(&conns, 49983), vec![8000]); + } + + #[test] + fn filter_excludes_envd_control_port() { + let conns = vec![ + conn("0.0.0.0", 49983, "LISTEN"), + conn("0.0.0.0", 8000, "LISTEN"), + ]; + assert_eq!(filter_reachable_ports(&conns, 49983), vec![8000]); + } + + #[test] + fn filter_dedups_and_sorts() { + // Same port on IPv4 wildcard and IPv6 loopback collapses to one entry. + let conns = vec![ + conn("::1", 8000, "LISTEN"), + conn("0.0.0.0", 8000, "LISTEN"), + conn("0.0.0.0", 3000, "LISTEN"), + ]; + assert_eq!(filter_reachable_ports(&conns, 49983), vec![3000, 8000]); + } + + #[test] + fn filter_empty_when_no_listeners() { + let conns = vec![conn("0.0.0.0", 8000, "ESTABLISHED")]; + assert!(filter_reachable_ports(&conns, 49983).is_empty()); + } } diff --git a/envd-rs/src/rpc/entry.rs b/envd-rs/src/rpc/entry.rs index e5c8bf16..41a02b47 100644 --- a/envd-rs/src/rpc/entry.rs +++ b/envd-rs/src/rpc/entry.rs @@ -53,9 +53,7 @@ pub fn build_entry_info(path: &str) -> Result { Err(_) => FileType::FILE_TYPE_UNSPECIFIED, }; - let target_mode = std::fs::metadata(p) - .map(|m| m.mode() & 0o7777) - .unwrap_or(0); + let target_mode = std::fs::metadata(p).map(|m| m.mode() & 0o7777).unwrap_or(0); (target_type, target_mode, Some(target)) } else { diff --git a/envd-rs/src/rpc/filesystem_service.rs b/envd-rs/src/rpc/filesystem_service.rs index 58ee971a..8c17d319 100644 --- a/envd-rs/src/rpc/filesystem_service.rs +++ b/envd-rs/src/rpc/filesystem_service.rs @@ -98,8 +98,7 @@ impl Filesystem for FilesystemServiceImpl { } let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user()); - let user = - lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?; + let user = lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?; ensure_dirs(&path, user.uid, user.gid) .map_err(|e| ConnectError::new(ErrorCode::Internal, e))?; @@ -123,8 +122,7 @@ impl Filesystem for FilesystemServiceImpl { let destination = self.resolve_path(request.destination, &ctx)?; let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user()); - let user = - lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?; + let user = lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?; if let Some(parent) = Path::new(&destination).parent() { ensure_dirs(&parent.to_string_lossy(), user.uid, user.gid) @@ -206,7 +204,12 @@ impl Filesystem for FilesystemServiceImpl { } } - Ok((RemoveResponse { ..Default::default() }, ctx)) + Ok(( + RemoveResponse { + ..Default::default() + }, + ctx, + )) } async fn watch_dir( @@ -247,8 +250,8 @@ impl Filesystem for FilesystemServiceImpl { let events: Arc>> = Arc::new(Mutex::new(Vec::new())); let events_cb = Arc::clone(&events); - let mut watcher = notify::recommended_watcher( - move |res: Result| { + let mut watcher = + notify::recommended_watcher(move |res: Result| { if let Ok(event) = res { let event_type = match event.kind { notify::EventKind::Create(_) => EventType::EVENT_TYPE_CREATE, @@ -275,11 +278,13 @@ impl Filesystem for FilesystemServiceImpl { } } } - }, - ) - .map_err(|e| { - ConnectError::new(ErrorCode::Internal, format!("failed to create watcher: {e}")) - })?; + }) + .map_err(|e| { + ConnectError::new( + ErrorCode::Internal, + format!("failed to create watcher: {e}"), + ) + })?; let mode = if recursive { RecursiveMode::Recursive @@ -342,7 +347,12 @@ impl Filesystem for FilesystemServiceImpl { ) -> Result<(RemoveWatcherResponse, Context), ConnectError> { let watcher_id: &str = request.watcher_id; self.watchers.remove(watcher_id); - Ok((RemoveWatcherResponse { ..Default::default() }, ctx)) + Ok(( + RemoveWatcherResponse { + ..Default::default() + }, + ctx, + )) } } diff --git a/envd-rs/src/rpc/mod.rs b/envd-rs/src/rpc/mod.rs index 87816c60..5e0927c0 100644 --- a/envd-rs/src/rpc/mod.rs +++ b/envd-rs/src/rpc/mod.rs @@ -1,17 +1,17 @@ -pub mod pb; pub mod entry; +pub mod filesystem_service; +pub mod pb; pub mod process_handler; pub mod process_service; -pub mod filesystem_service; use std::sync::Arc; -use crate::rpc::process_service::ProcessServiceImpl; use crate::rpc::filesystem_service::FilesystemServiceImpl; +use crate::rpc::process_service::ProcessServiceImpl; use crate::state::AppState; -use pb::process::ProcessExt; use pb::filesystem::FilesystemExt; +use pb::process::ProcessExt; /// Build the connect-rust Router with both RPC services registered. pub fn rpc_router(state: Arc) -> connectrpc::Router { diff --git a/envd-rs/src/rpc/pb.rs b/envd-rs/src/rpc/pb.rs index 87fe79cf..c90a395f 100644 --- a/envd-rs/src/rpc/pb.rs +++ b/envd-rs/src/rpc/pb.rs @@ -1,4 +1,9 @@ -#![allow(dead_code, non_camel_case_types, unused_imports, clippy::derivable_impls)] +#![allow( + dead_code, + non_camel_case_types, + unused_imports, + clippy::derivable_impls +)] use ::buffa; use ::buffa_types; diff --git a/envd-rs/src/rpc/process_handler.rs b/envd-rs/src/rpc/process_handler.rs index 68084ed2..32b8ac8e 100644 --- a/envd-rs/src/rpc/process_handler.rs +++ b/envd-rs/src/rpc/process_handler.rs @@ -1,10 +1,11 @@ +use std::collections::VecDeque; use std::io::Read; use std::os::unix::process::CommandExt; use std::process::Stdio; use std::sync::{Arc, Mutex}; use connectrpc::{ConnectError, ErrorCode}; -use nix::pty::{openpty, Winsize}; +use nix::pty::{Winsize, openpty}; use nix::sys::signal::{self, Signal}; use nix::unistd::Pid; use tokio::sync::broadcast; @@ -15,6 +16,11 @@ const STD_CHUNK_SIZE: usize = 32768; const PTY_CHUNK_SIZE: usize = 16384; const BROADCAST_CAPACITY: usize = 4096; +// Upper bound on the per-process output kept for replay. A late Connect gets +// the most recent OUTPUT_LOG_CAPACITY bytes (older output is evicted) so the +// buffer can never grow without bound for a chatty long-running process. +const OUTPUT_LOG_CAPACITY: usize = 256 * 1024; + #[derive(Clone)] pub enum DataEvent { Stdout(Vec), @@ -30,6 +36,37 @@ pub struct EndEvent { pub error: Option, } +/// Bounded ring of recent output, kept so a late Connect can replay what it +/// missed. Evicts oldest events once the retained bytes exceed the cap. +#[derive(Default)] +struct OutputLog { + events: VecDeque, + bytes: usize, +} + +impl OutputLog { + fn push(&mut self, ev: &DataEvent) { + self.bytes += ev_len(ev); + self.events.push_back(ev.clone()); + while self.bytes > OUTPUT_LOG_CAPACITY { + match self.events.pop_front() { + Some(old) => self.bytes -= ev_len(&old), + None => break, + } + } + } + + fn snapshot(&self) -> Vec { + self.events.iter().cloned().collect() + } +} + +fn ev_len(ev: &DataEvent) -> usize { + match ev { + DataEvent::Stdout(d) | DataEvent::Stderr(d) | DataEvent::Pty(d) => d.len(), + } +} + pub struct ProcessHandle { pub config: ProcessConfig, pub tag: Option, @@ -38,6 +75,7 @@ pub struct ProcessHandle { data_tx: broadcast::Sender, end_tx: broadcast::Sender, ended: Mutex>, + output_log: Mutex, stdin: Mutex>, pty_master: Mutex>, @@ -48,6 +86,26 @@ impl ProcessHandle { self.data_tx.subscribe() } + /// Append a chunk to the replay buffer and broadcast it live, under one + /// lock. The shared lock is what makes [`subscribe_data_replay`] race-free: + /// a concurrent attach sees this chunk either in its snapshot or on its live + /// receiver — never both, never neither. + pub fn publish_data(&self, ev: DataEvent) { + let mut log = self.output_log.lock().unwrap(); + log.push(&ev); + let _ = self.data_tx.send(ev); + } + + /// Snapshot the buffered output and subscribe to live output atomically, so + /// a late Connect replays what it missed and then continues live with no gap + /// or duplicate across the handoff. + pub fn subscribe_data_replay(&self) -> (Vec, broadcast::Receiver) { + let log = self.output_log.lock().unwrap(); + let snapshot = log.snapshot(); + let rx = self.data_tx.subscribe(); + (snapshot, rx) + } + pub fn subscribe_end(&self) -> broadcast::Receiver { self.end_tx.subscribe() } @@ -160,6 +218,9 @@ pub fn spawn_process( env.push(("HOME".into(), home)); env.push(("USER".into(), user.name.clone())); env.push(("LOGNAME".into(), user.name.clone())); + if !user.shell.as_os_str().is_empty() { + env.push(("SHELL".into(), user.shell.to_string_lossy().to_string())); + } default_env_vars.iter().for_each(|entry| { env.push((entry.key().clone(), entry.value().clone())); @@ -179,21 +240,40 @@ pub fn spawn_process( let nice_delta = 0 - current_nice(); let profile_source = r#"test -f /etc/profile && . /etc/profile test -f "${HOME}/.bashrc" && . "${HOME}/.bashrc""#; - let oom_script = if nice_delta > 0 { - format!( - r#"echo 100 > /proc/$$/oom_score_adj -{} -exec /usr/bin/nice -n {} "${{@}}""#, - profile_source, nice_delta, - ) - } else { - format!( - r#"echo 100 > /proc/$$/oom_score_adj -{} -exec "$@""#, - profile_source - ) + + // Resolve the user's login shell, falling back to /bin/sh. Commands without + // explicit args are interpreted by this shell so pipes, quoting, escape + // sequences, backslash line-continuations, and other shell syntax work + // without the caller having to wrap them in `sh -c` themselves. + let shell = { + let s = user.shell.to_string_lossy(); + if s.is_empty() { + "/bin/sh".to_string() + } else { + s.to_string() + } }; + + // What the wrapper finally exec's, after the optional `nice` prefix. + // - no args: run cmd_str as a shell command line via the login shell + // ($1 is cmd_str; $0 of the inner shell is the shell path). + // - with args: exec the program + args directly, no shell interpretation + // (backward-compatible program/argv form). + let target = if args.is_empty() { + format!(r#""{shell}" -c "$1" "{shell}""#) + } else { + r#""$@""#.to_string() + }; + let nice_prefix = if nice_delta > 0 { + format!("/usr/bin/nice -n {nice_delta} ") + } else { + String::new() + }; + let oom_script = format!( + r#"echo 100 > /proc/$$/oom_score_adj +{profile_source} +exec {nice_prefix}{target}"# + ); let mut wrapper_args = vec![ "-c".to_string(), oom_script, @@ -264,7 +344,10 @@ exec "$@""#, command.stderr(Stdio::null()); let child = command.spawn().map_err(|e| { - ConnectError::new(ErrorCode::Internal, format!("error starting pty process: {e}")) + ConnectError::new( + ErrorCode::Internal, + format!("error starting pty process: {e}"), + ) })?; drop(slave_fd); @@ -280,6 +363,7 @@ exec "$@""#, data_tx: data_tx.clone(), end_tx: end_tx.clone(), ended: Mutex::new(None), + output_log: Mutex::new(OutputLog::default()), stdin: Mutex::new(None), pty_master: Mutex::new(Some(master_file)), }); @@ -287,7 +371,7 @@ exec "$@""#, let data_rx = handle.subscribe_data(); let end_rx = handle.subscribe_end(); - let data_tx_clone = data_tx.clone(); + let handle_for_reader = Arc::clone(&handle); let pty_reader = std::thread::spawn(move || { let mut master = master_clone; let mut buf = vec![0u8; PTY_CHUNK_SIZE]; @@ -295,7 +379,7 @@ exec "$@""#, match master.read(&mut buf) { Ok(0) => break, Ok(n) => { - let _ = data_tx_clone.send(DataEvent::Pty(buf[..n].to_vec())); + handle_for_reader.publish_data(DataEvent::Pty(buf[..n].to_vec())); } Err(_) => break, } @@ -329,7 +413,11 @@ exec "$@""#, }); tracing::info!(pid, cmd = cmd_str, "process started (pty)"); - Ok(SpawnedProcess { handle, data_rx, end_rx }) + Ok(SpawnedProcess { + handle, + data_rx, + end_rx, + }) } else { let mut command = std::process::Command::new("/bin/bash"); command @@ -375,6 +463,7 @@ exec "$@""#, data_tx: data_tx.clone(), end_tx: end_tx.clone(), ended: Mutex::new(None), + output_log: Mutex::new(OutputLog::default()), stdin: Mutex::new(stdin), pty_master: Mutex::new(None), }); @@ -385,14 +474,14 @@ exec "$@""#, let mut output_readers: Vec> = Vec::new(); if let Some(mut out) = stdout { - let tx = data_tx.clone(); + let handle_for_reader = Arc::clone(&handle); output_readers.push(std::thread::spawn(move || { let mut buf = vec![0u8; STD_CHUNK_SIZE]; loop { match out.read(&mut buf) { Ok(0) => break, Ok(n) => { - let _ = tx.send(DataEvent::Stdout(buf[..n].to_vec())); + handle_for_reader.publish_data(DataEvent::Stdout(buf[..n].to_vec())); } Err(_) => break, } @@ -401,14 +490,14 @@ exec "$@""#, } if let Some(mut err_pipe) = stderr { - let tx = data_tx.clone(); + let handle_for_reader = Arc::clone(&handle); output_readers.push(std::thread::spawn(move || { let mut buf = vec![0u8; STD_CHUNK_SIZE]; loop { match err_pipe.read(&mut buf) { Ok(0) => break, Ok(n) => { - let _ = tx.send(DataEvent::Stderr(buf[..n].to_vec())); + handle_for_reader.publish_data(DataEvent::Stderr(buf[..n].to_vec())); } Err(_) => break, } @@ -444,7 +533,11 @@ exec "$@""#, }); tracing::info!(pid, cmd = cmd_str, "process started (pipe)"); - Ok(SpawnedProcess { handle, data_rx, end_rx }) + Ok(SpawnedProcess { + handle, + data_rx, + end_rx, + }) } } diff --git a/envd-rs/src/rpc/process_service.rs b/envd-rs/src/rpc/process_service.rs index e33be290..c7e90905 100644 --- a/envd-rs/src/rpc/process_service.rs +++ b/envd-rs/src/rpc/process_service.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use connectrpc::{ConnectError, Context, ErrorCode}; use dashmap::DashMap; -use futures::Stream; +use futures::{Stream, StreamExt}; +use tokio::sync::broadcast; use crate::permissions::path::{expand_and_resolve, expand_tilde}; use crate::permissions::user::lookup_user; @@ -72,8 +73,7 @@ impl ProcessServiceImpl { })?; let username = self.state.defaults.user(); - let user = - lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?; + let user = lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?; let cmd_raw: &str = proc_config.cmd; let args_raw: Vec = proc_config.args.iter().map(|s| s.to_string()).collect(); @@ -87,7 +87,8 @@ impl ProcessServiceImpl { let cmd = expand_tilde(cmd_raw, &home_dir) .map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))?; - let args: Vec = args_raw.into_iter() + let args: Vec = args_raw + .into_iter() .map(|a| expand_tilde(&a, &home_dir).unwrap_or(a)) .collect(); @@ -136,7 +137,8 @@ impl ProcessServiceImpl { &self.state.defaults.env_vars, )?; - self.processes.insert(spawned.handle.pid, Arc::clone(&spawned.handle)); + self.processes + .insert(spawned.handle.pid, Arc::clone(&spawned.handle)); let processes = Arc::clone(&self.processes); let pid = spawned.handle.pid; @@ -203,50 +205,10 @@ impl Process for ProcessServiceImpl { let spawned = self.spawn_from_request(&request)?; let pid = spawned.handle.pid; - let mut data_rx = spawned.data_rx; - let mut end_rx = spawned.end_rx; - - let stream = async_stream::stream! { - yield Ok(make_start_response(pid)); - - loop { - tokio::select! { - biased; - data = data_rx.recv() => { - match data { - Ok(ev) => yield Ok(make_data_start_response(ev)), - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - // Data channel closed: the process ended and its - // handle was dropped. The end event is published - // before the handle drop, so it is still buffered - // — emit it rather than losing the exit code. - if let Ok(end) = end_rx.try_recv() { - yield Ok(make_end_start_response(end)); - } - break; - } - } - } - end = end_rx.recv() => { - // Process ended. The waiter joins the output readers - // before sending this event, so every byte is already - // in the data channel — drain it fully before the end. - loop { - match data_rx.try_recv() { - Ok(ev) => yield Ok(make_data_start_response(ev)), - Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue, - Err(_) => break, - } - } - if let Ok(end) = end { - yield Ok(make_end_start_response(end)); - } - break; - } - } - } - }; + // Start subscribes before any output is produced, so there is nothing to + // replay and the process cannot have ended yet. + let stream = process_event_stream(pid, Vec::new(), spawned.data_rx, spawned.end_rx, None) + .map(|r| r.map(wrap_start_response)); Ok((Box::pin(stream), ctx)) } @@ -268,81 +230,17 @@ impl Process for ProcessServiceImpl { let handle = self.get_process_by_selector(selector)?; let pid = handle.pid; - let mut data_rx = handle.subscribe_data(); - let mut end_rx = handle.subscribe_end(); + // Snapshot buffered output + subscribe live atomically, then read the + // exit state. Ordering matters: end_rx must be subscribed before + // cached_end is read so a process that exits in the window is still + // observed (via the channel if subscribed in time, via cached_end + // otherwise). + let (replay, data_rx) = handle.subscribe_data_replay(); + let end_rx = handle.subscribe_end(); let cached_end = handle.cached_end(); - let stream = async_stream::stream! { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(ProcessEvent { - event: Some(process_event::Event::Start(Box::new( - process_event::StartEvent { pid, ..Default::default() }, - ))), - ..Default::default() - }), - ..Default::default() - }); - - if let Some(end) = cached_end { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_end_event(end)), - ..Default::default() - }); - } else { - loop { - tokio::select! { - biased; - data = data_rx.recv() => { - match data { - Ok(ev) => { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_data_event(ev)), - ..Default::default() - }); - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - // Data channel closed: the process ended and - // its handle was dropped. The end event is - // published before the handle drop, so it is - // still buffered — emit it rather than losing - // the exit code. - if let Ok(end) = end_rx.try_recv() { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_end_event(end)), - ..Default::default() - }); - } - break; - } - } - } - end = end_rx.recv() => { - // Process ended. The waiter joins the output readers - // before sending this event, so every byte is already - // in the data channel — drain it fully before the end. - loop { - match data_rx.try_recv() { - Ok(ev) => yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_data_event(ev)), - ..Default::default() - }), - Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue, - Err(_) => break, - } - } - if let Ok(end) = end { - yield Ok(ConnectResponse { - event: buffa::MessageField::some(make_end_event(end)), - ..Default::default() - }); - } - break; - } - } - } - } - }; + let stream = process_event_stream(pid, replay, data_rx, end_rx, cached_end) + .map(|r| r.map(wrap_connect_response)); Ok((Box::pin(stream), ctx)) } @@ -363,7 +261,12 @@ impl Process for ProcessServiceImpl { } } - Ok((UpdateResponse { ..Default::default() }, ctx)) + Ok(( + UpdateResponse { + ..Default::default() + }, + ctx, + )) } async fn stream_input( @@ -372,11 +275,11 @@ impl Process for ProcessServiceImpl { mut requests: Pin< Box< dyn Stream< - Item = Result< - buffa::view::OwnedView>, - ConnectError, - >, - > + Send, + Item = Result< + buffa::view::OwnedView>, + ConnectError, + >, + > + Send, >, >, ) -> Result<(StreamInputResponse, Context), ConnectError> { @@ -405,7 +308,12 @@ impl Process for ProcessServiceImpl { } } - Ok((StreamInputResponse { ..Default::default() }, ctx)) + Ok(( + StreamInputResponse { + ..Default::default() + }, + ctx, + )) } async fn send_input( @@ -422,7 +330,12 @@ impl Process for ProcessServiceImpl { write_input(&handle, input)?; } - Ok((SendInputResponse { ..Default::default() }, ctx)) + Ok(( + SendInputResponse { + ..Default::default() + }, + ctx, + )) } async fn send_signal( @@ -442,12 +355,17 @@ impl Process for ProcessServiceImpl { return Err(ConnectError::new( ErrorCode::InvalidArgument, "invalid or unspecified signal", - )) + )); } }; handle.send_signal(sig)?; - Ok((SendSignalResponse { ..Default::default() }, ctx)) + Ok(( + SendSignalResponse { + ..Default::default() + }, + ctx, + )) } async fn close_stdin( @@ -460,7 +378,12 @@ impl Process for ProcessServiceImpl { })?; let handle = self.get_process_by_selector(selector)?; handle.close_stdin()?; - Ok((CloseStdinResponse { ..Default::default() }, ctx)) + Ok(( + CloseStdinResponse { + ..Default::default() + }, + ctx, + )) } } @@ -472,17 +395,106 @@ fn write_input(handle: &ProcessHandle, input: &ProcessInputView) -> Result<(), C } } -fn make_start_response(pid: u32) -> StartResponse { +/// Shared event pump for `Start` and `Connect`. Yields a leading start event, +/// replays any buffered output (empty for `Start`), then forwards live output +/// and the final exit event. The caller wraps each `ProcessEvent` into its own +/// response envelope, so the streaming logic lives in exactly one place. +fn process_event_stream( + pid: u32, + replay: Vec, + mut data_rx: broadcast::Receiver, + mut end_rx: broadcast::Receiver, + cached_end: Option, +) -> impl Stream> { + use broadcast::error::{RecvError, TryRecvError}; + + async_stream::stream! { + yield Ok(make_start_event(pid)); + + for ev in replay { + yield Ok(make_data_event(ev)); + } + + // Process already exited before we attached. The snapshot above covers + // output up to the attach point; drain anything the live receiver + // buffered after the snapshot, then emit the cached exit. end_rx may + // never deliver here — a broadcast receiver only sees events sent after + // it subscribed, and the exit can predate that — so cached_end is the + // source of truth. + if let Some(end) = cached_end { + loop { + match data_rx.try_recv() { + Ok(ev) => yield Ok(make_data_event(ev)), + Err(TryRecvError::Lagged(_)) => continue, + Err(_) => break, + } + } + yield Ok(make_end_event(end)); + return; + } + + loop { + tokio::select! { + biased; + data = data_rx.recv() => { + match data { + Ok(ev) => yield Ok(make_data_event(ev)), + Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => { + // Data channel closed: the process ended and its + // handle was dropped. The end event is published + // before the handle drop, so it is still buffered — + // emit it rather than losing the exit code. + if let Ok(end) = end_rx.try_recv() { + yield Ok(make_end_event(end)); + } + break; + } + } + } + end = end_rx.recv() => { + // Process ended. The waiter joins the output readers before + // sending this event, so every byte is already in the data + // channel — drain it fully before the end. + loop { + match data_rx.try_recv() { + Ok(ev) => yield Ok(make_data_event(ev)), + Err(TryRecvError::Lagged(_)) => continue, + Err(_) => break, + } + } + if let Ok(end) = end { + yield Ok(make_end_event(end)); + } + break; + } + } + } + } +} + +fn wrap_start_response(event: ProcessEvent) -> StartResponse { StartResponse { - event: buffa::MessageField::some(ProcessEvent { - event: Some(process_event::Event::Start(Box::new( - process_event::StartEvent { - pid, - ..Default::default() - }, - ))), - ..Default::default() - }), + event: buffa::MessageField::some(event), + ..Default::default() + } +} + +fn wrap_connect_response(event: ProcessEvent) -> ConnectResponse { + ConnectResponse { + event: buffa::MessageField::some(event), + ..Default::default() + } +} + +fn make_start_event(pid: u32) -> ProcessEvent { + ProcessEvent { + event: Some(process_event::Event::Start(Box::new( + process_event::StartEvent { + pid, + ..Default::default() + }, + ))), ..Default::default() } } @@ -504,13 +516,6 @@ fn make_data_event(ev: DataEvent) -> ProcessEvent { } } -fn make_data_start_response(ev: DataEvent) -> StartResponse { - StartResponse { - event: buffa::MessageField::some(make_data_event(ev)), - ..Default::default() - } -} - fn make_end_event(end: process_handler::EndEvent) -> ProcessEvent { ProcessEvent { event: Some(process_event::Event::End(Box::new( @@ -526,13 +531,6 @@ fn make_end_event(end: process_handler::EndEvent) -> ProcessEvent { } } -fn make_end_start_response(end: process_handler::EndEvent) -> StartResponse { - StartResponse { - event: buffa::MessageField::some(make_end_event(end)), - ..Default::default() - } -} - #[cfg(test)] mod tests { use super::*; @@ -589,7 +587,8 @@ mod tests { fn args_other_user_left_literal() { let home_dir = "/home/testuser"; let args_raw = vec!["~other".to_string(), "~other/path".to_string()]; - let args: Vec = args_raw.into_iter() + let args: Vec = args_raw + .into_iter() .map(|a| expand_tilde(&a, home_dir).unwrap_or(a)) .collect(); assert_eq!(args, vec!["~other", "~other/path"]); @@ -618,17 +617,22 @@ mod tests { "/tmp/out".to_string(), "~other".to_string(), ]; - let args: Vec = args_raw.into_iter() + let args: Vec = args_raw + .into_iter() .map(|a| expand_tilde(&a, home_dir).unwrap_or(a)) .collect(); - assert_eq!(args, vec!["-p", "/home/testuser/data", "/tmp/out", "~other"]); + assert_eq!( + args, + vec!["-p", "/home/testuser/data", "/tmp/out", "~other"] + ); } #[test] fn args_empty_passthrough() { let home_dir = "/home/testuser"; let args_raw: Vec = vec![]; - let args: Vec = args_raw.into_iter() + let args: Vec = args_raw + .into_iter() .map(|a| expand_tilde(&a, home_dir).unwrap_or(a)) .collect(); assert!(args.is_empty()); diff --git a/envd-rs/src/state.rs b/envd-rs/src/state.rs index 4b10ccb6..f731c0d1 100644 --- a/envd-rs/src/state.rs +++ b/envd-rs/src/state.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicU8, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use crate::auth::token::SecureToken; @@ -17,6 +17,11 @@ pub struct AppState { pub port_subsystem: Option>, pub cpu_used_pct: AtomicU32, pub cpu_count: AtomicU32, + /// Whole-VM IO throughput, bytes/sec, sampled over the last 1s tick. Used + /// by the host activity sampler to keep IO-bound-but-CPU-idle workloads + /// (e.g. a long download) from being mistaken for inactive. + pub net_bps: AtomicU64, + pub disk_bps: AtomicU64, /// Memory preload coordination. The host agent POSTs /memory/preload after /// a snapshot restore to materialise every physical page (so the next @@ -56,6 +61,8 @@ impl AppState { port_subsystem, cpu_used_pct: AtomicU32::new(0), cpu_count: AtomicU32::new(0), + net_bps: AtomicU64::new(0), + disk_bps: AtomicU64::new(0), mem_preload_started: AtomicBool::new(false), mem_preload_done: AtomicBool::new(false), mem_preload_cancel: AtomicBool::new(false), @@ -70,7 +77,7 @@ impl AppState { let state_clone = Arc::clone(&state); std::thread::spawn(move || { - cpu_sampler(state_clone); + activity_sampler(state_clone); }); state @@ -84,6 +91,14 @@ impl AppState { self.cpu_count.load(Ordering::Relaxed) } + pub fn net_bps(&self) -> u64 { + self.net_bps.load(Ordering::Relaxed) + } + + pub fn disk_bps(&self) -> u64 { + self.disk_bps.load(Ordering::Relaxed) + } + /// Records a new lifecycle ID, returning true if it changed (i.e. this /// is the first /init since a resume). First-ever call returns false: /// boot-time /init doesn't need port-subsystem restart since the @@ -99,12 +114,16 @@ impl AppState { } } -fn cpu_sampler(state: Arc) { +fn activity_sampler(state: Arc) { use sysinfo::System; let mut sys = System::new(); sys.refresh_cpu_all(); + // Cumulative IO counters from the previous tick. None until the first read. + let mut prev_net: Option = read_net_bytes(); + let mut prev_disk: Option = read_disk_bytes(); + loop { std::thread::sleep(std::time::Duration::from_secs(1)); @@ -123,5 +142,73 @@ fn cpu_sampler(state: Arc) { state .cpu_count .store(sys.cpus().len() as u32, Ordering::Relaxed); + + // Throughput = cumulative-counter delta over the ~1s tick. Counters can + // reset across a snapshot restore; a wrapped/negative delta reads as 0. + let cur_net = read_net_bytes(); + let net_bps = match (prev_net, cur_net) { + (Some(p), Some(c)) => c.saturating_sub(p), + _ => 0, + }; + prev_net = cur_net; + + let cur_disk = read_disk_bytes(); + let disk_bps = match (prev_disk, cur_disk) { + (Some(p), Some(c)) => c.saturating_sub(p), + _ => 0, + }; + prev_disk = cur_disk; + + state.net_bps.store(net_bps, Ordering::Relaxed); + state.disk_bps.store(disk_bps, Ordering::Relaxed); } } + +/// Sum of rx+tx bytes across all non-loopback interfaces, from /proc/net/dev. +/// Returns None if the file can't be read/parsed. +fn read_net_bytes() -> Option { + let content = std::fs::read_to_string("/proc/net/dev").ok()?; + let mut total: u64 = 0; + // First two lines are headers. + for line in content.lines().skip(2) { + let Some((iface, rest)) = line.split_once(':') else { + continue; + }; + if iface.trim() == "lo" { + continue; + } + let fields: Vec<&str> = rest.split_whitespace().collect(); + // Column 0 = rx bytes, column 8 = tx bytes. + if let Some(rx) = fields.first().and_then(|v| v.parse::().ok()) { + total = total.saturating_add(rx); + } + if let Some(tx) = fields.get(8).and_then(|v| v.parse::().ok()) { + total = total.saturating_add(tx); + } + } + Some(total) +} + +/// Sum of sectors read+written across all block devices, ×512, from +/// /proc/diskstats. Skips partitions and loop/ram devices to avoid double +/// counting. Returns None if the file can't be read/parsed. +fn read_disk_bytes() -> Option { + let content = std::fs::read_to_string("/proc/diskstats").ok()?; + let mut sectors: u64 = 0; + for line in content.lines() { + let fields: Vec<&str> = line.split_whitespace().collect(); + // 0=major 1=minor 2=name ... 5=sectors read ... 9=sectors written. + if fields.len() < 10 { + continue; + } + let name = fields[2]; + if name.starts_with("loop") || name.starts_with("ram") { + continue; + } + let read = fields[5].parse::().unwrap_or(0); + let written = fields[9].parse::().unwrap_or(0); + sectors = sectors.saturating_add(read).saturating_add(written); + } + // Linux reports diskstats sectors in fixed 512-byte units. + Some(sectors.saturating_mul(512)) +} diff --git a/envd-rs/src/util.rs b/envd-rs/src/util.rs index b8a0080f..7977a135 100644 --- a/envd-rs/src/util.rs +++ b/envd-rs/src/util.rs @@ -23,12 +23,10 @@ impl AtomicMax { if new <= current { return false; } - match self.val.compare_exchange_weak( - current, - new, - Ordering::Release, - Ordering::Relaxed, - ) { + match self + .val + .compare_exchange_weak(current, new, Ordering::Release, Ordering::Relaxed) + { Ok(_) => return true, Err(_) => continue, } diff --git a/frontend/src/routes/admin/hosts/+page.svelte b/frontend/src/routes/admin/hosts/+page.svelte index 5d252475..6bf8d2b0 100644 --- a/frontend/src/routes/admin/hosts/+page.svelte +++ b/frontend/src/routes/admin/hosts/+page.svelte @@ -53,14 +53,15 @@ let byocPageCount = $derived(Math.max(1, Math.ceil(flatByocHosts.length / PAGE_SIZE))); let byocPageHosts = $derived(flatByocHosts.slice(byocPage * PAGE_SIZE, (byocPage + 1) * PAGE_SIZE)); - // Stats across all hosts - let onlineCount = $derived(allHosts.filter((h) => h.status === 'online').length); - let pendingCount = $derived(allHosts.filter((h) => h.status === 'pending').length); - let totalCount = $derived(allHosts.length); - let totalCpuCores = $derived(allHosts.reduce((sum, h) => sum + (h.cpu_cores ?? 0), 0)); - let totalMemoryMb = $derived(allHosts.reduce((sum, h) => sum + (h.memory_mb ?? 0), 0)); - let totalRunningVcpus = $derived(allHosts.reduce((sum, h) => sum + h.running_vcpus, 0)); - let totalRunningMemoryMb = $derived(allHosts.reduce((sum, h) => sum + h.running_memory_mb, 0)); + // Aggregated stats — platform hosts only (admin needs a heads-up on + // platform capacity; BYOC capacity belongs to individual teams). + let onlineCount = $derived(platformHosts.filter((h) => h.status === 'online').length); + let pendingCount = $derived(platformHosts.filter((h) => h.status === 'pending').length); + let totalCount = $derived(platformHosts.length); + let totalCpuCores = $derived(platformHosts.reduce((sum, h) => sum + (h.cpu_cores ?? 0), 0)); + let totalMemoryMb = $derived(platformHosts.reduce((sum, h) => sum + (h.memory_mb ?? 0), 0)); + let totalRunningVcpus = $derived(platformHosts.reduce((sum, h) => sum + h.running_vcpus, 0)); + let totalRunningMemoryMb = $derived(platformHosts.reduce((sum, h) => sum + h.running_memory_mb, 0)); function formatMem(mb: number): string { return mb >= 1024 ? `${(mb / 1024).toFixed(0)} GB` : `${mb} MB`; diff --git a/frontend/src/routes/dashboard/capsules/+layout.svelte b/frontend/src/routes/dashboard/capsules/+layout.svelte index d2080a70..34a3524a 100644 --- a/frontend/src/routes/dashboard/capsules/+layout.svelte +++ b/frontend/src/routes/dashboard/capsules/+layout.svelte @@ -6,10 +6,6 @@ let { children } = $props(); - - Wrenn — Capsules - -
{#if $page.params.id} diff --git a/frontend/src/routes/dashboard/capsules/+page.svelte b/frontend/src/routes/dashboard/capsules/+page.svelte index 1dffc9df..bb9bbc5d 100644 --- a/frontend/src/routes/dashboard/capsules/+page.svelte +++ b/frontend/src/routes/dashboard/capsules/+page.svelte @@ -256,6 +256,10 @@ }); + + Wrenn — Capsules + +