From 84dd15d22b91f790dadc63ce8f462ad62c542af1 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Thu, 9 Apr 2026 17:06:06 +0600 Subject: [PATCH 1/2] feat: add notification channels with provider integrations and retry Implement a channels system for notifying teams via external providers (Discord, Slack, Teams, Google Chat, Telegram, Matrix, webhook) when lifecycle events occur (capsule/template/host state changes). - Channel CRUD API under /v1/channels (JWT-only auth) - Test endpoint to verify config before saving (POST /v1/channels/test) - Secret rotation endpoint (PUT /v1/channels/{id}/config) - AES-256-GCM encryption for provider secrets (WRENN_ENCRYPTION_KEY) - Redis stream event publishing from audit logger - Background dispatcher with consumer group and retry (10s, 30s) - Webhook delivery with HMAC-SHA256 signing (X-WRENN-SIGNATURE) - shoutrrr integration for chat providers - Secrets never exposed in API responses --- .env.example | 4 + cmd/control-plane/main.go | 20 +- db/migrations/20260409103357_add_channels.sql | 19 ++ db/queries/channels.sql | 29 ++ go.mod | 4 + go.sum | 28 ++ internal/api/handlers_channels.go | 235 +++++++++++++++ internal/api/middleware.go | 2 + internal/api/openapi.yaml | 276 +++++++++++++++++ internal/api/server.go | 20 +- internal/audit/logger.go | 92 +++++- internal/channels/crypto.go | 63 ++++ internal/channels/deliver.go | 36 +++ internal/channels/dispatcher.go | 183 ++++++++++++ internal/channels/message.go | 31 ++ internal/channels/publisher.go | 44 +++ internal/channels/service.go | 280 ++++++++++++++++++ internal/channels/shoutrrr.go | 119 ++++++++ internal/channels/webhook.go | 62 ++++ internal/config/config.go | 18 +- internal/db/channels.sql.go | 225 ++++++++++++++ internal/db/models.go | 11 + internal/events/event.go | 73 +++++ internal/id/id.go | 4 + 24 files changed, 1871 insertions(+), 7 deletions(-) create mode 100644 db/migrations/20260409103357_add_channels.sql create mode 100644 db/queries/channels.sql create mode 100644 internal/api/handlers_channels.go create mode 100644 internal/channels/crypto.go create mode 100644 internal/channels/deliver.go create mode 100644 internal/channels/dispatcher.go create mode 100644 internal/channels/message.go create mode 100644 internal/channels/publisher.go create mode 100644 internal/channels/service.go create mode 100644 internal/channels/shoutrrr.go create mode 100644 internal/channels/webhook.go create mode 100644 internal/db/channels.sql.go create mode 100644 internal/events/event.go diff --git a/.env.example b/.env.example index 539650b..f9318cc 100644 --- a/.env.example +++ b/.env.example @@ -35,6 +35,10 @@ JWT_SECRET= WRENN_CA_CERT= WRENN_CA_KEY= +# Channels (notification destinations) +# AES-256-GCM key for encrypting channel secrets. Generate with: openssl rand -hex 32 +WRENN_ENCRYPTION_KEY= + # OAuth OAUTH_GITHUB_CLIENT_ID= OAUTH_GITHUB_CLIENT_SECRET= diff --git a/cmd/control-plane/main.go b/cmd/control-plane/main.go index b66eb13..942469c 100644 --- a/cmd/control-plane/main.go +++ b/cmd/control-plane/main.go @@ -17,6 +17,7 @@ import ( "git.omukk.dev/wrenn/sandbox/internal/audit" "git.omukk.dev/wrenn/sandbox/internal/auth" "git.omukk.dev/wrenn/sandbox/internal/auth/oauth" + "git.omukk.dev/wrenn/sandbox/internal/channels" "git.omukk.dev/wrenn/sandbox/internal/config" "git.omukk.dev/wrenn/sandbox/internal/db" "git.omukk.dev/wrenn/sandbox/internal/lifecycle" @@ -124,15 +125,30 @@ func main() { slog.Info("registered OAuth provider", "provider", "github") } + // Channels: publisher, service, dispatcher. + if len(cfg.EncryptionKeyHex) != 64 { + slog.Error("WRENN_ENCRYPTION_KEY must be a hex-encoded 32-byte key (64 hex chars)") + os.Exit(1) + } + channelPub := channels.NewPublisher(rdb) + channelSvc := &channels.Service{DB: queries, EncKey: cfg.EncryptionKey} + channelDispatcher := channels.NewDispatcher(rdb, queries, cfg.EncryptionKey) + + // Shared audit logger with event publishing. + al := audit.NewWithPublisher(queries, channelPub) + // API server. - srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca) + srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc) // Start template build workers (2 concurrent). stopBuildWorkers := srv.BuildSvc.StartWorkers(ctx, 2) defer stopBuildWorkers() + // Start channel event dispatcher. + channelDispatcher.Start(ctx) + // Start host monitor (passive + active reconciliation every 30s). - monitor := api.NewHostMonitor(queries, hostPool, audit.New(queries), 30*time.Second) + monitor := api.NewHostMonitor(queries, hostPool, al, 30*time.Second) monitor.Start(ctx) // Start metrics sampler (records per-team sandbox stats every 10s). diff --git a/db/migrations/20260409103357_add_channels.sql b/db/migrations/20260409103357_add_channels.sql new file mode 100644 index 0000000..dbe3cc3 --- /dev/null +++ b/db/migrations/20260409103357_add_channels.sql @@ -0,0 +1,19 @@ +-- +goose Up + +CREATE TABLE channels ( + id UUID PRIMARY KEY, + team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE, + name TEXT NOT NULL, + provider TEXT NOT NULL, + config JSONB NOT NULL DEFAULT '{}', + event_types TEXT[] NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (team_id, name) +); + +CREATE INDEX idx_channels_team ON channels(team_id); + +-- +goose Down + +DROP TABLE IF EXISTS channels; diff --git a/db/queries/channels.sql b/db/queries/channels.sql new file mode 100644 index 0000000..5772c99 --- /dev/null +++ b/db/queries/channels.sql @@ -0,0 +1,29 @@ +-- name: InsertChannel :one +INSERT INTO channels (id, team_id, name, provider, config, event_types) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING *; + +-- name: ListChannelsByTeam :many +SELECT * FROM channels WHERE team_id = $1 ORDER BY created_at DESC; + +-- name: GetChannelByTeam :one +SELECT * FROM channels WHERE id = $1 AND team_id = $2; + +-- name: UpdateChannel :one +UPDATE channels SET name = $3, event_types = $4, updated_at = NOW() +WHERE id = $1 AND team_id = $2 +RETURNING *; + +-- name: UpdateChannelConfig :one +UPDATE channels SET config = $3, updated_at = NOW() +WHERE id = $1 AND team_id = $2 +RETURNING *; + +-- name: DeleteChannelByTeam :exec +DELETE FROM channels WHERE id = $1 AND team_id = $2; + +-- name: ListChannelsForEvent :many +SELECT * FROM channels +WHERE team_id = $1 + AND sqlc.arg(event_type)::text = ANY(event_types) +ORDER BY created_at; diff --git a/go.mod b/go.mod index 8698f0c..ffb70c2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.25.8 require ( connectrpc.com/connect v1.19.1 + github.com/containrrr/shoutrrr v0.8.0 github.com/go-chi/chi/v5 v5.2.5 github.com/golang-jwt/jwt/v5 v5.3.1 github.com/google/uuid v1.6.0 @@ -22,9 +23,12 @@ require ( require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fatih/color v1.15.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/sync v0.20.0 // indirect golang.org/x/text v0.35.0 // indirect diff --git a/go.sum b/go.sum index 752cbd6..27ca5de 100644 --- a/go.sum +++ b/go.sum @@ -6,17 +6,29 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/containrrr/shoutrrr v0.8.0 h1:mfG2ATzIS7NR2Ec6XL+xyoHzN97H8WPjir8aYzJUSec= +github.com/containrrr/shoutrrr v0.8.0/go.mod h1:ioyQAyu1LJY6sILuNyKaQaw+9Ttik5QePU8atnAdO2o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= +github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= @@ -29,10 +41,21 @@ github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc= +github.com/jarcoal/httpmock v1.3.0/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/onsi/ginkgo/v2 v2.9.2 h1:BA2GMJOtfGAfagzYtrAlufIP0lq6QERkFmHLMLPwFSU= +github.com/onsi/ginkgo/v2 v2.9.2/go.mod h1:WHcJJG2dIlcCqVfBAwUCrJxSPFb6v4azBwgxeMeDuts= +github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= +github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= @@ -53,16 +76,21 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= +golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/api/handlers_channels.go b/internal/api/handlers_channels.go new file mode 100644 index 0000000..3a13998 --- /dev/null +++ b/internal/api/handlers_channels.go @@ -0,0 +1,235 @@ +package api + +import ( + "errors" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + "github.com/jackc/pgx/v5" + + "git.omukk.dev/wrenn/sandbox/internal/auth" + "git.omukk.dev/wrenn/sandbox/internal/channels" + "git.omukk.dev/wrenn/sandbox/internal/db" + "git.omukk.dev/wrenn/sandbox/internal/id" +) + +type channelHandler struct { + svc *channels.Service +} + +func newChannelHandler(svc *channels.Service) *channelHandler { + return &channelHandler{svc: svc} +} + +type createChannelRequest struct { + Name string `json:"name"` + Provider string `json:"provider"` + Config map[string]string `json:"config"` + Events []string `json:"events"` +} + +type updateChannelRequest struct { + Name string `json:"name"` + Events []string `json:"events"` +} + +type rotateConfigRequest struct { + Config map[string]string `json:"config"` +} + +type testChannelRequest struct { + Provider string `json:"provider"` + Config map[string]string `json:"config"` +} + +type channelResponse struct { + ID string `json:"id"` + TeamID string `json:"team_id"` + Name string `json:"name"` + Provider string `json:"provider"` + Events []string `json:"events"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + Secret *string `json:"secret,omitempty"` +} + +func channelToResponse(ch db.Channel) channelResponse { + resp := channelResponse{ + ID: id.FormatChannelID(ch.ID), + TeamID: id.FormatTeamID(ch.TeamID), + Name: ch.Name, + Provider: ch.Provider, + Events: ch.EventTypes, + } + if ch.CreatedAt.Valid { + resp.CreatedAt = ch.CreatedAt.Time.Format(time.RFC3339) + } + if ch.UpdatedAt.Valid { + resp.UpdatedAt = ch.UpdatedAt.Time.Format(time.RFC3339) + } + return resp +} + +// Create handles POST /v1/channels. +func (h *channelHandler) Create(w http.ResponseWriter, r *http.Request) { + ac := auth.MustFromContext(r.Context()) + + var req createChannelRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + + result, err := h.svc.Create(r.Context(), channels.CreateParams{ + TeamID: ac.TeamID, + Name: req.Name, + Provider: req.Provider, + Config: req.Config, + Events: req.Events, + }) + if err != nil { + status, code, msg := serviceErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + resp := channelToResponse(result.Channel) + if result.PlaintextSecret != "" { + resp.Secret = &result.PlaintextSecret + } + + writeJSON(w, http.StatusCreated, resp) +} + +// List handles GET /v1/channels. +func (h *channelHandler) List(w http.ResponseWriter, r *http.Request) { + ac := auth.MustFromContext(r.Context()) + + chs, err := h.svc.List(r.Context(), ac.TeamID) + if err != nil { + writeError(w, http.StatusInternalServerError, "db_error", "failed to list channels") + return + } + + resp := make([]channelResponse, len(chs)) + for i, ch := range chs { + resp[i] = channelToResponse(ch) + } + + writeJSON(w, http.StatusOK, resp) +} + +// Get handles GET /v1/channels/{id}. +func (h *channelHandler) Get(w http.ResponseWriter, r *http.Request) { + ac := auth.MustFromContext(r.Context()) + channelIDStr := chi.URLParam(r, "id") + + channelID, err := id.ParseChannelID(channelIDStr) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID") + return + } + + ch, err := h.svc.Get(r.Context(), channelID, ac.TeamID) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + writeError(w, http.StatusNotFound, "not_found", "channel not found") + } else { + writeError(w, http.StatusInternalServerError, "db_error", "failed to get channel") + } + return + } + + writeJSON(w, http.StatusOK, channelToResponse(ch)) +} + +// Update handles PATCH /v1/channels/{id}. +func (h *channelHandler) Update(w http.ResponseWriter, r *http.Request) { + ac := auth.MustFromContext(r.Context()) + channelIDStr := chi.URLParam(r, "id") + + channelID, err := id.ParseChannelID(channelIDStr) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID") + return + } + + var req updateChannelRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + + ch, err := h.svc.Update(r.Context(), channelID, ac.TeamID, req.Name, req.Events) + if err != nil { + status, code, msg := serviceErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + writeJSON(w, http.StatusOK, channelToResponse(ch)) +} + +// Test handles POST /v1/channels/test. +func (h *channelHandler) Test(w http.ResponseWriter, r *http.Request) { + var req testChannelRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + + if err := h.svc.Test(r.Context(), req.Provider, req.Config); err != nil { + status, code, msg := serviceErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// RotateConfig handles PUT /v1/channels/{id}/config. +func (h *channelHandler) RotateConfig(w http.ResponseWriter, r *http.Request) { + ac := auth.MustFromContext(r.Context()) + channelIDStr := chi.URLParam(r, "id") + + channelID, err := id.ParseChannelID(channelIDStr) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID") + return + } + + var req rotateConfigRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body") + return + } + + ch, err := h.svc.RotateConfig(r.Context(), channelID, ac.TeamID, req.Config) + if err != nil { + status, code, msg := serviceErrToHTTP(err) + writeError(w, status, code, msg) + return + } + + writeJSON(w, http.StatusOK, channelToResponse(ch)) +} + +// Delete handles DELETE /v1/channels/{id}. +func (h *channelHandler) Delete(w http.ResponseWriter, r *http.Request) { + ac := auth.MustFromContext(r.Context()) + channelIDStr := chi.URLParam(r, "id") + + channelID, err := id.ParseChannelID(channelIDStr) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID") + return + } + + if err := h.svc.Delete(r.Context(), channelID, ac.TeamID); err != nil { + writeError(w, http.StatusInternalServerError, "db_error", "failed to delete channel") + return + } + + w.WriteHeader(http.StatusNoContent) +} diff --git a/internal/api/middleware.go b/internal/api/middleware.go index 5c9d8cd..2fdb6bc 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -95,6 +95,8 @@ func serviceErrToHTTP(err error) (int, string, string) { return http.StatusNotFound, "not_found", msg case strings.Contains(msg, "not running"), strings.Contains(msg, "not paused"): return http.StatusConflict, "invalid_state", msg + case strings.Contains(msg, "conflict:"): + return http.StatusConflict, "conflict", msg case strings.Contains(msg, "forbidden"): return http.StatusForbidden, "forbidden", msg case strings.Contains(msg, "invalid or expired"): diff --git a/internal/api/openapi.yaml b/internal/api/openapi.yaml index f5ae7a7..0b4fe74 100644 --- a/internal/api/openapi.yaml +++ b/internal/api/openapi.yaml @@ -1547,6 +1547,176 @@ paths: schema: $ref: "#/components/schemas/Error" + /v1/channels: + post: + summary: Create a notification channel + operationId: createChannel + tags: [channels] + security: + - bearerAuth: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/CreateChannelRequest" + responses: + "201": + description: Channel created + content: + application/json: + schema: + $ref: "#/components/schemas/ChannelResponse" + "400": + $ref: "#/components/responses/BadRequest" + get: + summary: List notification channels + operationId: listChannels + tags: [channels] + security: + - bearerAuth: [] + responses: + "200": + description: Channels list + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/ChannelResponse" + + /v1/channels/test: + post: + summary: Test a channel configuration + description: > + Sends a test notification using the provided provider and config without + saving anything. Use this to verify credentials before creating a channel. + operationId: testChannel + tags: [channels] + security: + - bearerAuth: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/TestChannelRequest" + responses: + "200": + description: Test notification sent successfully + content: + application/json: + schema: + type: object + properties: + status: + type: string + example: ok + "400": + $ref: "#/components/responses/BadRequest" + + /v1/channels/{id}: + parameters: + - name: id + in: path + required: true + schema: + type: string + get: + summary: Get a notification channel + operationId: getChannel + tags: [channels] + security: + - bearerAuth: [] + responses: + "200": + description: Channel details + content: + application/json: + schema: + $ref: "#/components/schemas/ChannelResponse" + "404": + description: Channel not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + patch: + summary: Update a notification channel + operationId: updateChannel + tags: [channels] + security: + - bearerAuth: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/UpdateChannelRequest" + responses: + "200": + description: Channel updated + content: + application/json: + schema: + $ref: "#/components/schemas/ChannelResponse" + "400": + $ref: "#/components/responses/BadRequest" + "404": + description: Channel not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + delete: + summary: Delete a notification channel + operationId: deleteChannel + tags: [channels] + security: + - bearerAuth: [] + responses: + "204": + description: Channel deleted + + /v1/channels/{id}/config: + parameters: + - name: id + in: path + required: true + schema: + type: string + put: + summary: Rotate channel secrets + description: > + Replaces the channel's provider configuration entirely with new secrets. + The previous config is discarded. Config fields must match the provider's + required fields. + operationId: rotateChannelConfig + tags: [channels] + security: + - bearerAuth: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/RotateConfigRequest" + responses: + "200": + description: Config rotated + content: + application/json: + schema: + $ref: "#/components/schemas/ChannelResponse" + "400": + $ref: "#/components/responses/BadRequest" + "404": + description: Channel not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + components: securitySchemes: apiKeyAuth: @@ -2067,6 +2237,112 @@ components: format: int64 description: "Allocated disk bytes for the CoW sparse file" + CreateChannelRequest: + type: object + required: [name, provider, config, events] + properties: + name: + type: string + description: Unique channel name within the team. + provider: + type: string + enum: [discord, slack, teams, googlechat, telegram, matrix, webhook] + config: + type: object + additionalProperties: + type: string + description: > + Provider-specific configuration fields. + Discord/Slack/Teams/Google Chat: {"webhook_url": "..."}. + Telegram: {"bot_token": "...", "chat_id": "..."}. + Matrix: {"homeserver_url": "...", "access_token": "...", "room_id": "..."}. + Webhook: {"url": "...", "secret": "..."} (secret is auto-generated if omitted). + events: + type: array + items: + type: string + enum: + - capsule.created + - capsule.running + - capsule.paused + - capsule.destroyed + - template.snapshot.created + - template.snapshot.deleted + - host.up + - host.down + + TestChannelRequest: + type: object + required: [provider, config] + properties: + provider: + type: string + enum: [discord, slack, teams, googlechat, telegram, matrix, webhook] + config: + type: object + additionalProperties: + type: string + description: Provider-specific configuration fields (same as CreateChannelRequest.config). + + RotateConfigRequest: + type: object + required: [config] + properties: + config: + type: object + additionalProperties: + type: string + description: > + New provider configuration fields. Must include all required fields + for the channel's provider. Replaces the existing config entirely. + + UpdateChannelRequest: + type: object + required: [name, events] + properties: + name: + type: string + events: + type: array + items: + type: string + enum: + - capsule.created + - capsule.running + - capsule.paused + - capsule.destroyed + - template.snapshot.created + - template.snapshot.deleted + - host.up + - host.down + + ChannelResponse: + type: object + properties: + id: + type: string + team_id: + type: string + name: + type: string + provider: + type: string + enum: [discord, slack, teams, googlechat, telegram, matrix, webhook] + events: + type: array + items: + type: string + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + secret: + type: string + nullable: true + description: Webhook secret. Only returned on creation, never again. + Error: type: object properties: diff --git a/internal/api/server.go b/internal/api/server.go index d3b5c37..d306d58 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -12,6 +12,7 @@ import ( "git.omukk.dev/wrenn/sandbox/internal/audit" "git.omukk.dev/wrenn/sandbox/internal/auth" "git.omukk.dev/wrenn/sandbox/internal/auth/oauth" + "git.omukk.dev/wrenn/sandbox/internal/channels" "git.omukk.dev/wrenn/sandbox/internal/db" "git.omukk.dev/wrenn/sandbox/internal/lifecycle" "git.omukk.dev/wrenn/sandbox/internal/scheduler" @@ -38,6 +39,8 @@ func New( oauthRegistry *oauth.Registry, oauthRedirectURL string, ca *auth.CA, + al *audit.AuditLogger, + channelSvc *channels.Service, ) *Server { r := chi.NewRouter() r.Use(requestLogger()) @@ -52,8 +55,6 @@ func New( statsSvc := &service.StatsService{DB: queries, Pool: pgPool} buildSvc := &service.BuildService{DB: queries, Redis: rdb, Pool: pool, Scheduler: sched} - al := audit.New(queries) - sandbox := newSandboxHandler(sandboxSvc, al) exec := newExecHandler(queries, pool) execStream := newExecStreamHandler(queries, pool) @@ -70,6 +71,7 @@ func New( statsH := newStatsHandler(statsSvc) metricsH := newSandboxMetricsHandler(queries, pool) buildH := newBuildHandler(buildSvc, queries, pool) + channelH := newChannelHandler(channelSvc) // OpenAPI spec and docs. r.Get("/openapi.yaml", serveOpenAPI) @@ -171,6 +173,20 @@ func New( }) }) + // JWT-authenticated: notification channels. + r.Route("/v1/channels", func(r chi.Router) { + r.Use(requireJWT(jwtSecret)) + r.Post("/", channelH.Create) + r.Get("/", channelH.List) + r.Post("/test", channelH.Test) + r.Route("/{id}", func(r chi.Router) { + r.Get("/", channelH.Get) + r.Patch("/", channelH.Update) + r.Delete("/", channelH.Delete) + r.Put("/config", channelH.RotateConfig) + }) + }) + // JWT-authenticated: audit log. r.With(requireJWT(jwtSecret)).Get("/v1/audit-logs", auditH.List) diff --git a/internal/audit/logger.go b/internal/audit/logger.go index 72d4c4f..281aca0 100644 --- a/internal/audit/logger.go +++ b/internal/audit/logger.go @@ -9,6 +9,7 @@ import ( "git.omukk.dev/wrenn/sandbox/internal/auth" "git.omukk.dev/wrenn/sandbox/internal/db" + "git.omukk.dev/wrenn/sandbox/internal/events" "git.omukk.dev/wrenn/sandbox/internal/id" ) @@ -16,14 +17,38 @@ import ( // All methods are fire-and-forget: failures are logged via slog and never // propagated to the caller. type AuditLogger struct { - db *db.Queries + db *db.Queries + pub events.EventPublisher // optional — nil disables event publishing } -// New constructs an AuditLogger. +// New constructs an AuditLogger without event publishing. func New(queries *db.Queries) *AuditLogger { return &AuditLogger{db: queries} } +// NewWithPublisher constructs an AuditLogger that also publishes channel events. +func NewWithPublisher(queries *db.Queries, pub events.EventPublisher) *AuditLogger { + return &AuditLogger{db: queries, pub: pub} +} + +// publish sends an event to the notification stream if a publisher is configured. +func (l *AuditLogger) publish(ctx context.Context, e events.Event) { + if l.pub != nil { + l.pub.Publish(ctx, e) + } +} + +// actorToEvent converts auth context fields to an events.Actor. +func actorToEvent(ac auth.AuthContext) events.Actor { + at, aid, aname := actorFields(ac) + return events.Actor{Type: events.ActorKind(at), ID: aid, Name: aname} +} + +// systemActor returns an events.Actor for system-initiated events. +func systemActor() events.Actor { + return events.Actor{Type: events.ActorSystem} +} + // actorFields extracts actor_type, actor_id, and actor_name from an AuthContext. // actor_id is stored as a prefixed string in the TEXT column. func actorFields(ac auth.AuthContext) (actorType, actorID, actorName string) { @@ -82,6 +107,13 @@ func (l *AuditLogger) LogSandboxCreate(ctx context.Context, ac auth.AuthContext, Status: "success", Metadata: marshalMeta(map[string]any{"template": template}), }) + l.publish(ctx, events.Event{ + Event: events.CapsuleCreated, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(ac.TeamID), + Actor: actorToEvent(ac), + Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"}, + }) } func (l *AuditLogger) LogSandboxPause(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) { @@ -99,6 +131,13 @@ func (l *AuditLogger) LogSandboxPause(ctx context.Context, ac auth.AuthContext, Status: "success", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.CapsulePaused, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(ac.TeamID), + Actor: actorToEvent(ac), + Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"}, + }) } // LogSandboxAutoPause records a system-initiated auto-pause (TTL or host reconciler). @@ -116,6 +155,13 @@ func (l *AuditLogger) LogSandboxAutoPause(ctx context.Context, teamID, sandboxID Status: "info", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.CapsulePaused, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(teamID), + Actor: systemActor(), + Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"}, + }) } func (l *AuditLogger) LogSandboxResume(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) { @@ -133,6 +179,13 @@ func (l *AuditLogger) LogSandboxResume(ctx context.Context, ac auth.AuthContext, Status: "success", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.CapsuleRunning, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(ac.TeamID), + Actor: actorToEvent(ac), + Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"}, + }) } func (l *AuditLogger) LogSandboxDestroy(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) { @@ -150,6 +203,13 @@ func (l *AuditLogger) LogSandboxDestroy(ctx context.Context, ac auth.AuthContext Status: "warning", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.CapsuleDestroyed, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(ac.TeamID), + Actor: actorToEvent(ac), + Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"}, + }) } // --- Snapshot events (scope: team) --- @@ -169,6 +229,13 @@ func (l *AuditLogger) LogSnapshotCreate(ctx context.Context, ac auth.AuthContext Status: "success", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.SnapshotCreated, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(ac.TeamID), + Actor: actorToEvent(ac), + Resource: events.Resource{ID: name, Type: "snapshot"}, + }) } func (l *AuditLogger) LogSnapshotDelete(ctx context.Context, ac auth.AuthContext, name string) { @@ -186,6 +253,13 @@ func (l *AuditLogger) LogSnapshotDelete(ctx context.Context, ac auth.AuthContext Status: "warning", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.SnapshotDeleted, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(ac.TeamID), + Actor: actorToEvent(ac), + Resource: events.Resource{ID: name, Type: "snapshot"}, + }) } // --- Team events (scope: team) --- @@ -387,6 +461,13 @@ func (l *AuditLogger) LogHostMarkedDown(ctx context.Context, teamID, hostID pgty Status: "error", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.HostDown, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(teamID), + Actor: systemActor(), + Resource: events.Resource{ID: id.FormatHostID(hostID), Type: "host"}, + }) } // LogHostMarkedUp records a system-initiated host status transition back to online. @@ -408,4 +489,11 @@ func (l *AuditLogger) LogHostMarkedUp(ctx context.Context, teamID, hostID pgtype Status: "success", Metadata: []byte("{}"), }) + l.publish(ctx, events.Event{ + Event: events.HostUp, + Timestamp: events.Now(), + TeamID: id.FormatTeamID(teamID), + Actor: systemActor(), + Resource: events.Resource{ID: id.FormatHostID(hostID), Type: "host"}, + }) } diff --git a/internal/channels/crypto.go b/internal/channels/crypto.go new file mode 100644 index 0000000..c0c1926 --- /dev/null +++ b/internal/channels/crypto.go @@ -0,0 +1,63 @@ +package channels + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" + "fmt" + "io" +) + +// EncryptSecret encrypts plaintext using AES-256-GCM with a random nonce. +// Returns base64(nonce || ciphertext). +func EncryptSecret(key [32]byte, plaintext string) (string, error) { + block, err := aes.NewCipher(key[:]) + if err != nil { + return "", fmt.Errorf("aes cipher: %w", err) + } + + gcm, err := cipher.NewGCM(block) + if err != nil { + return "", fmt.Errorf("gcm: %w", err) + } + + nonce := make([]byte, gcm.NonceSize()) + if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + return "", fmt.Errorf("nonce: %w", err) + } + + ciphertext := gcm.Seal(nonce, nonce, []byte(plaintext), nil) + return base64.StdEncoding.EncodeToString(ciphertext), nil +} + +// DecryptSecret decrypts a value produced by EncryptSecret. +func DecryptSecret(key [32]byte, encoded string) (string, error) { + data, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return "", fmt.Errorf("base64 decode: %w", err) + } + + block, err := aes.NewCipher(key[:]) + if err != nil { + return "", fmt.Errorf("aes cipher: %w", err) + } + + gcm, err := cipher.NewGCM(block) + if err != nil { + return "", fmt.Errorf("gcm: %w", err) + } + + nonceSize := gcm.NonceSize() + if len(data) < nonceSize { + return "", fmt.Errorf("ciphertext too short") + } + + nonce, ciphertext := data[:nonceSize], data[nonceSize:] + plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) + if err != nil { + return "", fmt.Errorf("decrypt: %w", err) + } + + return string(plaintext), nil +} diff --git a/internal/channels/deliver.go b/internal/channels/deliver.go new file mode 100644 index 0000000..1bd8fd6 --- /dev/null +++ b/internal/channels/deliver.go @@ -0,0 +1,36 @@ +package channels + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/containrrr/shoutrrr" + + "git.omukk.dev/wrenn/sandbox/internal/events" +) + +// Deliver sends a notification to a single provider with the given config. +// For webhooks it uses HMAC-signed HTTP POST; for all others it uses shoutrrr. +func Deliver(ctx context.Context, provider string, config map[string]string, e events.Event) error { + payload, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("marshal event: %w", err) + } + + if provider == "webhook" { + wh := NewWebhookDelivery() + return wh.Deliver(ctx, config["url"], config["secret"], payload) + } + + shoutrrrURL, err := ShoutrrrURL(provider, config) + if err != nil { + return fmt.Errorf("build shoutrrr URL: %w", err) + } + + msg := FormatMessage(e) + if err := shoutrrr.Send(shoutrrrURL, msg); err != nil { + return fmt.Errorf("shoutrrr send: %w", err) + } + return nil +} diff --git a/internal/channels/dispatcher.go b/internal/channels/dispatcher.go new file mode 100644 index 0000000..75bed1b --- /dev/null +++ b/internal/channels/dispatcher.go @@ -0,0 +1,183 @@ +package channels + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + "github.com/redis/go-redis/v9" + + "git.omukk.dev/wrenn/sandbox/internal/db" + "git.omukk.dev/wrenn/sandbox/internal/events" + "git.omukk.dev/wrenn/sandbox/internal/id" +) + +const ( + groupName = "wrenn-channels-v1" + consumerName = "cp-0" +) + +// Dispatcher consumes events from the Redis stream and delivers them +// to matching notification channels. +type Dispatcher struct { + rdb *redis.Client + db *db.Queries + encKey [32]byte + webhook *WebhookDelivery +} + +// NewDispatcher constructs an event dispatcher. +func NewDispatcher(rdb *redis.Client, queries *db.Queries, encKey [32]byte) *Dispatcher { + return &Dispatcher{ + rdb: rdb, + db: queries, + encKey: encKey, + webhook: NewWebhookDelivery(), + } +} + +// Start launches the consumer goroutine. Returns when ctx is cancelled. +func (d *Dispatcher) Start(ctx context.Context) { + go d.run(ctx) +} + +func (d *Dispatcher) run(ctx context.Context) { + // Create consumer group idempotently. "$" means only new messages. + err := d.rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "$").Err() + if err != nil && !isGroupExistsError(err) { + slog.Error("channels: failed to create consumer group", "error", err) + return + } + + for { + select { + case <-ctx.Done(): + return + default: + } + + streams, err := d.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamKey, ">"}, + Count: 10, + Block: 5 * time.Second, + }).Result() + + if err != nil { + if err == redis.Nil || ctx.Err() != nil { + continue + } + slog.Warn("channels: xreadgroup error", "error", err) + time.Sleep(1 * time.Second) + continue + } + + for _, stream := range streams { + for _, msg := range stream.Messages { + d.handleMessage(ctx, msg) + } + } + } +} + +func (d *Dispatcher) handleMessage(ctx context.Context, msg redis.XMessage) { + defer func() { + if err := d.rdb.XAck(ctx, streamKey, groupName, msg.ID).Err(); err != nil { + slog.Warn("channels: xack failed", "id", msg.ID, "error", err) + } + }() + + payload, ok := msg.Values["payload"].(string) + if !ok { + slog.Warn("channels: message missing payload", "id", msg.ID) + return + } + + var event events.Event + if err := json.Unmarshal([]byte(payload), &event); err != nil { + slog.Warn("channels: failed to unmarshal event", "id", msg.ID, "error", err) + return + } + + teamID, err := id.ParseTeamID(event.TeamID) + if err != nil { + slog.Warn("channels: invalid team ID in event", "team_id", event.TeamID, "error", err) + return + } + + channels, err := d.db.ListChannelsForEvent(ctx, db.ListChannelsForEventParams{ + TeamID: teamID, + EventType: event.Event, + }) + if err != nil { + slog.Warn("channels: failed to list channels for event", "event", event.Event, "error", err) + return + } + + for _, ch := range channels { + d.dispatch(ctx, ch, event) + } +} + +// retryDelays defines the wait durations before each retry attempt. +var retryDelays = []time.Duration{10 * time.Second, 30 * time.Second} + +func (d *Dispatcher) dispatch(ctx context.Context, ch db.Channel, e events.Event) { + config, err := d.decryptConfig(ch.Config) + if err != nil { + slog.Warn("channels: failed to decrypt config", + "channel_id", id.FormatChannelID(ch.ID), "error", err) + return + } + + chID := id.FormatChannelID(ch.ID) + + if err := Deliver(ctx, ch.Provider, config, e); err != nil { + slog.Warn("channels: delivery failed, scheduling retries", + "channel_id", chID, "provider", ch.Provider, "error", err) + go d.retryDeliver(ctx, ch.Provider, config, e, chID) + } +} + +func (d *Dispatcher) retryDeliver(ctx context.Context, provider string, config map[string]string, e events.Event, chID string) { + for i, delay := range retryDelays { + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + + if err := Deliver(ctx, provider, config, e); err != nil { + slog.Warn("channels: retry delivery failed", + "channel_id", chID, "provider", provider, + "attempt", i+2, "error", err) + continue + } + return + } + slog.Error("channels: delivery failed after all retries", + "channel_id", chID, "provider", provider, "event", e.Event) +} + +func (d *Dispatcher) decryptConfig(configJSON []byte) (map[string]string, error) { + var encrypted map[string]string + if err := json.Unmarshal(configJSON, &encrypted); err != nil { + return nil, err + } + + decrypted := make(map[string]string, len(encrypted)) + for k, v := range encrypted { + plaintext, err := DecryptSecret(d.encKey, v) + if err != nil { + return nil, err + } + decrypted[k] = plaintext + } + return decrypted, nil +} + +func isGroupExistsError(err error) bool { + return err != nil && err.Error() == "BUSYGROUP Consumer Group name already exists" +} diff --git a/internal/channels/message.go b/internal/channels/message.go new file mode 100644 index 0000000..9435260 --- /dev/null +++ b/internal/channels/message.go @@ -0,0 +1,31 @@ +package channels + +import ( + "fmt" + + "git.omukk.dev/wrenn/sandbox/internal/events" +) + +// FormatMessage produces a compact notification string for chat providers. +func FormatMessage(e events.Event) string { + switch e.Event { + case events.CapsuleCreated: + return fmt.Sprintf("[%s] Capsule %s created", e.Event, e.Resource.ID) + case events.CapsuleRunning: + return fmt.Sprintf("[%s] Capsule %s is running", e.Event, e.Resource.ID) + case events.CapsulePaused: + return fmt.Sprintf("[%s] Capsule %s paused", e.Event, e.Resource.ID) + case events.CapsuleDestroyed: + return fmt.Sprintf("[%s] Capsule %s destroyed", e.Event, e.Resource.ID) + case events.SnapshotCreated: + return fmt.Sprintf("[%s] Template snapshot %s created", e.Event, e.Resource.ID) + case events.SnapshotDeleted: + return fmt.Sprintf("[%s] Template snapshot %s deleted", e.Event, e.Resource.ID) + case events.HostUp: + return fmt.Sprintf("[%s] Host %s is up", e.Event, e.Resource.ID) + case events.HostDown: + return fmt.Sprintf("[%s] Host %s is down", e.Event, e.Resource.ID) + default: + return fmt.Sprintf("[%s] %s %s", e.Event, e.Resource.Type, e.Resource.ID) + } +} diff --git a/internal/channels/publisher.go b/internal/channels/publisher.go new file mode 100644 index 0000000..be358f2 --- /dev/null +++ b/internal/channels/publisher.go @@ -0,0 +1,44 @@ +package channels + +import ( + "context" + "encoding/json" + "log/slog" + + "github.com/redis/go-redis/v9" + + "git.omukk.dev/wrenn/sandbox/internal/events" +) + +const streamKey = "wrenn:events" + +// Publisher pushes events onto the Redis stream for the dispatcher to consume. +type Publisher struct { + rdb *redis.Client +} + +// NewPublisher constructs an event publisher. +func NewPublisher(rdb *redis.Client) *Publisher { + return &Publisher{rdb: rdb} +} + +// Publish serializes the event and appends it to the global stream. +// Fire-and-forget: failures are logged, never propagated. +func (p *Publisher) Publish(ctx context.Context, e events.Event) { + payload, err := json.Marshal(e) + if err != nil { + slog.Warn("channels: failed to marshal event", "event", e.Event, "error", err) + return + } + + if err := p.rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamKey, + MaxLen: 10000, + Approx: true, + Values: map[string]interface{}{ + "payload": string(payload), + }, + }).Err(); err != nil { + slog.Warn("channels: failed to publish event", "event", e.Event, "error", err) + } +} diff --git a/internal/channels/service.go b/internal/channels/service.go new file mode 100644 index 0000000..ba7b5ed --- /dev/null +++ b/internal/channels/service.go @@ -0,0 +1,280 @@ +package channels + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgtype" + + "git.omukk.dev/wrenn/sandbox/internal/db" + "git.omukk.dev/wrenn/sandbox/internal/events" + "git.omukk.dev/wrenn/sandbox/internal/id" +) + +// Valid providers. +var validProviders = map[string]bool{ + "discord": true, + "slack": true, + "teams": true, + "googlechat": true, + "telegram": true, + "matrix": true, + "webhook": true, +} + +// Required config fields per provider. +var requiredFields = map[string][]string{ + "discord": {"webhook_url"}, + "slack": {"webhook_url"}, + "teams": {"webhook_url"}, + "googlechat": {"webhook_url"}, + "telegram": {"bot_token", "chat_id"}, + "matrix": {"homeserver_url", "access_token", "room_id"}, + "webhook": {"url"}, +} + +// validEvents maps event type strings to true for validation. +var validEvents map[string]bool + +func init() { + validEvents = make(map[string]bool, len(events.AllEventTypes)) + for _, et := range events.AllEventTypes { + validEvents[et] = true + } +} + +// Service handles channel CRUD operations. +type Service struct { + DB *db.Queries + EncKey [32]byte +} + +// CreateParams holds the parameters for creating a channel. +type CreateParams struct { + TeamID pgtype.UUID + Name string + Provider string + Config map[string]string + Events []string +} + +// CreateResult holds the result of creating a channel. +type CreateResult struct { + Channel db.Channel + PlaintextSecret string // non-empty only for webhook provider +} + +// Create creates a new notification channel. +func (s *Service) Create(ctx context.Context, p CreateParams) (CreateResult, error) { + if p.Name == "" { + return CreateResult{}, fmt.Errorf("invalid: channel name is required") + } + + if !validProviders[p.Provider] { + return CreateResult{}, fmt.Errorf("invalid: unsupported provider %q", p.Provider) + } + + if len(p.Events) == 0 { + return CreateResult{}, fmt.Errorf("invalid: at least one event type is required") + } + for _, et := range p.Events { + if !validEvents[et] { + return CreateResult{}, fmt.Errorf("invalid: unknown event type %q", et) + } + } + + // Validate required config fields. + for _, field := range requiredFields[p.Provider] { + if p.Config[field] == "" { + return CreateResult{}, fmt.Errorf("invalid: %s is required for %s", field, p.Provider) + } + } + + // For webhooks, auto-generate secret if not provided. + var plaintextSecret string + if p.Provider == "webhook" { + if p.Config["secret"] == "" { + secret := generateSecret() + p.Config["secret"] = secret + plaintextSecret = secret + } else { + plaintextSecret = p.Config["secret"] + } + } + + // Encrypt config fields. + encrypted := make(map[string]string, len(p.Config)) + for k, v := range p.Config { + enc, err := EncryptSecret(s.EncKey, v) + if err != nil { + return CreateResult{}, fmt.Errorf("encrypt config field %s: %w", k, err) + } + encrypted[k] = enc + } + + configJSON, err := json.Marshal(encrypted) + if err != nil { + return CreateResult{}, fmt.Errorf("marshal config: %w", err) + } + + ch, err := s.DB.InsertChannel(ctx, db.InsertChannelParams{ + ID: id.NewChannelID(), + TeamID: p.TeamID, + Name: p.Name, + Provider: p.Provider, + Config: configJSON, + EventTypes: p.Events, + }) + if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == "23505" { + return CreateResult{}, fmt.Errorf("conflict: channel name %q already exists", p.Name) + } + return CreateResult{}, fmt.Errorf("insert channel: %w", err) + } + + return CreateResult{Channel: ch, PlaintextSecret: plaintextSecret}, nil +} + +// List returns all channels belonging to the given team. +func (s *Service) List(ctx context.Context, teamID pgtype.UUID) ([]db.Channel, error) { + return s.DB.ListChannelsByTeam(ctx, teamID) +} + +// Get returns a single channel by ID, scoped to the given team. +func (s *Service) Get(ctx context.Context, channelID, teamID pgtype.UUID) (db.Channel, error) { + return s.DB.GetChannelByTeam(ctx, db.GetChannelByTeamParams{ID: channelID, TeamID: teamID}) +} + +// Update updates a channel's name and event types. +func (s *Service) Update(ctx context.Context, channelID, teamID pgtype.UUID, name string, eventTypes []string) (db.Channel, error) { + if name == "" { + return db.Channel{}, fmt.Errorf("invalid: channel name is required") + } + + if len(eventTypes) == 0 { + return db.Channel{}, fmt.Errorf("invalid: at least one event type is required") + } + for _, et := range eventTypes { + if !validEvents[et] { + return db.Channel{}, fmt.Errorf("invalid: unknown event type %q", et) + } + } + + ch, err := s.DB.UpdateChannel(ctx, db.UpdateChannelParams{ + ID: channelID, + TeamID: teamID, + Name: name, + EventTypes: eventTypes, + }) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return db.Channel{}, fmt.Errorf("channel not found") + } + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == "23505" { + return db.Channel{}, fmt.Errorf("conflict: channel name %q already exists", name) + } + return db.Channel{}, fmt.Errorf("update channel: %w", err) + } + return ch, nil +} + +// RotateConfig replaces a channel's config with new provider secrets. +func (s *Service) RotateConfig(ctx context.Context, channelID, teamID pgtype.UUID, config map[string]string) (db.Channel, error) { + // Look up the existing channel to get its provider for validation. + ch, err := s.DB.GetChannelByTeam(ctx, db.GetChannelByTeamParams{ID: channelID, TeamID: teamID}) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return db.Channel{}, fmt.Errorf("channel not found") + } + return db.Channel{}, fmt.Errorf("get channel: %w", err) + } + + // Validate required config fields for this provider. + for _, field := range requiredFields[ch.Provider] { + if config[field] == "" { + return db.Channel{}, fmt.Errorf("invalid: %s is required for %s", field, ch.Provider) + } + } + + // For webhooks, auto-generate secret if not provided. + if ch.Provider == "webhook" && config["secret"] == "" { + config["secret"] = generateSecret() + } + + // Encrypt all config fields. + encrypted := make(map[string]string, len(config)) + for k, v := range config { + enc, err := EncryptSecret(s.EncKey, v) + if err != nil { + return db.Channel{}, fmt.Errorf("encrypt config field %s: %w", k, err) + } + encrypted[k] = enc + } + + configJSON, err := json.Marshal(encrypted) + if err != nil { + return db.Channel{}, fmt.Errorf("marshal config: %w", err) + } + + updated, err := s.DB.UpdateChannelConfig(ctx, db.UpdateChannelConfigParams{ + ID: channelID, + TeamID: teamID, + Config: configJSON, + }) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return db.Channel{}, fmt.Errorf("channel not found") + } + return db.Channel{}, fmt.Errorf("update channel config: %w", err) + } + return updated, nil +} + +// Test validates config and sends a test notification without persisting anything. +func (s *Service) Test(ctx context.Context, provider string, config map[string]string) error { + if !validProviders[provider] { + return fmt.Errorf("invalid: unsupported provider %q", provider) + } + + for _, field := range requiredFields[provider] { + if config[field] == "" { + return fmt.Errorf("invalid: %s is required for %s", field, provider) + } + } + + // For webhooks, auto-generate a temporary secret if not provided. + if provider == "webhook" && config["secret"] == "" { + config["secret"] = generateSecret() + } + + testEvent := events.Event{ + Event: "channel.test", + Timestamp: events.Now(), + TeamID: "test", + Actor: events.Actor{Type: events.ActorSystem}, + Resource: events.Resource{ID: "test", Type: "channel"}, + } + + return Deliver(ctx, provider, config, testEvent) +} + +// Delete removes a channel by ID, scoped to the given team. +func (s *Service) Delete(ctx context.Context, channelID, teamID pgtype.UUID) error { + return s.DB.DeleteChannelByTeam(ctx, db.DeleteChannelByTeamParams{ID: channelID, TeamID: teamID}) +} + +func generateSecret() string { + b := make([]byte, 32) + if _, err := rand.Read(b); err != nil { + panic(fmt.Sprintf("crypto/rand failed: %v", err)) + } + return hex.EncodeToString(b) +} diff --git a/internal/channels/shoutrrr.go b/internal/channels/shoutrrr.go new file mode 100644 index 0000000..f173e07 --- /dev/null +++ b/internal/channels/shoutrrr.go @@ -0,0 +1,119 @@ +package channels + +import ( + "fmt" + "net/url" + "regexp" + "strings" +) + +// ShoutrrrURL builds a shoutrrr-compatible URL from structured provider config. +func ShoutrrrURL(provider string, config map[string]string) (string, error) { + switch provider { + case "discord": + return discordURL(config) + case "slack": + return slackURL(config) + case "teams": + return teamsURL(config) + case "googlechat": + return googlechatURL(config) + case "telegram": + return telegramURL(config) + case "matrix": + return matrixURL(config) + default: + return "", fmt.Errorf("unsupported shoutrrr provider: %s", provider) + } +} + +// discordURL converts https://discord.com/api/webhooks/{id}/{token} → discord://{token}@{id} +func discordURL(config map[string]string) (string, error) { + u, err := url.Parse(config["webhook_url"]) + if err != nil { + return "", fmt.Errorf("invalid discord webhook URL: %w", err) + } + // Path: /api/webhooks/{id}/{token} + parts := strings.Split(strings.TrimPrefix(u.Path, "/"), "/") + if len(parts) < 4 || parts[0] != "api" || parts[1] != "webhooks" { + return "", fmt.Errorf("unexpected discord webhook URL format") + } + webhookID, token := parts[2], parts[3] + return fmt.Sprintf("discord://%s@%s", token, webhookID), nil +} + +// slackURL converts https://hooks.slack.com/services/T.../B.../XXX → slack://T.../B.../XXX +func slackURL(config map[string]string) (string, error) { + u, err := url.Parse(config["webhook_url"]) + if err != nil { + return "", fmt.Errorf("invalid slack webhook URL: %w", err) + } + // Path: /services/TXXXXX/BXXXXX/XXXXXXXX + parts := strings.Split(strings.TrimPrefix(u.Path, "/"), "/") + if len(parts) < 4 || parts[0] != "services" { + return "", fmt.Errorf("unexpected slack webhook URL format") + } + return fmt.Sprintf("slack://hook:%s-%s-%s@webhook", parts[1], parts[2], parts[3]), nil +} + +// teamsWebhookRe extracts the 4 components from a Teams webhook URL. +// Format: https:////{group}@{tenant}/IncomingWebhook/{altID}/{groupOwner} +var teamsWebhookRe = regexp.MustCompile(`([0-9a-f-]{36})@([0-9a-f-]{36})/[^/]+/([0-9a-f]{32})/([0-9a-f-]{36})`) + +// teamsURL converts a Teams webhook URL → teams://Group@Tenant/AltID/GroupOwner +func teamsURL(config map[string]string) (string, error) { + webhookURL := config["webhook_url"] + if webhookURL == "" { + return "", fmt.Errorf("teams webhook_url is required") + } + groups := teamsWebhookRe.FindStringSubmatch(webhookURL) + if len(groups) != 5 { + return "", fmt.Errorf("unexpected teams webhook URL format") + } + group, tenant, altID, groupOwner := groups[1], groups[2], groups[3], groups[4] + return fmt.Sprintf("teams://%s@%s/%s/%s", group, tenant, altID, groupOwner), nil +} + +// googlechatURL converts a Google Chat webhook URL to shoutrrr format. +// Input: https://chat.googleapis.com/v1/spaces/SPACE/messages?key=KEY&token=TOKEN +// Output: googlechat://chat.googleapis.com/v1/spaces/SPACE/messages?key=KEY&token=TOKEN +func googlechatURL(config map[string]string) (string, error) { + webhookURL := config["webhook_url"] + if webhookURL == "" { + return "", fmt.Errorf("googlechat webhook_url is required") + } + u, err := url.Parse(webhookURL) + if err != nil { + return "", fmt.Errorf("invalid googlechat webhook URL: %w", err) + } + if u.Host != "chat.googleapis.com" { + return "", fmt.Errorf("unexpected googlechat webhook URL host: %s", u.Host) + } + // Rebuild as googlechat:// scheme with same host, path, and query. + u.Scheme = "googlechat" + return u.String(), nil +} + +// telegramURL builds telegram://token@telegram/?chats=chatID +func telegramURL(config map[string]string) (string, error) { + token := config["bot_token"] + chatID := config["chat_id"] + if token == "" || chatID == "" { + return "", fmt.Errorf("telegram bot_token and chat_id are required") + } + return fmt.Sprintf("telegram://%s@telegram/?chats=%s", token, chatID), nil +} + +// matrixURL builds matrix://user:token@homeserver/room +func matrixURL(config map[string]string) (string, error) { + homeserver := config["homeserver_url"] + token := config["access_token"] + roomID := config["room_id"] + if homeserver == "" || token == "" || roomID == "" { + return "", fmt.Errorf("matrix homeserver_url, access_token, and room_id are required") + } + // Strip protocol from homeserver URL. + host := strings.TrimPrefix(strings.TrimPrefix(homeserver, "https://"), "http://") + // Room ID often starts with ! — URL-encode it. + return fmt.Sprintf("matrix://:%s@%s/%s", url.PathEscape(token), host, url.PathEscape(roomID)), nil +} diff --git a/internal/channels/webhook.go b/internal/channels/webhook.go new file mode 100644 index 0000000..9c70872 --- /dev/null +++ b/internal/channels/webhook.go @@ -0,0 +1,62 @@ +package channels + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/http" + "strings" + "time" + + "github.com/google/uuid" +) + +// WebhookDelivery delivers events to webhook URLs with HMAC signing. +type WebhookDelivery struct { + client *http.Client +} + +// NewWebhookDelivery constructs a webhook delivery client. +func NewWebhookDelivery() *WebhookDelivery { + return &WebhookDelivery{ + client: &http.Client{ + Timeout: 10 * time.Second, + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + }, + }, + } +} + +// Deliver signs and POSTs the event payload to the configured URL. +func (d *WebhookDelivery) Deliver(ctx context.Context, targetURL, secret string, payload []byte) error { + timestamp := time.Now().UTC().Format(time.RFC3339) + deliveryID := uuid.New().String() + + // Compute HMAC-SHA256: sign over "timestamp.body". + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write([]byte(timestamp + "." + string(payload))) + signature := "sha256=" + hex.EncodeToString(mac.Sum(nil)) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, strings.NewReader(string(payload))) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-WRENN-SIGNATURE", signature) + req.Header.Set("X-Wrenn-Delivery", deliveryID) + req.Header.Set("X-Wrenn-Timestamp", timestamp) + + resp, err := d.client.Do(req) + if err != nil { + return fmt.Errorf("http post: %w", err) + } + resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("webhook returned %d", resp.StatusCode) + } + return nil +} diff --git a/internal/config/config.go b/internal/config/config.go index e4e6740..dbc1f1f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "encoding/hex" "os" "github.com/joho/godotenv" @@ -22,6 +23,10 @@ type Config struct { OAuthGitHubClientSecret string OAuthRedirectURL string CPPublicURL string + + // Channels — encryption for channel secrets (AES-256-GCM). + EncryptionKeyHex string // WRENN_ENCRYPTION_KEY raw hex string (for validation) + EncryptionKey [32]byte // parsed 32-byte key } // Load reads configuration from a .env file (if present) and environment variables. @@ -30,7 +35,7 @@ func Load() Config { // Best-effort load — missing .env file is fine. _ = godotenv.Load() - return Config{ + cfg := Config{ DatabaseURL: envOrDefault("DATABASE_URL", "postgres://wrenn:wrenn@localhost:5432/wrenn?sslmode=disable"), RedisURL: envOrDefault("REDIS_URL", "redis://localhost:6379/0"), ListenAddr: envOrDefault("WRENN_CP_LISTEN_ADDR", ":8080"), @@ -43,7 +48,18 @@ func Load() Config { OAuthGitHubClientSecret: os.Getenv("OAUTH_GITHUB_CLIENT_SECRET"), OAuthRedirectURL: envOrDefault("OAUTH_REDIRECT_URL", "https://app.wrenn.dev"), CPPublicURL: os.Getenv("CP_PUBLIC_URL"), + + EncryptionKeyHex: os.Getenv("WRENN_ENCRYPTION_KEY"), } + + if cfg.EncryptionKeyHex != "" { + b, err := hex.DecodeString(cfg.EncryptionKeyHex) + if err == nil && len(b) == 32 { + copy(cfg.EncryptionKey[:], b) + } + } + + return cfg } func envOrDefault(key, def string) string { diff --git a/internal/db/channels.sql.go b/internal/db/channels.sql.go new file mode 100644 index 0000000..18f9048 --- /dev/null +++ b/internal/db/channels.sql.go @@ -0,0 +1,225 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: channels.sql + +package db + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const deleteChannelByTeam = `-- name: DeleteChannelByTeam :exec +DELETE FROM channels WHERE id = $1 AND team_id = $2 +` + +type DeleteChannelByTeamParams struct { + ID pgtype.UUID `json:"id"` + TeamID pgtype.UUID `json:"team_id"` +} + +func (q *Queries) DeleteChannelByTeam(ctx context.Context, arg DeleteChannelByTeamParams) error { + _, err := q.db.Exec(ctx, deleteChannelByTeam, arg.ID, arg.TeamID) + return err +} + +const getChannelByTeam = `-- name: GetChannelByTeam :one +SELECT id, team_id, name, provider, config, event_types, created_at, updated_at FROM channels WHERE id = $1 AND team_id = $2 +` + +type GetChannelByTeamParams struct { + ID pgtype.UUID `json:"id"` + TeamID pgtype.UUID `json:"team_id"` +} + +func (q *Queries) GetChannelByTeam(ctx context.Context, arg GetChannelByTeamParams) (Channel, error) { + row := q.db.QueryRow(ctx, getChannelByTeam, arg.ID, arg.TeamID) + var i Channel + err := row.Scan( + &i.ID, + &i.TeamID, + &i.Name, + &i.Provider, + &i.Config, + &i.EventTypes, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const insertChannel = `-- name: InsertChannel :one +INSERT INTO channels (id, team_id, name, provider, config, event_types) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING id, team_id, name, provider, config, event_types, created_at, updated_at +` + +type InsertChannelParams struct { + ID pgtype.UUID `json:"id"` + TeamID pgtype.UUID `json:"team_id"` + Name string `json:"name"` + Provider string `json:"provider"` + Config []byte `json:"config"` + EventTypes []string `json:"event_types"` +} + +func (q *Queries) InsertChannel(ctx context.Context, arg InsertChannelParams) (Channel, error) { + row := q.db.QueryRow(ctx, insertChannel, + arg.ID, + arg.TeamID, + arg.Name, + arg.Provider, + arg.Config, + arg.EventTypes, + ) + var i Channel + err := row.Scan( + &i.ID, + &i.TeamID, + &i.Name, + &i.Provider, + &i.Config, + &i.EventTypes, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const listChannelsByTeam = `-- name: ListChannelsByTeam :many +SELECT id, team_id, name, provider, config, event_types, created_at, updated_at FROM channels WHERE team_id = $1 ORDER BY created_at DESC +` + +func (q *Queries) ListChannelsByTeam(ctx context.Context, teamID pgtype.UUID) ([]Channel, error) { + rows, err := q.db.Query(ctx, listChannelsByTeam, teamID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Channel + for rows.Next() { + var i Channel + if err := rows.Scan( + &i.ID, + &i.TeamID, + &i.Name, + &i.Provider, + &i.Config, + &i.EventTypes, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listChannelsForEvent = `-- name: ListChannelsForEvent :many +SELECT id, team_id, name, provider, config, event_types, created_at, updated_at FROM channels +WHERE team_id = $1 + AND $2::text = ANY(event_types) +ORDER BY created_at +` + +type ListChannelsForEventParams struct { + TeamID pgtype.UUID `json:"team_id"` + EventType string `json:"event_type"` +} + +func (q *Queries) ListChannelsForEvent(ctx context.Context, arg ListChannelsForEventParams) ([]Channel, error) { + rows, err := q.db.Query(ctx, listChannelsForEvent, arg.TeamID, arg.EventType) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Channel + for rows.Next() { + var i Channel + if err := rows.Scan( + &i.ID, + &i.TeamID, + &i.Name, + &i.Provider, + &i.Config, + &i.EventTypes, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateChannel = `-- name: UpdateChannel :one +UPDATE channels SET name = $3, event_types = $4, updated_at = NOW() +WHERE id = $1 AND team_id = $2 +RETURNING id, team_id, name, provider, config, event_types, created_at, updated_at +` + +type UpdateChannelParams struct { + ID pgtype.UUID `json:"id"` + TeamID pgtype.UUID `json:"team_id"` + Name string `json:"name"` + EventTypes []string `json:"event_types"` +} + +func (q *Queries) UpdateChannel(ctx context.Context, arg UpdateChannelParams) (Channel, error) { + row := q.db.QueryRow(ctx, updateChannel, + arg.ID, + arg.TeamID, + arg.Name, + arg.EventTypes, + ) + var i Channel + err := row.Scan( + &i.ID, + &i.TeamID, + &i.Name, + &i.Provider, + &i.Config, + &i.EventTypes, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const updateChannelConfig = `-- name: UpdateChannelConfig :one +UPDATE channels SET config = $3, updated_at = NOW() +WHERE id = $1 AND team_id = $2 +RETURNING id, team_id, name, provider, config, event_types, created_at, updated_at +` + +type UpdateChannelConfigParams struct { + ID pgtype.UUID `json:"id"` + TeamID pgtype.UUID `json:"team_id"` + Config []byte `json:"config"` +} + +func (q *Queries) UpdateChannelConfig(ctx context.Context, arg UpdateChannelConfigParams) (Channel, error) { + row := q.db.QueryRow(ctx, updateChannelConfig, arg.ID, arg.TeamID, arg.Config) + var i Channel + err := row.Scan( + &i.ID, + &i.TeamID, + &i.Name, + &i.Provider, + &i.Config, + &i.EventTypes, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} diff --git a/internal/db/models.go b/internal/db/models.go index 1e9a5d0..3b9cd9e 100644 --- a/internal/db/models.go +++ b/internal/db/models.go @@ -30,6 +30,17 @@ type AuditLog struct { CreatedAt pgtype.Timestamptz `json:"created_at"` } +type Channel struct { + ID pgtype.UUID `json:"id"` + TeamID pgtype.UUID `json:"team_id"` + Name string `json:"name"` + Provider string `json:"provider"` + Config []byte `json:"config"` + EventTypes []string `json:"event_types"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type Host struct { ID pgtype.UUID `json:"id"` Type string `json:"type"` diff --git a/internal/events/event.go b/internal/events/event.go new file mode 100644 index 0000000..6647c72 --- /dev/null +++ b/internal/events/event.go @@ -0,0 +1,73 @@ +package events + +import ( + "context" + "time" +) + +// EventPublisher pushes events onto the notification stream. +// Satisfied by *channels.Publisher. +type EventPublisher interface { + Publish(ctx context.Context, e Event) +} + +// ActorKind identifies what initiated an event. +type ActorKind string + +const ( + ActorUser ActorKind = "user" + ActorAPIKey ActorKind = "api_key" + ActorSystem ActorKind = "system" +) + +// Actor describes who triggered an event. +type Actor struct { + Type ActorKind `json:"type"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` +} + +// Resource identifies the object the event relates to. +type Resource struct { + ID string `json:"id"` + Type string `json:"type"` +} + +// Event is the canonical notification payload published to the Redis stream +// and delivered to channel subscribers. +type Event struct { + Event string `json:"event"` + Timestamp string `json:"timestamp"` + TeamID string `json:"team_id"` + Actor Actor `json:"actor"` + Resource Resource `json:"resource"` +} + +// Event type constants. +const ( + CapsuleCreated = "capsule.created" + CapsuleRunning = "capsule.running" + CapsulePaused = "capsule.paused" + CapsuleDestroyed = "capsule.destroyed" + SnapshotCreated = "template.snapshot.created" + SnapshotDeleted = "template.snapshot.deleted" + HostUp = "host.up" + HostDown = "host.down" +) + +// AllEventTypes is the complete set of valid event type strings. +var AllEventTypes = []string{ + CapsuleCreated, + CapsuleRunning, + CapsulePaused, + CapsuleDestroyed, + SnapshotCreated, + SnapshotDeleted, + HostUp, + HostDown, +} + +// Now returns the current time formatted for event timestamps. +func Now() string { + return time.Now().UTC().Format(time.RFC3339) +} diff --git a/internal/id/id.go b/internal/id/id.go index f4b6cdb..2ef5d88 100644 --- a/internal/id/id.go +++ b/internal/id/id.go @@ -35,6 +35,7 @@ func NewRefreshTokenID() pgtype.UUID { return newUUID() } func NewAuditLogID() pgtype.UUID { return newUUID() } func NewBuildID() pgtype.UUID { return newUUID() } func NewAdminPermissionID() pgtype.UUID { return newUUID() } +func NewChannelID() pgtype.UUID { return newUUID() } func NewTemplateID() pgtype.UUID { return newUUID() } @@ -75,6 +76,7 @@ const ( PrefixAuditLog = "log-" PrefixBuild = "bld-" PrefixAdminPermission = "perm-" + PrefixChannel = "ch-" ) // UUIDToBase36 encodes 16 UUID bytes as a 25-char base36 string (0-9a-z). @@ -123,6 +125,7 @@ func FormatHostTokenID(id pgtype.UUID) string { return formatUUID(PrefixHostT func FormatRefreshTokenID(id pgtype.UUID) string { return formatUUID(PrefixRefreshToken, id) } func FormatAuditLogID(id pgtype.UUID) string { return formatUUID(PrefixAuditLog, id) } func FormatBuildID(id pgtype.UUID) string { return formatUUID(PrefixBuild, id) } +func FormatChannelID(id pgtype.UUID) string { return formatUUID(PrefixChannel, id) } // --- Parsing (prefixed string from API/RPC input → pgtype.UUID) --- @@ -145,6 +148,7 @@ func ParseHostID(s string) (pgtype.UUID, error) { return parseUUID(PrefixHo func ParseHostTokenID(s string) (pgtype.UUID, error) { return parseUUID(PrefixHostToken, s) } func ParseAuditLogID(s string) (pgtype.UUID, error) { return parseUUID(PrefixAuditLog, s) } func ParseBuildID(s string) (pgtype.UUID, error) { return parseUUID(PrefixBuild, s) } +func ParseChannelID(s string) (pgtype.UUID, error) { return parseUUID(PrefixChannel, s) } // --- Well-known IDs --- From 0f789821868782fb4d40e3e093c24cab45b30953 Mon Sep 17 00:00:00 2001 From: pptx704 Date: Fri, 10 Apr 2026 01:17:03 +0600 Subject: [PATCH 2/2] feat: channel audit logging, name cleaning, message formatting, and dashboard UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add audit log entries for channel create, update, rotate_config, delete - Clean channel names on create/update (trim, lowercase, spaces → hyphens, SafeName validation) - Format chat notifications with full event details (resource, actor, team, timestamp) instead of one-liners - Fix Discord split-line embeds by setting splitLines=No on shoutrrr URL - Add channels dashboard page and sidebar navigation --- frontend/src/lib/api/channels.ts | 72 + frontend/src/lib/components/Sidebar.svelte | 4 +- .../lib/components/icons/IconBroadcast.svelte | 22 + frontend/src/lib/components/icons/index.ts | 1 + .../routes/dashboard/channels/+page.svelte | 1378 +++++++++++++++++ internal/api/handlers_channels.go | 13 +- internal/api/server.go | 2 +- internal/audit/logger.go | 70 + internal/channels/message.go | 54 +- internal/channels/service.go | 26 +- internal/channels/shoutrrr.go | 2 +- 11 files changed, 1624 insertions(+), 20 deletions(-) create mode 100644 frontend/src/lib/api/channels.ts create mode 100644 frontend/src/lib/components/icons/IconBroadcast.svelte create mode 100644 frontend/src/routes/dashboard/channels/+page.svelte diff --git a/frontend/src/lib/api/channels.ts b/frontend/src/lib/api/channels.ts new file mode 100644 index 0000000..130a9a8 --- /dev/null +++ b/frontend/src/lib/api/channels.ts @@ -0,0 +1,72 @@ +import { apiFetch, type ApiResult } from '$lib/api/client'; + +export type Channel = { + id: string; + team_id: string; + name: string; + provider: string; + events: string[]; + created_at: string; + updated_at: string; + secret?: string; // only present immediately after creation (webhook provider) +}; + +export const PROVIDERS = [ + { value: 'discord', label: 'Discord', fields: ['webhook_url'] }, + { value: 'slack', label: 'Slack', fields: ['webhook_url'] }, + { value: 'teams', label: 'Teams', fields: ['webhook_url'] }, + { value: 'googlechat', label: 'Google Chat', fields: ['webhook_url'] }, + { value: 'telegram', label: 'Telegram', fields: ['bot_token', 'chat_id'] }, + { value: 'matrix', label: 'Matrix', fields: ['homeserver_url', 'access_token', 'room_id'] }, + { value: 'webhook', label: 'Webhook', fields: ['url'] } +] as const; + +export const EVENT_TYPES = [ + { value: 'capsule.created', group: 'Capsule' }, + { value: 'capsule.running', group: 'Capsule' }, + { value: 'capsule.paused', group: 'Capsule' }, + { value: 'capsule.destroyed', group: 'Capsule' }, + { value: 'template.snapshot.created', group: 'Template' }, + { value: 'template.snapshot.deleted', group: 'Template' }, + { value: 'host.up', group: 'Host' }, + { value: 'host.down', group: 'Host' } +] as const; + +export async function listChannels(): Promise> { + return apiFetch('GET', '/api/v1/channels'); +} + +export async function createChannel( + name: string, + provider: string, + config: Record, + events: string[] +): Promise> { + return apiFetch('POST', '/api/v1/channels', { name, provider, config, events }); +} + +export async function updateChannel( + id: string, + name: string, + events: string[] +): Promise> { + return apiFetch('PATCH', `/api/v1/channels/${id}`, { name, events }); +} + +export async function deleteChannel(id: string): Promise> { + return apiFetch('DELETE', `/api/v1/channels/${id}`); +} + +export async function rotateConfig( + id: string, + config: Record +): Promise> { + return apiFetch('PUT', `/api/v1/channels/${id}/config`, { config }); +} + +export async function testChannel( + provider: string, + config: Record +): Promise> { + return apiFetch('POST', '/api/v1/channels/test', { provider, config }); +} diff --git a/frontend/src/lib/components/Sidebar.svelte b/frontend/src/lib/components/Sidebar.svelte index 06a8a56..4111dd8 100644 --- a/frontend/src/lib/components/Sidebar.svelte +++ b/frontend/src/lib/components/Sidebar.svelte @@ -22,7 +22,8 @@ IconAudit, IconServer, IconShield, - IconMetrics + IconMetrics, + IconBroadcast } from './icons'; let { collapsed = $bindable(false) }: { collapsed: boolean } = $props(); @@ -58,6 +59,7 @@ let managementItems = $derived([ { label: 'Keys', icon: IconKey, href: '/dashboard/keys' }, + { label: 'Channels', icon: IconBroadcast, href: '/dashboard/channels' }, { label: 'Team', icon: IconMembers, href: '/dashboard/team' }, { label: 'Audit Logs', icon: IconAudit, href: '/dashboard/audit' }, ...(currentTeamIsByoc diff --git a/frontend/src/lib/components/icons/IconBroadcast.svelte b/frontend/src/lib/components/icons/IconBroadcast.svelte new file mode 100644 index 0000000..4ed7697 --- /dev/null +++ b/frontend/src/lib/components/icons/IconBroadcast.svelte @@ -0,0 +1,22 @@ + + + diff --git a/frontend/src/lib/components/icons/index.ts b/frontend/src/lib/components/icons/index.ts index babf0a5..8e256f1 100644 --- a/frontend/src/lib/components/icons/index.ts +++ b/frontend/src/lib/components/icons/index.ts @@ -27,3 +27,4 @@ export { default as IconServer } from './IconServer.svelte'; export { default as IconGear } from './IconGear.svelte'; export { default as IconShield } from './IconShield.svelte'; export { default as IconMetrics } from './IconMetrics.svelte'; +export { default as IconBroadcast } from './IconBroadcast.svelte'; diff --git a/frontend/src/routes/dashboard/channels/+page.svelte b/frontend/src/routes/dashboard/channels/+page.svelte new file mode 100644 index 0000000..62a9e26 --- /dev/null +++ b/frontend/src/routes/dashboard/channels/+page.svelte @@ -0,0 +1,1378 @@ + + + + Wrenn — Channels + + + + { + if (e.key === 'Escape') { + if (openDropdownId) { openDropdownId = null; return; } + if (creating || editing || deleting || rotating || testing) return; + if (showCreate) { showCreate = false; return; } + if (revealChannel) { revealChannel = null; return; } + editTarget = null; + deleteTarget = null; + rotateTarget = null; + } + }} + onclick={(e) => { + if (openDropdownId && !(e.target as Element)?.closest('.split-btn-container')) { + openDropdownId = null; + } + }} +/> + +
+ + +
+
+ +
+
+
+

