Merge pull request 'fix: accurate sandbox metrics and memory management' (#41) from bugfix/sandbox-metrics-calculations into dev
Reviewed-on: #41
This commit is contained in:
@ -29,6 +29,8 @@ pub async fn get_health(State(state): State<Arc<AppState>>) -> impl IntoResponse
|
||||
fn post_restore_recovery(state: &AppState) {
|
||||
tracing::info!("restore: post-restore recovery (no GC needed in Rust)");
|
||||
|
||||
state.snapshot_in_progress.store(false, std::sync::atomic::Ordering::Release);
|
||||
|
||||
state.conn_tracker.restore_after_snapshot();
|
||||
tracing::info!("restore: zombie connections closed");
|
||||
|
||||
|
||||
@ -147,6 +147,9 @@ async fn trigger_restore_and_respond(state: &AppState) -> axum::response::Respon
|
||||
|
||||
fn post_restore_recovery(state: &AppState) {
|
||||
tracing::info!("restore: post-restore recovery (no GC needed in Rust)");
|
||||
|
||||
state.snapshot_in_progress.store(false, std::sync::atomic::Ordering::Release);
|
||||
|
||||
state.conn_tracker.restore_after_snapshot();
|
||||
|
||||
if let Some(ref ps) = state.port_subsystem {
|
||||
|
||||
@ -46,7 +46,8 @@ fn collect_metrics(state: &AppState) -> Result<Metrics, String> {
|
||||
let mut sys = sysinfo::System::new();
|
||||
sys.refresh_memory();
|
||||
let mem_total = sys.total_memory();
|
||||
let mem_used = sys.used_memory();
|
||||
let mem_available = sys.available_memory();
|
||||
let mem_used = mem_total.saturating_sub(mem_available);
|
||||
let mem_total_mib = mem_total / 1024 / 1024;
|
||||
let mem_used_mib = mem_used / 1024 / 1024;
|
||||
|
||||
|
||||
@ -14,6 +14,10 @@ use crate::state::AppState;
|
||||
/// 2. Close idle connections via conntracker
|
||||
/// 3. Set needs_restore flag
|
||||
pub async fn post_snapshot_prepare(State(state): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
// Block memory reclaimer before anything else — prevents drop_caches
|
||||
// from running mid-freeze which would corrupt kernel page table state.
|
||||
state.snapshot_in_progress.store(true, Ordering::Release);
|
||||
|
||||
if let Some(ref ps) = state.port_subsystem {
|
||||
ps.stop();
|
||||
tracing::info!("snapshot/prepare: port subsystem stopped");
|
||||
@ -22,6 +26,9 @@ pub async fn post_snapshot_prepare(State(state): State<Arc<AppState>>) -> impl I
|
||||
state.conn_tracker.prepare_for_snapshot();
|
||||
tracing::info!("snapshot/prepare: connections prepared");
|
||||
|
||||
// Sync filesystem buffers so dirty pages are flushed before freeze.
|
||||
unsafe { libc::sync(); }
|
||||
|
||||
state.needs_restore.store(true, Ordering::Release);
|
||||
tracing::info!("snapshot/prepare: ready for freeze");
|
||||
|
||||
|
||||
@ -147,6 +147,14 @@ async fn main() {
|
||||
Some(Arc::clone(&port_subsystem)),
|
||||
);
|
||||
|
||||
// Memory reclaimer — drop page cache when available memory is low.
|
||||
// Firecracker balloon device can only reclaim pages the guest kernel freed.
|
||||
// Pauses during snapshot/prepare to avoid corrupting kernel page table state.
|
||||
if !cli.is_not_fc {
|
||||
let state_for_reclaimer = Arc::clone(&state);
|
||||
std::thread::spawn(move || memory_reclaimer(state_for_reclaimer));
|
||||
}
|
||||
|
||||
// RPC services (Connect protocol — serves Connect + gRPC + gRPC-Web on same port)
|
||||
let connect_router = rpc::rpc_router(Arc::clone(&state));
|
||||
|
||||
@ -222,3 +230,44 @@ fn spawn_initial_command(cmd: &str, state: &AppState) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn memory_reclaimer(state: Arc<AppState>) {
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
const CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const DROP_THRESHOLD_PCT: u64 = 80;
|
||||
|
||||
loop {
|
||||
std::thread::sleep(CHECK_INTERVAL);
|
||||
|
||||
if state.snapshot_in_progress.load(Ordering::Acquire) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut sys = sysinfo::System::new();
|
||||
sys.refresh_memory();
|
||||
let total = sys.total_memory();
|
||||
let available = sys.available_memory();
|
||||
|
||||
if total == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let used_pct = ((total - available) * 100) / total;
|
||||
if used_pct >= DROP_THRESHOLD_PCT {
|
||||
if state.snapshot_in_progress.load(Ordering::Acquire) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(e) = std::fs::write("/proc/sys/vm/drop_caches", "3") {
|
||||
tracing::debug!(error = %e, "drop_caches failed");
|
||||
} else {
|
||||
let mut sys2 = sysinfo::System::new();
|
||||
sys2.refresh_memory();
|
||||
let freed_mb =
|
||||
sys2.available_memory().saturating_sub(available) / (1024 * 1024);
|
||||
tracing::info!(used_pct, freed_mb, "page cache dropped");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ pub struct AppState {
|
||||
pub port_subsystem: Option<Arc<PortSubsystem>>,
|
||||
pub cpu_used_pct: AtomicU32,
|
||||
pub cpu_count: AtomicU32,
|
||||
pub snapshot_in_progress: AtomicBool,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
@ -41,6 +42,7 @@ impl AppState {
|
||||
port_subsystem,
|
||||
cpu_used_pct: AtomicU32::new(0),
|
||||
cpu_count: AtomicU32::new(0),
|
||||
snapshot_in_progress: AtomicBool::new(false),
|
||||
});
|
||||
|
||||
let state_clone = Arc::clone(&state);
|
||||
|
||||
@ -95,10 +95,10 @@ type snapshotParent struct {
|
||||
}
|
||||
|
||||
// maxDiffGenerations caps how many incremental diff generations we chain
|
||||
// before falling back to a Full snapshot to collapse the chain. Long diff
|
||||
// chains increase restore latency and snapshot directory size; a periodic
|
||||
// Full snapshot resets the counter and produces a clean base.
|
||||
const maxDiffGenerations = 8
|
||||
// before merging diffs into a single file. Since UFFD lazy-loads memory
|
||||
// anyway, we merge on every re-pause to keep exactly 1 diff file per
|
||||
// snapshot — no accumulated chain, no extra restore overhead.
|
||||
const maxDiffGenerations = 1
|
||||
|
||||
// buildMetadata constructs the metadata map with version information.
|
||||
func (m *Manager) buildMetadata(envdVersion string) map[string]string {
|
||||
@ -1720,12 +1720,12 @@ func (m *Manager) startSampler(sb *sandboxState) {
|
||||
go m.samplerLoop(ctx, sb, fcPID, sb.VCPUs, initialCPU)
|
||||
}
|
||||
|
||||
// samplerLoop samples /proc metrics at 500ms intervals.
|
||||
// samplerLoop samples metrics at 1s intervals.
|
||||
// lastCPU is goroutine-local to avoid shared-state races.
|
||||
func (m *Manager) samplerLoop(ctx context.Context, sb *sandboxState, fcPID, vcpus int, lastCPU cpuStat) {
|
||||
defer close(sb.samplerDone)
|
||||
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
clkTck := 100.0 // sysconf(_SC_CLK_TCK), almost always 100 on Linux
|
||||
@ -1758,8 +1758,11 @@ func (m *Manager) samplerLoop(ctx context.Context, sb *sandboxState, fcPID, vcpu
|
||||
cpuInitialized = true
|
||||
}
|
||||
|
||||
// Memory: VmRSS of the Firecracker process.
|
||||
memBytes, _ := readMemRSS(fcPID)
|
||||
// Memory: guest-reported used memory from envd /metrics.
|
||||
// VmRSS of the Firecracker process includes guest page cache
|
||||
// and never decreases, so we use the guest's own view which
|
||||
// reports total - available (actual process memory).
|
||||
memBytes, _ := readEnvdMemUsed(sb.client)
|
||||
|
||||
// Disk: allocated bytes of the CoW sparse file.
|
||||
var diskBytes int64
|
||||
|
||||
@ -15,11 +15,11 @@ type MetricPoint struct {
|
||||
|
||||
// Ring buffer capacity constants.
|
||||
const (
|
||||
ring10mCap = 1200 // 500ms × 1200 = 10 min
|
||||
ring2hCap = 240 // 30s × 240 = 2 h
|
||||
ring24hCap = 288 // 5min × 288 = 24 h
|
||||
ring10mCap = 600 // 1s × 600 = 10 min
|
||||
ring2hCap = 240 // 30s × 240 = 2 h
|
||||
ring24hCap = 288 // 5min × 288 = 24 h
|
||||
|
||||
downsample2hEvery = 60 // 60 × 500ms = 30s
|
||||
downsample2hEvery = 30 // 30 × 1s = 30s
|
||||
downsample24hEvery = 10 // 10 × 30s = 5min
|
||||
)
|
||||
|
||||
@ -44,8 +44,8 @@ type metricsRing struct {
|
||||
count24h int
|
||||
|
||||
// Accumulators for downsampling.
|
||||
acc500ms [downsample2hEvery]MetricPoint
|
||||
acc500msN int
|
||||
acc1s [downsample2hEvery]MetricPoint
|
||||
acc1sN int
|
||||
|
||||
acc30s [downsample24hEvery]MetricPoint
|
||||
acc30sN int
|
||||
@ -56,7 +56,7 @@ func newMetricsRing() *metricsRing {
|
||||
return &metricsRing{}
|
||||
}
|
||||
|
||||
// Push adds a 500ms sample to the finest tier and triggers downsampling
|
||||
// Push adds a 1s sample to the finest tier and triggers downsampling
|
||||
// into coarser tiers when enough samples have accumulated.
|
||||
func (r *metricsRing) Push(p MetricPoint) {
|
||||
r.mu.Lock()
|
||||
@ -70,12 +70,12 @@ func (r *metricsRing) Push(p MetricPoint) {
|
||||
}
|
||||
|
||||
// Accumulate for 2h downsample.
|
||||
r.acc500ms[r.acc500msN] = p
|
||||
r.acc500msN++
|
||||
if r.acc500msN == downsample2hEvery {
|
||||
avg := averagePoints(r.acc500ms[:downsample2hEvery])
|
||||
r.acc1s[r.acc1sN] = p
|
||||
r.acc1sN++
|
||||
if r.acc1sN == downsample2hEvery {
|
||||
avg := averagePoints(r.acc1s[:downsample2hEvery])
|
||||
r.push2h(avg)
|
||||
r.acc500msN = 0
|
||||
r.acc1sN = 0
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,7 +138,7 @@ func (r *metricsRing) Flush() (pts10m, pts2h, pts24h []MetricPoint) {
|
||||
r.idx10m, r.count10m = 0, 0
|
||||
r.idx2h, r.count2h = 0, 0
|
||||
r.idx24h, r.count24h = 0, 0
|
||||
r.acc500msN = 0
|
||||
r.acc1sN = 0
|
||||
r.acc30sN = 0
|
||||
|
||||
return pts10m, pts2h, pts24h
|
||||
|
||||
@ -1,11 +1,15 @@
|
||||
package sandbox
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"git.omukk.dev/wrenn/wrenn/internal/envdclient"
|
||||
)
|
||||
|
||||
// cpuStat holds raw CPU jiffies read from /proc/{pid}/stat.
|
||||
@ -24,16 +28,11 @@ func readCPUStat(pid int) (cpuStat, error) {
|
||||
return cpuStat{}, fmt.Errorf("read stat: %w", err)
|
||||
}
|
||||
|
||||
// /proc/{pid}/stat format: pid (comm) state fields...
|
||||
// The comm field may contain spaces and parens, so find the last ')' first.
|
||||
content := string(data)
|
||||
idx := strings.LastIndex(content, ")")
|
||||
if idx < 0 {
|
||||
return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: no closing paren", pid)
|
||||
}
|
||||
// After ")" there is " state field3 field4 ... fieldN"
|
||||
// field1 after ')' is state (index 0), utime is field 11, stime is field 12
|
||||
// (0-indexed from after the closing paren).
|
||||
fields := strings.Fields(content[idx+2:])
|
||||
if len(fields) < 13 {
|
||||
return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: too few fields (%d)", pid, len(fields))
|
||||
@ -49,27 +48,34 @@ func readCPUStat(pid int) (cpuStat, error) {
|
||||
return cpuStat{utime: utime, stime: stime}, nil
|
||||
}
|
||||
|
||||
// readMemRSS reads VmRSS from /proc/{pid}/status and returns bytes.
|
||||
func readMemRSS(pid int) (int64, error) {
|
||||
path := fmt.Sprintf("/proc/%d/status", pid)
|
||||
data, err := os.ReadFile(path)
|
||||
// readEnvdMemUsed fetches mem_used from envd's /metrics endpoint. Returns
|
||||
// guest-side total - MemAvailable (actual process memory, excluding reclaimable
|
||||
// page cache). VmRSS of the Firecracker process includes guest page cache and
|
||||
// never decreases, so this is the accurate metric for dashboard display.
|
||||
func readEnvdMemUsed(client *envdclient.Client) (int64, error) {
|
||||
resp, err := client.HTTPClient().Get(client.BaseURL() + "/metrics")
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read status: %w", err)
|
||||
return 0, fmt.Errorf("fetch envd metrics: %w", err)
|
||||
}
|
||||
for _, line := range strings.Split(string(data), "\n") {
|
||||
if strings.HasPrefix(line, "VmRSS:") {
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) < 2 {
|
||||
return 0, fmt.Errorf("malformed VmRSS line")
|
||||
}
|
||||
kb, err := strconv.ParseInt(fields[1], 10, 64)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("parse VmRSS: %w", err)
|
||||
}
|
||||
return kb * 1024, nil
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
return 0, fmt.Errorf("envd metrics: status %d", resp.StatusCode)
|
||||
}
|
||||
return 0, fmt.Errorf("VmRSS not found in /proc/%d/status", pid)
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read envd metrics body: %w", err)
|
||||
}
|
||||
|
||||
var m struct {
|
||||
MemUsed int64 `json:"mem_used"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &m); err != nil {
|
||||
return 0, fmt.Errorf("decode envd metrics: %w", err)
|
||||
}
|
||||
|
||||
return m.MemUsed, nil
|
||||
}
|
||||
|
||||
// readDiskAllocated returns the actual allocated bytes (not apparent size)
|
||||
|
||||
@ -136,6 +136,25 @@ func (c *fcClient) setMMDS(ctx context.Context, sandboxID, templateID string) er
|
||||
})
|
||||
}
|
||||
|
||||
// setBalloon configures the Firecracker balloon device for dynamic memory
|
||||
// management. deflateOnOom lets the guest reclaim balloon pages under memory
|
||||
// pressure. statsInterval enables periodic stats via GET /balloon/statistics.
|
||||
// Must be called before startVM.
|
||||
func (c *fcClient) setBalloon(ctx context.Context, amountMiB int, deflateOnOom bool, statsIntervalS int) error {
|
||||
return c.do(ctx, http.MethodPut, "/balloon", map[string]any{
|
||||
"amount_mib": amountMiB,
|
||||
"deflate_on_oom": deflateOnOom,
|
||||
"stats_polling_interval_s": statsIntervalS,
|
||||
})
|
||||
}
|
||||
|
||||
// updateBalloon adjusts the balloon target at runtime.
|
||||
func (c *fcClient) updateBalloon(ctx context.Context, amountMiB int) error {
|
||||
return c.do(ctx, http.MethodPatch, "/balloon", map[string]any{
|
||||
"amount_mib": amountMiB,
|
||||
})
|
||||
}
|
||||
|
||||
// startVM issues the InstanceStart action.
|
||||
func (c *fcClient) startVM(ctx context.Context) error {
|
||||
return c.do(ctx, http.MethodPut, "/actions", map[string]string{
|
||||
|
||||
@ -119,6 +119,13 @@ func configureVM(ctx context.Context, client *fcClient, cfg *VMConfig) error {
|
||||
return fmt.Errorf("set machine config: %w", err)
|
||||
}
|
||||
|
||||
// Balloon device — allows the host to reclaim unused guest memory.
|
||||
// Start with 0 (no inflation). deflate_on_oom lets the guest reclaim
|
||||
// balloon pages under memory pressure. Stats interval enables monitoring.
|
||||
if err := client.setBalloon(ctx, 0, true, 5); err != nil {
|
||||
slog.Warn("set balloon failed (non-fatal, VM will run without memory reclaim)", "error", err)
|
||||
}
|
||||
|
||||
// MMDS config — enable V2 token access on eth0 so that envd can read
|
||||
// WRENN_SANDBOX_ID and WRENN_TEMPLATE_ID from inside the guest.
|
||||
if err := client.setMMDSConfig(ctx, "eth0"); err != nil {
|
||||
@ -162,6 +169,19 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateBalloon adjusts the balloon target for a running VM.
|
||||
// amountMiB is memory to take FROM the guest (0 = give all back).
|
||||
func (m *Manager) UpdateBalloon(ctx context.Context, sandboxID string, amountMiB int) error {
|
||||
m.mu.RLock()
|
||||
vm, ok := m.vms[sandboxID]
|
||||
m.mu.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("VM not found: %s", sandboxID)
|
||||
}
|
||||
|
||||
return vm.client.updateBalloon(ctx, amountMiB)
|
||||
}
|
||||
|
||||
// Destroy stops and cleans up a VM.
|
||||
func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
|
||||
m.mu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user