forked from wrenn/wrenn
Add background process execution API
Start long-running processes (web servers, daemons) without blocking the
HTTP request. Leverages envd's existing background process support
(context.Background(), List, Connect, SendSignal RPCs) and wires it
through the host agent and control plane layers.
New API surface:
- POST /v1/capsules/{id}/exec with background:true → 202 {pid, tag}
- GET /v1/capsules/{id}/processes → list running processes
- DELETE /v1/capsules/{id}/processes/{selector} → kill by PID or tag
- WS /v1/capsules/{id}/processes/{selector}/stream → reconnect to output
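The {selector} param auto-detects its kind: a value that parses as an unsigned 32-bit decimal integer is treated as a PID, and anything else is treated as a tag (so an all-numeric tag cannot be addressed through a selector).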
Tags are auto-generated as "proc-" + 8 hex chars if not provided.
This commit is contained in:
@ -29,9 +29,13 @@ func newExecHandler(db *db.Queries, pool *lifecycle.HostClientPool) *execHandler
|
||||
}
|
||||
|
||||
// execRequest is the JSON body accepted by POST /v1/capsules/{id}/exec.
//
// Cmd and Args describe the command to run. TimeoutSec is the exec timeout in
// seconds. Background switches the handler to background mode, in which the
// process is started and the handler returns without waiting for it. Tag,
// Envs and Cwd are forwarded to the background start request; when Tag is
// empty the handler generates one.
//
// Each field is declared exactly once: the diff this struct came from listed
// Cmd, Args and TimeoutSec twice (old and new alignment), which is a
// duplicate-field compile error if taken literally.
type execRequest struct {
	Cmd        string            `json:"cmd"`
	Args       []string          `json:"args"`
	TimeoutSec int32             `json:"timeout_sec"`
	Background bool              `json:"background"`
	Tag        string            `json:"tag"`
	Envs       map[string]string `json:"envs"`
	Cwd        string            `json:"cwd"`
}
|
||||
|
||||
type execResponse struct {
|
||||
@ -45,6 +49,13 @@ type execResponse struct {
|
||||
Encoding string `json:"encoding"`
|
||||
}
|
||||
|
||||
// backgroundExecResponse is the JSON body written with 202 Accepted when an
// exec request is started in background mode. It echoes the sandbox ID and
// command from the request and carries the PID and tag reported by the host
// agent for the started process.
type backgroundExecResponse struct {
	SandboxID string `json:"sandbox_id"`
	Cmd       string `json:"cmd"`
	PID       uint32 `json:"pid"`
	Tag       string `json:"tag"`
}
|
||||
|
||||
// Exec handles POST /v1/capsules/{id}/exec.
|
||||
func (h *execHandler) Exec(w http.ResponseWriter, r *http.Request) {
|
||||
sandboxIDStr := chi.URLParam(r, "id")
|
||||
@ -78,14 +89,54 @@ func (h *execHandler) Exec(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
agent, err := agentForHost(ctx, h.db, h.pool, sb.HostID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "host_unavailable", "sandbox host is not reachable")
|
||||
return
|
||||
}
|
||||
|
||||
// Background mode: start process and return immediately.
|
||||
if req.Background {
|
||||
tag := req.Tag
|
||||
if tag == "" {
|
||||
tag = "proc-" + id.NewPtyTag()
|
||||
}
|
||||
|
||||
bgResp, err := agent.StartBackground(ctx, connect.NewRequest(&pb.StartBackgroundRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
Tag: tag,
|
||||
Cmd: req.Cmd,
|
||||
Args: req.Args,
|
||||
Envs: req.Envs,
|
||||
Cwd: req.Cwd,
|
||||
}))
|
||||
if err != nil {
|
||||
status, code, msg := agentErrToHTTP(err)
|
||||
writeError(w, status, code, msg)
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.db.UpdateLastActive(ctx, db.UpdateLastActiveParams{
|
||||
ID: sandboxID,
|
||||
LastActiveAt: pgtype.Timestamptz{
|
||||
Time: time.Now(),
|
||||
Valid: true,
|
||||
},
|
||||
}); err != nil {
|
||||
slog.Warn("failed to update last_active_at", "id", sandboxIDStr, "error", err)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusAccepted, backgroundExecResponse{
|
||||
SandboxID: sandboxIDStr,
|
||||
Cmd: req.Cmd,
|
||||
PID: bgResp.Msg.Pid,
|
||||
Tag: bgResp.Msg.Tag,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
resp, err := agent.Exec(ctx, connect.NewRequest(&pb.ExecRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
Cmd: req.Cmd,
|
||||
|
||||
266
internal/api/handlers_process.go
Normal file
266
internal/api/handlers_process.go
Normal file
@ -0,0 +1,266 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
"git.omukk.dev/wrenn/wrenn/internal/auth"
|
||||
"git.omukk.dev/wrenn/wrenn/internal/db"
|
||||
"git.omukk.dev/wrenn/wrenn/internal/id"
|
||||
"git.omukk.dev/wrenn/wrenn/internal/lifecycle"
|
||||
pb "git.omukk.dev/wrenn/wrenn/proto/hostagent/gen"
|
||||
)
|
||||
|
||||
type processHandler struct {
|
||||
db *db.Queries
|
||||
pool *lifecycle.HostClientPool
|
||||
}
|
||||
|
||||
func newProcessHandler(db *db.Queries, pool *lifecycle.HostClientPool) *processHandler {
|
||||
return &processHandler{db: db, pool: pool}
|
||||
}
|
||||
|
||||
// processResponse describes one process in the list returned by
// GET /v1/capsules/{id}/processes. Tag and Args are left out of the JSON
// when they are empty.
type processResponse struct {
	PID  uint32   `json:"pid"`
	Tag  string   `json:"tag,omitempty"`
	Cmd  string   `json:"cmd"`
	Args []string `json:"args,omitempty"`
}
|
||||
|
||||
// processListResponse wraps the list of processes.
|
||||
type processListResponse struct {
|
||||
Processes []processResponse `json:"processes"`
|
||||
}
|
||||
|
||||
// ListProcesses handles GET /v1/capsules/{id}/processes.
|
||||
func (h *processHandler) ListProcesses(w http.ResponseWriter, r *http.Request) {
|
||||
sandboxIDStr := chi.URLParam(r, "id")
|
||||
ctx := r.Context()
|
||||
ac := auth.MustFromContext(ctx)
|
||||
|
||||
sandboxID, err := id.ParseSandboxID(sandboxIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid sandbox ID")
|
||||
return
|
||||
}
|
||||
|
||||
sb, err := h.db.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: ac.TeamID})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
|
||||
return
|
||||
}
|
||||
if sb.Status != "running" {
|
||||
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running (status: "+sb.Status+")")
|
||||
return
|
||||
}
|
||||
|
||||
agent, err := agentForHost(ctx, h.db, h.pool, sb.HostID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "host_unavailable", "sandbox host is not reachable")
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := agent.ListProcesses(ctx, connect.NewRequest(&pb.ListProcessesRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
}))
|
||||
if err != nil {
|
||||
status, code, msg := agentErrToHTTP(err)
|
||||
writeError(w, status, code, msg)
|
||||
return
|
||||
}
|
||||
|
||||
procs := make([]processResponse, 0, len(resp.Msg.Processes))
|
||||
for _, p := range resp.Msg.Processes {
|
||||
procs = append(procs, processResponse{
|
||||
PID: p.Pid,
|
||||
Tag: p.Tag,
|
||||
Cmd: p.Cmd,
|
||||
Args: p.Args,
|
||||
})
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, processListResponse{Processes: procs})
|
||||
}
|
||||
|
||||
// KillProcess handles DELETE /v1/capsules/{id}/processes/{selector}.
|
||||
// The selector can be a numeric PID or a string tag.
|
||||
func (h *processHandler) KillProcess(w http.ResponseWriter, r *http.Request) {
|
||||
sandboxIDStr := chi.URLParam(r, "id")
|
||||
selectorStr := chi.URLParam(r, "selector")
|
||||
ctx := r.Context()
|
||||
ac := auth.MustFromContext(ctx)
|
||||
|
||||
sandboxID, err := id.ParseSandboxID(sandboxIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid sandbox ID")
|
||||
return
|
||||
}
|
||||
|
||||
sb, err := h.db.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: ac.TeamID})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
|
||||
return
|
||||
}
|
||||
if sb.Status != "running" {
|
||||
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running (status: "+sb.Status+")")
|
||||
return
|
||||
}
|
||||
|
||||
agent, err := agentForHost(ctx, h.db, h.pool, sb.HostID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "host_unavailable", "sandbox host is not reachable")
|
||||
return
|
||||
}
|
||||
|
||||
// Build the kill request with PID or tag selector.
|
||||
killReq := &pb.KillProcessRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
Signal: "SIGKILL",
|
||||
}
|
||||
if sig := r.URL.Query().Get("signal"); sig == "SIGTERM" {
|
||||
killReq.Signal = "SIGTERM"
|
||||
}
|
||||
|
||||
if pid, err := strconv.ParseUint(selectorStr, 10, 32); err == nil {
|
||||
killReq.Selector = &pb.KillProcessRequest_Pid{Pid: uint32(pid)}
|
||||
} else {
|
||||
killReq.Selector = &pb.KillProcessRequest_Tag{Tag: selectorStr}
|
||||
}
|
||||
|
||||
if _, err := agent.KillProcess(ctx, connect.NewRequest(killReq)); err != nil {
|
||||
status, code, msg := agentErrToHTTP(err)
|
||||
writeError(w, status, code, msg)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// wsProcessOut is one JSON message sent to the WebSocket client of a process
// stream.
//
//   - Type is one of "start", "stdout", "stderr", "exit" or "error".
//   - PID is set only on "start" messages.
//   - Data is set only on "stdout", "stderr" and "error" messages.
//   - ExitCode is set only on "exit" messages; it is a pointer so that an
//     exit code of 0 is still serialized.
//
// Fields that do not apply to a message are omitted from its JSON.
type wsProcessOut struct {
	Type     string `json:"type"`
	PID      uint32 `json:"pid,omitempty"`
	Data     string `json:"data,omitempty"`
	ExitCode *int32 `json:"exit_code,omitempty"`
}
|
||||
|
||||
// ConnectProcess handles WS /v1/capsules/{id}/processes/{selector}/stream.
|
||||
func (h *processHandler) ConnectProcess(w http.ResponseWriter, r *http.Request) {
|
||||
sandboxIDStr := chi.URLParam(r, "id")
|
||||
selectorStr := chi.URLParam(r, "selector")
|
||||
ctx := r.Context()
|
||||
ac := auth.MustFromContext(ctx)
|
||||
|
||||
sandboxID, err := id.ParseSandboxID(sandboxIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid sandbox ID")
|
||||
return
|
||||
}
|
||||
|
||||
sb, err := h.db.GetSandboxByTeam(ctx, db.GetSandboxByTeamParams{ID: sandboxID, TeamID: ac.TeamID})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "not_found", "sandbox not found")
|
||||
return
|
||||
}
|
||||
if sb.Status != "running" {
|
||||
writeError(w, http.StatusConflict, "invalid_state", "sandbox is not running (status: "+sb.Status+")")
|
||||
return
|
||||
}
|
||||
|
||||
agent, err := agentForHost(ctx, h.db, h.pool, sb.HostID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "host_unavailable", "sandbox host is not reachable")
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
slog.Error("process stream websocket upgrade failed", "error", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Build the connect request with PID or tag selector.
|
||||
connectReq := &pb.ConnectProcessRequest{
|
||||
SandboxId: sandboxIDStr,
|
||||
}
|
||||
if pid, err := strconv.ParseUint(selectorStr, 10, 32); err == nil {
|
||||
connectReq.Selector = &pb.ConnectProcessRequest_Pid{Pid: uint32(pid)}
|
||||
} else {
|
||||
connectReq.Selector = &pb.ConnectProcessRequest_Tag{Tag: selectorStr}
|
||||
}
|
||||
|
||||
streamCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
stream, err := agent.ConnectProcess(streamCtx, connect.NewRequest(connectReq))
|
||||
if err != nil {
|
||||
sendProcessWSError(conn, "failed to connect to process: "+err.Error())
|
||||
return
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
// Listen for client disconnect in a goroutine.
|
||||
go func() {
|
||||
for {
|
||||
_, _, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Forward stream events to WebSocket.
|
||||
for stream.Receive() {
|
||||
resp := stream.Msg()
|
||||
switch ev := resp.Event.(type) {
|
||||
case *pb.ConnectProcessResponse_Start:
|
||||
writeWSJSON(conn, wsProcessOut{Type: "start", PID: ev.Start.Pid})
|
||||
|
||||
case *pb.ConnectProcessResponse_Data:
|
||||
switch o := ev.Data.Output.(type) {
|
||||
case *pb.ExecStreamData_Stdout:
|
||||
writeWSJSON(conn, wsProcessOut{Type: "stdout", Data: string(o.Stdout)})
|
||||
case *pb.ExecStreamData_Stderr:
|
||||
writeWSJSON(conn, wsProcessOut{Type: "stderr", Data: string(o.Stderr)})
|
||||
}
|
||||
|
||||
case *pb.ConnectProcessResponse_End:
|
||||
exitCode := ev.End.ExitCode
|
||||
writeWSJSON(conn, wsProcessOut{Type: "exit", ExitCode: &exitCode})
|
||||
}
|
||||
}
|
||||
|
||||
if err := stream.Err(); err != nil {
|
||||
if streamCtx.Err() == nil {
|
||||
sendProcessWSError(conn, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Update last active using a fresh context.
|
||||
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer updateCancel()
|
||||
if err := h.db.UpdateLastActive(updateCtx, db.UpdateLastActiveParams{
|
||||
ID: sandboxID,
|
||||
LastActiveAt: pgtype.Timestamptz{
|
||||
Time: time.Now(),
|
||||
Valid: true,
|
||||
},
|
||||
}); err != nil {
|
||||
slog.Warn("failed to update last active after process stream", "sandbox_id", sandboxIDStr, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func sendProcessWSError(conn *websocket.Conn, msg string) {
|
||||
writeWSJSON(conn, wsProcessOut{Type: "error", Data: msg})
|
||||
}
|
||||
@ -699,11 +699,17 @@ paths:
|
||||
$ref: "#/components/schemas/ExecRequest"
|
||||
responses:
|
||||
"200":
|
||||
description: Command output
|
||||
description: Command output (foreground exec)
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ExecResponse"
|
||||
"202":
|
||||
description: Background process started
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/BackgroundExecResponse"
|
||||
"404":
|
||||
description: Capsule not found
|
||||
content:
|
||||
@ -717,6 +723,122 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/capsules/{id}/processes:
|
||||
parameters:
|
||||
- name: id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
|
||||
get:
|
||||
summary: List running processes
|
||||
operationId: listProcesses
|
||||
tags: [capsules]
|
||||
security:
|
||||
- apiKeyAuth: []
|
||||
description: |
|
||||
Returns all running processes inside the capsule, including background
|
||||
processes and any processes started by templates or init scripts.
|
||||
responses:
|
||||
"200":
|
||||
description: Process list
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ProcessListResponse"
|
||||
"404":
|
||||
description: Capsule not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"409":
|
||||
description: Capsule not running
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/capsules/{id}/processes/{selector}:
|
||||
parameters:
|
||||
- name: id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: selector
|
||||
in: path
|
||||
required: true
|
||||
description: Process PID (numeric) or tag (string)
|
||||
schema:
|
||||
type: string
|
||||
|
||||
delete:
|
||||
summary: Kill a process
|
||||
operationId: killProcess
|
||||
tags: [capsules]
|
||||
security:
|
||||
- apiKeyAuth: []
|
||||
parameters:
|
||||
- name: signal
|
||||
in: query
|
||||
required: false
|
||||
description: Signal to send (SIGKILL or SIGTERM, default SIGKILL)
|
||||
schema:
|
||||
type: string
|
||||
enum: [SIGKILL, SIGTERM]
|
||||
default: SIGKILL
|
||||
responses:
|
||||
"204":
|
||||
description: Process killed
|
||||
"404":
|
||||
description: Capsule or process not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"409":
|
||||
description: Capsule not running
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/capsules/{id}/processes/{selector}/stream:
|
||||
parameters:
|
||||
- name: id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: selector
|
||||
in: path
|
||||
required: true
|
||||
description: Process PID (numeric) or tag (string)
|
||||
schema:
|
||||
type: string
|
||||
|
||||
get:
|
||||
summary: Stream process output via WebSocket
|
||||
operationId: connectProcess
|
||||
tags: [capsules]
|
||||
security:
|
||||
- apiKeyAuth: []
|
||||
description: |
|
||||
Opens a WebSocket connection to stream stdout/stderr from a running
|
||||
background process. The selector can be a numeric PID or a string tag.
|
||||
|
||||
Server sends JSON messages:
|
||||
- `{"type": "start", "pid": 42}` — connected to process
|
||||
- `{"type": "stdout", "data": "..."}` — stdout output
|
||||
- `{"type": "stderr", "data": "..."}` — stderr output
|
||||
- `{"type": "exit", "exit_code": 0}` — process exited
|
||||
- `{"type": "error", "data": "..."}` — error message
|
||||
responses:
|
||||
"101":
|
||||
description: WebSocket upgrade
|
||||
|
||||
/v1/capsules/{id}/ping:
|
||||
parameters:
|
||||
- name: id
|
||||
@ -2153,6 +2275,56 @@ components:
|
||||
timeout_sec:
|
||||
type: integer
|
||||
default: 30
|
||||
description: Timeout in seconds (foreground exec only, default 30)
|
||||
background:
|
||||
type: boolean
|
||||
default: false
|
||||
description: If true, starts the process in the background and returns immediately with a PID and tag (HTTP 202)
|
||||
tag:
|
||||
type: string
|
||||
description: Optional user-chosen tag for the background process. Auto-generated if omitted. Only used when background is true.
|
||||
envs:
|
||||
type: object
|
||||
additionalProperties:
|
||||
type: string
|
||||
description: Environment variables for the process (background exec only)
|
||||
cwd:
|
||||
type: string
|
||||
description: Working directory for the process (background exec only)
|
||||
|
||||
BackgroundExecResponse:
|
||||
type: object
|
||||
properties:
|
||||
sandbox_id:
|
||||
type: string
|
||||
cmd:
|
||||
type: string
|
||||
pid:
|
||||
type: integer
|
||||
tag:
|
||||
type: string
|
||||
|
||||
ProcessEntry:
|
||||
type: object
|
||||
properties:
|
||||
pid:
|
||||
type: integer
|
||||
tag:
|
||||
type: string
|
||||
cmd:
|
||||
type: string
|
||||
args:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
|
||||
ProcessListResponse:
|
||||
type: object
|
||||
properties:
|
||||
processes:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/ProcessEntry"
|
||||
|
||||
ExecResponse:
|
||||
type: object
|
||||
|
||||
@ -74,6 +74,7 @@ func New(
|
||||
buildH := newBuildHandler(buildSvc, queries, pool)
|
||||
channelH := newChannelHandler(channelSvc, al)
|
||||
ptyH := newPtyHandler(queries, pool)
|
||||
processH := newProcessHandler(queries, pool)
|
||||
adminCapsules := newAdminCapsuleHandler(sandboxSvc, queries, pool, al)
|
||||
|
||||
// OpenAPI spec and docs.
|
||||
@ -141,6 +142,9 @@ func New(
|
||||
r.Post("/files/remove", fsH.Remove)
|
||||
r.Get("/metrics", metricsH.GetMetrics)
|
||||
r.Get("/pty", ptyH.PtySession)
|
||||
r.Get("/processes", processH.ListProcesses)
|
||||
r.Delete("/processes/{selector}", processH.KillProcess)
|
||||
r.Get("/processes/{selector}/stream", processH.ConnectProcess)
|
||||
})
|
||||
})
|
||||
|
||||
@ -224,6 +228,9 @@ func New(
|
||||
r.Post("/files/remove", fsH.Remove)
|
||||
r.Get("/metrics", metricsH.GetMetrics)
|
||||
r.Get("/pty", ptyH.PtySession)
|
||||
r.Get("/processes", processH.ListProcesses)
|
||||
r.Delete("/processes/{selector}", processH.KillProcess)
|
||||
r.Get("/processes/{selector}/stream", processH.ConnectProcess)
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user