diff --git a/envd-rs/src/http/health.rs b/envd-rs/src/http/health.rs
index 5eb2da3..39d61c9 100644
--- a/envd-rs/src/http/health.rs
+++ b/envd-rs/src/http/health.rs
@@ -29,6 +29,8 @@ pub async fn get_health(State(state): State<Arc<AppState>>) -> impl IntoResponse
 fn post_restore_recovery(state: &AppState) {
     tracing::info!("restore: post-restore recovery (no GC needed in Rust)");
+    state.snapshot_in_progress.store(false, std::sync::atomic::Ordering::Release);
+
     state.conn_tracker.restore_after_snapshot();
     tracing::info!("restore: zombie connections closed");
diff --git a/envd-rs/src/http/init.rs b/envd-rs/src/http/init.rs
index ed2baa2..bc78c8c 100644
--- a/envd-rs/src/http/init.rs
+++ b/envd-rs/src/http/init.rs
@@ -147,6 +147,9 @@ async fn trigger_restore_and_respond(state: &AppState) -> axum::response::Respon
 fn post_restore_recovery(state: &AppState) {
     tracing::info!("restore: post-restore recovery (no GC needed in Rust)");
+
+    state.snapshot_in_progress.store(false, std::sync::atomic::Ordering::Release);
+
     state.conn_tracker.restore_after_snapshot();
 
     if let Some(ref ps) = state.port_subsystem {
diff --git a/envd-rs/src/http/metrics.rs b/envd-rs/src/http/metrics.rs
index da13452..79f3027 100644
--- a/envd-rs/src/http/metrics.rs
+++ b/envd-rs/src/http/metrics.rs
@@ -46,7 +46,8 @@ fn collect_metrics(state: &AppState) -> Result {
     let mut sys = sysinfo::System::new();
     sys.refresh_memory();
     let mem_total = sys.total_memory();
-    let mem_used = sys.used_memory();
+    let mem_available = sys.available_memory();
+    let mem_used = mem_total.saturating_sub(mem_available);
     let mem_total_mib = mem_total / 1024 / 1024;
     let mem_used_mib = mem_used / 1024 / 1024;
diff --git a/envd-rs/src/http/snapshot.rs b/envd-rs/src/http/snapshot.rs
index a0312f0..977b6bd 100644
--- a/envd-rs/src/http/snapshot.rs
+++ b/envd-rs/src/http/snapshot.rs
@@ -14,6 +14,10 @@ use crate::state::AppState;
 /// 2. Close idle connections via conntracker
 /// 3. Set needs_restore flag
 pub async fn post_snapshot_prepare(State(state): State<Arc<AppState>>) -> impl IntoResponse {
+    // Block memory reclaimer before anything else — prevents drop_caches
+    // from running mid-freeze which would corrupt kernel page table state.
+    state.snapshot_in_progress.store(true, Ordering::Release);
+
     if let Some(ref ps) = state.port_subsystem {
         ps.stop();
         tracing::info!("snapshot/prepare: port subsystem stopped");
@@ -22,6 +26,9 @@
     state.conn_tracker.prepare_for_snapshot();
     tracing::info!("snapshot/prepare: connections prepared");
 
+    // Sync filesystem buffers so dirty pages are flushed before freeze.
+    unsafe { libc::sync(); }
+
     state.needs_restore.store(true, Ordering::Release);
     tracing::info!("snapshot/prepare: ready for freeze");
diff --git a/envd-rs/src/main.rs b/envd-rs/src/main.rs
index 587fc1a..3176a28 100644
--- a/envd-rs/src/main.rs
+++ b/envd-rs/src/main.rs
@@ -147,6 +147,14 @@ async fn main() {
         Some(Arc::clone(&port_subsystem)),
     );
+    // Memory reclaimer — drop page cache when available memory is low.
+    // Firecracker balloon device can only reclaim pages the guest kernel freed.
+    // Pauses during snapshot/prepare to avoid corrupting kernel page table state.
+    if !cli.is_not_fc {
+        let state_for_reclaimer = Arc::clone(&state);
+        std::thread::spawn(move || memory_reclaimer(state_for_reclaimer));
+    }
+
     // RPC services (Connect protocol — serves Connect + gRPC + gRPC-Web on same port)
     let connect_router = rpc::rpc_router(Arc::clone(&state));
@@ -222,3 +230,44 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
         }
     }
 }
+
+fn memory_reclaimer(state: Arc<AppState>) {
+    use std::sync::atomic::Ordering;
+
+    const CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
+    const DROP_THRESHOLD_PCT: u64 = 80;
+
+    loop {
+        std::thread::sleep(CHECK_INTERVAL);
+
+        if state.snapshot_in_progress.load(Ordering::Acquire) {
+            continue;
+        }
+
+        let mut sys = sysinfo::System::new();
+        sys.refresh_memory();
+        let total = sys.total_memory();
+        let available = sys.available_memory();
+
+        if total == 0 {
+            continue;
+        }
+
+        let used_pct = ((total - available) * 100) / total;
+        if used_pct >= DROP_THRESHOLD_PCT {
+            if state.snapshot_in_progress.load(Ordering::Acquire) {
+                continue;
+            }
+
+            if let Err(e) = std::fs::write("/proc/sys/vm/drop_caches", "3") {
+                tracing::debug!(error = %e, "drop_caches failed");
+            } else {
+                let mut sys2 = sysinfo::System::new();
+                sys2.refresh_memory();
+                let freed_mb =
+                    sys2.available_memory().saturating_sub(available) / (1024 * 1024);
+                tracing::info!(used_pct, freed_mb, "page cache dropped");
+            }
+        }
+    }
+}
diff --git a/envd-rs/src/state.rs b/envd-rs/src/state.rs
index aa1f4a2..33d170a 100644
--- a/envd-rs/src/state.rs
+++ b/envd-rs/src/state.rs
@@ -19,6 +19,7 @@ pub struct AppState {
     pub port_subsystem: Option>,
     pub cpu_used_pct: AtomicU32,
     pub cpu_count: AtomicU32,
+    pub snapshot_in_progress: AtomicBool,
 }
 
 impl AppState {
@@ -41,6 +42,7 @@ impl AppState {
             port_subsystem,
             cpu_used_pct: AtomicU32::new(0),
             cpu_count: AtomicU32::new(0),
+            snapshot_in_progress: AtomicBool::new(false),
         });
 
         let state_clone = Arc::clone(&state);
diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go
index 3f295e1..f87e0f7 100644
--- a/internal/sandbox/manager.go
+++ b/internal/sandbox/manager.go
@@ -95,10 +95,10 @@ type snapshotParent struct {
 }
 
 // maxDiffGenerations caps how many incremental diff generations we chain
-// before falling back to a Full snapshot to collapse the chain. Long diff
-// chains increase restore latency and snapshot directory size; a periodic
-// Full snapshot resets the counter and produces a clean base.
-const maxDiffGenerations = 8
+// before merging diffs into a single file. Since UFFD lazy-loads memory
+// anyway, we merge on every re-pause to keep exactly 1 diff file per
+// snapshot — no accumulated chain, no extra restore overhead.
+const maxDiffGenerations = 1
 
 // buildMetadata constructs the metadata map with version information.
 func (m *Manager) buildMetadata(envdVersion string) map[string]string {
@@ -1720,12 +1720,12 @@ func (m *Manager) startSampler(sb *sandboxState) {
 	go m.samplerLoop(ctx, sb, fcPID, sb.VCPUs, initialCPU)
 }
 
-// samplerLoop samples /proc metrics at 500ms intervals.
+// samplerLoop samples metrics at 1s intervals.
 // lastCPU is goroutine-local to avoid shared-state races.
 func (m *Manager) samplerLoop(ctx context.Context, sb *sandboxState, fcPID, vcpus int, lastCPU cpuStat) {
 	defer close(sb.samplerDone)
 
-	ticker := time.NewTicker(500 * time.Millisecond)
+	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
 
 	clkTck := 100.0 // sysconf(_SC_CLK_TCK), almost always 100 on Linux
@@ -1758,8 +1758,11 @@ func (m *Manager) samplerLoop(ctx context.Context, sb *sandboxState, fcPID, vcpu
 			cpuInitialized = true
 		}
 
-		// Memory: VmRSS of the Firecracker process.
-		memBytes, _ := readMemRSS(fcPID)
+		// Memory: guest-reported used memory from envd /metrics.
+		// VmRSS of the Firecracker process includes guest page cache
+		// and never decreases, so we use the guest's own view which
+		// reports total - available (actual process memory).
+		memBytes, _ := readEnvdMemUsed(sb.client)
 
 		// Disk: allocated bytes of the CoW sparse file.
 		var diskBytes int64
diff --git a/internal/sandbox/metrics.go b/internal/sandbox/metrics.go
index f266cb2..296bd0e 100644
--- a/internal/sandbox/metrics.go
+++ b/internal/sandbox/metrics.go
@@ -15,11 +15,11 @@ type MetricPoint struct {
 
 // Ring buffer capacity constants.
 const (
-	ring10mCap = 1200 // 500ms × 1200 = 10 min
-	ring2hCap  = 240  // 30s × 240 = 2 h
-	ring24hCap = 288  // 5min × 288 = 24 h
+	ring10mCap = 600 // 1s × 600 = 10 min
+	ring2hCap  = 240 // 30s × 240 = 2 h
+	ring24hCap = 288 // 5min × 288 = 24 h
 
-	downsample2hEvery  = 60 // 60 × 500ms = 30s
+	downsample2hEvery  = 30 // 30 × 1s = 30s
 	downsample24hEvery = 10 // 10 × 30s = 5min
 )
@@ -44,8 +44,8 @@ type metricsRing struct {
 	count24h int
 
 	// Accumulators for downsampling.
-	acc500ms  [downsample2hEvery]MetricPoint
-	acc500msN int
+	acc1s  [downsample2hEvery]MetricPoint
+	acc1sN int
 	acc30s  [downsample24hEvery]MetricPoint
 	acc30sN int
@@ -56,7 +56,7 @@ func newMetricsRing() *metricsRing {
 	return &metricsRing{}
 }
 
-// Push adds a 500ms sample to the finest tier and triggers downsampling
+// Push adds a 1s sample to the finest tier and triggers downsampling
 // into coarser tiers when enough samples have accumulated.
 func (r *metricsRing) Push(p MetricPoint) {
 	r.mu.Lock()
@@ -70,12 +70,12 @@ func (r *metricsRing) Push(p MetricPoint) {
 	}
 
 	// Accumulate for 2h downsample.
-	r.acc500ms[r.acc500msN] = p
-	r.acc500msN++
-	if r.acc500msN == downsample2hEvery {
-		avg := averagePoints(r.acc500ms[:downsample2hEvery])
+	r.acc1s[r.acc1sN] = p
+	r.acc1sN++
+	if r.acc1sN == downsample2hEvery {
+		avg := averagePoints(r.acc1s[:downsample2hEvery])
 		r.push2h(avg)
-		r.acc500msN = 0
+		r.acc1sN = 0
 	}
 }
@@ -138,7 +138,7 @@ func (r *metricsRing) Flush() (pts10m, pts2h, pts24h []MetricPoint) {
 	r.idx10m, r.count10m = 0, 0
 	r.idx2h, r.count2h = 0, 0
 	r.idx24h, r.count24h = 0, 0
-	r.acc500msN = 0
+	r.acc1sN = 0
 	r.acc30sN = 0
 
 	return pts10m, pts2h, pts24h
diff --git a/internal/sandbox/proc.go b/internal/sandbox/proc.go
index 855d3c1..ede22f0 100644
--- a/internal/sandbox/proc.go
+++ b/internal/sandbox/proc.go
@@ -1,11 +1,15 @@
 package sandbox
 
 import (
+	"encoding/json"
 	"fmt"
+	"io"
 	"os"
 	"strconv"
 	"strings"
 	"syscall"
+
+	"git.omukk.dev/wrenn/wrenn/internal/envdclient"
 )
 
 // cpuStat holds raw CPU jiffies read from /proc/{pid}/stat.
@@ -24,16 +28,11 @@ func readCPUStat(pid int) (cpuStat, error) {
 		return cpuStat{}, fmt.Errorf("read stat: %w", err)
 	}
 
-	// /proc/{pid}/stat format: pid (comm) state fields...
-	// The comm field may contain spaces and parens, so find the last ')' first.
 	content := string(data)
 	idx := strings.LastIndex(content, ")")
 	if idx < 0 {
 		return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: no closing paren", pid)
 	}
-	// After ")" there is " state field3 field4 ... fieldN"
-	// field1 after ')' is state (index 0), utime is field 11, stime is field 12
-	// (0-indexed from after the closing paren).
 	fields := strings.Fields(content[idx+2:])
 	if len(fields) < 13 {
 		return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: too few fields (%d)", pid, len(fields))
@@ -49,27 +48,34 @@ func readCPUStat(pid int) (cpuStat, error) {
 	return cpuStat{utime: utime, stime: stime}, nil
 }
 
-// readMemRSS reads VmRSS from /proc/{pid}/status and returns bytes.
-func readMemRSS(pid int) (int64, error) {
-	path := fmt.Sprintf("/proc/%d/status", pid)
-	data, err := os.ReadFile(path)
+// readEnvdMemUsed fetches mem_used from envd's /metrics endpoint. Returns
+// guest-side total - MemAvailable (actual process memory, excluding reclaimable
+// page cache). VmRSS of the Firecracker process includes guest page cache and
+// never decreases, so this is the accurate metric for dashboard display.
+func readEnvdMemUsed(client *envdclient.Client) (int64, error) {
+	resp, err := client.HTTPClient().Get(client.BaseURL() + "/metrics")
 	if err != nil {
-		return 0, fmt.Errorf("read status: %w", err)
+		return 0, fmt.Errorf("fetch envd metrics: %w", err)
 	}
-	for _, line := range strings.Split(string(data), "\n") {
-		if strings.HasPrefix(line, "VmRSS:") {
-			fields := strings.Fields(line)
-			if len(fields) < 2 {
-				return 0, fmt.Errorf("malformed VmRSS line")
-			}
-			kb, err := strconv.ParseInt(fields[1], 10, 64)
-			if err != nil {
-				return 0, fmt.Errorf("parse VmRSS: %w", err)
-			}
-			return kb * 1024, nil
-		}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return 0, fmt.Errorf("envd metrics: status %d", resp.StatusCode)
 	}
-	return 0, fmt.Errorf("VmRSS not found in /proc/%d/status", pid)
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return 0, fmt.Errorf("read envd metrics body: %w", err)
+	}
+
+	var m struct {
+		MemUsed int64 `json:"mem_used"`
+	}
+	if err := json.Unmarshal(body, &m); err != nil {
+		return 0, fmt.Errorf("decode envd metrics: %w", err)
+	}
+
+	return m.MemUsed, nil
 }
 
 // readDiskAllocated returns the actual allocated bytes (not apparent size)
diff --git a/internal/vm/fc.go b/internal/vm/fc.go
index 5a131a4..e8f1ac3 100644
--- a/internal/vm/fc.go
+++ b/internal/vm/fc.go
@@ -136,6 +136,25 @@ func (c *fcClient) setMMDS(ctx context.Context, sandboxID, templateID string) er
 	})
 }
 
+// setBalloon configures the Firecracker balloon device for dynamic memory
+// management. deflateOnOom lets the guest reclaim balloon pages under memory
+// pressure. statsInterval enables periodic stats via GET /balloon/statistics.
+// Must be called before startVM.
+func (c *fcClient) setBalloon(ctx context.Context, amountMiB int, deflateOnOom bool, statsIntervalS int) error {
+	return c.do(ctx, http.MethodPut, "/balloon", map[string]any{
+		"amount_mib":               amountMiB,
+		"deflate_on_oom":           deflateOnOom,
+		"stats_polling_interval_s": statsIntervalS,
+	})
+}
+
+// updateBalloon adjusts the balloon target at runtime.
+func (c *fcClient) updateBalloon(ctx context.Context, amountMiB int) error {
+	return c.do(ctx, http.MethodPatch, "/balloon", map[string]any{
+		"amount_mib": amountMiB,
+	})
+}
+
 // startVM issues the InstanceStart action.
 func (c *fcClient) startVM(ctx context.Context) error {
 	return c.do(ctx, http.MethodPut, "/actions", map[string]string{
diff --git a/internal/vm/manager.go b/internal/vm/manager.go
index 99dbfe3..3d55620 100644
--- a/internal/vm/manager.go
+++ b/internal/vm/manager.go
@@ -119,6 +119,13 @@ func configureVM(ctx context.Context, client *fcClient, cfg *VMConfig) error {
 		return fmt.Errorf("set machine config: %w", err)
 	}
 
+	// Balloon device — allows the host to reclaim unused guest memory.
+	// Start with 0 (no inflation). deflate_on_oom lets the guest reclaim
+	// balloon pages under memory pressure. Stats interval enables monitoring.
+	if err := client.setBalloon(ctx, 0, true, 5); err != nil {
+		slog.Warn("set balloon failed (non-fatal, VM will run without memory reclaim)", "error", err)
+	}
+
 	// MMDS config — enable V2 token access on eth0 so that envd can read
 	// WRENN_SANDBOX_ID and WRENN_TEMPLATE_ID from inside the guest.
 	if err := client.setMMDSConfig(ctx, "eth0"); err != nil {
@@ -162,6 +169,19 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string) error {
 	return nil
 }
 
+// UpdateBalloon adjusts the balloon target for a running VM.
+// amountMiB is memory to take FROM the guest (0 = give all back).
+func (m *Manager) UpdateBalloon(ctx context.Context, sandboxID string, amountMiB int) error {
+	m.mu.RLock()
+	vm, ok := m.vms[sandboxID]
+	m.mu.RUnlock()
+	if !ok {
+		return fmt.Errorf("VM not found: %s", sandboxID)
+	}
+
+	return vm.client.updateBalloon(ctx, amountMiB)
+}
+
 // Destroy stops and cleans up a VM.
 func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
 	m.mu.Lock()
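
Reviewer note (not part of the patch): the diff adds the PUT/PATCH /balloon plumbing and Manager.UpdateBalloon, but nothing drives the balloon target yet. The sketch below shows one way a host-side loop could size the balloon from the guest-reported used/total memory the sampler already collects. BalloonUpdater, guestMem, tuneBalloon, the 256 MiB headroom, and the 30s tick are illustrative assumptions; only the UpdateBalloon signature and the "amountMiB is memory to take FROM the guest, 0 = give all back" semantics come from the patch.

package balloontuner

import (
	"context"
	"log"
	"time"
)

// BalloonUpdater is the slice of vm.Manager this sketch needs; it matches the
// UpdateBalloon method added in internal/vm/manager.go above.
type BalloonUpdater interface {
	UpdateBalloon(ctx context.Context, sandboxID string, amountMiB int) error
}

// guestMem reports guest used/total memory in bytes, e.g. sourced from the
// sampler's readEnvdMemUsed plus the sandbox's configured memory size
// (hypothetical wiring, not in the patch).
type guestMem func(ctx context.Context) (used, total int64, err error)

// tuneBalloon keeps roughly headroomMiB of the guest's memory free: anything
// above the headroom is handed to the host by inflating the balloon, and the
// balloon is partially deflated again when the guest approaches the headroom.
// deflate_on_oom (set in configureVM) covers sudden spikes between ticks.
func tuneBalloon(ctx context.Context, m BalloonUpdater, sandboxID string, mem guestMem) {
	const headroomMiB = 256
	current := 0 // last requested balloon target in MiB; configureVM starts at 0

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		used, total, err := mem(ctx)
		if err != nil {
			log.Printf("balloon tuner %s: metrics: %v", sandboxID, err)
			continue
		}

		// Free memory as the guest sees it; inflated balloon pages already count as used.
		freeMiB := int((total - used) / (1024 * 1024))

		target := current + freeMiB - headroomMiB
		if target < 0 {
			target = 0 // give everything back to the guest
		}
		if target == current {
			continue
		}

		if err := m.UpdateBalloon(ctx, sandboxID, target); err != nil {
			log.Printf("balloon tuner %s: update balloon: %v", sandboxID, err)
			continue
		}
		current = target
	}
}

A fuller version would read the balloon's actual size back from the statistics endpoint enabled by setBalloon's stats interval rather than tracking current locally, since deflate_on_oom can shrink the balloon without the host asking.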