forked from wrenn/wrenn
Merge pull request 'fix: resolve large operation reliability — stream hangs, pause races, and memory bloat' (#43) from fix/large-operations into dev
Reviewed-on: wrenn/wrenn#43
This commit is contained in:
8
Makefile
8
Makefile
@ -27,8 +27,12 @@ build-agent:
|
|||||||
build-envd:
|
build-envd:
|
||||||
cd envd-rs && ENVD_COMMIT=$(COMMIT) cargo build --release --target x86_64-unknown-linux-musl
|
cd envd-rs && ENVD_COMMIT=$(COMMIT) cargo build --release --target x86_64-unknown-linux-musl
|
||||||
@cp envd-rs/target/x86_64-unknown-linux-musl/release/envd $(BIN_DIR)/envd
|
@cp envd-rs/target/x86_64-unknown-linux-musl/release/envd $(BIN_DIR)/envd
|
||||||
@file $(BIN_DIR)/envd | grep -q "static-pie linked" || \
|
@readelf -h $(BIN_DIR)/envd | grep -q 'Type:.*DYN' && \
|
||||||
(echo "ERROR: envd is not statically linked!" && exit 1)
|
readelf -d $(BIN_DIR)/envd | grep -q 'FLAGS_1.*PIE' && \
|
||||||
|
! readelf -d $(BIN_DIR)/envd | grep -q '(NEEDED)' && \
|
||||||
|
{ ! readelf -lW $(BIN_DIR)/envd | grep -q 'Requesting program interpreter' || \
|
||||||
|
readelf -lW $(BIN_DIR)/envd | grep -Fq '[Requesting program interpreter: /lib/ld-musl-x86_64.so.1]'; } || \
|
||||||
|
(echo "ERROR: envd must be PIE, have no DT_NEEDED shared libs, and either have no interpreter or use /lib/ld-musl-x86_64.so.1" && exit 1)
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════
|
||||||
# Development
|
# Development
|
||||||
|
|||||||
@ -1,21 +1,36 @@
|
|||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Defaults {
|
pub struct Defaults {
|
||||||
pub env_vars: Arc<DashMap<String, String>>,
|
pub env_vars: Arc<DashMap<String, String>>,
|
||||||
pub user: String,
|
user: RwLock<String>,
|
||||||
pub workdir: Option<String>,
|
workdir: RwLock<Option<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Defaults {
|
impl Defaults {
|
||||||
pub fn new(user: &str) -> Self {
|
pub fn new(user: &str) -> Self {
|
||||||
Self {
|
Self {
|
||||||
env_vars: Arc::new(DashMap::new()),
|
env_vars: Arc::new(DashMap::new()),
|
||||||
user: user.to_string(),
|
user: RwLock::new(user.to_string()),
|
||||||
workdir: None,
|
workdir: RwLock::new(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn user(&self) -> String {
|
||||||
|
self.user.read().unwrap().clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_user(&self, user: String) {
|
||||||
|
*self.user.write().unwrap() = user;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn workdir(&self) -> Option<String> {
|
||||||
|
self.workdir.read().unwrap().clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_workdir(&self, workdir: Option<String>) {
|
||||||
|
*self.workdir.write().unwrap() = workdir;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn resolve_default_workdir(workdir: &str, default_workdir: Option<&str>) -> String {
|
pub fn resolve_default_workdir(workdir: &str, default_workdir: Option<&str>) -> String {
|
||||||
|
|||||||
@ -71,9 +71,10 @@ pub async fn get_files(
|
|||||||
let path_str = params.path.as_deref().unwrap_or("");
|
let path_str = params.path.as_deref().unwrap_or("");
|
||||||
let header_token = extract_header_token(&req);
|
let header_token = extract_header_token(&req);
|
||||||
|
|
||||||
|
let default_user = state.defaults.user();
|
||||||
let username = match execcontext::resolve_default_username(
|
let username = match execcontext::resolve_default_username(
|
||||||
params.username.as_deref(),
|
params.username.as_deref(),
|
||||||
&state.defaults.user,
|
&default_user,
|
||||||
) {
|
) {
|
||||||
Ok(u) => u.to_string(),
|
Ok(u) => u.to_string(),
|
||||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, e),
|
Err(e) => return json_error(StatusCode::BAD_REQUEST, e),
|
||||||
@ -96,7 +97,8 @@ pub async fn get_files(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let home_dir = user.dir.to_string_lossy().to_string();
|
let home_dir = user.dir.to_string_lossy().to_string();
|
||||||
let resolved = match expand_and_resolve(path_str, &home_dir, state.defaults.workdir.as_deref())
|
let default_workdir = state.defaults.workdir();
|
||||||
|
let resolved = match expand_and_resolve(path_str, &home_dir, default_workdir.as_deref())
|
||||||
{
|
{
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
||||||
@ -222,9 +224,10 @@ pub async fn post_files(
|
|||||||
let path_str = params.path.as_deref().unwrap_or("");
|
let path_str = params.path.as_deref().unwrap_or("");
|
||||||
let header_token = extract_header_token(&req);
|
let header_token = extract_header_token(&req);
|
||||||
|
|
||||||
|
let default_user = state.defaults.user();
|
||||||
let username = match execcontext::resolve_default_username(
|
let username = match execcontext::resolve_default_username(
|
||||||
params.username.as_deref(),
|
params.username.as_deref(),
|
||||||
&state.defaults.user,
|
&default_user,
|
||||||
) {
|
) {
|
||||||
Ok(u) => u.to_string(),
|
Ok(u) => u.to_string(),
|
||||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, e),
|
Err(e) => return json_error(StatusCode::BAD_REQUEST, e),
|
||||||
@ -266,6 +269,7 @@ pub async fn post_files(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut uploaded: Vec<EntryInfo> = Vec::new();
|
let mut uploaded: Vec<EntryInfo> = Vec::new();
|
||||||
|
let default_workdir = state.defaults.workdir();
|
||||||
|
|
||||||
while let Ok(Some(field)) = multipart.next_field().await {
|
while let Ok(Some(field)) = multipart.next_field().await {
|
||||||
let field_name = field.name().unwrap_or("").to_string();
|
let field_name = field.name().unwrap_or("").to_string();
|
||||||
@ -274,7 +278,7 @@ pub async fn post_files(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let file_path = if !path_str.is_empty() {
|
let file_path = if !path_str.is_empty() {
|
||||||
match expand_and_resolve(path_str, &home_dir, state.defaults.workdir.as_deref()) {
|
match expand_and_resolve(path_str, &home_dir, default_workdir.as_deref()) {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
||||||
}
|
}
|
||||||
@ -283,7 +287,7 @@ pub async fn post_files(
|
|||||||
.file_name()
|
.file_name()
|
||||||
.unwrap_or("upload")
|
.unwrap_or("upload")
|
||||||
.to_string();
|
.to_string();
|
||||||
match expand_and_resolve(&fname, &home_dir, state.defaults.workdir.as_deref()) {
|
match expand_and_resolve(&fname, &home_dir, default_workdir.as_deref()) {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
Err(e) => return json_error(StatusCode::BAD_REQUEST, &e),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -78,11 +78,15 @@ pub async fn post_init(
|
|||||||
if let Some(ref user) = init_req.default_user {
|
if let Some(ref user) = init_req.default_user {
|
||||||
if !user.is_empty() {
|
if !user.is_empty() {
|
||||||
tracing::debug!(user = %user, "setting default user");
|
tracing::debug!(user = %user, "setting default user");
|
||||||
let mut defaults = state.defaults.clone();
|
state.defaults.set_user(user.clone());
|
||||||
defaults.user = user.clone();
|
}
|
||||||
// Note: In Rust we'd need interior mutability for this.
|
}
|
||||||
// For now, env_vars (DashMap) handles concurrent access.
|
|
||||||
// User/workdir mutation deferred to full state refactor.
|
// Set default workdir
|
||||||
|
if let Some(ref workdir) = init_req.default_workdir {
|
||||||
|
if !workdir.is_empty() {
|
||||||
|
tracing::debug!(workdir = %workdir, "setting default workdir");
|
||||||
|
state.defaults.set_workdir(Some(workdir.clone()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -196,7 +196,8 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
|||||||
use crate::rpc::process_handler;
|
use crate::rpc::process_handler;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
let user = match lookup_user(&state.defaults.user) {
|
let default_user = state.defaults.user();
|
||||||
|
let user = match lookup_user(&default_user) {
|
||||||
Ok(u) => u,
|
Ok(u) => u,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!(error = %e, "cmd: failed to lookup user");
|
tracing::error!(error = %e, "cmd: failed to lookup user");
|
||||||
@ -205,9 +206,8 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let home = user.dir.to_string_lossy().to_string();
|
let home = user.dir.to_string_lossy().to_string();
|
||||||
let cwd = state
|
let default_workdir = state.defaults.workdir();
|
||||||
.defaults
|
let cwd = default_workdir
|
||||||
.workdir
|
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.unwrap_or(&home);
|
.unwrap_or(&home);
|
||||||
|
|
||||||
@ -222,8 +222,8 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
|||||||
&user,
|
&user,
|
||||||
&state.defaults.env_vars,
|
&state.defaults.env_vars,
|
||||||
) {
|
) {
|
||||||
Ok(handle) => {
|
Ok(spawned) => {
|
||||||
tracing::info!(pid = handle.pid, cmd, "initial command spawned");
|
tracing::info!(pid = spawned.handle.pid, cmd, "initial command spawned");
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!(error = %e, cmd, "failed to spawn initial command");
|
tracing::error!(error = %e, cmd, "failed to spawn initial command");
|
||||||
|
|||||||
@ -31,15 +31,15 @@ impl FilesystemServiceImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn resolve_path(&self, path: &str, ctx: &Context) -> Result<String, ConnectError> {
|
fn resolve_path(&self, path: &str, ctx: &Context) -> Result<String, ConnectError> {
|
||||||
let username = extract_username(ctx).unwrap_or_else(|| self.state.defaults.user.clone());
|
let username = extract_username(ctx).unwrap_or_else(|| self.state.defaults.user());
|
||||||
let user = lookup_user(&username).map_err(|e| {
|
let user = lookup_user(&username).map_err(|e| {
|
||||||
ConnectError::new(ErrorCode::Unauthenticated, format!("invalid user: {e}"))
|
ConnectError::new(ErrorCode::Unauthenticated, format!("invalid user: {e}"))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let home_dir = user.dir.to_string_lossy().to_string();
|
let home_dir = user.dir.to_string_lossy().to_string();
|
||||||
let default_workdir = self.state.defaults.workdir.as_deref();
|
let default_workdir = self.state.defaults.workdir();
|
||||||
|
|
||||||
expand_and_resolve(path, &home_dir, default_workdir)
|
expand_and_resolve(path, &home_dir, default_workdir.as_deref())
|
||||||
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))
|
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ impl Filesystem for FilesystemServiceImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user.clone());
|
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user());
|
||||||
let user =
|
let user =
|
||||||
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
||||||
|
|
||||||
@ -122,7 +122,7 @@ impl Filesystem for FilesystemServiceImpl {
|
|||||||
let source = self.resolve_path(request.source, &ctx)?;
|
let source = self.resolve_path(request.source, &ctx)?;
|
||||||
let destination = self.resolve_path(request.destination, &ctx)?;
|
let destination = self.resolve_path(request.destination, &ctx)?;
|
||||||
|
|
||||||
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user.clone());
|
let username = extract_username(&ctx).unwrap_or_else(|| self.state.defaults.user());
|
||||||
let user =
|
let user =
|
||||||
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
||||||
|
|
||||||
|
|||||||
@ -37,6 +37,7 @@ pub struct ProcessHandle {
|
|||||||
|
|
||||||
data_tx: broadcast::Sender<DataEvent>,
|
data_tx: broadcast::Sender<DataEvent>,
|
||||||
end_tx: broadcast::Sender<EndEvent>,
|
end_tx: broadcast::Sender<EndEvent>,
|
||||||
|
ended: Mutex<Option<EndEvent>>,
|
||||||
|
|
||||||
stdin: Mutex<Option<std::process::ChildStdin>>,
|
stdin: Mutex<Option<std::process::ChildStdin>>,
|
||||||
pty_master: Mutex<Option<std::fs::File>>,
|
pty_master: Mutex<Option<std::fs::File>>,
|
||||||
@ -51,6 +52,10 @@ impl ProcessHandle {
|
|||||||
self.end_tx.subscribe()
|
self.end_tx.subscribe()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn cached_end(&self) -> Option<EndEvent> {
|
||||||
|
self.ended.lock().unwrap().clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn send_signal(&self, sig: Signal) -> Result<(), ConnectError> {
|
pub fn send_signal(&self, sig: Signal) -> Result<(), ConnectError> {
|
||||||
signal::kill(Pid::from_raw(self.pid as i32), sig).map_err(|e| {
|
signal::kill(Pid::from_raw(self.pid as i32), sig).map_err(|e| {
|
||||||
ConnectError::new(ErrorCode::Internal, format!("error sending signal: {e}"))
|
ConnectError::new(ErrorCode::Internal, format!("error sending signal: {e}"))
|
||||||
@ -128,6 +133,12 @@ impl ProcessHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct SpawnedProcess {
|
||||||
|
pub handle: Arc<ProcessHandle>,
|
||||||
|
pub data_rx: broadcast::Receiver<DataEvent>,
|
||||||
|
pub end_rx: broadcast::Receiver<EndEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn spawn_process(
|
pub fn spawn_process(
|
||||||
cmd_str: &str,
|
cmd_str: &str,
|
||||||
args: &[String],
|
args: &[String],
|
||||||
@ -138,7 +149,7 @@ pub fn spawn_process(
|
|||||||
tag: Option<String>,
|
tag: Option<String>,
|
||||||
user: &nix::unistd::User,
|
user: &nix::unistd::User,
|
||||||
default_env_vars: &dashmap::DashMap<String, String>,
|
default_env_vars: &dashmap::DashMap<String, String>,
|
||||||
) -> Result<Arc<ProcessHandle>, ConnectError> {
|
) -> Result<SpawnedProcess, ConnectError> {
|
||||||
let mut env: Vec<(String, String)> = Vec::new();
|
let mut env: Vec<(String, String)> = Vec::new();
|
||||||
env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default()));
|
env.push(("PATH".into(), std::env::var("PATH").unwrap_or_default()));
|
||||||
let home = user.dir.to_string_lossy().to_string();
|
let home = user.dir.to_string_lossy().to_string();
|
||||||
@ -244,10 +255,14 @@ pub fn spawn_process(
|
|||||||
pid,
|
pid,
|
||||||
data_tx: data_tx.clone(),
|
data_tx: data_tx.clone(),
|
||||||
end_tx: end_tx.clone(),
|
end_tx: end_tx.clone(),
|
||||||
|
ended: Mutex::new(None),
|
||||||
stdin: Mutex::new(None),
|
stdin: Mutex::new(None),
|
||||||
pty_master: Mutex::new(Some(master_file)),
|
pty_master: Mutex::new(Some(master_file)),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let data_rx = handle.subscribe_data();
|
||||||
|
let end_rx = handle.subscribe_end();
|
||||||
|
|
||||||
let data_tx_clone = data_tx.clone();
|
let data_tx_clone = data_tx.clone();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
let mut master = master_clone;
|
let mut master = master_clone;
|
||||||
@ -264,30 +279,29 @@ pub fn spawn_process(
|
|||||||
});
|
});
|
||||||
|
|
||||||
let end_tx_clone = end_tx.clone();
|
let end_tx_clone = end_tx.clone();
|
||||||
|
let handle_for_waiter = Arc::clone(&handle);
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
let mut child = child;
|
let mut child = child;
|
||||||
match child.wait() {
|
let end_event = match child.wait() {
|
||||||
Ok(s) => {
|
Ok(s) => EndEvent {
|
||||||
let _ = end_tx_clone.send(EndEvent {
|
|
||||||
exit_code: s.code().unwrap_or(-1),
|
exit_code: s.code().unwrap_or(-1),
|
||||||
exited: s.code().is_some(),
|
exited: s.code().is_some(),
|
||||||
status: format!("{s}"),
|
status: format!("{s}"),
|
||||||
error: None,
|
error: None,
|
||||||
});
|
},
|
||||||
}
|
Err(e) => EndEvent {
|
||||||
Err(e) => {
|
|
||||||
let _ = end_tx_clone.send(EndEvent {
|
|
||||||
exit_code: -1,
|
exit_code: -1,
|
||||||
exited: false,
|
exited: false,
|
||||||
status: "error".into(),
|
status: "error".into(),
|
||||||
error: Some(e.to_string()),
|
error: Some(e.to_string()),
|
||||||
});
|
},
|
||||||
}
|
};
|
||||||
}
|
*handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
|
||||||
|
let _ = end_tx_clone.send(end_event);
|
||||||
});
|
});
|
||||||
|
|
||||||
tracing::info!(pid, cmd = cmd_str, "process started (pty)");
|
tracing::info!(pid, cmd = cmd_str, "process started (pty)");
|
||||||
Ok(handle)
|
Ok(SpawnedProcess { handle, data_rx, end_rx })
|
||||||
} else {
|
} else {
|
||||||
let mut command = std::process::Command::new("/bin/sh");
|
let mut command = std::process::Command::new("/bin/sh");
|
||||||
command
|
command
|
||||||
@ -327,10 +341,14 @@ pub fn spawn_process(
|
|||||||
pid,
|
pid,
|
||||||
data_tx: data_tx.clone(),
|
data_tx: data_tx.clone(),
|
||||||
end_tx: end_tx.clone(),
|
end_tx: end_tx.clone(),
|
||||||
|
ended: Mutex::new(None),
|
||||||
stdin: Mutex::new(stdin),
|
stdin: Mutex::new(stdin),
|
||||||
pty_master: Mutex::new(None),
|
pty_master: Mutex::new(None),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let data_rx = handle.subscribe_data();
|
||||||
|
let end_rx = handle.subscribe_end();
|
||||||
|
|
||||||
if let Some(mut out) = stdout {
|
if let Some(mut out) = stdout {
|
||||||
let tx = data_tx.clone();
|
let tx = data_tx.clone();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
@ -364,29 +382,28 @@ pub fn spawn_process(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let end_tx_clone = end_tx.clone();
|
let end_tx_clone = end_tx.clone();
|
||||||
|
let handle_for_waiter = Arc::clone(&handle);
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
match child.wait() {
|
let end_event = match child.wait() {
|
||||||
Ok(s) => {
|
Ok(s) => EndEvent {
|
||||||
let _ = end_tx_clone.send(EndEvent {
|
|
||||||
exit_code: s.code().unwrap_or(-1),
|
exit_code: s.code().unwrap_or(-1),
|
||||||
exited: s.code().is_some(),
|
exited: s.code().is_some(),
|
||||||
status: format!("{s}"),
|
status: format!("{s}"),
|
||||||
error: None,
|
error: None,
|
||||||
});
|
},
|
||||||
}
|
Err(e) => EndEvent {
|
||||||
Err(e) => {
|
|
||||||
let _ = end_tx_clone.send(EndEvent {
|
|
||||||
exit_code: -1,
|
exit_code: -1,
|
||||||
exited: false,
|
exited: false,
|
||||||
status: "error".into(),
|
status: "error".into(),
|
||||||
error: Some(e.to_string()),
|
error: Some(e.to_string()),
|
||||||
});
|
},
|
||||||
}
|
};
|
||||||
}
|
*handle_for_waiter.ended.lock().unwrap() = Some(end_event.clone());
|
||||||
|
let _ = end_tx_clone.send(end_event);
|
||||||
});
|
});
|
||||||
|
|
||||||
tracing::info!(pid, cmd = cmd_str, "process started (pipe)");
|
tracing::info!(pid, cmd = cmd_str, "process started (pipe)");
|
||||||
Ok(handle)
|
Ok(SpawnedProcess { handle, data_rx, end_rx })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -66,12 +66,12 @@ impl ProcessServiceImpl {
|
|||||||
fn spawn_from_request(
|
fn spawn_from_request(
|
||||||
&self,
|
&self,
|
||||||
request: &StartRequestView<'_>,
|
request: &StartRequestView<'_>,
|
||||||
) -> Result<Arc<ProcessHandle>, ConnectError> {
|
) -> Result<process_handler::SpawnedProcess, ConnectError> {
|
||||||
let proc_config = request.process.as_option().ok_or_else(|| {
|
let proc_config = request.process.as_option().ok_or_else(|| {
|
||||||
ConnectError::new(ErrorCode::InvalidArgument, "process config required")
|
ConnectError::new(ErrorCode::InvalidArgument, "process config required")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let username = self.state.defaults.user.clone();
|
let username = self.state.defaults.user();
|
||||||
let user =
|
let user =
|
||||||
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
lookup_user(&username).map_err(|e| ConnectError::new(ErrorCode::Internal, e))?;
|
||||||
|
|
||||||
@ -85,7 +85,8 @@ impl ProcessServiceImpl {
|
|||||||
|
|
||||||
let home_dir = user.dir.to_string_lossy().to_string();
|
let home_dir = user.dir.to_string_lossy().to_string();
|
||||||
let cwd_str: &str = proc_config.cwd.unwrap_or("");
|
let cwd_str: &str = proc_config.cwd.unwrap_or("");
|
||||||
let cwd = expand_and_resolve(cwd_str, &home_dir, self.state.defaults.workdir.as_deref())
|
let default_workdir = self.state.defaults.workdir();
|
||||||
|
let cwd = expand_and_resolve(cwd_str, &home_dir, default_workdir.as_deref())
|
||||||
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))?;
|
.map_err(|e| ConnectError::new(ErrorCode::InvalidArgument, e))?;
|
||||||
|
|
||||||
let effective_cwd = if cwd.is_empty() { "/" } else { &cwd };
|
let effective_cwd = if cwd.is_empty() { "/" } else { &cwd };
|
||||||
@ -116,7 +117,7 @@ impl ProcessServiceImpl {
|
|||||||
"process.Start request"
|
"process.Start request"
|
||||||
);
|
);
|
||||||
|
|
||||||
let handle = process_handler::spawn_process(
|
let spawned = process_handler::spawn_process(
|
||||||
cmd,
|
cmd,
|
||||||
&args,
|
&args,
|
||||||
&envs,
|
&envs,
|
||||||
@ -128,17 +129,17 @@ impl ProcessServiceImpl {
|
|||||||
&self.state.defaults.env_vars,
|
&self.state.defaults.env_vars,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
self.processes.insert(handle.pid, Arc::clone(&handle));
|
self.processes.insert(spawned.handle.pid, Arc::clone(&spawned.handle));
|
||||||
|
|
||||||
let processes = self.processes.clone();
|
let processes = self.processes.clone();
|
||||||
let pid = handle.pid;
|
let pid = spawned.handle.pid;
|
||||||
let mut end_rx = handle.subscribe_end();
|
let mut cleanup_end_rx = spawned.handle.subscribe_end();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _ = end_rx.recv().await;
|
let _ = cleanup_end_rx.recv().await;
|
||||||
processes.remove(&pid);
|
processes.remove(&pid);
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(handle)
|
Ok(spawned)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,26 +183,36 @@ impl Process for ProcessServiceImpl {
|
|||||||
),
|
),
|
||||||
ConnectError,
|
ConnectError,
|
||||||
> {
|
> {
|
||||||
let handle = self.spawn_from_request(&request)?;
|
let spawned = self.spawn_from_request(&request)?;
|
||||||
let pid = handle.pid;
|
let pid = spawned.handle.pid;
|
||||||
|
|
||||||
let mut data_rx = handle.subscribe_data();
|
let mut data_rx = spawned.data_rx;
|
||||||
let mut end_rx = handle.subscribe_end();
|
let mut end_rx = spawned.end_rx;
|
||||||
|
|
||||||
let stream = async_stream::stream! {
|
let stream = async_stream::stream! {
|
||||||
yield Ok(make_start_response(pid));
|
yield Ok(make_start_response(pid));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match data_rx.recv().await {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
data = data_rx.recv() => {
|
||||||
|
match data {
|
||||||
Ok(ev) => yield Ok(make_data_start_response(ev)),
|
Ok(ev) => yield Ok(make_data_start_response(ev)),
|
||||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
end = end_rx.recv() => {
|
||||||
if let Ok(end) = end_rx.recv().await {
|
while let Ok(ev) = data_rx.try_recv() {
|
||||||
|
yield Ok(make_data_start_response(ev));
|
||||||
|
}
|
||||||
|
if let Ok(end) = end {
|
||||||
yield Ok(make_end_start_response(end));
|
yield Ok(make_end_start_response(end));
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((Box::pin(stream), ctx))
|
Ok((Box::pin(stream), ctx))
|
||||||
@ -226,6 +237,7 @@ impl Process for ProcessServiceImpl {
|
|||||||
|
|
||||||
let mut data_rx = handle.subscribe_data();
|
let mut data_rx = handle.subscribe_data();
|
||||||
let mut end_rx = handle.subscribe_end();
|
let mut end_rx = handle.subscribe_end();
|
||||||
|
let cached_end = handle.cached_end();
|
||||||
|
|
||||||
let stream = async_stream::stream! {
|
let stream = async_stream::stream! {
|
||||||
yield Ok(ConnectResponse {
|
yield Ok(ConnectResponse {
|
||||||
@ -238,8 +250,17 @@ impl Process for ProcessServiceImpl {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if let Some(end) = cached_end {
|
||||||
|
yield Ok(ConnectResponse {
|
||||||
|
event: buffa::MessageField::some(make_end_event(end)),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
} else {
|
||||||
loop {
|
loop {
|
||||||
match data_rx.recv().await {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
data = data_rx.recv() => {
|
||||||
|
match data {
|
||||||
Ok(ev) => {
|
Ok(ev) => {
|
||||||
yield Ok(ConnectResponse {
|
yield Ok(ConnectResponse {
|
||||||
event: buffa::MessageField::some(make_data_event(ev)),
|
event: buffa::MessageField::some(make_data_event(ev)),
|
||||||
@ -250,13 +271,24 @@ impl Process for ProcessServiceImpl {
|
|||||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
end = end_rx.recv() => {
|
||||||
if let Ok(end) = end_rx.recv().await {
|
while let Ok(ev) = data_rx.try_recv() {
|
||||||
|
yield Ok(ConnectResponse {
|
||||||
|
event: buffa::MessageField::some(make_data_event(ev)),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if let Ok(end) = end {
|
||||||
yield Ok(ConnectResponse {
|
yield Ok(ConnectResponse {
|
||||||
event: buffa::MessageField::some(make_end_event(end)),
|
event: buffa::MessageField::some(make_end_event(end)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((Box::pin(stream), ctx))
|
Ok((Box::pin(stream), ctx))
|
||||||
|
|||||||
@ -350,9 +350,23 @@ func runPtyLoop(
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
for msg := range inputCh {
|
// pending holds a non-input message dequeued during coalescing
|
||||||
// Use a background context for unary RPCs so they complete
|
// that must be processed on the next iteration.
|
||||||
// even if the stream context is being cancelled.
|
var pending *wsPtyIn
|
||||||
|
|
||||||
|
for {
|
||||||
|
var msg wsPtyIn
|
||||||
|
if pending != nil {
|
||||||
|
msg = *pending
|
||||||
|
pending = nil
|
||||||
|
} else {
|
||||||
|
var ok bool
|
||||||
|
msg, ok = <-inputCh
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
|
||||||
switch msg.Type {
|
switch msg.Type {
|
||||||
@ -364,7 +378,7 @@ func runPtyLoop(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Coalesce: drain any queued input messages into a single RPC.
|
// Coalesce: drain any queued input messages into a single RPC.
|
||||||
data = coalescePtyInput(inputCh, data)
|
data, pending = coalescePtyInput(inputCh, data)
|
||||||
|
|
||||||
if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{
|
if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{
|
||||||
SandboxId: sandboxID,
|
SandboxId: sandboxID,
|
||||||
@ -418,24 +432,29 @@ func runPtyLoop(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// When any pump cancels the context, close the websocket to unblock
|
||||||
|
// the reader goroutine stuck in ReadMessage.
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
ws.conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// coalescePtyInput drains any immediately-available "input" messages from the
|
// coalescePtyInput drains any immediately-available "input" messages from the
|
||||||
// channel and appends their decoded data to buf, reducing RPC call volume
|
// channel and appends their decoded data to buf, reducing RPC call volume
|
||||||
// during bursts of fast typing.
|
// during bursts of fast typing. Returns the coalesced buffer and any
|
||||||
func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
|
// non-input message that was dequeued (must be processed by the caller).
|
||||||
|
func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) ([]byte, *wsPtyIn) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg, ok := <-ch:
|
case msg, ok := <-ch:
|
||||||
if !ok {
|
if !ok {
|
||||||
return buf
|
return buf, nil
|
||||||
}
|
}
|
||||||
if msg.Type != "input" {
|
if msg.Type != "input" {
|
||||||
// Non-input message — can't coalesce. Put-back isn't possible
|
return buf, &msg
|
||||||
// with channels, but resize/kill during a typing burst is rare
|
|
||||||
// enough that dropping one is acceptable.
|
|
||||||
return buf
|
|
||||||
}
|
}
|
||||||
data, err := base64.StdEncoding.DecodeString(msg.Data)
|
data, err := base64.StdEncoding.DecodeString(msg.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -443,7 +462,7 @@ func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
|
|||||||
}
|
}
|
||||||
buf = append(buf, data...)
|
buf = append(buf, data...)
|
||||||
default:
|
default:
|
||||||
return buf
|
return buf, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -135,6 +135,20 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
defer tracker.Release()
|
defer tracker.Release()
|
||||||
|
|
||||||
|
// Derive request context from the tracker's context so ForceClose()
|
||||||
|
// during pause aborts this proxied request.
|
||||||
|
trackerCtx := tracker.Context()
|
||||||
|
reqCtx, reqCancel := context.WithCancel(r.Context())
|
||||||
|
defer reqCancel()
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-trackerCtx.Done():
|
||||||
|
reqCancel()
|
||||||
|
case <-reqCtx.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
r = r.WithContext(reqCtx)
|
||||||
|
|
||||||
proxy := h.getOrCreateProxy(sandboxID, port, fmt.Sprintf("%s:%d", hostIP, portNum))
|
proxy := h.getOrCreateProxy(sandboxID, port, fmt.Sprintf("%s:%d", hostIP, portNum))
|
||||||
proxy.ServeHTTP(w, r)
|
proxy.ServeHTTP(w, r)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -11,6 +11,7 @@ type SandboxStatus string
|
|||||||
const (
|
const (
|
||||||
StatusPending SandboxStatus = "pending"
|
StatusPending SandboxStatus = "pending"
|
||||||
StatusRunning SandboxStatus = "running"
|
StatusRunning SandboxStatus = "running"
|
||||||
|
StatusPausing SandboxStatus = "pausing"
|
||||||
StatusPaused SandboxStatus = "paused"
|
StatusPaused SandboxStatus = "paused"
|
||||||
StatusStopped SandboxStatus = "stopped"
|
StatusStopped SandboxStatus = "stopped"
|
||||||
StatusError SandboxStatus = "error"
|
StatusError SandboxStatus = "error"
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package sandbox
|
package sandbox
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -17,6 +18,20 @@ type ConnTracker struct {
|
|||||||
// goroutine to exit, preventing goroutine leaks on repeated pause failures.
|
// goroutine to exit, preventing goroutine leaks on repeated pause failures.
|
||||||
cancelMu sync.Mutex
|
cancelMu sync.Mutex
|
||||||
cancelDrain chan struct{}
|
cancelDrain chan struct{}
|
||||||
|
|
||||||
|
// ctx is cancelled by ForceClose to abort all in-flight proxy requests.
|
||||||
|
// Initialized lazily on first Acquire; replaced by Reset after a failed
|
||||||
|
// pause so new connections get a fresh, non-cancelled context.
|
||||||
|
ctxMu sync.Mutex
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensureCtx lazily initializes the cancellable context.
|
||||||
|
func (t *ConnTracker) ensureCtx() {
|
||||||
|
if t.ctx == nil {
|
||||||
|
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Acquire registers one in-flight connection. Returns false if the tracker
|
// Acquire registers one in-flight connection. Returns false if the tracker
|
||||||
@ -35,6 +50,16 @@ func (t *ConnTracker) Acquire() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Context returns a context that is cancelled when ForceClose is called.
|
||||||
|
// Proxy handlers should derive their request context from this so that
|
||||||
|
// force-close during pause aborts in-flight proxied requests.
|
||||||
|
func (t *ConnTracker) Context() context.Context {
|
||||||
|
t.ctxMu.Lock()
|
||||||
|
defer t.ctxMu.Unlock()
|
||||||
|
t.ensureCtx()
|
||||||
|
return t.ctx
|
||||||
|
}
|
||||||
|
|
||||||
// Release marks one connection as complete. Must be called exactly once
|
// Release marks one connection as complete. Must be called exactly once
|
||||||
// per successful Acquire.
|
// per successful Acquire.
|
||||||
func (t *ConnTracker) Release() {
|
func (t *ConnTracker) Release() {
|
||||||
@ -65,9 +90,33 @@ func (t *ConnTracker) Drain(timeout time.Duration) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ForceClose cancels all in-flight proxy connections by cancelling the
|
||||||
|
// shared context. Connections whose request context derives from Context()
|
||||||
|
// will see their requests aborted, causing the proxy handler to return
|
||||||
|
// and call Release(). Waits briefly for connections to actually release.
|
||||||
|
func (t *ConnTracker) ForceClose() {
|
||||||
|
t.ctxMu.Lock()
|
||||||
|
if t.cancel != nil {
|
||||||
|
t.cancel()
|
||||||
|
}
|
||||||
|
t.ctxMu.Unlock()
|
||||||
|
|
||||||
|
// Wait briefly for force-closed connections to call Release().
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
t.wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Reset re-enables the tracker after a failed drain. This allows the
|
// Reset re-enables the tracker after a failed drain. This allows the
|
||||||
// sandbox to accept proxy connections again if the pause operation fails
|
// sandbox to accept proxy connections again if the pause operation fails
|
||||||
// and the VM is resumed. It also cancels any lingering Drain goroutine.
|
// and the VM is resumed. It also cancels any lingering Drain goroutine
|
||||||
|
// and creates a fresh context for new connections.
|
||||||
func (t *ConnTracker) Reset() {
|
func (t *ConnTracker) Reset() {
|
||||||
t.cancelMu.Lock()
|
t.cancelMu.Lock()
|
||||||
if t.cancelDrain != nil {
|
if t.cancelDrain != nil {
|
||||||
@ -81,5 +130,10 @@ func (t *ConnTracker) Reset() {
|
|||||||
}
|
}
|
||||||
t.cancelMu.Unlock()
|
t.cancelMu.Unlock()
|
||||||
|
|
||||||
|
// Replace the cancelled context with a fresh one.
|
||||||
|
t.ctxMu.Lock()
|
||||||
|
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||||
|
t.ctxMu.Unlock()
|
||||||
|
|
||||||
t.draining.Store(false)
|
t.draining.Store(false)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -186,9 +186,12 @@ func (m *Manager) Create(ctx context.Context, sandboxID string, teamID, template
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create dm-snapshot with per-sandbox CoW file.
|
// Create dm-snapshot with per-sandbox CoW file.
|
||||||
|
// CoW must be at least as large as the origin — if every block is
|
||||||
|
// rewritten, the CoW stores a full copy. Undersized CoW causes
|
||||||
|
// dm-snapshot invalidation → EIO on all guest I/O.
|
||||||
dmName := "wrenn-" + sandboxID
|
dmName := "wrenn-" + sandboxID
|
||||||
cowPath := filepath.Join(layout.SandboxesDir(m.cfg.WrennDir), fmt.Sprintf("%s.cow", sandboxID))
|
cowPath := filepath.Join(layout.SandboxesDir(m.cfg.WrennDir), fmt.Sprintf("%s.cow", sandboxID))
|
||||||
cowSize := int64(diskSizeMB) * 1024 * 1024
|
cowSize := max(int64(diskSizeMB)*1024*1024, originSize)
|
||||||
dmDev, err := devicemapper.CreateSnapshot(dmName, originLoop, cowPath, originSize, cowSize)
|
dmDev, err := devicemapper.CreateSnapshot(dmName, originLoop, cowPath, originSize, cowSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.loops.Release(baseRootfs)
|
m.loops.Release(baseRootfs)
|
||||||
@ -374,28 +377,43 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
|||||||
return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
|
return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark sandbox as pausing to block new exec/file/PTY operations.
|
||||||
|
m.mu.Lock()
|
||||||
|
sb.Status = models.StatusPausing
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
// restoreRunning reverts state if any pre-freeze step fails.
|
||||||
|
restoreRunning := func() {
|
||||||
|
_ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
|
||||||
|
sb.connTracker.Reset()
|
||||||
|
m.mu.Lock()
|
||||||
|
sb.Status = models.StatusRunning
|
||||||
|
m.mu.Unlock()
|
||||||
|
m.startSampler(sb)
|
||||||
|
}
|
||||||
|
|
||||||
// Stop the metrics sampler goroutine before tearing down any resources
|
// Stop the metrics sampler goroutine before tearing down any resources
|
||||||
// it reads (dm device, Firecracker PID). Without this, the sampler
|
// it reads (dm device, Firecracker PID). Without this, the sampler
|
||||||
// leaks on every successful pause.
|
// leaks on every successful pause.
|
||||||
m.stopSampler(sb)
|
m.stopSampler(sb)
|
||||||
|
|
||||||
// Step 0: Drain in-flight proxy connections before freezing vCPUs.
|
// ── Step 1: Isolate from external traffic ─────────────────────────
|
||||||
// Stale TCP state from mid-flight connections causes issues on restore.
|
// Drain in-flight proxy connections (grace period for clean shutdown).
|
||||||
sb.connTracker.Drain(2 * time.Second)
|
sb.connTracker.Drain(5 * time.Second)
|
||||||
slog.Debug("pause: proxy connections drained", "id", sandboxID)
|
// Force-close any connections that didn't finish during grace period.
|
||||||
|
sb.connTracker.ForceClose()
|
||||||
|
slog.Debug("pause: external connections closed", "id", sandboxID)
|
||||||
|
|
||||||
// Step 0b: Close host-side idle connections to envd. Done before
|
// Close host-side idle connections to envd so FIN packets propagate
|
||||||
// PrepareSnapshot so FIN packets propagate to the guest during the
|
// to the guest kernel before snapshot.
|
||||||
// PrepareSnapshot window (no extra sleep needed).
|
|
||||||
sb.client.CloseIdleConnections()
|
sb.client.CloseIdleConnections()
|
||||||
slog.Debug("pause: envd client idle connections closed", "id", sandboxID)
|
|
||||||
|
|
||||||
// Step 0c: Signal envd to quiesce (stop port scanner/forwarder, mark
|
// ── Step 2: Drop page cache ──────────────────────────────────────
|
||||||
// connections for post-restore cleanup). The 3s timeout also gives time
|
// Signal envd to quiesce: drops page cache, stops port subsystem,
|
||||||
// for the FINs from Step 0b to be processed by the guest kernel.
|
// marks connections for post-restore cleanup. Page cache drop can
|
||||||
// Best-effort: a failure is logged but does not abort the pause.
|
// take significant time on large-memory VMs (20GB+).
|
||||||
func() {
|
func() {
|
||||||
prepCtx, prepCancel := context.WithTimeout(ctx, 3*time.Second)
|
prepCtx, prepCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||||
defer prepCancel()
|
defer prepCancel()
|
||||||
if err := sb.client.PrepareSnapshot(prepCtx); err != nil {
|
if err := sb.client.PrepareSnapshot(prepCtx); err != nil {
|
||||||
slog.Warn("pause: pre-snapshot quiesce failed (best-effort)", "id", sandboxID, "error", err)
|
slog.Warn("pause: pre-snapshot quiesce failed (best-effort)", "id", sandboxID, "error", err)
|
||||||
@ -404,11 +422,37 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// ── Step 3: Inflate balloon to reclaim free guest memory ─────────
|
||||||
|
// Freed pages become zero from FC's perspective, so ProcessMemfile
|
||||||
|
// skips them → dramatically smaller memfile (e.g. 20GB → 1GB).
|
||||||
|
func() {
|
||||||
|
memUsed, err := readEnvdMemUsed(sb.client)
|
||||||
|
if err != nil {
|
||||||
|
slog.Debug("pause: could not read guest memory, skipping balloon inflate", "id", sandboxID, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
usedMiB := int(memUsed / (1024 * 1024))
|
||||||
|
keepMiB := max(usedMiB*2, 256) + 128
|
||||||
|
inflateMiB := sb.MemoryMB - keepMiB
|
||||||
|
if inflateMiB <= 0 {
|
||||||
|
slog.Debug("pause: not enough free memory for balloon inflate", "id", sandboxID, "used_mib", usedMiB, "total_mib", sb.MemoryMB)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
balloonCtx, balloonCancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
defer balloonCancel()
|
||||||
|
if err := m.vm.UpdateBalloon(balloonCtx, sandboxID, inflateMiB); err != nil {
|
||||||
|
slog.Debug("pause: balloon inflate failed (non-fatal)", "id", sandboxID, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
slog.Info("pause: balloon inflated", "id", sandboxID, "inflate_mib", inflateMiB, "guest_used_mib", usedMiB)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// ── Step 4: Freeze vCPUs ─────────────────────────────────────────
|
||||||
pauseStart := time.Now()
|
pauseStart := time.Now()
|
||||||
|
|
||||||
// Step 1: Pause the VM (freeze vCPUs).
|
|
||||||
if err := m.vm.Pause(ctx, sandboxID); err != nil {
|
if err := m.vm.Pause(ctx, sandboxID); err != nil {
|
||||||
sb.connTracker.Reset()
|
restoreRunning()
|
||||||
return fmt.Errorf("pause VM: %w", err)
|
return fmt.Errorf("pause VM: %w", err)
|
||||||
}
|
}
|
||||||
slog.Debug("pause: VM paused", "id", sandboxID, "elapsed", time.Since(pauseStart))
|
slog.Debug("pause: VM paused", "id", sandboxID, "elapsed", time.Since(pauseStart))
|
||||||
@ -423,13 +467,23 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
|||||||
|
|
||||||
// resumeOnError unpauses the VM so the sandbox stays usable when a
|
// resumeOnError unpauses the VM so the sandbox stays usable when a
|
||||||
// post-freeze step fails. If the resume itself fails, the sandbox is
|
// post-freeze step fails. If the resume itself fails, the sandbox is
|
||||||
// left frozen — the caller should destroy it. It also resets the
|
// frozen and unrecoverable — destroy it to avoid a zombie.
|
||||||
// connection tracker so the sandbox can accept proxy connections again.
|
|
||||||
resumeOnError := func() {
|
resumeOnError := func() {
|
||||||
sb.connTracker.Reset()
|
// Use a fresh context — the caller's ctx may already be cancelled.
|
||||||
if err := m.vm.Resume(ctx, sandboxID); err != nil {
|
resumeCtx, resumeCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
slog.Error("failed to resume VM after pause error — sandbox is frozen", "id", sandboxID, "error", err)
|
defer resumeCancel()
|
||||||
|
if err := m.vm.Resume(resumeCtx, sandboxID); err != nil {
|
||||||
|
slog.Error("failed to resume VM after pause error — destroying frozen sandbox", "id", sandboxID, "error", err)
|
||||||
|
m.cleanup(context.Background(), sb)
|
||||||
|
m.mu.Lock()
|
||||||
|
delete(m.boxes, sandboxID)
|
||||||
|
m.mu.Unlock()
|
||||||
|
if m.onDestroy != nil {
|
||||||
|
m.onDestroy(sandboxID)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
restoreRunning()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 2: Take VM state snapshot (snapfile + memfile).
|
// Step 2: Take VM state snapshot (snapfile + memfile).
|
||||||
@ -444,6 +498,7 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
|||||||
|
|
||||||
snapshotStart := time.Now()
|
snapshotStart := time.Now()
|
||||||
if err := m.vm.Snapshot(ctx, sandboxID, snapPath, rawMemPath, snapshotType); err != nil {
|
if err := m.vm.Snapshot(ctx, sandboxID, snapPath, rawMemPath, snapshotType); err != nil {
|
||||||
|
slog.Error("pause: snapshot failed", "id", sandboxID, "type", snapshotType, "elapsed", time.Since(snapshotStart), "error", err)
|
||||||
warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir))
|
warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir))
|
||||||
resumeOnError()
|
resumeOnError()
|
||||||
return fmt.Errorf("create VM snapshot: %w", err)
|
return fmt.Errorf("create VM snapshot: %w", err)
|
||||||
@ -795,6 +850,12 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int,
|
|||||||
slog.Warn("post-init failed after resume, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
slog.Warn("post-init failed after resume, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deflate balloon — the snapshot was taken with an inflated balloon to
|
||||||
|
// reduce memfile size, so restore the guest's full memory allocation.
|
||||||
|
if err := m.vm.UpdateBalloon(ctx, sandboxID, 0); err != nil {
|
||||||
|
slog.Debug("resume: balloon deflate failed (non-fatal)", "id", sandboxID, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Fetch envd version (best-effort).
|
// Fetch envd version (best-effort).
|
||||||
envdVersion, _ := client.FetchVersion(ctx)
|
envdVersion, _ := client.FetchVersion(ctx)
|
||||||
|
|
||||||
@ -1134,7 +1195,7 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team
|
|||||||
|
|
||||||
dmName := "wrenn-" + sandboxID
|
dmName := "wrenn-" + sandboxID
|
||||||
cowPath := filepath.Join(layout.SandboxesDir(m.cfg.WrennDir), fmt.Sprintf("%s.cow", sandboxID))
|
cowPath := filepath.Join(layout.SandboxesDir(m.cfg.WrennDir), fmt.Sprintf("%s.cow", sandboxID))
|
||||||
cowSize := int64(diskSizeMB) * 1024 * 1024
|
cowSize := max(int64(diskSizeMB)*1024*1024, originSize)
|
||||||
dmDev, err := devicemapper.CreateSnapshot(dmName, originLoop, cowPath, originSize, cowSize)
|
dmDev, err := devicemapper.CreateSnapshot(dmName, originLoop, cowPath, originSize, cowSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
source.Close()
|
source.Close()
|
||||||
@ -1235,6 +1296,11 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team
|
|||||||
slog.Warn("post-init failed after template restore, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
slog.Warn("post-init failed after template restore, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deflate balloon — template snapshot was taken with an inflated balloon.
|
||||||
|
if err := m.vm.UpdateBalloon(ctx, sandboxID, 0); err != nil {
|
||||||
|
slog.Debug("create-from-snapshot: balloon deflate failed (non-fatal)", "id", sandboxID, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Fetch envd version (best-effort).
|
// Fetch envd version (best-effort).
|
||||||
envdVersion, _ := client.FetchVersion(ctx)
|
envdVersion, _ := client.FetchVersion(ctx)
|
||||||
|
|
||||||
|
|||||||
@ -29,6 +29,10 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
UFFD_EVENT_PAGEFAULT = C.UFFD_EVENT_PAGEFAULT
|
UFFD_EVENT_PAGEFAULT = C.UFFD_EVENT_PAGEFAULT
|
||||||
|
UFFD_EVENT_FORK = C.UFFD_EVENT_FORK
|
||||||
|
UFFD_EVENT_REMAP = C.UFFD_EVENT_REMAP
|
||||||
|
UFFD_EVENT_REMOVE = C.UFFD_EVENT_REMOVE
|
||||||
|
UFFD_EVENT_UNMAP = C.UFFD_EVENT_UNMAP
|
||||||
UFFD_PAGEFAULT_FLAG_WRITE = C.UFFD_PAGEFAULT_FLAG_WRITE
|
UFFD_PAGEFAULT_FLAG_WRITE = C.UFFD_PAGEFAULT_FLAG_WRITE
|
||||||
UFFDIO_COPY = C.UFFDIO_COPY
|
UFFDIO_COPY = C.UFFDIO_COPY
|
||||||
UFFDIO_COPY_MODE_WP = C.UFFDIO_COPY_MODE_WP
|
UFFDIO_COPY_MODE_WP = C.UFFDIO_COPY_MODE_WP
|
||||||
|
|||||||
@ -253,8 +253,17 @@ func (s *Server) serve(ctx context.Context, uffdFd fd, mapping *Mapping) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
msg := *(*uffdMsg)(unsafe.Pointer(&buf[0]))
|
msg := *(*uffdMsg)(unsafe.Pointer(&buf[0]))
|
||||||
if getMsgEvent(&msg) != UFFD_EVENT_PAGEFAULT {
|
event := getMsgEvent(&msg)
|
||||||
return fmt.Errorf("unexpected uffd event type: %d", getMsgEvent(&msg))
|
|
||||||
|
switch event {
|
||||||
|
case UFFD_EVENT_PAGEFAULT:
|
||||||
|
// Handled below.
|
||||||
|
case UFFD_EVENT_REMOVE, UFFD_EVENT_UNMAP, UFFD_EVENT_REMAP, UFFD_EVENT_FORK:
|
||||||
|
// Non-fatal lifecycle events from the guest kernel (e.g. balloon
|
||||||
|
// deflation, mmap/munmap). No action needed — continue polling.
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unexpected uffd event type: %d", event)
|
||||||
}
|
}
|
||||||
|
|
||||||
arg := getMsgArg(&msg)
|
arg := getMsgArg(&msg)
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// fcClient talks to the Firecracker HTTP API over a Unix socket.
|
// fcClient talks to the Firecracker HTTP API over a Unix socket.
|
||||||
@ -27,7 +26,9 @@ func newFCClient(socketPath string) *fcClient {
|
|||||||
return d.DialContext(ctx, "unix", socketPath)
|
return d.DialContext(ctx, "unix", socketPath)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Timeout: 10 * time.Second,
|
// No global timeout — callers pass context.Context with appropriate
|
||||||
|
// deadlines. A fixed 10s timeout was too short for snapshot/resume
|
||||||
|
// operations on large-memory VMs (20GB+ memfiles).
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -47,7 +47,7 @@ func NewHostClientPoolTLS(tlsCfg *tls.Config) *HostClientPool {
|
|||||||
TLSNextProto: make(map[string]func(authority string, c *tls.Conn) http.RoundTripper),
|
TLSNextProto: make(map[string]func(authority string, c *tls.Conn) http.RoundTripper),
|
||||||
MaxIdleConnsPerHost: 20,
|
MaxIdleConnsPerHost: 20,
|
||||||
IdleConnTimeout: 90 * time.Second,
|
IdleConnTimeout: 90 * time.Second,
|
||||||
ResponseHeaderTimeout: 45 * time.Second,
|
ResponseHeaderTimeout: 5 * time.Minute,
|
||||||
DialContext: (&net.Dialer{
|
DialContext: (&net.Dialer{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
KeepAlive: 30 * time.Second,
|
KeepAlive: 30 * time.Second,
|
||||||
|
|||||||
@ -239,12 +239,26 @@ func (s *SandboxService) Pause(ctx context.Context, sandboxID, teamID pgtype.UUI
|
|||||||
if _, err := agent.PauseSandbox(ctx, connect.NewRequest(&pb.PauseSandboxRequest{
|
if _, err := agent.PauseSandbox(ctx, connect.NewRequest(&pb.PauseSandboxRequest{
|
||||||
SandboxId: sandboxIDStr,
|
SandboxId: sandboxIDStr,
|
||||||
})); err != nil {
|
})); err != nil {
|
||||||
// Revert status on failure.
|
// Check if the agent still has this sandbox. If it was destroyed
|
||||||
if _, dbErr := s.DB.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{
|
// (e.g. frozen VM couldn't be resumed), mark as "error" instead of
|
||||||
ID: sandboxID, Status: "running",
|
// reverting to "running" — which would create a ghost record.
|
||||||
|
// Use a fresh context since the original ctx may already be expired.
|
||||||
|
revertStatus := "running"
|
||||||
|
pingCtx, pingCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
if _, pingErr := agent.PingSandbox(pingCtx, connect.NewRequest(&pb.PingSandboxRequest{
|
||||||
|
SandboxId: sandboxIDStr,
|
||||||
|
})); pingErr != nil {
|
||||||
|
revertStatus = "error"
|
||||||
|
slog.Warn("sandbox gone from agent after failed pause, marking as error", "sandbox_id", sandboxIDStr)
|
||||||
|
}
|
||||||
|
pingCancel()
|
||||||
|
dbCtx, dbCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
if _, dbErr := s.DB.UpdateSandboxStatus(dbCtx, db.UpdateSandboxStatusParams{
|
||||||
|
ID: sandboxID, Status: revertStatus,
|
||||||
}); dbErr != nil {
|
}); dbErr != nil {
|
||||||
slog.Warn("failed to revert sandbox status after pause error", "sandbox_id", sandboxIDStr, "error", dbErr)
|
slog.Warn("failed to revert sandbox status after pause error", "sandbox_id", sandboxIDStr, "error", dbErr)
|
||||||
}
|
}
|
||||||
|
dbCancel()
|
||||||
return db.Sandbox{}, fmt.Errorf("agent pause: %w", err)
|
return db.Sandbox{}, fmt.Errorf("agent pause: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -57,7 +57,7 @@ if [ ! -f "${ENVD_BIN}" ]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if ! file "${ENVD_BIN}" | grep -q "statically linked"; then
|
if ! ldd "${ENVD_BIN}" | grep -q "statically linked"; then
|
||||||
echo "ERROR: envd is not statically linked!"
|
echo "ERROR: envd is not statically linked!"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|||||||
@ -17,7 +17,8 @@ set -euo pipefail
|
|||||||
|
|
||||||
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||||
PROJECT_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
|
PROJECT_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
|
||||||
ROOTFS="${1:-/var/lib/wrenn/images/minimal/rootfs.ext4}"
|
WRENN_DIR="${WRENN_DIR:-/var/lib/wrenn}"
|
||||||
|
ROOTFS="${1:-${WRENN_DIR}/images/minimal/rootfs.ext4}"
|
||||||
MOUNT_DIR="/tmp/wrenn-rootfs-update"
|
MOUNT_DIR="/tmp/wrenn-rootfs-update"
|
||||||
|
|
||||||
if [ ! -f "${ROOTFS}" ]; then
|
if [ ! -f "${ROOTFS}" ]; then
|
||||||
@ -36,6 +37,11 @@ if [ ! -f "${ENVD_BIN}" ]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if ! ldd "${ENVD_BIN}" | grep -q "statically linked"; then
|
||||||
|
echo "ERROR: envd is not statically linked!"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
# Step 2: Mount the rootfs.
|
# Step 2: Mount the rootfs.
|
||||||
echo "==> Mounting rootfs at ${MOUNT_DIR}..."
|
echo "==> Mounting rootfs at ${MOUNT_DIR}..."
|
||||||
mkdir -p "${MOUNT_DIR}"
|
mkdir -p "${MOUNT_DIR}"
|
||||||
|
|||||||
Reference in New Issue
Block a user