forked from wrenn/wrenn
v0.1.6 (#45)
## What's New? Performance updates for large capsules, admin panel enhancement and bug fixes ### Envd - Fixed bug with sandbox metrics calculation - Page cache drop and balloon inflation to reduce memfile snapshot - Updated rpc timeout logic for better control - Added tests ### Admin Panel - Add/Remove platform admin - Updated template deletion logic for fine grained permission ### Others - Minor frontend visual improvement - Minor bugfixes - Version bump Co-authored-by: Tasnim Kabir Sadik <tksadik92@gmail.com> Reviewed-on: wrenn/wrenn#45 Co-authored-by: pptx704 <rafeed@omukk.dev> Co-committed-by: pptx704 <rafeed@omukk.dev>
This commit is contained in:
3
envd-rs/Cargo.lock
generated
3
envd-rs/Cargo.lock
generated
@ -514,7 +514,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "envd"
|
||||
version = "0.2.0"
|
||||
version = "0.2.1"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"axum",
|
||||
@ -543,6 +543,7 @@ dependencies = [
|
||||
"sha2",
|
||||
"subtle",
|
||||
"sysinfo",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "envd"
|
||||
version = "0.2.0"
|
||||
version = "0.2.1"
|
||||
edition = "2024"
|
||||
rust-version = "1.88"
|
||||
|
||||
@ -72,6 +72,9 @@ buffa = "0.3"
|
||||
async-stream = "0.3.6"
|
||||
mime_guess = "2"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
|
||||
[build-dependencies]
|
||||
connectrpc-build = "0.3"
|
||||
|
||||
|
||||
@ -83,3 +83,128 @@ pub fn validate_signing(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn test_token(val: &[u8]) -> SecureToken {
|
||||
let t = SecureToken::new();
|
||||
t.set(val).unwrap();
|
||||
t
|
||||
}
|
||||
|
||||
fn far_future() -> i64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64
|
||||
+ 3600
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generate_starts_with_v1() {
|
||||
let token = test_token(b"secret");
|
||||
let sig = generate_signature(&token, "/file", "root", READ_OPERATION, None).unwrap();
|
||||
assert!(sig.starts_with("v1_"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generate_deterministic() {
|
||||
let token = test_token(b"secret");
|
||||
let s1 = generate_signature(&token, "/file", "root", READ_OPERATION, None).unwrap();
|
||||
let s2 = generate_signature(&token, "/file", "root", READ_OPERATION, None).unwrap();
|
||||
assert_eq!(s1, s2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generate_with_expiration_differs() {
|
||||
let token = test_token(b"secret");
|
||||
let without = generate_signature(&token, "/f", "u", READ_OPERATION, None).unwrap();
|
||||
let with = generate_signature(&token, "/f", "u", READ_OPERATION, Some(9999)).unwrap();
|
||||
assert_ne!(without, with);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generate_unset_token_errors() {
|
||||
let token = SecureToken::new();
|
||||
assert!(generate_signature(&token, "/f", "u", READ_OPERATION, None).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_no_token_set_passes() {
|
||||
let token = SecureToken::new();
|
||||
assert!(validate_signing(&token, None, None, None, "root", "/f", READ_OPERATION).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_correct_header_token() {
|
||||
let token = test_token(b"secret");
|
||||
assert!(validate_signing(&token, Some("secret"), None, None, "root", "/f", READ_OPERATION).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_wrong_header_token() {
|
||||
let token = test_token(b"secret");
|
||||
let result = validate_signing(&token, Some("wrong"), None, None, "root", "/f", READ_OPERATION);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("does not match"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_valid_signature() {
|
||||
let token = test_token(b"secret");
|
||||
let exp = far_future();
|
||||
let sig = generate_signature(&token, "/file", "root", READ_OPERATION, Some(exp)).unwrap();
|
||||
assert!(validate_signing(&token, None, Some(&sig), Some(exp), "root", "/file", READ_OPERATION).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_invalid_signature() {
|
||||
let token = test_token(b"secret");
|
||||
let result = validate_signing(&token, None, Some("v1_bad"), Some(far_future()), "root", "/f", READ_OPERATION);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("invalid signature"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_expired_signature() {
|
||||
let token = test_token(b"secret");
|
||||
let expired: i64 = 1_000_000;
|
||||
let sig = generate_signature(&token, "/f", "root", READ_OPERATION, Some(expired)).unwrap();
|
||||
let result = validate_signing(&token, None, Some(&sig), Some(expired), "root", "/f", READ_OPERATION);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("expired"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_missing_signature() {
|
||||
let token = test_token(b"secret");
|
||||
let result = validate_signing(&token, None, None, None, "root", "/f", READ_OPERATION);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("missing signature"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_empty_header_token_falls_through_to_signature() {
|
||||
let token = test_token(b"secret");
|
||||
let result = validate_signing(&token, Some(""), None, None, "root", "/f", READ_OPERATION);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("missing signature"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_valid_signature_no_expiration() {
|
||||
let token = test_token(b"secret");
|
||||
let sig = generate_signature(&token, "/file", "root", READ_OPERATION, None).unwrap();
|
||||
assert!(validate_signing(&token, None, Some(&sig), None, "root", "/file", READ_OPERATION).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn different_operations_produce_different_signatures() {
|
||||
let token = test_token(b"secret");
|
||||
let r = generate_signature(&token, "/f", "root", READ_OPERATION, None).unwrap();
|
||||
let w = generate_signature(&token, "/f", "root", WRITE_OPERATION, None).unwrap();
|
||||
assert_ne!(r, w);
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,3 +125,132 @@ impl SecureToken {
|
||||
Ok(token)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn new_is_unset() {
|
||||
let t = SecureToken::new();
|
||||
assert!(!t.is_set());
|
||||
assert!(!t.equals("anything"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_and_equals() {
|
||||
let t = SecureToken::new();
|
||||
t.set(b"secret").unwrap();
|
||||
assert!(t.is_set());
|
||||
assert!(t.equals("secret"));
|
||||
assert!(!t.equals("wrong"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_empty_errors() {
|
||||
let t = SecureToken::new();
|
||||
assert!(t.set(b"").is_err());
|
||||
assert!(!t.is_set());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_overwrites_previous() {
|
||||
let t = SecureToken::new();
|
||||
t.set(b"first").unwrap();
|
||||
t.set(b"second").unwrap();
|
||||
assert!(!t.equals("first"));
|
||||
assert!(t.equals("second"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn destroy_clears() {
|
||||
let t = SecureToken::new();
|
||||
t.set(b"secret").unwrap();
|
||||
t.destroy();
|
||||
assert!(!t.is_set());
|
||||
assert!(!t.equals("secret"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_returns_copy() {
|
||||
let t = SecureToken::new();
|
||||
assert!(t.bytes().is_none());
|
||||
t.set(b"hello").unwrap();
|
||||
assert_eq!(t.bytes().unwrap(), b"hello");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn take_from_transfers_and_clears_source() {
|
||||
let src = SecureToken::new();
|
||||
src.set(b"token").unwrap();
|
||||
let dst = SecureToken::new();
|
||||
dst.take_from(&src);
|
||||
assert!(!src.is_set());
|
||||
assert!(dst.equals("token"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn take_from_overwrites_existing() {
|
||||
let src = SecureToken::new();
|
||||
src.set(b"new").unwrap();
|
||||
let dst = SecureToken::new();
|
||||
dst.set(b"old").unwrap();
|
||||
dst.take_from(&src);
|
||||
assert!(dst.equals("new"));
|
||||
assert!(!dst.equals("old"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn equals_secure_matching() {
|
||||
let a = SecureToken::new();
|
||||
a.set(b"same").unwrap();
|
||||
let b = SecureToken::new();
|
||||
b.set(b"same").unwrap();
|
||||
assert!(a.equals_secure(&b));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn equals_secure_different() {
|
||||
let a = SecureToken::new();
|
||||
a.set(b"one").unwrap();
|
||||
let b = SecureToken::new();
|
||||
b.set(b"two").unwrap();
|
||||
assert!(!a.equals_secure(&b));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn equals_secure_unset() {
|
||||
let a = SecureToken::new();
|
||||
let b = SecureToken::new();
|
||||
assert!(!a.equals_secure(&b));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_json_bytes_valid() {
|
||||
let mut data = b"\"mysecret\"".to_vec();
|
||||
let t = SecureToken::from_json_bytes(&mut data).unwrap();
|
||||
assert!(t.equals("mysecret"));
|
||||
assert!(data.iter().all(|&b| b == 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_json_bytes_rejects_missing_quotes() {
|
||||
let mut data = b"noquotes".to_vec();
|
||||
assert!(SecureToken::from_json_bytes(&mut data).is_err());
|
||||
assert!(data.iter().all(|&b| b == 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_json_bytes_rejects_escape_sequences() {
|
||||
let mut data = b"\"has\\nescapes\"".to_vec();
|
||||
assert!(SecureToken::from_json_bytes(&mut data).is_err());
|
||||
assert!(data.iter().all(|&b| b == 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_json_bytes_rejects_empty_content() {
|
||||
let mut data = b"\"\"".to_vec();
|
||||
assert!(SecureToken::from_json_bytes(&mut data).is_err());
|
||||
assert!(data.iter().all(|&b| b == 0));
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,4 +76,125 @@ impl ConnTracker {
|
||||
pub fn keepalives_enabled(&self) -> bool {
|
||||
self.inner.lock().unwrap().keepalives_enabled
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn active_count(&self) -> usize {
|
||||
self.inner.lock().unwrap().active.len()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn register_assigns_sequential_ids() {
|
||||
let ct = ConnTracker::new();
|
||||
assert_eq!(ct.register_connection(), 0);
|
||||
assert_eq!(ct.register_connection(), 1);
|
||||
assert_eq!(ct.register_connection(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_clears_active() {
|
||||
let ct = ConnTracker::new();
|
||||
let id = ct.register_connection();
|
||||
assert_eq!(ct.active_count(), 1);
|
||||
ct.remove_connection(id);
|
||||
assert_eq!(ct.active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_nonexistent_is_noop() {
|
||||
let ct = ConnTracker::new();
|
||||
ct.remove_connection(999);
|
||||
assert_eq!(ct.active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prepare_disables_keepalives() {
|
||||
let ct = ConnTracker::new();
|
||||
assert!(ct.keepalives_enabled());
|
||||
ct.register_connection();
|
||||
ct.prepare_for_snapshot();
|
||||
assert!(!ct.keepalives_enabled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn restore_removes_zombies_and_reenables_keepalives() {
|
||||
let ct = ConnTracker::new();
|
||||
let id0 = ct.register_connection();
|
||||
let id1 = ct.register_connection();
|
||||
ct.prepare_for_snapshot();
|
||||
ct.restore_after_snapshot();
|
||||
assert!(ct.keepalives_enabled());
|
||||
// Both pre-snapshot connections removed as zombies
|
||||
assert_eq!(ct.active_count(), 0);
|
||||
// IDs don't matter anymore, but remove shouldn't panic
|
||||
ct.remove_connection(id0);
|
||||
ct.remove_connection(id1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn restore_without_prepare_is_noop() {
|
||||
let ct = ConnTracker::new();
|
||||
let _id = ct.register_connection();
|
||||
ct.restore_after_snapshot();
|
||||
assert!(ct.keepalives_enabled());
|
||||
assert_eq!(ct.active_count(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_closed_before_restore_not_zombie() {
|
||||
let ct = ConnTracker::new();
|
||||
let id0 = ct.register_connection();
|
||||
let _id1 = ct.register_connection();
|
||||
ct.prepare_for_snapshot();
|
||||
// Close id0 during snapshot window
|
||||
ct.remove_connection(id0);
|
||||
assert_eq!(ct.active_count(), 1);
|
||||
ct.restore_after_snapshot();
|
||||
// id1 was zombie (still active at restore), id0 already gone
|
||||
assert_eq!(ct.active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_snapshot_connection_survives_restore() {
|
||||
let ct = ConnTracker::new();
|
||||
ct.register_connection();
|
||||
ct.prepare_for_snapshot();
|
||||
// New connection after snapshot
|
||||
let _post = ct.register_connection();
|
||||
ct.restore_after_snapshot();
|
||||
// Pre-snapshot connection removed, post-snapshot survives
|
||||
assert_eq!(ct.active_count(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn full_lifecycle() {
|
||||
let ct = ConnTracker::new();
|
||||
let _a = ct.register_connection();
|
||||
let b = ct.register_connection();
|
||||
let _c = ct.register_connection();
|
||||
assert_eq!(ct.active_count(), 3);
|
||||
assert!(ct.keepalives_enabled());
|
||||
|
||||
ct.prepare_for_snapshot();
|
||||
assert!(!ct.keepalives_enabled());
|
||||
|
||||
let d = ct.register_connection();
|
||||
ct.remove_connection(b);
|
||||
|
||||
ct.restore_after_snapshot();
|
||||
assert!(ct.keepalives_enabled());
|
||||
// a and c were zombies, b removed before restore, d is post-snapshot
|
||||
assert_eq!(ct.active_count(), 1);
|
||||
ct.remove_connection(d);
|
||||
assert_eq!(ct.active_count(), 0);
|
||||
|
||||
// Can reuse tracker after restore
|
||||
let e = ct.register_connection();
|
||||
assert_eq!(ct.active_count(), 1);
|
||||
assert!(e > d);
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,8 +15,29 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_hmac_sha256() {
|
||||
let result = compute(b"key", b"message");
|
||||
assert_eq!(result.len(), 64); // SHA-256 hex = 64 chars
|
||||
fn rfc4231_tc1() {
|
||||
let key = &[0x0b; 20];
|
||||
let data = b"Hi There";
|
||||
assert_eq!(
|
||||
compute(key, data),
|
||||
"b0344c61d8db38535ca8afceaf0bf12b881dc200c9833da726e9376c2e32cff7"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rfc4231_tc2() {
|
||||
let key = b"Jefe";
|
||||
let data = b"what do ya want for nothing?";
|
||||
assert_eq!(
|
||||
compute(key, data),
|
||||
"5bdcc146bf60754e6a042426089575c75a003f089d2739839dec58b964ec3843"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn output_is_64_hex_chars() {
|
||||
let result = compute(b"key", b"data");
|
||||
assert_eq!(result.len(), 64);
|
||||
assert!(result.chars().all(|c| c.is_ascii_hexdigit()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,17 +17,38 @@ pub fn hash_without_prefix(data: &[u8]) -> String {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
const VECTORS: &[(&[u8], &str)] = &[
|
||||
(b"", "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU"),
|
||||
(b"abc", "ungWv48Bz+pBQUDeXa4iI7ADYaOWF3qctBD/YfIAFa0"),
|
||||
(b"abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq", "JI1qYdIGOLjlwCaTDD5gOaM85Flk/yFn9uzt1BnbBsE"),
|
||||
];
|
||||
|
||||
#[test]
|
||||
fn test_hash_format() {
|
||||
let result = hash(b"test");
|
||||
assert!(result.starts_with("$sha256$"));
|
||||
assert!(!result.contains('='));
|
||||
fn known_answer_with_prefix() {
|
||||
for (input, expected_b64) in VECTORS {
|
||||
let result = hash(input);
|
||||
assert_eq!(result, format!("$sha256${expected_b64}"), "input: {:?}", String::from_utf8_lossy(input));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_hash_without_prefix() {
|
||||
let result = hash_without_prefix(b"test");
|
||||
assert!(!result.starts_with("$sha256$"));
|
||||
assert!(!result.contains('='));
|
||||
fn known_answer_without_prefix() {
|
||||
for (input, expected_b64) in VECTORS {
|
||||
let result = hash_without_prefix(input);
|
||||
assert_eq!(result, *expected_b64, "input: {:?}", String::from_utf8_lossy(input));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_base64_padding() {
|
||||
for (input, _) in VECTORS {
|
||||
assert!(!hash(input).contains('='));
|
||||
assert!(!hash_without_prefix(input).contains('='));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deterministic() {
|
||||
assert_eq!(hash(b"test"), hash(b"test"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,11 +14,30 @@ pub fn hash_access_token_bytes(token: &[u8]) -> String {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
const VECTORS: &[(&str, &str)] = &[
|
||||
("", "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"),
|
||||
("abc", "ddaf35a193617abacc417349ae20413112e6fa4e89a97ea20a9eeee64b55d39a2192992a274fc1a836ba3c23a3feebbd454d4423643ce80e2a9ac94fa54ca49f"),
|
||||
("abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq", "204a8fc6dda82f0a0ced7beb8e08a41657c16ef468b228a8279be331a703c33596fd15c13b1b07f9aa1d3bea57789ca031ad85c7a71dd70354ec631238ca3445"),
|
||||
];
|
||||
|
||||
#[test]
|
||||
fn test_hash_access_token() {
|
||||
let h1 = hash_access_token("test");
|
||||
let h2 = hash_access_token_bytes(b"test");
|
||||
assert_eq!(h1, h2);
|
||||
assert_eq!(h1.len(), 128); // SHA-512 hex = 128 chars
|
||||
fn known_answer() {
|
||||
for (input, expected) in VECTORS {
|
||||
assert_eq!(hash_access_token(input), *expected, "input: {input:?}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn str_and_bytes_agree() {
|
||||
for (input, _) in VECTORS {
|
||||
assert_eq!(hash_access_token(input), hash_access_token_bytes(input.as_bytes()));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn output_is_lowercase_hex_128_chars() {
|
||||
let h = hash_access_token("anything");
|
||||
assert_eq!(h.len(), 128);
|
||||
assert!(h.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,21 +1,36 @@
|
||||
use dashmap::DashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Defaults {
|
||||
pub env_vars: Arc<DashMap<String, String>>,
|
||||
pub user: String,
|
||||
pub workdir: Option<String>,
|
||||
user: RwLock<String>,
|
||||
workdir: RwLock<Option<String>>,
|
||||
}
|
||||
|
||||
impl Defaults {
|
||||
pub fn new(user: &str) -> Self {
|
||||
Self {
|
||||
env_vars: Arc::new(DashMap::new()),
|
||||
user: user.to_string(),
|
||||
workdir: None,
|
||||
user: RwLock::new(user.to_string()),
|
||||
workdir: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn user(&self) -> String {
|
||||
self.user.read().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn set_user(&self, user: String) {
|
||||
*self.user.write().unwrap() = user;
|
||||
}
|
||||
|
||||
pub fn workdir(&self) -> Option<String> {
|
||||
self.workdir.read().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn set_workdir(&self, workdir: Option<String>) {
|
||||
*self.workdir.write().unwrap() = workdir;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resolve_default_workdir(workdir: &str, default_workdir: Option<&str>) -> String {
|
||||
@ -40,3 +55,64 @@ pub fn resolve_default_username<'a>(
|
||||
}
|
||||
Err("username not provided")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn workdir_explicit_overrides_default() {
|
||||
assert_eq!(resolve_default_workdir("/explicit", Some("/default")), "/explicit");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn workdir_empty_uses_default() {
|
||||
assert_eq!(resolve_default_workdir("", Some("/default")), "/default");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn workdir_empty_no_default_returns_empty() {
|
||||
assert_eq!(resolve_default_workdir("", None), "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn workdir_explicit_ignores_none_default() {
|
||||
assert_eq!(resolve_default_workdir("/explicit", None), "/explicit");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn username_explicit_returns_explicit() {
|
||||
assert_eq!(resolve_default_username(Some("root"), "wrenn").unwrap(), "root");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn username_none_uses_default() {
|
||||
assert_eq!(resolve_default_username(None, "wrenn").unwrap(), "wrenn");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn username_none_empty_default_errors() {
|
||||
assert!(resolve_default_username(None, "").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn username_some_overrides_empty_default() {
|
||||
assert_eq!(resolve_default_username(Some("root"), "").unwrap(), "root");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn defaults_user_set_and_get() {
|
||||
let d = Defaults::new("initial");
|
||||
assert_eq!(d.user(), "initial");
|
||||
d.set_user("changed".into());
|
||||
assert_eq!(d.user(), "changed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn defaults_workdir_initially_none() {
|
||||
let d = Defaults::new("user");
|
||||
assert!(d.workdir().is_none());
|
||||
d.set_workdir(Some("/home".into()));
|
||||
assert_eq!(d.workdir().unwrap(), "/home");
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,3 +145,192 @@ pub fn parse_content_encoding<B>(r: &Request<B>) -> Result<&'static str, String>
|
||||
|
||||
Err(format!("unsupported Content-Encoding: {header}, supported: {SUPPORTED_ENCODINGS:?}"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use axum::http::Request;
|
||||
|
||||
fn req_with_accept(v: &str) -> Request<()> {
|
||||
Request::builder()
|
||||
.header("accept-encoding", v)
|
||||
.body(())
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn req_with_content(v: &str) -> Request<()> {
|
||||
Request::builder()
|
||||
.header("content-encoding", v)
|
||||
.body(())
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn req_no_headers() -> Request<()> {
|
||||
Request::builder().body(()).unwrap()
|
||||
}
|
||||
|
||||
// parse_encoding_with_quality
|
||||
|
||||
#[test]
|
||||
fn encoding_quality_default_1() {
|
||||
let eq = parse_encoding_with_quality("gzip");
|
||||
assert_eq!(eq.encoding, "gzip");
|
||||
assert_eq!(eq.quality, 1.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encoding_quality_explicit() {
|
||||
let eq = parse_encoding_with_quality("gzip;q=0.8");
|
||||
assert_eq!(eq.encoding, "gzip");
|
||||
assert_eq!(eq.quality, 0.8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encoding_quality_case_insensitive() {
|
||||
let eq = parse_encoding_with_quality("GZIP;Q=0.5");
|
||||
assert_eq!(eq.encoding, "gzip");
|
||||
assert_eq!(eq.quality, 0.5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encoding_quality_zero() {
|
||||
let eq = parse_encoding_with_quality("gzip;q=0");
|
||||
assert_eq!(eq.quality, 0.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encoding_quality_whitespace_trimmed() {
|
||||
let eq = parse_encoding_with_quality(" gzip ; q=0.9 ");
|
||||
assert_eq!(eq.encoding, "gzip");
|
||||
assert_eq!(eq.quality, 0.9);
|
||||
}
|
||||
|
||||
// parse_accept_encoding_header
|
||||
|
||||
#[test]
|
||||
fn accept_header_empty() {
|
||||
let (encs, rejected) = parse_accept_encoding_header("");
|
||||
assert!(encs.is_empty());
|
||||
assert!(!rejected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_header_identity_q0_rejects() {
|
||||
let (_, rejected) = parse_accept_encoding_header("identity;q=0");
|
||||
assert!(rejected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_header_wildcard_q0_rejects_identity() {
|
||||
let (_, rejected) = parse_accept_encoding_header("*;q=0");
|
||||
assert!(rejected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_header_wildcard_q0_but_identity_explicit_accepted() {
|
||||
let (_, rejected) = parse_accept_encoding_header("*;q=0, identity");
|
||||
assert!(!rejected);
|
||||
}
|
||||
|
||||
// parse_accept_encoding (full)
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_no_header_returns_identity() {
|
||||
assert_eq!(parse_accept_encoding(&req_no_headers()).unwrap(), "identity");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_gzip() {
|
||||
assert_eq!(parse_accept_encoding(&req_with_accept("gzip")).unwrap(), "gzip");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_identity_explicit() {
|
||||
assert_eq!(parse_accept_encoding(&req_with_accept("identity")).unwrap(), "identity");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_gzip_higher_quality() {
|
||||
assert_eq!(
|
||||
parse_accept_encoding(&req_with_accept("identity;q=0.1, gzip;q=0.9")).unwrap(),
|
||||
"gzip"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_wildcard_returns_identity() {
|
||||
assert_eq!(parse_accept_encoding(&req_with_accept("*")).unwrap(), "identity");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_wildcard_identity_rejected_returns_gzip() {
|
||||
assert_eq!(
|
||||
parse_accept_encoding(&req_with_accept("identity;q=0, *")).unwrap(),
|
||||
"gzip"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_all_rejected_errors() {
|
||||
assert!(parse_accept_encoding(&req_with_accept("identity;q=0, *;q=0")).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accept_encoding_unsupported_only_falls_to_identity() {
|
||||
assert_eq!(parse_accept_encoding(&req_with_accept("br")).unwrap(), "identity");
|
||||
}
|
||||
|
||||
// is_identity_acceptable
|
||||
|
||||
#[test]
|
||||
fn identity_acceptable_no_header() {
|
||||
assert!(is_identity_acceptable(&req_no_headers()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn identity_acceptable_gzip_only() {
|
||||
assert!(is_identity_acceptable(&req_with_accept("gzip")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn identity_not_acceptable_identity_q0() {
|
||||
assert!(!is_identity_acceptable(&req_with_accept("identity;q=0")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn identity_not_acceptable_wildcard_q0() {
|
||||
assert!(!is_identity_acceptable(&req_with_accept("*;q=0")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn identity_acceptable_wildcard_q0_but_identity_explicit() {
|
||||
assert!(is_identity_acceptable(&req_with_accept("*;q=0, identity")));
|
||||
}
|
||||
|
||||
// parse_content_encoding
|
||||
|
||||
#[test]
|
||||
fn content_encoding_empty_returns_identity() {
|
||||
assert_eq!(parse_content_encoding(&req_no_headers()).unwrap(), "identity");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn content_encoding_gzip() {
|
||||
assert_eq!(parse_content_encoding(&req_with_content("gzip")).unwrap(), "gzip");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn content_encoding_identity_explicit() {
|
||||
assert_eq!(parse_content_encoding(&req_with_content("identity")).unwrap(), "identity");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn content_encoding_unsupported_errors() {
|
||||
assert!(parse_content_encoding(&req_with_content("br")).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn content_encoding_case_insensitive() {
|
||||
assert_eq!(parse_content_encoding(&req_with_content("GZIP")).unwrap(), "gzip");
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,9 +71,10 @@ pub async fn get_files(
|
||||
let path_str = params.path.as_deref().unwrap_or("");
|
||||
let header_token = extract_header_token(&req);
|
||||
|
||||
let default_user = state.defaults.user();
|
||||
let username = match execcontext::resolve_default_username(
|
||||
params.username.as_deref(),
|
||||
&state.defaults.user,
|
||||
&default_user,
|
||||
) {
|
||||
Ok(u) => u.to_string(),
|
||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, e),
|
||||
@ -96,7 +97,8 @@ pub async fn get_files(
|
||||
};
|
||||
|
||||
let home_dir = user.dir.to_string_lossy().to_string();
|
||||
let resolved = match expand_and_resolve(path_str, &home_dir, state.defaults.workdir.as_deref())
|
||||
let default_workdir = state.defaults.workdir();
|
||||
let resolved = match expand_and_resolve(path_str, &home_dir, default_workdir.as_deref())
|
||||
{
|
||||
Ok(p) => p,
|
||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
||||
@ -222,9 +224,10 @@ pub async fn post_files(
|
||||
let path_str = params.path.as_deref().unwrap_or("");
|
||||
let header_token = extract_header_token(&req);
|
||||
|
||||
let default_user = state.defaults.user();
|
||||
let username = match execcontext::resolve_default_username(
|
||||
params.username.as_deref(),
|
||||
&state.defaults.user,
|
||||
&default_user,
|
||||
) {
|
||||
Ok(u) => u.to_string(),
|
||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, e),
|
||||
@ -266,6 +269,7 @@ pub async fn post_files(
|
||||
};
|
||||
|
||||
let mut uploaded: Vec<EntryInfo> = Vec::new();
|
||||
let default_workdir = state.defaults.workdir();
|
||||
|
||||
while let Ok(Some(field)) = multipart.next_field().await {
|
||||
let field_name = field.name().unwrap_or("").to_string();
|
||||
@ -274,7 +278,7 @@ pub async fn post_files(
|
||||
}
|
||||
|
||||
let file_path = if !path_str.is_empty() {
|
||||
match expand_and_resolve(path_str, &home_dir, state.defaults.workdir.as_deref()) {
|
||||
match expand_and_resolve(path_str, &home_dir, default_workdir.as_deref()) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
||||
}
|
||||
@ -283,7 +287,7 @@ pub async fn post_files(
|
||||
.file_name()
|
||||
.unwrap_or("upload")
|
||||
.to_string();
|
||||
match expand_and_resolve(&fname, &home_dir, state.defaults.workdir.as_deref()) {
|
||||
match expand_and_resolve(&fname, &home_dir, default_workdir.as_deref()) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
||||
}
|
||||
|
||||
@ -29,6 +29,8 @@ pub async fn get_health(State(state): State<Arc<AppState>>) -> impl IntoResponse
|
||||
fn post_restore_recovery(state: &AppState) {
|
||||
tracing::info!("restore: post-restore recovery (no GC needed in Rust)");
|
||||
|
||||
state.snapshot_in_progress.store(false, std::sync::atomic::Ordering::Release);
|
||||
|
||||
state.conn_tracker.restore_after_snapshot();
|
||||
tracing::info!("restore: zombie connections closed");
|
||||
|
||||
|
||||
@ -78,11 +78,15 @@ pub async fn post_init(
|
||||
if let Some(ref user) = init_req.default_user {
|
||||
if !user.is_empty() {
|
||||
tracing::debug!(user = %user, "setting default user");
|
||||
let mut defaults = state.defaults.clone();
|
||||
defaults.user = user.clone();
|
||||
// Note: In Rust we'd need interior mutability for this.
|
||||
// For now, env_vars (DashMap) handles concurrent access.
|
||||
// User/workdir mutation deferred to full state refactor.
|
||||
state.defaults.set_user(user.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Set default workdir
|
||||
if let Some(ref workdir) = init_req.default_workdir {
|
||||
if !workdir.is_empty() {
|
||||
tracing::debug!(workdir = %workdir, "setting default workdir");
|
||||
state.defaults.set_workdir(Some(workdir.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -147,6 +151,9 @@ async fn trigger_restore_and_respond(state: &AppState) -> axum::response::Respon
|
||||
|
||||
fn post_restore_recovery(state: &AppState) {
|
||||
tracing::info!("restore: post-restore recovery (no GC needed in Rust)");
|
||||
|
||||
state.snapshot_in_progress.store(false, std::sync::atomic::Ordering::Release);
|
||||
|
||||
state.conn_tracker.restore_after_snapshot();
|
||||
|
||||
if let Some(ref ps) = state.port_subsystem {
|
||||
|
||||
@ -46,7 +46,8 @@ fn collect_metrics(state: &AppState) -> Result<Metrics, String> {
|
||||
let mut sys = sysinfo::System::new();
|
||||
sys.refresh_memory();
|
||||
let mem_total = sys.total_memory();
|
||||
let mem_used = sys.used_memory();
|
||||
let mem_available = sys.available_memory();
|
||||
let mem_used = mem_total.saturating_sub(mem_available);
|
||||
let mem_total_mib = mem_total / 1024 / 1024;
|
||||
let mem_used_mib = mem_used / 1024 / 1024;
|
||||
|
||||
|
||||
@ -10,10 +10,24 @@ use crate::state::AppState;
|
||||
/// POST /snapshot/prepare — quiesce subsystems before Firecracker snapshot.
|
||||
///
|
||||
/// In Rust there is no GC dance. We just:
|
||||
/// 1. Stop port subsystem
|
||||
/// 2. Close idle connections via conntracker
|
||||
/// 3. Set needs_restore flag
|
||||
/// 1. Drop page cache to shrink snapshot size
|
||||
/// 2. Stop port subsystem
|
||||
/// 3. Close idle connections via conntracker
|
||||
/// 4. Set needs_restore flag
|
||||
pub async fn post_snapshot_prepare(State(state): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
// Drop page cache BEFORE blocking the reclaimer — avoids snapshotting
|
||||
// gigabytes of stale cache that inflates the memory dump on disk.
|
||||
// "1" = pagecache only (keep dentries/inodes for faster resume).
|
||||
if let Err(e) = std::fs::write("/proc/sys/vm/drop_caches", "1") {
|
||||
tracing::warn!(error = %e, "snapshot/prepare: drop_caches failed");
|
||||
} else {
|
||||
tracing::info!("snapshot/prepare: page cache dropped");
|
||||
}
|
||||
|
||||
// Block memory reclaimer — prevents drop_caches from running mid-freeze
|
||||
// which would corrupt kernel page table state.
|
||||
state.snapshot_in_progress.store(true, Ordering::Release);
|
||||
|
||||
if let Some(ref ps) = state.port_subsystem {
|
||||
ps.stop();
|
||||
tracing::info!("snapshot/prepare: port subsystem stopped");
|
||||
@ -22,6 +36,9 @@ pub async fn post_snapshot_prepare(State(state): State<Arc<AppState>>) -> impl I
|
||||
state.conn_tracker.prepare_for_snapshot();
|
||||
tracing::info!("snapshot/prepare: connections prepared");
|
||||
|
||||
// Sync filesystem buffers so dirty pages are flushed before freeze.
|
||||
unsafe { libc::sync(); }
|
||||
|
||||
state.needs_restore.store(true, Ordering::Release);
|
||||
tracing::info!("snapshot/prepare: ready for freeze");
|
||||
|
||||
|
||||
@ -147,6 +147,14 @@ async fn main() {
|
||||
Some(Arc::clone(&port_subsystem)),
|
||||
);
|
||||
|
||||
// Memory reclaimer — drop page cache when available memory is low.
|
||||
// Firecracker balloon device can only reclaim pages the guest kernel freed.
|
||||
// Pauses during snapshot/prepare to avoid corrupting kernel page table state.
|
||||
if !cli.is_not_fc {
|
||||
let state_for_reclaimer = Arc::clone(&state);
|
||||
std::thread::spawn(move || memory_reclaimer(state_for_reclaimer));
|
||||
}
|
||||
|
||||
// RPC services (Connect protocol — serves Connect + gRPC + gRPC-Web on same port)
|
||||
let connect_router = rpc::rpc_router(Arc::clone(&state));
|
||||
|
||||
@ -188,7 +196,8 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
||||
use crate::rpc::process_handler;
|
||||
use std::collections::HashMap;
|
||||
|
||||
let user = match lookup_user(&state.defaults.user) {
|
||||
let default_user = state.defaults.user();
|
||||
let user = match lookup_user(&default_user) {
|
||||
Ok(u) => u,
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, "cmd: failed to lookup user");
|
||||
@ -197,9 +206,8 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
||||
};
|
||||
|
||||
let home = user.dir.to_string_lossy().to_string();
|
||||
let cwd = state
|
||||
.defaults
|
||||
.workdir
|
||||
let default_workdir = state.defaults.workdir();
|
||||
let cwd = default_workdir
|
||||
.as_deref()
|
||||
.unwrap_or(&home);
|
||||
|
||||
@ -214,11 +222,52 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
||||
&user,
|
||||
&state.defaults.env_vars,
|
||||
) {
|
||||
Ok(handle) => {
|
||||
tracing::info!(pid = handle.pid, cmd, "initial command spawned");
|
||||
Ok(spawned) => {
|
||||
tracing::info!(pid = spawned.handle.pid, cmd, "initial command spawned");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(error = %e, cmd, "failed to spawn initial command");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn memory_reclaimer(state: Arc<AppState>) {
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
const CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const DROP_THRESHOLD_PCT: u64 = 80;
|
||||
|
||||
loop {
|
||||
std::thread::sleep(CHECK_INTERVAL);
|
||||
|
||||
if state.snapshot_in_progress.load(Ordering::Acquire) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut sys = sysinfo::System::new();
|
||||
sys.refresh_memory();
|
||||
let total = sys.total_memory();
|
||||
let available = sys.available_memory();
|
||||
|
||||
if total == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let used_pct = ((total - available) * 100) / total;
|
||||
if used_pct >= DROP_THRESHOLD_PCT {
|
||||
if state.snapshot_in_progress.load(Ordering::Acquire) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(e) = std::fs::write("/proc/sys/vm/drop_caches", "3") {
|
||||
tracing::debug!(error = %e, "drop_caches failed");
|
||||
} else {
|
||||
let mut sys2 = sysinfo::System::new();
|
||||
sys2.refresh_memory();
|
||||
let freed_mb =
|
||||
sys2.available_memory().saturating_sub(available) / (1024 * 1024);
|
||||
tracing::info!(used_pct, freed_mb, "page cache dropped");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,3 +70,115 @@ pub fn ensure_dirs(path: &str, uid: Uid, gid: Gid) -> Result<(), String> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// expand_tilde
|
||||
|
||||
#[test]
|
||||
fn tilde_empty_passthrough() {
|
||||
assert_eq!(expand_tilde("", "/home/u").unwrap(), "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tilde_no_tilde_passthrough() {
|
||||
assert_eq!(expand_tilde("/absolute", "/home/u").unwrap(), "/absolute");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tilde_bare() {
|
||||
assert_eq!(expand_tilde("~", "/home/user").unwrap(), "/home/user");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tilde_slash_path() {
|
||||
assert_eq!(expand_tilde("~/docs", "/home/user").unwrap(), "/home/user/docs");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tilde_nested() {
|
||||
assert_eq!(expand_tilde("~/a/b/c", "/h").unwrap(), "/h/a/b/c");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tilde_other_user_errors() {
|
||||
assert!(expand_tilde("~bob/foo", "/home/user").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tilde_relative_no_tilde() {
|
||||
assert_eq!(expand_tilde("relative/path", "/home/u").unwrap(), "relative/path");
|
||||
}
|
||||
|
||||
// expand_and_resolve
|
||||
|
||||
#[test]
|
||||
fn resolve_absolute_passthrough() {
|
||||
assert_eq!(expand_and_resolve("/abs/path", "/home", None).unwrap(), "/abs/path");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_empty_uses_default() {
|
||||
assert_eq!(expand_and_resolve("", "/home", Some("/default")).unwrap(), "/default");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_empty_no_default_falls_back_to_home() {
|
||||
// Empty path with no default → joins "" with home_dir → returns home_dir
|
||||
let result = expand_and_resolve("", "/home", None).unwrap();
|
||||
assert_eq!(result, "/home");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_tilde_expands() {
|
||||
assert_eq!(expand_and_resolve("~/dir", "/home/u", None).unwrap(), "/home/u/dir");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_relative_joins_home() {
|
||||
let result = expand_and_resolve("subdir", "/tmp", None).unwrap();
|
||||
// Relative path joined with home and canonicalized (or raw join on missing)
|
||||
assert!(result.starts_with("/tmp"));
|
||||
assert!(result.contains("subdir"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resolve_tilde_other_user_errors() {
|
||||
assert!(expand_and_resolve("~bob", "/home/u", None).is_err());
|
||||
}
|
||||
|
||||
// ensure_dirs
|
||||
|
||||
#[test]
|
||||
fn ensure_dirs_creates_nested() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
let path = tmp.path().join("a/b/c");
|
||||
let uid = nix::unistd::getuid();
|
||||
let gid = nix::unistd::getgid();
|
||||
ensure_dirs(path.to_str().unwrap(), uid, gid).unwrap();
|
||||
assert!(path.is_dir());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ensure_dirs_existing_is_ok() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
let uid = nix::unistd::getuid();
|
||||
let gid = nix::unistd::getgid();
|
||||
ensure_dirs(tmp.path().to_str().unwrap(), uid, gid).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ensure_dirs_file_in_path_errors() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
let file_path = tmp.path().join("afile");
|
||||
std::fs::write(&file_path, "").unwrap();
|
||||
let nested = file_path.join("subdir");
|
||||
let uid = nix::unistd::getuid();
|
||||
let gid = nix::unistd::getgid();
|
||||
let result = ensure_dirs(nested.to_str().unwrap(), uid, gid);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("path is a file"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,3 +110,151 @@ fn parse_hex_addr(s: &str, family: u32) -> Option<(String, u32)> {
|
||||
|
||||
Some((ip_str, port))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::Write;
|
||||
|
||||
// tcp_state_name
|
||||
|
||||
#[test]
|
||||
fn state_all_known_codes() {
|
||||
assert_eq!(tcp_state_name("01"), "ESTABLISHED");
|
||||
assert_eq!(tcp_state_name("02"), "SYN_SENT");
|
||||
assert_eq!(tcp_state_name("03"), "SYN_RECV");
|
||||
assert_eq!(tcp_state_name("04"), "FIN_WAIT1");
|
||||
assert_eq!(tcp_state_name("05"), "FIN_WAIT2");
|
||||
assert_eq!(tcp_state_name("06"), "TIME_WAIT");
|
||||
assert_eq!(tcp_state_name("07"), "CLOSE");
|
||||
assert_eq!(tcp_state_name("08"), "CLOSE_WAIT");
|
||||
assert_eq!(tcp_state_name("09"), "LAST_ACK");
|
||||
assert_eq!(tcp_state_name("0A"), "LISTEN");
|
||||
assert_eq!(tcp_state_name("0B"), "CLOSING");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn state_unknown_code() {
|
||||
assert_eq!(tcp_state_name("FF"), "UNKNOWN");
|
||||
assert_eq!(tcp_state_name("00"), "UNKNOWN");
|
||||
}
|
||||
|
||||
// parse_hex_addr
|
||||
|
||||
#[test]
|
||||
fn ipv4_localhost() {
|
||||
let (ip, port) = parse_hex_addr("0100007F:0050", libc::AF_INET as u32).unwrap();
|
||||
assert_eq!(ip, "127.0.0.1");
|
||||
assert_eq!(port, 80);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ipv4_any() {
|
||||
let (ip, port) = parse_hex_addr("00000000:0035", libc::AF_INET as u32).unwrap();
|
||||
assert_eq!(ip, "0.0.0.0");
|
||||
assert_eq!(port, 53);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ipv4_real_address() {
|
||||
// 192.168.1.1 in little-endian = 0101A8C0
|
||||
let (ip, port) = parse_hex_addr("0101A8C0:01BB", libc::AF_INET as u32).unwrap();
|
||||
assert_eq!(ip, "192.168.1.1");
|
||||
assert_eq!(port, 443);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ipv4_wrong_byte_count_returns_none() {
|
||||
assert!(parse_hex_addr("0100:0050", libc::AF_INET as u32).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_hex_returns_none() {
|
||||
assert!(parse_hex_addr("ZZZZZZZZ:0050", libc::AF_INET as u32).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_colon_returns_none() {
|
||||
assert!(parse_hex_addr("0100007F0050", libc::AF_INET as u32).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ipv6_loopback() {
|
||||
// ::1 in /proc/net/tcp6 format: 00000000000000000000000001000000
|
||||
let (ip, port) = parse_hex_addr(
|
||||
"00000000000000000000000001000000:0035",
|
||||
libc::AF_INET6 as u32,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(ip, "::1");
|
||||
assert_eq!(port, 53);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ipv6_wrong_byte_count_returns_none() {
|
||||
assert!(parse_hex_addr("0100007F:0050", libc::AF_INET6 as u32).is_none());
|
||||
}
|
||||
|
||||
// parse_proc_net_tcp
|
||||
|
||||
fn write_tcp_file(content: &str) -> tempfile::NamedTempFile {
|
||||
let mut f = tempfile::NamedTempFile::new().unwrap();
|
||||
f.write_all(content.as_bytes()).unwrap();
|
||||
f.flush().unwrap();
|
||||
f
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_empty_file() {
|
||||
let f = write_tcp_file(
|
||||
" sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode\n",
|
||||
);
|
||||
let conns = parse_proc_net_tcp(f.path().to_str().unwrap(), libc::AF_INET as u32).unwrap();
|
||||
assert!(conns.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_single_entry() {
|
||||
let content = "\
|
||||
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
|
||||
0: 0100007F:0050 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 12345 1 00000000\n";
|
||||
let f = write_tcp_file(content);
|
||||
let conns = parse_proc_net_tcp(f.path().to_str().unwrap(), libc::AF_INET as u32).unwrap();
|
||||
assert_eq!(conns.len(), 1);
|
||||
assert_eq!(conns[0].local_ip, "127.0.0.1");
|
||||
assert_eq!(conns[0].local_port, 80);
|
||||
assert_eq!(conns[0].status, "LISTEN");
|
||||
assert_eq!(conns[0].inode, 12345);
|
||||
assert_eq!(conns[0].family, libc::AF_INET as u32);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_skips_malformed_rows() {
|
||||
let content = "\
|
||||
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
|
||||
0: 0100007F:0050 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 12345 1 00000000
|
||||
bad line
|
||||
1: short\n";
|
||||
let f = write_tcp_file(content);
|
||||
let conns = parse_proc_net_tcp(f.path().to_str().unwrap(), libc::AF_INET as u32).unwrap();
|
||||
assert_eq!(conns.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_multiple_entries() {
|
||||
let content = "\
|
||||
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
|
||||
0: 0100007F:0050 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 100 1 00000000
|
||||
1: 00000000:01BB 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 200 1 00000000\n";
|
||||
let f = write_tcp_file(content);
|
||||
let conns = parse_proc_net_tcp(f.path().to_str().unwrap(), libc::AF_INET as u32).unwrap();
|
||||
assert_eq!(conns.len(), 2);
|
||||
assert_eq!(conns[0].local_port, 80);
|
||||
assert_eq!(conns[1].local_port, 443);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_nonexistent_file_errors() {
|
||||
assert!(parse_proc_net_tcp("/nonexistent/path", libc::AF_INET as u32).is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,3 +140,92 @@ fn format_permissions(mode: u32) -> String {
|
||||
}
|
||||
s
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// format_permissions
|
||||
|
||||
#[test]
|
||||
fn regular_file_755() {
|
||||
assert_eq!(format_permissions(libc::S_IFREG | 0o755), "-rwxr-xr-x");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn directory_755() {
|
||||
assert_eq!(format_permissions(libc::S_IFDIR | 0o755), "drwxr-xr-x");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn symlink_777() {
|
||||
assert_eq!(format_permissions(libc::S_IFLNK | 0o777), "Lrwxrwxrwx");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn regular_file_000() {
|
||||
assert_eq!(format_permissions(libc::S_IFREG | 0o000), "----------");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn regular_file_644() {
|
||||
assert_eq!(format_permissions(libc::S_IFREG | 0o644), "-rw-r--r--");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn block_device() {
|
||||
assert_eq!(format_permissions(libc::S_IFBLK | 0o660), "brw-rw----");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn char_device() {
|
||||
assert_eq!(format_permissions(libc::S_IFCHR | 0o666), "crw-rw-rw-");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fifo() {
|
||||
assert_eq!(format_permissions(libc::S_IFIFO | 0o644), "prw-r--r--");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn socket() {
|
||||
assert_eq!(format_permissions(libc::S_IFSOCK | 0o755), "Srwxr-xr-x");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_type() {
|
||||
assert_eq!(format_permissions(0o755), "?rwxr-xr-x");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn setuid_in_mode_only_affects_lower_bits() {
|
||||
// setuid (0o4755) — format_permissions masks with 0o777, so same as 0o755
|
||||
assert_eq!(
|
||||
format_permissions(libc::S_IFREG | 0o4755),
|
||||
format_permissions(libc::S_IFREG | 0o755),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn output_always_10_chars() {
|
||||
for mode in [0o000, 0o777, 0o644, 0o755, 0o4755] {
|
||||
assert_eq!(format_permissions(libc::S_IFREG | mode).len(), 10);
|
||||
}
|
||||
}
|
||||
|
||||
// meta_to_file_type — needs real filesystem
|
||||
|
||||
#[test]
|
||||
fn meta_regular_file() {
|
||||
let f = tempfile::NamedTempFile::new().unwrap();
|
||||
let meta = std::fs::metadata(f.path()).unwrap();
|
||||
assert_eq!(meta_to_file_type(&meta), FileType::FILE_TYPE_FILE);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn meta_directory() {
|
||||
let d = tempfile::TempDir::new().unwrap();
|
||||
let meta = std::fs::metadata(d.path()).unwrap();
|
||||
assert_eq!(meta_to_file_type(&meta), FileType::FILE_TYPE_DIRECTORY);
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,15 +31,15 @@ impl FilesystemServiceImpl {
|
||||
}
|
||||
|
||||
fn resolve_path(&self, path: &str, ctx: &Context) -> Result<String, ConnectError> {
|
||||
let username = extract_username(ctx).unwrap_or_else(|| self.state.defaults.user.clone());
|
||||
let username = extract_username(ctx).unwrap_or_else(|| self.state.defaults.user());
|
||||
let user = lookup_user(&username).map_err(|e| {
|
||||
ConnectError::new(ErrorCode::Unauthenticated, format!("invalid user: {e}"))
|
||||
})?;
|
||||
|
||||
let home_dir = user.dir.to_string_lossy().to_string();
|
||||
let default_workdir = self.state.defaults.workdir.as_deref();
|
||||
let default_workdir = self.state.defaults.workdir();
|
||||
|
||||
expand_and_resolve(path, &home_dir, default_workdir)
|
||||
expand_and_resolve(path, &home_dir, default_workdir.as_deref())
|
||||
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))
|
||||
}
|
||||
}
|
||||
@ -97,7 +97,7 @@ impl Filesystem for FilesystemServiceImpl {
|
||||
}
|
||||
}
|
||||
|
||||
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user.clone());
|
||||
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user());
|
||||
let user =
|
||||
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
||||
|
||||
@ -122,7 +122,7 @@ impl Filesystem for FilesystemServiceImpl {
|
||||
let source = self.resolve_path(request.source, &ctx)?;
|
||||
let destination = self.resolve_path(request.destination, &ctx)?;
|
||||
|
||||
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user.clone());
|
||||
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user());
|
||||
let user =
|
||||
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
||||
|
||||
|
||||
@ -37,6 +37,7 @@ pub struct ProcessHandle {
|
||||
|
||||
data_tx: broadcast::Sender<DataEvent>,
|
||||
end_tx: broadcast::Sender<EndEvent>,
|
||||
ended: Mutex<Option<EndEvent>>,
|
||||
|
||||
stdin: Mutex<Option<std::process::ChildStdin>>,
|
||||
pty_master: Mutex<Option<std::fs::File>>,
|
||||
@ -51,6 +52,10 @@ impl ProcessHandle {
|
||||
self.end_tx.subscribe()
|
||||
}
|
||||
|
||||
pub fn cached_end(&self) -> Option<EndEvent> {
|
||||
self.ended.lock().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn send_signal(&self, sig: Signal) -> Result<(), ConnectError> {
|
||||
signal::kill(Pid::from_raw(self.pid as i32), sig).map_err(|e| {
|
||||
ConnectError::new(ErrorCode::Internal, format!("error sending signal: {e}"))
|
||||
@ -128,6 +133,12 @@ impl ProcessHandle {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SpawnedProcess {
|
||||
pub handle: Arc<ProcessHandle>,
|
||||
pub data_rx: broadcast::Receiver<DataEvent>,
|
||||
pub end_rx: broadcast::Receiver<EndEvent>,
|
||||
}
|
||||
|
||||
pub fn spawn_process(
|
||||
cmd_str: &str,
|
||||
args: &[String],
|
||||
@ -138,7 +149,7 @@ pub fn spawn_process(
|
||||
tag: Option<String>,
|
||||
user: &nix::unistd::User,
|
||||
default_env_vars: &dashmap::DashMap<String, String>,
|
||||
) -> Result<Arc<ProcessHandle>, ConnectError> {
|
||||
) -> Result<SpawnedProcess, ConnectError> {
|
||||
let mut env: Vec<(String, String)> = Vec::new();
|
||||
env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default()));
|
||||
let home = user.dir.to_string_lossy().to_string();
|
||||
@ -244,10 +255,14 @@ pub fn spawn_process(
|
||||
pid,
|
||||
data_tx: data_tx.clone(),
|
||||
end_tx: end_tx.clone(),
|
||||
ended: Mutex::new(None),
|
||||
stdin: Mutex::new(None),
|
||||
pty_master: Mutex::new(Some(master_file)),
|
||||
});
|
||||
|
||||
let data_rx = handle.subscribe_data();
|
||||
let end_rx = handle.subscribe_end();
|
||||
|
||||
let data_tx_clone = data_tx.clone();
|
||||
std::thread::spawn(move || {
|
||||
let mut master = master_clone;
|
||||
@ -264,30 +279,29 @@ pub fn spawn_process(
|
||||
});
|
||||
|
||||
let end_tx_clone = end_tx.clone();
|
||||
let handle_for_waiter = Arc::clone(&handle);
|
||||
std::thread::spawn(move || {
|
||||
let mut child = child;
|
||||
match child.wait() {
|
||||
Ok(s) => {
|
||||
let _ = end_tx_clone.send(EndEvent {
|
||||
exit_code: s.code().unwrap_or(-1),
|
||||
exited: s.code().is_some(),
|
||||
status: format!("{s}"),
|
||||
error: None,
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = end_tx_clone.send(EndEvent {
|
||||
exit_code: -1,
|
||||
exited: false,
|
||||
status: "error".into(),
|
||||
error: Some(e.to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
let end_event = match child.wait() {
|
||||
Ok(s) => EndEvent {
|
||||
exit_code: s.code().unwrap_or(-1),
|
||||
exited: s.code().is_some(),
|
||||
status: format!("{s}"),
|
||||
error: None,
|
||||
},
|
||||
Err(e) => EndEvent {
|
||||
exit_code: -1,
|
||||
exited: false,
|
||||
status: "error".into(),
|
||||
error: Some(e.to_string()),
|
||||
},
|
||||
};
|
||||
*handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
|
||||
let _ = end_tx_clone.send(end_event);
|
||||
});
|
||||
|
||||
tracing::info!(pid, cmd = cmd_str, "process started (pty)");
|
||||
Ok(handle)
|
||||
Ok(SpawnedProcess { handle, data_rx, end_rx })
|
||||
} else {
|
||||
let mut command = std::process::Command::new("/bin/sh");
|
||||
command
|
||||
@ -327,10 +341,14 @@ pub fn spawn_process(
|
||||
pid,
|
||||
data_tx: data_tx.clone(),
|
||||
end_tx: end_tx.clone(),
|
||||
ended: Mutex::new(None),
|
||||
stdin: Mutex::new(stdin),
|
||||
pty_master: Mutex::new(None),
|
||||
});
|
||||
|
||||
let data_rx = handle.subscribe_data();
|
||||
let end_rx = handle.subscribe_end();
|
||||
|
||||
if let Some(mut out) = stdout {
|
||||
let tx = data_tx.clone();
|
||||
std::thread::spawn(move || {
|
||||
@ -364,29 +382,28 @@ pub fn spawn_process(
|
||||
}
|
||||
|
||||
let end_tx_clone = end_tx.clone();
|
||||
let handle_for_waiter = Arc::clone(&handle);
|
||||
std::thread::spawn(move || {
|
||||
match child.wait() {
|
||||
Ok(s) => {
|
||||
let _ = end_tx_clone.send(EndEvent {
|
||||
exit_code: s.code().unwrap_or(-1),
|
||||
exited: s.code().is_some(),
|
||||
status: format!("{s}"),
|
||||
error: None,
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = end_tx_clone.send(EndEvent {
|
||||
exit_code: -1,
|
||||
exited: false,
|
||||
status: "error".into(),
|
||||
error: Some(e.to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
let end_event = match child.wait() {
|
||||
Ok(s) => EndEvent {
|
||||
exit_code: s.code().unwrap_or(-1),
|
||||
exited: s.code().is_some(),
|
||||
status: format!("{s}"),
|
||||
error: None,
|
||||
},
|
||||
Err(e) => EndEvent {
|
||||
exit_code: -1,
|
||||
exited: false,
|
||||
status: "error".into(),
|
||||
error: Some(e.to_string()),
|
||||
},
|
||||
};
|
||||
*handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
|
||||
let _ = end_tx_clone.send(end_event);
|
||||
});
|
||||
|
||||
tracing::info!(pid, cmd = cmd_str, "process started (pipe)");
|
||||
Ok(handle)
|
||||
Ok(SpawnedProcess { handle, data_rx, end_rx })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -66,12 +66,12 @@ impl ProcessServiceImpl {
|
||||
fn spawn_from_request(
|
||||
&self,
|
||||
request: &StartRequestView<'_>,
|
||||
) -> Result<Arc<ProcessHandle>, ConnectError> {
|
||||
) -> Result<process_handler::SpawnedProcess, ConnectError> {
|
||||
let proc_config = request.process.as_option().ok_or_else(|| {
|
||||
ConnectError::new(ErrorCode::InvalidArgument, "process config required")
|
||||
})?;
|
||||
|
||||
let username = self.state.defaults.user.clone();
|
||||
let username = self.state.defaults.user();
|
||||
let user =
|
||||
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
||||
|
||||
@ -85,7 +85,8 @@ impl ProcessServiceImpl {
|
||||
|
||||
let home_dir = user.dir.to_string_lossy().to_string();
|
||||
let cwd_str: &str = proc_config.cwd.unwrap_or("");
|
||||
let cwd = expand_and_resolve(cwd_str, &home_dir, self.state.defaults.workdir.as_deref())
|
||||
let default_workdir = self.state.defaults.workdir();
|
||||
let cwd = expand_and_resolve(cwd_str, &home_dir, default_workdir.as_deref())
|
||||
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))?;
|
||||
|
||||
let effective_cwd = if cwd.is_empty() { "/" } else { &cwd };
|
||||
@ -116,7 +117,7 @@ impl ProcessServiceImpl {
|
||||
"process.Start request"
|
||||
);
|
||||
|
||||
let handle = process_handler::spawn_process(
|
||||
let spawned = process_handler::spawn_process(
|
||||
cmd,
|
||||
&args,
|
||||
&envs,
|
||||
@ -128,17 +129,17 @@ impl ProcessServiceImpl {
|
||||
&self.state.defaults.env_vars,
|
||||
)?;
|
||||
|
||||
self.processes.insert(handle.pid, Arc::clone(&handle));
|
||||
self.processes.insert(spawned.handle.pid, Arc::clone(&spawned.handle));
|
||||
|
||||
let processes = self.processes.clone();
|
||||
let pid = handle.pid;
|
||||
let mut end_rx = handle.subscribe_end();
|
||||
let pid = spawned.handle.pid;
|
||||
let mut cleanup_end_rx = spawned.handle.subscribe_end();
|
||||
tokio::spawn(async move {
|
||||
let _ = end_rx.recv().await;
|
||||
let _ = cleanup_end_rx.recv().await;
|
||||
processes.remove(&pid);
|
||||
});
|
||||
|
||||
Ok(handle)
|
||||
Ok(spawned)
|
||||
}
|
||||
}
|
||||
|
||||
@ -182,26 +183,36 @@ impl Process for ProcessServiceImpl {
|
||||
),
|
||||
ConnectError,
|
||||
> {
|
||||
let handle = self.spawn_from_request(&request)?;
|
||||
let pid = handle.pid;
|
||||
let spawned = self.spawn_from_request(&request)?;
|
||||
let pid = spawned.handle.pid;
|
||||
|
||||
let mut data_rx = handle.subscribe_data();
|
||||
let mut end_rx = handle.subscribe_end();
|
||||
let mut data_rx = spawned.data_rx;
|
||||
let mut end_rx = spawned.end_rx;
|
||||
|
||||
let stream = async_stream::stream! {
|
||||
yield Ok(make_start_response(pid));
|
||||
|
||||
loop {
|
||||
match data_rx.recv().await {
|
||||
Ok(ev) => yield Ok(make_data_start_response(ev)),
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
tokio::select! {
|
||||
biased;
|
||||
data = data_rx.recv() => {
|
||||
match data {
|
||||
Ok(ev) => yield Ok(make_data_start_response(ev)),
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
end = end_rx.recv() => {
|
||||
while let Ok(ev) = data_rx.try_recv() {
|
||||
yield Ok(make_data_start_response(ev));
|
||||
}
|
||||
if let Ok(end) = end {
|
||||
yield Ok(make_end_start_response(end));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(end) = end_rx.recv().await {
|
||||
yield Ok(make_end_start_response(end));
|
||||
}
|
||||
};
|
||||
|
||||
Ok((Box::pin(stream), ctx))
|
||||
@ -226,6 +237,7 @@ impl Process for ProcessServiceImpl {
|
||||
|
||||
let mut data_rx = handle.subscribe_data();
|
||||
let mut end_rx = handle.subscribe_end();
|
||||
let cached_end = handle.cached_end();
|
||||
|
||||
let stream = async_stream::stream! {
|
||||
yield Ok(ConnectResponse {
|
||||
@ -238,24 +250,44 @@ impl Process for ProcessServiceImpl {
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
loop {
|
||||
match data_rx.recv().await {
|
||||
Ok(ev) => {
|
||||
yield Ok(ConnectResponse {
|
||||
event: buffa::MessageField::some(make_data_event(ev)),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(end) = end_rx.recv().await {
|
||||
if let Some(end) = cached_end {
|
||||
yield Ok(ConnectResponse {
|
||||
event: buffa::MessageField::some(make_end_event(end)),
|
||||
..Default::default()
|
||||
});
|
||||
} else {
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
data = data_rx.recv() => {
|
||||
match data {
|
||||
Ok(ev) => {
|
||||
yield Ok(ConnectResponse {
|
||||
event: buffa::MessageField::some(make_data_event(ev)),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
end = end_rx.recv() => {
|
||||
while let Ok(ev) = data_rx.try_recv() {
|
||||
yield Ok(ConnectResponse {
|
||||
event: buffa::MessageField::some(make_data_event(ev)),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
if let Ok(end) = end {
|
||||
yield Ok(ConnectResponse {
|
||||
event: buffa::MessageField::some(make_end_event(end)),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -19,6 +19,7 @@ pub struct AppState {
|
||||
pub port_subsystem: Option<Arc<PortSubsystem>>,
|
||||
pub cpu_used_pct: AtomicU32,
|
||||
pub cpu_count: AtomicU32,
|
||||
pub snapshot_in_progress: AtomicBool,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
@ -41,6 +42,7 @@ impl AppState {
|
||||
port_subsystem,
|
||||
cpu_used_pct: AtomicU32::new(0),
|
||||
cpu_count: AtomicU32::new(0),
|
||||
snapshot_in_progress: AtomicBool::new(false),
|
||||
});
|
||||
|
||||
let state_clone = Arc::clone(&state);
|
||||
|
||||
@ -11,6 +11,10 @@ impl AtomicMax {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self) -> i64 {
|
||||
self.val.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
/// Sets the stored value to `new` if `new` is strictly greater than
|
||||
/// the current value. Returns `true` if the value was updated.
|
||||
pub fn set_to_greater(&self, new: i64) -> bool {
|
||||
@ -31,3 +35,68 @@ impl AtomicMax {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn initial_value_is_i64_min() {
|
||||
let m = AtomicMax::new();
|
||||
assert_eq!(m.get(), i64::MIN);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn updates_when_larger() {
|
||||
let m = AtomicMax::new();
|
||||
assert!(m.set_to_greater(0));
|
||||
assert_eq!(m.get(), 0);
|
||||
assert!(m.set_to_greater(100));
|
||||
assert_eq!(m.get(), 100);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn returns_false_when_equal() {
|
||||
let m = AtomicMax::new();
|
||||
m.set_to_greater(42);
|
||||
assert!(!m.set_to_greater(42));
|
||||
assert_eq!(m.get(), 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn returns_false_when_smaller() {
|
||||
let m = AtomicMax::new();
|
||||
m.set_to_greater(100);
|
||||
assert!(!m.set_to_greater(50));
|
||||
assert_eq!(m.get(), 100);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn concurrent_convergence() {
|
||||
let m = Arc::new(AtomicMax::new());
|
||||
let threads: Vec<_> = (0..8)
|
||||
.map(|t| {
|
||||
let m = Arc::clone(&m);
|
||||
std::thread::spawn(move || {
|
||||
for i in (t * 100)..((t + 1) * 100) {
|
||||
m.set_to_greater(i);
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
assert_eq!(m.get(), 799);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn i64_max_boundary() {
|
||||
let m = AtomicMax::new();
|
||||
assert!(m.set_to_greater(i64::MAX));
|
||||
assert!(!m.set_to_greater(i64::MAX));
|
||||
assert!(!m.set_to_greater(0));
|
||||
assert_eq!(m.get(), i64::MAX);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user