1
0
forked from wrenn/wrenn

Prototype with single host server and no admin panel (#2)

Reviewed-on: wrenn/sandbox#2
Co-authored-by: pptx704 <rafeed@omukk.dev>
Co-committed-by: pptx704 <rafeed@omukk.dev>
This commit is contained in:
2026-03-22 21:01:23 +00:00
committed by Rafeed M. Bhuiyan
parent bd78cc068c
commit 32e5a5a715
293 changed files with 46885 additions and 1033 deletions

View File

@ -0,0 +1,205 @@
package hostagent
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"runtime"
"strings"
"time"
"golang.org/x/sys/unix"
)
// RegistrationConfig holds the configuration for host registration.
//
// Register first tries to load a previously saved host token from TokenFile
// and only falls back to RegistrationToken when no non-empty saved token can
// be read.
type RegistrationConfig struct {
CPURL string // Control plane base URL (e.g., http://localhost:8000); trailing slashes are trimmed before use
RegistrationToken string // One-time registration token from the control plane; required only when TokenFile has no token
TokenFile string // Path where the host JWT is read from and persisted to after registration
Address string // Externally-reachable address (ip:port) for this host, reported to the control plane
}
// registerRequest is the JSON body that Register POSTs to the control
// plane's /v1/hosts/register endpoint.
type registerRequest struct {
Token string `json:"token"` // registration token taken from RegistrationConfig
Arch string `json:"arch"` // runtime.GOARCH of the agent binary
CPUCores int32 `json:"cpu_cores"` // runtime.NumCPU()
MemoryMB int32 `json:"memory_mb"` // result of getMemoryMB (0 when it could not be determined)
DiskGB int32 `json:"disk_gb"` // result of getDiskGB (0 when it could not be determined)
Address string `json:"address"` // address taken from RegistrationConfig
}
// registerResponse is the JSON body decoded from a successful registration
// reply.
type registerResponse struct {
Host json.RawMessage `json:"host"` // kept as raw JSON; Register does not inspect it
Token string `json:"token"` // host JWT; Register rejects the response when this is empty
}
// errorResponse is the JSON error envelope Register attempts to decode when
// the control plane answers with a status other than 201 Created.
type errorResponse struct {
Error struct {
Code string `json:"code"` // decoded but not used by Register
Message string `json:"message"` // text included in the error returned by Register
} `json:"error"`
}
// Register obtains the host JWT for this agent and returns it.
//
// If cfg.TokenFile already holds a non-empty token, that token is returned
// without contacting the control plane. Otherwise cfg.RegistrationToken is
// exchanged for a host JWT via POST {cfg.CPURL}/v1/hosts/register, reporting
// the local architecture, CPU count, memory, disk size and cfg.Address. The
// returned token is then written to cfg.TokenFile (mode 0600) so subsequent
// startups can skip registration.
//
// An error is returned when neither a saved token nor a registration token is
// available, when the request fails, when the control plane answers with
// anything other than 201 Created, when the reply carries no token, or when
// the token cannot be persisted.
func Register(ctx context.Context, cfg RegistrationConfig) (string, error) {
	// Upper bound on how much of the control plane's reply is buffered in
	// memory; a registration reply is expected to be far smaller than this.
	const maxResponseBytes = 1 << 20

	// Check if we already have a saved token. A read error (typically a
	// missing file) just means registration has to happen now.
	if data, err := os.ReadFile(cfg.TokenFile); err == nil {
		token := strings.TrimSpace(string(data))
		if token != "" {
			slog.Info("loaded existing host token", "file", cfg.TokenFile)
			return token, nil
		}
	}

	if cfg.RegistrationToken == "" {
		return "", fmt.Errorf("no saved host token and no registration token provided")
	}

	reqBody := registerRequest{
		Token:    cfg.RegistrationToken,
		Arch:     runtime.GOARCH,
		CPUCores: int32(runtime.NumCPU()),
		MemoryMB: getMemoryMB(),
		DiskGB:   getDiskGB(),
		Address:  cfg.Address,
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return "", fmt.Errorf("marshal registration request: %w", err)
	}

	url := strings.TrimRight(cfg.CPURL, "/") + "/v1/hosts/register"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("create registration request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("registration request failed: %w", err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
	if err != nil {
		return "", fmt.Errorf("read registration response: %w", err)
	}

	if resp.StatusCode != http.StatusCreated {
		// Prefer the structured message. Fall back to the raw body when the
		// payload is not the expected envelope, which includes valid JSON that
		// carries no message (otherwise the error text would be empty).
		var errResp errorResponse
		if err := json.Unmarshal(respBody, &errResp); err == nil && errResp.Error.Message != "" {
			return "", fmt.Errorf("registration failed (%d): %s", resp.StatusCode, errResp.Error.Message)
		}
		return "", fmt.Errorf("registration failed (%d): %s", resp.StatusCode, string(respBody))
	}

	var regResp registerResponse
	if err := json.Unmarshal(respBody, &regResp); err != nil {
		return "", fmt.Errorf("parse registration response: %w", err)
	}
	if regResp.Token == "" {
		return "", fmt.Errorf("registration response missing token")
	}

	// Persist the token to disk for subsequent startups.
	if err := os.WriteFile(cfg.TokenFile, []byte(regResp.Token), 0600); err != nil {
		return "", fmt.Errorf("save host token: %w", err)
	}
	slog.Info("host registered and token saved", "file", cfg.TokenFile)

	return regResp.Token, nil
}
// StartHeartbeat launches a background goroutine that sends periodic heartbeats
// to the control plane. It runs until the context is cancelled.
//
// Every interval it POSTs to {cpURL}/v1/hosts/{hostID}/heartbeat with the host
// token in the X-Host-Token header. Failures and any status other than
// 204 No Content are logged as warnings and do not stop the loop. interval
// must be positive, because time.NewTicker panics otherwise.
func StartHeartbeat(ctx context.Context, cpURL, hostID, hostToken string, interval time.Duration) {
	// Cap on how much of an unexpected response body is drained per beat.
	const maxDrainBytes = 64 << 10

	endpoint := strings.TrimRight(cpURL, "/") + "/v1/hosts/" + hostID + "/heartbeat"
	client := &http.Client{Timeout: 10 * time.Second}

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil)
			if err != nil {
				slog.Warn("heartbeat: failed to create request", "error", err)
				continue
			}
			req.Header.Set("X-Host-Token", hostToken)

			resp, err := client.Do(req)
			if err != nil {
				// A request aborted by shutdown is not a heartbeat failure.
				if ctx.Err() != nil {
					return
				}
				slog.Warn("heartbeat: request failed", "error", err)
				continue
			}
			// Read the body to completion before closing so the transport can
			// reuse the keep-alive connection for the next beat.
			_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
			resp.Body.Close()

			if resp.StatusCode != http.StatusNoContent {
				slog.Warn("heartbeat: unexpected status", "status", resp.StatusCode)
			}
		}
	}()
}
// HostIDFromToken reads the host_id claim out of a host JWT.
//
// The signature is NOT verified (the agent doesn't have the signing secret);
// the token is only split on "." and its middle segment is decoded as
// unpadded base64url JSON. An error is returned when the token does not have
// exactly three segments, when the payload cannot be decoded or parsed, or
// when the host_id claim is absent or empty.
func HostIDFromToken(token string) (string, error) {
	// Asking for at most four pieces is enough to tell "exactly three
	// segments" apart from "too many".
	segments := strings.SplitN(token, ".", 4)
	if len(segments) != 3 {
		return "", fmt.Errorf("invalid JWT format")
	}

	rawClaims, decodeErr := base64.RawURLEncoding.DecodeString(segments[1])
	if decodeErr != nil {
		return "", fmt.Errorf("decode JWT payload: %w", decodeErr)
	}

	body := struct {
		HostID string `json:"host_id"`
	}{}
	if parseErr := json.Unmarshal(rawClaims, &body); parseErr != nil {
		return "", fmt.Errorf("parse JWT claims: %w", parseErr)
	}

	if id := body.HostID; id != "" {
		return id, nil
	}
	return "", fmt.Errorf("host_id claim missing from token")
}
// getMemoryMB returns total system memory in MB (bytes / 1024 / 1024) as
// reported by sysinfo(2), or 0 when the call fails.
func getMemoryMB() int32 {
	var info unix.Sysinfo_t
	if err := unix.Sysinfo(&info); err != nil {
		return 0
	}
	// Totalram is expressed in multiples of info.Unit bytes. Its Go type is
	// architecture dependent (uint32 on 32-bit Linux, uint64 on 64-bit), so
	// both operands are converted explicitly: this keeps the expression
	// compiling on every target and performs the multiplication in 64 bits.
	totalBytes := uint64(info.Totalram) * uint64(info.Unit)
	return int32(totalBytes / (1024 * 1024))
}
// getDiskGB reports the total size of the filesystem mounted at "/" in GB
// (bytes / 1024^3), computed as block count times block size from statfs.
// It returns 0 when the statfs call fails.
func getDiskGB() int32 {
	const bytesPerGB = 1024 * 1024 * 1024

	var rootFS unix.Statfs_t
	if unix.Statfs("/", &rootFS) != nil {
		return 0
	}

	totalBytes := rootFS.Blocks * uint64(rootFS.Bsize)
	return int32(totalBytes / bytesPerGB)
}

