forked from wrenn/wrenn
Merge pull request 'Implemented least-loaded host scheduler with bottleneck-first strategy' (#25) from feat/host-scheduler into dev
Reviewed-on: wrenn/wrenn#25
This commit is contained in:
@ -109,8 +109,8 @@ func main() {
|
||||
hostPool := lifecycle.NewHostClientPoolTLS(auth.CPClientTLSConfig(ca, cpCertStore))
|
||||
slog.Info("host client pool: mTLS enabled")
|
||||
|
||||
// Scheduler — picks a host for each new sandbox (round-robin for now).
|
||||
hostScheduler := scheduler.NewRoundRobinScheduler(queries)
|
||||
// Scheduler — picks a host for each new sandbox (least-loaded, bottleneck-first).
|
||||
hostScheduler := scheduler.NewLeastLoadedScheduler(queries)
|
||||
|
||||
// OAuth provider registry.
|
||||
oauthRegistry := oauth.NewRegistry()
|
||||
|
||||
@ -81,6 +81,41 @@ SELECT * FROM hosts WHERE id = $1 AND team_id = $2;
|
||||
-- Returns all hosts that have completed registration (not pending/offline).
|
||||
SELECT * FROM hosts WHERE status NOT IN ('pending', 'offline') ORDER BY created_at;
|
||||
|
||||
-- name: GetHostsWithLoad :many
-- Returns all online hosts with raw per-host sandbox resource consumption.
-- Separates running and paused sandbox totals so the caller can apply its own formulas.
SELECT
h.id,
h.type,
h.team_id,
h.provider,
h.availability_zone,
h.arch,
h.cpu_cores,
h.memory_mb,
h.disk_gb,
h.address,
h.status,
h.last_heartbeat_at,
h.metadata,
h.created_by,
h.created_at,
h.updated_at,
h.cert_fingerprint,
h.cert_expires_at,
-- Aggregated sandbox load. FILTER restricts each SUM to the relevant status
-- set; COALESCE turns the NULL produced by hosts with no sandboxes into 0.
-- 'starting'/'pending' are counted as running so in-flight placements reserve
-- capacity immediately.
COALESCE(SUM(s.vcpus) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_vcpus,
COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_memory_mb,
COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_disk_mb,
COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_memory_mb,
COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_disk_mb
FROM hosts h
-- LEFT JOIN (not INNER) so hosts with zero sandboxes still appear, with all
-- load columns reported as 0.
LEFT JOIN sandboxes s ON s.host_id = h.id
AND s.status IN ('running', 'paused', 'starting', 'pending')
-- Only online hosts with a non-empty address (i.e. fully registered) are
-- candidates for scheduling.
WHERE h.status = 'online'
AND h.address != ''
GROUP BY h.id
ORDER BY h.created_at;
|
||||
|
||||
-- name: UpdateHostHeartbeatAndStatus :execrows
|
||||
-- Updates last_heartbeat_at and transitions unreachable hosts back to online.
|
||||
-- Returns 0 if no host was found (deleted), which the caller treats as 404.
|
||||
|
||||
@ -154,6 +154,112 @@ func (q *Queries) GetHostTokensByHost(ctx context.Context, hostID pgtype.UUID) (
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// getHostsWithLoad is the SQL behind Queries.GetHostsWithLoad. The file is
// sqlc-generated (see the "-- name:" annotation): do not hand-edit this
// string — change the source .sql query and regenerate instead.
const getHostsWithLoad = `-- name: GetHostsWithLoad :many
SELECT
h.id,
h.type,
h.team_id,
h.provider,
h.availability_zone,
h.arch,
h.cpu_cores,
h.memory_mb,
h.disk_gb,
h.address,
h.status,
h.last_heartbeat_at,
h.metadata,
h.created_by,
h.created_at,
h.updated_at,
h.cert_fingerprint,
h.cert_expires_at,
COALESCE(SUM(s.vcpus) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_vcpus,
COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_memory_mb,
COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status IN ('running', 'starting', 'pending')), 0)::int AS running_disk_mb,
COALESCE(SUM(s.memory_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_memory_mb,
COALESCE(SUM(s.disk_size_mb) FILTER (WHERE s.status = 'paused'), 0)::int AS paused_disk_mb
FROM hosts h
LEFT JOIN sandboxes s ON s.host_id = h.id
AND s.status IN ('running', 'paused', 'starting', 'pending')
WHERE h.status = 'online'
AND h.address != ''
GROUP BY h.id
ORDER BY h.created_at
`
|
||||
|
||||
// GetHostsWithLoadRow is one result row of Queries.GetHostsWithLoad: a host's
// own columns plus its aggregated sandbox load. The Running* totals cover
// sandboxes in 'running', 'starting' or 'pending' state; the Paused* totals
// cover 'paused' sandboxes. Hosts with no sandboxes report 0 for all totals.
type GetHostsWithLoadRow struct {
	ID               pgtype.UUID        `json:"id"`
	Type             string             `json:"type"`
	TeamID           pgtype.UUID        `json:"team_id"`
	Provider         string             `json:"provider"`
	AvailabilityZone string             `json:"availability_zone"`
	Arch             string             `json:"arch"`
	CpuCores         int32              `json:"cpu_cores"`
	MemoryMb         int32              `json:"memory_mb"`
	DiskGb           int32              `json:"disk_gb"`
	Address          string             `json:"address"`
	Status           string             `json:"status"`
	LastHeartbeatAt  pgtype.Timestamptz `json:"last_heartbeat_at"`
	Metadata         []byte             `json:"metadata"`
	CreatedBy        pgtype.UUID        `json:"created_by"`
	CreatedAt        pgtype.Timestamptz `json:"created_at"`
	UpdatedAt        pgtype.Timestamptz `json:"updated_at"`
	CertFingerprint  string             `json:"cert_fingerprint"`
	CertExpiresAt    pgtype.Timestamptz `json:"cert_expires_at"`
	RunningVcpus     int32              `json:"running_vcpus"`
	RunningMemoryMb  int32              `json:"running_memory_mb"`
	RunningDiskMb    int32              `json:"running_disk_mb"`
	PausedMemoryMb   int32              `json:"paused_memory_mb"`
	PausedDiskMb     int32              `json:"paused_disk_mb"`
}
|
||||
|
||||
// GetHostsWithLoad returns all online hosts with raw per-host sandbox
// resource consumption. It separates running and paused sandbox totals so
// the caller can apply its own formulas.
//
// sqlc-generated query method; regenerate rather than hand-editing.
func (q *Queries) GetHostsWithLoad(ctx context.Context) ([]GetHostsWithLoadRow, error) {
	rows, err := q.db.Query(ctx, getHostsWithLoad)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []GetHostsWithLoadRow
	for rows.Next() {
		var i GetHostsWithLoadRow
		// Scan targets must stay in the exact order of the SELECT column
		// list in getHostsWithLoad.
		if err := rows.Scan(
			&i.ID,
			&i.Type,
			&i.TeamID,
			&i.Provider,
			&i.AvailabilityZone,
			&i.Arch,
			&i.CpuCores,
			&i.MemoryMb,
			&i.DiskGb,
			&i.Address,
			&i.Status,
			&i.LastHeartbeatAt,
			&i.Metadata,
			&i.CreatedBy,
			&i.CreatedAt,
			&i.UpdatedAt,
			&i.CertFingerprint,
			&i.CertExpiresAt,
			&i.RunningVcpus,
			&i.RunningMemoryMb,
			&i.RunningDiskMb,
			&i.PausedMemoryMb,
			&i.PausedDiskMb,
		); err != nil {
			return nil, err
		}
		items = append(items, i)
	}
	// Surface any error that occurred during iteration (rows.Next returns
	// false on both exhaustion and failure).
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
|
||||
|
||||
const insertHost = `-- name: InsertHost :one
|
||||
INSERT INTO hosts (id, type, team_id, provider, availability_zone, created_by)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
|
||||
@ -1 +1,171 @@
|
||||
package scheduler
|
||||
|
||||
import (
	"context"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5/pgtype"

	"git.omukk.dev/wrenn/wrenn/internal/db"
)
|
||||
|
||||
// Resource overhead reserved for the host OS, plus the scheduler's
// overcommit/discount factors. Units: MB for memory and disk, cores for CPU.
const (
	reservedMemoryMB = 8192  // RAM held back for the host OS
	reservedCPU      = 4     // cores held back for the host OS
	reservedDiskMB   = 30720 // 30 GB of disk held back for the host OS
	cpuOvercommit    = 1.5   // schedulable vCPUs = physical cores * 1.5 (before reservation)
	pausedMemoryFrac = 0.5   // paused sandboxes are charged half their memory
	pausedDiskFrac   = 2.0 / 3.0 // paused sandboxes are charged two thirds of their disk
)
|
||||
|
||||
// LeastLoadedScheduler picks the online host with the most headroom at its
// tightest resource (bottleneck-first strategy).
//
// For each eligible host it computes the remaining fraction of each resource:
//
//	RAM:  usable / total where total = host.memory_mb - 8192
//	CPU:  usable / total where total = host.cpu_cores * 1.5 - 4
//	Disk: usable / total where total = host.disk_gb * 1024 - 30720
//
// The host's score is min(ram_frac, cpu_frac, disk_frac). The host with the
// highest score wins. Admission control rejects when no host can fit the
// requested sandbox on RAM or disk; CPU overcommit is allowed.
type LeastLoadedScheduler struct {
	db *db.Queries // query layer used to fetch per-host load snapshots
}
|
||||
|
||||
// NewLeastLoadedScheduler creates a LeastLoadedScheduler backed by the given DB.
|
||||
func NewLeastLoadedScheduler(queries *db.Queries) *LeastLoadedScheduler {
|
||||
return &LeastLoadedScheduler{db: queries}
|
||||
}
|
||||
|
||||
// hostResources holds the computed resource availability for a single host.
// Totals are schedulable capacity after OS reservations (and CPU overcommit);
// usable values are totals minus current sandbox consumption. RAM/disk are in
// MB, CPU in vCPUs. Usable values may go negative when a host is already
// oversubscribed — callers must not assume they are clamped at zero.
type hostResources struct {
	host       db.Host // the host these figures describe
	ramTotal   float64
	ramUsable  float64
	cpuTotal   float64
	cpuUsable  float64
	diskTotal  float64
	diskUsable float64
}
|
||||
|
||||
// bottleneckScore returns the fraction of the tightest resource remaining.
|
||||
func (h *hostResources) bottleneckScore() float64 {
|
||||
ramFrac := safeFrac(h.ramUsable, h.ramTotal)
|
||||
cpuFrac := safeFrac(h.cpuUsable, h.cpuTotal)
|
||||
diskFrac := safeFrac(h.diskUsable, h.diskTotal)
|
||||
return min(ramFrac, cpuFrac, diskFrac)
|
||||
}
|
||||
|
||||
// safeFrac returns usable/total, or 0 when total <= 0 (guards against
// division by zero and nonsensical negative capacities).
func safeFrac(usable, total float64) float64 {
	if total > 0 {
		return usable / total
	}
	return 0
}
|
||||
|
||||
// SelectHost returns the eligible host with the most resource headroom.
|
||||
func (s *LeastLoadedScheduler) SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool, memoryMb, diskSizeMb int32) (db.Host, error) {
|
||||
rows, err := s.db.GetHostsWithLoad(ctx)
|
||||
if err != nil {
|
||||
return db.Host{}, fmt.Errorf("get hosts with load: %w", err)
|
||||
}
|
||||
|
||||
// Phase 1: filter eligible hosts and compute resources.
|
||||
var candidates []hostResources
|
||||
for i := range rows {
|
||||
row := &rows[i]
|
||||
|
||||
if isByoc {
|
||||
if row.Type != "byoc" || !row.TeamID.Valid || row.TeamID != teamID {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if row.Type != "regular" {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
hr := computeResources(row)
|
||||
candidates = append(candidates, hr)
|
||||
}
|
||||
|
||||
if len(candidates) == 0 {
|
||||
if isByoc {
|
||||
return db.Host{}, fmt.Errorf("no online BYOC hosts available for team")
|
||||
}
|
||||
return db.Host{}, fmt.Errorf("no online platform hosts available")
|
||||
}
|
||||
|
||||
// Phase 2: admission control + selection — pick the highest-scoring host
|
||||
// that can actually fit the requested sandbox (RAM and disk).
|
||||
best := -1
|
||||
bestScore := 0.0
|
||||
for i := range candidates {
|
||||
if memoryMb > 0 && candidates[i].ramUsable < float64(memoryMb) {
|
||||
continue
|
||||
}
|
||||
if diskSizeMb > 0 && candidates[i].diskUsable < float64(diskSizeMb) {
|
||||
continue
|
||||
}
|
||||
score := candidates[i].bottleneckScore()
|
||||
if best == -1 || score > bestScore {
|
||||
best = i
|
||||
bestScore = score
|
||||
}
|
||||
}
|
||||
|
||||
if best == -1 {
|
||||
return db.Host{}, fmt.Errorf("no host has sufficient resources: need %d MB memory, %d MB disk", memoryMb, diskSizeMb)
|
||||
}
|
||||
|
||||
return candidates[best].host, nil
|
||||
}
|
||||
|
||||
// computeResources converts a raw DB row into computed resource availability.
|
||||
func computeResources(row *db.GetHostsWithLoadRow) hostResources {
|
||||
ramTotal := float64(row.MemoryMb) - reservedMemoryMB
|
||||
cpuTotal := float64(row.CpuCores)*cpuOvercommit - reservedCPU
|
||||
diskTotal := float64(row.DiskGb)*1024 - reservedDiskMB
|
||||
|
||||
usedMemory := float64(row.RunningMemoryMb) + pausedMemoryFrac*float64(row.PausedMemoryMb)
|
||||
usedCPU := float64(row.RunningVcpus)
|
||||
usedDisk := float64(row.RunningDiskMb) + pausedDiskFrac*float64(row.PausedDiskMb)
|
||||
|
||||
return hostResources{
|
||||
host: hostFromRow(row),
|
||||
ramTotal: ramTotal,
|
||||
ramUsable: ramTotal - usedMemory,
|
||||
cpuTotal: cpuTotal,
|
||||
cpuUsable: cpuTotal - usedCPU,
|
||||
diskTotal: diskTotal,
|
||||
diskUsable: diskTotal - usedDisk,
|
||||
}
|
||||
}
|
||||
|
||||
// hostFromRow converts the query row back to a plain db.Host. The aggregated
// load columns (Running*/Paused*) are intentionally dropped — only the host's
// own columns are copied.
func hostFromRow(r *db.GetHostsWithLoadRow) db.Host {
	return db.Host{
		ID:               r.ID,
		Type:             r.Type,
		TeamID:           r.TeamID,
		Provider:         r.Provider,
		AvailabilityZone: r.AvailabilityZone,
		Arch:             r.Arch,
		CpuCores:         r.CpuCores,
		MemoryMb:         r.MemoryMb,
		DiskGb:           r.DiskGb,
		Address:          r.Address,
		Status:           r.Status,
		LastHeartbeatAt:  r.LastHeartbeatAt,
		Metadata:         r.Metadata,
		CreatedBy:        r.CreatedBy,
		CreatedAt:        r.CreatedAt,
		UpdatedAt:        r.UpdatedAt,
		CertFingerprint:  r.CertFingerprint,
		CertExpiresAt:    r.CertExpiresAt,
	}
}
|
||||
|
||||
@ -16,8 +16,11 @@ type HostScheduler interface {
|
||||
// SelectHost returns a host that can accept a new sandbox.
|
||||
// For BYOC teams (isByoc=true), only online BYOC hosts belonging to teamID
|
||||
// are considered. For non-BYOC teams, only online regular (platform) hosts
|
||||
// are considered. Returns an error if no suitable host is available.
|
||||
SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool) (db.Host, error)
|
||||
// are considered.
|
||||
// memoryMb and diskSizeMb describe the sandbox's resource requirements so
|
||||
// the scheduler can perform admission control (reject when no host has
|
||||
// enough RAM or disk). Pass 0 to skip admission checks.
|
||||
SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool, memoryMb, diskSizeMb int32) (db.Host, error)
|
||||
}
|
||||
|
||||
// RoundRobinScheduler cycles through eligible online hosts in round-robin order.
|
||||
@ -34,7 +37,9 @@ func NewRoundRobinScheduler(queries *db.Queries) *RoundRobinScheduler {
|
||||
}
|
||||
|
||||
// SelectHost returns the next eligible online host in round-robin order.
|
||||
func (s *RoundRobinScheduler) SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool) (db.Host, error) {
|
||||
// The memoryMb and diskSizeMb parameters are ignored — round-robin performs
|
||||
// no admission control.
|
||||
func (s *RoundRobinScheduler) SelectHost(ctx context.Context, teamID pgtype.UUID, isByoc bool, _, _ int32) (db.Host, error) {
|
||||
hosts, err := s.db.ListActiveHosts(ctx)
|
||||
if err != nil {
|
||||
return db.Host{}, fmt.Errorf("list hosts: %w", err)
|
||||
|
||||
@ -283,7 +283,7 @@ func (s *BuildService) executeBuild(ctx context.Context, buildIDStr string) {
|
||||
}
|
||||
|
||||
// Pick a platform host and create a sandbox.
|
||||
host, err := s.Scheduler.SelectHost(buildCtx, id.PlatformTeamID, false)
|
||||
host, err := s.Scheduler.SelectHost(buildCtx, id.PlatformTeamID, false, build.MemoryMb, 5120)
|
||||
if err != nil {
|
||||
s.failBuild(buildCtx, buildID, fmt.Sprintf("no host available: %v", err))
|
||||
return
|
||||
|
||||
@ -118,7 +118,7 @@ func (s *SandboxService) Create(ctx context.Context, p SandboxCreateParams) (db.
|
||||
}
|
||||
|
||||
// Pick a host for this sandbox.
|
||||
host, err := s.Scheduler.SelectHost(ctx, p.TeamID, team.IsByoc)
|
||||
host, err := s.Scheduler.SelectHost(ctx, p.TeamID, team.IsByoc, p.MemoryMB, p.DiskSizeMB)
|
||||
if err != nil {
|
||||
return db.Sandbox{}, fmt.Errorf("select host: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user