1
0
forked from wrenn/wrenn

Add per-sandbox CPU/memory/disk metrics collection

Samples /proc/{fc_pid}/stat (CPU%), /proc/{fc_pid}/status (VmRSS), and
stat() on CoW files at 500ms intervals per running sandbox. Three tiered
ring buffers downsample into 30s and 5min averages for 10min/2h/24h
retention. Metrics are flushed to DB on pause (all tiers) and destroy
(24h only). New GetSandboxMetrics and FlushSandboxMetrics RPCs on the
host agent, proxied through GET /v1/sandboxes/{id}/metrics?range= on
the control plane. Returns live data for running sandboxes, DB data for
paused, and 404 for stopped.
This commit is contained in:
2026-03-25 20:10:33 +06:00
parent 7473c15f52
commit 9acdbb5ae9
16 changed files with 1430 additions and 90 deletions

View File

@ -0,0 +1,130 @@
package api
import (
"context"
"net/http"
"connectrpc.com/connect"
"github.com/go-chi/chi/v5"
"git.omukk.dev/wrenn/sandbox/internal/auth"
"git.omukk.dev/wrenn/sandbox/internal/db"
"git.omukk.dev/wrenn/sandbox/internal/lifecycle"
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
)
type sandboxMetricsHandler struct {
db *db.Queries
pool *lifecycle.HostClientPool
}
func newSandboxMetricsHandler(db *db.Queries, pool *lifecycle.HostClientPool) *sandboxMetricsHandler {
return &sandboxMetricsHandler{db: db, pool: pool}
}
type metricPointResponse struct {
TimestampUnix int64 `json:"timestamp_unix"`
CPUPct float64 `json:"cpu_pct"`
MemBytes int64 `json:"mem_bytes"`
DiskBytes int64 `json:"disk_bytes"`
}
type metricsResponse struct {
SandboxID string `json:"sandbox_id"`
Range string `json:"range"`
Points []metricPointResponse `json:"points"`
}
// GetMetrics handles GET /v1/sandboxes/{id}/metrics?range=10m|2h|24h.
func (h *sandboxMetricsHandler) GetMetrics(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
ctx := r.Context()
ac := auth.MustFromContext(ctx)
rangeTier := r.URL.Query().Get("range")
if rangeTier == "" {
rangeTier = "10m"
}
if rangeTier != "10m" && rangeTier != "2h" && rangeTier != "24h" {
writeError(w, http.StatusBadRequest, "invalid_request", "range must be 10m, 2h, or 24h")
return
}
sb, err := h.db.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: ac.TeamID})
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
switch sb.Status {
case "running":
h.getFromAgent(w, r, sandboxID, rangeTier, sb.HostID)
case "paused":
h.getFromDB(ctx, w, sandboxID, rangeTier)
default:
writeError(w, http.StatusNotFound, "not_found", "metrics not available for sandbox in state: "+sb.Status)
}
}
func (h *sandboxMetricsHandler) getFromAgent(w http.ResponseWriter, r *http.Request, sandboxID, rangeTier, hostID string) {
ctx := r.Context()
agent, err := agentForHost(ctx, h.db, h.pool, hostID)
if err != nil {
writeError(w, http.StatusServiceUnavailable, "host_unavailable", "sandbox host is not reachable")
return
}
resp, err := agent.GetSandboxMetrics(ctx, connect.NewRequest(&pb.GetSandboxMetricsRequest{
SandboxId: sandboxID,
Range: rangeTier,
}))
if err != nil {
status, code, msg := agentErrToHTTP(err)
writeError(w, status, code, msg)
return
}
points := make([]metricPointResponse, len(resp.Msg.Points))
for i, p := range resp.Msg.Points {
points[i] = metricPointResponse{
TimestampUnix: p.TimestampUnix,
CPUPct: p.CpuPct,
MemBytes: p.MemBytes,
DiskBytes: p.DiskBytes,
}
}
writeJSON(w, http.StatusOK, metricsResponse{
SandboxID: sandboxID,
Range: rangeTier,
Points: points,
})
}
func (h *sandboxMetricsHandler) getFromDB(ctx context.Context, w http.ResponseWriter, sandboxID, rangeTier string) {
rows, err := h.db.GetSandboxMetricPoints(ctx, db.GetSandboxMetricPointsParams{
SandboxID: sandboxID,
Tier: rangeTier,
})
if err != nil {
writeError(w, http.StatusInternalServerError, "internal_error", "failed to read metrics")
return
}
points := make([]metricPointResponse, len(rows))
for i, row := range rows {
points[i] = metricPointResponse{
TimestampUnix: row.Ts,
CPUPct: row.CpuPct,
MemBytes: row.MemBytes,
DiskBytes: row.DiskBytes,
}
}
writeJSON(w, http.StatusOK, metricsResponse{
SandboxID: sandboxID,
Range: rangeTier,
Points: points,
})
}

