From b4d8edb65b76f7e17367556f6f43b56a7408e9cc Mon Sep 17 00:00:00 2001 From: pptx704 Date: Wed, 11 Mar 2026 05:42:42 +0600 Subject: [PATCH] Add streaming exec and file transfer endpoints Add WebSocket-based streaming exec endpoint and streaming file upload/download endpoints to the control plane API. Includes new host agent RPC methods (ExecStream, StreamWriteFile, StreamReadFile), envd client streaming support, and OpenAPI spec updates. --- go.mod | 1 + go.sum | 2 + internal/api/handlers_exec_stream.go | 169 ++++ internal/api/handlers_files_stream.go | 194 +++++ internal/api/middleware.go | 22 + internal/api/openapi.yaml | 137 ++++ internal/api/server.go | 5 + internal/envdclient/client.go | 82 ++ internal/hostagent/server.go | 208 +++++ internal/sandbox/manager.go | 18 + proto/hostagent/gen/hostagent.pb.go | 767 +++++++++++++++++- .../hostagentv1connect/hostagent.connect.go | 111 ++- proto/hostagent/hostagent.proto | 68 ++ 13 files changed, 1734 insertions(+), 50 deletions(-) create mode 100644 internal/api/handlers_exec_stream.go create mode 100644 internal/api/handlers_files_stream.go diff --git a/go.mod b/go.mod index 1ff46b1..8d905b6 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.0 require ( connectrpc.com/connect v1.19.1 github.com/go-chi/chi/v5 v5.2.5 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.8.0 github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f diff --git a/go.sum b/go.sum index 5180f1b..2795a09 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/internal/api/handlers_exec_stream.go b/internal/api/handlers_exec_stream.go new file mode 100644 index 0000000..7f7bdf6 --- /dev/null +++ b/internal/api/handlers_exec_stream.go @@ -0,0 +1,169 @@ +package api + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "time" + + "connectrpc.com/connect" + "github.com/go-chi/chi/v5" + "github.com/gorilla/websocket" + "github.com/jackc/pgx/v5/pgtype" + + "git.omukk.dev/wrenn/sandbox/internal/db" + pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +type execStreamHandler struct { + db *db.Queries + agent hostagentv1connect.HostAgentServiceClient +} + +func newExecStreamHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *execStreamHandler { + return &execStreamHandler{db: db, agent: agent} +} + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// wsStartMsg is the first message the client sends to start a process. +type wsStartMsg struct { + Type string `json:"type"` // "start" + Cmd string `json:"cmd"` + Args []string `json:"args"` +} + +// wsStopMsg is sent by the client to stop the process. +type wsStopMsg struct { + Type string `json:"type"` // "stop" +} + +// wsOutMsg is sent by the server for process events. +type wsOutMsg struct { + Type string `json:"type"` // "start", "stdout", "stderr", "exit", "error" + PID uint32 `json:"pid,omitempty"` // only for "start" + Data string `json:"data,omitempty"` // only for "stdout", "stderr", "error" + ExitCode *int32 `json:"exit_code,omitempty"` // only for "exit" +} + +// ExecStream handles WS /v1/sandboxes/{id}/exec/stream. +func (h *execStreamHandler) ExecStream(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "running" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running (status: "+sb.Status+")") + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + slog.Error("websocket upgrade failed", "error", err) + return + } + defer conn.Close() + + // Read the start message. + var startMsg wsStartMsg + if err := conn.ReadJSON(&startMsg); err != nil { + sendWSError(conn, "failed to read start message: "+err.Error()) + return + } + if startMsg.Type != "start" || startMsg.Cmd == "" { + sendWSError(conn, "first message must be type 'start' with a 'cmd' field") + return + } + + // Open streaming exec to host agent. + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + stream, err := h.agent.ExecStream(streamCtx, connect.NewRequest(&pb.ExecStreamRequest{ + SandboxId: sandboxID, + Cmd: startMsg.Cmd, + Args: startMsg.Args, + })) + if err != nil { + sendWSError(conn, "failed to start exec stream: "+err.Error()) + return + } + defer stream.Close() + + // Listen for stop messages from the client in a goroutine. + go func() { + for { + _, msg, err := conn.ReadMessage() + if err != nil { + cancel() + return + } + var parsed struct { + Type string `json:"type"` + } + if json.Unmarshal(msg, &parsed) == nil && parsed.Type == "stop" { + cancel() + return + } + } + }() + + // Forward stream events to WebSocket. + for stream.Receive() { + resp := stream.Msg() + switch ev := resp.Event.(type) { + case *pb.ExecStreamResponse_Start: + writeWSJSON(conn, wsOutMsg{Type: "start", PID: ev.Start.Pid}) + + case *pb.ExecStreamResponse_Data: + switch o := ev.Data.Output.(type) { + case *pb.ExecStreamData_Stdout: + writeWSJSON(conn, wsOutMsg{Type: "stdout", Data: string(o.Stdout)}) + case *pb.ExecStreamData_Stderr: + writeWSJSON(conn, wsOutMsg{Type: "stderr", Data: string(o.Stderr)}) + } + + case *pb.ExecStreamResponse_End: + exitCode := ev.End.ExitCode + writeWSJSON(conn, wsOutMsg{Type: "exit", ExitCode: &exitCode}) + } + } + + if err := stream.Err(); err != nil { + // Only send if the connection is still alive (not a normal close). + if streamCtx.Err() == nil { + sendWSError(conn, err.Error()) + } + } + + // Update last active using a fresh context (the request context may be cancelled). + updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer updateCancel() + if err := h.db.UpdateLastActive(updateCtx, db.UpdateLastActiveParams{ + ID: sandboxID, + LastActiveAt: pgtype.Timestamptz{ + Time: time.Now(), + Valid: true, + }, + }); err != nil { + slog.Warn("failed to update last active after stream exec", "sandbox_id", sandboxID, "error", err) + } +} + +func sendWSError(conn *websocket.Conn, msg string) { + writeWSJSON(conn, wsOutMsg{Type: "error", Data: msg}) +} + +func writeWSJSON(conn *websocket.Conn, v any) { + if err := conn.WriteJSON(v); err != nil { + slog.Debug("websocket write error", "error", err) + } +} diff --git a/internal/api/handlers_files_stream.go b/internal/api/handlers_files_stream.go new file mode 100644 index 0000000..999bbd5 --- /dev/null +++ b/internal/api/handlers_files_stream.go @@ -0,0 +1,194 @@ +package api + +import ( + "io" + "mime" + "mime/multipart" + "net/http" + + "connectrpc.com/connect" + "github.com/go-chi/chi/v5" + + "git.omukk.dev/wrenn/sandbox/internal/db" + pb "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen" + "git.omukk.dev/wrenn/sandbox/proto/hostagent/gen/hostagentv1connect" +) + +type filesStreamHandler struct { + db *db.Queries + agent hostagentv1connect.HostAgentServiceClient +} + +func newFilesStreamHandler(db *db.Queries, agent hostagentv1connect.HostAgentServiceClient) *filesStreamHandler { + return &filesStreamHandler{db: db, agent: agent} +} + +// StreamUpload handles POST /v1/sandboxes/{id}/files/stream/write. +// Expects multipart/form-data with "path" text field and "file" file field. +// Streams file content directly from the request body to the host agent without buffering. +func (h *filesStreamHandler) StreamUpload(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "running" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running") + return + } + + // Parse boundary from Content-Type without buffering the body. + contentType := r.Header.Get("Content-Type") + _, params, err := mime.ParseMediaType(contentType) + if err != nil || params["boundary"] == "" { + writeError(w, http.StatusBadRequest, "invalid_request", "expected multipart/form-data with boundary") + return + } + + // Read parts manually from the multipart stream. + mr := multipart.NewReader(r.Body, params["boundary"]) + + var filePath string + var filePart *multipart.Part + + for { + part, err := mr.NextPart() + if err == io.EOF { + break + } + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "failed to parse multipart") + return + } + switch part.FormName() { + case "path": + data, _ := io.ReadAll(part) + filePath = string(data) + case "file": + filePart = part + } + if filePath != "" && filePart != nil { + break + } + } + + if filePath == "" { + writeError(w, http.StatusBadRequest, "invalid_request", "path field is required") + return + } + if filePart == nil { + writeError(w, http.StatusBadRequest, "invalid_request", "file field is required") + return + } + defer filePart.Close() + + // Open client-streaming RPC to host agent. + stream := h.agent.WriteFileStream(ctx) + + // Send metadata first. + if err := stream.Send(&pb.WriteFileStreamRequest{ + Content: &pb.WriteFileStreamRequest_Meta{ + Meta: &pb.WriteFileStreamMeta{ + SandboxId: sandboxID, + Path: filePath, + }, + }, + }); err != nil { + writeError(w, http.StatusBadGateway, "agent_error", "failed to send file metadata") + return + } + + // Stream file content in 64KB chunks directly from the multipart part. + buf := make([]byte, 64*1024) + for { + n, err := filePart.Read(buf) + if n > 0 { + chunk := make([]byte, n) + copy(chunk, buf[:n]) + if sendErr := stream.Send(&pb.WriteFileStreamRequest{ + Content: &pb.WriteFileStreamRequest_Chunk{Chunk: chunk}, + }); sendErr != nil { + writeError(w, http.StatusBadGateway, "agent_error", "failed to stream file chunk") + return + } + } + if err == io.EOF { + break + } + if err != nil { + writeError(w, http.StatusInternalServerError, "read_error", "failed to read uploaded file") + return + } + } + + // Close and receive response. + if _, err := stream.CloseAndReceive(); err != nil { + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// StreamDownload handles POST /v1/sandboxes/{id}/files/stream/read. +// Accepts JSON body with path, streams file content back without buffering. +func (h *filesStreamHandler) StreamDownload(w http.ResponseWriter, r *http.Request) { + sandboxID := chi.URLParam(r, "id") + ctx := r.Context() + + sb, err := h.db.GetSandbox(ctx, sandboxID) + if err != nil { + writeError(w, http.StatusNotFound, "not_found", "sandbox not found") + return + } + if sb.Status != "running" { + writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running") + return + } + + var req readFileRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + if req.Path == "" { + writeError(w, http.StatusBadRequest, "invalid_request", "path is required") + return + } + + // Open server-streaming RPC to host agent. + stream, err := h.agent.ReadFileStream(ctx, connect.NewRequest(&pb.ReadFileStreamRequest{ + SandboxId: sandboxID, + Path: req.Path, + })) + if err != nil { + status, code, msg := agentErrToHTTP(err) + writeError(w, status, code, msg) + return + } + defer stream.Close() + + w.Header().Set("Content-Type", "application/octet-stream") + + flusher, canFlush := w.(http.Flusher) + for stream.Receive() { + chunk := stream.Msg().Chunk + if len(chunk) > 0 { + if _, err := w.Write(chunk); err != nil { + return + } + if canFlush { + flusher.Flush() + } + } + } + + if err := stream.Err(); err != nil { + // Headers already sent, nothing we can do but log. + // The client will see a truncated response. + } +} diff --git a/internal/api/middleware.go b/internal/api/middleware.go index 5c2c24f..3e132e8 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -1,8 +1,11 @@ package api import ( + "bufio" "encoding/json" + "fmt" "log/slog" + "net" "net/http" "time" @@ -61,6 +64,10 @@ func requestLogger() func(http.Handler) http.Handler { } } +func decodeJSON(r *http.Request, v any) error { + return json.NewDecoder(r.Body).Decode(v) +} + type statusWriter struct { http.ResponseWriter status int @@ -70,3 +77,18 @@ func (w *statusWriter) WriteHeader(status int) { w.status = status w.ResponseWriter.WriteHeader(status) } + +// Hijack implements http.Hijacker, required for WebSocket upgrade. +func (w *statusWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { + if hj, ok := w.ResponseWriter.(http.Hijacker); ok { + return hj.Hijack() + } + return nil, nil, fmt.Errorf("underlying ResponseWriter does not implement http.Hijacker") +} + +// Flush implements http.Flusher, required for streaming responses. +func (w *statusWriter) Flush() { + if fl, ok := w.ResponseWriter.(http.Flusher); ok { + fl.Flush() + } +} diff --git a/internal/api/openapi.yaml b/internal/api/openapi.yaml index c8a7fab..cb6dc02 100644 --- a/internal/api/openapi.yaml +++ b/internal/api/openapi.yaml @@ -239,6 +239,143 @@ paths: schema: $ref: "#/components/schemas/Error" + /v1/sandboxes/{id}/exec/stream: + parameters: + - name: id + in: path + required: true + schema: + type: string + + get: + summary: Stream command execution via WebSocket + operationId: execStream + description: | + Opens a WebSocket connection for streaming command execution. + + **Client sends** (first message to start the process): + ```json + {"type": "start", "cmd": "tail", "args": ["-f", "/var/log/syslog"]} + ``` + + **Client sends** (to stop the process): + ```json + {"type": "stop"} + ``` + + **Server sends** (process events as they arrive): + ```json + {"type": "start", "pid": 1234} + {"type": "stdout", "data": "line of output\n"} + {"type": "stderr", "data": "warning message\n"} + {"type": "exit", "exit_code": 0} + {"type": "error", "data": "description of error"} + ``` + + The connection closes automatically after the process exits. + responses: + "101": + description: WebSocket upgrade + "404": + description: Sandbox not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "409": + description: Sandbox not running + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + /v1/sandboxes/{id}/files/stream/write: + parameters: + - name: id + in: path + required: true + schema: + type: string + + post: + summary: Upload a file (streaming) + operationId: streamUploadFile + description: | + Streams file content to the sandbox without buffering in memory. + Suitable for large files. Uses the same multipart/form-data format + as the non-streaming upload endpoint. + requestBody: + required: true + content: + multipart/form-data: + schema: + type: object + required: [path, file] + properties: + path: + type: string + description: Absolute destination path inside the sandbox + file: + type: string + format: binary + description: File content + responses: + "204": + description: File uploaded + "404": + description: Sandbox not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "409": + description: Sandbox not running + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + + /v1/sandboxes/{id}/files/stream/read: + parameters: + - name: id + in: path + required: true + schema: + type: string + + post: + summary: Download a file (streaming) + operationId: streamDownloadFile + description: | + Streams file content from the sandbox without buffering in memory. + Suitable for large files. Returns raw bytes with chunked transfer encoding. + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/ReadFileRequest" + responses: + "200": + description: File content streamed in chunks + content: + application/octet-stream: + schema: + type: string + format: binary + "404": + description: Sandbox or file not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "409": + description: Sandbox not running + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + components: schemas: CreateSandboxRequest: diff --git a/internal/api/server.go b/internal/api/server.go index 89b4f1e..693f4d1 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -26,7 +26,9 @@ func New(queries *db.Queries, agent hostagentv1connect.HostAgentServiceClient) * sandbox := newSandboxHandler(queries, agent) exec := newExecHandler(queries, agent) + execStream := newExecStreamHandler(queries, agent) files := newFilesHandler(queries, agent) + filesStream := newFilesStreamHandler(queries, agent) // OpenAPI spec and docs. r.Get("/openapi.yaml", serveOpenAPI) @@ -41,10 +43,13 @@ func New(queries *db.Queries, agent hostagentv1connect.HostAgentServiceClient) * r.Get("/", sandbox.Get) r.Delete("/", sandbox.Destroy) r.Post("/exec", exec.Exec) + r.Get("/exec/stream", execStream.ExecStream) r.Post("/pause", sandbox.Pause) r.Post("/resume", sandbox.Resume) r.Post("/files/write", files.Upload) r.Post("/files/read", files.Download) + r.Post("/files/stream/write", filesStream.StreamUpload) + r.Post("/files/stream/read", filesStream.StreamDownload) }) }) diff --git a/internal/envdclient/client.go b/internal/envdclient/client.go index c231c05..4976569 100644 --- a/internal/envdclient/client.go +++ b/internal/envdclient/client.go @@ -42,6 +42,11 @@ func New(hostIP string) *Client { } } +// BaseURL returns the HTTP base URL for reaching envd. +func (c *Client) BaseURL() string { + return c.base +} + // ExecResult holds the output of a command execution. type ExecResult struct { Stdout []byte @@ -110,6 +115,83 @@ func (c *Client) Exec(ctx context.Context, cmd string, args ...string) (*ExecRes return result, nil } +// ExecStreamEvent represents a single event from a streaming exec. +type ExecStreamEvent struct { + Type string // "start", "stdout", "stderr", "end" + PID uint32 + Data []byte + ExitCode int32 + Error string +} + +// ExecStream runs a command inside the sandbox and returns a channel of output events. +// The channel is closed when the process ends or the context is cancelled. +func (c *Client) ExecStream(ctx context.Context, cmd string, args ...string) (<-chan ExecStreamEvent, error) { + stdin := false + req := connect.NewRequest(&envdpb.StartRequest{ + Process: &envdpb.ProcessConfig{ + Cmd: cmd, + Args: args, + }, + Stdin: &stdin, + }) + + stream, err := c.process.Start(ctx, req) + if err != nil { + return nil, fmt.Errorf("start process: %w", err) + } + + ch := make(chan ExecStreamEvent, 16) + go func() { + defer close(ch) + defer stream.Close() + + for stream.Receive() { + msg := stream.Msg() + if msg.Event == nil { + continue + } + + var ev ExecStreamEvent + event := msg.Event.GetEvent() + switch e := event.(type) { + case *envdpb.ProcessEvent_Start: + ev = ExecStreamEvent{Type: "start", PID: e.Start.GetPid()} + + case *envdpb.ProcessEvent_Data: + output := e.Data.GetOutput() + switch o := output.(type) { + case *envdpb.ProcessEvent_DataEvent_Stdout: + ev = ExecStreamEvent{Type: "stdout", Data: o.Stdout} + case *envdpb.ProcessEvent_DataEvent_Stderr: + ev = ExecStreamEvent{Type: "stderr", Data: o.Stderr} + } + + case *envdpb.ProcessEvent_End: + ev = ExecStreamEvent{Type: "end", ExitCode: e.End.GetExitCode()} + if e.End.Error != nil { + ev.Error = e.End.GetError() + } + + case *envdpb.ProcessEvent_Keepalive: + continue + } + + select { + case ch <- ev: + case <-ctx.Done(): + return + } + } + + if err := stream.Err(); err != nil && err != io.EOF { + slog.Debug("exec stream error", "error", err) + } + }() + + return ch, nil +} + // WriteFile writes content to a file inside the sandbox via envd's REST endpoint. // envd expects POST /files?path=...&username=root with multipart/form-data (field name "file"). func (c *Client) WriteFile(ctx context.Context, path string, content []byte) error { diff --git a/internal/hostagent/server.go b/internal/hostagent/server.go index 19b3411..084b58c 100644 --- a/internal/hostagent/server.go +++ b/internal/hostagent/server.go @@ -3,6 +3,11 @@ package hostagent import ( "context" "fmt" + "io" + "log/slog" + "mime/multipart" + "net/http" + "net/url" "time" "connectrpc.com/connect" @@ -135,6 +140,209 @@ func (s *Server) ReadFile( return connect.NewResponse(&pb.ReadFileResponse{Content: content}), nil } +func (s *Server) ExecStream( + ctx context.Context, + req *connect.Request[pb.ExecStreamRequest], + stream *connect.ServerStream[pb.ExecStreamResponse], +) error { + msg := req.Msg + + // Only apply a timeout if explicitly requested; streaming execs may be long-running. + execCtx := ctx + if msg.TimeoutSec > 0 { + var cancel context.CancelFunc + execCtx, cancel = context.WithTimeout(ctx, time.Duration(msg.TimeoutSec)*time.Second) + defer cancel() + } + + events, err := s.mgr.ExecStream(execCtx, msg.SandboxId, msg.Cmd, msg.Args...) + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("exec stream: %w", err)) + } + + for ev := range events { + var resp pb.ExecStreamResponse + switch ev.Type { + case "start": + resp.Event = &pb.ExecStreamResponse_Start{ + Start: &pb.ExecStreamStart{Pid: ev.PID}, + } + case "stdout": + resp.Event = &pb.ExecStreamResponse_Data{ + Data: &pb.ExecStreamData{ + Output: &pb.ExecStreamData_Stdout{Stdout: ev.Data}, + }, + } + case "stderr": + resp.Event = &pb.ExecStreamResponse_Data{ + Data: &pb.ExecStreamData{ + Output: &pb.ExecStreamData_Stderr{Stderr: ev.Data}, + }, + } + case "end": + resp.Event = &pb.ExecStreamResponse_End{ + End: &pb.ExecStreamEnd{ + ExitCode: ev.ExitCode, + Error: ev.Error, + }, + } + } + if err := stream.Send(&resp); err != nil { + return err + } + } + + return nil +} + +func (s *Server) WriteFileStream( + ctx context.Context, + stream *connect.ClientStream[pb.WriteFileStreamRequest], +) (*connect.Response[pb.WriteFileStreamResponse], error) { + // First message must contain metadata. + if !stream.Receive() { + if err := stream.Err(); err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("empty stream")) + } + + first := stream.Msg() + meta := first.GetMeta() + if meta == nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("first message must contain metadata")) + } + + client, err := s.mgr.GetClient(meta.SandboxId) + if err != nil { + return nil, connect.NewError(connect.CodeNotFound, err) + } + + // Use io.Pipe to stream chunks into a multipart body for envd's REST endpoint. + pr, pw := io.Pipe() + mpWriter := multipart.NewWriter(pw) + + // Write multipart data in a goroutine. + errCh := make(chan error, 1) + go func() { + defer pw.Close() + part, err := mpWriter.CreateFormFile("file", "upload") + if err != nil { + errCh <- fmt.Errorf("create multipart: %w", err) + return + } + + for stream.Receive() { + chunk := stream.Msg().GetChunk() + if len(chunk) == 0 { + continue + } + if _, err := part.Write(chunk); err != nil { + errCh <- fmt.Errorf("write chunk: %w", err) + return + } + } + if err := stream.Err(); err != nil { + errCh <- err + return + } + mpWriter.Close() + errCh <- nil + }() + + // Send the streaming multipart body to envd. + base := client.BaseURL() + u := fmt.Sprintf("%s/files?%s", base, url.Values{ + "path": {meta.Path}, + "username": {"root"}, + }.Encode()) + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, u, pr) + if err != nil { + pw.CloseWithError(err) + <-errCh + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err)) + } + httpReq.Header.Set("Content-Type", mpWriter.FormDataContentType()) + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + pw.CloseWithError(err) + <-errCh + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("write file stream: %w", err)) + } + defer resp.Body.Close() + + // Wait for the writer goroutine. + if writerErr := <-errCh; writerErr != nil { + return nil, connect.NewError(connect.CodeInternal, writerErr) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("envd write: status %d: %s", resp.StatusCode, string(body))) + } + + slog.Debug("streaming file write complete", "sandbox_id", meta.SandboxId, "path", meta.Path) + return connect.NewResponse(&pb.WriteFileStreamResponse{}), nil +} + +func (s *Server) ReadFileStream( + ctx context.Context, + req *connect.Request[pb.ReadFileStreamRequest], + stream *connect.ServerStream[pb.ReadFileStreamResponse], +) error { + msg := req.Msg + + client, err := s.mgr.GetClient(msg.SandboxId) + if err != nil { + return connect.NewError(connect.CodeNotFound, err) + } + + base := client.BaseURL() + u := fmt.Sprintf("%s/files?%s", base, url.Values{ + "path": {msg.Path}, + "username": {"root"}, + }.Encode()) + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("create request: %w", err)) + } + + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("read file stream: %w", err)) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return connect.NewError(connect.CodeInternal, fmt.Errorf("envd read: status %d: %s", resp.StatusCode, string(body))) + } + + // Stream file content in 64KB chunks. + buf := make([]byte, 64*1024) + for { + n, err := resp.Body.Read(buf) + if n > 0 { + chunk := make([]byte, n) + copy(chunk, buf[:n]) + if sendErr := stream.Send(&pb.ReadFileStreamResponse{Chunk: chunk}); sendErr != nil { + return sendErr + } + } + if err == io.EOF { + break + } + if err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("read body: %w", err)) + } + } + + return nil +} + func (s *Server) ListSandboxes( ctx context.Context, req *connect.Request[pb.ListSandboxesRequest], diff --git a/internal/sandbox/manager.go b/internal/sandbox/manager.go index 935e6e3..20f6394 100644 --- a/internal/sandbox/manager.go +++ b/internal/sandbox/manager.go @@ -262,6 +262,24 @@ func (m *Manager) Exec(ctx context.Context, sandboxID string, cmd string, args . return sb.client.Exec(ctx, cmd, args...) } +// ExecStream runs a command inside a sandbox and returns a channel of streaming events. +func (m *Manager) ExecStream(ctx context.Context, sandboxID string, cmd string, args ...string) (<-chan envdclient.ExecStreamEvent, error) { + sb, err := m.get(sandboxID) + if err != nil { + return nil, err + } + + if sb.Status != models.StatusRunning { + return nil, fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status) + } + + m.mu.Lock() + sb.LastActiveAt = time.Now() + m.mu.Unlock() + + return sb.client.ExecStream(ctx, cmd, args...) +} + // List returns all sandboxes. func (m *Manager) List() []models.Sandbox { m.mu.RLock() diff --git a/proto/hostagent/gen/hostagent.pb.go b/proto/hostagent/gen/hostagent.pb.go index 175421e..a0d6d6e 100644 --- a/proto/hostagent/gen/hostagent.pb.go +++ b/proto/hostagent/gen/hostagent.pb.go @@ -912,6 +912,616 @@ func (x *ReadFileResponse) GetContent() []byte { return nil } +type ExecStreamRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + SandboxId string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` + Cmd string `protobuf:"bytes,2,opt,name=cmd,proto3" json:"cmd,omitempty"` + Args []string `protobuf:"bytes,3,rep,name=args,proto3" json:"args,omitempty"` + TimeoutSec int32 `protobuf:"varint,4,opt,name=timeout_sec,json=timeoutSec,proto3" json:"timeout_sec,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ExecStreamRequest) Reset() { + *x = ExecStreamRequest{} + mi := &file_hostagent_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ExecStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExecStreamRequest) ProtoMessage() {} + +func (x *ExecStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[17] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExecStreamRequest.ProtoReflect.Descriptor instead. +func (*ExecStreamRequest) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{17} +} + +func (x *ExecStreamRequest) GetSandboxId() string { + if x != nil { + return x.SandboxId + } + return "" +} + +func (x *ExecStreamRequest) GetCmd() string { + if x != nil { + return x.Cmd + } + return "" +} + +func (x *ExecStreamRequest) GetArgs() []string { + if x != nil { + return x.Args + } + return nil +} + +func (x *ExecStreamRequest) GetTimeoutSec() int32 { + if x != nil { + return x.TimeoutSec + } + return 0 +} + +type ExecStreamResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Event: + // + // *ExecStreamResponse_Start + // *ExecStreamResponse_Data + // *ExecStreamResponse_End + Event isExecStreamResponse_Event `protobuf_oneof:"event"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ExecStreamResponse) Reset() { + *x = ExecStreamResponse{} + mi := &file_hostagent_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ExecStreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExecStreamResponse) ProtoMessage() {} + +func (x *ExecStreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[18] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExecStreamResponse.ProtoReflect.Descriptor instead. +func (*ExecStreamResponse) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{18} +} + +func (x *ExecStreamResponse) GetEvent() isExecStreamResponse_Event { + if x != nil { + return x.Event + } + return nil +} + +func (x *ExecStreamResponse) GetStart() *ExecStreamStart { + if x != nil { + if x, ok := x.Event.(*ExecStreamResponse_Start); ok { + return x.Start + } + } + return nil +} + +func (x *ExecStreamResponse) GetData() *ExecStreamData { + if x != nil { + if x, ok := x.Event.(*ExecStreamResponse_Data); ok { + return x.Data + } + } + return nil +} + +func (x *ExecStreamResponse) GetEnd() *ExecStreamEnd { + if x != nil { + if x, ok := x.Event.(*ExecStreamResponse_End); ok { + return x.End + } + } + return nil +} + +type isExecStreamResponse_Event interface { + isExecStreamResponse_Event() +} + +type ExecStreamResponse_Start struct { + Start *ExecStreamStart `protobuf:"bytes,1,opt,name=start,proto3,oneof"` +} + +type ExecStreamResponse_Data struct { + Data *ExecStreamData `protobuf:"bytes,2,opt,name=data,proto3,oneof"` +} + +type ExecStreamResponse_End struct { + End *ExecStreamEnd `protobuf:"bytes,3,opt,name=end,proto3,oneof"` +} + +func (*ExecStreamResponse_Start) isExecStreamResponse_Event() {} + +func (*ExecStreamResponse_Data) isExecStreamResponse_Event() {} + +func (*ExecStreamResponse_End) isExecStreamResponse_Event() {} + +type ExecStreamStart struct { + state protoimpl.MessageState `protogen:"open.v1"` + Pid uint32 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ExecStreamStart) Reset() { + *x = ExecStreamStart{} + mi := &file_hostagent_proto_msgTypes[19] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ExecStreamStart) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExecStreamStart) ProtoMessage() {} + +func (x *ExecStreamStart) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[19] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExecStreamStart.ProtoReflect.Descriptor instead. +func (*ExecStreamStart) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{19} +} + +func (x *ExecStreamStart) GetPid() uint32 { + if x != nil { + return x.Pid + } + return 0 +} + +type ExecStreamData struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Output: + // + // *ExecStreamData_Stdout + // *ExecStreamData_Stderr + Output isExecStreamData_Output `protobuf_oneof:"output"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ExecStreamData) Reset() { + *x = ExecStreamData{} + mi := &file_hostagent_proto_msgTypes[20] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ExecStreamData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExecStreamData) ProtoMessage() {} + +func (x *ExecStreamData) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[20] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExecStreamData.ProtoReflect.Descriptor instead. +func (*ExecStreamData) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{20} +} + +func (x *ExecStreamData) GetOutput() isExecStreamData_Output { + if x != nil { + return x.Output + } + return nil +} + +func (x *ExecStreamData) GetStdout() []byte { + if x != nil { + if x, ok := x.Output.(*ExecStreamData_Stdout); ok { + return x.Stdout + } + } + return nil +} + +func (x *ExecStreamData) GetStderr() []byte { + if x != nil { + if x, ok := x.Output.(*ExecStreamData_Stderr); ok { + return x.Stderr + } + } + return nil +} + +type isExecStreamData_Output interface { + isExecStreamData_Output() +} + +type ExecStreamData_Stdout struct { + Stdout []byte `protobuf:"bytes,1,opt,name=stdout,proto3,oneof"` +} + +type ExecStreamData_Stderr struct { + Stderr []byte `protobuf:"bytes,2,opt,name=stderr,proto3,oneof"` +} + +func (*ExecStreamData_Stdout) isExecStreamData_Output() {} + +func (*ExecStreamData_Stderr) isExecStreamData_Output() {} + +type ExecStreamEnd struct { + state protoimpl.MessageState `protogen:"open.v1"` + ExitCode int32 `protobuf:"varint,1,opt,name=exit_code,json=exitCode,proto3" json:"exit_code,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ExecStreamEnd) Reset() { + *x = ExecStreamEnd{} + mi := &file_hostagent_proto_msgTypes[21] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ExecStreamEnd) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExecStreamEnd) ProtoMessage() {} + +func (x *ExecStreamEnd) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[21] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExecStreamEnd.ProtoReflect.Descriptor instead. +func (*ExecStreamEnd) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{21} +} + +func (x *ExecStreamEnd) GetExitCode() int32 { + if x != nil { + return x.ExitCode + } + return 0 +} + +func (x *ExecStreamEnd) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +type WriteFileStreamRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Content: + // + // *WriteFileStreamRequest_Meta + // *WriteFileStreamRequest_Chunk + Content isWriteFileStreamRequest_Content `protobuf_oneof:"content"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteFileStreamRequest) Reset() { + *x = WriteFileStreamRequest{} + mi := &file_hostagent_proto_msgTypes[22] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteFileStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteFileStreamRequest) ProtoMessage() {} + +func (x *WriteFileStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[22] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteFileStreamRequest.ProtoReflect.Descriptor instead. +func (*WriteFileStreamRequest) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{22} +} + +func (x *WriteFileStreamRequest) GetContent() isWriteFileStreamRequest_Content { + if x != nil { + return x.Content + } + return nil +} + +func (x *WriteFileStreamRequest) GetMeta() *WriteFileStreamMeta { + if x != nil { + if x, ok := x.Content.(*WriteFileStreamRequest_Meta); ok { + return x.Meta + } + } + return nil +} + +func (x *WriteFileStreamRequest) GetChunk() []byte { + if x != nil { + if x, ok := x.Content.(*WriteFileStreamRequest_Chunk); ok { + return x.Chunk + } + } + return nil +} + +type isWriteFileStreamRequest_Content interface { + isWriteFileStreamRequest_Content() +} + +type WriteFileStreamRequest_Meta struct { + Meta *WriteFileStreamMeta `protobuf:"bytes,1,opt,name=meta,proto3,oneof"` +} + +type WriteFileStreamRequest_Chunk struct { + Chunk []byte `protobuf:"bytes,2,opt,name=chunk,proto3,oneof"` +} + +func (*WriteFileStreamRequest_Meta) isWriteFileStreamRequest_Content() {} + +func (*WriteFileStreamRequest_Chunk) isWriteFileStreamRequest_Content() {} + +type WriteFileStreamMeta struct { + state protoimpl.MessageState `protogen:"open.v1"` + SandboxId string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` + Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteFileStreamMeta) Reset() { + *x = WriteFileStreamMeta{} + mi := &file_hostagent_proto_msgTypes[23] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteFileStreamMeta) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteFileStreamMeta) ProtoMessage() {} + +func (x *WriteFileStreamMeta) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[23] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteFileStreamMeta.ProtoReflect.Descriptor instead. +func (*WriteFileStreamMeta) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{23} +} + +func (x *WriteFileStreamMeta) GetSandboxId() string { + if x != nil { + return x.SandboxId + } + return "" +} + +func (x *WriteFileStreamMeta) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +type WriteFileStreamResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteFileStreamResponse) Reset() { + *x = WriteFileStreamResponse{} + mi := &file_hostagent_proto_msgTypes[24] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteFileStreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteFileStreamResponse) ProtoMessage() {} + +func (x *WriteFileStreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[24] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteFileStreamResponse.ProtoReflect.Descriptor instead. +func (*WriteFileStreamResponse) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{24} +} + +type ReadFileStreamRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + SandboxId string `protobuf:"bytes,1,opt,name=sandbox_id,json=sandboxId,proto3" json:"sandbox_id,omitempty"` + Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReadFileStreamRequest) Reset() { + *x = ReadFileStreamRequest{} + mi := &file_hostagent_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReadFileStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadFileStreamRequest) ProtoMessage() {} + +func (x *ReadFileStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[25] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadFileStreamRequest.ProtoReflect.Descriptor instead. +func (*ReadFileStreamRequest) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{25} +} + +func (x *ReadFileStreamRequest) GetSandboxId() string { + if x != nil { + return x.SandboxId + } + return "" +} + +func (x *ReadFileStreamRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +type ReadFileStreamResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Chunk []byte `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReadFileStreamResponse) Reset() { + *x = ReadFileStreamResponse{} + mi := &file_hostagent_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReadFileStreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadFileStreamResponse) ProtoMessage() {} + +func (x *ReadFileStreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_hostagent_proto_msgTypes[26] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadFileStreamResponse.ProtoReflect.Descriptor instead. +func (*ReadFileStreamResponse) Descriptor() ([]byte, []int) { + return file_hostagent_proto_rawDescGZIP(), []int{26} +} + +func (x *ReadFileStreamResponse) GetChunk() []byte { + if x != nil { + return x.Chunk + } + return nil +} + var File_hostagent_proto protoreflect.FileDescriptor const file_hostagent_proto_rawDesc = "" + @@ -979,7 +1589,43 @@ const file_hostagent_proto_rawDesc = "" + "sandbox_id\x18\x01 \x01(\tR\tsandboxId\x12\x12\n" + "\x04path\x18\x02 \x01(\tR\x04path\",\n" + "\x10ReadFileResponse\x12\x18\n" + - "\acontent\x18\x01 \x01(\fR\acontent2\xac\x05\n" + + "\acontent\x18\x01 \x01(\fR\acontent\"y\n" + + "\x11ExecStreamRequest\x12\x1d\n" + + "\n" + + "sandbox_id\x18\x01 \x01(\tR\tsandboxId\x12\x10\n" + + "\x03cmd\x18\x02 \x01(\tR\x03cmd\x12\x12\n" + + "\x04args\x18\x03 \x03(\tR\x04args\x12\x1f\n" + + "\vtimeout_sec\x18\x04 \x01(\x05R\n" + + "timeoutSec\"\xb9\x01\n" + + "\x12ExecStreamResponse\x125\n" + + "\x05start\x18\x01 \x01(\v2\x1d.hostagent.v1.ExecStreamStartH\x00R\x05start\x122\n" + + "\x04data\x18\x02 \x01(\v2\x1c.hostagent.v1.ExecStreamDataH\x00R\x04data\x12/\n" + + "\x03end\x18\x03 \x01(\v2\x1b.hostagent.v1.ExecStreamEndH\x00R\x03endB\a\n" + + "\x05event\"#\n" + + "\x0fExecStreamStart\x12\x10\n" + + "\x03pid\x18\x01 \x01(\rR\x03pid\"N\n" + + "\x0eExecStreamData\x12\x18\n" + + "\x06stdout\x18\x01 \x01(\fH\x00R\x06stdout\x12\x18\n" + + "\x06stderr\x18\x02 \x01(\fH\x00R\x06stderrB\b\n" + + "\x06output\"B\n" + + "\rExecStreamEnd\x12\x1b\n" + + "\texit_code\x18\x01 \x01(\x05R\bexitCode\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error\"t\n" + + "\x16WriteFileStreamRequest\x127\n" + + "\x04meta\x18\x01 \x01(\v2!.hostagent.v1.WriteFileStreamMetaH\x00R\x04meta\x12\x16\n" + + "\x05chunk\x18\x02 \x01(\fH\x00R\x05chunkB\t\n" + + "\acontent\"H\n" + + "\x13WriteFileStreamMeta\x12\x1d\n" + + "\n" + + "sandbox_id\x18\x01 \x01(\tR\tsandboxId\x12\x12\n" + + "\x04path\x18\x02 \x01(\tR\x04path\"\x19\n" + + "\x17WriteFileStreamResponse\"J\n" + + "\x15ReadFileStreamRequest\x12\x1d\n" + + "\n" + + "sandbox_id\x18\x01 \x01(\tR\tsandboxId\x12\x12\n" + + "\x04path\x18\x02 \x01(\tR\x04path\".\n" + + "\x16ReadFileStreamResponse\x12\x14\n" + + "\x05chunk\x18\x01 \x01(\fR\x05chunk2\xc0\a\n" + "\x10HostAgentService\x12X\n" + "\rCreateSandbox\x12\".hostagent.v1.CreateSandboxRequest\x1a#.hostagent.v1.CreateSandboxResponse\x12[\n" + "\x0eDestroySandbox\x12#.hostagent.v1.DestroySandboxRequest\x1a$.hostagent.v1.DestroySandboxResponse\x12U\n" + @@ -988,7 +1634,11 @@ const file_hostagent_proto_rawDesc = "" + "\x04Exec\x12\x19.hostagent.v1.ExecRequest\x1a\x1a.hostagent.v1.ExecResponse\x12X\n" + "\rListSandboxes\x12\".hostagent.v1.ListSandboxesRequest\x1a#.hostagent.v1.ListSandboxesResponse\x12L\n" + "\tWriteFile\x12\x1e.hostagent.v1.WriteFileRequest\x1a\x1f.hostagent.v1.WriteFileResponse\x12I\n" + - "\bReadFile\x12\x1d.hostagent.v1.ReadFileRequest\x1a\x1e.hostagent.v1.ReadFileResponseB\xb0\x01\n" + + "\bReadFile\x12\x1d.hostagent.v1.ReadFileRequest\x1a\x1e.hostagent.v1.ReadFileResponse\x12Q\n" + + "\n" + + "ExecStream\x12\x1f.hostagent.v1.ExecStreamRequest\x1a .hostagent.v1.ExecStreamResponse0\x01\x12`\n" + + "\x0fWriteFileStream\x12$.hostagent.v1.WriteFileStreamRequest\x1a%.hostagent.v1.WriteFileStreamResponse(\x01\x12]\n" + + "\x0eReadFileStream\x12#.hostagent.v1.ReadFileStreamRequest\x1a$.hostagent.v1.ReadFileStreamResponse0\x01B\xb0\x01\n" + "\x10com.hostagent.v1B\x0eHostagentProtoP\x01Z;git.omukk.dev/wrenn/sandbox/proto/hostagent/gen;hostagentv1\xa2\x02\x03HXX\xaa\x02\fHostagent.V1\xca\x02\fHostagent\\V1\xe2\x02\x18Hostagent\\V1\\GPBMetadata\xea\x02\rHostagent::V1b\x06proto3" var ( @@ -1003,49 +1653,69 @@ func file_hostagent_proto_rawDescGZIP() []byte { return file_hostagent_proto_rawDescData } -var file_hostagent_proto_msgTypes = make([]protoimpl.MessageInfo, 17) +var file_hostagent_proto_msgTypes = make([]protoimpl.MessageInfo, 27) var file_hostagent_proto_goTypes = []any{ - (*CreateSandboxRequest)(nil), // 0: hostagent.v1.CreateSandboxRequest - (*CreateSandboxResponse)(nil), // 1: hostagent.v1.CreateSandboxResponse - (*DestroySandboxRequest)(nil), // 2: hostagent.v1.DestroySandboxRequest - (*DestroySandboxResponse)(nil), // 3: hostagent.v1.DestroySandboxResponse - (*PauseSandboxRequest)(nil), // 4: hostagent.v1.PauseSandboxRequest - (*PauseSandboxResponse)(nil), // 5: hostagent.v1.PauseSandboxResponse - (*ResumeSandboxRequest)(nil), // 6: hostagent.v1.ResumeSandboxRequest - (*ResumeSandboxResponse)(nil), // 7: hostagent.v1.ResumeSandboxResponse - (*ExecRequest)(nil), // 8: hostagent.v1.ExecRequest - (*ExecResponse)(nil), // 9: hostagent.v1.ExecResponse - (*ListSandboxesRequest)(nil), // 10: hostagent.v1.ListSandboxesRequest - (*ListSandboxesResponse)(nil), // 11: hostagent.v1.ListSandboxesResponse - (*SandboxInfo)(nil), // 12: hostagent.v1.SandboxInfo - (*WriteFileRequest)(nil), // 13: hostagent.v1.WriteFileRequest - (*WriteFileResponse)(nil), // 14: hostagent.v1.WriteFileResponse - (*ReadFileRequest)(nil), // 15: hostagent.v1.ReadFileRequest - (*ReadFileResponse)(nil), // 16: hostagent.v1.ReadFileResponse + (*CreateSandboxRequest)(nil), // 0: hostagent.v1.CreateSandboxRequest + (*CreateSandboxResponse)(nil), // 1: hostagent.v1.CreateSandboxResponse + (*DestroySandboxRequest)(nil), // 2: hostagent.v1.DestroySandboxRequest + (*DestroySandboxResponse)(nil), // 3: hostagent.v1.DestroySandboxResponse + (*PauseSandboxRequest)(nil), // 4: hostagent.v1.PauseSandboxRequest + (*PauseSandboxResponse)(nil), // 5: hostagent.v1.PauseSandboxResponse + (*ResumeSandboxRequest)(nil), // 6: hostagent.v1.ResumeSandboxRequest + (*ResumeSandboxResponse)(nil), // 7: hostagent.v1.ResumeSandboxResponse + (*ExecRequest)(nil), // 8: hostagent.v1.ExecRequest + (*ExecResponse)(nil), // 9: hostagent.v1.ExecResponse + (*ListSandboxesRequest)(nil), // 10: hostagent.v1.ListSandboxesRequest + (*ListSandboxesResponse)(nil), // 11: hostagent.v1.ListSandboxesResponse + (*SandboxInfo)(nil), // 12: hostagent.v1.SandboxInfo + (*WriteFileRequest)(nil), // 13: hostagent.v1.WriteFileRequest + (*WriteFileResponse)(nil), // 14: hostagent.v1.WriteFileResponse + (*ReadFileRequest)(nil), // 15: hostagent.v1.ReadFileRequest + (*ReadFileResponse)(nil), // 16: hostagent.v1.ReadFileResponse + (*ExecStreamRequest)(nil), // 17: hostagent.v1.ExecStreamRequest + (*ExecStreamResponse)(nil), // 18: hostagent.v1.ExecStreamResponse + (*ExecStreamStart)(nil), // 19: hostagent.v1.ExecStreamStart + (*ExecStreamData)(nil), // 20: hostagent.v1.ExecStreamData + (*ExecStreamEnd)(nil), // 21: hostagent.v1.ExecStreamEnd + (*WriteFileStreamRequest)(nil), // 22: hostagent.v1.WriteFileStreamRequest + (*WriteFileStreamMeta)(nil), // 23: hostagent.v1.WriteFileStreamMeta + (*WriteFileStreamResponse)(nil), // 24: hostagent.v1.WriteFileStreamResponse + (*ReadFileStreamRequest)(nil), // 25: hostagent.v1.ReadFileStreamRequest + (*ReadFileStreamResponse)(nil), // 26: hostagent.v1.ReadFileStreamResponse } var file_hostagent_proto_depIdxs = []int32{ 12, // 0: hostagent.v1.ListSandboxesResponse.sandboxes:type_name -> hostagent.v1.SandboxInfo - 0, // 1: hostagent.v1.HostAgentService.CreateSandbox:input_type -> hostagent.v1.CreateSandboxRequest - 2, // 2: hostagent.v1.HostAgentService.DestroySandbox:input_type -> hostagent.v1.DestroySandboxRequest - 4, // 3: hostagent.v1.HostAgentService.PauseSandbox:input_type -> hostagent.v1.PauseSandboxRequest - 6, // 4: hostagent.v1.HostAgentService.ResumeSandbox:input_type -> hostagent.v1.ResumeSandboxRequest - 8, // 5: hostagent.v1.HostAgentService.Exec:input_type -> hostagent.v1.ExecRequest - 10, // 6: hostagent.v1.HostAgentService.ListSandboxes:input_type -> hostagent.v1.ListSandboxesRequest - 13, // 7: hostagent.v1.HostAgentService.WriteFile:input_type -> hostagent.v1.WriteFileRequest - 15, // 8: hostagent.v1.HostAgentService.ReadFile:input_type -> hostagent.v1.ReadFileRequest - 1, // 9: hostagent.v1.HostAgentService.CreateSandbox:output_type -> hostagent.v1.CreateSandboxResponse - 3, // 10: hostagent.v1.HostAgentService.DestroySandbox:output_type -> hostagent.v1.DestroySandboxResponse - 5, // 11: hostagent.v1.HostAgentService.PauseSandbox:output_type -> hostagent.v1.PauseSandboxResponse - 7, // 12: hostagent.v1.HostAgentService.ResumeSandbox:output_type -> hostagent.v1.ResumeSandboxResponse - 9, // 13: hostagent.v1.HostAgentService.Exec:output_type -> hostagent.v1.ExecResponse - 11, // 14: hostagent.v1.HostAgentService.ListSandboxes:output_type -> hostagent.v1.ListSandboxesResponse - 14, // 15: hostagent.v1.HostAgentService.WriteFile:output_type -> hostagent.v1.WriteFileResponse - 16, // 16: hostagent.v1.HostAgentService.ReadFile:output_type -> hostagent.v1.ReadFileResponse - 9, // [9:17] is the sub-list for method output_type - 1, // [1:9] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 19, // 1: hostagent.v1.ExecStreamResponse.start:type_name -> hostagent.v1.ExecStreamStart + 20, // 2: hostagent.v1.ExecStreamResponse.data:type_name -> hostagent.v1.ExecStreamData + 21, // 3: hostagent.v1.ExecStreamResponse.end:type_name -> hostagent.v1.ExecStreamEnd + 23, // 4: hostagent.v1.WriteFileStreamRequest.meta:type_name -> hostagent.v1.WriteFileStreamMeta + 0, // 5: hostagent.v1.HostAgentService.CreateSandbox:input_type -> hostagent.v1.CreateSandboxRequest + 2, // 6: hostagent.v1.HostAgentService.DestroySandbox:input_type -> hostagent.v1.DestroySandboxRequest + 4, // 7: hostagent.v1.HostAgentService.PauseSandbox:input_type -> hostagent.v1.PauseSandboxRequest + 6, // 8: hostagent.v1.HostAgentService.ResumeSandbox:input_type -> hostagent.v1.ResumeSandboxRequest + 8, // 9: hostagent.v1.HostAgentService.Exec:input_type -> hostagent.v1.ExecRequest + 10, // 10: hostagent.v1.HostAgentService.ListSandboxes:input_type -> hostagent.v1.ListSandboxesRequest + 13, // 11: hostagent.v1.HostAgentService.WriteFile:input_type -> hostagent.v1.WriteFileRequest + 15, // 12: hostagent.v1.HostAgentService.ReadFile:input_type -> hostagent.v1.ReadFileRequest + 17, // 13: hostagent.v1.HostAgentService.ExecStream:input_type -> hostagent.v1.ExecStreamRequest + 22, // 14: hostagent.v1.HostAgentService.WriteFileStream:input_type -> hostagent.v1.WriteFileStreamRequest + 25, // 15: hostagent.v1.HostAgentService.ReadFileStream:input_type -> hostagent.v1.ReadFileStreamRequest + 1, // 16: hostagent.v1.HostAgentService.CreateSandbox:output_type -> hostagent.v1.CreateSandboxResponse + 3, // 17: hostagent.v1.HostAgentService.DestroySandbox:output_type -> hostagent.v1.DestroySandboxResponse + 5, // 18: hostagent.v1.HostAgentService.PauseSandbox:output_type -> hostagent.v1.PauseSandboxResponse + 7, // 19: hostagent.v1.HostAgentService.ResumeSandbox:output_type -> hostagent.v1.ResumeSandboxResponse + 9, // 20: hostagent.v1.HostAgentService.Exec:output_type -> hostagent.v1.ExecResponse + 11, // 21: hostagent.v1.HostAgentService.ListSandboxes:output_type -> hostagent.v1.ListSandboxesResponse + 14, // 22: hostagent.v1.HostAgentService.WriteFile:output_type -> hostagent.v1.WriteFileResponse + 16, // 23: hostagent.v1.HostAgentService.ReadFile:output_type -> hostagent.v1.ReadFileResponse + 18, // 24: hostagent.v1.HostAgentService.ExecStream:output_type -> hostagent.v1.ExecStreamResponse + 24, // 25: hostagent.v1.HostAgentService.WriteFileStream:output_type -> hostagent.v1.WriteFileStreamResponse + 26, // 26: hostagent.v1.HostAgentService.ReadFileStream:output_type -> hostagent.v1.ReadFileStreamResponse + 16, // [16:27] is the sub-list for method output_type + 5, // [5:16] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_hostagent_proto_init() } @@ -1053,13 +1723,26 @@ func file_hostagent_proto_init() { if File_hostagent_proto != nil { return } + file_hostagent_proto_msgTypes[18].OneofWrappers = []any{ + (*ExecStreamResponse_Start)(nil), + (*ExecStreamResponse_Data)(nil), + (*ExecStreamResponse_End)(nil), + } + file_hostagent_proto_msgTypes[20].OneofWrappers = []any{ + (*ExecStreamData_Stdout)(nil), + (*ExecStreamData_Stderr)(nil), + } + file_hostagent_proto_msgTypes[22].OneofWrappers = []any{ + (*WriteFileStreamRequest_Meta)(nil), + (*WriteFileStreamRequest_Chunk)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_hostagent_proto_rawDesc), len(file_hostagent_proto_rawDesc)), NumEnums: 0, - NumMessages: 17, + NumMessages: 27, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go b/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go index 1b182d2..748ef32 100644 --- a/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go +++ b/proto/hostagent/gen/hostagentv1connect/hostagent.connect.go @@ -56,6 +56,15 @@ const ( // HostAgentServiceReadFileProcedure is the fully-qualified name of the HostAgentService's ReadFile // RPC. HostAgentServiceReadFileProcedure = "/hostagent.v1.HostAgentService/ReadFile" + // HostAgentServiceExecStreamProcedure is the fully-qualified name of the HostAgentService's + // ExecStream RPC. + HostAgentServiceExecStreamProcedure = "/hostagent.v1.HostAgentService/ExecStream" + // HostAgentServiceWriteFileStreamProcedure is the fully-qualified name of the HostAgentService's + // WriteFileStream RPC. + HostAgentServiceWriteFileStreamProcedure = "/hostagent.v1.HostAgentService/WriteFileStream" + // HostAgentServiceReadFileStreamProcedure is the fully-qualified name of the HostAgentService's + // ReadFileStream RPC. + HostAgentServiceReadFileStreamProcedure = "/hostagent.v1.HostAgentService/ReadFileStream" ) // HostAgentServiceClient is a client for the hostagent.v1.HostAgentService service. @@ -76,6 +85,13 @@ type HostAgentServiceClient interface { WriteFile(context.Context, *connect.Request[gen.WriteFileRequest]) (*connect.Response[gen.WriteFileResponse], error) // ReadFile reads a file from inside a sandbox. ReadFile(context.Context, *connect.Request[gen.ReadFileRequest]) (*connect.Response[gen.ReadFileResponse], error) + // ExecStream runs a command inside a sandbox and streams output events as they arrive. + ExecStream(context.Context, *connect.Request[gen.ExecStreamRequest]) (*connect.ServerStreamForClient[gen.ExecStreamResponse], error) + // WriteFileStream writes a file to a sandbox using chunked streaming. + // First message must contain metadata (sandbox_id, path). Subsequent messages contain data chunks. + WriteFileStream(context.Context) *connect.ClientStreamForClient[gen.WriteFileStreamRequest, gen.WriteFileStreamResponse] + // ReadFileStream reads a file from a sandbox and streams it back in chunks. + ReadFileStream(context.Context, *connect.Request[gen.ReadFileStreamRequest]) (*connect.ServerStreamForClient[gen.ReadFileStreamResponse], error) } // NewHostAgentServiceClient constructs a client for the hostagent.v1.HostAgentService service. By @@ -137,19 +153,40 @@ func NewHostAgentServiceClient(httpClient connect.HTTPClient, baseURL string, op connect.WithSchema(hostAgentServiceMethods.ByName("ReadFile")), connect.WithClientOptions(opts...), ), + execStream: connect.NewClient[gen.ExecStreamRequest, gen.ExecStreamResponse]( + httpClient, + baseURL+HostAgentServiceExecStreamProcedure, + connect.WithSchema(hostAgentServiceMethods.ByName("ExecStream")), + connect.WithClientOptions(opts...), + ), + writeFileStream: connect.NewClient[gen.WriteFileStreamRequest, gen.WriteFileStreamResponse]( + httpClient, + baseURL+HostAgentServiceWriteFileStreamProcedure, + connect.WithSchema(hostAgentServiceMethods.ByName("WriteFileStream")), + connect.WithClientOptions(opts...), + ), + readFileStream: connect.NewClient[gen.ReadFileStreamRequest, gen.ReadFileStreamResponse]( + httpClient, + baseURL+HostAgentServiceReadFileStreamProcedure, + connect.WithSchema(hostAgentServiceMethods.ByName("ReadFileStream")), + connect.WithClientOptions(opts...), + ), } } // hostAgentServiceClient implements HostAgentServiceClient. type hostAgentServiceClient struct { - createSandbox *connect.Client[gen.CreateSandboxRequest, gen.CreateSandboxResponse] - destroySandbox *connect.Client[gen.DestroySandboxRequest, gen.DestroySandboxResponse] - pauseSandbox *connect.Client[gen.PauseSandboxRequest, gen.PauseSandboxResponse] - resumeSandbox *connect.Client[gen.ResumeSandboxRequest, gen.ResumeSandboxResponse] - exec *connect.Client[gen.ExecRequest, gen.ExecResponse] - listSandboxes *connect.Client[gen.ListSandboxesRequest, gen.ListSandboxesResponse] - writeFile *connect.Client[gen.WriteFileRequest, gen.WriteFileResponse] - readFile *connect.Client[gen.ReadFileRequest, gen.ReadFileResponse] + createSandbox *connect.Client[gen.CreateSandboxRequest, gen.CreateSandboxResponse] + destroySandbox *connect.Client[gen.DestroySandboxRequest, gen.DestroySandboxResponse] + pauseSandbox *connect.Client[gen.PauseSandboxRequest, gen.PauseSandboxResponse] + resumeSandbox *connect.Client[gen.ResumeSandboxRequest, gen.ResumeSandboxResponse] + exec *connect.Client[gen.ExecRequest, gen.ExecResponse] + listSandboxes *connect.Client[gen.ListSandboxesRequest, gen.ListSandboxesResponse] + writeFile *connect.Client[gen.WriteFileRequest, gen.WriteFileResponse] + readFile *connect.Client[gen.ReadFileRequest, gen.ReadFileResponse] + execStream *connect.Client[gen.ExecStreamRequest, gen.ExecStreamResponse] + writeFileStream *connect.Client[gen.WriteFileStreamRequest, gen.WriteFileStreamResponse] + readFileStream *connect.Client[gen.ReadFileStreamRequest, gen.ReadFileStreamResponse] } // CreateSandbox calls hostagent.v1.HostAgentService.CreateSandbox. @@ -192,6 +229,21 @@ func (c *hostAgentServiceClient) ReadFile(ctx context.Context, req *connect.Requ return c.readFile.CallUnary(ctx, req) } +// ExecStream calls hostagent.v1.HostAgentService.ExecStream. +func (c *hostAgentServiceClient) ExecStream(ctx context.Context, req *connect.Request[gen.ExecStreamRequest]) (*connect.ServerStreamForClient[gen.ExecStreamResponse], error) { + return c.execStream.CallServerStream(ctx, req) +} + +// WriteFileStream calls hostagent.v1.HostAgentService.WriteFileStream. +func (c *hostAgentServiceClient) WriteFileStream(ctx context.Context) *connect.ClientStreamForClient[gen.WriteFileStreamRequest, gen.WriteFileStreamResponse] { + return c.writeFileStream.CallClientStream(ctx) +} + +// ReadFileStream calls hostagent.v1.HostAgentService.ReadFileStream. +func (c *hostAgentServiceClient) ReadFileStream(ctx context.Context, req *connect.Request[gen.ReadFileStreamRequest]) (*connect.ServerStreamForClient[gen.ReadFileStreamResponse], error) { + return c.readFileStream.CallServerStream(ctx, req) +} + // HostAgentServiceHandler is an implementation of the hostagent.v1.HostAgentService service. type HostAgentServiceHandler interface { // CreateSandbox boots a new microVM with the given configuration. @@ -210,6 +262,13 @@ type HostAgentServiceHandler interface { WriteFile(context.Context, *connect.Request[gen.WriteFileRequest]) (*connect.Response[gen.WriteFileResponse], error) // ReadFile reads a file from inside a sandbox. ReadFile(context.Context, *connect.Request[gen.ReadFileRequest]) (*connect.Response[gen.ReadFileResponse], error) + // ExecStream runs a command inside a sandbox and streams output events as they arrive. + ExecStream(context.Context, *connect.Request[gen.ExecStreamRequest], *connect.ServerStream[gen.ExecStreamResponse]) error + // WriteFileStream writes a file to a sandbox using chunked streaming. + // First message must contain metadata (sandbox_id, path). Subsequent messages contain data chunks. + WriteFileStream(context.Context, *connect.ClientStream[gen.WriteFileStreamRequest]) (*connect.Response[gen.WriteFileStreamResponse], error) + // ReadFileStream reads a file from a sandbox and streams it back in chunks. + ReadFileStream(context.Context, *connect.Request[gen.ReadFileStreamRequest], *connect.ServerStream[gen.ReadFileStreamResponse]) error } // NewHostAgentServiceHandler builds an HTTP handler from the service implementation. It returns the @@ -267,6 +326,24 @@ func NewHostAgentServiceHandler(svc HostAgentServiceHandler, opts ...connect.Han connect.WithSchema(hostAgentServiceMethods.ByName("ReadFile")), connect.WithHandlerOptions(opts...), ) + hostAgentServiceExecStreamHandler := connect.NewServerStreamHandler( + HostAgentServiceExecStreamProcedure, + svc.ExecStream, + connect.WithSchema(hostAgentServiceMethods.ByName("ExecStream")), + connect.WithHandlerOptions(opts...), + ) + hostAgentServiceWriteFileStreamHandler := connect.NewClientStreamHandler( + HostAgentServiceWriteFileStreamProcedure, + svc.WriteFileStream, + connect.WithSchema(hostAgentServiceMethods.ByName("WriteFileStream")), + connect.WithHandlerOptions(opts...), + ) + hostAgentServiceReadFileStreamHandler := connect.NewServerStreamHandler( + HostAgentServiceReadFileStreamProcedure, + svc.ReadFileStream, + connect.WithSchema(hostAgentServiceMethods.ByName("ReadFileStream")), + connect.WithHandlerOptions(opts...), + ) return "/hostagent.v1.HostAgentService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case HostAgentServiceCreateSandboxProcedure: @@ -285,6 +362,12 @@ func NewHostAgentServiceHandler(svc HostAgentServiceHandler, opts ...connect.Han hostAgentServiceWriteFileHandler.ServeHTTP(w, r) case HostAgentServiceReadFileProcedure: hostAgentServiceReadFileHandler.ServeHTTP(w, r) + case HostAgentServiceExecStreamProcedure: + hostAgentServiceExecStreamHandler.ServeHTTP(w, r) + case HostAgentServiceWriteFileStreamProcedure: + hostAgentServiceWriteFileStreamHandler.ServeHTTP(w, r) + case HostAgentServiceReadFileStreamProcedure: + hostAgentServiceReadFileStreamHandler.ServeHTTP(w, r) default: http.NotFound(w, r) } @@ -325,3 +408,15 @@ func (UnimplementedHostAgentServiceHandler) WriteFile(context.Context, *connect. func (UnimplementedHostAgentServiceHandler) ReadFile(context.Context, *connect.Request[gen.ReadFileRequest]) (*connect.Response[gen.ReadFileResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("hostagent.v1.HostAgentService.ReadFile is not implemented")) } + +func (UnimplementedHostAgentServiceHandler) ExecStream(context.Context, *connect.Request[gen.ExecStreamRequest], *connect.ServerStream[gen.ExecStreamResponse]) error { + return connect.NewError(connect.CodeUnimplemented, errors.New("hostagent.v1.HostAgentService.ExecStream is not implemented")) +} + +func (UnimplementedHostAgentServiceHandler) WriteFileStream(context.Context, *connect.ClientStream[gen.WriteFileStreamRequest]) (*connect.Response[gen.WriteFileStreamResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("hostagent.v1.HostAgentService.WriteFileStream is not implemented")) +} + +func (UnimplementedHostAgentServiceHandler) ReadFileStream(context.Context, *connect.Request[gen.ReadFileStreamRequest], *connect.ServerStream[gen.ReadFileStreamResponse]) error { + return connect.NewError(connect.CodeUnimplemented, errors.New("hostagent.v1.HostAgentService.ReadFileStream is not implemented")) +} diff --git a/proto/hostagent/hostagent.proto b/proto/hostagent/hostagent.proto index 3509015..76b2d2f 100644 --- a/proto/hostagent/hostagent.proto +++ b/proto/hostagent/hostagent.proto @@ -28,6 +28,16 @@ service HostAgentService { // ReadFile reads a file from inside a sandbox. rpc ReadFile(ReadFileRequest) returns (ReadFileResponse); + + // ExecStream runs a command inside a sandbox and streams output events as they arrive. + rpc ExecStream(ExecStreamRequest) returns (stream ExecStreamResponse); + + // WriteFileStream writes a file to a sandbox using chunked streaming. + // First message must contain metadata (sandbox_id, path). Subsequent messages contain data chunks. + rpc WriteFileStream(stream WriteFileStreamRequest) returns (WriteFileStreamResponse); + + // ReadFileStream reads a file from a sandbox and streams it back in chunks. + rpc ReadFileStream(ReadFileStreamRequest) returns (stream ReadFileStreamResponse); } message CreateSandboxRequest { @@ -120,3 +130,61 @@ message ReadFileRequest { message ReadFileResponse { bytes content = 1; } + +// ── Streaming Exec ────────────────────────────────────────────────── + +message ExecStreamRequest { + string sandbox_id = 1; + string cmd = 2; + repeated string args = 3; + int32 timeout_sec = 4; +} + +message ExecStreamResponse { + oneof event { + ExecStreamStart start = 1; + ExecStreamData data = 2; + ExecStreamEnd end = 3; + } +} + +message ExecStreamStart { + uint32 pid = 1; +} + +message ExecStreamData { + oneof output { + bytes stdout = 1; + bytes stderr = 2; + } +} + +message ExecStreamEnd { + int32 exit_code = 1; + string error = 2; +} + +// ── Streaming File Transfer ───────────────────────────────────────── + +message WriteFileStreamRequest { + oneof content { + WriteFileStreamMeta meta = 1; + bytes chunk = 2; + } +} + +message WriteFileStreamMeta { + string sandbox_id = 1; + string path = 2; +} + +message WriteFileStreamResponse {} + +message ReadFileStreamRequest { + string sandbox_id = 1; + string path = 2; +} + +message ReadFileStreamResponse { + bytes chunk = 1; +}