View File

@ -0,0 +1,414 @@
package hostagent
import (
"context"
"fmt"
"io"
"log/slog"
"mime/multipart"
"net/http"
"net/url"
"strings"
"time"
"connectrpc.com/connect"
pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen"
"git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect"
"git.omukk.dev/wrenn/sandbox/internal/sandbox"
)
// Server implements the HostAgentService Connect RPC handler.
//
// It embeds UnimplementedHostAgentServiceHandler and forwards each RPC it
// defines to the sandbox manager held in mgr.
type Server struct {
hostagentv1connect.UnimplementedHostAgentServiceHandler
mgr *sandbox.Manager // sandbox manager every handler delegates to
}
// NewServer returns a host agent RPC server whose handlers operate on mgr.
func NewServer(mgr *sandbox.Manager) *Server {
	srv := new(Server)
	srv.mgr = mgr
	return srv
}
// CreateSandbox asks the sandbox manager to create a sandbox with the
// requested ID, template, vCPU count, memory size and timeout, and replies
// with the new sandbox's ID, status and host IP. Any manager failure is
// reported to the caller as CodeInternal.
func (s *Server) CreateSandbox(
	ctx context.Context,
	req *connect.Request[pb.CreateSandboxRequest],
) (*connect.Response[pb.CreateSandboxResponse], error) {
	in := req.Msg
	vcpus, memoryMB, timeoutSec := int(in.Vcpus), int(in.MemoryMb), int(in.TimeoutSec)

	created, err := s.mgr.Create(ctx, in.SandboxId, in.Template, vcpus, memoryMB, timeoutSec)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("create sandbox: %w", err))
	}

	out := &pb.CreateSandboxResponse{
		SandboxId: created.ID,
		Status:    string(created.Status),
		HostIp:    created.HostIP.String(),
	}
	return connect.NewResponse(out), nil
}
// DestroySandbox asks the sandbox manager to destroy the sandbox named in
// the request. Every manager error is surfaced as CodeNotFound.
func (s *Server) DestroySandbox(
	ctx context.Context,
	req *connect.Request[pb.DestroySandboxRequest],
) (*connect.Response[pb.DestroySandboxResponse], error) {
	err := s.mgr.Destroy(ctx, req.Msg.SandboxId)
	if err != nil {
		return nil, connect.NewError(connect.CodeNotFound, err)
	}

	return connect.NewResponse(new(pb.DestroySandboxResponse)), nil
}
// PauseSandbox asks the sandbox manager to pause the sandbox named in the
// request. Manager errors are surfaced as CodeInternal.
func (s *Server) PauseSandbox(
	ctx context.Context,
	req *connect.Request[pb.PauseSandboxRequest],
) (*connect.Response[pb.PauseSandboxResponse], error) {
	err := s.mgr.Pause(ctx, req.Msg.SandboxId)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(new(pb.PauseSandboxResponse)), nil
}
// ResumeSandbox asks the sandbox manager to resume the sandbox named in the
// request with the supplied timeout, and replies with the sandbox's ID,
// status and host IP. Manager errors are surfaced as CodeInternal.
func (s *Server) ResumeSandbox(
	ctx context.Context,
	req *connect.Request[pb.ResumeSandboxRequest],
) (*connect.Response[pb.ResumeSandboxResponse], error) {
	in := req.Msg

	resumed, err := s.mgr.Resume(ctx, in.SandboxId, int(in.TimeoutSec))
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	out := &pb.ResumeSandboxResponse{
		SandboxId: resumed.ID,
		Status:    string(resumed.Status),
		HostIp:    resumed.HostIP.String(),
	}
	return connect.NewResponse(out), nil
}
// CreateSnapshot asks the sandbox manager to snapshot the given sandbox
// under the requested name and replies with that name and the size the
// manager reported. Manager errors are surfaced as CodeInternal.
func (s *Server) CreateSnapshot(
	ctx context.Context,
	req *connect.Request[pb.CreateSnapshotRequest],
) (*connect.Response[pb.CreateSnapshotResponse], error) {
	in := req.Msg

	size, err := s.mgr.CreateSnapshot(ctx, in.SandboxId, in.Name)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("create snapshot: %w", err))
	}

	out := &pb.CreateSnapshotResponse{
		Name:      in.Name,
		SizeBytes: size,
	}
	return connect.NewResponse(out), nil
}
// DeleteSnapshot asks the sandbox manager to delete the snapshot with the
// requested name. Manager errors are surfaced as CodeInternal. The request
// context is not passed on to the manager.
func (s *Server) DeleteSnapshot(
	ctx context.Context,
	req *connect.Request[pb.DeleteSnapshotRequest],
) (*connect.Response[pb.DeleteSnapshotResponse], error) {
	err := s.mgr.DeleteSnapshot(req.Msg.Name)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("delete snapshot: %w", err))
	}

	return connect.NewResponse(new(pb.DeleteSnapshotResponse)), nil
}
// PingSandbox forwards a ping for the given sandbox to the sandbox manager.
//
// A manager error whose text contains "not found" is reported as
// CodeNotFound; every other manager error is reported as
// CodeFailedPrecondition.
func (s *Server) PingSandbox(
	ctx context.Context,
	req *connect.Request[pb.PingSandboxRequest],
) (*connect.Response[pb.PingSandboxResponse], error) {
	err := s.mgr.Ping(req.Msg.SandboxId)

	switch {
	case err == nil:
		return connect.NewResponse(&pb.PingSandboxResponse{}), nil
	case strings.Contains(err.Error(), "not found"):
		return nil, connect.NewError(connect.CodeNotFound, err)
	default:
		return nil, connect.NewError(connect.CodeFailedPrecondition, err)
	}
}
// Exec runs a command in the given sandbox through the sandbox manager and
// replies with its stdout, stderr and exit code.
//
// The call is bounded by the request's TimeoutSec when that is positive and
// by 30 seconds otherwise. Manager errors are surfaced as CodeInternal.
func (s *Server) Exec(
	ctx context.Context,
	req *connect.Request[pb.ExecRequest],
) (*connect.Response[pb.ExecResponse], error) {
	in := req.Msg

	limit := time.Duration(in.TimeoutSec) * time.Second
	if in.TimeoutSec <= 0 {
		limit = 30 * time.Second
	}
	boundedCtx, release := context.WithTimeout(ctx, limit)
	defer release()

	outcome, err := s.mgr.Exec(boundedCtx, in.SandboxId, in.Cmd, in.Args...)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("exec: %w", err))
	}

	out := &pb.ExecResponse{
		Stdout:   outcome.Stdout,
		Stderr:   outcome.Stderr,
		ExitCode: outcome.ExitCode,
	}
	return connect.NewResponse(out), nil
}
// WriteFile writes the request's content to the requested path inside the
// given sandbox, using the client the sandbox manager returns for it.
//
// Failure to obtain the client is reported as CodeNotFound; a failed write
// is reported as CodeInternal.
func (s *Server) WriteFile(
	ctx context.Context,
	req *connect.Request[pb.WriteFileRequest],
) (*connect.Response[pb.WriteFileResponse], error) {
	in := req.Msg

	sbClient, lookupErr := s.mgr.GetClient(in.SandboxId)
	if lookupErr != nil {
		return nil, connect.NewError(connect.CodeNotFound, lookupErr)
	}

	writeErr := sbClient.WriteFile(ctx, in.Path, in.Content)
	if writeErr != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("write file: %w", writeErr))
	}

	return connect.NewResponse(new(pb.WriteFileResponse)), nil
}
// ReadFile reads the requested path inside the given sandbox, using the
// client the sandbox manager returns for it, and replies with the content.
//
// Failure to obtain the client is reported as CodeNotFound; a failed read
// is reported as CodeInternal.
func (s *Server) ReadFile(
	ctx context.Context,
	req *connect.Request[pb.ReadFileRequest],
) (*connect.Response[pb.ReadFileResponse], error) {
	in := req.Msg

	sbClient, lookupErr := s.mgr.GetClient(in.SandboxId)
	if lookupErr != nil {
		return nil, connect.NewError(connect.CodeNotFound, lookupErr)
	}

	data, readErr := sbClient.ReadFile(ctx, in.Path)
	if readErr != nil {
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("read file: %w", readErr))
	}

	out := &pb.ReadFileResponse{Content: data}
	return connect.NewResponse(out), nil
}
// ExecStream runs a command in the given sandbox through the sandbox manager
// and relays the events it produces to the client as they arrive.
//
// "start" events carry the PID, "stdout"/"stderr" events carry output data,
// and "end" events carry the exit code and error. Events of any other type
// are logged and skipped rather than forwarded as an empty message. A timeout
// is applied only when the request's TimeoutSec is positive. Failure to start
// the command is reported as CodeInternal; a failed send aborts the relay and
// returns the send error.
func (s *Server) ExecStream(
	ctx context.Context,
	req *connect.Request[pb.ExecStreamRequest],
	stream *connect.ServerStream[pb.ExecStreamResponse],
) error {
	msg := req.Msg

	// Only apply a timeout if explicitly requested; streaming execs may be long-running.
	execCtx := ctx
	if msg.TimeoutSec > 0 {
		var cancel context.CancelFunc
		execCtx, cancel = context.WithTimeout(ctx, time.Duration(msg.TimeoutSec)*time.Second)
		defer cancel()
	}

	events, err := s.mgr.ExecStream(execCtx, msg.SandboxId, msg.Cmd, msg.Args...)
	if err != nil {
		return connect.NewError(connect.CodeInternal, fmt.Errorf("exec stream: %w", err))
	}

	for ev := range events {
		var resp pb.ExecStreamResponse
		switch ev.Type {
		case "start":
			resp.Event = &pb.ExecStreamResponse_Start{
				Start: &pb.ExecStreamStart{Pid: ev.PID},
			}
		case "stdout":
			resp.Event = &pb.ExecStreamResponse_Data{
				Data: &pb.ExecStreamData{
					Output: &pb.ExecStreamData_Stdout{Stdout: ev.Data},
				},
			}
		case "stderr":
			resp.Event = &pb.ExecStreamResponse_Data{
				Data: &pb.ExecStreamData{
					Output: &pb.ExecStreamData_Stderr{Stderr: ev.Data},
				},
			}
		case "end":
			resp.Event = &pb.ExecStreamResponse_End{
				End: &pb.ExecStreamEnd{
					ExitCode: ev.ExitCode,
					Error:    ev.Error,
				},
			}
		default:
			// An unrecognised event has no representation in the response
			// message; sending it would deliver a message with no event set.
			slog.Warn("exec stream: skipping unknown event type", "sandbox_id", msg.SandboxId, "type", ev.Type)
			continue
		}
		if err := stream.Send(&resp); err != nil {
			return err
		}
	}

	return nil
}
// WriteFileStream receives a file from the client as a stream and uploads it
// to the sandbox's envd /files endpoint as a multipart form, without
// buffering the whole file in memory.
//
// The first stream message must carry metadata (sandbox ID and destination
// path); every following message contributes a chunk, and empty chunks are
// skipped. The upload is performed as user "root". An empty stream or a
// missing metadata message is reported as CodeInvalidArgument, failure to
// obtain the sandbox client as CodeNotFound, and every other failure
// (stream errors, HTTP errors, a status other than 200/204 from envd) as
// CodeInternal.
func (s *Server) WriteFileStream(
	ctx context.Context,
	stream *connect.ClientStream[pb.WriteFileStreamRequest],
) (*connect.Response[pb.WriteFileStreamResponse], error) {
	// First message must contain metadata.
	if !stream.Receive() {
		if err := stream.Err(); err != nil {
			return nil, connect.NewError(connect.CodeInternal, err)
		}
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("empty stream"))
	}
	meta := stream.Msg().GetMeta()
	if meta == nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("first message must contain metadata"))
	}

	client, err := s.mgr.GetClient(meta.SandboxId)
	if err != nil {
		return nil, connect.NewError(connect.CodeNotFound, err)
	}

	// Use io.Pipe to stream chunks into a multipart body for envd's REST endpoint.
	pr, pw := io.Pipe()
	mpWriter := multipart.NewWriter(pw)

	// Write multipart data in a goroutine. The channel is buffered so the
	// goroutine can always deliver its result and exit.
	errCh := make(chan error, 1)
	go func() {
		writeErr := func() error {
			part, err := mpWriter.CreateFormFile("file", "upload")
			if err != nil {
				return fmt.Errorf("create multipart: %w", err)
			}
			for stream.Receive() {
				chunk := stream.Msg().GetChunk()
				if len(chunk) == 0 {
					continue
				}
				if _, err := part.Write(chunk); err != nil {
					return fmt.Errorf("write chunk: %w", err)
				}
			}
			if err := stream.Err(); err != nil {
				return err
			}
			// Closing the multipart writer emits the terminating boundary.
			if err := mpWriter.Close(); err != nil {
				return fmt.Errorf("close multipart: %w", err)
			}
			return nil
		}()
		// Propagate a failure to the reading side so the upload request is
		// aborted instead of ending with a clean EOF on a truncated body.
		// CloseWithError(nil) behaves like Close.
		_ = pw.CloseWithError(writeErr)
		errCh <- writeErr
	}()

	// Send the streaming multipart body to envd.
	base := client.BaseURL()
	u := fmt.Sprintf("%s/files?%s", base, url.Values{
		"path":     {meta.Path},
		"username": {"root"},
	}.Encode())

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, u, pr)
	if err != nil {
		// Unblock the writer goroutine and wait for it before returning.
		pw.CloseWithError(err)
		<-errCh
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err))
	}
	httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType())

	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		// Unblock the writer goroutine and wait for it before returning.
		pw.CloseWithError(err)
		<-errCh
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("write file stream: %w", err))
	}
	defer resp.Body.Close()

	// Wait for the writer goroutine.
	if writerErr := <-errCh; writerErr != nil {
		return nil, connect.NewError(connect.CodeInternal, writerErr)
	}

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
		body, _ := io.ReadAll(resp.Body)
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("envd write: status %d: %s", resp.StatusCode, string(body)))
	}

	slog.Debug("streaming file write complete", "sandbox_id", meta.SandboxId, "path", meta.Path)
	return connect.NewResponse(&pb.WriteFileStreamResponse{}), nil
}
// ReadFileStream fetches a file from the sandbox's envd /files endpoint (as
// user "root") and relays its content to the client in chunks of at most
// 64KB.
//
// Failure to obtain the sandbox client is reported as CodeNotFound. HTTP
// failures, a status other than 200 from envd, and body read errors are
// reported as CodeInternal. A failed send ends the relay and returns the
// send error as is.
func (s *Server) ReadFileStream(
	ctx context.Context,
	req *connect.Request[pb.ReadFileStreamRequest],
	stream *connect.ServerStream[pb.ReadFileStreamResponse],
) error {
	in := req.Msg

	sbClient, err := s.mgr.GetClient(in.SandboxId)
	if err != nil {
		return connect.NewError(connect.CodeNotFound, err)
	}

	query := url.Values{
		"path":     {in.Path},
		"username": {"root"},
	}
	target := fmt.Sprintf("%s/files?%s", sbClient.BaseURL(), query.Encode())

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err))
	}

	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err))
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		detail, _ := io.ReadAll(resp.Body)
		return connect.NewError(connect.CodeInternal, fmt.Errorf("envd read: status %d: %s", resp.StatusCode, string(detail)))
	}

	// Stream file content in 64KB chunks.
	scratch := make([]byte, 64*1024)
	for {
		n, readErr := resp.Body.Read(scratch)

		// A read may return data together with an error, so forward the data
		// first. Each message gets its own copy of the bytes.
		if n > 0 {
			piece := make([]byte, n)
			copy(piece, scratch[:n])
			if sendErr := stream.Send(&pb.ReadFileStreamResponse{Chunk: piece}); sendErr != nil {
				return sendErr
			}
		}

		switch {
		case readErr == nil:
			continue
		case readErr == io.EOF:
			return nil
		default:
			return connect.NewError(connect.CodeInternal, fmt.Errorf("read body: %w", readErr))
		}
	}
}
// ListSandboxes replies with one SandboxInfo per sandbox returned by the
// sandbox manager, together with the IDs returned by the manager's
// DrainAutoPausedIDs call (made after the sandbox list has been converted).
func (s *Server) ListSandboxes(
	ctx context.Context,
	req *connect.Request[pb.ListSandboxesRequest],
) (*connect.Response[pb.ListSandboxesResponse], error) {
	all := s.mgr.List()

	infos := make([]*pb.SandboxInfo, 0, len(all))
	for _, sb := range all {
		info := &pb.SandboxInfo{
			SandboxId:        sb.ID,
			Status:           string(sb.Status),
			Template:         sb.Template,
			Vcpus:            int32(sb.VCPUs),
			MemoryMb:         int32(sb.MemoryMB),
			HostIp:           sb.HostIP.String(),
			CreatedAtUnix:    sb.CreatedAt.Unix(),
			LastActiveAtUnix: sb.LastActiveAt.Unix(),
			TimeoutSec:       int32(sb.TimeoutSec),
		}
		infos = append(infos, info)
	}

	out := &pb.ListSandboxesResponse{
		Sandboxes:            infos,
		AutoPausedSandboxIds: s.mgr.DrainAutoPausedIDs(),
	}
	return connect.NewResponse(out), nil
}