View File

@ -751,6 +751,60 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/sandboxes/{id}/metrics:
parameters:
- name: id
in: path
required: true
schema:
type: string
get:
summary: Get per-sandbox resource metrics
operationId: getSandboxMetrics
tags: [sandboxes]
security:
- apiKeyAuth: []
- bearerAuth: []
description: |
Returns time-series CPU, memory, and disk metrics for a sandbox.
Three tiers are available with different granularity and retention:
- `10m`: 500ms samples, last 10 minutes
- `2h`: 30-second averages, last 2 hours
- `24h`: 5-minute averages, last 24 hours
For running sandboxes, data comes from the host agent's in-memory
ring buffer. For paused sandboxes, data is read from persisted
snapshots in the database. Stopped/destroyed sandboxes return 404.
parameters:
- name: range
in: query
required: false
schema:
type: string
enum: ["10m", "2h", "24h"]
default: "10m"
description: Time range tier to query
responses:
"200":
description: Metrics retrieved
content:
application/json:
schema:
$ref: "#/components/schemas/SandboxMetrics"
"400":
description: Invalid range parameter
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"404":
description: Sandbox not found or metrics not available
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/sandboxes/{id}/pause:
parameters:
- name: id
@ -1981,6 +2035,38 @@ components:
items:
$ref: "#/components/schemas/TeamMember"
SandboxMetrics:
type: object
properties:
sandbox_id:
type: string
range:
type: string
enum: ["10m", "2h", "24h"]
points:
type: array
items:
$ref: "#/components/schemas/MetricPoint"
MetricPoint:
type: object
properties:
timestamp_unix:
type: integer
format: int64
cpu_pct:
type: number
format: double
description: "CPU utilization percentage (0-100), normalized to vCPU count"
mem_bytes:
type: integer
format: int64
description: "Resident memory in bytes (VmRSS of Firecracker process)"
disk_bytes:
type: integer
format: int64
description: "Allocated disk bytes for the CoW sparse file"
Error:
type: object
properties:

View File

@ -64,6 +64,7 @@ func New(
usersH := newUsersHandler(teamSvc)
auditH := newAuditHandler(auditSvc)
statsH := newStatsHandler(statsSvc)
metricsH := newSandboxMetricsHandler(queries, pool)
// OpenAPI spec and docs.
r.Get("/openapi.yaml", serveOpenAPI)
@ -125,6 +126,7 @@ func New(
r.Post("/files/read", files.Download)
r.Post("/files/stream/write", filesStream.StreamUpload)
r.Post("/files/stream/read", filesStream.StreamDownload)
r.Get("/metrics", metricsH.GetMetrics)
})
})

View File