+ Channels +

+

+ Route capsule events to Discord, Slack, Telegram, and other destinations. +

+
+ + +
+ +
+
+ + +
+ {#if error} +
+ + + + {error}. Try refreshing the page. +
+ {/if} + + {#if loading} + +
+
+
+
+
+
Channel
+
Provider
+
Events
+
Updated
+
Actions
+
+ {#each Array(3) as _, i} +
+
+
+
+
+
+
+
+
+
+
+
+
+ {/each} +
+ {:else if channels.length === 0} + +
+
+
+
+ + + + + + + +
+
+

No channels yet

+

Channels deliver capsule events to your team's tools. Connect Discord, Slack, or a custom webhook.

+ +
+ {:else} + +
+ + {channels.length} {channels.length === 1 ? 'channel' : 'channels'} total + +
+ + +
+ +
+
Channel
+
Provider
+
Events
+
Updated
+
Actions
+
+ + + {#each channels as ch, i (ch.id)} +
+
+ + +
+ {ch.name} +
{ch.id}
+
+ + +
+ + {@render providerIcon(ch.provider)} + {providerLabel(ch.provider)} + +
+ + +
+ {#if ch.events.length <= 3} + {#each ch.events as ev} + {ev} + {/each} + {:else} + {#each ch.events.slice(0, 2) as ev} + {ev} + {/each} + +{ch.events.length - 2} + {/if} +
+ + +
+ + {timeAgo(ch.updated_at)} + +
+ + +
+
+ + + +
+ + +
+
+
+ {/each} +
+ {/if} +
+
+ + +
+
+ + All systems operational +
+
+
+
+ + +{#if openDropdownId} + {@const dropdownChannel = channels.find((c) => c.id === openDropdownId)} + {#if dropdownChannel} +
+ +
+ +
+ {/if} +{/if} + + +{#if showCreate} +
+ +
{ if (!creating && !testing) showCreate = false; }} + onkeydown={(e) => { if (e.key === 'Escape' && !creating && !testing) showCreate = false; }} + >
+ +
+ + +
+
+ + {#if createStep === 2} + + + + {:else} + 1 + {/if} + + Connection +
+
+
+ + 2 + + Events +
+
+ + {#if createError} +
+ {createError} +
+ {/if} + + {#if createStep === 1} + +

New Channel

+

Name the channel, pick a provider, and enter its connection details.

+ + +
+ + +
+ + +
+ +
+ + + {#if providerDropdownOpen} +
+ {#each PROVIDERS as p} + + {/each} +
+ {/if} +
+
+ + +
+ {#each selectedProvider.fields as field} +
+ + { createConfig = { ...createConfig, [field]: e.currentTarget.value }; }} + disabled={creating} + class="w-full rounded-[var(--radius-input)] border border-[var(--color-border)] bg-[var(--color-bg-4)] px-3 py-2 font-mono text-meta text-[var(--color-text-bright)] outline-none placeholder:text-[var(--color-text-muted)] placeholder:font-sans transition-colors duration-150 focus:border-[var(--color-accent)] disabled:opacity-60" + /> +
+ {/each} + + {#if createProvider === 'webhook'} +
+ + { createConfig = { ...createConfig, secret: e.currentTarget.value }; }} + disabled={creating} + class="w-full rounded-[var(--radius-input)] border border-[var(--color-border)] bg-[var(--color-bg-4)] px-3 py-2 font-mono text-meta text-[var(--color-text-bright)] outline-none placeholder:text-[var(--color-text-muted)] placeholder:font-sans transition-colors duration-150 focus:border-[var(--color-accent)] disabled:opacity-60" + /> +
+ {/if} +
+ + +
+ + +
+ + +
+
+ + {:else} + +

Choose Events

+

+ Pick the events that trigger a notification to + {createName} + via {providerLabel(createProvider)}. +

+ + +
+ +
+ + + {#if eventsDropdownOpen} +
+ {#each Object.entries(groupedEvents) as [group, events], gi} +
{group}
+ + {#each events as et} + {@const checked = createEvents.includes(et.value)} + + {/each} + + {#if gi < Object.entries(groupedEvents).length - 1} +
+ {/if} + {/each} +
+ {/if} +
+
+ + + {#if createEvents.length > 0} +
+ {#each createEvents as ev} + + {ev} + + + {/each} +
+ {/if} + + +
+ + + +
+ {/if} +
+
+{/if} + + +{#if revealChannel} +
+ +
{ if (e.key === 'Escape') dismissReveal(); }} + >
+ +
+ +
+ + + + + + Channel created +
+ +

{revealChannel.name}

+

+ Copy the webhook signing secret now — it won't be shown again. +

+ + +
+
+ + {revealChannel.secret ?? ''} + + {#key copyCount} + + {/key} +
+
+ + +
+ + + + +

+ Use this secret to verify webhook signatures (HMAC-SHA256). It cannot be retrieved after you close this dialog. +

+
+ +
+ +
+
+
+{/if} + + +{#if editTarget} +
+ +
{ if (!editing) editTarget = null; }} + onkeydown={(e) => { if (e.key === 'Escape' && !editing) editTarget = null; }} + >
+ +
+

Edit Channel

+

+ Update the name or subscribed events. To change the provider, delete this channel and create a new one. +

+ +
+ + {@render providerIcon(editTarget.provider)} + {providerLabel(editTarget.provider)} + +
+ + {#if editError} +
+ {editError} +
+ {/if} + + +
+ + +
+ + +
+ +
+ + + {#if editEventsDropdownOpen} +
+ {#each Object.entries(groupedEvents) as [group, events], gi} +
{group}
+ + {#each events as et} + {@const checked = editEvents.includes(et.value)} + + {/each} + + {#if gi < Object.entries(groupedEvents).length - 1} +
+ {/if} + {/each} +
+ {/if} +
+
+ +
+ + +
+
+
+{/if} + + +{#if deleteTarget} +
+ +
{ if (!deleting) deleteTarget = null; }} + onkeydown={(e) => { if (e.key === 'Escape' && !deleting) deleteTarget = null; }} + >
+ +
+

Delete Channel

+

+ Permanently delete {deleteTarget.name}? + Events will stop being delivered to this destination immediately. +

+ + {@render providerIcon(deleteTarget.provider)} + {providerLabel(deleteTarget.provider)} + + + {#if deleteError} +
+ {deleteError} +
+ {/if} + +
+ + +
+
+
+{/if} + + +{#if rotateTarget} +
+ +
{ if (!rotating) rotateTarget = null; }} + onkeydown={(e) => { if (e.key === 'Escape' && !rotating) rotateTarget = null; }} + >
+ +
+

+ {rotateTarget.provider === 'webhook' ? 'Rotate Signing Secret' : 'Rotate Credentials'} +

+

+ {#if rotateTarget.provider === 'webhook'} + Replace the HMAC signing secret for {rotateTarget.name}. The webhook URL stays the same. + {:else} + Replace the connection credentials for {rotateTarget.name}. This takes effect immediately. + {/if} +

+ +
+ + {@render providerIcon(rotateTarget.provider)} + {providerLabel(rotateTarget.provider)} + +
+ + {#if rotateError} +
+ {rotateError} +
+ {/if} + +
+ {#each rotateFieldsFor(rotateTarget) as field} +
+ + { rotateConfig_ = { ...rotateConfig_, [field]: e.currentTarget.value }; }} + disabled={rotating} + class="w-full rounded-[var(--radius-input)] border border-[var(--color-border)] bg-[var(--color-bg-4)] px-3 py-2 font-mono text-meta text-[var(--color-text-bright)] outline-none placeholder:text-[var(--color-text-muted)] placeholder:font-sans transition-colors duration-150 focus:border-[var(--color-accent)] disabled:opacity-60" + /> +
+ {/each} +
+ + +
+ + + + +

+ {#if rotateTarget.provider === 'webhook'} + Your endpoint must verify signatures with the new secret. Old signatures will fail immediately. + {:else} + Old credentials stop working immediately. Make sure the new values are configured in your destination first. + {/if} +

+
+ +
+ + +
+
+
+{/if} + +{#snippet providerIcon(provider: string)} + {#if provider === 'discord'} + + {:else if provider === 'slack'} + + {:else if provider === 'telegram'} + + {:else if provider === 'webhook'} + + {:else} + + {/if} +{/snippet} + + diff --git a/internal/api/handlers_channels.go b/internal/api/handlers_channels.go index 3a13998..0fd1d8e 100644 --- a/internal/api/handlers_channels.go +++ b/internal/api/handlers_channels.go @@ -8,6 +8,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5" + "git.omukk.dev/wrenn/sandbox/internal/audit" "git.omukk.dev/wrenn/sandbox/internal/auth" "git.omukk.dev/wrenn/sandbox/internal/channels" "git.omukk.dev/wrenn/sandbox/internal/db" @@ -15,11 +16,12 @@ import ( ) type channelHandler struct { - svc *channels.Service + svc *channels.Service + audit *audit.AuditLogger } -func newChannelHandler(svc *channels.Service) *channelHandler { - return &channelHandler{svc: svc} +func newChannelHandler(svc *channels.Service, al *audit.AuditLogger) *channelHandler { + return &channelHandler{svc: svc, audit: al} } type createChannelRequest struct { @@ -94,6 +96,8 @@ func (h *channelHandler) Create(w http.ResponseWriter, r *http.Request) { return } + h.audit.LogChannelCreate(r.Context(), ac, result.Channel.ID, result.Channel.Name, result.Channel.Provider) + resp := channelToResponse(result.Channel) if result.PlaintextSecret != "" { resp.Secret = &result.PlaintextSecret @@ -168,6 +172,7 @@ func (h *channelHandler) Update(w http.ResponseWriter, r *http.Request) { return } + h.audit.LogChannelUpdate(r.Context(), ac, channelID) writeJSON(w, http.StatusOK, channelToResponse(ch)) } @@ -212,6 +217,7 @@ func (h *channelHandler) RotateConfig(w http.ResponseWriter, r *http.Request) { return } + h.audit.LogChannelRotateConfig(r.Context(), ac, channelID) writeJSON(w, http.StatusOK, channelToResponse(ch)) } @@ -231,5 +237,6 @@ func (h *channelHandler) Delete(w http.ResponseWriter, r *http.Request) { return } + h.audit.LogChannelDelete(r.Context(), ac, channelID) w.WriteHeader(http.StatusNoContent) } diff --git a/internal/api/server.go b/internal/api/server.go index d306d58..fb80f5a 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -71,7 +71,7 @@ func New( statsH := newStatsHandler(statsSvc) metricsH := newSandboxMetricsHandler(queries, pool) buildH := newBuildHandler(buildSvc, queries, pool) - channelH := newChannelHandler(channelSvc) + channelH := newChannelHandler(channelSvc, al) // OpenAPI spec and docs. r.Get("/openapi.yaml", serveOpenAPI) diff --git a/internal/audit/logger.go b/internal/audit/logger.go index 281aca0..55110ac 100644 --- a/internal/audit/logger.go +++ b/internal/audit/logger.go @@ -281,6 +281,76 @@ func (l *AuditLogger) LogTeamRename(ctx context.Context, ac auth.AuthContext, te }) } +// --- Channel events (scope: team) --- + +func (l *AuditLogger) LogChannelCreate(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID, name, provider string) { + actorType, actorID, actorName := actorFields(ac) + l.write(ctx, db.InsertAuditLogParams{ + ID: id.NewAuditLogID(), + TeamID: ac.TeamID, + ActorType: actorType, + ActorID: optText(actorID), + ActorName: actorName, + ResourceType: "channel", + ResourceID: optText(id.FormatChannelID(channelID)), + Action: "create", + Scope: "team", + Status: "success", + Metadata: marshalMeta(map[string]any{"name": name, "provider": provider}), + }) +} + +func (l *AuditLogger) LogChannelUpdate(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID) { + actorType, actorID, actorName := actorFields(ac) + l.write(ctx, db.InsertAuditLogParams{ + ID: id.NewAuditLogID(), + TeamID: ac.TeamID, + ActorType: actorType, + ActorID: optText(actorID), + ActorName: actorName, + ResourceType: "channel", + ResourceID: optText(id.FormatChannelID(channelID)), + Action: "update", + Scope: "team", + Status: "info", + Metadata: []byte("{}"), + }) +} + +func (l *AuditLogger) LogChannelRotateConfig(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID) { + actorType, actorID, actorName := actorFields(ac) + l.write(ctx, db.InsertAuditLogParams{ + ID: id.NewAuditLogID(), + TeamID: ac.TeamID, + ActorType: actorType, + ActorID: optText(actorID), + ActorName: actorName, + ResourceType: "channel", + ResourceID: optText(id.FormatChannelID(channelID)), + Action: "rotate_config", + Scope: "team", + Status: "info", + Metadata: []byte("{}"), + }) +} + +func (l *AuditLogger) LogChannelDelete(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID) { + actorType, actorID, actorName := actorFields(ac) + l.write(ctx, db.InsertAuditLogParams{ + ID: id.NewAuditLogID(), + TeamID: ac.TeamID, + ActorType: actorType, + ActorID: optText(actorID), + ActorName: actorName, + ResourceType: "channel", + ResourceID: optText(id.FormatChannelID(channelID)), + Action: "delete", + Scope: "team", + Status: "warning", + Metadata: []byte("{}"), + }) +} + // --- API key events (scope: team) --- func (l *AuditLogger) LogAPIKeyCreate(ctx context.Context, ac auth.AuthContext, keyID pgtype.UUID, keyName string) { diff --git a/internal/channels/message.go b/internal/channels/message.go index 9435260..f786281 100644 --- a/internal/channels/message.go +++ b/internal/channels/message.go @@ -2,30 +2,64 @@ package channels import ( "fmt" + "strings" "git.omukk.dev/wrenn/sandbox/internal/events" ) -// FormatMessage produces a compact notification string for chat providers. +// FormatMessage produces a human-readable notification string containing +// the event summary, resource details, actor, and timestamp. func FormatMessage(e events.Event) string { + var b strings.Builder + + b.WriteString(formatSummary(e)) + fmt.Fprintf(&b, "\n\nEvent: %s", e.Event) + fmt.Fprintf(&b, "\nResource: %s %s", e.Resource.Type, e.Resource.ID) + fmt.Fprintf(&b, "\nActor: %s", formatActor(e.Actor)) + fmt.Fprintf(&b, "\nTeam: %s", e.TeamID) + fmt.Fprintf(&b, "\nTime: %s", e.Timestamp) + + return b.String() +} + +func formatSummary(e events.Event) string { switch e.Event { case events.CapsuleCreated: - return fmt.Sprintf("[%s] Capsule %s created", e.Event, e.Resource.ID) + return fmt.Sprintf("Capsule %s created", e.Resource.ID) case events.CapsuleRunning: - return fmt.Sprintf("[%s] Capsule %s is running", e.Event, e.Resource.ID) + return fmt.Sprintf("Capsule %s is running", e.Resource.ID) case events.CapsulePaused: - return fmt.Sprintf("[%s] Capsule %s paused", e.Event, e.Resource.ID) + return fmt.Sprintf("Capsule %s paused", e.Resource.ID) case events.CapsuleDestroyed: - return fmt.Sprintf("[%s] Capsule %s destroyed", e.Event, e.Resource.ID) + return fmt.Sprintf("Capsule %s destroyed", e.Resource.ID) case events.SnapshotCreated: - return fmt.Sprintf("[%s] Template snapshot %s created", e.Event, e.Resource.ID) + return fmt.Sprintf("Template snapshot %s created", e.Resource.ID) case events.SnapshotDeleted: - return fmt.Sprintf("[%s] Template snapshot %s deleted", e.Event, e.Resource.ID) + return fmt.Sprintf("Template snapshot %s deleted", e.Resource.ID) case events.HostUp: - return fmt.Sprintf("[%s] Host %s is up", e.Event, e.Resource.ID) + return fmt.Sprintf("Host %s is up", e.Resource.ID) case events.HostDown: - return fmt.Sprintf("[%s] Host %s is down", e.Event, e.Resource.ID) + return fmt.Sprintf("Host %s is down", e.Resource.ID) default: - return fmt.Sprintf("[%s] %s %s", e.Event, e.Resource.Type, e.Resource.ID) + return fmt.Sprintf("%s %s", e.Resource.Type, e.Resource.ID) + } +} + +func formatActor(a events.Actor) string { + switch a.Type { + case events.ActorSystem: + return "system" + case events.ActorUser: + if a.Name != "" { + return fmt.Sprintf("%s (%s)", a.Name, a.ID) + } + return a.ID + case events.ActorAPIKey: + if a.Name != "" { + return fmt.Sprintf("api_key %s (%s)", a.Name, a.ID) + } + return fmt.Sprintf("api_key %s", a.ID) + default: + return string(a.Type) } } diff --git a/internal/channels/service.go b/internal/channels/service.go index ba7b5ed..7f2652c 100644 --- a/internal/channels/service.go +++ b/internal/channels/service.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -15,6 +16,7 @@ import ( "git.omukk.dev/wrenn/sandbox/internal/db" "git.omukk.dev/wrenn/sandbox/internal/events" "git.omukk.dev/wrenn/sandbox/internal/id" + "git.omukk.dev/wrenn/sandbox/internal/validate" ) // Valid providers. @@ -72,9 +74,11 @@ type CreateResult struct { // Create creates a new notification channel. func (s *Service) Create(ctx context.Context, p CreateParams) (CreateResult, error) { - if p.Name == "" { - return CreateResult{}, fmt.Errorf("invalid: channel name is required") + clean, err := cleanName(p.Name) + if err != nil { + return CreateResult{}, err } + p.Name = clean if !validProviders[p.Provider] { return CreateResult{}, fmt.Errorf("invalid: unsupported provider %q", p.Provider) @@ -154,9 +158,11 @@ func (s *Service) Get(ctx context.Context, channelID, teamID pgtype.UUID) (db.Ch // Update updates a channel's name and event types. func (s *Service) Update(ctx context.Context, channelID, teamID pgtype.UUID, name string, eventTypes []string) (db.Channel, error) { - if name == "" { - return db.Channel{}, fmt.Errorf("invalid: channel name is required") + clean, err := cleanName(name) + if err != nil { + return db.Channel{}, err } + name = clean if len(eventTypes) == 0 { return db.Channel{}, fmt.Errorf("invalid: at least one event type is required") @@ -271,6 +277,18 @@ func (s *Service) Delete(ctx context.Context, channelID, teamID pgtype.UUID) err return s.DB.DeleteChannelByTeam(ctx, db.DeleteChannelByTeamParams{ID: channelID, TeamID: teamID}) } +// cleanName normalises a channel name: trim whitespace, lowercase, replace +// spaces with hyphens, then validate against SafeName rules. +func cleanName(name string) (string, error) { + name = strings.TrimSpace(name) + name = strings.ToLower(name) + name = strings.ReplaceAll(name, " ", "-") + if err := validate.SafeName(name); err != nil { + return "", fmt.Errorf("invalid: %w", err) + } + return name, nil +} + func generateSecret() string { b := make([]byte, 32) if _, err := rand.Read(b); err != nil { diff --git a/internal/channels/shoutrrr.go b/internal/channels/shoutrrr.go index f173e07..d7f4557 100644 --- a/internal/channels/shoutrrr.go +++ b/internal/channels/shoutrrr.go @@ -39,7 +39,7 @@ func discordURL(config map[string]string) (string, error) { return "", fmt.Errorf("unexpected discord webhook URL format") } webhookID, token := parts[2], parts[3] - return fmt.Sprintf("discord://%s@%s", token, webhookID), nil + return fmt.Sprintf("discord://%s@%s?splitLines=No", token, webhookID), nil } // slackURL converts https://hooks.slack.com/services/T.../B.../XXX → slack://T.../B.../XXX