Add WebSocket-based streaming exec endpoint and streaming file upload/download endpoints to the control plane API. Includes new host agent RPC methods (ExecStream, StreamWriteFile, StreamReadFile), envd client streaming support, and OpenAPI spec updates.
285 lines
7.1 KiB
Go
285 lines
7.1 KiB
Go
package envdclient
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/url"
|
|
|
|
"connectrpc.com/connect"
|
|
|
|
envdpb "git.omukk.dev/wrenn/sandbox/proto/envd/gen"
|
|
"git.omukk.dev/wrenn/sandbox/proto/envd/gen/genconnect"
|
|
)
|
|
|
|
// Client wraps the Connect RPC client for envd's Process and Filesystem services.
|
|
type Client struct {
|
|
hostIP string
|
|
base string
|
|
healthURL string
|
|
httpClient *http.Client
|
|
|
|
process genconnect.ProcessClient
|
|
filesystem genconnect.FilesystemClient
|
|
}
|
|
|
|
// New creates a new envd client that connects to the given host IP.
|
|
func New(hostIP string) *Client {
|
|
base := baseURL(hostIP)
|
|
httpClient := newHTTPClient()
|
|
|
|
return &Client{
|
|
hostIP: hostIP,
|
|
base: base,
|
|
healthURL: base + "/health",
|
|
httpClient: httpClient,
|
|
process: genconnect.NewProcessClient(httpClient, base),
|
|
filesystem: genconnect.NewFilesystemClient(httpClient, base),
|
|
}
|
|
}
|
|
|
|
// BaseURL returns the HTTP base URL for reaching envd.
|
|
func (c *Client) BaseURL() string {
|
|
return c.base
|
|
}
|
|
|
|
// ExecResult holds the output of a command execution.
|
|
type ExecResult struct {
|
|
Stdout []byte
|
|
Stderr []byte
|
|
ExitCode int32
|
|
}
|
|
|
|
// Exec runs a command inside the sandbox and collects all stdout/stderr output.
|
|
// It blocks until the command completes.
|
|
func (c *Client) Exec(ctx context.Context, cmd string, args ...string) (*ExecResult, error) {
|
|
stdin := false
|
|
req := connect.NewRequest(&envdpb.StartRequest{
|
|
Process: &envdpb.ProcessConfig{
|
|
Cmd: cmd,
|
|
Args: args,
|
|
},
|
|
Stdin: &stdin,
|
|
})
|
|
|
|
stream, err := c.process.Start(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("start process: %w", err)
|
|
}
|
|
defer stream.Close()
|
|
|
|
result := &ExecResult{}
|
|
|
|
for stream.Receive() {
|
|
msg := stream.Msg()
|
|
if msg.Event == nil {
|
|
continue
|
|
}
|
|
|
|
event := msg.Event.GetEvent()
|
|
switch e := event.(type) {
|
|
case *envdpb.ProcessEvent_Start:
|
|
slog.Debug("process started", "pid", e.Start.GetPid())
|
|
|
|
case *envdpb.ProcessEvent_Data:
|
|
output := e.Data.GetOutput()
|
|
switch o := output.(type) {
|
|
case *envdpb.ProcessEvent_DataEvent_Stdout:
|
|
result.Stdout = append(result.Stdout, o.Stdout...)
|
|
case *envdpb.ProcessEvent_DataEvent_Stderr:
|
|
result.Stderr = append(result.Stderr, o.Stderr...)
|
|
}
|
|
|
|
case *envdpb.ProcessEvent_End:
|
|
result.ExitCode = e.End.GetExitCode()
|
|
if e.End.Error != nil {
|
|
slog.Debug("process ended with error",
|
|
"exit_code", e.End.GetExitCode(),
|
|
"error", e.End.GetError(),
|
|
)
|
|
}
|
|
|
|
case *envdpb.ProcessEvent_Keepalive:
|
|
// Ignore keepalives.
|
|
}
|
|
}
|
|
|
|
if err := stream.Err(); err != nil && err != io.EOF {
|
|
return result, fmt.Errorf("stream error: %w", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ExecStreamEvent represents a single event from a streaming exec.
|
|
type ExecStreamEvent struct {
|
|
Type string // "start", "stdout", "stderr", "end"
|
|
PID uint32
|
|
Data []byte
|
|
ExitCode int32
|
|
Error string
|
|
}
|
|
|
|
// ExecStream runs a command inside the sandbox and returns a channel of output events.
|
|
// The channel is closed when the process ends or the context is cancelled.
|
|
func (c *Client) ExecStream(ctx context.Context, cmd string, args ...string) (<-chan ExecStreamEvent, error) {
|
|
stdin := false
|
|
req := connect.NewRequest(&envdpb.StartRequest{
|
|
Process: &envdpb.ProcessConfig{
|
|
Cmd: cmd,
|
|
Args: args,
|
|
},
|
|
Stdin: &stdin,
|
|
})
|
|
|
|
stream, err := c.process.Start(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("start process: %w", err)
|
|
}
|
|
|
|
ch := make(chan ExecStreamEvent, 16)
|
|
go func() {
|
|
defer close(ch)
|
|
defer stream.Close()
|
|
|
|
for stream.Receive() {
|
|
msg := stream.Msg()
|
|
if msg.Event == nil {
|
|
continue
|
|
}
|
|
|
|
var ev ExecStreamEvent
|
|
event := msg.Event.GetEvent()
|
|
switch e := event.(type) {
|
|
case *envdpb.ProcessEvent_Start:
|
|
ev = ExecStreamEvent{Type: "start", PID: e.Start.GetPid()}
|
|
|
|
case *envdpb.ProcessEvent_Data:
|
|
output := e.Data.GetOutput()
|
|
switch o := output.(type) {
|
|
case *envdpb.ProcessEvent_DataEvent_Stdout:
|
|
ev = ExecStreamEvent{Type: "stdout", Data: o.Stdout}
|
|
case *envdpb.ProcessEvent_DataEvent_Stderr:
|
|
ev = ExecStreamEvent{Type: "stderr", Data: o.Stderr}
|
|
}
|
|
|
|
case *envdpb.ProcessEvent_End:
|
|
ev = ExecStreamEvent{Type: "end", ExitCode: e.End.GetExitCode()}
|
|
if e.End.Error != nil {
|
|
ev.Error = e.End.GetError()
|
|
}
|
|
|
|
case *envdpb.ProcessEvent_Keepalive:
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case ch <- ev:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := stream.Err(); err != nil && err != io.EOF {
|
|
slog.Debug("exec stream error", "error", err)
|
|
}
|
|
}()
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
// WriteFile writes content to a file inside the sandbox via envd's REST endpoint.
|
|
// envd expects POST /files?path=...&username=root with multipart/form-data (field name "file").
|
|
func (c *Client) WriteFile(ctx context.Context, path string, content []byte) error {
|
|
var body bytes.Buffer
|
|
writer := multipart.NewWriter(&body)
|
|
|
|
part, err := writer.CreateFormFile("file", "upload")
|
|
if err != nil {
|
|
return fmt.Errorf("create multipart: %w", err)
|
|
}
|
|
if _, err := part.Write(content); err != nil {
|
|
return fmt.Errorf("write multipart: %w", err)
|
|
}
|
|
writer.Close()
|
|
|
|
u := fmt.Sprintf("%s/files?%s", c.base, url.Values{
|
|
"path": {path},
|
|
"username": {"root"},
|
|
}.Encode())
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, &body)
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", writer.FormDataContentType())
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("write file: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
|
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
|
|
return fmt.Errorf("write file %s: status %d: %s", path, resp.StatusCode, string(respBody))
|
|
}
|
|
|
|
slog.Debug("envd write file", "path", path, "status", resp.StatusCode, "response", string(respBody))
|
|
return nil
|
|
}
|
|
|
|
// ReadFile reads a file from inside the sandbox via envd's REST endpoint.
|
|
// envd expects GET /files?path=...&username=root.
|
|
func (c *Client) ReadFile(ctx context.Context, path string) ([]byte, error) {
|
|
u := fmt.Sprintf("%s/files?%s", c.base, url.Values{
|
|
"path": {path},
|
|
"username": {"root"},
|
|
}.Encode())
|
|
|
|
slog.Debug("envd read file", "url", u, "path", path)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create request: %w", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read file: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("read file %s: status %d: %s", path, resp.StatusCode, string(respBody))
|
|
}
|
|
|
|
data, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read file body: %w", err)
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// ListDir lists directory contents inside the sandbox.
|
|
func (c *Client) ListDir(ctx context.Context, path string, depth uint32) (*envdpb.ListDirResponse, error) {
|
|
req := connect.NewRequest(&envdpb.ListDirRequest{
|
|
Path: path,
|
|
Depth: depth,
|
|
})
|
|
|
|
resp, err := c.filesystem.ListDir(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list dir: %w", err)
|
|
}
|
|
|
|
return resp.Msg, nil
|
|
}
|