@ -9,6 +9,31 @@ import (
"context"
)
const deleteSandboxMetricPoints = `-- name: DeleteSandboxMetricPoints :exec
DELETE FROM sandbox_metric_points
WHERE sandbox_id = $1
`
func (q *Queries) DeleteSandboxMetricPoints(ctx context.Context, sandboxID string) error {
_, err := q.db.Exec(ctx, deleteSandboxMetricPoints, sandboxID)
return err
}
const deleteSandboxMetricPointsByTier = `-- name: DeleteSandboxMetricPointsByTier :exec
DELETE FROM sandbox_metric_points
WHERE sandbox_id = $1 AND tier = $2
`
type DeleteSandboxMetricPointsByTierParams struct {
SandboxID string `json:"sandbox_id"`
Tier string `json:"tier"`
}
func (q *Queries) DeleteSandboxMetricPointsByTier(ctx context.Context, arg DeleteSandboxMetricPointsByTierParams) error {
_, err := q.db.Exec(ctx, deleteSandboxMetricPointsByTier, arg.SandboxID, arg.Tier)
return err
}
const getLiveMetrics = `-- name: GetLiveMetrics :one
SELECT
(COUNT(*) FILTER (WHERE status IN ('running', 'starting')))::INTEGER AS running_count,
@ -58,6 +83,50 @@ func (q *Queries) GetPeakMetrics(ctx context.Context, teamID string) (GetPeakMet
return i, err
}
const getSandboxMetricPoints = `-- name: GetSandboxMetricPoints :many
SELECT ts, cpu_pct, mem_bytes, disk_bytes
FROM sandbox_metric_points
WHERE sandbox_id = $1 AND tier = $2
ORDER BY ts ASC
`
type GetSandboxMetricPointsParams struct {
SandboxID string `json:"sandbox_id"`
Tier string `json:"tier"`
}
type GetSandboxMetricPointsRow struct {
Ts int64 `json:"ts"`
CpuPct float64 `json:"cpu_pct"`
MemBytes int64 `json:"mem_bytes"`
DiskBytes int64 `json:"disk_bytes"`
}
func (q *Queries) GetSandboxMetricPoints(ctx context.Context, arg GetSandboxMetricPointsParams) ([]GetSandboxMetricPointsRow, error) {
rows, err := q.db.Query(ctx, getSandboxMetricPoints, arg.SandboxID, arg.Tier)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetSandboxMetricPointsRow
for rows.Next() {
var i GetSandboxMetricPointsRow
if err := rows.Scan(
&i.Ts,
&i.CpuPct,
&i.MemBytes,
&i.DiskBytes,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const insertMetricsSnapshot = `-- name: InsertMetricsSnapshot :exec
INSERT INTO sandbox_metrics_snapshots (team_id, running_count, vcpus_reserved, memory_mb_reserved)
VALUES ($1, $2, $3, $4)
@ -80,6 +149,33 @@ func (q *Queries) InsertMetricsSnapshot(ctx context.Context, arg InsertMetricsSn
return err
}
const insertSandboxMetricPoint = `-- name: InsertSandboxMetricPoint :exec
INSERT INTO sandbox_metric_points (sandbox_id, tier, ts, cpu_pct, mem_bytes, disk_bytes)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (sandbox_id, tier, ts) DO NOTHING
`
type InsertSandboxMetricPointParams struct {
SandboxID string `json:"sandbox_id"`
Tier string `json:"tier"`
Ts int64 `json:"ts"`
CpuPct float64 `json:"cpu_pct"`
MemBytes int64 `json:"mem_bytes"`
DiskBytes int64 `json:"disk_bytes"`
}
func (q *Queries) InsertSandboxMetricPoint(ctx context.Context, arg InsertSandboxMetricPointParams) error {
_, err := q.db.Exec(ctx, insertSandboxMetricPoint,
arg.SandboxID,
arg.Tier,
arg.Ts,
arg.CpuPct,
arg.MemBytes,
arg.DiskBytes,
)
return err
}
const pruneOldMetrics = `-- name: PruneOldMetrics :exec
DELETE FROM sandbox_metrics_snapshots
WHERE sampled_at < NOW() - INTERVAL '60 days'
@ -90,6 +186,17 @@ func (q *Queries) PruneOldMetrics(ctx context.Context) error {
return err
}
const pruneSandboxMetricPoints = `-- name: PruneSandboxMetricPoints :exec
DELETE FROM sandbox_metric_points
WHERE ts < EXTRACT(EPOCH FROM NOW() - INTERVAL '30 days')::BIGINT
`
// Remove metric points older than 30 days for destroyed sandboxes.
func (q *Queries) PruneSandboxMetricPoints(ctx context.Context) error {
_, err := q.db.Exec(ctx, pruneSandboxMetricPoints)
return err
}
const sampleSandboxMetrics = `-- name: SampleSandboxMetrics :many
SELECT
team_id,

View File

@ -99,6 +99,15 @@ type Sandbox struct {
TeamID string `json:"team_id"`
}
type SandboxMetricPoint struct {
SandboxID string `json:"sandbox_id"`
Tier string `json:"tier"`
Ts int64 `json:"ts"`
CpuPct float64 `json:"cpu_pct"`
MemBytes int64 `json:"mem_bytes"`
DiskBytes int64 `json:"disk_bytes"`
}
type SandboxMetricsSnapshot struct {
ID int64 `json:"id"`
TeamID string `json:"team_id"`

View File

@ -426,3 +426,55 @@ func (s *Server) Terminate(
}
return connect.NewResponse(&pb.TerminateResponse{}), nil
}
func (s *Server) GetSandboxMetrics(
_ context.Context,
req *connect.Request[pb.GetSandboxMetricsRequest],
) (*connect.Response[pb.GetSandboxMetricsResponse], error) {
msg := req.Msg
points, err := s.mgr.GetMetrics(msg.SandboxId, msg.Range)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, connect.NewError(connect.CodeNotFound, err)
}
if strings.Contains(err.Error(), "invalid range") {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&pb.GetSandboxMetricsResponse{Points: metricPointsToPB(points)}), nil
}
func (s *Server) FlushSandboxMetrics(
_ context.Context,
req *connect.Request[pb.FlushSandboxMetricsRequest],
) (*connect.Response[pb.FlushSandboxMetricsResponse], error) {
pts10m, pts2h, pts24h, err := s.mgr.FlushMetrics(req.Msg.SandboxId)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, connect.NewError(connect.CodeNotFound, err)
}
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&pb.FlushSandboxMetricsResponse{
Points_10M: metricPointsToPB(pts10m),
Points_2H: metricPointsToPB(pts2h),
Points_24H: metricPointsToPB(pts24h),
}), nil
}
func metricPointsToPB(pts []sandbox.MetricPoint) []*pb.MetricPoint {
out := make([]*pb.MetricPoint, len(pts))
for i, p := range pts {
out[i] = &pb.MetricPoint{
TimestampUnix: p.Timestamp.Unix(),
CpuPct: p.CPUPct,
MemBytes: p.MemBytes,
DiskBytes: p.DiskBytes,
}
}
return out
}

