Add minimal control plane with REST API, database, and reconciler

- REST API (chi router): sandbox CRUD, exec, pause/resume, file write/read
- PostgreSQL persistence via pgx/v5 + sqlc (sandboxes table with goose migration)
- Connect RPC client to host agent for all VM operations
- Reconciler syncs host agent state with DB every 30s (detects TTL-reaped sandboxes)
- OpenAPI 3.1 spec served at /openapi.yaml, Swagger UI at /docs
- Added WriteFile/ReadFile RPCs to hostagent proto and implementations
- File upload via multipart form, download via JSON body POST
- sandbox_id propagated from control plane to host agent on create
This commit is contained in:
2026-03-10 16:50:12 +06:00
parent d7b25b0891
commit ec3360d9ad
46 changed files with 2210 additions and 33 deletions

View File

@ -0,0 +1 @@
package admin

View File

@ -0,0 +1,124 @@
package api
import (
"encoding/base64"
"encoding/json"
"net/http"
"time"
"unicode/utf8"
"connectrpc.com/connect"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
"git.omukk.dev/wrenn/sandbox/internal/db"
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
"git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect"
)
type execHandler struct {
db *db.Queries
agent hostagentv1connect.HostAgentServiceClient
}
func newExecHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *execHandler {
return &execHandler{db: db, agent: agent}
}
type execRequest struct {
Cmd string `json:"cmd"`
Args []string `json:"args"`
TimeoutSec int32 `json:"timeout_sec"`
}
type execResponse struct {
SandboxID string `json:"sandbox_id"`
Cmd string `json:"cmd"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
ExitCode int32 `json:"exit_code"`
DurationMs int64 `json:"duration_ms"`
// Encoding is "utf-8" for text output, "base64" for binary output.
Encoding string `json:"encoding"`
}
// Exec handles POST /v1/sandboxes/{id}/exec.
func (h *execHandler) Exec(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
ctx := r.Context()
sb, err := h.db.GetSandbox(ctx, sandboxID)
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
if sb.Status != "running" {
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running (status: "+sb.Status+")")
return
}
var req execRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
return
}
if req.Cmd == "" {
writeError(w, http.StatusBadRequest, "invalid_request", "cmd is required")
return
}
start := time.Now()
resp, err := h.agent.Exec(ctx, connect.NewRequest(&pb.ExecRequest{
SandboxId: sandboxID,
Cmd: req.Cmd,
Args: req.Args,
TimeoutSec: req.TimeoutSec,
}))
if err != nil {
status, code, msg := agentErrToHTTP(err)
writeError(w, status, code, msg)
return
}
duration := time.Since(start)
// Update last active.
h.db.UpdateLastActive(ctx, db.UpdateLastActiveParams{
ID: sandboxID,
LastActiveAt: pgtype.Timestamptz{
Time: time.Now(),
Valid: true,
},
})
// Use base64 encoding if output contains non-UTF-8 bytes.
stdout := resp.Msg.Stdout
stderr := resp.Msg.Stderr
encoding := "utf-8"
if !utf8.Valid(stdout) || !utf8.Valid(stderr) {
encoding = "base64"
writeJSON(w, http.StatusOK, execResponse{
SandboxID: sandboxID,
Cmd: req.Cmd,
Stdout: base64.StdEncoding.EncodeToString(stdout),
Stderr: base64.StdEncoding.EncodeToString(stderr),
ExitCode: resp.Msg.ExitCode,
DurationMs: duration.Milliseconds(),
Encoding: encoding,
})
return
}
writeJSON(w, http.StatusOK, execResponse{
SandboxID: sandboxID,
Cmd: req.Cmd,
Stdout: string(stdout),
Stderr: string(stderr),
ExitCode: resp.Msg.ExitCode,
DurationMs: duration.Milliseconds(),
Encoding: encoding,
})
}

View File

@ -0,0 +1,132 @@
package api
import (
"encoding/json"
"errors"
"io"
"net/http"
"connectrpc.com/connect"
"github.com/go-chi/chi/v5"
"git.omukk.dev/wrenn/sandbox/internal/db"
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
"git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect"
)
type filesHandler struct {
db *db.Queries
agent hostagentv1connect.HostAgentServiceClient
}
func newFilesHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *filesHandler {
return &filesHandler{db: db, agent: agent}
}
// Upload handles POST /v1/sandboxes/{id}/files/write.
// Expects multipart/form-data with:
// - "path" text field: absolute destination path inside the sandbox
// - "file" file field: binary content to write
func (h *filesHandler) Upload(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
ctx := r.Context()
sb, err := h.db.GetSandbox(ctx, sandboxID)
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
if sb.Status != "running" {
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running")
return
}
// Limit to 100 MB.
r.Body = http.MaxBytesReader(w, r.Body, 100<<20)
if err := r.ParseMultipartForm(100 << 20); err != nil {
var maxErr *http.MaxBytesError
if errors.As(err, &maxErr) {
writeError(w, http.StatusRequestEntityTooLarge, "too_large", "file exceeds 100 MB limit")
return
}
writeError(w, http.StatusBadRequest, "invalid_request", "expected multipart/form-data")
return
}
filePath := r.FormValue("path")
if filePath == "" {
writeError(w, http.StatusBadRequest, "invalid_request", "path field is required")
return
}
file, _, err := r.FormFile("file")
if err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", "file field is required")
return
}
defer file.Close()
content, err := io.ReadAll(file)
if err != nil {
writeError(w, http.StatusInternalServerError, "read_error", "failed to read uploaded file")
return
}
if _, err := h.agent.WriteFile(ctx, connect.NewRequest(&pb.WriteFileRequest{
SandboxId: sandboxID,
Path: filePath,
Content: content,
})); err != nil {
status, code, msg := agentErrToHTTP(err)
writeError(w, status, code, msg)
return
}
w.WriteHeader(http.StatusNoContent)
}
type readFileRequest struct {
Path string `json:"path"`
}
// Download handles POST /v1/sandboxes/{id}/files/read.
// Accepts JSON body with path, returns raw file content with Content-Disposition.
func (h *filesHandler) Download(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
ctx := r.Context()
sb, err := h.db.GetSandbox(ctx, sandboxID)
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
if sb.Status != "running" {
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running")
return
}
var req readFileRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
return
}
if req.Path == "" {
writeError(w, http.StatusBadRequest, "invalid_request", "path is required")
return
}
resp, err := h.agent.ReadFile(ctx, connect.NewRequest(&pb.ReadFileRequest{
SandboxId: sandboxID,
Path: req.Path,
}))
if err != nil {
status, code, msg := agentErrToHTTP(err)
writeError(w, status, code, msg)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(resp.Msg.Content)
}

View File

@ -0,0 +1,277 @@
package api
import (
"encoding/json"
"log/slog"
"net/http"
"time"
"connectrpc.com/connect"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
"git.omukk.dev/wrenn/sandbox/internal/db"
"git.omukk.dev/wrenn/sandbox/internal/id"
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
"git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect"
)
type sandboxHandler struct {
db *db.Queries
agent hostagentv1connect.HostAgentServiceClient
}
func newSandboxHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *sandboxHandler {
return &sandboxHandler{db: db, agent: agent}
}
type createSandboxRequest struct {
Template string `json:"template"`
VCPUs int32 `json:"vcpus"`
MemoryMB int32 `json:"memory_mb"`
TimeoutSec int32 `json:"timeout_sec"`
}
type sandboxResponse struct {
ID string `json:"id"`
Status string `json:"status"`
Template string `json:"template"`
VCPUs int32 `json:"vcpus"`
MemoryMB int32 `json:"memory_mb"`
TimeoutSec int32 `json:"timeout_sec"`
GuestIP string `json:"guest_ip,omitempty"`
HostIP string `json:"host_ip,omitempty"`
CreatedAt string `json:"created_at"`
StartedAt *string `json:"started_at,omitempty"`
LastActiveAt *string `json:"last_active_at,omitempty"`
LastUpdated string `json:"last_updated"`
}
func sandboxToResponse(sb db.Sandbox) sandboxResponse {
resp := sandboxResponse{
ID: sb.ID,
Status: sb.Status,
Template: sb.Template,
VCPUs: sb.Vcpus,
MemoryMB: sb.MemoryMb,
TimeoutSec: sb.TimeoutSec,
GuestIP: sb.GuestIp,
HostIP: sb.HostIp,
}
if sb.CreatedAt.Valid {
resp.CreatedAt = sb.CreatedAt.Time.Format(time.RFC3339)
}
if sb.StartedAt.Valid {
s := sb.StartedAt.Time.Format(time.RFC3339)
resp.StartedAt = &s
}
if sb.LastActiveAt.Valid {
s := sb.LastActiveAt.Time.Format(time.RFC3339)
resp.LastActiveAt = &s
}
if sb.LastUpdated.Valid {
resp.LastUpdated = sb.LastUpdated.Time.Format(time.RFC3339)
}
return resp
}
// Create handles POST /v1/sandboxes.
func (h *sandboxHandler) Create(w http.ResponseWriter, r *http.Request) {
var req createSandboxRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
return
}
if req.Template == "" {
req.Template = "minimal"
}
if req.VCPUs <= 0 {
req.VCPUs = 1
}
if req.MemoryMB <= 0 {
req.MemoryMB = 512
}
if req.TimeoutSec <= 0 {
req.TimeoutSec = 300
}
ctx := r.Context()
sandboxID := id.NewSandboxID()
// Insert pending record.
sb, err := h.db.InsertSandbox(ctx, db.InsertSandboxParams{
ID: sandboxID,
OwnerID: "",
HostID: "default",
Template: req.Template,
Status: "pending",
Vcpus: req.VCPUs,
MemoryMb: req.MemoryMB,
TimeoutSec: req.TimeoutSec,
})
if err != nil {
slog.Error("failed to insert sandbox", "error", err)
writeError(w, http.StatusInternalServerError, "db_error", "failed to create sandbox record")
return
}
// Call host agent to create the sandbox.
resp, err := h.agent.CreateSandbox(ctx, connect.NewRequest(&pb.CreateSandboxRequest{
SandboxId: sandboxID,
Template: req.Template,
Vcpus: req.VCPUs,
MemoryMb: req.MemoryMB,
TimeoutSec: req.TimeoutSec,
}))
if err != nil {
h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{
ID: sandboxID, Status: "error",
})
status, code, msg := agentErrToHTTP(err)
writeError(w, status, code, msg)
return
}
// Update to running.
now := time.Now()
sb, err = h.db.UpdateSandboxRunning(ctx, db.UpdateSandboxRunningParams{
ID: sandboxID,
HostIp: resp.Msg.HostIp,
GuestIp: "",
StartedAt: pgtype.Timestamptz{
Time: now,
Valid: true,
},
})
if err != nil {
writeError(w, http.StatusInternalServerError, "db_error", "failed to update sandbox status")
return
}
writeJSON(w, http.StatusCreated, sandboxToResponse(sb))
}
// List handles GET /v1/sandboxes.
func (h *sandboxHandler) List(w http.ResponseWriter, r *http.Request) {
sandboxes, err := h.db.ListSandboxes(r.Context())
if err != nil {
writeError(w, http.StatusInternalServerError, "db_error", "failed to list sandboxes")
return
}
resp := make([]sandboxResponse, len(sandboxes))
for i, sb := range sandboxes {
resp[i] = sandboxToResponse(sb)
}
writeJSON(w, http.StatusOK, resp)
}
// Get handles GET /v1/sandboxes/{id}.
func (h *sandboxHandler) Get(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
sb, err := h.db.GetSandbox(r.Context(), sandboxID)
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
writeJSON(w, http.StatusOK, sandboxToResponse(sb))
}
// Pause handles POST /v1/sandboxes/{id}/pause.
func (h *sandboxHandler) Pause(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
ctx := r.Context()
sb, err := h.db.GetSandbox(ctx, sandboxID)
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
if sb.Status != "running" {
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running")
return
}
if _, err := h.agent.PauseSandbox(ctx, connect.NewRequest(&pb.PauseSandboxRequest{
SandboxId: sandboxID,
})); err != nil {
status, code, msg := agentErrToHTTP(err)
writeError(w, status, code, msg)
return
}
sb, err = h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{
ID: sandboxID, Status: "paused",
})
if err != nil {
writeError(w, http.StatusInternalServerError, "db_error", "failed to update status")
return
}
writeJSON(w, http.StatusOK, sandboxToResponse(sb))
}
// Resume handles POST /v1/sandboxes/{id}/resume.
func (h *sandboxHandler) Resume(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
ctx := r.Context()
sb, err := h.db.GetSandbox(ctx, sandboxID)
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
if sb.Status != "paused" {
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not paused")
return
}
if _, err := h.agent.ResumeSandbox(ctx, connect.NewRequest(&pb.ResumeSandboxRequest{
SandboxId: sandboxID,
})); err != nil {
status, code, msg := agentErrToHTTP(err)
writeError(w, status, code, msg)
return
}
sb, err = h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{
ID: sandboxID, Status: "running",
})
if err != nil {
writeError(w, http.StatusInternalServerError, "db_error", "failed to update status")
return
}
writeJSON(w, http.StatusOK, sandboxToResponse(sb))
}
// Destroy handles DELETE /v1/sandboxes/{id}.
func (h *sandboxHandler) Destroy(w http.ResponseWriter, r *http.Request) {
sandboxID := chi.URLParam(r, "id")
ctx := r.Context()
_, err := h.db.GetSandbox(ctx, sandboxID)
if err != nil {
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
return
}
// Best-effort destroy on host agent — sandbox may already be gone (TTL reap).
if _, err := h.agent.DestroySandbox(ctx, connect.NewRequest(&pb.DestroySandboxRequest{
SandboxId: sandboxID,
})); err != nil {
slog.Warn("destroy: agent RPC failed (sandbox may already be gone)", "sandbox_id", sandboxID, "error", err)
}
if _, err := h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{
ID: sandboxID, Status: "stopped",
}); err != nil {
slog.Error("destroy: failed to update sandbox status in DB", "sandbox_id", sandboxID, "error", err)
}
w.WriteHeader(http.StatusNoContent)
}

View File

@ -0,0 +1 @@
package api

View File

@ -0,0 +1,72 @@
package api
import (
"encoding/json"
"log/slog"
"net/http"
"time"
"connectrpc.com/connect"
)
type errorResponse struct {
Error errorDetail `json:"error"`
}
type errorDetail struct {
Code string `json:"code"`
Message string `json:"message"`
}
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(v)
}
func writeError(w http.ResponseWriter, status int, code, message string) {
writeJSON(w, status, errorResponse{
Error: errorDetail{Code: code, Message: message},
})
}
// agentErrToHTTP maps a Connect RPC error to an HTTP status, error code, and message.
func agentErrToHTTP(err error) (int, string, string) {
switch connect.CodeOf(err) {
case connect.CodeNotFound:
return http.StatusNotFound, "not_found", err.Error()
case connect.CodeInvalidArgument:
return http.StatusBadRequest, "invalid_request", err.Error()
case connect.CodeFailedPrecondition:
return http.StatusConflict, "conflict", err.Error()
default:
return http.StatusBadGateway, "agent_error", err.Error()
}
}
// requestLogger returns middleware that logs each request.
func requestLogger() func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
sw := &statusWriter{ResponseWriter: w, status: http.StatusOK}
next.ServeHTTP(sw, r)
slog.Info("request",
"method", r.Method,
"path", r.URL.Path,
"status", sw.status,
"duration", time.Since(start),
)
})
}
}
type statusWriter struct {
http.ResponseWriter
status int
}
func (w *statusWriter) WriteHeader(status int) {
w.status = status
w.ResponseWriter.WriteHeader(status)
}

346
internal/api/openapi.yaml Normal file
View File

@ -0,0 +1,346 @@
openapi: "3.1.0"
info:
title: Wrenn Sandbox API
description: MicroVM-based code execution platform API.
version: "0.1.0"
servers:
- url: http://localhost:8080
description: Local development
paths:
/v1/sandboxes:
post:
summary: Create a sandbox
operationId: createSandbox
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/CreateSandboxRequest"
responses:
"201":
description: Sandbox created
content:
application/json:
schema:
$ref: "#/components/schemas/Sandbox"
"502":
description: Host agent error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
get:
summary: List all sandboxes
operationId: listSandboxes
responses:
"200":
description: List of sandboxes
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/Sandbox"
/v1/sandboxes/{id}:
parameters:
- name: id
in: path
required: true
schema:
type: string
get:
summary: Get sandbox details
operationId: getSandbox
responses:
"200":
description: Sandbox details
content:
application/json:
schema:
$ref: "#/components/schemas/Sandbox"
"404":
description: Sandbox not found
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
delete:
summary: Destroy a sandbox
operationId: destroySandbox
responses:
"204":
description: Sandbox destroyed
/v1/sandboxes/{id}/exec:
parameters:
- name: id
in: path
required: true
schema:
type: string
post:
summary: Execute a command
operationId: execCommand
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/ExecRequest"
responses:
"200":
description: Command output
content:
application/json:
schema:
$ref: "#/components/schemas/ExecResponse"
"404":
description: Sandbox not found
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"409":
description: Sandbox not running
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/sandboxes/{id}/pause:
parameters:
- name: id
in: path
required: true
schema:
type: string
post:
summary: Pause a running sandbox
operationId: pauseSandbox
responses:
"200":
description: Sandbox paused
content:
application/json:
schema:
$ref: "#/components/schemas/Sandbox"
"409":
description: Sandbox not running
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/sandboxes/{id}/resume:
parameters:
- name: id
in: path
required: true
schema:
type: string
post:
summary: Resume a paused sandbox
operationId: resumeSandbox
responses:
"200":
description: Sandbox resumed
content:
application/json:
schema:
$ref: "#/components/schemas/Sandbox"
"409":
description: Sandbox not paused
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/sandboxes/{id}/files/write:
parameters:
- name: id
in: path
required: true
schema:
type: string
post:
summary: Upload a file
operationId: uploadFile
requestBody:
required: true
content:
multipart/form-data:
schema:
type: object
required: [path, file]
properties:
path:
type: string
description: Absolute destination path inside the sandbox
file:
type: string
format: binary
description: File content
responses:
"204":
description: File uploaded
"409":
description: Sandbox not running
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"413":
description: File too large
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/sandboxes/{id}/files/read:
parameters:
- name: id
in: path
required: true
schema:
type: string
post:
summary: Download a file
operationId: downloadFile
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/ReadFileRequest"
responses:
"200":
description: File content
content:
application/octet-stream:
schema:
type: string
format: binary
"404":
description: Sandbox or file not found
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
components:
schemas:
CreateSandboxRequest:
type: object
properties:
template:
type: string
default: minimal
vcpus:
type: integer
default: 1
memory_mb:
type: integer
default: 512
timeout_sec:
type: integer
default: 300
Sandbox:
type: object
properties:
id:
type: string
status:
type: string
enum: [pending, running, paused, stopped, error]
template:
type: string
vcpus:
type: integer
memory_mb:
type: integer
timeout_sec:
type: integer
guest_ip:
type: string
host_ip:
type: string
created_at:
type: string
format: date-time
started_at:
type: string
format: date-time
nullable: true
last_active_at:
type: string
format: date-time
nullable: true
last_updated:
type: string
format: date-time
ExecRequest:
type: object
required: [cmd]
properties:
cmd:
type: string
args:
type: array
items:
type: string
timeout_sec:
type: integer
default: 30
ExecResponse:
type: object
properties:
sandbox_id:
type: string
cmd:
type: string
stdout:
type: string
stderr:
type: string
exit_code:
type: integer
duration_ms:
type: integer
encoding:
type: string
enum: [utf-8, base64]
description: Output encoding. "base64" when stdout/stderr contain binary data.
ReadFileRequest:
type: object
required: [path]
properties:
path:
type: string
description: Absolute file path inside the sandbox
Error:
type: object
properties:
error:
type: object
properties:
code:
type: string
message:
type: string

View File

@ -0,0 +1,95 @@
package api
import (
"context"
"log/slog"
"time"
"connectrpc.com/connect"
"git.omukk.dev/wrenn/sandbox/internal/db"
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
"git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect"
)
// Reconciler periodically compares the host agent's sandbox list with the DB
// and marks sandboxes that no longer exist on the host as stopped.
type Reconciler struct {
db *db.Queries
agent hostagentv1connect.HostAgentServiceClient
hostID string
interval time.Duration
}
// NewReconciler creates a new reconciler.
func NewReconciler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient, hostID string, interval time.Duration) *Reconciler {
return &Reconciler{
db: db,
agent: agent,
hostID: hostID,
interval: interval,
}
}
// Start runs the reconciliation loop until the context is cancelled.
func (rc *Reconciler) Start(ctx context.Context) {
go func() {
ticker := time.NewTicker(rc.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
rc.reconcile(ctx)
}
}
}()
}
func (rc *Reconciler) reconcile(ctx context.Context) {
// Get all sandboxes the host agent knows about.
resp, err := rc.agent.ListSandboxes(ctx, connect.NewRequest(&pb.ListSandboxesRequest{}))
if err != nil {
slog.Warn("reconciler: failed to list sandboxes from host agent", "error", err)
return
}
// Build a set of sandbox IDs that are alive on the host.
alive := make(map[string]struct{}, len(resp.Msg.Sandboxes))
for _, sb := range resp.Msg.Sandboxes {
alive[sb.SandboxId] = struct{}{}
}
// Get all DB sandboxes for this host that are running or paused.
dbSandboxes, err := rc.db.ListSandboxesByHostAndStatus(ctx, db.ListSandboxesByHostAndStatusParams{
HostID: rc.hostID,
Column2: []string{"running", "paused"},
})
if err != nil {
slog.Warn("reconciler: failed to list DB sandboxes", "error", err)
return
}
// Find sandboxes in DB that are no longer on the host.
var stale []string
for _, sb := range dbSandboxes {
if _, ok := alive[sb.ID]; !ok {
stale = append(stale, sb.ID)
}
}
if len(stale) == 0 {
return
}
slog.Info("reconciler: marking stale sandboxes as stopped", "count", len(stale), "ids", stale)
if err := rc.db.BulkUpdateStatusByIDs(ctx, db.BulkUpdateStatusByIDsParams{
Column1: stale,
Status: "stopped",
}); err != nil {
slog.Warn("reconciler: failed to update stale sandboxes", "error", err)
}
}

View File

@ -0,0 +1,90 @@
package api
import (
_ "embed"
"fmt"
"net/http"
"github.com/go-chi/chi/v5"
"git.omukk.dev/wrenn/sandbox/internal/db"
"git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect"
)
//go:embed openapi.yaml
var openapiYAML []byte
// Server is the control plane HTTP server.
type Server struct {
router chi.Router
}
// New constructs the chi router and registers all routes.
func New(queries *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *Server {
r := chi.NewRouter()
r.Use(requestLogger())
sandbox := newSandboxHandler(queries, agent)
exec := newExecHandler(queries, agent)
files := newFilesHandler(queries, agent)
// OpenAPI spec and docs.
r.Get("/openapi.yaml", serveOpenAPI)
r.Get("/docs", serveDocs)
// Sandbox CRUD.
r.Route("/v1/sandboxes", func(r chi.Router) {
r.Post("/", sandbox.Create)
r.Get("/", sandbox.List)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", sandbox.Get)
r.Delete("/", sandbox.Destroy)
r.Post("/exec", exec.Exec)
r.Post("/pause", sandbox.Pause)
r.Post("/resume", sandbox.Resume)
r.Post("/files/write", files.Upload)
r.Post("/files/read", files.Download)
})
})
return &Server{router: r}
}
// Handler returns the HTTP handler.
func (s *Server) Handler() http.Handler {
return s.router
}
func serveOpenAPI(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/yaml")
w.Write(openapiYAML)
}
func serveDocs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
fmt.Fprint(w, `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Wrenn Sandbox API</title>
<link rel="stylesheet" href="https://unpkg.com/swagger-ui-dist@5/swagger-ui.css">
<style>
body { margin: 0; background: #fafafa; }
.swagger-ui .topbar { display: none; }
</style>
</head>
<body>
<div id="swagger-ui"></div>
<script src="https://unpkg.com/swagger-ui-dist@5/swagger-ui-bundle.js"></script>
<script>
SwaggerUIBundle({
url: "/openapi.yaml",
dom_id: "#swagger-ui",
deepLinking: true,
});
</script>
</body>
</html>`)
}

View File

@ -0,0 +1 @@
package auth

View File

@ -0,0 +1 @@
package auth

View File

@ -0,0 +1,36 @@
package config
import (
"os"
"strings"
)
// Config holds the control plane configuration.
type Config struct {
DatabaseURL string
ListenAddr string
HostAgentAddr string
}
// Load reads configuration from environment variables.
func Load() Config {
cfg := Config{
DatabaseURL: envOrDefault("DATABASE_URL", "postgres://wrenn:wrenn@localhost:5432/wrenn?sslmode=disable"),
ListenAddr: envOrDefault("CP_LISTEN_ADDR", ":8080"),
HostAgentAddr: envOrDefault("CP_HOST_AGENT_ADDR", "http://localhost:50051"),
}
// Ensure the host agent address has a scheme.
if !strings.HasPrefix(cfg.HostAgentAddr, "http://") && !strings.HasPrefix(cfg.HostAgentAddr, "https://") {
cfg.HostAgentAddr = "http://" + cfg.HostAgentAddr
}
return cfg
}
func envOrDefault(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}

32
internal/db/db.go Normal file
View File

@ -0,0 +1,32 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
package db
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)
type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row
}
func New(db DBTX) *Queries {
return &Queries{db: db}
}
type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
return &Queries{
db: tx,
}
}

26
internal/db/models.go Normal file
View File

@ -0,0 +1,26 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
package db
import (
"github.com/jackc/pgx/v5/pgtype"
)
type Sandbox struct {
ID string `json:"id"`
OwnerID string `json:"owner_id"`
HostID string `json:"host_id"`
Template string `json:"template"`
Status string `json:"status"`
Vcpus int32 `json:"vcpus"`
MemoryMb int32 `json:"memory_mb"`
TimeoutSec int32 `json:"timeout_sec"`
GuestIp string `json:"guest_ip"`
HostIp string `json:"host_ip"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
StartedAt pgtype.Timestamptz `json:"started_at"`
LastActiveAt pgtype.Timestamptz `json:"last_active_at"`
LastUpdated pgtype.Timestamptz `json:"last_updated"`
}

View File

@ -0,0 +1,286 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: sandboxes.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const bulkUpdateStatusByIDs = `-- name: BulkUpdateStatusByIDs :exec
UPDATE sandboxes
SET status = $2,
last_updated = NOW()
WHERE id = ANY($1::text[])
`
type BulkUpdateStatusByIDsParams struct {
Column1 []string `json:"column_1"`
Status string `json:"status"`
}
func (q *Queries) BulkUpdateStatusByIDs(ctx context.Context, arg BulkUpdateStatusByIDsParams) error {
_, err := q.db.Exec(ctx, bulkUpdateStatusByIDs, arg.Column1, arg.Status)
return err
}
const getSandbox = `-- name: GetSandbox :one
SELECT id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated FROM sandboxes WHERE id = $1
`
func (q *Queries) GetSandbox(ctx context.Context, id string) (Sandbox, error) {
row := q.db.QueryRow(ctx, getSandbox, id)
var i Sandbox
err := row.Scan(
&i.ID,
&i.OwnerID,
&i.HostID,
&i.Template,
&i.Status,
&i.Vcpus,
&i.MemoryMb,
&i.TimeoutSec,
&i.GuestIp,
&i.HostIp,
&i.CreatedAt,
&i.StartedAt,
&i.LastActiveAt,
&i.LastUpdated,
)
return i, err
}
const insertSandbox = `-- name: InsertSandbox :one
INSERT INTO sandboxes (id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated
`
type InsertSandboxParams struct {
ID string `json:"id"`
OwnerID string `json:"owner_id"`
HostID string `json:"host_id"`
Template string `json:"template"`
Status string `json:"status"`
Vcpus int32 `json:"vcpus"`
MemoryMb int32 `json:"memory_mb"`
TimeoutSec int32 `json:"timeout_sec"`
}
func (q *Queries) InsertSandbox(ctx context.Context, arg InsertSandboxParams) (Sandbox, error) {
row := q.db.QueryRow(ctx, insertSandbox,
arg.ID,
arg.OwnerID,
arg.HostID,
arg.Template,
arg.Status,
arg.Vcpus,
arg.MemoryMb,
arg.TimeoutSec,
)
var i Sandbox
err := row.Scan(
&i.ID,
&i.OwnerID,
&i.HostID,
&i.Template,
&i.Status,
&i.Vcpus,
&i.MemoryMb,
&i.TimeoutSec,
&i.GuestIp,
&i.HostIp,
&i.CreatedAt,
&i.StartedAt,
&i.LastActiveAt,
&i.LastUpdated,
)
return i, err
}
const listSandboxes = `-- name: ListSandboxes :many
SELECT id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated FROM sandboxes ORDER BY created_at DESC
`
func (q *Queries) ListSandboxes(ctx context.Context) ([]Sandbox, error) {
rows, err := q.db.Query(ctx, listSandboxes)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Sandbox
for rows.Next() {
var i Sandbox
if err := rows.Scan(
&i.ID,
&i.OwnerID,
&i.HostID,
&i.Template,
&i.Status,
&i.Vcpus,
&i.MemoryMb,
&i.TimeoutSec,
&i.GuestIp,
&i.HostIp,
&i.CreatedAt,
&i.StartedAt,
&i.LastActiveAt,
&i.LastUpdated,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listSandboxesByHostAndStatus = `-- name: ListSandboxesByHostAndStatus :many
SELECT id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated FROM sandboxes
WHERE host_id = $1 AND status = ANY($2::text[])
ORDER BY created_at DESC
`
type ListSandboxesByHostAndStatusParams struct {
HostID string `json:"host_id"`
Column2 []string `json:"column_2"`
}
func (q *Queries) ListSandboxesByHostAndStatus(ctx context.Context, arg ListSandboxesByHostAndStatusParams) ([]Sandbox, error) {
rows, err := q.db.Query(ctx, listSandboxesByHostAndStatus, arg.HostID, arg.Column2)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Sandbox
for rows.Next() {
var i Sandbox
if err := rows.Scan(
&i.ID,
&i.OwnerID,
&i.HostID,
&i.Template,
&i.Status,
&i.Vcpus,
&i.MemoryMb,
&i.TimeoutSec,
&i.GuestIp,
&i.HostIp,
&i.CreatedAt,
&i.StartedAt,
&i.LastActiveAt,
&i.LastUpdated,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const updateLastActive = `-- name: UpdateLastActive :exec
UPDATE sandboxes
SET last_active_at = $2,
last_updated = NOW()
WHERE id = $1
`
type UpdateLastActiveParams struct {
ID string `json:"id"`
LastActiveAt pgtype.Timestamptz `json:"last_active_at"`
}
func (q *Queries) UpdateLastActive(ctx context.Context, arg UpdateLastActiveParams) error {
_, err := q.db.Exec(ctx, updateLastActive, arg.ID, arg.LastActiveAt)
return err
}
const updateSandboxRunning = `-- name: UpdateSandboxRunning :one
UPDATE sandboxes
SET status = 'running',
host_ip = $2,
guest_ip = $3,
started_at = $4,
last_active_at = $4,
last_updated = NOW()
WHERE id = $1
RETURNING id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated
`
type UpdateSandboxRunningParams struct {
ID string `json:"id"`
HostIp string `json:"host_ip"`
GuestIp string `json:"guest_ip"`
StartedAt pgtype.Timestamptz `json:"started_at"`
}
func (q *Queries) UpdateSandboxRunning(ctx context.Context, arg UpdateSandboxRunningParams) (Sandbox, error) {
row := q.db.QueryRow(ctx, updateSandboxRunning,
arg.ID,
arg.HostIp,
arg.GuestIp,
arg.StartedAt,
)
var i Sandbox
err := row.Scan(
&i.ID,
&i.OwnerID,
&i.HostID,
&i.Template,
&i.Status,
&i.Vcpus,
&i.MemoryMb,
&i.TimeoutSec,
&i.GuestIp,
&i.HostIp,
&i.CreatedAt,
&i.StartedAt,
&i.LastActiveAt,
&i.LastUpdated,
)
return i, err
}
const updateSandboxStatus = `-- name: UpdateSandboxStatus :one
UPDATE sandboxes
SET status = $2,
last_updated = NOW()
WHERE id = $1
RETURNING id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated
`
type UpdateSandboxStatusParams struct {
ID string `json:"id"`
Status string `json:"status"`
}
func (q *Queries) UpdateSandboxStatus(ctx context.Context, arg UpdateSandboxStatusParams) (Sandbox, error) {
row := q.db.QueryRow(ctx, updateSandboxStatus, arg.ID, arg.Status)
var i Sandbox
err := row.Scan(
&i.ID,
&i.OwnerID,
&i.HostID,
&i.Template,
&i.Status,
&i.Vcpus,
&i.MemoryMb,
&i.TimeoutSec,
&i.GuestIp,
&i.HostIp,
&i.CreatedAt,
&i.StartedAt,
&i.LastActiveAt,
&i.LastUpdated,
)
return i, err
}

View File

@ -1,11 +1,14 @@
package envdclient
import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"mime/multipart"
"net/http"
"net/url"
"connectrpc.com/connect"
@ -107,19 +110,80 @@ func (c *Client) Exec(ctx context.Context, cmd string, args ...string) (*ExecRes
return result, nil
}
// WriteFile writes content to a file inside the sandbox via envd's filesystem service.
// WriteFile writes content to a file inside the sandbox via envd's REST endpoint.
// envd expects POST /files?path=...&username=root with multipart/form-data (field name "file").
func (c *Client) WriteFile(ctx context.Context, path string, content []byte) error {
// envd uses HTTP upload for files, not Connect RPC.
// POST /files with multipart form data.
// For now, use the filesystem MakeDir for directories.
// TODO: Implement file upload via envd's REST endpoint.
return fmt.Errorf("WriteFile not yet implemented")
var body bytes.Buffer
writer := multipart.NewWriter(&body)
part, err := writer.CreateFormFile("file", "upload")
if err != nil {
return fmt.Errorf("create multipart: %w", err)
}
if _, err := part.Write(content); err != nil {
return fmt.Errorf("write multipart: %w", err)
}
writer.Close()
u := fmt.Sprintf("%s/files?%s", c.base, url.Values{
"path": {path},
"username": {"root"},
}.Encode())
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, &body)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
req.Header.Set("Content-Type", writer.FormDataContentType())
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("write file: %w", err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("write file %s: status %d: %s", path, resp.StatusCode, string(respBody))
}
slog.Debug("envd write file", "path", path, "status", resp.StatusCode, "response", string(respBody))
return nil
}
// ReadFile reads a file from inside the sandbox.
// ReadFile reads a file from inside the sandbox via envd's REST endpoint.
// envd expects GET /files?path=...&username=root.
func (c *Client) ReadFile(ctx context.Context, path string) ([]byte, error) {
// TODO: Implement file download via envd's REST endpoint.
return nil, fmt.Errorf("ReadFile not yet implemented")
u := fmt.Sprintf("%s/files?%s", c.base, url.Values{
"path": {path},
"username": {"root"},
}.Encode())
slog.Debug("envd read file", "url", u, "path", path)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("read file: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("read file %s: status %d: %s", path, resp.StatusCode, string(respBody))
}
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read file body: %w", err)
}
return data, nil
}
// ListDir lists directory contents inside the sandbox.

View File

@ -30,7 +30,7 @@ func (s *Server) CreateSandbox(
) (*connect.Response[pb.CreateSandboxResponse], error) {
msg := req.Msg
sb, err := s.mgr.Create(ctx, msg.Template, int(msg.Vcpus), int(msg.MemoryMb), int(msg.TimeoutSec))
sb, err := s.mgr.Create(ctx, msg.SandboxId, msg.Template, int(msg.Vcpus), int(msg.MemoryMb), int(msg.TimeoutSec))
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("create sandbox: %w", err))
}
@ -98,6 +98,43 @@ func (s *Server) Exec(
}), nil
}
func (s *Server) WriteFile(
ctx context.Context,
req *connect.Request[pb.WriteFileRequest],
) (*connect.Response[pb.WriteFileResponse], error) {
msg := req.Msg
client, err := s.mgr.GetClient(msg.SandboxId)
if err != nil {
return nil, connect.NewError(connect.CodeNotFound, err)
}
if err := client.WriteFile(ctx, msg.Path, msg.Content); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("write file: %w", err))
}
return connect.NewResponse(&pb.WriteFileResponse{}), nil
}
func (s *Server) ReadFile(
ctx context.Context,
req *connect.Request[pb.ReadFileRequest],
) (*connect.Response[pb.ReadFileResponse], error) {
msg := req.Msg
client, err := s.mgr.GetClient(msg.SandboxId)
if err != nil {
return nil, connect.NewError(connect.CodeNotFound, err)
}
content, err := client.ReadFile(ctx, msg.Path)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("read file: %w", err))
}
return connect.NewResponse(&pb.ReadFileResponse{Content: content}), nil
}
func (s *Server) ListSandboxes(
ctx context.Context,
req *connect.Request[pb.ListSandboxesRequest],

View File

@ -0,0 +1 @@
package lifecycle

View File

@ -0,0 +1 @@
package metrics

View File

@ -0,0 +1 @@
package metrics

View File

@ -57,8 +57,11 @@ func New(cfg Config) *Manager {
}
// Create boots a new sandbox: clone rootfs, set up network, start VM, wait for envd.
func (m *Manager) Create(ctx context.Context, template string, vcpus, memoryMB, timeoutSec int) (*models.Sandbox, error) {
sandboxID := id.NewSandboxID()
// If sandboxID is empty, a new ID is generated.
func (m *Manager) Create(ctx context.Context, sandboxID, template string, vcpus, memoryMB, timeoutSec int) (*models.Sandbox, error) {
if sandboxID == "" {
sandboxID = id.NewSandboxID()
}
if vcpus <= 0 {
vcpus = 1
@ -280,6 +283,18 @@ func (m *Manager) Get(sandboxID string) (*models.Sandbox, error) {
return &sb.Sandbox, nil
}
// GetClient returns the envd client for a sandbox.
func (m *Manager) GetClient(sandboxID string) (*envdclient.Client, error) {
sb, err := m.get(sandboxID)
if err != nil {
return nil, err
}
if sb.Status != models.StatusRunning {
return nil, fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
}
return sb.client, nil
}
func (m *Manager) get(sandboxID string) (*sandboxState, error) {
m.mu.RLock()
defer m.mu.RUnlock()

View File

@ -0,0 +1 @@
package scheduler

View File

@ -0,0 +1 @@
package scheduler

View File

@ -0,0 +1 @@
package scheduler

View File

@ -0,0 +1 @@
package snapshot

View File

@ -0,0 +1 @@
package snapshot

View File

@ -0,0 +1 @@
package snapshot