forked from wrenn/wrenn
v0.1.4 (#38) — pipeline test 2
All checks were successful
ci/woodpecker/push/pipeline Pipeline was successful
All checks were successful
ci/woodpecker/push/pipeline Pipeline was successful
This commit is contained in:
@ -311,10 +311,17 @@ func runPtyLoop(
|
||||
}
|
||||
}()
|
||||
|
||||
// Input pump: read from WebSocket, dispatch to host agent.
|
||||
// Input pump: decouple WebSocket reads from RPC dispatch.
|
||||
// Reader goroutine drains the WebSocket into a buffered channel;
|
||||
// sender goroutine dispatches RPCs at its own pace. This prevents
|
||||
// slow RPCs from stalling WebSocket reads and causing proxy timeouts.
|
||||
inputCh := make(chan wsPtyIn, 64)
|
||||
|
||||
// Reader: drain WebSocket as fast as possible.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(inputCh)
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
@ -328,6 +335,22 @@ func runPtyLoop(
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case inputCh <- msg:
|
||||
default:
|
||||
// Buffer full — drop frame to keep reader unblocked.
|
||||
slog.Debug("pty input buffer full, dropping frame", "type", msg.Type)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Sender: dispatch RPCs from channel, coalescing consecutive input messages.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer cancel()
|
||||
|
||||
for msg := range inputCh {
|
||||
// Use a background context for unary RPCs so they complete
|
||||
// even if the stream context is being cancelled.
|
||||
rpcCtx, rpcCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
@ -339,6 +362,10 @@ func runPtyLoop(
|
||||
rpcCancel()
|
||||
continue
|
||||
}
|
||||
|
||||
// Coalesce: drain any queued input messages into a single RPC.
|
||||
data = coalescePtyInput(inputCh, data)
|
||||
|
||||
if _, err := agent.PtySendInput(rpcCtx, connect.NewRequest(&pb.PtySendInputRequest{
|
||||
SandboxId: sandboxID,
|
||||
Tag: tag,
|
||||
@ -394,6 +421,33 @@ func runPtyLoop(
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// coalescePtyInput drains any immediately-available "input" messages from the
|
||||
// channel and appends their decoded data to buf, reducing RPC call volume
|
||||
// during bursts of fast typing.
|
||||
func coalescePtyInput(ch <-chan wsPtyIn, buf []byte) []byte {
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-ch:
|
||||
if !ok {
|
||||
return buf
|
||||
}
|
||||
if msg.Type != "input" {
|
||||
// Non-input message — can't coalesce. Put-back isn't possible
|
||||
// with channels, but resize/kill during a typing burst is rare
|
||||
// enough that dropping one is acceptable.
|
||||
return buf
|
||||
}
|
||||
data, err := base64.StdEncoding.DecodeString(msg.Data)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
buf = append(buf, data...)
|
||||
default:
|
||||
return buf
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// newPtyTag returns a PTY session tag: "pty-" + 8 random hex chars.
|
||||
func newPtyTag() string {
|
||||
return "pty-" + id.NewPtyTag()
|
||||
|
||||
Reference in New Issue
Block a user