Fix review findings: IP collision, pause race, proxy path, ENV ordering, conn drain
- Fix IP address collision at slot 32768+ by using bitwise shifts instead of byte-truncating division in network slot addressing
- Add per-sandbox lifecycleMu to serialize concurrent Pause/Destroy calls
- Sanitize proxy forwarding path with path.Clean
- Sort ENV keys in recipe shell preamble for deterministic ordering
- Fix ConnTracker goroutine leak by adding cancel channel to Drain/Reset
- Update context_test to assert deterministic ENV ordering
@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"net/http/httputil"
 	"net/url"
+	"path"
 	"regexp"
 	"strconv"
 	"strings"
@@ -196,7 +197,7 @@ func (h *SandboxProxyWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request)
 		Director: func(req *http.Request) {
 			req.URL.Scheme = agentURL.Scheme
 			req.URL.Host = agentURL.Host
-			req.URL.Path = "/proxy/" + sandboxIDStr + "/" + port + req.URL.Path
+			req.URL.Path = path.Join("/proxy", sandboxIDStr, port, path.Clean("/"+req.URL.Path))
 			req.Host = agentURL.Host
 		},
 		ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
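For context, a standalone sketch (not part of this repo; the sandbox ID and port values are placeholders) of what the path.Clean guard buys: rooting the client path with "/" before Clean collapses ".." segments against the root, so the forwarded path can no longer escape the /proxy/<id>/<port> prefix.

package main

import (
	"fmt"
	"path"
)

func main() {
	// A hostile client path attempting to escape the proxy prefix.
	reqPath := "/../../admin/secrets"

	// Old behavior: plain concatenation preserves the ".." segments,
	// which an upstream path resolver could walk out of the prefix.
	unsafe := "/proxy/" + "sb-123" + "/" + "8080" + reqPath
	fmt.Println(unsafe) // /proxy/sb-123/8080/../../admin/secrets

	// New behavior: Clean normalizes the rooted path first, so the
	// result stays under /proxy/sb-123/8080.
	safe := path.Join("/proxy", "sb-123", "8080", path.Clean("/"+reqPath))
	fmt.Println(safe) // /proxy/sb-123/8080/admin/secrets
}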
@@ -137,19 +137,20 @@ func NewSlot(index int) *Slot {
 
 	hostIP := make(net.IP, 4)
 	copy(hostIP, hostBaseIP)
-	hostIP[2] += byte(index / 256)
-	hostIP[3] += byte(index % 256)
+	hostIP[2] += byte(index >> 8)
+	hostIP[3] += byte(index & 0xFF)
 
 	vethOffset := index * vrtAddressesPerSlot
 	vethIP := make(net.IP, 4)
 	copy(vethIP, vrtBaseIP)
-	vethIP[2] += byte(vethOffset / 256)
-	vethIP[3] += byte(vethOffset % 256)
+	vethIP[2] += byte(vethOffset >> 8)
+	vethIP[3] += byte(vethOffset & 0xFF)
 
+	vpeerOffset := vethOffset + 1
 	vpeerIP := make(net.IP, 4)
 	copy(vpeerIP, vrtBaseIP)
-	vpeerIP[2] += byte((vethOffset + 1) / 256)
-	vpeerIP[3] += byte((vethOffset + 1) % 256)
+	vpeerIP[2] += byte(vpeerOffset >> 8)
+	vpeerIP[3] += byte(vpeerOffset & 0xFF)
 
 	return &Slot{
 		Index: index,
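To make the addressing concrete, a small sketch of how a slot index spreads across the last two octets. The base address and vrtAddressesPerSlot value below are made up for illustration; the real constants live in the network package and are not shown in this diff.

package main

import (
	"fmt"
	"net"
)

// Hypothetical base; the real hostBaseIP is defined elsewhere.
var hostBaseIP = net.IPv4(10, 100, 0, 0).To4()

func slotHostIP(index int) net.IP {
	ip := make(net.IP, 4)
	copy(ip, hostBaseIP)
	ip[2] += byte(index >> 8)   // high byte of the index -> third octet
	ip[3] += byte(index & 0xFF) // low byte of the index  -> fourth octet
	return ip
}

func main() {
	for _, idx := range []int{0, 255, 256, 32768} {
		fmt.Printf("slot %5d -> host IP %s\n", idx, slotHostIP(idx))
	}
	// slot     0 -> host IP 10.100.0.0
	// slot   255 -> host IP 10.100.0.255
	// slot   256 -> host IP 10.100.1.0
	// slot 32768 -> host IP 10.100.128.0
}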
@@ -2,6 +2,7 @@ package recipe
 
 import (
 	"regexp"
+	"slices"
 	"strings"
 )
 
@@ -56,10 +57,15 @@ func (c *ExecContext) shellPrefix() string {
 		sb.WriteString(shellescape(c.WorkDir))
 		sb.WriteString(" && ")
 	}
-	for k, v := range c.EnvVars {
+	keys := make([]string, 0, len(c.EnvVars))
+	for k := range c.EnvVars {
+		keys = append(keys, k)
+	}
+	slices.Sort(keys)
+	for _, k := range keys {
 		sb.WriteString(k)
 		sb.WriteByte('=')
-		sb.WriteString(shellescape(v))
+		sb.WriteString(shellescape(c.EnvVars[k]))
 		sb.WriteByte(' ')
 	}
 	return sb.String()
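A minimal standalone sketch of the collect-sort-iterate pattern (placeholder env values), showing why it yields a stable preamble where direct map ranging does not: Go randomizes map iteration order between runs.

package main

import (
	"fmt"
	"slices"
	"strings"
)

func main() {
	env := map[string]string{"PATH": "/usr/bin", "FOO": "bar", "HOME": "/root"}

	// Ranging over env directly would visit keys in a randomized order,
	// so the generated shell preamble would differ run to run. Sorting
	// the keys first makes the output deterministic.
	keys := make([]string, 0, len(env))
	for k := range env {
		keys = append(keys, k)
	}
	slices.Sort(keys)

	var sb strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&sb, "%s=%s ", k, env[k])
	}
	fmt.Println(sb.String()) // FOO=bar HOME=/root PATH=/usr/bin (always)
}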
@@ -51,12 +51,8 @@ func TestExecContext_WrappedCommand(t *testing.T) {
 			ctx: ExecContext{
 				EnvVars: map[string]string{"PATH": "/usr/bin", "FOO": "/opt/venv/bin:/usr/bin"},
 			},
-			cmd: "make build",
-			// Map iteration order is non-deterministic; accept either ordering.
-			wantOneOf: []string{
-				"FOO='/opt/venv/bin:/usr/bin' PATH='/usr/bin' /bin/sh -c 'make build'",
-				"PATH='/usr/bin' FOO='/opt/venv/bin:/usr/bin' /bin/sh -c 'make build'",
-			},
+			cmd:  "make build",
+			want: "FOO='/opt/venv/bin:/usr/bin' PATH='/usr/bin' /bin/sh -c 'make build'",
 		},
 	}
 
@@ -12,6 +12,11 @@ import (
 type ConnTracker struct {
 	draining atomic.Bool
 	wg       sync.WaitGroup
+
+	// cancelMu protects cancelDrain so Reset can signal a timed-out Drain
+	// goroutine to exit, preventing goroutine leaks on repeated pause failures.
+	cancelMu    sync.Mutex
+	cancelDrain chan struct{}
 }
 
 // Acquire registers one in-flight connection. Returns false if the tracker
@@ -38,14 +43,14 @@ func (t *ConnTracker) Release() {
 
 // Drain marks the tracker as draining (all future Acquire calls return
 // false) and waits up to timeout for in-flight connections to finish.
-//
-// Note: if the timeout expires with connections still in-flight, the
-// internal goroutine waiting on wg.Wait() will remain until those
-// connections complete. This is bounded by the number of hung connections
-// at drain time and self-heals once they close.
 func (t *ConnTracker) Drain(timeout time.Duration) {
 	t.draining.Store(true)
 
+	cancel := make(chan struct{})
+	t.cancelMu.Lock()
+	t.cancelDrain = cancel
+	t.cancelMu.Unlock()
+
 	done := make(chan struct{})
 	go func() {
 		t.wg.Wait()
@@ -54,13 +59,27 @@ func (t *ConnTracker) Drain(timeout time.Duration) {
 
 	select {
 	case <-done:
+	case <-cancel:
+		// Reset was called; stop waiting.
 	case <-time.After(timeout):
 	}
 }
 
 // Reset re-enables the tracker after a failed drain. This allows the
 // sandbox to accept proxy connections again if the pause operation fails
-// and the VM is resumed.
+// and the VM is resumed. It also cancels any lingering Drain goroutine.
 func (t *ConnTracker) Reset() {
+	t.cancelMu.Lock()
+	if t.cancelDrain != nil {
+		select {
+		case <-t.cancelDrain:
+			// Already closed.
+		default:
+			close(t.cancelDrain)
+		}
+		t.cancelDrain = nil
+	}
+	t.cancelMu.Unlock()
+
 	t.draining.Store(false)
 }
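A sketch of the intended call pattern around these two methods. The surrounding pause code is not part of this diff; pauseWithDrain and pauseVM are hypothetical placeholders for the real Firecracker pause path.

package sandbox // hypothetical: same package as ConnTracker

import "time"

// pauseWithDrain illustrates the Drain/Reset pairing: drain before
// pausing, Reset on failure so the cancel channel releases any
// wg.Wait goroutine left behind by a timed-out Drain.
func pauseWithDrain(t *ConnTracker, pauseVM func() error) error {
	// Refuse new proxy connections, then wait up to 5s for
	// in-flight ones to finish.
	t.Drain(5 * time.Second)

	if err := pauseVM(); err != nil {
		// Pause failed: accept connections again and cancel the
		// lingering Drain waiter.
		t.Reset()
		return err
	}
	return nil
}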
@@ -49,6 +49,7 @@ type Manager struct {
 // sandboxState holds the runtime state for a single sandbox.
 type sandboxState struct {
 	models.Sandbox
+	lifecycleMu sync.Mutex // serializes Pause/Destroy/Resume on this sandbox
 	slot        *network.Slot
 	client      *envdclient.Client
 	connTracker *ConnTracker // tracks in-flight proxy connections for pre-pause drain
@@ -259,6 +260,9 @@ func (m *Manager) Destroy(ctx context.Context, sandboxID string) error {
 	m.mu.Unlock()
 
 	if ok {
+		// Wait for any in-progress Pause to finish before tearing down resources.
+		sb.lifecycleMu.Lock()
+		defer sb.lifecycleMu.Unlock()
 		m.cleanup(ctx, sb)
 	}
 
@@ -307,6 +311,11 @@ func (m *Manager) Pause(ctx context.Context, sandboxID string) error {
 		return err
 	}
 
+	// Serialize lifecycle operations on this sandbox to prevent concurrent
+	// Pause/Destroy calls from corrupting Firecracker state.
+	sb.lifecycleMu.Lock()
+	defer sb.lifecycleMu.Unlock()
+
 	if sb.Status != models.StatusRunning {
 		return fmt.Errorf("sandbox %s is not running (status: %s)", sandboxID, sb.Status)
 	}
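Illustrative only (raceExample is a hypothetical helper, not part of this commit): the interleaving these locks close. Without lifecycleMu, a Destroy landing mid-Pause could free the slot and client while Pause is still using them; with it, the two calls are serialized per sandbox.

package sandbox // hypothetical: same package as Manager

import (
	"context"
	"sync"
)

func raceExample(m *Manager, id string) {
	var wg sync.WaitGroup
	wg.Add(2)
	// Both operations take sb.lifecycleMu for their full duration, so
	// whichever wins the lock runs to completion before the other starts.
	go func() { defer wg.Done(); _ = m.Pause(context.Background(), id) }()
	go func() { defer wg.Done(); _ = m.Destroy(context.Background(), id) }()
	wg.Wait()
}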