diff --git a/Makefile b/Makefile index 0fc5ef0..7bce3d5 100644 --- a/Makefile +++ b/Makefile @@ -28,10 +28,10 @@ build-envd: # ═══════════════════════════════════════════════════ # Development # ═══════════════════════════════════════════════════ -.PHONY: dev dev-cp dev-agent dev-envd dev-infra dev-down dev-seed +.PHONY: dev dev-cp dev-agent dev-envd dev-infra dev-down ## One command to start everything for local dev -dev: dev-infra migrate-up dev-seed dev-cp +dev: dev-infra migrate-up dev-cp dev-infra: docker compose -f deploy/docker-compose.dev.yml up -d @@ -52,8 +52,6 @@ dev-agent: dev-envd: cd $(ENVD_DIR) && go run . --debug --listen-tcp :3002 -dev-seed: - go run ./scripts/seed.go # ═══════════════════════════════════════════════════ # Database (goose) @@ -89,8 +87,7 @@ proto: cd $(ENVD_DIR)/spec && buf generate sqlc: - @if command -v sqlc > /dev/null; then sqlc generate; \ - else echo "sqlc not installed, skipping"; fi + sqlc generate # ═══════════════════════════════════════════════════ # Quality & Testing diff --git a/cmd/control-plane/main.go b/cmd/control-plane/main.go index e69de29..80638bb 100644 --- a/cmd/control-plane/main.go +++ b/cmd/control-plane/main.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + "log/slog" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + + "git.omukk.dev/wrenn/sandbox/internal/api" + "git.omukk.dev/wrenn/sandbox/internal/config" + "git.omukk.dev/wrenn/sandbox/internal/db" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +func main() { + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + + cfg := config.Load() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Database connection pool. + pool, err := pgxpool.New(ctx, cfg.DatabaseURL) + if err != nil { + slog.Error("failed to connect to database", "error", err) + os.Exit(1) + } + defer pool.Close() + + if err := pool.Ping(ctx); err != nil { + slog.Error("failed to ping database", "error", err) + os.Exit(1) + } + slog.Info("connected to database") + + queries := db.New(pool) + + // Connect RPC client for the host agent. + agentHTTP := &http.Client{Timeout: 10 * time.Minute} + agentClient := hostagentv1connect.NewHostAgentServiceClient( + agentHTTP, + cfg.HostAgentAddr, + ) + + // API server. + srv := api.New(queries, agentClient) + + // Start reconciler. + reconciler := api.NewReconciler(queries, agentClient, "default", 30*time.Second) + reconciler.Start(ctx) + + httpServer := &http.Server{ + Addr: cfg.ListenAddr, + Handler: srv.Handler(), + } + + // Graceful shutdown on signal. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigCh + slog.Info("received signal, shutting down", "signal", sig) + cancel() + + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + + if err := httpServer.Shutdown(shutdownCtx); err != nil { + slog.Error("http server shutdown error", "error", err) + } + }() + + slog.Info("control plane starting", "addr", cfg.ListenAddr, "agent", cfg.HostAgentAddr) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error("http server error", "error", err) + os.Exit(1) + } + + slog.Info("control plane stopped") +} diff --git a/db/migrations/20260310094104_initial.sql b/db/migrations/20260310094104_initial.sql new file mode 100644 index 0000000..017b12e --- /dev/null +++ b/db/migrations/20260310094104_initial.sql @@ -0,0 +1,25 @@ +-- +goose Up + +CREATE TABLE sandboxes ( + id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL DEFAULT '', + host_id TEXT NOT NULL DEFAULT 'default', + template TEXT NOT NULL DEFAULT 'minimal', + status TEXT NOT NULL DEFAULT 'pending', + vcpus INTEGER NOT NULL DEFAULT 1, + memory_mb INTEGER NOT NULL DEFAULT 512, + timeout_sec INTEGER NOT NULL DEFAULT 300, + guest_ip TEXT NOT NULL DEFAULT '', + host_ip TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ, + last_active_at TIMESTAMPTZ, + last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_sandboxes_status ON sandboxes(status); +CREATE INDEX idx_sandboxes_host_status ON sandboxes(host_id, status); + +-- +goose Down + +DROP TABLE sandboxes; diff --git a/db/queries/sandboxes.sql b/db/queries/sandboxes.sql index e69de29..7a964a7 100644 --- a/db/queries/sandboxes.sql +++ b/db/queries/sandboxes.sql @@ -0,0 +1,45 @@ +-- name: InsertSandbox :one +INSERT INTO sandboxes (id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +RETURNING *; + +-- name: GetSandbox :one +SELECT * FROM sandboxes WHERE id = $1; + +-- name: ListSandboxes :many +SELECT * FROM sandboxes ORDER BY created_at DESC; + +-- name: ListSandboxesByHostAndStatus :many +SELECT * FROM sandboxes +WHERE host_id = $1 AND status = ANY($2::text[]) +ORDER BY created_at DESC; + +-- name: UpdateSandboxRunning :one +UPDATE sandboxes +SET status = 'running', + host_ip = $2, + guest_ip = $3, + started_at = $4, + last_active_at = $4, + last_updated = NOW() +WHERE id = $1 +RETURNING *; + +-- name: UpdateSandboxStatus :one +UPDATE sandboxes +SET status = $2, + last_updated = NOW() +WHERE id = $1 +RETURNING *; + +-- name: UpdateLastActive :exec +UPDATE sandboxes +SET last_active_at = $2, + last_updated = NOW() +WHERE id = $1; + +-- name: BulkUpdateStatusByIDs :exec +UPDATE sandboxes +SET status = $2, + last_updated = NOW() +WHERE id = ANY($1::text[]); diff --git a/envd/internal/services/spec/filesystem.pb.go b/envd/internal/services/spec/filesystem.pb.go index 0c654fb..9d3e537 100644 --- a/envd/internal/services/spec/filesystem.pb.go +++ b/envd/internal/services/spec/filesystem.pb.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 diff --git a/envd/internal/services/spec/process.pb.go b/envd/internal/services/spec/process.pb.go index 711d34f..6877dca 100644 --- a/envd/internal/services/spec/process.pb.go +++ b/envd/internal/services/spec/process.pb.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 diff --git a/envd/internal/services/spec/specconnect/filesystem.connect.go b/envd/internal/services/spec/specconnect/filesystem.connect.go index f405b3d..b06df5f 100644 --- a/envd/internal/services/spec/specconnect/filesystem.connect.go +++ b/envd/internal/services/spec/specconnect/filesystem.connect.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-connect-go. DO NOT EDIT. // // Source: filesystem.proto diff --git a/envd/internal/services/spec/specconnect/process.connect.go b/envd/internal/services/spec/specconnect/process.connect.go index effd57b..57f49d5 100644 --- a/envd/internal/services/spec/specconnect/process.connect.go +++ b/envd/internal/services/spec/specconnect/process.connect.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-connect-go. DO NOT EDIT. // // Source: process.proto diff --git a/go.mod b/go.mod index 5f62a92..1ff46b1 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,18 @@ go 1.25.0 require ( connectrpc.com/connect v1.19.1 + github.com/go-chi/chi/v5 v5.2.5 + github.com/jackc/pgx/v5 v5.8.0 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f google.golang.org/protobuf v1.36.11 ) -require golang.org/x/sys v0.41.0 // indirect +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum index edef12e..5180f1b 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,43 @@ connectrpc.com/connect v1.19.1 h1:R5M57z05+90EfEvCY1b7hBxDVOUl45PrtXtAV2fOC14= connectrpc.com/connect v1.19.1/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= +github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 h1:+UB2BJA852UkGH42H+Oee69djmxS3ANzl2b/JtT1YiA= github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/admin/handlers.go b/internal/admin/handlers.go index e69de29..d78da5d 100644 --- a/internal/admin/handlers.go +++ b/internal/admin/handlers.go @@ -0,0 +1 @@ +package admin diff --git a/internal/api/handlers_exec.go b/internal/api/handlers_exec.go index e69de29..18aa723 100644 --- a/internal/api/handlers_exec.go +++ b/internal/api/handlers_exec.go @@ -0,0 +1,124 @@ +package api + +import ( + "encoding/base64" + "encoding/json" + "net/http" + "time" + "unicode/utf8" + + "connectrpc.com/connect" + "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgtype" + + "git.omukk.dev/wrenn/sandbox/internal/db" + pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +type execHandler struct { + db *db.Queries + agent hostagentv1connect.HostAgentServiceClient +} + +func newExecHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *execHandler { + return &execHandler{db: db, agent: agent} +} + +type execRequest struct { + Cmd string `json:"cmd"` + Args []string `json:"args"` + TimeoutSec int32 `json:"timeout_sec"` +} + +type execResponse struct { + SandboxID string `json:"sandbox_id"` + Cmd string `json:"cmd"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + ExitCode int32 `json:"exit_code"` + DurationMs int64 `json:"duration_ms"` + // Encoding is "utf-8" for text output, "base64" for binary output. + Encoding string `json:"encoding"` +} + +// Exec handles POST /v1/sandboxes/{id}/exec. +func (h *execHandler) Exec(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "running" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running (status: "+sb.Status+")") + return + } + + var req execRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + + if req.Cmd == "" { + writeError(w, http.StatusBadRequest, "invalid_request", "cmd is required") + return + } + + start := time.Now() + + resp, err := h.agent.Exec(ctx, connect.NewRequest(&pb.ExecRequest{ + SandboxId: sandboxID, + Cmd: req.Cmd, + Args: req.Args, + TimeoutSec: req.TimeoutSec, + })) + if err != nil { + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + duration := time.Since(start) + + // Update last active. + h.db.UpdateLastActive(ctx, db.UpdateLastActiveParams{ + ID: sandboxID, + LastActiveAt: pgtype.Timestamptz{ + Time: time.Now(), + Valid: true, + }, + }) + + // Use base64 encoding if output contains non-UTF-8 bytes. + stdout := resp.Msg.Stdout + stderr := resp.Msg.Stderr + encoding := "utf-8" + + if !utf8.Valid(stdout) || !utf8.Valid(stderr) { + encoding = "base64" + writeJSON(w, http.StatusOK, execResponse{ + SandboxID: sandboxID, + Cmd: req.Cmd, + Stdout: base64.StdEncoding.EncodeToString(stdout), + Stderr: base64.StdEncoding.EncodeToString(stderr), + ExitCode: resp.Msg.ExitCode, + DurationMs: duration.Milliseconds(), + Encoding: encoding, + }) + return + } + + writeJSON(w, http.StatusOK, execResponse{ + SandboxID: sandboxID, + Cmd: req.Cmd, + Stdout: string(stdout), + Stderr: string(stderr), + ExitCode: resp.Msg.ExitCode, + DurationMs: duration.Milliseconds(), + Encoding: encoding, + }) +} diff --git a/internal/api/handlers_files.go b/internal/api/handlers_files.go index e69de29..d844d08 100644 --- a/internal/api/handlers_files.go +++ b/internal/api/handlers_files.go @@ -0,0 +1,132 @@ +package api + +import ( + "encoding/json" + "errors" + "io" + "net/http" + + "connectrpc.com/connect" + "github.com/go-chi/chi/v5" + + "git.omukk.dev/wrenn/sandbox/internal/db" + pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +type filesHandler struct { + db *db.Queries + agent hostagentv1connect.HostAgentServiceClient +} + +func newFilesHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *filesHandler { + return &filesHandler{db: db, agent: agent} +} + +// Upload handles POST /v1/sandboxes/{id}/files/write. +// Expects multipart/form-data with: +// - "path" text field: absolute destination path inside the sandbox +// - "file" file field: binary content to write +func (h *filesHandler) Upload(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "running" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running") + return + } + + // Limit to 100 MB. + r.Body = http.MaxBytesReader(w, r.Body, 100<<20) + + if err := r.ParseMultipartForm(100 << 20); err != nil { + var maxErr *http.MaxBytesError + if errors.As(err, &maxErr) { + writeError(w, http.StatusRequestEntityTooLarge, "too_large", "file exceeds 100 MB limit") + return + } + writeError(w, http.StatusBadRequest, "invalid_request", "expected multipart/form-data") + return + } + + filePath := r.FormValue("path") + if filePath == "" { + writeError(w, http.StatusBadRequest, "invalid_request", "path field is required") + return + } + + file, _, err := r.FormFile("file") + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "file field is required") + return + } + defer file.Close() + + content, err := io.ReadAll(file) + if err != nil { + writeError(w, http.StatusInternalServerError, "read_error", "failed to read uploaded file") + return + } + + if _, err := h.agent.WriteFile(ctx, connect.NewRequest(&pb.WriteFileRequest{ + SandboxId: sandboxID, + Path: filePath, + Content: content, + })); err != nil { + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +type readFileRequest struct { + Path string `json:"path"` +} + +// Download handles POST /v1/sandboxes/{id}/files/read. +// Accepts JSON body with path, returns raw file content with Content-Disposition. +func (h *filesHandler) Download(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "running" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running") + return + } + + var req readFileRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + + if req.Path == "" { + writeError(w, http.StatusBadRequest, "invalid_request", "path is required") + return + } + + resp, err := h.agent.ReadFile(ctx, connect.NewRequest(&pb.ReadFileRequest{ + SandboxId: sandboxID, + Path: req.Path, + })) + if err != nil { + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + w.Header().Set("Content-Type", "application/octet-stream") + w.Write(resp.Msg.Content) +} diff --git a/internal/api/handlers_sandbox.go b/internal/api/handlers_sandbox.go index e69de29..0397e17 100644 --- a/internal/api/handlers_sandbox.go +++ b/internal/api/handlers_sandbox.go @@ -0,0 +1,277 @@ +package api + +import ( + "encoding/json" + "log/slog" + "net/http" + "time" + + "connectrpc.com/connect" + "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5/pgtype" + + "git.omukk.dev/wrenn/sandbox/internal/db" + "git.omukk.dev/wrenn/sandbox/internal/id" + pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +type sandboxHandler struct { + db *db.Queries + agent hostagentv1connect.HostAgentServiceClient +} + +func newSandboxHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *sandboxHandler { + return &sandboxHandler{db: db, agent: agent} +} + +type createSandboxRequest struct { + Template string `json:"template"` + VCPUs int32 `json:"vcpus"` + MemoryMB int32 `json:"memory_mb"` + TimeoutSec int32 `json:"timeout_sec"` +} + +type sandboxResponse struct { + ID string `json:"id"` + Status string `json:"status"` + Template string `json:"template"` + VCPUs int32 `json:"vcpus"` + MemoryMB int32 `json:"memory_mb"` + TimeoutSec int32 `json:"timeout_sec"` + GuestIP string `json:"guest_ip,omitempty"` + HostIP string `json:"host_ip,omitempty"` + CreatedAt string `json:"created_at"` + StartedAt *string `json:"started_at,omitempty"` + LastActiveAt *string `json:"last_active_at,omitempty"` + LastUpdated string `json:"last_updated"` +} + +func sandboxToResponse(sb db.Sandbox) sandboxResponse { + resp := sandboxResponse{ + ID: sb.ID, + Status: sb.Status, + Template: sb.Template, + VCPUs: sb.Vcpus, + MemoryMB: sb.MemoryMb, + TimeoutSec: sb.TimeoutSec, + GuestIP: sb.GuestIp, + HostIP: sb.HostIp, + } + if sb.CreatedAt.Valid { + resp.CreatedAt = sb.CreatedAt.Time.Format(time.RFC3339) + } + if sb.StartedAt.Valid { + s := sb.StartedAt.Time.Format(time.RFC3339) + resp.StartedAt = &s + } + if sb.LastActiveAt.Valid { + s := sb.LastActiveAt.Time.Format(time.RFC3339) + resp.LastActiveAt = &s + } + if sb.LastUpdated.Valid { + resp.LastUpdated = sb.LastUpdated.Time.Format(time.RFC3339) + } + return resp +} + +// Create handles POST /v1/sandboxes. +func (h *sandboxHandler) Create(w http.ResponseWriter, r *http.Request) { + var req createSandboxRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + + if req.Template == "" { + req.Template = "minimal" + } + if req.VCPUs <= 0 { + req.VCPUs = 1 + } + if req.MemoryMB <= 0 { + req.MemoryMB = 512 + } + if req.TimeoutSec <= 0 { + req.TimeoutSec = 300 + } + + ctx := r.Context() + sandboxID := id.NewSandboxID() + + // Insert pending record. + sb, err := h.db.InsertSandbox(ctx, db.InsertSandboxParams{ + ID: sandboxID, + OwnerID: "", + HostID: "default", + Template: req.Template, + Status: "pending", + Vcpus: req.VCPUs, + MemoryMb: req.MemoryMB, + TimeoutSec: req.TimeoutSec, + }) + if err != nil { + slog.Error("failed to insert sandbox", "error", err) + writeError(w, http.StatusInternalServerError, "db_error", "failed to create sandbox record") + return + } + + // Call host agent to create the sandbox. + resp, err := h.agent.CreateSandbox(ctx, connect.NewRequest(&pb.CreateSandboxRequest{ + SandboxId: sandboxID, + Template: req.Template, + Vcpus: req.VCPUs, + MemoryMb: req.MemoryMB, + TimeoutSec: req.TimeoutSec, + })) + if err != nil { + h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ + ID: sandboxID, Status: "error", + }) + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + // Update to running. + now := time.Now() + sb, err = h.db.UpdateSandboxRunning(ctx, db.UpdateSandboxRunningParams{ + ID: sandboxID, + HostIp: resp.Msg.HostIp, + GuestIp: "", + StartedAt: pgtype.Timestamptz{ + Time: now, + Valid: true, + }, + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "db_error", "failed to update sandbox status") + return + } + + writeJSON(w, http.StatusCreated, sandboxToResponse(sb)) +} + +// List handles GET /v1/sandboxes. +func (h *sandboxHandler) List(w http.ResponseWriter, r *http.Request) { + sandboxes, err := h.db.ListSandboxes(r.Context()) + if err != nil { + writeError(w, http.StatusInternalServerError, "db_error", "failed to list sandboxes") + return + } + + resp := make([]sandboxResponse, len(sandboxes)) + for i, sb := range sandboxes { + resp[i] = sandboxToResponse(sb) + } + + writeJSON(w, http.StatusOK, resp) +} + +// Get handles GET /v1/sandboxes/{id}. +func (h *sandboxHandler) Get(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + + sb, err := h.db.GetSandbox(r.Context(), sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + + writeJSON(w, http.StatusOK, sandboxToResponse(sb)) +} + +// Pause handles POST /v1/sandboxes/{id}/pause. +func (h *sandboxHandler) Pause(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "running" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running") + return + } + + if _, err := h.agent.PauseSandbox(ctx, connect.NewRequest(&pb.PauseSandboxRequest{ + SandboxId: sandboxID, + })); err != nil { + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + sb, err = h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ + ID: sandboxID, Status: "paused", + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "db_error", "failed to update status") + return + } + + writeJSON(w, http.StatusOK, sandboxToResponse(sb)) +} + +// Resume handles POST /v1/sandboxes/{id}/resume. +func (h *sandboxHandler) Resume(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "paused" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not paused") + return + } + + if _, err := h.agent.ResumeSandbox(ctx, connect.NewRequest(&pb.ResumeSandboxRequest{ + SandboxId: sandboxID, + })); err != nil { + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + sb, err = h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ + ID: sandboxID, Status: "running", + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "db_error", "failed to update status") + return + } + + writeJSON(w, http.StatusOK, sandboxToResponse(sb)) +} + +// Destroy handles DELETE /v1/sandboxes/{id}. +func (h *sandboxHandler) Destroy(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + _, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + + // Best-effort destroy on host agent — sandbox may already be gone (TTL reap). + if _, err := h.agent.DestroySandbox(ctx, connect.NewRequest(&pb.DestroySandboxRequest{ + SandboxId: sandboxID, + })); err != nil { + slog.Warn("destroy: agent RPC failed (sandbox may already be gone)", "sandbox_id", sandboxID, "error", err) + } + + if _, err := h.db.UpdateSandboxStatus(ctx, db.UpdateSandboxStatusParams{ + ID: sandboxID, Status: "stopped", + }); err != nil { + slog.Error("destroy: failed to update sandbox status in DB", "sandbox_id", sandboxID, "error", err) + } + + w.WriteHeader(http.StatusNoContent) +} diff --git a/internal/api/handlers_terminal.go b/internal/api/handlers_terminal.go index e69de29..778f64e 100644 --- a/internal/api/handlers_terminal.go +++ b/internal/api/handlers_terminal.go @@ -0,0 +1 @@ +package api diff --git a/internal/api/middleware.go b/internal/api/middleware.go index e69de29..5c2c24f 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -0,0 +1,72 @@ +package api + +import ( + "encoding/json" + "log/slog" + "net/http" + "time" + + "connectrpc.com/connect" +) + +type errorResponse struct { + Error errorDetail `json:"error"` +} + +type errorDetail struct { + Code string `json:"code"` + Message string `json:"message"` +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(v) +} + +func writeError(w http.ResponseWriter, status int, code, message string) { + writeJSON(w, status, errorResponse{ + Error: errorDetail{Code: code, Message: message}, + }) +} + +// agentErrToHTTP maps a Connect RPC error to an HTTP status, error code, and message. +func agentErrToHTTP(err error) (int, string, string) { + switch connect.CodeOf(err) { + case connect.CodeNotFound: + return http.StatusNotFound, "not_found", err.Error() + case connect.CodeInvalidArgument: + return http.StatusBadRequest, "invalid_request", err.Error() + case connect.CodeFailedPrecondition: + return http.StatusConflict, "conflict", err.Error() + default: + return http.StatusBadGateway, "agent_error", err.Error() + } +} + +// requestLogger returns middleware that logs each request. +func requestLogger() func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + sw := &statusWriter{ResponseWriter: w, status: http.StatusOK} + next.ServeHTTP(sw, r) + slog.Info("request", + "method", r.Method, + "path", r.URL.Path, + "status", sw.status, + "duration", time.Since(start), + ) + }) + } +} + +type statusWriter struct { + http.ResponseWriter + status int +} + +func (w *statusWriter) WriteHeader(status int) { + w.status = status + w.ResponseWriter.WriteHeader(status) +} diff --git a/internal/api/openapi.yaml b/internal/api/openapi.yaml new file mode 100644 index 0000000..c8a7fab --- /dev/null +++ b/internal/api/openapi.yaml @@ -0,0 +1,346 @@ +openapi: "3.1.0" +info: + title: Wrenn Sandbox API + description: MicroVM-based code execution platform API. + version: "0.1.0" + +servers: + - url: http://localhost:8080 + description: Local development + +paths: + /v1/sandboxes: + post: + summary: Create a sandbox + operationId: createSandbox + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/CreateSandboxRequest" + responses: + "201": + description: Sandbox created + content: + application/json: + schema: + $ref: "#/components/schemas/Sandbox" + "502": + description: Host agent error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + get: + summary: List all sandboxes + operationId: listSandboxes + responses: + "200": + description: List of sandboxes + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/Sandbox" + + /v1/sandboxes/{id}: + parameters: + - name: id + in: path + required: true + schema: + type: string + + get: + summary: Get sandbox details + operationId: getSandbox + responses: + "200": + description: Sandbox details + content: + application/json: + schema: + $ref: "#/components/schemas/Sandbox" + "404": + description: Sandbox not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + delete: + summary: Destroy a sandbox + operationId: destroySandbox + responses: + "204": + description: Sandbox destroyed + + /v1/sandboxes/{id}/exec: + parameters: + - name: id + in: path + required: true + schema: + type: string + + post: + summary: Execute a command + operationId: execCommand + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/ExecRequest" + responses: + "200": + description: Command output + content: + application/json: + schema: + $ref: "#/components/schemas/ExecResponse" + "404": + description: Sandbox not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "409": + description: Sandbox not running + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + /v1/sandboxes/{id}/pause: + parameters: + - name: id + in: path + required: true + schema: + type: string + + post: + summary: Pause a running sandbox + operationId: pauseSandbox + responses: + "200": + description: Sandbox paused + content: + application/json: + schema: + $ref: "#/components/schemas/Sandbox" + "409": + description: Sandbox not running + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + /v1/sandboxes/{id}/resume: + parameters: + - name: id + in: path + required: true + schema: + type: string + + post: + summary: Resume a paused sandbox + operationId: resumeSandbox + responses: + "200": + description: Sandbox resumed + content: + application/json: + schema: + $ref: "#/components/schemas/Sandbox" + "409": + description: Sandbox not paused + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + /v1/sandboxes/{id}/files/write: + parameters: + - name: id + in: path + required: true + schema: + type: string + + post: + summary: Upload a file + operationId: uploadFile + requestBody: + required: true + content: + multipart/form-data: + schema: + type: object + required: [path, file] + properties: + path: + type: string + description: Absolute destination path inside the sandbox + file: + type: string + format: binary + description: File content + responses: + "204": + description: File uploaded + "409": + description: Sandbox not running + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "413": + description: File too large + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + /v1/sandboxes/{id}/files/read: + parameters: + - name: id + in: path + required: true + schema: + type: string + + post: + summary: Download a file + operationId: downloadFile + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/ReadFileRequest" + responses: + "200": + description: File content + content: + application/octet-stream: + schema: + type: string + format: binary + "404": + description: Sandbox or file not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + +components: + schemas: + CreateSandboxRequest: + type: object + properties: + template: + type: string + default: minimal + vcpus: + type: integer + default: 1 + memory_mb: + type: integer + default: 512 + timeout_sec: + type: integer + default: 300 + + Sandbox: + type: object + properties: + id: + type: string + status: + type: string + enum: [pending, running, paused, stopped, error] + template: + type: string + vcpus: + type: integer + memory_mb: + type: integer + timeout_sec: + type: integer + guest_ip: + type: string + host_ip: + type: string + created_at: + type: string + format: date-time + started_at: + type: string + format: date-time + nullable: true + last_active_at: + type: string + format: date-time + nullable: true + last_updated: + type: string + format: date-time + + ExecRequest: + type: object + required: [cmd] + properties: + cmd: + type: string + args: + type: array + items: + type: string + timeout_sec: + type: integer + default: 30 + + ExecResponse: + type: object + properties: + sandbox_id: + type: string + cmd: + type: string + stdout: + type: string + stderr: + type: string + exit_code: + type: integer + duration_ms: + type: integer + encoding: + type: string + enum: [utf-8, base64] + description: Output encoding. "base64" when stdout/stderr contain binary data. + + ReadFileRequest: + type: object + required: [path] + properties: + path: + type: string + description: Absolute file path inside the sandbox + + Error: + type: object + properties: + error: + type: object + properties: + code: + type: string + message: + type: string diff --git a/internal/api/reconciler.go b/internal/api/reconciler.go new file mode 100644 index 0000000..17f6d6c --- /dev/null +++ b/internal/api/reconciler.go @@ -0,0 +1,95 @@ +package api + +import ( + "context" + "log/slog" + "time" + + "connectrpc.com/connect" + + "git.omukk.dev/wrenn/sandbox/internal/db" + pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +// Reconciler periodically compares the host agent's sandbox list with the DB +// and marks sandboxes that no longer exist on the host as stopped. +type Reconciler struct { + db *db.Queries + agent hostagentv1connect.HostAgentServiceClient + hostID string + interval time.Duration +} + +// NewReconciler creates a new reconciler. +func NewReconciler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient, hostID string, interval time.Duration) *Reconciler { + return &Reconciler{ + db: db, + agent: agent, + hostID: hostID, + interval: interval, + } +} + +// Start runs the reconciliation loop until the context is cancelled. +func (rc *Reconciler) Start(ctx context.Context) { + go func() { + ticker := time.NewTicker(rc.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rc.reconcile(ctx) + } + } + }() +} + +func (rc *Reconciler) reconcile(ctx context.Context) { + // Get all sandboxes the host agent knows about. + resp, err := rc.agent.ListSandboxes(ctx, connect.NewRequest(&pb.ListSandboxesRequest{})) + if err != nil { + slog.Warn("reconciler: failed to list sandboxes from host agent", "error", err) + return + } + + // Build a set of sandbox IDs that are alive on the host. + alive := make(map[string]struct{}, len(resp.Msg.Sandboxes)) + for _, sb := range resp.Msg.Sandboxes { + alive[sb.SandboxId] = struct{}{} + } + + // Get all DB sandboxes for this host that are running or paused. + dbSandboxes, err := rc.db.ListSandboxesByHostAndStatus(ctx, db.ListSandboxesByHostAndStatusParams{ + HostID: rc.hostID, + Column2: []string{"running", "paused"}, + }) + if err != nil { + slog.Warn("reconciler: failed to list DB sandboxes", "error", err) + return + } + + // Find sandboxes in DB that are no longer on the host. + var stale []string + for _, sb := range dbSandboxes { + if _, ok := alive[sb.ID]; !ok { + stale = append(stale, sb.ID) + } + } + + if len(stale) == 0 { + return + } + + slog.Info("reconciler: marking stale sandboxes as stopped", "count", len(stale), "ids", stale) + + if err := rc.db.BulkUpdateStatusByIDs(ctx, db.BulkUpdateStatusByIDsParams{ + Column1: stale, + Status: "stopped", + }); err != nil { + slog.Warn("reconciler: failed to update stale sandboxes", "error", err) + } +} diff --git a/internal/api/server.go b/internal/api/server.go index e69de29..89b4f1e 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -0,0 +1,90 @@ +package api + +import ( + _ "embed" + "fmt" + "net/http" + + "github.com/go-chi/chi/v5" + + "git.omukk.dev/wrenn/sandbox/internal/db" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +//go:embed openapi.yaml +var openapiYAML []byte + +// Server is the control plane HTTP server. +type Server struct { + router chi.Router +} + +// New constructs the chi router and registers all routes. +func New(queries *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *Server { + r := chi.NewRouter() + r.Use(requestLogger()) + + sandbox := newSandboxHandler(queries, agent) + exec := newExecHandler(queries, agent) + files := newFilesHandler(queries, agent) + + // OpenAPI spec and docs. + r.Get("/openapi.yaml", serveOpenAPI) + r.Get("/docs", serveDocs) + + // Sandbox CRUD. + r.Route("/v1/sandboxes", func(r chi.Router) { + r.Post("/", sandbox.Create) + r.Get("/", sandbox.List) + + r.Route("/{id}", func(r chi.Router) { + r.Get("/", sandbox.Get) + r.Delete("/", sandbox.Destroy) + r.Post("/exec", exec.Exec) + r.Post("/pause", sandbox.Pause) + r.Post("/resume", sandbox.Resume) + r.Post("/files/write", files.Upload) + r.Post("/files/read", files.Download) + }) + }) + + return &Server{router: r} +} + +// Handler returns the HTTP handler. +func (s *Server) Handler() http.Handler { + return s.router +} + +func serveOpenAPI(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/yaml") + w.Write(openapiYAML) +} + +func serveDocs(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + fmt.Fprint(w, ` + + + + + Wrenn Sandbox API + + + + +
+ + + +`) +} diff --git a/internal/auth/apikey.go b/internal/auth/apikey.go index e69de29..8832b06 100644 --- a/internal/auth/apikey.go +++ b/internal/auth/apikey.go @@ -0,0 +1 @@ +package auth diff --git a/internal/auth/ratelimit.go b/internal/auth/ratelimit.go index e69de29..8832b06 100644 --- a/internal/auth/ratelimit.go +++ b/internal/auth/ratelimit.go @@ -0,0 +1 @@ +package auth diff --git a/internal/config/config.go b/internal/config/config.go index e69de29..ebf81c1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -0,0 +1,36 @@ +package config + +import ( + "os" + "strings" +) + +// Config holds the control plane configuration. +type Config struct { + DatabaseURL string + ListenAddr string + HostAgentAddr string +} + +// Load reads configuration from environment variables. +func Load() Config { + cfg := Config{ + DatabaseURL: envOrDefault("DATABASE_URL", "postgres://wrenn:wrenn@localhost:5432/wrenn?sslmode=disable"), + ListenAddr: envOrDefault("CP_LISTEN_ADDR", ":8080"), + HostAgentAddr: envOrDefault("CP_HOST_AGENT_ADDR", "http://localhost:50051"), + } + + // Ensure the host agent address has a scheme. + if !strings.HasPrefix(cfg.HostAgentAddr, "http://") && !strings.HasPrefix(cfg.HostAgentAddr, "https://") { + cfg.HostAgentAddr = "http://" + cfg.HostAgentAddr + } + + return cfg +} + +func envOrDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} diff --git a/internal/db/db.go b/internal/db/db.go new file mode 100644 index 0000000..9d485b5 --- /dev/null +++ b/internal/db/db.go @@ -0,0 +1,32 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/db/models.go b/internal/db/models.go new file mode 100644 index 0000000..c110408 --- /dev/null +++ b/internal/db/models.go @@ -0,0 +1,26 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package db + +import ( + "github.com/jackc/pgx/v5/pgtype" +) + +type Sandbox struct { + ID string `json:"id"` + OwnerID string `json:"owner_id"` + HostID string `json:"host_id"` + Template string `json:"template"` + Status string `json:"status"` + Vcpus int32 `json:"vcpus"` + MemoryMb int32 `json:"memory_mb"` + TimeoutSec int32 `json:"timeout_sec"` + GuestIp string `json:"guest_ip"` + HostIp string `json:"host_ip"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + StartedAt pgtype.Timestamptz `json:"started_at"` + LastActiveAt pgtype.Timestamptz `json:"last_active_at"` + LastUpdated pgtype.Timestamptz `json:"last_updated"` +} diff --git a/internal/db/sandboxes.sql.go b/internal/db/sandboxes.sql.go new file mode 100644 index 0000000..e11f8f2 --- /dev/null +++ b/internal/db/sandboxes.sql.go @@ -0,0 +1,286 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: sandboxes.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const bulkUpdateStatusByIDs = `-- name: BulkUpdateStatusByIDs :exec +UPDATE sandboxes +SET status = $2, + last_updated = NOW() +WHERE id = ANY($1::text[]) +` + +type BulkUpdateStatusByIDsParams struct { + Column1 []string `json:"column_1"` + Status string `json:"status"` +} + +func (q *Queries) BulkUpdateStatusByIDs(ctx context.Context, arg BulkUpdateStatusByIDsParams) error { + _, err := q.db.Exec(ctx, bulkUpdateStatusByIDs, arg.Column1, arg.Status) + return err +} + +const getSandbox = `-- name: GetSandbox :one +SELECT id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated FROM sandboxes WHERE id = $1 +` + +func (q *Queries) GetSandbox(ctx context.Context, id string) (Sandbox, error) { + row := q.db.QueryRow(ctx, getSandbox, id) + var i Sandbox + err := row.Scan( + &i.ID, + &i.OwnerID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + ) + return i, err +} + +const insertSandbox = `-- name: InsertSandbox :one +INSERT INTO sandboxes (id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +RETURNING id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated +` + +type InsertSandboxParams struct { + ID string `json:"id"` + OwnerID string `json:"owner_id"` + HostID string `json:"host_id"` + Template string `json:"template"` + Status string `json:"status"` + Vcpus int32 `json:"vcpus"` + MemoryMb int32 `json:"memory_mb"` + TimeoutSec int32 `json:"timeout_sec"` +} + +func (q *Queries) InsertSandbox(ctx context.Context, arg InsertSandboxParams) (Sandbox, error) { + row := q.db.QueryRow(ctx, insertSandbox, + arg.ID, + arg.OwnerID, + arg.HostID, + arg.Template, + arg.Status, + arg.Vcpus, + arg.MemoryMb, + arg.TimeoutSec, + ) + var i Sandbox + err := row.Scan( + &i.ID, + &i.OwnerID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + ) + return i, err +} + +const listSandboxes = `-- name: ListSandboxes :many +SELECT id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated FROM sandboxes ORDER BY created_at DESC +` + +func (q *Queries) ListSandboxes(ctx context.Context) ([]Sandbox, error) { + rows, err := q.db.Query(ctx, listSandboxes) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Sandbox + for rows.Next() { + var i Sandbox + if err := rows.Scan( + &i.ID, + &i.OwnerID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listSandboxesByHostAndStatus = `-- name: ListSandboxesByHostAndStatus :many +SELECT id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated FROM sandboxes +WHERE host_id = $1 AND status = ANY($2::text[]) +ORDER BY created_at DESC +` + +type ListSandboxesByHostAndStatusParams struct { + HostID string `json:"host_id"` + Column2 []string `json:"column_2"` +} + +func (q *Queries) ListSandboxesByHostAndStatus(ctx context.Context, arg ListSandboxesByHostAndStatusParams) ([]Sandbox, error) { + rows, err := q.db.Query(ctx, listSandboxesByHostAndStatus, arg.HostID, arg.Column2) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Sandbox + for rows.Next() { + var i Sandbox + if err := rows.Scan( + &i.ID, + &i.OwnerID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateLastActive = `-- name: UpdateLastActive :exec +UPDATE sandboxes +SET last_active_at = $2, + last_updated = NOW() +WHERE id = $1 +` + +type UpdateLastActiveParams struct { + ID string `json:"id"` + LastActiveAt pgtype.Timestamptz `json:"last_active_at"` +} + +func (q *Queries) UpdateLastActive(ctx context.Context, arg UpdateLastActiveParams) error { + _, err := q.db.Exec(ctx, updateLastActive, arg.ID, arg.LastActiveAt) + return err +} + +const updateSandboxRunning = `-- name: UpdateSandboxRunning :one +UPDATE sandboxes +SET status = 'running', + host_ip = $2, + guest_ip = $3, + started_at = $4, + last_active_at = $4, + last_updated = NOW() +WHERE id = $1 +RETURNING id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated +` + +type UpdateSandboxRunningParams struct { + ID string `json:"id"` + HostIp string `json:"host_ip"` + GuestIp string `json:"guest_ip"` + StartedAt pgtype.Timestamptz `json:"started_at"` +} + +func (q *Queries) UpdateSandboxRunning(ctx context.Context, arg UpdateSandboxRunningParams) (Sandbox, error) { + row := q.db.QueryRow(ctx, updateSandboxRunning, + arg.ID, + arg.HostIp, + arg.GuestIp, + arg.StartedAt, + ) + var i Sandbox + err := row.Scan( + &i.ID, + &i.OwnerID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + ) + return i, err +} + +const updateSandboxStatus = `-- name: UpdateSandboxStatus :one +UPDATE sandboxes +SET status = $2, + last_updated = NOW() +WHERE id = $1 +RETURNING id, owner_id, host_id, template, status, vcpus, memory_mb, timeout_sec, guest_ip, host_ip, created_at, started_at, last_active_at, last_updated +` + +type UpdateSandboxStatusParams struct { + ID string `json:"id"` + Status string `json:"status"` +} + +func (q *Queries) UpdateSandboxStatus(ctx context.Context, arg UpdateSandboxStatusParams) (Sandbox, error) { + row := q.db.QueryRow(ctx, updateSandboxStatus, arg.ID, arg.Status) + var i Sandbox + err := row.Scan( + &i.ID, + &i.OwnerID, + &i.HostID, + &i.Template, + &i.Status, + &i.Vcpus, + &i.MemoryMb, + &i.TimeoutSec, + &i.GuestIp, + &i.HostIp, + &i.CreatedAt, + &i.StartedAt, + &i.LastActiveAt, + &i.LastUpdated, + ) + return i, err +} diff --git a/internal/envdclient/client.go b/internal/envdclient/client.go index 69686d6..c231c05 100644 --- a/internal/envdclient/client.go +++ b/internal/envdclient/client.go @@ -1,11 +1,14 @@ package envdclient import ( + "bytes" "context" "fmt" "io" "log/slog" + "mime/multipart" "net/http" + "net/url" "connectrpc.com/connect" @@ -107,19 +110,80 @@ func (c *Client) Exec(ctx context.Context, cmd string, args ...string) (*ExecRes return result, nil } -// WriteFile writes content to a file inside the sandbox via envd's filesystem service. +// WriteFile writes content to a file inside the sandbox via envd's REST endpoint. +// envd expects POST /files?path=...&username=root with multipart/form-data (field name "file"). func (c *Client) WriteFile(ctx context.Context, path string, content []byte) error { - // envd uses HTTP upload for files, not Connect RPC. - // POST /files with multipart form data. - // For now, use the filesystem MakeDir for directories. - // TODO: Implement file upload via envd's REST endpoint. - return fmt.Errorf("WriteFile not yet implemented") + var body bytes.Buffer + writer := multipart.NewWriter(&body) + + part, err := writer.CreateFormFile("file", "upload") + if err != nil { + return fmt.Errorf("create multipart: %w", err) + } + if _, err := part.Write(content); err != nil { + return fmt.Errorf("write multipart: %w", err) + } + writer.Close() + + u := fmt.Sprintf("%s/files?%s", c.base, url.Values{ + "path": {path}, + "username": {"root"}, + }.Encode()) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, &body) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", writer.FormDataContentType()) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("write file: %w", err) + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("write file %s: status %d: %s", path, resp.StatusCode, string(respBody)) + } + + slog.Debug("envd write file", "path", path, "status", resp.StatusCode, "response", string(respBody)) + return nil } -// ReadFile reads a file from inside the sandbox. +// ReadFile reads a file from inside the sandbox via envd's REST endpoint. +// envd expects GET /files?path=...&username=root. func (c *Client) ReadFile(ctx context.Context, path string) ([]byte, error) { - // TODO: Implement file download via envd's REST endpoint. - return nil, fmt.Errorf("ReadFile not yet implemented") + u := fmt.Sprintf("%s/files?%s", c.base, url.Values{ + "path": {path}, + "username": {"root"}, + }.Encode()) + + slog.Debug("envd read file", "url", u, "path", path) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("read file: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("read file %s: status %d: %s", path, resp.StatusCode, string(respBody)) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read file body: %w", err) + } + + return data, nil } // ListDir lists directory contents inside the sandbox. diff --git a/internal/hostagent/server.go b/internal/hostagent/server.go index 717b509..19b3411 100644 --- a/internal/hostagent/server.go +++ b/internal/hostagent/server.go @@ -30,7 +30,7 @@ func (s *Server) CreateSandbox( ) (*connect.Response[pb.CreateSandboxResponse], error) { msg := req.Msg - sb, err := s.mgr.Create(ctx, msg.Template, int(msg.Vcpus), int(msg.MemoryMb), int(msg.TimeoutSec)) + sb, err := s.mgr.Create(ctx, msg.SandboxId, msg.Template, int(msg.Vcpus), int(msg.MemoryMb), int(msg.TimeoutSec)) if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("create sandbox: %w", err)) } @@ -98,6 +98,43 @@ func (s *Server) Exec( }), nil } +func (s *Server) WriteFile( + ctx context.Context, + req *connect.Request[pb.WriteFileRequest], +) (*connect.Response[pb.WriteFileResponse], error) { + msg := req.Msg + + client, err := s.mgr.GetClient(msg.SandboxId) + if err != nil { + return nil, connect.NewError(connect.CodeNotFound, err) + } + + if err := client.WriteFile(ctx, msg.Path, msg.Content); err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("write file: %w", err)) + } + + return connect.NewResponse(&pb.WriteFileResponse{}), nil +} + +func (s *Server) ReadFile( + ctx context.Context, + req *connect.Request[pb.ReadFileRequest], +) (*connect.Response[pb.ReadFileResponse], error) { + msg := req.Msg + + client, err := s.mgr.GetClient(msg.SandboxId) + if err != nil { + return nil, connect.NewError(connect.CodeNotFound, err) + } + + content, err := client.ReadFile(ctx, msg.Path) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("read file: %w", err)) + } + + return connect.NewResponse(&pb.ReadFileResponse{Content: content}), nil +} + func (s *Server) ListSandboxes( ctx context.Context, req *connect.Request[pb.ListSandboxesRequest], diff --git a/internal/lifecycle/manager.go b/internal/lifecycle/manager.go index e69de29..dd39965 100644 --- a/internal/lifecycle/manager.go +++ b/internal/lifecycle/manager.go @@ -0,0 +1 @@ +package lifecycle diff --git a/internal/metrics/collector.go b/internal/metrics/collector.go index e69de29..1abe097 100644 --- a/internal/metrics/collector.go +++ b/internal/metrics/collector.go @@ -0,0 +1 @@ +package metrics diff --git a/internal/metrics/exporter.go b/internal/metrics/exporter.go index e69de29..1abe097 100644 --- a/internal/metrics/exporter.go +++ b/internal/metrics/exporter.go @@ -0,0 +1 @@ +package metrics diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go index b9ca420..935e6e3 100644 --- a/internal/sandbox/manager.go +++ b/internal/sandbox/manager.go @@ -57,8 +57,11 @@ func New(cfg Config) *Manager { } // Create boots a new sandbox: clone rootfs, set up network, start VM, wait for envd. -func (m *Manager) Create(ctx context.Context, template string, vcpus, memoryMB, timeoutSec int) (*models.Sandbox, error) { - sandboxID := id.NewSandboxID() +// If sandboxID is empty, a new ID is generated. +func (m *Manager) Create(ctx context.Context, sandboxID, template string, vcpus, memoryMB, timeoutSec int) (*models.Sandbox, error) { + if sandboxID == "" { + sandboxID = id.NewSandboxID() + } if vcpus <= 0 { vcpus = 1 @@ -280,6 +283,18 @@ func (m *Manager) Get(sandboxID string) (*models.Sandbox, error) { return &sb.Sandbox, nil } +// GetClient returns the envd client for a sandbox. +func (m *Manager) GetClient(sandboxID string) (*envdclient.Client, error) { + sb, err := m.get(sandboxID) + if err != nil { + return nil, err + } + if sb.Status != models.StatusRunning { + return nil, fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status) + } + return sb.client, nil +} + func (m *Manager) get(sandboxID string) (*sandboxState, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/internal/scheduler/least_loaded.go b/internal/scheduler/least_loaded.go index e69de29..6990da0 100644 --- a/internal/scheduler/least_loaded.go +++ b/internal/scheduler/least_loaded.go @@ -0,0 +1 @@ +package scheduler diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index e69de29..6990da0 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -0,0 +1 @@ +package scheduler diff --git a/internal/scheduler/single_host.go b/internal/scheduler/single_host.go index e69de29..6990da0 100644 --- a/internal/scheduler/single_host.go +++ b/internal/scheduler/single_host.go @@ -0,0 +1 @@ +package scheduler diff --git a/internal/snapshot/local.go b/internal/snapshot/local.go index e69de29..8df14bc 100644 --- a/internal/snapshot/local.go +++ b/internal/snapshot/local.go @@ -0,0 +1 @@ +package snapshot diff --git a/internal/snapshot/manager.go b/internal/snapshot/manager.go index e69de29..8df14bc 100644 --- a/internal/snapshot/manager.go +++ b/internal/snapshot/manager.go @@ -0,0 +1 @@ +package snapshot diff --git a/internal/snapshot/remote.go b/internal/snapshot/remote.go index e69de29..8df14bc 100644 --- a/internal/snapshot/remote.go +++ b/internal/snapshot/remote.go @@ -0,0 +1 @@ +package snapshot diff --git a/proto/envd/gen/filesystem.pb.go b/proto/envd/gen/filesystem.pb.go index 1ac207a..ad14655 100644 --- a/proto/envd/gen/filesystem.pb.go +++ b/proto/envd/gen/filesystem.pb.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 diff --git a/proto/envd/gen/genconnect/filesystem.connect.go b/proto/envd/gen/genconnect/filesystem.connect.go index 73f1fb9..adaf84f 100644 --- a/proto/envd/gen/genconnect/filesystem.connect.go +++ b/proto/envd/gen/genconnect/filesystem.connect.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-connect-go. DO NOT EDIT. // // Source: filesystem.proto diff --git a/proto/envd/gen/genconnect/process.connect.go b/proto/envd/gen/genconnect/process.connect.go index b724d24..d3bd13a 100644 --- a/proto/envd/gen/genconnect/process.connect.go +++ b/proto/envd/gen/genconnect/process.connect.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-connect-go. DO NOT EDIT. // // Source: process.proto diff --git a/proto/envd/gen/process.pb.go b/proto/envd/gen/process.pb.go index 475ba18..9b1533f 100644 --- a/proto/envd/gen/process.pb.go +++ b/proto/envd/gen/process.pb.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 + // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 diff --git a/proto/hostagent/gen/hostagent.pb.go b/proto/hostagent/gen/hostagent.pb.go index 0907329..175421e 100644 --- a/proto/hostagent/gen/hostagent.pb.go +++ b/proto/hostagent/gen/hostagent.pb.go @@ -23,6 +23,8 @@ const ( type CreateSandboxRequest struct { state protoimpl.MessageState `protogen:"open.v1"` + // Sandbox ID assigned by the control plane. If empty, the host agent generates one. + SandboxId string `protobuf:"bytes,5,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` // Template name (e.g., "minimal", "python311"). Determines base rootfs. Template string `protobuf:"bytes,1,opt,name=template,proto3" json:"template,omitempty"` // Number of virtual CPUs (default: 1). @@ -66,6 +68,13 @@ func (*CreateSandboxRequest) Descriptor() ([]byte, []int) { return file_hostagent_proto_rawDescGZIP(), []int{0} } +func (x *CreateSandboxRequest) GetSandboxId() string { + if x != nil { + return x.SandboxId + } + return "" +} + func (x *CreateSandboxRequest) GetTemplate() string { if x != nil { return x.Template @@ -711,12 +720,206 @@ func (x *SandboxInfo) GetTimeoutSec() int32 { return 0 } +type WriteFileRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + SandboxId string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` + Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` + Content []byte `protobuf:"bytes,3,opt,name=content,proto3" json:"content,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteFileRequest) Reset() { + *x = WriteFileRequest{} + mi := &file_hostagent_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteFileRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteFileRequest) ProtoMessage() {} + +func (x *WriteFileRequest) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteFileRequest.ProtoReflect.Descriptor instead. +func (*WriteFileRequest) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{13} +} + +func (x *WriteFileRequest) GetSandboxId() string { + if x != nil { + return x.SandboxId + } + return "" +} + +func (x *WriteFileRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *WriteFileRequest) GetContent() []byte { + if x != nil { + return x.Content + } + return nil +} + +type WriteFileResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteFileResponse) Reset() { + *x = WriteFileResponse{} + mi := &file_hostagent_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteFileResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteFileResponse) ProtoMessage() {} + +func (x *WriteFileResponse) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteFileResponse.ProtoReflect.Descriptor instead. +func (*WriteFileResponse) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{14} +} + +type ReadFileRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + SandboxId string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` + Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReadFileRequest) Reset() { + *x = ReadFileRequest{} + mi := &file_hostagent_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReadFileRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadFileRequest) ProtoMessage() {} + +func (x *ReadFileRequest) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[15] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadFileRequest.ProtoReflect.Descriptor instead. +func (*ReadFileRequest) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{15} +} + +func (x *ReadFileRequest) GetSandboxId() string { + if x != nil { + return x.SandboxId + } + return "" +} + +func (x *ReadFileRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +type ReadFileResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Content []byte `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReadFileResponse) Reset() { + *x = ReadFileResponse{} + mi := &file_hostagent_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReadFileResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadFileResponse) ProtoMessage() {} + +func (x *ReadFileResponse) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[16] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadFileResponse.ProtoReflect.Descriptor instead. +func (*ReadFileResponse) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{16} +} + +func (x *ReadFileResponse) GetContent() []byte { + if x != nil { + return x.Content + } + return nil +} + var File_hostagent_proto protoreflect.FileDescriptor const file_hostagent_proto_rawDesc = "" + "\n" + - "\x0fhostagent.proto\x12\fhostagent.v1\"\x86\x01\n" + - "\x14CreateSandboxRequest\x12\x1a\n" + + "\x0fhostagent.proto\x12\fhostagent.v1\"\xa5\x01\n" + + "\x14CreateSandboxRequest\x12\x1d\n" + + "\n" + + "sandbox_id\x18\x05 \x01(\tR\tsandboxId\x12\x1a\n" + "\btemplate\x18\x01 \x01(\tR\btemplate\x12\x14\n" + "\x05vcpus\x18\x02 \x01(\x05R\x05vcpus\x12\x1b\n" + "\tmemory_mb\x18\x03 \x01(\x05R\bmemoryMb\x12\x1f\n" + @@ -764,14 +967,28 @@ const file_hostagent_proto_rawDesc = "" + "\x0fcreated_at_unix\x18\a \x01(\x03R\rcreatedAtUnix\x12-\n" + "\x13last_active_at_unix\x18\b \x01(\x03R\x10lastActiveAtUnix\x12\x1f\n" + "\vtimeout_sec\x18\t \x01(\x05R\n" + - "timeoutSec2\x93\x04\n" + + "timeoutSec\"_\n" + + "\x10WriteFileRequest\x12\x1d\n" + + "\n" + + "sandbox_id\x18\x01 \x01(\tR\tsandboxId\x12\x12\n" + + "\x04path\x18\x02 \x01(\tR\x04path\x12\x18\n" + + "\acontent\x18\x03 \x01(\fR\acontent\"\x13\n" + + "\x11WriteFileResponse\"D\n" + + "\x0fReadFileRequest\x12\x1d\n" + + "\n" + + "sandbox_id\x18\x01 \x01(\tR\tsandboxId\x12\x12\n" + + "\x04path\x18\x02 \x01(\tR\x04path\",\n" + + "\x10ReadFileResponse\x12\x18\n" + + "\acontent\x18\x01 \x01(\fR\acontent2\xac\x05\n" + "\x10HostAgentService\x12X\n" + "\rCreateSandbox\x12\".hostagent.v1.CreateSandboxRequest\x1a#.hostagent.v1.CreateSandboxResponse\x12[\n" + "\x0eDestroySandbox\x12#.hostagent.v1.DestroySandboxRequest\x1a$.hostagent.v1.DestroySandboxResponse\x12U\n" + "\fPauseSandbox\x12!.hostagent.v1.PauseSandboxRequest\x1a\".hostagent.v1.PauseSandboxResponse\x12X\n" + "\rResumeSandbox\x12\".hostagent.v1.ResumeSandboxRequest\x1a#.hostagent.v1.ResumeSandboxResponse\x12=\n" + "\x04Exec\x12\x19.hostagent.v1.ExecRequest\x1a\x1a.hostagent.v1.ExecResponse\x12X\n" + - "\rListSandboxes\x12\".hostagent.v1.ListSandboxesRequest\x1a#.hostagent.v1.ListSandboxesResponseB\xb0\x01\n" + + "\rListSandboxes\x12\".hostagent.v1.ListSandboxesRequest\x1a#.hostagent.v1.ListSandboxesResponse\x12L\n" + + "\tWriteFile\x12\x1e.hostagent.v1.WriteFileRequest\x1a\x1f.hostagent.v1.WriteFileResponse\x12I\n" + + "\bReadFile\x12\x1d.hostagent.v1.ReadFileRequest\x1a\x1e.hostagent.v1.ReadFileResponseB\xb0\x01\n" + "\x10com.hostagent.v1B\x0eHostagentProtoP\x01Z;git.omukk.dev/wrenn/sandbox/proto/hostagent/gen;hostagentv1\xa2\x02\x03HXX\xaa\x02\fHostagent.V1\xca\x02\fHostagent\\V1\xe2\x02\x18Hostagent\\V1\\GPBMetadata\xea\x02\rHostagent::V1b\x06proto3" var ( @@ -786,7 +1003,7 @@ func file_hostagent_proto_rawDescGZIP() []byte { return file_hostagent_proto_rawDescData } -var file_hostagent_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_hostagent_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_hostagent_proto_goTypes = []any{ (*CreateSandboxRequest)(nil), // 0: hostagent.v1.CreateSandboxRequest (*CreateSandboxResponse)(nil), // 1: hostagent.v1.CreateSandboxResponse @@ -801,6 +1018,10 @@ var file_hostagent_proto_goTypes = []any{ (*ListSandboxesRequest)(nil), // 10: hostagent.v1.ListSandboxesRequest (*ListSandboxesResponse)(nil), // 11: hostagent.v1.ListSandboxesResponse (*SandboxInfo)(nil), // 12: hostagent.v1.SandboxInfo + (*WriteFileRequest)(nil), // 13: hostagent.v1.WriteFileRequest + (*WriteFileResponse)(nil), // 14: hostagent.v1.WriteFileResponse + (*ReadFileRequest)(nil), // 15: hostagent.v1.ReadFileRequest + (*ReadFileResponse)(nil), // 16: hostagent.v1.ReadFileResponse } var file_hostagent_proto_depIdxs = []int32{ 12, // 0: hostagent.v1.ListSandboxesResponse.sandboxes:type_name -> hostagent.v1.SandboxInfo @@ -810,14 +1031,18 @@ var file_hostagent_proto_depIdxs = []int32{ 6, // 4: hostagent.v1.HostAgentService.ResumeSandbox:input_type -> hostagent.v1.ResumeSandboxRequest 8, // 5: hostagent.v1.HostAgentService.Exec:input_type -> hostagent.v1.ExecRequest 10, // 6: hostagent.v1.HostAgentService.ListSandboxes:input_type -> hostagent.v1.ListSandboxesRequest - 1, // 7: hostagent.v1.HostAgentService.CreateSandbox:output_type -> hostagent.v1.CreateSandboxResponse - 3, // 8: hostagent.v1.HostAgentService.DestroySandbox:output_type -> hostagent.v1.DestroySandboxResponse - 5, // 9: hostagent.v1.HostAgentService.PauseSandbox:output_type -> hostagent.v1.PauseSandboxResponse - 7, // 10: hostagent.v1.HostAgentService.ResumeSandbox:output_type -> hostagent.v1.ResumeSandboxResponse - 9, // 11: hostagent.v1.HostAgentService.Exec:output_type -> hostagent.v1.ExecResponse - 11, // 12: hostagent.v1.HostAgentService.ListSandboxes:output_type -> hostagent.v1.ListSandboxesResponse - 7, // [7:13] is the sub-list for method output_type - 1, // [1:7] is the sub-list for method input_type + 13, // 7: hostagent.v1.HostAgentService.WriteFile:input_type -> hostagent.v1.WriteFileRequest + 15, // 8: hostagent.v1.HostAgentService.ReadFile:input_type -> hostagent.v1.ReadFileRequest + 1, // 9: hostagent.v1.HostAgentService.CreateSandbox:output_type -> hostagent.v1.CreateSandboxResponse + 3, // 10: hostagent.v1.HostAgentService.DestroySandbox:output_type -> hostagent.v1.DestroySandboxResponse + 5, // 11: hostagent.v1.HostAgentService.PauseSandbox:output_type -> hostagent.v1.PauseSandboxResponse + 7, // 12: hostagent.v1.HostAgentService.ResumeSandbox:output_type -> hostagent.v1.ResumeSandboxResponse + 9, // 13: hostagent.v1.HostAgentService.Exec:output_type -> hostagent.v1.ExecResponse + 11, // 14: hostagent.v1.HostAgentService.ListSandboxes:output_type -> hostagent.v1.ListSandboxesResponse + 14, // 15: hostagent.v1.HostAgentService.WriteFile:output_type -> hostagent.v1.WriteFileResponse + 16, // 16: hostagent.v1.HostAgentService.ReadFile:output_type -> hostagent.v1.ReadFileResponse + 9, // [9:17] is the sub-list for method output_type + 1, // [1:9] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name @@ -834,7 +1059,7 @@ func file_hostagent_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_hostagent_proto_rawDesc), len(file_hostagent_proto_rawDesc)), NumEnums: 0, - NumMessages: 13, + NumMessages: 17, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go b/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go index 5a14275..1b182d2 100644 --- a/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go +++ b/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go @@ -50,6 +50,12 @@ const ( // HostAgentServiceListSandboxesProcedure is the fully-qualified name of the HostAgentService's // ListSandboxes RPC. HostAgentServiceListSandboxesProcedure = "/hostagent.v1.HostAgentService/ListSandboxes" + // HostAgentServiceWriteFileProcedure is the fully-qualified name of the HostAgentService's + // WriteFile RPC. + HostAgentServiceWriteFileProcedure = "/hostagent.v1.HostAgentService/WriteFile" + // HostAgentServiceReadFileProcedure is the fully-qualified name of the HostAgentService's ReadFile + // RPC. + HostAgentServiceReadFileProcedure = "/hostagent.v1.HostAgentService/ReadFile" ) // HostAgentServiceClient is a client for the hostagent.v1.HostAgentService service. @@ -66,6 +72,10 @@ type HostAgentServiceClient interface { Exec(context.Context, *connect.Request[gen.ExecRequest]) (*connect.Response[gen.ExecResponse], error) // ListSandboxes returns all sandboxes managed by this host agent. ListSandboxes(context.Context, *connect.Request[gen.ListSandboxesRequest]) (*connect.Response[gen.ListSandboxesResponse], error) + // WriteFile writes content to a file inside a sandbox. + WriteFile(context.Context, *connect.Request[gen.WriteFileRequest]) (*connect.Response[gen.WriteFileResponse], error) + // ReadFile reads a file from inside a sandbox. + ReadFile(context.Context, *connect.Request[gen.ReadFileRequest]) (*connect.Response[gen.ReadFileResponse], error) } // NewHostAgentServiceClient constructs a client for the hostagent.v1.HostAgentService service. By @@ -115,6 +125,18 @@ func NewHostAgentServiceClient(httpClient connect.HTTPClient, baseURL string, op connect.WithSchema(hostAgentServiceMethods.ByName("ListSandboxes")), connect.WithClientOptions(opts...), ), + writeFile: connect.NewClient[gen.WriteFileRequest, gen.WriteFileResponse]( + httpClient, + baseURL+HostAgentServiceWriteFileProcedure, + connect.WithSchema(hostAgentServiceMethods.ByName("WriteFile")), + connect.WithClientOptions(opts...), + ), + readFile: connect.NewClient[gen.ReadFileRequest, gen.ReadFileResponse]( + httpClient, + baseURL+HostAgentServiceReadFileProcedure, + connect.WithSchema(hostAgentServiceMethods.ByName("ReadFile")), + connect.WithClientOptions(opts...), + ), } } @@ -126,6 +148,8 @@ type hostAgentServiceClient struct { resumeSandbox *connect.Client[gen.ResumeSandboxRequest, gen.ResumeSandboxResponse] exec *connect.Client[gen.ExecRequest, gen.ExecResponse] listSandboxes *connect.Client[gen.ListSandboxesRequest, gen.ListSandboxesResponse] + writeFile *connect.Client[gen.WriteFileRequest, gen.WriteFileResponse] + readFile *connect.Client[gen.ReadFileRequest, gen.ReadFileResponse] } // CreateSandbox calls hostagent.v1.HostAgentService.CreateSandbox. @@ -158,6 +182,16 @@ func (c *hostAgentServiceClient) ListSandboxes(ctx context.Context, req *connect return c.listSandboxes.CallUnary(ctx, req) } +// WriteFile calls hostagent.v1.HostAgentService.WriteFile. +func (c *hostAgentServiceClient) WriteFile(ctx context.Context, req *connect.Request[gen.WriteFileRequest]) (*connect.Response[gen.WriteFileResponse], error) { + return c.writeFile.CallUnary(ctx, req) +} + +// ReadFile calls hostagent.v1.HostAgentService.ReadFile. +func (c *hostAgentServiceClient) ReadFile(ctx context.Context, req *connect.Request[gen.ReadFileRequest]) (*connect.Response[gen.ReadFileResponse], error) { + return c.readFile.CallUnary(ctx, req) +} + // HostAgentServiceHandler is an implementation of the hostagent.v1.HostAgentService service. type HostAgentServiceHandler interface { // CreateSandbox boots a new microVM with the given configuration. @@ -172,6 +206,10 @@ type HostAgentServiceHandler interface { Exec(context.Context, *connect.Request[gen.ExecRequest]) (*connect.Response[gen.ExecResponse], error) // ListSandboxes returns all sandboxes managed by this host agent. ListSandboxes(context.Context, *connect.Request[gen.ListSandboxesRequest]) (*connect.Response[gen.ListSandboxesResponse], error) + // WriteFile writes content to a file inside a sandbox. + WriteFile(context.Context, *connect.Request[gen.WriteFileRequest]) (*connect.Response[gen.WriteFileResponse], error) + // ReadFile reads a file from inside a sandbox. + ReadFile(context.Context, *connect.Request[gen.ReadFileRequest]) (*connect.Response[gen.ReadFileResponse], error) } // NewHostAgentServiceHandler builds an HTTP handler from the service implementation. It returns the @@ -217,6 +255,18 @@ func NewHostAgentServiceHandler(svc HostAgentServiceHandler, opts ...connect.Han connect.WithSchema(hostAgentServiceMethods.ByName("ListSandboxes")), connect.WithHandlerOptions(opts...), ) + hostAgentServiceWriteFileHandler := connect.NewUnaryHandler( + HostAgentServiceWriteFileProcedure, + svc.WriteFile, + connect.WithSchema(hostAgentServiceMethods.ByName("WriteFile")), + connect.WithHandlerOptions(opts...), + ) + hostAgentServiceReadFileHandler := connect.NewUnaryHandler( + HostAgentServiceReadFileProcedure, + svc.ReadFile, + connect.WithSchema(hostAgentServiceMethods.ByName("ReadFile")), + connect.WithHandlerOptions(opts...), + ) return "/hostagent.v1.HostAgentService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case HostAgentServiceCreateSandboxProcedure: @@ -231,6 +281,10 @@ func NewHostAgentServiceHandler(svc HostAgentServiceHandler, opts ...connect.Han hostAgentServiceExecHandler.ServeHTTP(w, r) case HostAgentServiceListSandboxesProcedure: hostAgentServiceListSandboxesHandler.ServeHTTP(w, r) + case HostAgentServiceWriteFileProcedure: + hostAgentServiceWriteFileHandler.ServeHTTP(w, r) + case HostAgentServiceReadFileProcedure: + hostAgentServiceReadFileHandler.ServeHTTP(w, r) default: http.NotFound(w, r) } @@ -263,3 +317,11 @@ func (UnimplementedHostAgentServiceHandler) Exec(context.Context, *connect.Reque func (UnimplementedHostAgentServiceHandler) ListSandboxes(context.Context, *connect.Request[gen.ListSandboxesRequest]) (*connect.Response[gen.ListSandboxesResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("hostagent.v1.HostAgentService.ListSandboxes is not implemented")) } + +func (UnimplementedHostAgentServiceHandler) WriteFile(context.Context, *connect.Request[gen.WriteFileRequest]) (*connect.Response[gen.WriteFileResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("hostagent.v1.HostAgentService.WriteFile is not implemented")) +} + +func (UnimplementedHostAgentServiceHandler) ReadFile(context.Context, *connect.Request[gen.ReadFileRequest]) (*connect.Response[gen.ReadFileResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("hostagent.v1.HostAgentService.ReadFile is not implemented")) +} diff --git a/proto/hostagent/hostagent.proto b/proto/hostagent/hostagent.proto index f23c8a0..3509015 100644 --- a/proto/hostagent/hostagent.proto +++ b/proto/hostagent/hostagent.proto @@ -22,9 +22,18 @@ service HostAgentService { // ListSandboxes returns all sandboxes managed by this host agent. rpc ListSandboxes(ListSandboxesRequest) returns (ListSandboxesResponse); + + // WriteFile writes content to a file inside a sandbox. + rpc WriteFile(WriteFileRequest) returns (WriteFileResponse); + + // ReadFile reads a file from inside a sandbox. + rpc ReadFile(ReadFileRequest) returns (ReadFileResponse); } message CreateSandboxRequest { + // Sandbox ID assigned by the control plane. If empty, the host agent generates one. + string sandbox_id = 5; + // Template name (e.g., "minimal", "python311"). Determines base rootfs. string template = 1; @@ -94,3 +103,20 @@ message SandboxInfo { int64 last_active_at_unix = 8; int32 timeout_sec = 9; } + +message WriteFileRequest { + string sandbox_id = 1; + string path = 2; + bytes content = 3; +} + +message WriteFileResponse {} + +message ReadFileRequest { + string sandbox_id = 1; + string path = 2; +} + +message ReadFileResponse { + bytes content = 1; +} diff --git a/scripts/seed.go b/scripts/seed.go deleted file mode 100644 index e69de29..0000000 diff --git a/sqlc.yaml b/sqlc.yaml new file mode 100644 index 0000000..eb9298f --- /dev/null +++ b/sqlc.yaml @@ -0,0 +1,11 @@ +version: "2" +sql: + - engine: "postgresql" + queries: "db/queries" + schema: "db/migrations" + gen: + go: + package: "db" + out: "internal/db" + sql_package: "pgx/v5" + emit_json_tags: true