forked from wrenn/wrenn
Add live stats page with metrics sampling and route split
- New sandbox_metrics_snapshots table sampled every 10s (60-day retention) - Background MetricsSampler goroutine wired into control plane startup - GET /v1/sandboxes/stats?range=5m|1h|6h|24h|30d endpoint with adaptive polling intervals; reserved CPU/RAM uses ceil(paused/2) formula - StatsPanel component: 4 stat cards + 2 Chart.js line charts (straight lines, integer y-axis for running count, dual-axis for CPU/RAM) - Range filter persisted in URL query param; polls update data silently (no blink — loading state only shown on initial mount) - Split /dashboard/capsules into /list and /stats sub-routes with shared layout; capsuleRunningCount store syncs badge across routes - CreateCapsuleDialog extracted as reusable component
This commit is contained in:
100
internal/api/handlers_stats.go
Normal file
100
internal/api/handlers_stats.go
Normal file
@ -0,0 +1,100 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/auth"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/service"
|
||||
)
|
||||
|
||||
type statsHandler struct {
|
||||
svc *service.StatsService
|
||||
}
|
||||
|
||||
func newStatsHandler(svc *service.StatsService) *statsHandler {
|
||||
return &statsHandler{svc: svc}
|
||||
}
|
||||
|
||||
type statsCurrentResponse struct {
|
||||
RunningCount int32 `json:"running_count"`
|
||||
VCPUsReserved int32 `json:"vcpus_reserved"`
|
||||
MemoryMBReserved int32 `json:"memory_mb_reserved"`
|
||||
SampledAt string `json:"sampled_at,omitempty"`
|
||||
}
|
||||
|
||||
type statsPeaksResponse struct {
|
||||
RunningCount int32 `json:"running_count"`
|
||||
VCPUs int32 `json:"vcpus"`
|
||||
MemoryMB int32 `json:"memory_mb"`
|
||||
}
|
||||
|
||||
type statsSeriesResponse struct {
|
||||
Labels []string `json:"labels"`
|
||||
Running []int32 `json:"running"`
|
||||
VCPUs []int32 `json:"vcpus"`
|
||||
MemoryMB []int32 `json:"memory_mb"`
|
||||
}
|
||||
|
||||
type statsResponse struct {
|
||||
Range string `json:"range"`
|
||||
Current statsCurrentResponse `json:"current"`
|
||||
Peaks statsPeaksResponse `json:"peaks"`
|
||||
Series statsSeriesResponse `json:"series"`
|
||||
}
|
||||
|
||||
// GetStats handles GET /v1/sandboxes/stats?range=5m|1h|6h|24h|30d
|
||||
func (h *statsHandler) GetStats(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
|
||||
rangeParam := r.URL.Query().Get("range")
|
||||
if rangeParam == "" {
|
||||
rangeParam = string(service.Range1h)
|
||||
}
|
||||
tr := service.TimeRange(rangeParam)
|
||||
if !service.ValidRange(tr) {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "range must be one of: 5m, 1h, 6h, 24h, 30d")
|
||||
return
|
||||
}
|
||||
|
||||
current, peaks, series, err := h.svc.GetStats(r.Context(), ac.TeamID, tr)
|
||||
if err != nil {
|
||||
slog.Error("stats handler: get stats failed", "team_id", ac.TeamID, "error", err)
|
||||
writeError(w, http.StatusInternalServerError, "internal_error", "failed to retrieve stats")
|
||||
return
|
||||
}
|
||||
|
||||
resp := statsResponse{
|
||||
Range: rangeParam,
|
||||
Current: statsCurrentResponse{
|
||||
RunningCount: current.RunningCount,
|
||||
VCPUsReserved: current.VCPUsReserved,
|
||||
MemoryMBReserved: current.MemoryMBReserved,
|
||||
},
|
||||
Peaks: statsPeaksResponse{
|
||||
RunningCount: peaks.RunningCount,
|
||||
VCPUs: peaks.VCPUs,
|
||||
MemoryMB: peaks.MemoryMB,
|
||||
},
|
||||
Series: statsSeriesResponse{
|
||||
Labels: make([]string, len(series)),
|
||||
Running: make([]int32, len(series)),
|
||||
VCPUs: make([]int32, len(series)),
|
||||
MemoryMB: make([]int32, len(series)),
|
||||
},
|
||||
}
|
||||
|
||||
if !current.SampledAt.IsZero() {
|
||||
resp.Current.SampledAt = current.SampledAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
for i, pt := range series {
|
||||
resp.Series.Labels[i] = pt.Bucket.UTC().Format(time.RFC3339)
|
||||
resp.Series.Running[i] = pt.RunningCount
|
||||
resp.Series.VCPUs[i] = pt.VCPUsReserved
|
||||
resp.Series.MemoryMB[i] = pt.MemoryMBReserved
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
68
internal/api/metrics_sampler.go
Normal file
68
internal/api/metrics_sampler.go
Normal file
@ -0,0 +1,68 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
)
|
||||
|
||||
// MetricsSampler records per-team sandbox resource usage to
|
||||
// sandbox_metrics_snapshots every interval. It also prunes rows older than
|
||||
// 60 days on each tick to keep the table bounded.
|
||||
type MetricsSampler struct {
|
||||
db *db.Queries
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
// NewMetricsSampler creates a MetricsSampler.
|
||||
func NewMetricsSampler(queries *db.Queries, interval time.Duration) *MetricsSampler {
|
||||
return &MetricsSampler{db: queries, interval: interval}
|
||||
}
|
||||
|
||||
// Start runs the sampler loop until the context is cancelled.
|
||||
func (s *MetricsSampler) Start(ctx context.Context) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(s.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Sample immediately on startup.
|
||||
s.run(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.run(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *MetricsSampler) run(ctx context.Context) {
|
||||
s.prune(ctx)
|
||||
if err := s.sample(ctx); err != nil {
|
||||
slog.Warn("metrics sampler: sample failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *MetricsSampler) sample(ctx context.Context) error {
|
||||
rows, err := s.db.SampleSandboxMetrics(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, row := range rows {
|
||||
if err := s.db.InsertMetricsSnapshot(ctx, db.InsertMetricsSnapshotParams(row)); err != nil {
|
||||
slog.Warn("metrics sampler: insert snapshot failed", "team_id", row.TeamID, "error", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MetricsSampler) prune(ctx context.Context) {
|
||||
if err := s.db.PruneOldMetrics(ctx); err != nil {
|
||||
slog.Warn("metrics sampler: prune failed", "error", err)
|
||||
}
|
||||
}
|
||||
@ -613,6 +613,32 @@ paths:
|
||||
items:
|
||||
$ref: "#/components/schemas/Sandbox"
|
||||
|
||||
/v1/sandboxes/stats:
|
||||
get:
|
||||
summary: Get sandbox usage stats for your team
|
||||
operationId: getSandboxStats
|
||||
tags: [sandboxes]
|
||||
security:
|
||||
- apiKeyAuth: []
|
||||
parameters:
|
||||
- name: range
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
type: string
|
||||
enum: [5m, 1h, 6h, 24h, 30d]
|
||||
default: 1h
|
||||
description: Time window for the time-series data.
|
||||
responses:
|
||||
"200":
|
||||
description: Sandbox stats for the team
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SandboxStats"
|
||||
"400":
|
||||
$ref: "#/components/responses/BadRequest"
|
||||
|
||||
/v1/sandboxes/{id}:
|
||||
parameters:
|
||||
- name: id
|
||||
@ -1578,6 +1604,57 @@ components:
|
||||
after this duration of inactivity (no exec or ping). 0 means
|
||||
no auto-pause.
|
||||
|
||||
SandboxStats:
|
||||
type: object
|
||||
properties:
|
||||
range:
|
||||
type: string
|
||||
enum: [5m, 1h, 6h, 24h, 30d]
|
||||
current:
|
||||
type: object
|
||||
properties:
|
||||
running_count:
|
||||
type: integer
|
||||
vcpus_reserved:
|
||||
type: integer
|
||||
memory_mb_reserved:
|
||||
type: integer
|
||||
sampled_at:
|
||||
type: string
|
||||
format: date-time
|
||||
nullable: true
|
||||
peaks:
|
||||
type: object
|
||||
description: Maximum values over the last 30 days.
|
||||
properties:
|
||||
running_count:
|
||||
type: integer
|
||||
vcpus:
|
||||
type: integer
|
||||
memory_mb:
|
||||
type: integer
|
||||
series:
|
||||
type: object
|
||||
description: Parallel arrays for chart rendering.
|
||||
properties:
|
||||
labels:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
format: date-time
|
||||
running:
|
||||
type: array
|
||||
items:
|
||||
type: integer
|
||||
vcpus:
|
||||
type: array
|
||||
items:
|
||||
type: integer
|
||||
memory_mb:
|
||||
type: array
|
||||
items:
|
||||
type: integer
|
||||
|
||||
Sandbox:
|
||||
type: object
|
||||
properties:
|
||||
|
||||
@ -46,6 +46,7 @@ func New(
|
||||
hostSvc := &service.HostService{DB: queries, Redis: rdb, JWT: jwtSecret, Pool: pool}
|
||||
teamSvc := &service.TeamService{DB: queries, Pool: pgPool, HostPool: pool}
|
||||
auditSvc := &service.AuditService{DB: queries}
|
||||
statsSvc := &service.StatsService{DB: queries, Pool: pgPool}
|
||||
|
||||
al := audit.New(queries)
|
||||
|
||||
@ -62,6 +63,7 @@ func New(
|
||||
teamH := newTeamHandler(teamSvc, al)
|
||||
usersH := newUsersHandler(teamSvc)
|
||||
auditH := newAuditHandler(auditSvc)
|
||||
statsH := newStatsHandler(statsSvc)
|
||||
|
||||
// OpenAPI spec and docs.
|
||||
r.Get("/openapi.yaml", serveOpenAPI)
|
||||
@ -109,6 +111,7 @@ func New(
|
||||
r.Use(requireAPIKeyOrJWT(queries, jwtSecret))
|
||||
r.Post("/", sandbox.Create)
|
||||
r.Get("/", sandbox.List)
|
||||
r.Get("/stats", statsH.GetStats)
|
||||
|
||||
r.Route("/{id}", func(r chi.Router) {
|
||||
r.Get("/", sandbox.Get)
|
||||
|
||||
141
internal/db/metrics.sql.go
Normal file
141
internal/db/metrics.sql.go
Normal file
@ -0,0 +1,141 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
// source: metrics.sql
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const getCurrentMetrics = `-- name: GetCurrentMetrics :one
|
||||
SELECT running_count, vcpus_reserved, memory_mb_reserved, sampled_at
|
||||
FROM sandbox_metrics_snapshots
|
||||
WHERE team_id = $1
|
||||
ORDER BY sampled_at DESC
|
||||
LIMIT 1
|
||||
`
|
||||
|
||||
type GetCurrentMetricsRow struct {
|
||||
RunningCount int32 `json:"running_count"`
|
||||
VcpusReserved int32 `json:"vcpus_reserved"`
|
||||
MemoryMbReserved int32 `json:"memory_mb_reserved"`
|
||||
SampledAt pgtype.Timestamptz `json:"sampled_at"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetCurrentMetrics(ctx context.Context, teamID string) (GetCurrentMetricsRow, error) {
|
||||
row := q.db.QueryRow(ctx, getCurrentMetrics, teamID)
|
||||
var i GetCurrentMetricsRow
|
||||
err := row.Scan(
|
||||
&i.RunningCount,
|
||||
&i.VcpusReserved,
|
||||
&i.MemoryMbReserved,
|
||||
&i.SampledAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getPeakMetrics = `-- name: GetPeakMetrics :one
|
||||
SELECT
|
||||
COALESCE(MAX(running_count), 0)::INTEGER AS peak_running_count,
|
||||
COALESCE(MAX(vcpus_reserved), 0)::INTEGER AS peak_vcpus,
|
||||
COALESCE(MAX(memory_mb_reserved), 0)::INTEGER AS peak_memory_mb
|
||||
FROM sandbox_metrics_snapshots
|
||||
WHERE team_id = $1
|
||||
AND sampled_at > NOW() - INTERVAL '30 days'
|
||||
`
|
||||
|
||||
type GetPeakMetricsRow struct {
|
||||
PeakRunningCount int32 `json:"peak_running_count"`
|
||||
PeakVcpus int32 `json:"peak_vcpus"`
|
||||
PeakMemoryMb int32 `json:"peak_memory_mb"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetPeakMetrics(ctx context.Context, teamID string) (GetPeakMetricsRow, error) {
|
||||
row := q.db.QueryRow(ctx, getPeakMetrics, teamID)
|
||||
var i GetPeakMetricsRow
|
||||
err := row.Scan(&i.PeakRunningCount, &i.PeakVcpus, &i.PeakMemoryMb)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const insertMetricsSnapshot = `-- name: InsertMetricsSnapshot :exec
|
||||
INSERT INTO sandbox_metrics_snapshots (team_id, running_count, vcpus_reserved, memory_mb_reserved)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
`
|
||||
|
||||
type InsertMetricsSnapshotParams struct {
|
||||
TeamID string `json:"team_id"`
|
||||
RunningCount int32 `json:"running_count"`
|
||||
VcpusReserved int32 `json:"vcpus_reserved"`
|
||||
MemoryMbReserved int32 `json:"memory_mb_reserved"`
|
||||
}
|
||||
|
||||
func (q *Queries) InsertMetricsSnapshot(ctx context.Context, arg InsertMetricsSnapshotParams) error {
|
||||
_, err := q.db.Exec(ctx, insertMetricsSnapshot,
|
||||
arg.TeamID,
|
||||
arg.RunningCount,
|
||||
arg.VcpusReserved,
|
||||
arg.MemoryMbReserved,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const pruneOldMetrics = `-- name: PruneOldMetrics :exec
|
||||
DELETE FROM sandbox_metrics_snapshots
|
||||
WHERE sampled_at < NOW() - INTERVAL '60 days'
|
||||
`
|
||||
|
||||
func (q *Queries) PruneOldMetrics(ctx context.Context) error {
|
||||
_, err := q.db.Exec(ctx, pruneOldMetrics)
|
||||
return err
|
||||
}
|
||||
|
||||
const sampleSandboxMetrics = `-- name: SampleSandboxMetrics :many
|
||||
SELECT
|
||||
team_id,
|
||||
(COUNT(*) FILTER (WHERE status IN ('running', 'starting')))::INTEGER AS running_count,
|
||||
(COALESCE(SUM(vcpus) FILTER (WHERE status IN ('running', 'starting')), 0)
|
||||
+ CEIL(COALESCE(SUM(vcpus) FILTER (WHERE status = 'paused'), 0)::NUMERIC / 2))::INTEGER AS vcpus_reserved,
|
||||
(COALESCE(SUM(memory_mb) FILTER (WHERE status IN ('running', 'starting')), 0)
|
||||
+ CEIL(COALESCE(SUM(memory_mb) FILTER (WHERE status = 'paused'), 0)::NUMERIC / 2))::INTEGER AS memory_mb_reserved
|
||||
FROM sandboxes
|
||||
WHERE status IN ('running', 'starting', 'paused')
|
||||
GROUP BY team_id
|
||||
`
|
||||
|
||||
type SampleSandboxMetricsRow struct {
|
||||
TeamID string `json:"team_id"`
|
||||
RunningCount int32 `json:"running_count"`
|
||||
VcpusReserved int32 `json:"vcpus_reserved"`
|
||||
MemoryMbReserved int32 `json:"memory_mb_reserved"`
|
||||
}
|
||||
|
||||
// Aggregates per-team resource usage from the live sandboxes table.
|
||||
// paused sandboxes count at 50% (ceil) for capacity reservation.
|
||||
func (q *Queries) SampleSandboxMetrics(ctx context.Context) ([]SampleSandboxMetricsRow, error) {
|
||||
rows, err := q.db.Query(ctx, sampleSandboxMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []SampleSandboxMetricsRow
|
||||
for rows.Next() {
|
||||
var i SampleSandboxMetricsRow
|
||||
if err := rows.Scan(
|
||||
&i.TeamID,
|
||||
&i.RunningCount,
|
||||
&i.VcpusReserved,
|
||||
&i.MemoryMbReserved,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
@ -99,6 +99,15 @@ type Sandbox struct {
|
||||
TeamID string `json:"team_id"`
|
||||
}
|
||||
|
||||
type SandboxMetricsSnapshot struct {
|
||||
ID int64 `json:"id"`
|
||||
TeamID string `json:"team_id"`
|
||||
SampledAt pgtype.Timestamptz `json:"sampled_at"`
|
||||
RunningCount int32 `json:"running_count"`
|
||||
VcpusReserved int32 `json:"vcpus_reserved"`
|
||||
MemoryMbReserved int32 `json:"memory_mb_reserved"`
|
||||
}
|
||||
|
||||
type Team struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
|
||||
157
internal/service/stats.go
Normal file
157
internal/service/stats.go
Normal file
@ -0,0 +1,157 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
)
|
||||
|
||||
// TimeRange identifies a chart time window.
|
||||
type TimeRange string
|
||||
|
||||
const (
|
||||
Range5m TimeRange = "5m"
|
||||
Range1h TimeRange = "1h"
|
||||
Range6h TimeRange = "6h"
|
||||
Range24h TimeRange = "24h"
|
||||
Range30d TimeRange = "30d"
|
||||
)
|
||||
|
||||
type rangeConfig struct {
|
||||
bucketSec int // bucket width in seconds for time-series aggregation
|
||||
intervalLiteral string // PostgreSQL interval literal for the lookback window
|
||||
}
|
||||
|
||||
var rangeConfigs = map[TimeRange]rangeConfig{
|
||||
Range5m: {bucketSec: 3, intervalLiteral: "5 minutes"},
|
||||
Range1h: {bucketSec: 30, intervalLiteral: "1 hour"},
|
||||
Range6h: {bucketSec: 180, intervalLiteral: "6 hours"},
|
||||
Range24h: {bucketSec: 720, intervalLiteral: "24 hours"},
|
||||
Range30d: {bucketSec: 21600, intervalLiteral: "30 days"},
|
||||
}
|
||||
|
||||
// ValidRange returns true if r is a known TimeRange value.
|
||||
func ValidRange(r TimeRange) bool {
|
||||
_, ok := rangeConfigs[r]
|
||||
return ok
|
||||
}
|
||||
|
||||
// StatPoint is one bucketed data point in the time-series.
|
||||
type StatPoint struct {
|
||||
Bucket time.Time
|
||||
RunningCount int32
|
||||
VCPUsReserved int32
|
||||
MemoryMBReserved int32
|
||||
}
|
||||
|
||||
// CurrentStats holds the most recent sampled values for a team.
|
||||
type CurrentStats struct {
|
||||
RunningCount int32
|
||||
VCPUsReserved int32
|
||||
MemoryMBReserved int32
|
||||
SampledAt time.Time
|
||||
}
|
||||
|
||||
// PeakStats holds the 30-day maximum values for a team.
|
||||
type PeakStats struct {
|
||||
RunningCount int32
|
||||
VCPUs int32
|
||||
MemoryMB int32
|
||||
}
|
||||
|
||||
// StatsService computes sandbox metrics for the dashboard.
|
||||
type StatsService struct {
|
||||
DB *db.Queries
|
||||
Pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
// GetStats returns current stats, 30-day peaks, and a time-series for the
|
||||
// given team and time range. If no snapshots exist yet, zeros are returned.
|
||||
func (s *StatsService) GetStats(ctx context.Context, teamID string, r TimeRange) (CurrentStats, PeakStats, []StatPoint, error) {
|
||||
cfg, ok := rangeConfigs[r]
|
||||
if !ok {
|
||||
return CurrentStats{}, PeakStats{}, nil, fmt.Errorf("unknown range: %s", r)
|
||||
}
|
||||
|
||||
// Current snapshot.
|
||||
var current CurrentStats
|
||||
cur, err := s.DB.GetCurrentMetrics(ctx, teamID)
|
||||
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
return CurrentStats{}, PeakStats{}, nil, fmt.Errorf("get current metrics: %w", err)
|
||||
}
|
||||
if err == nil {
|
||||
current = CurrentStats{
|
||||
RunningCount: cur.RunningCount,
|
||||
VCPUsReserved: cur.VcpusReserved,
|
||||
MemoryMBReserved: cur.MemoryMbReserved,
|
||||
SampledAt: cur.SampledAt.Time,
|
||||
}
|
||||
}
|
||||
|
||||
// 30-day peaks.
|
||||
var peaks PeakStats
|
||||
pk, err := s.DB.GetPeakMetrics(ctx, teamID)
|
||||
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
return CurrentStats{}, PeakStats{}, nil, fmt.Errorf("get peak metrics: %w", err)
|
||||
}
|
||||
if err == nil {
|
||||
peaks = PeakStats{
|
||||
RunningCount: pk.PeakRunningCount,
|
||||
VCPUs: pk.PeakVcpus,
|
||||
MemoryMB: pk.PeakMemoryMb,
|
||||
}
|
||||
}
|
||||
|
||||
// Time-series — dynamic bucket width, executed via pgx directly.
|
||||
series, err := s.queryTimeSeries(ctx, teamID, cfg)
|
||||
if err != nil {
|
||||
return CurrentStats{}, PeakStats{}, nil, fmt.Errorf("get time series: %w", err)
|
||||
}
|
||||
|
||||
return current, peaks, series, nil
|
||||
}
|
||||
|
||||
// timeSeriesSQL uses an epoch-floor trick to bucket rows by an arbitrary
|
||||
// integer number of seconds without requiring TimescaleDB.
|
||||
//
|
||||
// $1 = bucket width in seconds (integer)
|
||||
// $2 = team_id
|
||||
// $3 = lookback interval literal (e.g. '1 hour')
|
||||
const timeSeriesSQL = `
|
||||
SELECT
|
||||
to_timestamp(floor(extract(epoch FROM sampled_at) / $1) * $1) AS bucket,
|
||||
AVG(running_count)::INTEGER AS running_count,
|
||||
AVG(vcpus_reserved)::INTEGER AS vcpus_reserved,
|
||||
AVG(memory_mb_reserved)::INTEGER AS memory_mb_reserved
|
||||
FROM sandbox_metrics_snapshots
|
||||
WHERE team_id = $2
|
||||
AND sampled_at >= NOW() - $3::INTERVAL
|
||||
GROUP BY bucket
|
||||
ORDER BY bucket ASC
|
||||
`
|
||||
|
||||
func (s *StatsService) queryTimeSeries(ctx context.Context, teamID string, cfg rangeConfig) ([]StatPoint, error) {
|
||||
rows, err := s.Pool.Query(ctx, timeSeriesSQL, cfg.bucketSec, teamID, cfg.intervalLiteral)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var points []StatPoint
|
||||
for rows.Next() {
|
||||
var p StatPoint
|
||||
var bucket time.Time
|
||||
if err := rows.Scan(&bucket, &p.RunningCount, &p.VCPUsReserved, &p.MemoryMBReserved); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Bucket = bucket
|
||||
points = append(points, p)
|
||||
}
|
||||
return points, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user