forked from wrenn/wrenn
v0.1.6 (#45)
## What's New? Performance updates for large capsules, admin panel enhancement and bug fixes ### Envd - Fixed bug with sandbox metrics calculation - Page cache drop and balloon inflation to reduce memfile snapshot - Updated rpc timeout logic for better control - Added tests ### Admin Panel - Add/Remove platform admin - Updated template deletion logic for fine grained permission ### Others - Minor frontend visual improvement - Minor bugfixes - Version bump Co-authored-by: Tasnim Kabir Sadik <tksadik92@gmail.com> Reviewed-on: wrenn/wrenn#45 Co-authored-by: pptx704 <rafeed@omukk.dev> Co-committed-by: pptx704 <rafeed@omukk.dev>
This commit is contained in:
@ -350,9 +350,23 @@ func runPtyLoop(
|
||||
defer wg.Done()
|
||||
defer cancel()
|
||||
|
||||
for msg := range inputCh {
|
||||
// Use a background context for unary RPCs so they complete
|
||||
// even if the stream context is being cancelled.
|
||||
// pending holds a non-input message dequeued during coalescing
|
||||
// that must be processed on the next iteration.
|
||||
var pending *wsPtyIn
|
||||
|
||||
for {
|
||||
var msg wsPtyIn
|
||||
if pending != nil {
|
||||
msg = *pending
|
||||
pending = nil
|
||||
} else {
|
||||
var ok bool
|
||||
msg, ok = <-inputCh
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
|
||||
switch msg.Type {
|
||||
@ -364,7 +378,7 @@ func runPtyLoop(
|
||||
}
|
||||
|
||||
// Coalesce: drain any queued input messages into a single RPC.
|
||||
data = coalescePtyInput(inputCh, data)
|
||||
data, pending = coalescePtyInput(inputCh, data)
|
||||
|
||||
if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{
|
||||
SandboxId: sandboxID,
|
||||
@ -418,24 +432,29 @@ func runPtyLoop(
|
||||
}
|
||||
}()
|
||||
|
||||
// When any pump cancels the context, close the websocket to unblock
|
||||
// the reader goroutine stuck in ReadMessage.
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
ws.conn.Close()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// coalescePtyInput drains any immediately-available "input" messages from the
|
||||
// channel and appends their decoded data to buf, reducing RPC call volume
|
||||
// during bursts of fast typing.
|
||||
func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
|
||||
// during bursts of fast typing. Returns the coalesced buffer and any
|
||||
// non-input message that was dequeued (must be processed by the caller).
|
||||
func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) ([]byte, *wsPtyIn) {
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-ch:
|
||||
if !ok {
|
||||
return buf
|
||||
return buf, nil
|
||||
}
|
||||
if msg.Type != "input" {
|
||||
// Non-input message — can't coalesce. Put-back isn't possible
|
||||
// with channels, but resize/kill during a typing burst is rare
|
||||
// enough that dropping one is acceptable.
|
||||
return buf
|
||||
return buf, &msg
|
||||
}
|
||||
data, err := base64.StdEncoding.DecodeString(msg.Data)
|
||||
if err != nil {
|
||||
@ -443,7 +462,7 @@ func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
|
||||
}
|
||||
buf = append(buf, data...)
|
||||
default:
|
||||
return buf
|
||||
return buf, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -162,3 +162,58 @@ func (h *usersHandler) SetUserActive(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// SetUserAdmin handles PUT /v1/admin/users/{id}/admin
|
||||
// Grants or revokes platform admin status. Cannot remove the last admin.
|
||||
func (h *usersHandler) SetUserAdmin(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
userIDStr := chi.URLParam(r, "id")
|
||||
|
||||
userID, err := id.ParseUserID(userIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid user ID")
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Admin bool `json:"admin"`
|
||||
}
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
user, err := h.db.GetUserByID(r.Context(), userID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "user not found")
|
||||
return
|
||||
}
|
||||
|
||||
if user.IsAdmin == req.Admin {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
if req.Admin {
|
||||
if err := h.db.SetUserAdmin(r.Context(), db.SetUserAdminParams{
|
||||
ID: userID,
|
||||
IsAdmin: true,
|
||||
}); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "internal", "failed to update admin status")
|
||||
return
|
||||
}
|
||||
h.audit.LogUserGrantAdmin(r.Context(), ac, userID, user.Email)
|
||||
} else {
|
||||
affected, err := h.db.RevokeUserAdmin(r.Context(), userID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "internal", "failed to update admin status")
|
||||
return
|
||||
}
|
||||
if affected == 0 {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "cannot remove the last admin")
|
||||
return
|
||||
}
|
||||
h.audit.LogUserRevokeAdmin(r.Context(), ac, userID, user.Email)
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
@ -2346,6 +2346,54 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/admin/users/{id}/admin:
|
||||
put:
|
||||
summary: Grant or revoke platform admin
|
||||
operationId: setUserAdmin
|
||||
tags: [admin]
|
||||
description: |
|
||||
Sets the platform admin flag on a user. Cannot remove the last admin.
|
||||
Requires platform admin access (JWT + is_admin).
|
||||
The target user's JWT is not re-issued — their frontend will reflect the
|
||||
change on next login or team switch.
|
||||
security:
|
||||
- bearerAuth: []
|
||||
parameters:
|
||||
- name: id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
example: "usr-a1b2c3d4"
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
required: [admin]
|
||||
properties:
|
||||
admin:
|
||||
type: boolean
|
||||
description: true to grant admin, false to revoke.
|
||||
responses:
|
||||
"204":
|
||||
description: Admin status updated
|
||||
"400":
|
||||
$ref: "#/components/responses/BadRequest"
|
||||
"403":
|
||||
description: Caller is not a platform admin
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"404":
|
||||
description: User not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
apiKeyAuth:
|
||||
|
||||
@ -269,6 +269,7 @@ func New(
|
||||
r.Delete("/teams/{id}", teamH.AdminDeleteTeam)
|
||||
r.Get("/users", usersH.AdminListUsers)
|
||||
r.Put("/users/{id}/active", usersH.SetUserActive)
|
||||
r.Put("/users/{id}/admin", usersH.SetUserAdmin)
|
||||
r.Get("/audit-logs", auditH.AdminList)
|
||||
r.Get("/templates", buildH.ListTemplates)
|
||||
r.Delete("/templates/{name}", buildH.DeleteTemplate)
|
||||
|
||||
@ -135,6 +135,20 @@ func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
defer tracker.Release()
|
||||
|
||||
// Derive request context from the tracker's context so ForceClose()
|
||||
// during pause aborts this proxied request.
|
||||
trackerCtx := tracker.Context()
|
||||
reqCtx, reqCancel := context.WithCancel(r.Context())
|
||||
defer reqCancel()
|
||||
go func() {
|
||||
select {
|
||||
case <-trackerCtx.Done():
|
||||
reqCancel()
|
||||
case <-reqCtx.Done():
|
||||
}
|
||||
}()
|
||||
r = r.WithContext(reqCtx)
|
||||
|
||||
proxy := h.getOrCreateProxy(sandboxID, port, fmt.Sprintf("%s:%d", hostIP, portNum))
|
||||
proxy.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ type SandboxStatus string
|
||||
const (
|
||||
StatusPending SandboxStatus = "pending"
|
||||
StatusRunning SandboxStatus = "running"
|
||||
StatusPausing SandboxStatus = "pausing"
|
||||
StatusPaused SandboxStatus = "paused"
|
||||
StatusStopped SandboxStatus = "stopped"
|
||||
StatusError SandboxStatus = "error"
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package sandbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -17,6 +18,20 @@ type ConnTracker struct {
|
||||
// goroutine to exit, preventing goroutine leaks on repeated pause failures.
|
||||
cancelMu sync.Mutex
|
||||
cancelDrain chan struct{}
|
||||
|
||||
// ctx is cancelled by ForceClose to abort all in-flight proxy requests.
|
||||
// Initialized lazily on first Acquire; replaced by Reset after a failed
|
||||
// pause so new connections get a fresh, non-cancelled context.
|
||||
ctxMu sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// ensureCtx lazily initializes the cancellable context.
|
||||
func (t *ConnTracker) ensureCtx() {
|
||||
if t.ctx == nil {
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire registers one in-flight connection. Returns false if the tracker
|
||||
@ -35,6 +50,16 @@ func (t *ConnTracker) Acquire() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Context returns a context that is cancelled when ForceClose is called.
|
||||
// Proxy handlers should derive their request context from this so that
|
||||
// force-close during pause aborts in-flight proxied requests.
|
||||
func (t *ConnTracker) Context() context.Context {
|
||||
t.ctxMu.Lock()
|
||||
defer t.ctxMu.Unlock()
|
||||
t.ensureCtx()
|
||||
return t.ctx
|
||||
}
|
||||
|
||||
// Release marks one connection as complete. Must be called exactly once
|
||||
// per successful Acquire.
|
||||
func (t *ConnTracker) Release() {
|
||||
@ -65,9 +90,33 @@ func (t *ConnTracker) Drain(timeout time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
// ForceClose cancels all in-flight proxy connections by cancelling the
|
||||
// shared context. Connections whose request context derives from Context()
|
||||
// will see their requests aborted, causing the proxy handler to return
|
||||
// and call Release(). Waits briefly for connections to actually release.
|
||||
func (t *ConnTracker) ForceClose() {
|
||||
t.ctxMu.Lock()
|
||||
if t.cancel != nil {
|
||||
t.cancel()
|
||||
}
|
||||
t.ctxMu.Unlock()
|
||||
|
||||
// Wait briefly for force-closed connections to call Release().
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
t.wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
// Reset re-enables the tracker after a failed drain. This allows the
|
||||
// sandbox to accept proxy connections again if the pause operation fails
|
||||
// and the VM is resumed. It also cancels any lingering Drain goroutine.
|
||||
// and the VM is resumed. It also cancels any lingering Drain goroutine
|
||||
// and creates a fresh context for new connections.
|
||||
func (t *ConnTracker) Reset() {
|
||||
t.cancelMu.Lock()
|
||||
if t.cancelDrain != nil {
|
||||
@ -81,5 +130,10 @@ func (t *ConnTracker) Reset() {
|
||||
}
|
||||
t.cancelMu.Unlock()
|
||||
|
||||
// Replace the cancelled context with a fresh one.
|
||||
t.ctxMu.Lock()
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
t.ctxMu.Unlock()
|
||||
|
||||
t.draining.Store(false)
|
||||
}
|
||||
|
||||
@ -95,10 +95,10 @@ type snapshotParent struct {
|
||||
}
|
||||
|
||||
// maxDiffGenerations caps how many incremental diff generations we chain
|
||||
// before falling back to a Full snapshot to collapse the chain. Long diff
|
||||
// chains increase restore latency and snapshot directory size; a periodic
|
||||
// Full snapshot resets the counter and produces a clean base.
|
||||
const maxDiffGenerations = 8
|
||||
// before merging diffs into a single file. Since UFFD lazy-loads memory
|
||||
// anyway, we merge on every re-pause to keep exactly 1 diff file per
|
||||
// snapshot — no accumulated chain, no extra restore overhead.
|
||||
const maxDiffGenerations = 1
|
||||
|
||||
// buildMetadata constructs the metadata map with version information.
|
||||
func (m *Manager) buildMetadata(envdVersion string) map[string]string {
|
||||
@ -186,9 +186,12 @@ func (m *Manager) Create(ctx context.Context, sandboxID string, teamID, template
|
||||
}
|
||||
|
||||
// Create dm-snapshot with per-sandbox CoW file.
|
||||
// CoW must be at least as large as the origin — if every block is
|
||||
// rewritten, the CoW stores a full copy. Undersized CoW causes
|
||||
// dm-snapshot invalidation → EIO on all guest I/O.
|
||||
dmName := "wrenn-" + sandboxID
|
||||
cowPath := filepath.Join(layout.SandboxesDir(m.cfg.WrennDir), fmt.Sprintf("%s.cow", sandboxID))
|
||||
cowSize := int64(diskSizeMB) * 1024 * 1024
|
||||
cowSize := max(int64(diskSizeMB)*1024*1024, originSize)
|
||||
dmDev, err := devicemapper.CreateSnapshot(dmName, originLoop, cowPath, originSize, cowSize)
|
||||
if err != nil {
|
||||
m.loops.Release(baseRootfs)
|
||||
@ -374,28 +377,43 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
|
||||
}
|
||||
|
||||
// Mark sandbox as pausing to block new exec/file/PTY operations.
|
||||
m.mu.Lock()
|
||||
sb.Status = models.StatusPausing
|
||||
m.mu.Unlock()
|
||||
|
||||
// restoreRunning reverts state if any pre-freeze step fails.
|
||||
restoreRunning := func() {
|
||||
_ = m.vm.UpdateBalloon(context.Background(), sandboxID, 0)
|
||||
sb.connTracker.Reset()
|
||||
m.mu.Lock()
|
||||
sb.Status = models.StatusRunning
|
||||
m.mu.Unlock()
|
||||
m.startSampler(sb)
|
||||
}
|
||||
|
||||
// Stop the metrics sampler goroutine before tearing down any resources
|
||||
// it reads (dm device, Firecracker PID). Without this, the sampler
|
||||
// leaks on every successful pause.
|
||||
m.stopSampler(sb)
|
||||
|
||||
// Step 0: Drain in-flight proxy connections before freezing vCPUs.
|
||||
// Stale TCP state from mid-flight connections causes issues on restore.
|
||||
sb.connTracker.Drain(2 * time.Second)
|
||||
slog.Debug("pause: proxy connections drained", "id", sandboxID)
|
||||
// ── Step 1: Isolate from external traffic ─────────────────────────
|
||||
// Drain in-flight proxy connections (grace period for clean shutdown).
|
||||
sb.connTracker.Drain(5 * time.Second)
|
||||
// Force-close any connections that didn't finish during grace period.
|
||||
sb.connTracker.ForceClose()
|
||||
slog.Debug("pause: external connections closed", "id", sandboxID)
|
||||
|
||||
// Step 0b: Close host-side idle connections to envd. Done before
|
||||
// PrepareSnapshot so FIN packets propagate to the guest during the
|
||||
// PrepareSnapshot window (no extra sleep needed).
|
||||
// Close host-side idle connections to envd so FIN packets propagate
|
||||
// to the guest kernel before snapshot.
|
||||
sb.client.CloseIdleConnections()
|
||||
slog.Debug("pause: envd client idle connections closed", "id", sandboxID)
|
||||
|
||||
// Step 0c: Signal envd to quiesce (stop port scanner/forwarder, mark
|
||||
// connections for post-restore cleanup). The 3s timeout also gives time
|
||||
// for the FINs from Step 0b to be processed by the guest kernel.
|
||||
// Best-effort: a failure is logged but does not abort the pause.
|
||||
// ── Step 2: Drop page cache ──────────────────────────────────────
|
||||
// Signal envd to quiesce: drops page cache, stops port subsystem,
|
||||
// marks connections for post-restore cleanup. Page cache drop can
|
||||
// take significant time on large-memory VMs (20GB+).
|
||||
func() {
|
||||
prepCtx, prepCancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
prepCtx, prepCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer prepCancel()
|
||||
if err := sb.client.PrepareSnapshot(prepCtx); err != nil {
|
||||
slog.Warn("pause: pre-snapshot quiesce failed (best-effort)", "id", sandboxID, "error", err)
|
||||
@ -404,11 +422,37 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
}
|
||||
}()
|
||||
|
||||
// ── Step 3: Inflate balloon to reclaim free guest memory ─────────
|
||||
// Freed pages become zero from FC's perspective, so ProcessMemfile
|
||||
// skips them → dramatically smaller memfile (e.g. 20GB → 1GB).
|
||||
func() {
|
||||
memUsed, err := readEnvdMemUsed(sb.client)
|
||||
if err != nil {
|
||||
slog.Debug("pause: could not read guest memory, skipping balloon inflate", "id", sandboxID, "error", err)
|
||||
return
|
||||
}
|
||||
usedMiB := int(memUsed / (1024 * 1024))
|
||||
keepMiB := max(usedMiB*2, 256) + 128
|
||||
inflateMiB := sb.MemoryMB - keepMiB
|
||||
if inflateMiB <= 0 {
|
||||
slog.Debug("pause: not enough free memory for balloon inflate", "id", sandboxID, "used_mib", usedMiB, "total_mib", sb.MemoryMB)
|
||||
return
|
||||
}
|
||||
balloonCtx, balloonCancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer balloonCancel()
|
||||
if err := m.vm.UpdateBalloon(balloonCtx, sandboxID, inflateMiB); err != nil {
|
||||
slog.Debug("pause: balloon inflate failed (non-fatal)", "id", sandboxID, "error", err)
|
||||
return
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
slog.Info("pause: balloon inflated", "id", sandboxID, "inflate_mib", inflateMiB, "guest_used_mib", usedMiB)
|
||||
}()
|
||||
|
||||
// ── Step 4: Freeze vCPUs ─────────────────────────────────────────
|
||||
pauseStart := time.Now()
|
||||
|
||||
// Step 1: Pause the VM (freeze vCPUs).
|
||||
if err := m.vm.Pause(ctx, sandboxID); err != nil {
|
||||
sb.connTracker.Reset()
|
||||
restoreRunning()
|
||||
return fmt.Errorf("pause VM: %w", err)
|
||||
}
|
||||
slog.Debug("pause: VM paused", "id", sandboxID, "elapsed", time.Since(pauseStart))
|
||||
@ -423,13 +467,23 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
|
||||
// resumeOnError unpauses the VM so the sandbox stays usable when a
|
||||
// post-freeze step fails. If the resume itself fails, the sandbox is
|
||||
// left frozen — the caller should destroy it. It also resets the
|
||||
// connection tracker so the sandbox can accept proxy connections again.
|
||||
// frozen and unrecoverable — destroy it to avoid a zombie.
|
||||
resumeOnError := func() {
|
||||
sb.connTracker.Reset()
|
||||
if err := m.vm.Resume(ctx, sandboxID); err != nil {
|
||||
slog.Error("failed to resume VM after pause error — sandbox is frozen", "id", sandboxID, "error", err)
|
||||
// Use a fresh context — the caller's ctx may already be cancelled.
|
||||
resumeCtx, resumeCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer resumeCancel()
|
||||
if err := m.vm.Resume(resumeCtx, sandboxID); err != nil {
|
||||
slog.Error("failed to resume VM after pause error — destroying frozen sandbox", "id", sandboxID, "error", err)
|
||||
m.cleanup(context.Background(), sb)
|
||||
m.mu.Lock()
|
||||
delete(m.boxes, sandboxID)
|
||||
m.mu.Unlock()
|
||||
if m.onDestroy != nil {
|
||||
m.onDestroy(sandboxID)
|
||||
}
|
||||
return
|
||||
}
|
||||
restoreRunning()
|
||||
}
|
||||
|
||||
// Step 2: Take VM state snapshot (snapfile + memfile).
|
||||
@ -444,6 +498,7 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
|
||||
|
||||
snapshotStart := time.Now()
|
||||
if err := m.vm.Snapshot(ctx, sandboxID, snapPath, rawMemPath, snapshotType); err != nil {
|
||||
slog.Error("pause: snapshot failed", "id", sandboxID, "type", snapshotType, "elapsed", time.Since(snapshotStart), "error", err)
|
||||
warnErr("snapshot dir cleanup error", sandboxID, os.RemoveAll(pauseDir))
|
||||
resumeOnError()
|
||||
return fmt.Errorf("create VM snapshot: %w", err)
|
||||
@ -795,6 +850,12 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string, timeoutSec int,
|
||||
slog.Warn("post-init failed after resume, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
||||
}
|
||||
|
||||
// Deflate balloon — the snapshot was taken with an inflated balloon to
|
||||
// reduce memfile size, so restore the guest's full memory allocation.
|
||||
if err := m.vm.UpdateBalloon(ctx, sandboxID, 0); err != nil {
|
||||
slog.Debug("resume: balloon deflate failed (non-fatal)", "id", sandboxID, "error", err)
|
||||
}
|
||||
|
||||
// Fetch envd version (best-effort).
|
||||
envdVersion, _ := client.FetchVersion(ctx)
|
||||
|
||||
@ -1134,7 +1195,7 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team
|
||||
|
||||
dmName := "wrenn-" + sandboxID
|
||||
cowPath := filepath.Join(layout.SandboxesDir(m.cfg.WrennDir), fmt.Sprintf("%s.cow", sandboxID))
|
||||
cowSize := int64(diskSizeMB) * 1024 * 1024
|
||||
cowSize := max(int64(diskSizeMB)*1024*1024, originSize)
|
||||
dmDev, err := devicemapper.CreateSnapshot(dmName, originLoop, cowPath, originSize, cowSize)
|
||||
if err != nil {
|
||||
source.Close()
|
||||
@ -1235,6 +1296,11 @@ func (m *Manager) createFromSnapshot(ctx context.Context, sandboxID string, team
|
||||
slog.Warn("post-init failed after template restore, metadata files may be stale", "sandbox", sandboxID, "error", err)
|
||||
}
|
||||
|
||||
// Deflate balloon — template snapshot was taken with an inflated balloon.
|
||||
if err := m.vm.UpdateBalloon(ctx, sandboxID, 0); err != nil {
|
||||
slog.Debug("create-from-snapshot: balloon deflate failed (non-fatal)", "id", sandboxID, "error", err)
|
||||
}
|
||||
|
||||
// Fetch envd version (best-effort).
|
||||
envdVersion, _ := client.FetchVersion(ctx)
|
||||
|
||||
@ -1720,12 +1786,12 @@ func (m *Manager) startSampler(sb *sandboxState) {
|
||||
go m.samplerLoop(ctx, sb, fcPID, sb.VCPUs, initialCPU)
|
||||
}
|
||||
|
||||
// samplerLoop samples /proc metrics at 500ms intervals.
|
||||
// samplerLoop samples metrics at 1s intervals.
|
||||
// lastCPU is goroutine-local to avoid shared-state races.
|
||||
func (m *Manager) samplerLoop(ctx context.Context, sb *sandboxState, fcPID, vcpus int, lastCPU cpuStat) {
|
||||
defer close(sb.samplerDone)
|
||||
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
clkTck := 100.0 // sysconf(_SC_CLK_TCK), almost always 100 on Linux
|
||||
@ -1758,8 +1824,11 @@ func (m *Manager) samplerLoop(ctx context.Context, sb *sandboxState, fcPID, vcpu
|
||||
cpuInitialized = true
|
||||
}
|
||||
|
||||
// Memory: VmRSS of the Firecracker process.
|
||||
memBytes, _ := readMemRSS(fcPID)
|
||||
// Memory: guest-reported used memory from envd /metrics.
|
||||
// VmRSS of the Firecracker process includes guest page cache
|
||||
// and never decreases, so we use the guest's own view which
|
||||
// reports total - available (actual process memory).
|
||||
memBytes, _ := readEnvdMemUsed(sb.client)
|
||||
|
||||
// Disk: allocated bytes of the CoW sparse file.
|
||||
var diskBytes int64
|
||||
|
||||
@ -15,11 +15,11 @@ type MetricPoint struct {
|
||||
|
||||
// Ring buffer capacity constants.
|
||||
const (
|
||||
ring10mCap = 1200 // 500ms × 1200 = 10 min
|
||||
ring2hCap = 240 // 30s × 240 = 2 h
|
||||
ring24hCap = 288 // 5min × 288 = 24 h
|
||||
ring10mCap = 600 // 1s × 600 = 10 min
|
||||
ring2hCap = 240 // 30s × 240 = 2 h
|
||||
ring24hCap = 288 // 5min × 288 = 24 h
|
||||
|
||||
downsample2hEvery = 60 // 60 × 500ms = 30s
|
||||
downsample2hEvery = 30 // 30 × 1s = 30s
|
||||
downsample24hEvery = 10 // 10 × 30s = 5min
|
||||
)
|
||||
|
||||
@ -44,8 +44,8 @@ type metricsRing struct {
|
||||
count24h int
|
||||
|
||||
// Accumulators for downsampling.
|
||||
acc500ms [downsample2hEvery]MetricPoint
|
||||
acc500msN int
|
||||
acc1s [downsample2hEvery]MetricPoint
|
||||
acc1sN int
|
||||
|
||||
acc30s [downsample24hEvery]MetricPoint
|
||||
acc30sN int
|
||||
@ -56,7 +56,7 @@ func newMetricsRing() *metricsRing {
|
||||
return &metricsRing{}
|
||||
}
|
||||
|
||||
// Push adds a 500ms sample to the finest tier and triggers downsampling
|
||||
// Push adds a 1s sample to the finest tier and triggers downsampling
|
||||
// into coarser tiers when enough samples have accumulated.
|
||||
func (r *metricsRing) Push(p MetricPoint) {
|
||||
r.mu.Lock()
|
||||
@ -70,12 +70,12 @@ func (r *metricsRing) Push(p MetricPoint) {
|
||||
}
|
||||
|
||||
// Accumulate for 2h downsample.
|
||||
r.acc500ms[r.acc500msN] = p
|
||||
r.acc500msN++
|
||||
if r.acc500msN == downsample2hEvery {
|
||||
avg := averagePoints(r.acc500ms[:downsample2hEvery])
|
||||
r.acc1s[r.acc1sN] = p
|
||||
r.acc1sN++
|
||||
if r.acc1sN == downsample2hEvery {
|
||||
avg := averagePoints(r.acc1s[:downsample2hEvery])
|
||||
r.push2h(avg)
|
||||
r.acc500msN = 0
|
||||
r.acc1sN = 0
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,7 +138,7 @@ func (r *metricsRing) Flush() (pts10m, pts2h, pts24h []MetricPoint) {
|
||||
r.idx10m, r.count10m = 0, 0
|
||||
r.idx2h, r.count2h = 0, 0
|
||||
r.idx24h, r.count24h = 0, 0
|
||||
r.acc500msN = 0
|
||||
r.acc1sN = 0
|
||||
r.acc30sN = 0
|
||||
|
||||
return pts10m, pts2h, pts24h
|
||||
|
||||
@ -1,11 +1,15 @@
|
||||
package sandbox
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"git.omukk.dev/wrenn/wrenn/internal/envdclient"
|
||||
)
|
||||
|
||||
// cpuStat holds raw CPU jiffies read from /proc/{pid}/stat.
|
||||
@ -24,16 +28,11 @@ func readCPUStat(pid int) (cpuStat, error) {
|
||||
return cpuStat{}, fmt.Errorf("read stat: %w", err)
|
||||
}
|
||||
|
||||
// /proc/{pid}/stat format: pid (comm) state fields...
|
||||
// The comm field may contain spaces and parens, so find the last ')' first.
|
||||
content := string(data)
|
||||
idx := strings.LastIndex(content, ")")
|
||||
if idx < 0 {
|
||||
return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: no closing paren", pid)
|
||||
}
|
||||
// After ")" there is " state field3 field4 ... fieldN"
|
||||
// field1 after ')' is state (index 0), utime is field 11, stime is field 12
|
||||
// (0-indexed from after the closing paren).
|
||||
fields := strings.Fields(content[idx+2:])
|
||||
if len(fields) < 13 {
|
||||
return cpuStat{}, fmt.Errorf("malformed /proc/%d/stat: too few fields (%d)", pid, len(fields))
|
||||
@ -49,27 +48,34 @@ func readCPUStat(pid int) (cpuStat, error) {
|
||||
return cpuStat{utime: utime, stime: stime}, nil
|
||||
}
|
||||
|
||||
// readMemRSS reads VmRSS from /proc/{pid}/status and returns bytes.
|
||||
func readMemRSS(pid int) (int64, error) {
|
||||
path := fmt.Sprintf("/proc/%d/status", pid)
|
||||
data, err := os.ReadFile(path)
|
||||
// readEnvdMemUsed fetches mem_used from envd's /metrics endpoint. Returns
|
||||
// guest-side total - MemAvailable (actual process memory, excluding reclaimable
|
||||
// page cache). VmRSS of the Firecracker process includes guest page cache and
|
||||
// never decreases, so this is the accurate metric for dashboard display.
|
||||
func readEnvdMemUsed(client *envdclient.Client) (int64, error) {
|
||||
resp, err := client.HTTPClient().Get(client.BaseURL() + "/metrics")
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read status: %w", err)
|
||||
return 0, fmt.Errorf("fetch envd metrics: %w", err)
|
||||
}
|
||||
for _, line := range strings.Split(string(data), "\n") {
|
||||
if strings.HasPrefix(line, "VmRSS:") {
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) < 2 {
|
||||
return 0, fmt.Errorf("malformed VmRSS line")
|
||||
}
|
||||
kb, err := strconv.ParseInt(fields[1], 10, 64)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("parse VmRSS: %w", err)
|
||||
}
|
||||
return kb * 1024, nil
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
return 0, fmt.Errorf("envd metrics: status %d", resp.StatusCode)
|
||||
}
|
||||
return 0, fmt.Errorf("VmRSS not found in /proc/%d/status", pid)
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read envd metrics body: %w", err)
|
||||
}
|
||||
|
||||
var m struct {
|
||||
MemUsed int64 `json:"mem_used"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &m); err != nil {
|
||||
return 0, fmt.Errorf("decode envd metrics: %w", err)
|
||||
}
|
||||
|
||||
return m.MemUsed, nil
|
||||
}
|
||||
|
||||
// readDiskAllocated returns the actual allocated bytes (not apparent size)
|
||||
|
||||
@ -29,6 +29,10 @@ import (
|
||||
|
||||
const (
|
||||
UFFD_EVENT_PAGEFAULT = C.UFFD_EVENT_PAGEFAULT
|
||||
UFFD_EVENT_FORK = C.UFFD_EVENT_FORK
|
||||
UFFD_EVENT_REMAP = C.UFFD_EVENT_REMAP
|
||||
UFFD_EVENT_REMOVE = C.UFFD_EVENT_REMOVE
|
||||
UFFD_EVENT_UNMAP = C.UFFD_EVENT_UNMAP
|
||||
UFFD_PAGEFAULT_FLAG_WRITE = C.UFFD_PAGEFAULT_FLAG_WRITE
|
||||
UFFDIO_COPY = C.UFFDIO_COPY
|
||||
UFFDIO_COPY_MODE_WP = C.UFFDIO_COPY_MODE_WP
|
||||
|
||||
@ -253,8 +253,17 @@ func (s *Server) serve(ctx context.Context, uffdFd fd, mapping *Mapping) error {
|
||||
}
|
||||
|
||||
msg := *(*uffdMsg)(unsafe.Pointer(&buf[0]))
|
||||
if getMsgEvent(&msg) != UFFD_EVENT_PAGEFAULT {
|
||||
return fmt.Errorf("unexpected uffd event type: %d", getMsgEvent(&msg))
|
||||
event := getMsgEvent(&msg)
|
||||
|
||||
switch event {
|
||||
case UFFD_EVENT_PAGEFAULT:
|
||||
// Handled below.
|
||||
case UFFD_EVENT_REMOVE, UFFD_EVENT_UNMAP, UFFD_EVENT_REMAP, UFFD_EVENT_FORK:
|
||||
// Non-fatal lifecycle events from the guest kernel (e.g. balloon
|
||||
// deflation, mmap/munmap). No action needed — continue polling.
|
||||
continue
|
||||
default:
|
||||
return fmt.Errorf("unexpected uffd event type: %d", event)
|
||||
}
|
||||
|
||||
arg := getMsgArg(&msg)
|
||||
|
||||
@ -8,7 +8,6 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// fcClient talks to the Firecracker HTTP API over a Unix socket.
|
||||
@ -27,7 +26,9 @@ func newFCClient(socketPath string) *fcClient {
|
||||
return d.DialContext(ctx, "unix", socketPath)
|
||||
},
|
||||
},
|
||||
Timeout: 10 * time.Second,
|
||||
// No global timeout — callers pass context.Context with appropriate
|
||||
// deadlines. A fixed 10s timeout was too short for snapshot/resume
|
||||
// operations on large-memory VMs (20GB+ memfiles).
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -136,6 +137,25 @@ func (c *fcClient) setMMDS(ctx context.Context, sandboxID, templateID string) er
|
||||
})
|
||||
}
|
||||
|
||||
// setBalloon configures the Firecracker balloon device for dynamic memory
|
||||
// management. deflateOnOom lets the guest reclaim balloon pages under memory
|
||||
// pressure. statsInterval enables periodic stats via GET /balloon/statistics.
|
||||
// Must be called before startVM.
|
||||
func (c *fcClient) setBalloon(ctx context.Context, amountMiB int, deflateOnOom bool, statsIntervalS int) error {
|
||||
return c.do(ctx, http.MethodPut, "/balloon", map[string]any{
|
||||
"amount_mib": amountMiB,
|
||||
"deflate_on_oom": deflateOnOom,
|
||||
"stats_polling_interval_s": statsIntervalS,
|
||||
})
|
||||
}
|
||||
|
||||
// updateBalloon adjusts the balloon target at runtime.
|
||||
func (c *fcClient) updateBalloon(ctx context.Context, amountMiB int) error {
|
||||
return c.do(ctx, http.MethodPatch, "/balloon", map[string]any{
|
||||
"amount_mib": amountMiB,
|
||||
})
|
||||
}
|
||||
|
||||
// startVM issues the InstanceStart action.
|
||||
func (c *fcClient) startVM(ctx context.Context) error {
|
||||
return c.do(ctx, http.MethodPut, "/actions", map[string]string{
|
||||
|
||||
@ -119,6 +119,13 @@ func configureVM(ctx context.Context, client *fcClient, cfg *VMConfig) error {
|
||||
return fmt.Errorf("set machine config: %w", err)
|
||||
}
|
||||
|
||||
// Balloon device — allows the host to reclaim unused guest memory.
|
||||
// Start with 0 (no inflation). deflate_on_oom lets the guest reclaim
|
||||
// balloon pages under memory pressure. Stats interval enables monitoring.
|
||||
if err := client.setBalloon(ctx, 0, true, 5); err != nil {
|
||||
slog.Warn("set balloon failed (non-fatal, VM will run without memory reclaim)", "error", err)
|
||||
}
|
||||
|
||||
// MMDS config — enable V2 token access on eth0 so that envd can read
|
||||
// WRENN_SANDBOX_ID and WRENN_TEMPLATE_ID from inside the guest.
|
||||
if err := client.setMMDSConfig(ctx, "eth0"); err != nil {
|
||||
@ -162,6 +169,19 @@ func (m *Manager) Resume(ctx context.Context, sandboxID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateBalloon adjusts the balloon target for a running VM.
|
||||
// amountMiB is memory to take FROM the guest (0 = give all back).
|
||||
func (m *Manager) UpdateBalloon(ctx context.Context, sandboxID string, amountMiB int) error {
|
||||
m.mu.RLock()
|
||||
vm, ok := m.vms[sandboxID]
|
||||
m.mu.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("VM not found: %s", sandboxID)
|
||||
}
|
||||
|
||||
return vm.client.updateBalloon(ctx, amountMiB)
|
||||
}
|
||||
|
||||
// Destroy stops and cleans up a VM.
|
||||
func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
|
||||
m.mu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user