View File

@ -58,6 +58,12 @@ type sandboxState struct {
// sandbox was restored. Non-nil means re-pause should use "Diff" snapshot
// type instead of "Full", avoiding the UFFD fault-in storm.
parent *snapshotParent
// Metrics sampling state.
fcPID int // Firecracker process PID (child of unshare wrapper)
ring *metricsRing // tiered ring buffers for CPU/mem/disk metrics
samplerCancel context.CancelFunc // cancels the per-sandbox sampling goroutine
samplerDone chan struct{} // closed when the sampling goroutine exits
}
// snapshotParent stores the previous generation's snapshot state so that
@ -232,6 +238,8 @@ func (m *Manager) Create(ctx context.Context, sandboxID, template string, vcpus,
m.boxes[sandboxID] = sb
m.mu.Unlock()
m.startSampler(sb)
slog.Info("sandbox created",
"id", sandboxID,
"template", template,
@ -265,6 +273,7 @@ func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
// cleanup tears down all resources for a sandbox.
func (m *Manager) cleanup(ctx context.Context, sb *sandboxState) {
m.stopSampler(sb)
if err := m.vm.Destroy(ctx, sb.ID); err != nil {
slog.Warn("vm destroy error", "id", sb.ID, "error", err)
}
@ -668,6 +677,8 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int)
m.boxes[sandboxID] = sb
m.mu.Unlock()
m.startSampler(sb)
// Don't delete snapshot dir — diff files are needed for re-pause.
// The CoW file was already moved out. The dir will be cleaned up
// on destroy or overwritten on re-pause.
@ -987,6 +998,8 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID, snapshotNam
m.boxes[sandboxID] = sb
m.mu.Unlock()
m.startSampler(sb)
slog.Info("sandbox created from snapshot",
"id", sandboxID,
"snapshot", snapshotName,
@ -1213,6 +1226,158 @@ func warnErr(msg string, id string, err error) {
}
}
// startSampler resolves the Firecracker child PID and starts a background
// goroutine that samples CPU/mem/disk at 500ms intervals into the ring buffer.
// Must be called after the sandbox is registered in m.boxes.
func (m *Manager) startSampler(sb *sandboxState) {
// Resolve the Firecracker PID (child of unshare wrapper).
v, ok := m.vm.Get(sb.ID)
if !ok {
slog.Warn("metrics: VM not found, skipping sampler", "id", sb.ID)
return
}
unshPID := v.PID()
var fcPID int
for attempt := 0; attempt < 5; attempt++ {
var err error
fcPID, err = findChildPID(unshPID)
if err == nil {
break
}
if attempt == 4 {
slog.Warn("metrics: could not resolve FC PID, skipping sampler", "id", sb.ID, "error", err)
return
}
time.Sleep(50 * time.Millisecond)
}
sb.fcPID = fcPID
sb.ring = newMetricsRing()
ctx, cancel := context.WithCancel(context.Background())
sb.samplerCancel = cancel
sb.samplerDone = make(chan struct{})
// Read initial CPU counters for delta calculation.
// Passed to goroutine as local state — no shared mutation.
initialCPU, err := readCPUStat(fcPID)
if err != nil {
slog.Warn("metrics: could not read initial CPU stat", "id", sb.ID, "error", err)
}
go m.samplerLoop(ctx, sb, fcPID, sb.VCPUs, initialCPU)
}
// samplerLoop samples /proc metrics at 500ms intervals.
// lastCPU is goroutine-local to avoid shared-state races.
func (m *Manager) samplerLoop(ctx context.Context, sb *sandboxState, fcPID, vcpus int, lastCPU cpuStat) {
defer close(sb.samplerDone)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
clkTck := 100.0 // sysconf(_SC_CLK_TCK), almost always 100 on Linux
lastTime := time.Now()
cpuInitialized := lastCPU != (cpuStat{})
for {
select {
case <-ctx.Done():
return
case now := <-ticker.C:
elapsed := now.Sub(lastTime).Seconds()
lastTime = now
// CPU: delta jiffies / (elapsed * CLK_TCK * vcpus) * 100
var cpuPct float64
cur, err := readCPUStat(fcPID)
if err == nil {
if cpuInitialized && elapsed > 0 && vcpus > 0 {
deltaJiffies := float64((cur.utime + cur.stime) - (lastCPU.utime + lastCPU.stime))
cpuPct = (deltaJiffies / (elapsed * clkTck * float64(vcpus))) * 100.0
if cpuPct > 100.0 {
cpuPct = 100.0
}
if cpuPct < 0 {
cpuPct = 0
}
}
lastCPU = cur
cpuInitialized = true
}
// Memory: VmRSS of the Firecracker process.
memBytes, _ := readMemRSS(fcPID)
// Disk: allocated bytes of the CoW sparse file.
var diskBytes int64
if sb.dmDevice != nil {
diskBytes, _ = readDiskAllocated(sb.dmDevice.CowPath)
}
sb.ring.Push(MetricPoint{
Timestamp: now,
CPUPct: cpuPct,
MemBytes: memBytes,
DiskBytes: diskBytes,
})
}
}
}
// stopSampler stops the metrics sampling goroutine and waits for it to exit.
func (m *Manager) stopSampler(sb *sandboxState) {
if sb.samplerCancel != nil {
sb.samplerCancel()
<-sb.samplerDone
sb.samplerCancel = nil
}
}
// GetMetrics returns the ring buffer data for the given range tier.
// Valid ranges: "10m", "2h", "24h".
func (m *Manager) GetMetrics(sandboxID, rangeTier string) ([]MetricPoint, error) {
m.mu.RLock()
sb, ok := m.boxes[sandboxID]
m.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("sandbox not found: %s", sandboxID)
}
if sb.ring == nil {
return nil, nil
}
switch rangeTier {
case "10m":
return sb.ring.Get10m(), nil
case "2h":
return sb.ring.Get2h(), nil
case "24h":
return sb.ring.Get24h(), nil
default:
return nil, fmt.Errorf("invalid range: %s (valid: 10m, 2h, 24h)", rangeTier)
}
}
// FlushMetrics returns all three tier ring buffers, clears the ring, and
// stops the sampler goroutine. Called by the control plane before pause/destroy.
func (m *Manager) FlushMetrics(sandboxID string) (pts10m, pts2h, pts24h []MetricPoint, err error) {
m.mu.RLock()
sb, ok := m.boxes[sandboxID]
m.mu.RUnlock()
if !ok {
return nil, nil, nil, fmt.Errorf("sandbox not found: %s", sandboxID)
}
m.stopSampler(sb)
if sb.ring == nil {
return nil, nil, nil, nil
}
pts10m, pts2h, pts24h = sb.ring.Flush()
return pts10m, pts2h, pts24h, nil
}
// copyFile copies a regular file from src to dst using streaming I/O.
func copyFile(src, dst string) error {
sf, err := os.Open(src)

178
internal/sandbox/metrics.go Normal file
View File

@ -0,0 +1,178 @@
package sandbox
import (
"sync"
"time"
)
// MetricPoint holds one metrics sample.
type MetricPoint struct {
Timestamp time.Time
CPUPct float64
MemBytes int64
DiskBytes int64
}
// Ring buffer capacity constants.
const (
ring10mCap = 1200 // 500ms × 1200 = 10 min
ring2hCap = 240 // 30s × 240 = 2 h
ring24hCap = 288 // 5min × 288 = 24 h
downsample2hEvery = 60 // 60 × 500ms = 30s
downsample24hEvery = 10 // 10 × 30s = 5min
)
// metricsRing holds three tiered ring buffers with automatic downsampling
// from the finest tier into coarser tiers.
type metricsRing struct {
mu sync.Mutex
// 10-minute tier: 500ms samples.
buf10m [ring10mCap]MetricPoint
idx10m int
count10m int
// 2-hour tier: 30s averages.
buf2h [ring2hCap]MetricPoint
idx2h int
count2h int
// 24-hour tier: 5min averages.
buf24h [ring24hCap]MetricPoint
idx24h int
count24h int
// Accumulators for downsampling.
acc500ms [downsample2hEvery]MetricPoint
acc500msN int
acc30s [downsample24hEvery]MetricPoint
acc30sN int
}
// newMetricsRing creates an empty metrics ring buffer.
func newMetricsRing() *metricsRing {
return &metricsRing{}
}
// Push adds a 500ms sample to the finest tier and triggers downsampling
// into coarser tiers when enough samples have accumulated.
func (r *metricsRing) Push(p MetricPoint) {
r.mu.Lock()
defer r.mu.Unlock()
// Write to 10m ring.
r.buf10m[r.idx10m] = p
r.idx10m = (r.idx10m + 1) % ring10mCap
if r.count10m < ring10mCap {
r.count10m++
}
// Accumulate for 2h downsample.
r.acc500ms[r.acc500msN] = p
r.acc500msN++
if r.acc500msN == downsample2hEvery {
avg := averagePoints(r.acc500ms[:downsample2hEvery])
r.push2h(avg)
r.acc500msN = 0
}
}
func (r *metricsRing) push2h(p MetricPoint) {
r.buf2h[r.idx2h] = p
r.idx2h = (r.idx2h + 1) % ring2hCap
if r.count2h < ring2hCap {
r.count2h++
}
// Accumulate for 24h downsample.
r.acc30s[r.acc30sN] = p
r.acc30sN++
if r.acc30sN == downsample24hEvery {
avg := averagePoints(r.acc30s[:downsample24hEvery])
r.push24h(avg)
r.acc30sN = 0
}
}
func (r *metricsRing) push24h(p MetricPoint) {
r.buf24h[r.idx24h] = p
r.idx24h = (r.idx24h + 1) % ring24hCap
if r.count24h < ring24hCap {
r.count24h++
}
}
// Get10m returns the 10-minute tier points in chronological order.
func (r *metricsRing) Get10m() []MetricPoint {
r.mu.Lock()
defer r.mu.Unlock()
return r.readRing(r.buf10m[:], r.idx10m, r.count10m)
}
// Get2h returns the 2-hour tier points in chronological order.
func (r *metricsRing) Get2h() []MetricPoint {
r.mu.Lock()
defer r.mu.Unlock()
return r.readRing(r.buf2h[:], r.idx2h, r.count2h)
}
// Get24h returns the 24-hour tier points in chronological order.
func (r *metricsRing) Get24h() []MetricPoint {
r.mu.Lock()
defer r.mu.Unlock()
return r.readRing(r.buf24h[:], r.idx24h, r.count24h)
}
// Flush returns all three tiers and resets the ring buffer.
func (r *metricsRing) Flush() (pts10m, pts2h, pts24h []MetricPoint) {
r.mu.Lock()
defer r.mu.Unlock()
pts10m = r.readRing(r.buf10m[:], r.idx10m, r.count10m)
pts2h = r.readRing(r.buf2h[:], r.idx2h, r.count2h)
pts24h = r.readRing(r.buf24h[:], r.idx24h, r.count24h)
// Reset all state.
r.idx10m, r.count10m = 0, 0
r.idx2h, r.count2h = 0, 0
r.idx24h, r.count24h = 0, 0
r.acc500msN = 0
r.acc30sN = 0
return pts10m, pts2h, pts24h
}
// readRing extracts elements from a circular buffer in chronological order.
func (r *metricsRing) readRing(buf []MetricPoint, nextIdx, count int) []MetricPoint {
if count == 0 {
return nil
}
result := make([]MetricPoint, count)
bufLen := len(buf)
start := (nextIdx - count + bufLen) % bufLen
for i := range count {
result[i] = buf[(start+i)%bufLen]
}
return result
}
// averagePoints computes the average of a slice of MetricPoints.
// The timestamp is set to the last point's timestamp.
func averagePoints(pts []MetricPoint) MetricPoint {
n := float64(len(pts))
var cpu float64
var mem, disk int64
for _, p := range pts {
cpu += p.CPUPct
mem += p.MemBytes
disk += p.DiskBytes
}
return MetricPoint{
Timestamp: pts[len(pts)-1].Timestamp,
CPUPct: cpu / n,
MemBytes: int64(float64(mem) / n),
DiskBytes: int64(float64(disk) / n),
}
}

105
internal/sandbox/proc.go Normal file
View File

@ -0,0 +1,105 @@
package sandbox
import (
"fmt"
"os"
"strconv"
"strings"
"syscall"
)
// findChildPID reads the direct child PID of a given parent process.
// The Firecracker process is a direct child of the unshare wrapper because
// the init script uses `exec ip netns exec ... firecracker`, which replaces
// bash with ip-netns-exec, which in turn execs firecracker — same PID,
// direct child of unshare.
func findChildPID(parentPID int) (int, error) {
path := fmt.Sprintf("/proc/%d/task/%d/children", parentPID, parentPID)
data, err := os.ReadFile(path)
if err != nil {
return 0, fmt.Errorf("read children: %w", err)
}
fields := strings.Fields(string(data))
if len(fields) == 0 {
return 0, fmt.Errorf("no child processes found for PID %d", parentPID)
}
pid, err := strconv.Atoi(fields[0])
if err != nil {
return 0, fmt.Errorf("parse child PID %q: %w", fields[0], err)
}
return pid, nil
}
// cpuStat holds raw CPU jiffies read from /proc/{pid}/stat.
type cpuStat struct {
utime uint64
stime uint64
}
// readCPUStat reads user and system CPU jiffies from /proc/{pid}/stat.
// Fields 14 (utime) and 15 (stime) are 1-indexed in the man page;
// after splitting on space, they are at indices 13 and 14.
func readCPUStat(pid int) (cpuStat, error) {
path := fmt.Sprintf("/proc/%d/stat", pid)
data, err := os.ReadFile(path)
if err != nil {
return cpuStat{}, fmt.Errorf("read stat: %w", err)
}
// /proc/{pid}/stat format: pid (comm) state fields...
// The comm field may contain spaces and parens, so find the last ')' first.
content := string(data)
idx := strings.LastIndex(content, ")")
if idx < 0 {
return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: no closing paren", pid)
}
// After ")" there is " state field3 field4 ... fieldN"
// field1 after ')' is state (index 0), utime is field 11, stime is field 12
// (0-indexed from after the closing paren).
fields := strings.Fields(content[idx+2:])
if len(fields) < 13 {
return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: too few fields (%d)", pid, len(fields))
}
utime, err := strconv.ParseUint(fields[11], 10, 64)
if err != nil {
return cpuStat{}, fmt.Errorf("parse utime: %w", err)
}
stime, err := strconv.ParseUint(fields[12], 10, 64)
if err != nil {
return cpuStat{}, fmt.Errorf("parse stime: %w", err)
}
return cpuStat{utime: utime, stime: stime}, nil
}
// readMemRSS reads VmRSS from /proc/{pid}/status and returns bytes.
func readMemRSS(pid int) (int64, error) {
path := fmt.Sprintf("/proc/%d/status", pid)
data, err := os.ReadFile(path)
if err != nil {
return 0, fmt.Errorf("read status: %w", err)
}
for _, line := range strings.Split(string(data), "\n") {
if strings.HasPrefix(line, "VmRSS:") {
fields := strings.Fields(line)
if len(fields) < 2 {
return 0, fmt.Errorf("malformed VmRSS line")
}
kb, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return 0, fmt.Errorf("parse VmRSS: %w", err)
}
return kb * 1024, nil
}
}
return 0, fmt.Errorf("VmRSS not found in /proc/%d/status", pid)
}
// readDiskAllocated returns the actual allocated bytes (not apparent size)
// of the file at path. This uses stat's block count × 512.
func readDiskAllocated(path string) (int64, error) {
var stat syscall.Stat_t
if err := syscall.Stat(path, &stat); err != nil {
return 0, fmt.Errorf("stat %s: %w", path, err)
}
return stat.Blocks * 512, nil
}

View File

@ -58,6 +58,8 @@ type hostagentClient = interface {
PauseSandbox(ctx context.Context, req *connect.Request[pb.PauseSandboxRequest]) (*connect.Response[pb.PauseSandboxResponse], error)
ResumeSandbox(ctx context.Context, req *connect.Request[pb.ResumeSandboxRequest]) (*connect.Response[pb.ResumeSandboxResponse], error)
PingSandbox(ctx context.Context, req *connect.Request[pb.PingSandboxRequest]) (*connect.Response[pb.PingSandboxResponse], error)
GetSandboxMetrics(ctx context.Context, req *connect.Request[pb.GetSandboxMetricsRequest]) (*connect.Response[pb.GetSandboxMetricsResponse], error)
FlushSandboxMetrics(ctx context.Context, req *connect.Request[pb.FlushSandboxMetricsRequest]) (*connect.Response[pb.FlushSandboxMetricsResponse], error)
}
// Create creates a new sandbox: picks a host via the scheduler, inserts a pending
@ -180,6 +182,9 @@ func (s *SandboxService) Pause(ctx context.Context, sandboxID, teamID string) (d
return db.Sandbox{}, err
}
// Flush all metrics tiers before pausing so data survives in DB.
s.flushAndPersistMetrics(ctx, agent, sandboxID, true)
if _, err := agent.PauseSandbox(ctx, connect.NewRequest(&pb.PauseSandboxRequest{
SandboxId: sandboxID,
})); err != nil {
@ -236,7 +241,8 @@ func (s *SandboxService) Resume(ctx context.Context, sandboxID, teamID string) (
// Destroy stops a sandbox and marks it as stopped.
func (s *SandboxService) Destroy(ctx context.Context, sandboxID, teamID string) error {
if _, err := s.DB.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: teamID}); err != nil {
sb, err := s.DB.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: teamID})
if err != nil {
return fmt.Errorf("sandbox not found: %w", err)
}
@ -245,6 +251,11 @@ func (s *SandboxService) Destroy(ctx context.Context, sandboxID, teamID string)
return err
}
// If running, flush 24h tier metrics for analytics before destroying.
if sb.Status == "running" {
s.flushAndPersistMetrics(ctx, agent, sandboxID, false)
}
// Destroy on host agent. A not-found response is fine — sandbox is already gone.
if _, err := agent.DestroySandbox(ctx, connect.NewRequest(&pb.DestroySandboxRequest{
SandboxId: sandboxID,
@ -252,6 +263,16 @@ func (s *SandboxService) Destroy(ctx context.Context, sandboxID, teamID string)
return fmt.Errorf("agent destroy: %w", err)
}
// For a paused sandbox, only keep 24h tier; remove the finer-grained tiers.
if sb.Status == "paused" {
_ = s.DB.DeleteSandboxMetricPointsByTier(ctx, db.DeleteSandboxMetricPointsByTierParams{
SandboxID: sandboxID, Tier: "10m",
})
_ = s.DB.DeleteSandboxMetricPointsByTier(ctx, db.DeleteSandboxMetricPointsByTierParams{
SandboxID: sandboxID, Tier: "2h",
})
}
if _, err := s.DB.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{
ID: sandboxID, Status: "stopped",
}); err != nil {
@ -260,6 +281,41 @@ func (s *SandboxService) Destroy(ctx context.Context, sandboxID, teamID string)
return nil
}
// flushAndPersistMetrics calls FlushSandboxMetrics on the agent and stores
// the returned data to DB. If allTiers is true, all three tiers are saved;
// otherwise only the 24h tier (for post-destroy analytics).
func (s *SandboxService) flushAndPersistMetrics(ctx context.Context, agent hostagentClient, sandboxID string, allTiers bool) {
resp, err := agent.FlushSandboxMetrics(ctx, connect.NewRequest(&pb.FlushSandboxMetricsRequest{
SandboxId: sandboxID,
}))
if err != nil {
slog.Warn("flush metrics failed (best-effort)", "sandbox_id", sandboxID, "error", err)
return
}
msg := resp.Msg
if allTiers {
s.persistMetricPoints(ctx, sandboxID, "10m", msg.Points_10M)
s.persistMetricPoints(ctx, sandboxID, "2h", msg.Points_2H)
}
s.persistMetricPoints(ctx, sandboxID, "24h", msg.Points_24H)
}
func (s *SandboxService) persistMetricPoints(ctx context.Context, sandboxID, tier string, points []*pb.MetricPoint) {
for _, p := range points {
if err := s.DB.InsertSandboxMetricPoint(ctx, db.InsertSandboxMetricPointParams{
SandboxID: sandboxID,
Tier: tier,
Ts: p.TimestampUnix,
CpuPct: p.CpuPct,
MemBytes: p.MemBytes,
DiskBytes: p.DiskBytes,
}); err != nil {
slog.Warn("persist metric point failed", "sandbox_id", sandboxID, "tier", tier, "error", err)
}
}
}
// Ping resets the inactivity timer for a running sandbox.
func (s *SandboxService) Ping(ctx context.Context, sandboxID, teamID string) error {
sb, err := s.DB.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: teamID})

View File

@ -250,6 +250,12 @@ func (m *Manager) CreateFromSnapshot(ctx context.Context, cfg VMConfig, snapPath
return vm, nil
}
// PID returns the process ID of the unshare wrapper process.
// The actual Firecracker process is a direct child of this PID.
func (v *VM) PID() int {
return v.process.cmd.Process.Pid
}
// Get returns a running VM by sandbox ID.
func (m *Manager) Get(sandboxID string) (*VM, bool) {
vm, ok := m.vms[sandboxID]