forked from wrenn/wrenn
221 lines
5.7 KiB
Go
221 lines
5.7 KiB
Go
package envdclient
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
|
|
"connectrpc.com/connect"
|
|
|
|
envdpb "git.omukk.dev/wrenn/wrenn/proto/envd/gen"
|
|
)
|
|
|
|
// PtyEvent represents a single event from a PTY output stream.
|
|
type PtyEvent struct {
|
|
Type string // "started", "output", "end"
|
|
PID uint32
|
|
Data []byte
|
|
ExitCode int32
|
|
Error string
|
|
}
|
|
|
|
// PtyStart starts a new PTY process in the guest and returns a channel of events.
|
|
// The tag is the stable identifier used to reconnect via PtyConnect.
|
|
// The channel is closed when the process ends or ctx is cancelled.
|
|
// NOTE: The user parameter from PtyAttachRequest is not yet supported by envd's
|
|
// ProcessConfig proto. When envd adds user support, thread it through here.
|
|
func (c *Client) PtyStart(ctx context.Context, tag, cmd string, args []string, cols, rows uint32, envs map[string]string, cwd string) (<-chan PtyEvent, error) {
|
|
stdin := true
|
|
cfg := &envdpb.ProcessConfig{
|
|
Cmd: cmd,
|
|
Args: args,
|
|
Envs: envs,
|
|
}
|
|
if cwd != "" {
|
|
cfg.Cwd = &cwd
|
|
}
|
|
|
|
req := connect.NewRequest(&envdpb.StartRequest{
|
|
Process: cfg,
|
|
Pty: &envdpb.PTY{
|
|
Size: &envdpb.PTY_Size{
|
|
Cols: cols,
|
|
Rows: rows,
|
|
},
|
|
},
|
|
Tag: &tag,
|
|
Stdin: &stdin,
|
|
})
|
|
|
|
stream, err := c.process.Start(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pty start: %w", err)
|
|
}
|
|
|
|
return drainPtyStream(ctx, &startStream{s: stream}, true), nil
|
|
}
|
|
|
|
// PtyConnect re-attaches to an existing PTY process by tag.
|
|
// Returns a channel of output events starting from the current point.
|
|
func (c *Client) PtyConnect(ctx context.Context, tag string) (<-chan PtyEvent, error) {
|
|
req := connect.NewRequest(&envdpb.ConnectRequest{
|
|
Process: &envdpb.ProcessSelector{
|
|
Selector: &envdpb.ProcessSelector_Tag{Tag: tag},
|
|
},
|
|
})
|
|
|
|
stream, err := c.process.Connect(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pty connect: %w", err)
|
|
}
|
|
|
|
return drainPtyStream(ctx, &connectStream{s: stream}, false), nil
|
|
}
|
|
|
|
// PtySendInput sends raw bytes to the PTY process identified by tag.
|
|
func (c *Client) PtySendInput(ctx context.Context, tag string, data []byte) error {
|
|
req := connect.NewRequest(&envdpb.SendInputRequest{
|
|
Process: &envdpb.ProcessSelector{
|
|
Selector: &envdpb.ProcessSelector_Tag{Tag: tag},
|
|
},
|
|
Input: &envdpb.ProcessInput{
|
|
Input: &envdpb.ProcessInput_Pty{Pty: data},
|
|
},
|
|
})
|
|
|
|
if _, err := c.process.SendInput(ctx, req); err != nil {
|
|
return fmt.Errorf("pty send input: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PtyResize updates the terminal dimensions for the PTY process identified by tag.
|
|
func (c *Client) PtyResize(ctx context.Context, tag string, cols, rows uint32) error {
|
|
req := connect.NewRequest(&envdpb.UpdateRequest{
|
|
Process: &envdpb.ProcessSelector{
|
|
Selector: &envdpb.ProcessSelector_Tag{Tag: tag},
|
|
},
|
|
Pty: &envdpb.PTY{
|
|
Size: &envdpb.PTY_Size{
|
|
Cols: cols,
|
|
Rows: rows,
|
|
},
|
|
},
|
|
})
|
|
|
|
if _, err := c.process.Update(ctx, req); err != nil {
|
|
return fmt.Errorf("pty resize: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PtyKill sends SIGKILL to the PTY process identified by tag.
|
|
func (c *Client) PtyKill(ctx context.Context, tag string) error {
|
|
req := connect.NewRequest(&envdpb.SendSignalRequest{
|
|
Process: &envdpb.ProcessSelector{
|
|
Selector: &envdpb.ProcessSelector_Tag{Tag: tag},
|
|
},
|
|
Signal: envdpb.Signal_SIGNAL_SIGKILL,
|
|
})
|
|
|
|
if _, err := c.process.SendSignal(ctx, req); err != nil {
|
|
return fmt.Errorf("pty kill: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// eventStream is an interface covering both StartResponse and ConnectResponse streams.
|
|
type eventStream interface {
|
|
Receive() bool
|
|
Err() error
|
|
Close() error
|
|
}
|
|
|
|
type startStream struct {
|
|
s *connect.ServerStreamForClient[envdpb.StartResponse]
|
|
}
|
|
|
|
func (s *startStream) Receive() bool { return s.s.Receive() }
|
|
func (s *startStream) Err() error { return s.s.Err() }
|
|
func (s *startStream) Close() error { return s.s.Close() }
|
|
func (s *startStream) Event() *envdpb.ProcessEvent {
|
|
return s.s.Msg().GetEvent()
|
|
}
|
|
|
|
type connectStream struct {
|
|
s *connect.ServerStreamForClient[envdpb.ConnectResponse]
|
|
}
|
|
|
|
func (s *connectStream) Receive() bool { return s.s.Receive() }
|
|
func (s *connectStream) Err() error { return s.s.Err() }
|
|
func (s *connectStream) Close() error { return s.s.Close() }
|
|
func (s *connectStream) Event() *envdpb.ProcessEvent {
|
|
return s.s.Msg().GetEvent()
|
|
}
|
|
|
|
type eventProvider interface {
|
|
eventStream
|
|
Event() *envdpb.ProcessEvent
|
|
}
|
|
|
|
// drainPtyStream reads events from either a Start or Connect stream and maps
|
|
// them into PtyEvent values on a channel.
|
|
func drainPtyStream(ctx context.Context, stream eventProvider, expectStart bool) <-chan PtyEvent {
|
|
ch := make(chan PtyEvent, 16)
|
|
go func() {
|
|
defer close(ch)
|
|
defer stream.Close()
|
|
|
|
for stream.Receive() {
|
|
event := stream.Event()
|
|
if event == nil {
|
|
continue
|
|
}
|
|
|
|
var ev PtyEvent
|
|
switch e := event.GetEvent().(type) {
|
|
case *envdpb.ProcessEvent_Start:
|
|
if expectStart {
|
|
ev = PtyEvent{Type: "started", PID: e.Start.GetPid()}
|
|
} else {
|
|
continue
|
|
}
|
|
|
|
case *envdpb.ProcessEvent_Data:
|
|
switch o := e.Data.GetOutput().(type) {
|
|
case *envdpb.ProcessEvent_DataEvent_Pty:
|
|
ev = PtyEvent{Type: "output", Data: o.Pty}
|
|
case *envdpb.ProcessEvent_DataEvent_Stdout:
|
|
ev = PtyEvent{Type: "output", Data: o.Stdout}
|
|
case *envdpb.ProcessEvent_DataEvent_Stderr:
|
|
ev = PtyEvent{Type: "output", Data: o.Stderr}
|
|
default:
|
|
continue
|
|
}
|
|
|
|
case *envdpb.ProcessEvent_End:
|
|
ev = PtyEvent{Type: "end", ExitCode: e.End.GetExitCode()}
|
|
if e.End.Error != nil {
|
|
ev.Error = e.End.GetError()
|
|
}
|
|
|
|
case *envdpb.ProcessEvent_Keepalive:
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case ch <- ev:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := stream.Err(); err != nil && err != io.EOF {
|
|
slog.Debug("pty stream error", "error", err)
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|