From 82d281b5b55ae454322e37594b13a88b87fe1601 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Wed, 15 Apr 2026 03:02:29 +0600 Subject: [PATCH] Implement least-loaded host scheduler with bottleneck-first strategy Replace round-robin scheduling with resource-aware host selection that picks the host with the most headroom at its tightest resource. Extends the HostScheduler interface with memory/disk params for admission control. --- cmd/control-plane/main.go | 4 +- db/queries/hosts.sql | 35 ++++++ internal/db/hosts.sql.go | 106 ++++++++++++++++++ internal/scheduler/least_loaded.go | 170 +++++++++++++++++++++++++++++ internal/scheduler/round_robin.go | 11 +- internal/service/build.go | 2 +- internal/service/sandbox.go | 2 +- 7 files changed, 323 insertions(+), 7 deletions(-) diff --git a/cmd/control-plane/main.go b/cmd/control-plane/main.go index d7ae18d..a7235a7 100644 --- a/cmd/control-plane/main.go +++ b/cmd/control-plane/main.go @@ -109,8 +109,8 @@ func main() { hostPool := lifecycle.NewHostClientPoolTLS(auth.CPClientTLSConfig(ca, cpCertStore)) slog.Info("host client pool: mTLS enabled") - // Scheduler — picks a host for each new sandbox (round-robin for now). - hostScheduler := scheduler.NewRoundRobinScheduler(queries) + // Scheduler — picks a host for each new sandbox (least-loaded, bottleneck-first). + hostScheduler := scheduler.NewLeastLoadedScheduler(queries) // OAuth provider registry. oauthRegistry := oauth.NewRegistry() diff --git a/db/queries/hosts.sql b/db/queries/hosts.sql index 0a5a150..3e133cb 100644 --- a/db/queries/hosts.sql +++ b/db/queries/hosts.sql @@ -81,6 +81,41 @@ SELECT * FROM hosts WHERE id = $1 AND team_id = $2; -- Returns all hosts that have completed registration (not pending/offline). SELECT * FROM hosts WHERE status NOT IN ('pending', 'offline') ORDER BY created_at; +-- name: GetHostsWithLoad :many +-- Returns all online hosts with raw per-host sandbox resource consumption. +-- Separates running and paused sandbox totals so the caller can apply its own formulas. +SELECT + h.id, + h.type, + h.team_id, + h.provider, + h.availability_zone, + h.arch, + h.cpu_cores, + h.memory_mb, + h.disk_gb, + h.address, + h.status, + h.last_heartbeat_at, + h.metadata, + h.created_by, + h.created_at, + h.updated_at, + h.cert_fingerprint, + h.cert_expires_at, + COALESCE(SUM(s.vcpus) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_vcpus, + COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_memory_mb, + COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_disk_mb, + COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_memory_mb, + COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_disk_mb +FROM hosts h +LEFT JOIN sandboxes s ON s.host_id = h.id + AND s.status IN ('running', 'paused', 'starting', 'pending') +WHERE h.status = 'online' + AND h.address != '' +GROUP BY h.id +ORDER BY h.created_at; + -- name: UpdateHostHeartbeatAndStatus :execrows -- Updates last_heartbeat_at and transitions unreachable hosts back to online. -- Returns 0 if no host was found (deleted), which the caller treats as 404. diff --git a/internal/db/hosts.sql.go b/internal/db/hosts.sql.go index 2e3962b..0e8f415 100644 --- a/internal/db/hosts.sql.go +++ b/internal/db/hosts.sql.go @@ -154,6 +154,112 @@ func (q *Queries) GetHostTokensByHost(ctx context.Context, hostID pgtype.UUID) ( return items, nil } +const getHostsWithLoad = `-- name: GetHostsWithLoad :many +SELECT + h.id, + h.type, + h.team_id, + h.provider, + h.availability_zone, + h.arch, + h.cpu_cores, + h.memory_mb, + h.disk_gb, + h.address, + h.status, + h.last_heartbeat_at, + h.metadata, + h.created_by, + h.created_at, + h.updated_at, + h.cert_fingerprint, + h.cert_expires_at, + COALESCE(SUM(s.vcpus) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_vcpus, + COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_memory_mb, + COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_disk_mb, + COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_memory_mb, + COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_disk_mb +FROM hosts h +LEFT JOIN sandboxes s ON s.host_id = h.id + AND s.status IN ('running', 'paused', 'starting', 'pending') +WHERE h.status = 'online' + AND h.address != '' +GROUP BY h.id +ORDER BY h.created_at +` + +type GetHostsWithLoadRow struct { + ID pgtype.UUID `json:"id"` + Type string `json:"type"` + TeamID pgtype.UUID `json:"team_id"` + Provider string `json:"provider"` + AvailabilityZone string `json:"availability_zone"` + Arch string `json:"arch"` + CpuCores int32 `json:"cpu_cores"` + MemoryMb int32 `json:"memory_mb"` + DiskGb int32 `json:"disk_gb"` + Address string `json:"address"` + Status string `json:"status"` + LastHeartbeatAt pgtype.Timestamptz `json:"last_heartbeat_at"` + Metadata []byte `json:"metadata"` + CreatedBy pgtype.UUID `json:"created_by"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` + CertFingerprint string `json:"cert_fingerprint"` + CertExpiresAt pgtype.Timestamptz `json:"cert_expires_at"` + RunningVcpus int32 `json:"running_vcpus"` + RunningMemoryMb int32 `json:"running_memory_mb"` + RunningDiskMb int32 `json:"running_disk_mb"` + PausedMemoryMb int32 `json:"paused_memory_mb"` + PausedDiskMb int32 `json:"paused_disk_mb"` +} + +// Returns all online hosts with raw per-host sandbox resource consumption. +// Separates running and paused sandbox totals so the caller can apply its own formulas. +func (q *Queries) GetHostsWithLoad(ctx context.Context) ([]GetHostsWithLoadRow, error) { + rows, err := q.db.Query(ctx, getHostsWithLoad) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetHostsWithLoadRow + for rows.Next() { + var i GetHostsWithLoadRow + if err := rows.Scan( + &i.ID, + &i.Type, + &i.TeamID, + &i.Provider, + &i.AvailabilityZone, + &i.Arch, + &i.CpuCores, + &i.MemoryMb, + &i.DiskGb, + &i.Address, + &i.Status, + &i.LastHeartbeatAt, + &i.Metadata, + &i.CreatedBy, + &i.CreatedAt, + &i.UpdatedAt, + &i.CertFingerprint, + &i.CertExpiresAt, + &i.RunningVcpus, + &i.RunningMemoryMb, + &i.RunningDiskMb, + &i.PausedMemoryMb, + &i.PausedDiskMb, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertHost = `-- name: InsertHost :one INSERT INTO hosts (id, type, team_id, provider, availability_zone, created_by) VALUES ($1, $2, $3, $4, $5, $6) diff --git a/internal/scheduler/least_loaded.go b/internal/scheduler/least_loaded.go index 6990da0..6bc2838 100644 --- a/internal/scheduler/least_loaded.go +++ b/internal/scheduler/least_loaded.go @@ -1 +1,171 @@ package scheduler + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgtype" + + "git.omukk.dev/wrenn/wrenn/internal/db" +) + +// Resource overhead reserved for the host OS. +const ( + reservedMemoryMB = 8192 + reservedCPU = 4 + reservedDiskMB = 30720 // 30 GB + cpuOvercommit = 1.5 + pausedMemoryFrac = 0.5 + pausedDiskFrac = 2.0 / 3.0 +) + +// LeastLoadedScheduler picks the online host with the most headroom at its +// tightest resource (bottleneck-first strategy). +// +// For each eligible host it computes the remaining fraction of each resource: +// +// RAM: usable / total where total = host.memory_mb - 8192 +// CPU: usable / total where total = host.cpu_cores * 1.5 - 4 +// Disk: usable / total where total = host.disk_gb * 1024 - 30720 +// +// The host's score is min(ram_frac, cpu_frac, disk_frac). The host with the +// highest score wins. Admission control rejects when no host can fit the +// requested sandbox on RAM or disk; CPU overcommit is allowed. +type LeastLoadedScheduler struct { + db *db.Queries +} + +// NewLeastLoadedScheduler creates a LeastLoadedScheduler backed by the given DB. +func NewLeastLoadedScheduler(queries *db.Queries) *LeastLoadedScheduler { + return &LeastLoadedScheduler{db: queries} +} + +// hostResources holds the computed resource availability for a single host. +type hostResources struct { + host db.Host + ramTotal float64 + ramUsable float64 + cpuTotal float64 + cpuUsable float64 + diskTotal float64 + diskUsable float64 +} + +// bottleneckScore returns the fraction of the tightest resource remaining. +func (h *hostResources) bottleneckScore() float64 { + ramFrac := safeFrac(h.ramUsable, h.ramTotal) + cpuFrac := safeFrac(h.cpuUsable, h.cpuTotal) + diskFrac := safeFrac(h.diskUsable, h.diskTotal) + return min(ramFrac, cpuFrac, diskFrac) +} + +// safeFrac returns usable/total, or 0 when total <= 0. +func safeFrac(usable, total float64) float64 { + if total <= 0 { + return 0 + } + return usable / total +} + +// SelectHost returns the eligible host with the most resource headroom. +func (s *LeastLoadedScheduler) SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool, memoryMb, diskSizeMb int32) (db.Host, error) { + rows, err := s.db.GetHostsWithLoad(ctx) + if err != nil { + return db.Host{}, fmt.Errorf("get hosts with load: %w", err) + } + + // Phase 1: filter eligible hosts and compute resources. + var candidates []hostResources + for i := range rows { + row := &rows[i] + + if isByoc { + if row.Type != "byoc" || !row.TeamID.Valid || row.TeamID != teamID { + continue + } + } else { + if row.Type != "regular" { + continue + } + } + + hr := computeResources(row) + candidates = append(candidates, hr) + } + + if len(candidates) == 0 { + if isByoc { + return db.Host{}, fmt.Errorf("no online BYOC hosts available for team") + } + return db.Host{}, fmt.Errorf("no online platform hosts available") + } + + // Phase 2: admission control + selection — pick the highest-scoring host + // that can actually fit the requested sandbox (RAM and disk). + best := -1 + bestScore := 0.0 + for i := range candidates { + if memoryMb > 0 && candidates[i].ramUsable < float64(memoryMb) { + continue + } + if diskSizeMb > 0 && candidates[i].diskUsable < float64(diskSizeMb) { + continue + } + score := candidates[i].bottleneckScore() + if best == -1 || score > bestScore { + best = i + bestScore = score + } + } + + if best == -1 { + return db.Host{}, fmt.Errorf("no host has sufficient resources: need %d MB memory, %d MB disk", memoryMb, diskSizeMb) + } + + return candidates[best].host, nil +} + +// computeResources converts a raw DB row into computed resource availability. +func computeResources(row *db.GetHostsWithLoadRow) hostResources { + ramTotal := float64(row.MemoryMb) - reservedMemoryMB + cpuTotal := float64(row.CpuCores)*cpuOvercommit - reservedCPU + diskTotal := float64(row.DiskGb)*1024 - reservedDiskMB + + usedMemory := float64(row.RunningMemoryMb) + pausedMemoryFrac*float64(row.PausedMemoryMb) + usedCPU := float64(row.RunningVcpus) + usedDisk := float64(row.RunningDiskMb) + pausedDiskFrac*float64(row.PausedDiskMb) + + return hostResources{ + host: hostFromRow(row), + ramTotal: ramTotal, + ramUsable: ramTotal - usedMemory, + cpuTotal: cpuTotal, + cpuUsable: cpuTotal - usedCPU, + diskTotal: diskTotal, + diskUsable: diskTotal - usedDisk, + } +} + +// hostFromRow converts the query row back to a plain db.Host. +func hostFromRow(r *db.GetHostsWithLoadRow) db.Host { + return db.Host{ + ID: r.ID, + Type: r.Type, + TeamID: r.TeamID, + Provider: r.Provider, + AvailabilityZone: r.AvailabilityZone, + Arch: r.Arch, + CpuCores: r.CpuCores, + MemoryMb: r.MemoryMb, + DiskGb: r.DiskGb, + Address: r.Address, + Status: r.Status, + LastHeartbeatAt: r.LastHeartbeatAt, + Metadata: r.Metadata, + CreatedBy: r.CreatedBy, + CreatedAt: r.CreatedAt, + UpdatedAt: r.UpdatedAt, + CertFingerprint: r.CertFingerprint, + CertExpiresAt: r.CertExpiresAt, + } +} diff --git a/internal/scheduler/round_robin.go b/internal/scheduler/round_robin.go index 7e4962d..f2f47ad 100644 --- a/internal/scheduler/round_robin.go +++ b/internal/scheduler/round_robin.go @@ -16,8 +16,11 @@ type HostScheduler interface { // SelectHost returns a host that can accept a new sandbox. // For BYOC teams (isByoc=true), only online BYOC hosts belonging to teamID // are considered. For non-BYOC teams, only online regular (platform) hosts - // are considered. Returns an error if no suitable host is available. - SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool) (db.Host, error) + // are considered. + // memoryMb and diskSizeMb describe the sandbox's resource requirements so + // the scheduler can perform admission control (reject when no host has + // enough RAM or disk). Pass 0 to skip admission checks. + SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool, memoryMb, diskSizeMb int32) (db.Host, error) } // RoundRobinScheduler cycles through eligible online hosts in round-robin order. @@ -34,7 +37,9 @@ func NewRoundRobinScheduler(queries *db.Queries) *RoundRobinScheduler { } // SelectHost returns the next eligible online host in round-robin order. -func (s *RoundRobinScheduler) SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool) (db.Host, error) { +// The memoryMb and diskSizeMb parameters are ignored — round-robin performs +// no admission control. +func (s *RoundRobinScheduler) SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool, _, _ int32) (db.Host, error) { hosts, err := s.db.ListActiveHosts(ctx) if err != nil { return db.Host{}, fmt.Errorf("list hosts: %w", err) diff --git a/internal/service/build.go b/internal/service/build.go index 97a7523..563c8cd 100644 --- a/internal/service/build.go +++ b/internal/service/build.go @@ -283,7 +283,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) { } // Pick a platform host and create a sandbox. - host, err := s.Scheduler.SelectHost(buildCtx, id.PlatformTeamID, false) + host, err := s.Scheduler.SelectHost(buildCtx, id.PlatformTeamID, false, build.MemoryMb, 5120) if err != nil { s.failBuild(buildCtx, buildID, fmt.Sprintf("no host available: %v", err)) return diff --git a/internal/service/sandbox.go b/internal/service/sandbox.go index 5ebd4d2..9af480b 100644 --- a/internal/service/sandbox.go +++ b/internal/service/sandbox.go @@ -118,7 +118,7 @@ func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db. } // Pick a host for this sandbox. - host, err := s.Scheduler.SelectHost(ctx, p.TeamID, team.IsByoc) + host, err := s.Scheduler.SelectHost(ctx, p.TeamID, team.IsByoc, p.MemoryMB, p.DiskSizeMB) if err != nil { return db.Sandbox{}, fmt.Errorf("select host: %w", err) }