forked from wrenn/wrenn
Merge pull request 'Added channels for external notifications' (#13) from feat/channels into dev
Reviewed-on: wrenn/sandbox#13
This commit is contained in:
@ -35,6 +35,10 @@ JWT_SECRET=
|
||||
WRENN_CA_CERT=
|
||||
WRENN_CA_KEY=
|
||||
|
||||
# Channels (notification destinations)
|
||||
# AES-256-GCM key for encrypting channel secrets. Generate with: openssl rand -hex 32
|
||||
WRENN_ENCRYPTION_KEY=
|
||||
|
||||
# OAuth
|
||||
OAUTH_GITHUB_CLIENT_ID=
|
||||
OAUTH_GITHUB_CLIENT_SECRET=
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"git.omukk.dev/wrenn/sandbox/internal/audit"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/auth"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/auth/oauth"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/channels"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/config"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/lifecycle"
|
||||
@ -124,15 +125,30 @@ func main() {
|
||||
slog.Info("registered OAuth provider", "provider", "github")
|
||||
}
|
||||
|
||||
// Channels: publisher, service, dispatcher.
|
||||
if len(cfg.EncryptionKeyHex) != 64 {
|
||||
slog.Error("WRENN_ENCRYPTION_KEY must be a hex-encoded 32-byte key (64 hex chars)")
|
||||
os.Exit(1)
|
||||
}
|
||||
channelPub := channels.NewPublisher(rdb)
|
||||
channelSvc := &channels.Service{DB: queries, EncKey: cfg.EncryptionKey}
|
||||
channelDispatcher := channels.NewDispatcher(rdb, queries, cfg.EncryptionKey)
|
||||
|
||||
// Shared audit logger with event publishing.
|
||||
al := audit.NewWithPublisher(queries, channelPub)
|
||||
|
||||
// API server.
|
||||
srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca)
|
||||
srv := api.New(queries, hostPool, hostScheduler, pool, rdb, []byte(cfg.JWTSecret), oauthRegistry, cfg.OAuthRedirectURL, ca, al, channelSvc)
|
||||
|
||||
// Start template build workers (2 concurrent).
|
||||
stopBuildWorkers := srv.BuildSvc.StartWorkers(ctx, 2)
|
||||
defer stopBuildWorkers()
|
||||
|
||||
// Start channel event dispatcher.
|
||||
channelDispatcher.Start(ctx)
|
||||
|
||||
// Start host monitor (passive + active reconciliation every 30s).
|
||||
monitor := api.NewHostMonitor(queries, hostPool, audit.New(queries), 30*time.Second)
|
||||
monitor := api.NewHostMonitor(queries, hostPool, al, 30*time.Second)
|
||||
monitor.Start(ctx)
|
||||
|
||||
// Start metrics sampler (records per-team sandbox stats every 10s).
|
||||
|
||||
19
db/migrations/20260409103357_add_channels.sql
Normal file
19
db/migrations/20260409103357_add_channels.sql
Normal file
@ -0,0 +1,19 @@
|
||||
-- +goose Up
|
||||
|
||||
CREATE TABLE channels (
|
||||
id UUID PRIMARY KEY,
|
||||
team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
|
||||
name TEXT NOT NULL,
|
||||
provider TEXT NOT NULL,
|
||||
config JSONB NOT NULL DEFAULT '{}',
|
||||
event_types TEXT[] NOT NULL DEFAULT '{}',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
UNIQUE (team_id, name)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_channels_team ON channels(team_id);
|
||||
|
||||
-- +goose Down
|
||||
|
||||
DROP TABLE IF EXISTS channels;
|
||||
29
db/queries/channels.sql
Normal file
29
db/queries/channels.sql
Normal file
@ -0,0 +1,29 @@
|
||||
-- name: InsertChannel :one
|
||||
INSERT INTO channels (id, team_id, name, provider, config, event_types)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING *;
|
||||
|
||||
-- name: ListChannelsByTeam :many
|
||||
SELECT * FROM channels WHERE team_id = $1 ORDER BY created_at DESC;
|
||||
|
||||
-- name: GetChannelByTeam :one
|
||||
SELECT * FROM channels WHERE id = $1 AND team_id = $2;
|
||||
|
||||
-- name: UpdateChannel :one
|
||||
UPDATE channels SET name = $3, event_types = $4, updated_at = NOW()
|
||||
WHERE id = $1 AND team_id = $2
|
||||
RETURNING *;
|
||||
|
||||
-- name: UpdateChannelConfig :one
|
||||
UPDATE channels SET config = $3, updated_at = NOW()
|
||||
WHERE id = $1 AND team_id = $2
|
||||
RETURNING *;
|
||||
|
||||
-- name: DeleteChannelByTeam :exec
|
||||
DELETE FROM channels WHERE id = $1 AND team_id = $2;
|
||||
|
||||
-- name: ListChannelsForEvent :many
|
||||
SELECT * FROM channels
|
||||
WHERE team_id = $1
|
||||
AND sqlc.arg(event_type)::text = ANY(event_types)
|
||||
ORDER BY created_at;
|
||||
72
frontend/src/lib/api/channels.ts
Normal file
72
frontend/src/lib/api/channels.ts
Normal file
@ -0,0 +1,72 @@
|
||||
import { apiFetch, type ApiResult } from '$lib/api/client';
|
||||
|
||||
export type Channel = {
|
||||
id: string;
|
||||
team_id: string;
|
||||
name: string;
|
||||
provider: string;
|
||||
events: string[];
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
secret?: string; // only present immediately after creation (webhook provider)
|
||||
};
|
||||
|
||||
export const PROVIDERS = [
|
||||
{ value: 'discord', label: 'Discord', fields: ['webhook_url'] },
|
||||
{ value: 'slack', label: 'Slack', fields: ['webhook_url'] },
|
||||
{ value: 'teams', label: 'Teams', fields: ['webhook_url'] },
|
||||
{ value: 'googlechat', label: 'Google Chat', fields: ['webhook_url'] },
|
||||
{ value: 'telegram', label: 'Telegram', fields: ['bot_token', 'chat_id'] },
|
||||
{ value: 'matrix', label: 'Matrix', fields: ['homeserver_url', 'access_token', 'room_id'] },
|
||||
{ value: 'webhook', label: 'Webhook', fields: ['url'] }
|
||||
] as const;
|
||||
|
||||
export const EVENT_TYPES = [
|
||||
{ value: 'capsule.created', group: 'Capsule' },
|
||||
{ value: 'capsule.running', group: 'Capsule' },
|
||||
{ value: 'capsule.paused', group: 'Capsule' },
|
||||
{ value: 'capsule.destroyed', group: 'Capsule' },
|
||||
{ value: 'template.snapshot.created', group: 'Template' },
|
||||
{ value: 'template.snapshot.deleted', group: 'Template' },
|
||||
{ value: 'host.up', group: 'Host' },
|
||||
{ value: 'host.down', group: 'Host' }
|
||||
] as const;
|
||||
|
||||
export async function listChannels(): Promise<ApiResult<Channel[]>> {
|
||||
return apiFetch('GET', '/api/v1/channels');
|
||||
}
|
||||
|
||||
export async function createChannel(
|
||||
name: string,
|
||||
provider: string,
|
||||
config: Record<string, string>,
|
||||
events: string[]
|
||||
): Promise<ApiResult<Channel>> {
|
||||
return apiFetch('POST', '/api/v1/channels', { name, provider, config, events });
|
||||
}
|
||||
|
||||
export async function updateChannel(
|
||||
id: string,
|
||||
name: string,
|
||||
events: string[]
|
||||
): Promise<ApiResult<Channel>> {
|
||||
return apiFetch('PATCH', `/api/v1/channels/${id}`, { name, events });
|
||||
}
|
||||
|
||||
export async function deleteChannel(id: string): Promise<ApiResult<void>> {
|
||||
return apiFetch('DELETE', `/api/v1/channels/${id}`);
|
||||
}
|
||||
|
||||
export async function rotateConfig(
|
||||
id: string,
|
||||
config: Record<string, string>
|
||||
): Promise<ApiResult<Channel>> {
|
||||
return apiFetch('PUT', `/api/v1/channels/${id}/config`, { config });
|
||||
}
|
||||
|
||||
export async function testChannel(
|
||||
provider: string,
|
||||
config: Record<string, string>
|
||||
): Promise<ApiResult<{ status: string }>> {
|
||||
return apiFetch('POST', '/api/v1/channels/test', { provider, config });
|
||||
}
|
||||
@ -22,7 +22,8 @@
|
||||
IconAudit,
|
||||
IconServer,
|
||||
IconShield,
|
||||
IconMetrics
|
||||
IconMetrics,
|
||||
IconBroadcast
|
||||
} from './icons';
|
||||
|
||||
let { collapsed = $bindable(false) }: { collapsed: boolean } = $props();
|
||||
@ -58,6 +59,7 @@
|
||||
|
||||
let managementItems = $derived<NavItem[]>([
|
||||
{ label: 'Keys', icon: IconKey, href: '/dashboard/keys' },
|
||||
{ label: 'Channels', icon: IconBroadcast, href: '/dashboard/channels' },
|
||||
{ label: 'Team', icon: IconMembers, href: '/dashboard/team' },
|
||||
{ label: 'Audit Logs', icon: IconAudit, href: '/dashboard/audit' },
|
||||
...(currentTeamIsByoc
|
||||
|
||||
22
frontend/src/lib/components/icons/IconBroadcast.svelte
Normal file
22
frontend/src/lib/components/icons/IconBroadcast.svelte
Normal file
@ -0,0 +1,22 @@
|
||||
<script lang="ts">
|
||||
let { size = 18, class: className = '' }: { size?: number; class?: string } = $props();
|
||||
</script>
|
||||
|
||||
<svg
|
||||
width={size}
|
||||
height={size}
|
||||
viewBox="0 0 24 24"
|
||||
fill="none"
|
||||
stroke="currentColor"
|
||||
stroke-width="1.75"
|
||||
stroke-linecap="round"
|
||||
stroke-linejoin="round"
|
||||
class={className}
|
||||
aria-hidden="true"
|
||||
>
|
||||
<circle cx="12" cy="12" r="2" />
|
||||
<path d="M16.24 7.76a6 6 0 0 1 0 8.49" />
|
||||
<path d="M7.76 16.24a6 6 0 0 1 0-8.49" />
|
||||
<path d="M19.07 4.93a10 10 0 0 1 0 14.14" />
|
||||
<path d="M4.93 19.07a10 10 0 0 1 0-14.14" />
|
||||
</svg>
|
||||
@ -27,3 +27,4 @@ export { default as IconServer } from './IconServer.svelte';
|
||||
export { default as IconGear } from './IconGear.svelte';
|
||||
export { default as IconShield } from './IconShield.svelte';
|
||||
export { default as IconMetrics } from './IconMetrics.svelte';
|
||||
export { default as IconBroadcast } from './IconBroadcast.svelte';
|
||||
|
||||
1378
frontend/src/routes/dashboard/channels/+page.svelte
Normal file
1378
frontend/src/routes/dashboard/channels/+page.svelte
Normal file
File diff suppressed because it is too large
Load Diff
4
go.mod
4
go.mod
@ -4,6 +4,7 @@ go 1.25.8
|
||||
|
||||
require (
|
||||
connectrpc.com/connect v1.19.1
|
||||
github.com/containrrr/shoutrrr v0.8.0
|
||||
github.com/go-chi/chi/v5 v5.2.5
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1
|
||||
github.com/google/uuid v1.6.0
|
||||
@ -22,9 +23,12 @@ require (
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/fatih/color v1.15.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
|
||||
28
go.sum
28
go.sum
@ -6,17 +6,29 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/containrrr/shoutrrr v0.8.0 h1:mfG2ATzIS7NR2Ec6XL+xyoHzN97H8WPjir8aYzJUSec=
|
||||
github.com/containrrr/shoutrrr v0.8.0/go.mod h1:ioyQAyu1LJY6sILuNyKaQaw+9Ttik5QePU8atnAdO2o=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
|
||||
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
|
||||
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
|
||||
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
|
||||
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
|
||||
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
|
||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
@ -29,10 +41,21 @@ github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
|
||||
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc=
|
||||
github.com/jarcoal/httpmock v1.3.0/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
|
||||
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
github.com/onsi/ginkgo/v2 v2.9.2 h1:BA2GMJOtfGAfagzYtrAlufIP0lq6QERkFmHLMLPwFSU=
|
||||
github.com/onsi/ginkgo/v2 v2.9.2/go.mod h1:WHcJJG2dIlcCqVfBAwUCrJxSPFb6v4azBwgxeMeDuts=
|
||||
github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
|
||||
github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
|
||||
@ -53,16 +76,21 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
|
||||
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
|
||||
golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
|
||||
golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
|
||||
golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs=
|
||||
golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
|
||||
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
|
||||
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
|
||||
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
242
internal/api/handlers_channels.go
Normal file
242
internal/api/handlers_channels.go
Normal file
@ -0,0 +1,242 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/audit"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/auth"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/channels"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/id"
|
||||
)
|
||||
|
||||
type channelHandler struct {
|
||||
svc *channels.Service
|
||||
audit *audit.AuditLogger
|
||||
}
|
||||
|
||||
func newChannelHandler(svc *channels.Service, al *audit.AuditLogger) *channelHandler {
|
||||
return &channelHandler{svc: svc, audit: al}
|
||||
}
|
||||
|
||||
type createChannelRequest struct {
|
||||
Name string `json:"name"`
|
||||
Provider string `json:"provider"`
|
||||
Config map[string]string `json:"config"`
|
||||
Events []string `json:"events"`
|
||||
}
|
||||
|
||||
type updateChannelRequest struct {
|
||||
Name string `json:"name"`
|
||||
Events []string `json:"events"`
|
||||
}
|
||||
|
||||
type rotateConfigRequest struct {
|
||||
Config map[string]string `json:"config"`
|
||||
}
|
||||
|
||||
type testChannelRequest struct {
|
||||
Provider string `json:"provider"`
|
||||
Config map[string]string `json:"config"`
|
||||
}
|
||||
|
||||
type channelResponse struct {
|
||||
ID string `json:"id"`
|
||||
TeamID string `json:"team_id"`
|
||||
Name string `json:"name"`
|
||||
Provider string `json:"provider"`
|
||||
Events []string `json:"events"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
Secret *string `json:"secret,omitempty"`
|
||||
}
|
||||
|
||||
func channelToResponse(ch db.Channel) channelResponse {
|
||||
resp := channelResponse{
|
||||
ID: id.FormatChannelID(ch.ID),
|
||||
TeamID: id.FormatTeamID(ch.TeamID),
|
||||
Name: ch.Name,
|
||||
Provider: ch.Provider,
|
||||
Events: ch.EventTypes,
|
||||
}
|
||||
if ch.CreatedAt.Valid {
|
||||
resp.CreatedAt = ch.CreatedAt.Time.Format(time.RFC3339)
|
||||
}
|
||||
if ch.UpdatedAt.Valid {
|
||||
resp.UpdatedAt = ch.UpdatedAt.Time.Format(time.RFC3339)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
// Create handles POST /v1/channels.
|
||||
func (h *channelHandler) Create(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
|
||||
var req createChannelRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.svc.Create(r.Context(), channels.CreateParams{
|
||||
TeamID: ac.TeamID,
|
||||
Name: req.Name,
|
||||
Provider: req.Provider,
|
||||
Config: req.Config,
|
||||
Events: req.Events,
|
||||
})
|
||||
if err != nil {
|
||||
status, code, msg := serviceErrToHTTP(err)
|
||||
writeError(w, status, code, msg)
|
||||
return
|
||||
}
|
||||
|
||||
h.audit.LogChannelCreate(r.Context(), ac, result.Channel.ID, result.Channel.Name, result.Channel.Provider)
|
||||
|
||||
resp := channelToResponse(result.Channel)
|
||||
if result.PlaintextSecret != "" {
|
||||
resp.Secret = &result.PlaintextSecret
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, resp)
|
||||
}
|
||||
|
||||
// List handles GET /v1/channels.
|
||||
func (h *channelHandler) List(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
|
||||
chs, err := h.svc.List(r.Context(), ac.TeamID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "db_error", "failed to list channels")
|
||||
return
|
||||
}
|
||||
|
||||
resp := make([]channelResponse, len(chs))
|
||||
for i, ch := range chs {
|
||||
resp[i] = channelToResponse(ch)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// Get handles GET /v1/channels/{id}.
|
||||
func (h *channelHandler) Get(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
channelIDStr := chi.URLParam(r, "id")
|
||||
|
||||
channelID, err := id.ParseChannelID(channelIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID")
|
||||
return
|
||||
}
|
||||
|
||||
ch, err := h.svc.Get(r.Context(), channelID, ac.TeamID)
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
writeError(w, http.StatusNotFound, "not_found", "channel not found")
|
||||
} else {
|
||||
writeError(w, http.StatusInternalServerError, "db_error", "failed to get channel")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, channelToResponse(ch))
|
||||
}
|
||||
|
||||
// Update handles PATCH /v1/channels/{id}.
|
||||
func (h *channelHandler) Update(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
channelIDStr := chi.URLParam(r, "id")
|
||||
|
||||
channelID, err := id.ParseChannelID(channelIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID")
|
||||
return
|
||||
}
|
||||
|
||||
var req updateChannelRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
ch, err := h.svc.Update(r.Context(), channelID, ac.TeamID, req.Name, req.Events)
|
||||
if err != nil {
|
||||
status, code, msg := serviceErrToHTTP(err)
|
||||
writeError(w, status, code, msg)
|
||||
return
|
||||
}
|
||||
|
||||
h.audit.LogChannelUpdate(r.Context(), ac, channelID)
|
||||
writeJSON(w, http.StatusOK, channelToResponse(ch))
|
||||
}
|
||||
|
||||
// Test handles POST /v1/channels/test.
|
||||
func (h *channelHandler) Test(w http.ResponseWriter, r *http.Request) {
|
||||
var req testChannelRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.svc.Test(r.Context(), req.Provider, req.Config); err != nil {
|
||||
status, code, msg := serviceErrToHTTP(err)
|
||||
writeError(w, status, code, msg)
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
|
||||
// RotateConfig handles PUT /v1/channels/{id}/config.
|
||||
func (h *channelHandler) RotateConfig(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
channelIDStr := chi.URLParam(r, "id")
|
||||
|
||||
channelID, err := id.ParseChannelID(channelIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID")
|
||||
return
|
||||
}
|
||||
|
||||
var req rotateConfigRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid JSON body")
|
||||
return
|
||||
}
|
||||
|
||||
ch, err := h.svc.RotateConfig(r.Context(), channelID, ac.TeamID, req.Config)
|
||||
if err != nil {
|
||||
status, code, msg := serviceErrToHTTP(err)
|
||||
writeError(w, status, code, msg)
|
||||
return
|
||||
}
|
||||
|
||||
h.audit.LogChannelRotateConfig(r.Context(), ac, channelID)
|
||||
writeJSON(w, http.StatusOK, channelToResponse(ch))
|
||||
}
|
||||
|
||||
// Delete handles DELETE /v1/channels/{id}.
|
||||
func (h *channelHandler) Delete(w http.ResponseWriter, r *http.Request) {
|
||||
ac := auth.MustFromContext(r.Context())
|
||||
channelIDStr := chi.URLParam(r, "id")
|
||||
|
||||
channelID, err := id.ParseChannelID(channelIDStr)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid_request", "invalid channel ID")
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.svc.Delete(r.Context(), channelID, ac.TeamID); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "db_error", "failed to delete channel")
|
||||
return
|
||||
}
|
||||
|
||||
h.audit.LogChannelDelete(r.Context(), ac, channelID)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
@ -95,6 +95,8 @@ func serviceErrToHTTP(err error) (int, string, string) {
|
||||
return http.StatusNotFound, "not_found", msg
|
||||
case strings.Contains(msg, "not running"), strings.Contains(msg, "not paused"):
|
||||
return http.StatusConflict, "invalid_state", msg
|
||||
case strings.Contains(msg, "conflict:"):
|
||||
return http.StatusConflict, "conflict", msg
|
||||
case strings.Contains(msg, "forbidden"):
|
||||
return http.StatusForbidden, "forbidden", msg
|
||||
case strings.Contains(msg, "invalid or expired"):
|
||||
|
||||
@ -1547,6 +1547,176 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/channels:
|
||||
post:
|
||||
summary: Create a notification channel
|
||||
operationId: createChannel
|
||||
tags: [channels]
|
||||
security:
|
||||
- bearerAuth: []
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/CreateChannelRequest"
|
||||
responses:
|
||||
"201":
|
||||
description: Channel created
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ChannelResponse"
|
||||
"400":
|
||||
$ref: "#/components/responses/BadRequest"
|
||||
get:
|
||||
summary: List notification channels
|
||||
operationId: listChannels
|
||||
tags: [channels]
|
||||
security:
|
||||
- bearerAuth: []
|
||||
responses:
|
||||
"200":
|
||||
description: Channels list
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/ChannelResponse"
|
||||
|
||||
/v1/channels/test:
|
||||
post:
|
||||
summary: Test a channel configuration
|
||||
description: >
|
||||
Sends a test notification using the provided provider and config without
|
||||
saving anything. Use this to verify credentials before creating a channel.
|
||||
operationId: testChannel
|
||||
tags: [channels]
|
||||
security:
|
||||
- bearerAuth: []
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/TestChannelRequest"
|
||||
responses:
|
||||
"200":
|
||||
description: Test notification sent successfully
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
status:
|
||||
type: string
|
||||
example: ok
|
||||
"400":
|
||||
$ref: "#/components/responses/BadRequest"
|
||||
|
||||
/v1/channels/{id}:
|
||||
parameters:
|
||||
- name: id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
get:
|
||||
summary: Get a notification channel
|
||||
operationId: getChannel
|
||||
tags: [channels]
|
||||
security:
|
||||
- bearerAuth: []
|
||||
responses:
|
||||
"200":
|
||||
description: Channel details
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ChannelResponse"
|
||||
"404":
|
||||
description: Channel not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
patch:
|
||||
summary: Update a notification channel
|
||||
operationId: updateChannel
|
||||
tags: [channels]
|
||||
security:
|
||||
- bearerAuth: []
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UpdateChannelRequest"
|
||||
responses:
|
||||
"200":
|
||||
description: Channel updated
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ChannelResponse"
|
||||
"400":
|
||||
$ref: "#/components/responses/BadRequest"
|
||||
"404":
|
||||
description: Channel not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
delete:
|
||||
summary: Delete a notification channel
|
||||
operationId: deleteChannel
|
||||
tags: [channels]
|
||||
security:
|
||||
- bearerAuth: []
|
||||
responses:
|
||||
"204":
|
||||
description: Channel deleted
|
||||
|
||||
/v1/channels/{id}/config:
|
||||
parameters:
|
||||
- name: id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
put:
|
||||
summary: Rotate channel secrets
|
||||
description: >
|
||||
Replaces the channel's provider configuration entirely with new secrets.
|
||||
The previous config is discarded. Config fields must match the provider's
|
||||
required fields.
|
||||
operationId: rotateChannelConfig
|
||||
tags: [channels]
|
||||
security:
|
||||
- bearerAuth: []
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/RotateConfigRequest"
|
||||
responses:
|
||||
"200":
|
||||
description: Config rotated
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ChannelResponse"
|
||||
"400":
|
||||
$ref: "#/components/responses/BadRequest"
|
||||
"404":
|
||||
description: Channel not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
apiKeyAuth:
|
||||
@ -2067,6 +2237,112 @@ components:
|
||||
format: int64
|
||||
description: "Allocated disk bytes for the CoW sparse file"
|
||||
|
||||
CreateChannelRequest:
|
||||
type: object
|
||||
required: [name, provider, config, events]
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
description: Unique channel name within the team.
|
||||
provider:
|
||||
type: string
|
||||
enum: [discord, slack, teams, googlechat, telegram, matrix, webhook]
|
||||
config:
|
||||
type: object
|
||||
additionalProperties:
|
||||
type: string
|
||||
description: >
|
||||
Provider-specific configuration fields.
|
||||
Discord/Slack/Teams/Google Chat: {"webhook_url": "..."}.
|
||||
Telegram: {"bot_token": "...", "chat_id": "..."}.
|
||||
Matrix: {"homeserver_url": "...", "access_token": "...", "room_id": "..."}.
|
||||
Webhook: {"url": "...", "secret": "..."} (secret is auto-generated if omitted).
|
||||
events:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
enum:
|
||||
- capsule.created
|
||||
- capsule.running
|
||||
- capsule.paused
|
||||
- capsule.destroyed
|
||||
- template.snapshot.created
|
||||
- template.snapshot.deleted
|
||||
- host.up
|
||||
- host.down
|
||||
|
||||
TestChannelRequest:
|
||||
type: object
|
||||
required: [provider, config]
|
||||
properties:
|
||||
provider:
|
||||
type: string
|
||||
enum: [discord, slack, teams, googlechat, telegram, matrix, webhook]
|
||||
config:
|
||||
type: object
|
||||
additionalProperties:
|
||||
type: string
|
||||
description: Provider-specific configuration fields (same as CreateChannelRequest.config).
|
||||
|
||||
RotateConfigRequest:
|
||||
type: object
|
||||
required: [config]
|
||||
properties:
|
||||
config:
|
||||
type: object
|
||||
additionalProperties:
|
||||
type: string
|
||||
description: >
|
||||
New provider configuration fields. Must include all required fields
|
||||
for the channel's provider. Replaces the existing config entirely.
|
||||
|
||||
UpdateChannelRequest:
|
||||
type: object
|
||||
required: [name, events]
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
events:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
enum:
|
||||
- capsule.created
|
||||
- capsule.running
|
||||
- capsule.paused
|
||||
- capsule.destroyed
|
||||
- template.snapshot.created
|
||||
- template.snapshot.deleted
|
||||
- host.up
|
||||
- host.down
|
||||
|
||||
ChannelResponse:
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
team_id:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
provider:
|
||||
type: string
|
||||
enum: [discord, slack, teams, googlechat, telegram, matrix, webhook]
|
||||
events:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
created_at:
|
||||
type: string
|
||||
format: date-time
|
||||
updated_at:
|
||||
type: string
|
||||
format: date-time
|
||||
secret:
|
||||
type: string
|
||||
nullable: true
|
||||
description: Webhook secret. Only returned on creation, never again.
|
||||
|
||||
Error:
|
||||
type: object
|
||||
properties:
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"git.omukk.dev/wrenn/sandbox/internal/audit"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/auth"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/auth/oauth"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/channels"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/lifecycle"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/scheduler"
|
||||
@ -38,6 +39,8 @@ func New(
|
||||
oauthRegistry *oauth.Registry,
|
||||
oauthRedirectURL string,
|
||||
ca *auth.CA,
|
||||
al *audit.AuditLogger,
|
||||
channelSvc *channels.Service,
|
||||
) *Server {
|
||||
r := chi.NewRouter()
|
||||
r.Use(requestLogger())
|
||||
@ -52,8 +55,6 @@ func New(
|
||||
statsSvc := &service.StatsService{DB: queries, Pool: pgPool}
|
||||
buildSvc := &service.BuildService{DB: queries, Redis: rdb, Pool: pool, Scheduler: sched}
|
||||
|
||||
al := audit.New(queries)
|
||||
|
||||
sandbox := newSandboxHandler(sandboxSvc, al)
|
||||
exec := newExecHandler(queries, pool)
|
||||
execStream := newExecStreamHandler(queries, pool)
|
||||
@ -70,6 +71,7 @@ func New(
|
||||
statsH := newStatsHandler(statsSvc)
|
||||
metricsH := newSandboxMetricsHandler(queries, pool)
|
||||
buildH := newBuildHandler(buildSvc, queries, pool)
|
||||
channelH := newChannelHandler(channelSvc, al)
|
||||
|
||||
// OpenAPI spec and docs.
|
||||
r.Get("/openapi.yaml", serveOpenAPI)
|
||||
@ -171,6 +173,20 @@ func New(
|
||||
})
|
||||
})
|
||||
|
||||
// JWT-authenticated: notification channels.
|
||||
r.Route("/v1/channels", func(r chi.Router) {
|
||||
r.Use(requireJWT(jwtSecret))
|
||||
r.Post("/", channelH.Create)
|
||||
r.Get("/", channelH.List)
|
||||
r.Post("/test", channelH.Test)
|
||||
r.Route("/{id}", func(r chi.Router) {
|
||||
r.Get("/", channelH.Get)
|
||||
r.Patch("/", channelH.Update)
|
||||
r.Delete("/", channelH.Delete)
|
||||
r.Put("/config", channelH.RotateConfig)
|
||||
})
|
||||
})
|
||||
|
||||
// JWT-authenticated: audit log.
|
||||
r.With(requireJWT(jwtSecret)).Get("/v1/audit-logs", auditH.List)
|
||||
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/auth"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/events"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/id"
|
||||
)
|
||||
|
||||
@ -16,14 +17,38 @@ import (
|
||||
// All methods are fire-and-forget: failures are logged via slog and never
|
||||
// propagated to the caller.
|
||||
type AuditLogger struct {
|
||||
db *db.Queries
|
||||
db *db.Queries
|
||||
pub events.EventPublisher // optional — nil disables event publishing
|
||||
}
|
||||
|
||||
// New constructs an AuditLogger.
|
||||
// New constructs an AuditLogger without event publishing.
|
||||
func New(queries *db.Queries) *AuditLogger {
|
||||
return &AuditLogger{db: queries}
|
||||
}
|
||||
|
||||
// NewWithPublisher constructs an AuditLogger that also publishes channel events.
|
||||
func NewWithPublisher(queries *db.Queries, pub events.EventPublisher) *AuditLogger {
|
||||
return &AuditLogger{db: queries, pub: pub}
|
||||
}
|
||||
|
||||
// publish sends an event to the notification stream if a publisher is configured.
|
||||
func (l *AuditLogger) publish(ctx context.Context, e events.Event) {
|
||||
if l.pub != nil {
|
||||
l.pub.Publish(ctx, e)
|
||||
}
|
||||
}
|
||||
|
||||
// actorToEvent converts auth context fields to an events.Actor.
|
||||
func actorToEvent(ac auth.AuthContext) events.Actor {
|
||||
at, aid, aname := actorFields(ac)
|
||||
return events.Actor{Type: events.ActorKind(at), ID: aid, Name: aname}
|
||||
}
|
||||
|
||||
// systemActor returns an events.Actor for system-initiated events.
|
||||
func systemActor() events.Actor {
|
||||
return events.Actor{Type: events.ActorSystem}
|
||||
}
|
||||
|
||||
// actorFields extracts actor_type, actor_id, and actor_name from an AuthContext.
|
||||
// actor_id is stored as a prefixed string in the TEXT column.
|
||||
func actorFields(ac auth.AuthContext) (actorType, actorID, actorName string) {
|
||||
@ -82,6 +107,13 @@ func (l *AuditLogger) LogSandboxCreate(ctx context.Context, ac auth.AuthContext,
|
||||
Status: "success",
|
||||
Metadata: marshalMeta(map[string]any{"template": template}),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.CapsuleCreated,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(ac.TeamID),
|
||||
Actor: actorToEvent(ac),
|
||||
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
|
||||
})
|
||||
}
|
||||
|
||||
func (l *AuditLogger) LogSandboxPause(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) {
|
||||
@ -99,6 +131,13 @@ func (l *AuditLogger) LogSandboxPause(ctx context.Context, ac auth.AuthContext,
|
||||
Status: "success",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.CapsulePaused,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(ac.TeamID),
|
||||
Actor: actorToEvent(ac),
|
||||
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
|
||||
})
|
||||
}
|
||||
|
||||
// LogSandboxAutoPause records a system-initiated auto-pause (TTL or host reconciler).
|
||||
@ -116,6 +155,13 @@ func (l *AuditLogger) LogSandboxAutoPause(ctx context.Context, teamID, sandboxID
|
||||
Status: "info",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.CapsulePaused,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(teamID),
|
||||
Actor: systemActor(),
|
||||
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
|
||||
})
|
||||
}
|
||||
|
||||
func (l *AuditLogger) LogSandboxResume(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) {
|
||||
@ -133,6 +179,13 @@ func (l *AuditLogger) LogSandboxResume(ctx context.Context, ac auth.AuthContext,
|
||||
Status: "success",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.CapsuleRunning,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(ac.TeamID),
|
||||
Actor: actorToEvent(ac),
|
||||
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
|
||||
})
|
||||
}
|
||||
|
||||
func (l *AuditLogger) LogSandboxDestroy(ctx context.Context, ac auth.AuthContext, sandboxID pgtype.UUID) {
|
||||
@ -150,6 +203,13 @@ func (l *AuditLogger) LogSandboxDestroy(ctx context.Context, ac auth.AuthContext
|
||||
Status: "warning",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.CapsuleDestroyed,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(ac.TeamID),
|
||||
Actor: actorToEvent(ac),
|
||||
Resource: events.Resource{ID: id.FormatSandboxID(sandboxID), Type: "sandbox"},
|
||||
})
|
||||
}
|
||||
|
||||
// --- Snapshot events (scope: team) ---
|
||||
@ -169,6 +229,13 @@ func (l *AuditLogger) LogSnapshotCreate(ctx context.Context, ac auth.AuthContext
|
||||
Status: "success",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.SnapshotCreated,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(ac.TeamID),
|
||||
Actor: actorToEvent(ac),
|
||||
Resource: events.Resource{ID: name, Type: "snapshot"},
|
||||
})
|
||||
}
|
||||
|
||||
func (l *AuditLogger) LogSnapshotDelete(ctx context.Context, ac auth.AuthContext, name string) {
|
||||
@ -186,6 +253,13 @@ func (l *AuditLogger) LogSnapshotDelete(ctx context.Context, ac auth.AuthContext
|
||||
Status: "warning",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.SnapshotDeleted,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(ac.TeamID),
|
||||
Actor: actorToEvent(ac),
|
||||
Resource: events.Resource{ID: name, Type: "snapshot"},
|
||||
})
|
||||
}
|
||||
|
||||
// --- Team events (scope: team) ---
|
||||
@ -207,6 +281,76 @@ func (l *AuditLogger) LogTeamRename(ctx context.Context, ac auth.AuthContext, te
|
||||
})
|
||||
}
|
||||
|
||||
// --- Channel events (scope: team) ---
|
||||
|
||||
func (l *AuditLogger) LogChannelCreate(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID, name, provider string) {
|
||||
actorType, actorID, actorName := actorFields(ac)
|
||||
l.write(ctx, db.InsertAuditLogParams{
|
||||
ID: id.NewAuditLogID(),
|
||||
TeamID: ac.TeamID,
|
||||
ActorType: actorType,
|
||||
ActorID: optText(actorID),
|
||||
ActorName: actorName,
|
||||
ResourceType: "channel",
|
||||
ResourceID: optText(id.FormatChannelID(channelID)),
|
||||
Action: "create",
|
||||
Scope: "team",
|
||||
Status: "success",
|
||||
Metadata: marshalMeta(map[string]any{"name": name, "provider": provider}),
|
||||
})
|
||||
}
|
||||
|
||||
func (l *AuditLogger) LogChannelUpdate(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID) {
|
||||
actorType, actorID, actorName := actorFields(ac)
|
||||
l.write(ctx, db.InsertAuditLogParams{
|
||||
ID: id.NewAuditLogID(),
|
||||
TeamID: ac.TeamID,
|
||||
ActorType: actorType,
|
||||
ActorID: optText(actorID),
|
||||
ActorName: actorName,
|
||||
ResourceType: "channel",
|
||||
ResourceID: optText(id.FormatChannelID(channelID)),
|
||||
Action: "update",
|
||||
Scope: "team",
|
||||
Status: "info",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
}
|
||||
|
||||
func (l *AuditLogger) LogChannelRotateConfig(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID) {
|
||||
actorType, actorID, actorName := actorFields(ac)
|
||||
l.write(ctx, db.InsertAuditLogParams{
|
||||
ID: id.NewAuditLogID(),
|
||||
TeamID: ac.TeamID,
|
||||
ActorType: actorType,
|
||||
ActorID: optText(actorID),
|
||||
ActorName: actorName,
|
||||
ResourceType: "channel",
|
||||
ResourceID: optText(id.FormatChannelID(channelID)),
|
||||
Action: "rotate_config",
|
||||
Scope: "team",
|
||||
Status: "info",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
}
|
||||
|
||||
func (l *AuditLogger) LogChannelDelete(ctx context.Context, ac auth.AuthContext, channelID pgtype.UUID) {
|
||||
actorType, actorID, actorName := actorFields(ac)
|
||||
l.write(ctx, db.InsertAuditLogParams{
|
||||
ID: id.NewAuditLogID(),
|
||||
TeamID: ac.TeamID,
|
||||
ActorType: actorType,
|
||||
ActorID: optText(actorID),
|
||||
ActorName: actorName,
|
||||
ResourceType: "channel",
|
||||
ResourceID: optText(id.FormatChannelID(channelID)),
|
||||
Action: "delete",
|
||||
Scope: "team",
|
||||
Status: "warning",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
}
|
||||
|
||||
// --- API key events (scope: team) ---
|
||||
|
||||
func (l *AuditLogger) LogAPIKeyCreate(ctx context.Context, ac auth.AuthContext, keyID pgtype.UUID, keyName string) {
|
||||
@ -387,6 +531,13 @@ func (l *AuditLogger) LogHostMarkedDown(ctx context.Context, teamID, hostID pgty
|
||||
Status: "error",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.HostDown,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(teamID),
|
||||
Actor: systemActor(),
|
||||
Resource: events.Resource{ID: id.FormatHostID(hostID), Type: "host"},
|
||||
})
|
||||
}
|
||||
|
||||
// LogHostMarkedUp records a system-initiated host status transition back to online.
|
||||
@ -408,4 +559,11 @@ func (l *AuditLogger) LogHostMarkedUp(ctx context.Context, teamID, hostID pgtype
|
||||
Status: "success",
|
||||
Metadata: []byte("{}"),
|
||||
})
|
||||
l.publish(ctx, events.Event{
|
||||
Event: events.HostUp,
|
||||
Timestamp: events.Now(),
|
||||
TeamID: id.FormatTeamID(teamID),
|
||||
Actor: systemActor(),
|
||||
Resource: events.Resource{ID: id.FormatHostID(hostID), Type: "host"},
|
||||
})
|
||||
}
|
||||
|
||||
63
internal/channels/crypto.go
Normal file
63
internal/channels/crypto.go
Normal file
@ -0,0 +1,63 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"crypto/aes"
|
||||
"crypto/cipher"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// EncryptSecret encrypts plaintext using AES-256-GCM with a random nonce.
|
||||
// Returns base64(nonce || ciphertext).
|
||||
func EncryptSecret(key [32]byte, plaintext string) (string, error) {
|
||||
block, err := aes.NewCipher(key[:])
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("aes cipher: %w", err)
|
||||
}
|
||||
|
||||
gcm, err := cipher.NewGCM(block)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("gcm: %w", err)
|
||||
}
|
||||
|
||||
nonce := make([]byte, gcm.NonceSize())
|
||||
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
|
||||
return "", fmt.Errorf("nonce: %w", err)
|
||||
}
|
||||
|
||||
ciphertext := gcm.Seal(nonce, nonce, []byte(plaintext), nil)
|
||||
return base64.StdEncoding.EncodeToString(ciphertext), nil
|
||||
}
|
||||
|
||||
// DecryptSecret decrypts a value produced by EncryptSecret.
|
||||
func DecryptSecret(key [32]byte, encoded string) (string, error) {
|
||||
data, err := base64.StdEncoding.DecodeString(encoded)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("base64 decode: %w", err)
|
||||
}
|
||||
|
||||
block, err := aes.NewCipher(key[:])
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("aes cipher: %w", err)
|
||||
}
|
||||
|
||||
gcm, err := cipher.NewGCM(block)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("gcm: %w", err)
|
||||
}
|
||||
|
||||
nonceSize := gcm.NonceSize()
|
||||
if len(data) < nonceSize {
|
||||
return "", fmt.Errorf("ciphertext too short")
|
||||
}
|
||||
|
||||
nonce, ciphertext := data[:nonceSize], data[nonceSize:]
|
||||
plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("decrypt: %w", err)
|
||||
}
|
||||
|
||||
return string(plaintext), nil
|
||||
}
|
||||
36
internal/channels/deliver.go
Normal file
36
internal/channels/deliver.go
Normal file
@ -0,0 +1,36 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/containrrr/shoutrrr"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/events"
|
||||
)
|
||||
|
||||
// Deliver sends a notification to a single provider with the given config.
|
||||
// For webhooks it uses HMAC-signed HTTP POST; for all others it uses shoutrrr.
|
||||
func Deliver(ctx context.Context, provider string, config map[string]string, e events.Event) error {
|
||||
payload, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal event: %w", err)
|
||||
}
|
||||
|
||||
if provider == "webhook" {
|
||||
wh := NewWebhookDelivery()
|
||||
return wh.Deliver(ctx, config["url"], config["secret"], payload)
|
||||
}
|
||||
|
||||
shoutrrrURL, err := ShoutrrrURL(provider, config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("build shoutrrr URL: %w", err)
|
||||
}
|
||||
|
||||
msg := FormatMessage(e)
|
||||
if err := shoutrrr.Send(shoutrrrURL, msg); err != nil {
|
||||
return fmt.Errorf("shoutrrr send: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
183
internal/channels/dispatcher.go
Normal file
183
internal/channels/dispatcher.go
Normal file
@ -0,0 +1,183 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/events"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/id"
|
||||
)
|
||||
|
||||
const (
|
||||
groupName = "wrenn-channels-v1"
|
||||
consumerName = "cp-0"
|
||||
)
|
||||
|
||||
// Dispatcher consumes events from the Redis stream and delivers them
|
||||
// to matching notification channels.
|
||||
type Dispatcher struct {
|
||||
rdb *redis.Client
|
||||
db *db.Queries
|
||||
encKey [32]byte
|
||||
webhook *WebhookDelivery
|
||||
}
|
||||
|
||||
// NewDispatcher constructs an event dispatcher.
|
||||
func NewDispatcher(rdb *redis.Client, queries *db.Queries, encKey [32]byte) *Dispatcher {
|
||||
return &Dispatcher{
|
||||
rdb: rdb,
|
||||
db: queries,
|
||||
encKey: encKey,
|
||||
webhook: NewWebhookDelivery(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the consumer goroutine. Returns when ctx is cancelled.
|
||||
func (d *Dispatcher) Start(ctx context.Context) {
|
||||
go d.run(ctx)
|
||||
}
|
||||
|
||||
func (d *Dispatcher) run(ctx context.Context) {
|
||||
// Create consumer group idempotently. "$" means only new messages.
|
||||
err := d.rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "$").Err()
|
||||
if err != nil && !isGroupExistsError(err) {
|
||||
slog.Error("channels: failed to create consumer group", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
streams, err := d.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: groupName,
|
||||
Consumer: consumerName,
|
||||
Streams: []string{streamKey, ">"},
|
||||
Count: 10,
|
||||
Block: 5 * time.Second,
|
||||
}).Result()
|
||||
|
||||
if err != nil {
|
||||
if err == redis.Nil || ctx.Err() != nil {
|
||||
continue
|
||||
}
|
||||
slog.Warn("channels: xreadgroup error", "error", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, msg := range stream.Messages {
|
||||
d.handleMessage(ctx, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) handleMessage(ctx context.Context, msg redis.XMessage) {
|
||||
defer func() {
|
||||
if err := d.rdb.XAck(ctx, streamKey, groupName, msg.ID).Err(); err != nil {
|
||||
slog.Warn("channels: xack failed", "id", msg.ID, "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
payload, ok := msg.Values["payload"].(string)
|
||||
if !ok {
|
||||
slog.Warn("channels: message missing payload", "id", msg.ID)
|
||||
return
|
||||
}
|
||||
|
||||
var event events.Event
|
||||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||||
slog.Warn("channels: failed to unmarshal event", "id", msg.ID, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
teamID, err := id.ParseTeamID(event.TeamID)
|
||||
if err != nil {
|
||||
slog.Warn("channels: invalid team ID in event", "team_id", event.TeamID, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
channels, err := d.db.ListChannelsForEvent(ctx, db.ListChannelsForEventParams{
|
||||
TeamID: teamID,
|
||||
EventType: event.Event,
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("channels: failed to list channels for event", "event", event.Event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, ch := range channels {
|
||||
d.dispatch(ctx, ch, event)
|
||||
}
|
||||
}
|
||||
|
||||
// retryDelays defines the wait durations before each retry attempt.
|
||||
var retryDelays = []time.Duration{10 * time.Second, 30 * time.Second}
|
||||
|
||||
func (d *Dispatcher) dispatch(ctx context.Context, ch db.Channel, e events.Event) {
|
||||
config, err := d.decryptConfig(ch.Config)
|
||||
if err != nil {
|
||||
slog.Warn("channels: failed to decrypt config",
|
||||
"channel_id", id.FormatChannelID(ch.ID), "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
chID := id.FormatChannelID(ch.ID)
|
||||
|
||||
if err := Deliver(ctx, ch.Provider, config, e); err != nil {
|
||||
slog.Warn("channels: delivery failed, scheduling retries",
|
||||
"channel_id", chID, "provider", ch.Provider, "error", err)
|
||||
go d.retryDeliver(ctx, ch.Provider, config, e, chID)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) retryDeliver(ctx context.Context, provider string, config map[string]string, e events.Event, chID string) {
|
||||
for i, delay := range retryDelays {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(delay):
|
||||
}
|
||||
|
||||
if err := Deliver(ctx, provider, config, e); err != nil {
|
||||
slog.Warn("channels: retry delivery failed",
|
||||
"channel_id", chID, "provider", provider,
|
||||
"attempt", i+2, "error", err)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
slog.Error("channels: delivery failed after all retries",
|
||||
"channel_id", chID, "provider", provider, "event", e.Event)
|
||||
}
|
||||
|
||||
func (d *Dispatcher) decryptConfig(configJSON []byte) (map[string]string, error) {
|
||||
var encrypted map[string]string
|
||||
if err := json.Unmarshal(configJSON, &encrypted); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
decrypted := make(map[string]string, len(encrypted))
|
||||
for k, v := range encrypted {
|
||||
plaintext, err := DecryptSecret(d.encKey, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
decrypted[k] = plaintext
|
||||
}
|
||||
return decrypted, nil
|
||||
}
|
||||
|
||||
func isGroupExistsError(err error) bool {
|
||||
return err != nil && err.Error() == "BUSYGROUP Consumer Group name already exists"
|
||||
}
|
||||
65
internal/channels/message.go
Normal file
65
internal/channels/message.go
Normal file
@ -0,0 +1,65 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/events"
|
||||
)
|
||||
|
||||
// FormatMessage produces a human-readable notification string containing
|
||||
// the event summary, resource details, actor, and timestamp.
|
||||
func FormatMessage(e events.Event) string {
|
||||
var b strings.Builder
|
||||
|
||||
b.WriteString(formatSummary(e))
|
||||
fmt.Fprintf(&b, "\n\nEvent: %s", e.Event)
|
||||
fmt.Fprintf(&b, "\nResource: %s %s", e.Resource.Type, e.Resource.ID)
|
||||
fmt.Fprintf(&b, "\nActor: %s", formatActor(e.Actor))
|
||||
fmt.Fprintf(&b, "\nTeam: %s", e.TeamID)
|
||||
fmt.Fprintf(&b, "\nTime: %s", e.Timestamp)
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func formatSummary(e events.Event) string {
|
||||
switch e.Event {
|
||||
case events.CapsuleCreated:
|
||||
return fmt.Sprintf("Capsule %s created", e.Resource.ID)
|
||||
case events.CapsuleRunning:
|
||||
return fmt.Sprintf("Capsule %s is running", e.Resource.ID)
|
||||
case events.CapsulePaused:
|
||||
return fmt.Sprintf("Capsule %s paused", e.Resource.ID)
|
||||
case events.CapsuleDestroyed:
|
||||
return fmt.Sprintf("Capsule %s destroyed", e.Resource.ID)
|
||||
case events.SnapshotCreated:
|
||||
return fmt.Sprintf("Template snapshot %s created", e.Resource.ID)
|
||||
case events.SnapshotDeleted:
|
||||
return fmt.Sprintf("Template snapshot %s deleted", e.Resource.ID)
|
||||
case events.HostUp:
|
||||
return fmt.Sprintf("Host %s is up", e.Resource.ID)
|
||||
case events.HostDown:
|
||||
return fmt.Sprintf("Host %s is down", e.Resource.ID)
|
||||
default:
|
||||
return fmt.Sprintf("%s %s", e.Resource.Type, e.Resource.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func formatActor(a events.Actor) string {
|
||||
switch a.Type {
|
||||
case events.ActorSystem:
|
||||
return "system"
|
||||
case events.ActorUser:
|
||||
if a.Name != "" {
|
||||
return fmt.Sprintf("%s (%s)", a.Name, a.ID)
|
||||
}
|
||||
return a.ID
|
||||
case events.ActorAPIKey:
|
||||
if a.Name != "" {
|
||||
return fmt.Sprintf("api_key %s (%s)", a.Name, a.ID)
|
||||
}
|
||||
return fmt.Sprintf("api_key %s", a.ID)
|
||||
default:
|
||||
return string(a.Type)
|
||||
}
|
||||
}
|
||||
44
internal/channels/publisher.go
Normal file
44
internal/channels/publisher.go
Normal file
@ -0,0 +1,44 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/events"
|
||||
)
|
||||
|
||||
const streamKey = "wrenn:events"
|
||||
|
||||
// Publisher pushes events onto the Redis stream for the dispatcher to consume.
|
||||
type Publisher struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
// NewPublisher constructs an event publisher.
|
||||
func NewPublisher(rdb *redis.Client) *Publisher {
|
||||
return &Publisher{rdb: rdb}
|
||||
}
|
||||
|
||||
// Publish serializes the event and appends it to the global stream.
|
||||
// Fire-and-forget: failures are logged, never propagated.
|
||||
func (p *Publisher) Publish(ctx context.Context, e events.Event) {
|
||||
payload, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
slog.Warn("channels: failed to marshal event", "event", e.Event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := p.rdb.XAdd(ctx, &redis.XAddArgs{
|
||||
Stream: streamKey,
|
||||
MaxLen: 10000,
|
||||
Approx: true,
|
||||
Values: map[string]interface{}{
|
||||
"payload": string(payload),
|
||||
},
|
||||
}).Err(); err != nil {
|
||||
slog.Warn("channels: failed to publish event", "event", e.Event, "error", err)
|
||||
}
|
||||
}
|
||||
298
internal/channels/service.go
Normal file
298
internal/channels/service.go
Normal file
@ -0,0 +1,298 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/events"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/id"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/validate"
|
||||
)
|
||||
|
||||
// Valid providers.
|
||||
var validProviders = map[string]bool{
|
||||
"discord": true,
|
||||
"slack": true,
|
||||
"teams": true,
|
||||
"googlechat": true,
|
||||
"telegram": true,
|
||||
"matrix": true,
|
||||
"webhook": true,
|
||||
}
|
||||
|
||||
// Required config fields per provider.
|
||||
var requiredFields = map[string][]string{
|
||||
"discord": {"webhook_url"},
|
||||
"slack": {"webhook_url"},
|
||||
"teams": {"webhook_url"},
|
||||
"googlechat": {"webhook_url"},
|
||||
"telegram": {"bot_token", "chat_id"},
|
||||
"matrix": {"homeserver_url", "access_token", "room_id"},
|
||||
"webhook": {"url"},
|
||||
}
|
||||
|
||||
// validEvents maps event type strings to true for validation.
|
||||
var validEvents map[string]bool
|
||||
|
||||
func init() {
|
||||
validEvents = make(map[string]bool, len(events.AllEventTypes))
|
||||
for _, et := range events.AllEventTypes {
|
||||
validEvents[et] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Service handles channel CRUD operations.
|
||||
type Service struct {
|
||||
DB *db.Queries
|
||||
EncKey [32]byte
|
||||
}
|
||||
|
||||
// CreateParams holds the parameters for creating a channel.
|
||||
type CreateParams struct {
|
||||
TeamID pgtype.UUID
|
||||
Name string
|
||||
Provider string
|
||||
Config map[string]string
|
||||
Events []string
|
||||
}
|
||||
|
||||
// CreateResult holds the result of creating a channel.
|
||||
type CreateResult struct {
|
||||
Channel db.Channel
|
||||
PlaintextSecret string // non-empty only for webhook provider
|
||||
}
|
||||
|
||||
// Create creates a new notification channel.
|
||||
func (s *Service) Create(ctx context.Context, p CreateParams) (CreateResult, error) {
|
||||
clean, err := cleanName(p.Name)
|
||||
if err != nil {
|
||||
return CreateResult{}, err
|
||||
}
|
||||
p.Name = clean
|
||||
|
||||
if !validProviders[p.Provider] {
|
||||
return CreateResult{}, fmt.Errorf("invalid: unsupported provider %q", p.Provider)
|
||||
}
|
||||
|
||||
if len(p.Events) == 0 {
|
||||
return CreateResult{}, fmt.Errorf("invalid: at least one event type is required")
|
||||
}
|
||||
for _, et := range p.Events {
|
||||
if !validEvents[et] {
|
||||
return CreateResult{}, fmt.Errorf("invalid: unknown event type %q", et)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate required config fields.
|
||||
for _, field := range requiredFields[p.Provider] {
|
||||
if p.Config[field] == "" {
|
||||
return CreateResult{}, fmt.Errorf("invalid: %s is required for %s", field, p.Provider)
|
||||
}
|
||||
}
|
||||
|
||||
// For webhooks, auto-generate secret if not provided.
|
||||
var plaintextSecret string
|
||||
if p.Provider == "webhook" {
|
||||
if p.Config["secret"] == "" {
|
||||
secret := generateSecret()
|
||||
p.Config["secret"] = secret
|
||||
plaintextSecret = secret
|
||||
} else {
|
||||
plaintextSecret = p.Config["secret"]
|
||||
}
|
||||
}
|
||||
|
||||
// Encrypt config fields.
|
||||
encrypted := make(map[string]string, len(p.Config))
|
||||
for k, v := range p.Config {
|
||||
enc, err := EncryptSecret(s.EncKey, v)
|
||||
if err != nil {
|
||||
return CreateResult{}, fmt.Errorf("encrypt config field %s: %w", k, err)
|
||||
}
|
||||
encrypted[k] = enc
|
||||
}
|
||||
|
||||
configJSON, err := json.Marshal(encrypted)
|
||||
if err != nil {
|
||||
return CreateResult{}, fmt.Errorf("marshal config: %w", err)
|
||||
}
|
||||
|
||||
ch, err := s.DB.InsertChannel(ctx, db.InsertChannelParams{
|
||||
ID: id.NewChannelID(),
|
||||
TeamID: p.TeamID,
|
||||
Name: p.Name,
|
||||
Provider: p.Provider,
|
||||
Config: configJSON,
|
||||
EventTypes: p.Events,
|
||||
})
|
||||
if err != nil {
|
||||
var pgErr *pgconn.PgError
|
||||
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
|
||||
return CreateResult{}, fmt.Errorf("conflict: channel name %q already exists", p.Name)
|
||||
}
|
||||
return CreateResult{}, fmt.Errorf("insert channel: %w", err)
|
||||
}
|
||||
|
||||
return CreateResult{Channel: ch, PlaintextSecret: plaintextSecret}, nil
|
||||
}
|
||||
|
||||
// List returns all channels belonging to the given team.
|
||||
func (s *Service) List(ctx context.Context, teamID pgtype.UUID) ([]db.Channel, error) {
|
||||
return s.DB.ListChannelsByTeam(ctx, teamID)
|
||||
}
|
||||
|
||||
// Get returns a single channel by ID, scoped to the given team.
|
||||
func (s *Service) Get(ctx context.Context, channelID, teamID pgtype.UUID) (db.Channel, error) {
|
||||
return s.DB.GetChannelByTeam(ctx, db.GetChannelByTeamParams{ID: channelID, TeamID: teamID})
|
||||
}
|
||||
|
||||
// Update updates a channel's name and event types.
|
||||
func (s *Service) Update(ctx context.Context, channelID, teamID pgtype.UUID, name string, eventTypes []string) (db.Channel, error) {
|
||||
clean, err := cleanName(name)
|
||||
if err != nil {
|
||||
return db.Channel{}, err
|
||||
}
|
||||
name = clean
|
||||
|
||||
if len(eventTypes) == 0 {
|
||||
return db.Channel{}, fmt.Errorf("invalid: at least one event type is required")
|
||||
}
|
||||
for _, et := range eventTypes {
|
||||
if !validEvents[et] {
|
||||
return db.Channel{}, fmt.Errorf("invalid: unknown event type %q", et)
|
||||
}
|
||||
}
|
||||
|
||||
ch, err := s.DB.UpdateChannel(ctx, db.UpdateChannelParams{
|
||||
ID: channelID,
|
||||
TeamID: teamID,
|
||||
Name: name,
|
||||
EventTypes: eventTypes,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return db.Channel{}, fmt.Errorf("channel not found")
|
||||
}
|
||||
var pgErr *pgconn.PgError
|
||||
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
|
||||
return db.Channel{}, fmt.Errorf("conflict: channel name %q already exists", name)
|
||||
}
|
||||
return db.Channel{}, fmt.Errorf("update channel: %w", err)
|
||||
}
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// RotateConfig replaces a channel's config with new provider secrets.
|
||||
func (s *Service) RotateConfig(ctx context.Context, channelID, teamID pgtype.UUID, config map[string]string) (db.Channel, error) {
|
||||
// Look up the existing channel to get its provider for validation.
|
||||
ch, err := s.DB.GetChannelByTeam(ctx, db.GetChannelByTeamParams{ID: channelID, TeamID: teamID})
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return db.Channel{}, fmt.Errorf("channel not found")
|
||||
}
|
||||
return db.Channel{}, fmt.Errorf("get channel: %w", err)
|
||||
}
|
||||
|
||||
// Validate required config fields for this provider.
|
||||
for _, field := range requiredFields[ch.Provider] {
|
||||
if config[field] == "" {
|
||||
return db.Channel{}, fmt.Errorf("invalid: %s is required for %s", field, ch.Provider)
|
||||
}
|
||||
}
|
||||
|
||||
// For webhooks, auto-generate secret if not provided.
|
||||
if ch.Provider == "webhook" && config["secret"] == "" {
|
||||
config["secret"] = generateSecret()
|
||||
}
|
||||
|
||||
// Encrypt all config fields.
|
||||
encrypted := make(map[string]string, len(config))
|
||||
for k, v := range config {
|
||||
enc, err := EncryptSecret(s.EncKey, v)
|
||||
if err != nil {
|
||||
return db.Channel{}, fmt.Errorf("encrypt config field %s: %w", k, err)
|
||||
}
|
||||
encrypted[k] = enc
|
||||
}
|
||||
|
||||
configJSON, err := json.Marshal(encrypted)
|
||||
if err != nil {
|
||||
return db.Channel{}, fmt.Errorf("marshal config: %w", err)
|
||||
}
|
||||
|
||||
updated, err := s.DB.UpdateChannelConfig(ctx, db.UpdateChannelConfigParams{
|
||||
ID: channelID,
|
||||
TeamID: teamID,
|
||||
Config: configJSON,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return db.Channel{}, fmt.Errorf("channel not found")
|
||||
}
|
||||
return db.Channel{}, fmt.Errorf("update channel config: %w", err)
|
||||
}
|
||||
return updated, nil
|
||||
}
|
||||
|
||||
// Test validates config and sends a test notification without persisting anything.
|
||||
func (s *Service) Test(ctx context.Context, provider string, config map[string]string) error {
|
||||
if !validProviders[provider] {
|
||||
return fmt.Errorf("invalid: unsupported provider %q", provider)
|
||||
}
|
||||
|
||||
for _, field := range requiredFields[provider] {
|
||||
if config[field] == "" {
|
||||
return fmt.Errorf("invalid: %s is required for %s", field, provider)
|
||||
}
|
||||
}
|
||||
|
||||
// For webhooks, auto-generate a temporary secret if not provided.
|
||||
if provider == "webhook" && config["secret"] == "" {
|
||||
config["secret"] = generateSecret()
|
||||
}
|
||||
|
||||
testEvent := events.Event{
|
||||
Event: "channel.test",
|
||||
Timestamp: events.Now(),
|
||||
TeamID: "test",
|
||||
Actor: events.Actor{Type: events.ActorSystem},
|
||||
Resource: events.Resource{ID: "test", Type: "channel"},
|
||||
}
|
||||
|
||||
return Deliver(ctx, provider, config, testEvent)
|
||||
}
|
||||
|
||||
// Delete removes a channel by ID, scoped to the given team.
|
||||
func (s *Service) Delete(ctx context.Context, channelID, teamID pgtype.UUID) error {
|
||||
return s.DB.DeleteChannelByTeam(ctx, db.DeleteChannelByTeamParams{ID: channelID, TeamID: teamID})
|
||||
}
|
||||
|
||||
// cleanName normalises a channel name: trim whitespace, lowercase, replace
|
||||
// spaces with hyphens, then validate against SafeName rules.
|
||||
func cleanName(name string) (string, error) {
|
||||
name = strings.TrimSpace(name)
|
||||
name = strings.ToLower(name)
|
||||
name = strings.ReplaceAll(name, " ", "-")
|
||||
if err := validate.SafeName(name); err != nil {
|
||||
return "", fmt.Errorf("invalid: %w", err)
|
||||
}
|
||||
return name, nil
|
||||
}
|
||||
|
||||
func generateSecret() string {
|
||||
b := make([]byte, 32)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
panic(fmt.Sprintf("crypto/rand failed: %v", err))
|
||||
}
|
||||
return hex.EncodeToString(b)
|
||||
}
|
||||
119
internal/channels/shoutrrr.go
Normal file
119
internal/channels/shoutrrr.go
Normal file
@ -0,0 +1,119 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ShoutrrrURL builds a shoutrrr-compatible URL from structured provider config.
|
||||
func ShoutrrrURL(provider string, config map[string]string) (string, error) {
|
||||
switch provider {
|
||||
case "discord":
|
||||
return discordURL(config)
|
||||
case "slack":
|
||||
return slackURL(config)
|
||||
case "teams":
|
||||
return teamsURL(config)
|
||||
case "googlechat":
|
||||
return googlechatURL(config)
|
||||
case "telegram":
|
||||
return telegramURL(config)
|
||||
case "matrix":
|
||||
return matrixURL(config)
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported shoutrrr provider: %s", provider)
|
||||
}
|
||||
}
|
||||
|
||||
// discordURL converts https://discord.com/api/webhooks/{id}/{token} → discord://{token}@{id}
|
||||
func discordURL(config map[string]string) (string, error) {
|
||||
u, err := url.Parse(config["webhook_url"])
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid discord webhook URL: %w", err)
|
||||
}
|
||||
// Path: /api/webhooks/{id}/{token}
|
||||
parts := strings.Split(strings.TrimPrefix(u.Path, "/"), "/")
|
||||
if len(parts) < 4 || parts[0] != "api" || parts[1] != "webhooks" {
|
||||
return "", fmt.Errorf("unexpected discord webhook URL format")
|
||||
}
|
||||
webhookID, token := parts[2], parts[3]
|
||||
return fmt.Sprintf("discord://%s@%s?splitLines=No", token, webhookID), nil
|
||||
}
|
||||
|
||||
// slackURL converts https://hooks.slack.com/services/T.../B.../XXX → slack://T.../B.../XXX
|
||||
func slackURL(config map[string]string) (string, error) {
|
||||
u, err := url.Parse(config["webhook_url"])
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid slack webhook URL: %w", err)
|
||||
}
|
||||
// Path: /services/TXXXXX/BXXXXX/XXXXXXXX
|
||||
parts := strings.Split(strings.TrimPrefix(u.Path, "/"), "/")
|
||||
if len(parts) < 4 || parts[0] != "services" {
|
||||
return "", fmt.Errorf("unexpected slack webhook URL format")
|
||||
}
|
||||
return fmt.Sprintf("slack://hook:%s-%s-%s@webhook", parts[1], parts[2], parts[3]), nil
|
||||
}
|
||||
|
||||
// teamsWebhookRe extracts the 4 components from a Teams webhook URL.
|
||||
// Format: https://<host>/<path>/{group}@{tenant}/IncomingWebhook/{altID}/{groupOwner}
|
||||
var teamsWebhookRe = regexp.MustCompile(`([0-9a-f-]{36})@([0-9a-f-]{36})/[^/]+/([0-9a-f]{32})/([0-9a-f-]{36})`)
|
||||
|
||||
// teamsURL converts a Teams webhook URL → teams://Group@Tenant/AltID/GroupOwner
|
||||
func teamsURL(config map[string]string) (string, error) {
|
||||
webhookURL := config["webhook_url"]
|
||||
if webhookURL == "" {
|
||||
return "", fmt.Errorf("teams webhook_url is required")
|
||||
}
|
||||
groups := teamsWebhookRe.FindStringSubmatch(webhookURL)
|
||||
if len(groups) != 5 {
|
||||
return "", fmt.Errorf("unexpected teams webhook URL format")
|
||||
}
|
||||
group, tenant, altID, groupOwner := groups[1], groups[2], groups[3], groups[4]
|
||||
return fmt.Sprintf("teams://%s@%s/%s/%s", group, tenant, altID, groupOwner), nil
|
||||
}
|
||||
|
||||
// googlechatURL converts a Google Chat webhook URL to shoutrrr format.
|
||||
// Input: https://chat.googleapis.com/v1/spaces/SPACE/messages?key=KEY&token=TOKEN
|
||||
// Output: googlechat://chat.googleapis.com/v1/spaces/SPACE/messages?key=KEY&token=TOKEN
|
||||
func googlechatURL(config map[string]string) (string, error) {
|
||||
webhookURL := config["webhook_url"]
|
||||
if webhookURL == "" {
|
||||
return "", fmt.Errorf("googlechat webhook_url is required")
|
||||
}
|
||||
u, err := url.Parse(webhookURL)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid googlechat webhook URL: %w", err)
|
||||
}
|
||||
if u.Host != "chat.googleapis.com" {
|
||||
return "", fmt.Errorf("unexpected googlechat webhook URL host: %s", u.Host)
|
||||
}
|
||||
// Rebuild as googlechat:// scheme with same host, path, and query.
|
||||
u.Scheme = "googlechat"
|
||||
return u.String(), nil
|
||||
}
|
||||
|
||||
// telegramURL builds telegram://token@telegram/?chats=chatID
|
||||
func telegramURL(config map[string]string) (string, error) {
|
||||
token := config["bot_token"]
|
||||
chatID := config["chat_id"]
|
||||
if token == "" || chatID == "" {
|
||||
return "", fmt.Errorf("telegram bot_token and chat_id are required")
|
||||
}
|
||||
return fmt.Sprintf("telegram://%s@telegram/?chats=%s", token, chatID), nil
|
||||
}
|
||||
|
||||
// matrixURL builds matrix://user:token@homeserver/room
|
||||
func matrixURL(config map[string]string) (string, error) {
|
||||
homeserver := config["homeserver_url"]
|
||||
token := config["access_token"]
|
||||
roomID := config["room_id"]
|
||||
if homeserver == "" || token == "" || roomID == "" {
|
||||
return "", fmt.Errorf("matrix homeserver_url, access_token, and room_id are required")
|
||||
}
|
||||
// Strip protocol from homeserver URL.
|
||||
host := strings.TrimPrefix(strings.TrimPrefix(homeserver, "https://"), "http://")
|
||||
// Room ID often starts with ! — URL-encode it.
|
||||
return fmt.Sprintf("matrix://:%s@%s/%s", url.PathEscape(token), host, url.PathEscape(roomID)), nil
|
||||
}
|
||||
62
internal/channels/webhook.go
Normal file
62
internal/channels/webhook.go
Normal file
@ -0,0 +1,62 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// WebhookDelivery delivers events to webhook URLs with HMAC signing.
|
||||
type WebhookDelivery struct {
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// NewWebhookDelivery constructs a webhook delivery client.
|
||||
func NewWebhookDelivery() *WebhookDelivery {
|
||||
return &WebhookDelivery{
|
||||
client: &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
CheckRedirect: func(*http.Request, []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver signs and POSTs the event payload to the configured URL.
|
||||
func (d *WebhookDelivery) Deliver(ctx context.Context, targetURL, secret string, payload []byte) error {
|
||||
timestamp := time.Now().UTC().Format(time.RFC3339)
|
||||
deliveryID := uuid.New().String()
|
||||
|
||||
// Compute HMAC-SHA256: sign over "timestamp.body".
|
||||
mac := hmac.New(sha256.New, []byte(secret))
|
||||
mac.Write([]byte(timestamp + "." + string(payload)))
|
||||
signature := "sha256=" + hex.EncodeToString(mac.Sum(nil))
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, strings.NewReader(string(payload)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-WRENN-SIGNATURE", signature)
|
||||
req.Header.Set("X-Wrenn-Delivery", deliveryID)
|
||||
req.Header.Set("X-Wrenn-Timestamp", timestamp)
|
||||
|
||||
resp, err := d.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("http post: %w", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("webhook returned %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"os"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
@ -22,6 +23,10 @@ type Config struct {
|
||||
OAuthGitHubClientSecret string
|
||||
OAuthRedirectURL string
|
||||
CPPublicURL string
|
||||
|
||||
// Channels — encryption for channel secrets (AES-256-GCM).
|
||||
EncryptionKeyHex string // WRENN_ENCRYPTION_KEY raw hex string (for validation)
|
||||
EncryptionKey [32]byte // parsed 32-byte key
|
||||
}
|
||||
|
||||
// Load reads configuration from a .env file (if present) and environment variables.
|
||||
@ -30,7 +35,7 @@ func Load() Config {
|
||||
// Best-effort load — missing .env file is fine.
|
||||
_ = godotenv.Load()
|
||||
|
||||
return Config{
|
||||
cfg := Config{
|
||||
DatabaseURL: envOrDefault("DATABASE_URL", "postgres://wrenn:wrenn@localhost:5432/wrenn?sslmode=disable"),
|
||||
RedisURL: envOrDefault("REDIS_URL", "redis://localhost:6379/0"),
|
||||
ListenAddr: envOrDefault("WRENN_CP_LISTEN_ADDR", ":8080"),
|
||||
@ -43,7 +48,18 @@ func Load() Config {
|
||||
OAuthGitHubClientSecret: os.Getenv("OAUTH_GITHUB_CLIENT_SECRET"),
|
||||
OAuthRedirectURL: envOrDefault("OAUTH_REDIRECT_URL", "https://app.wrenn.dev"),
|
||||
CPPublicURL: os.Getenv("CP_PUBLIC_URL"),
|
||||
|
||||
EncryptionKeyHex: os.Getenv("WRENN_ENCRYPTION_KEY"),
|
||||
}
|
||||
|
||||
if cfg.EncryptionKeyHex != "" {
|
||||
b, err := hex.DecodeString(cfg.EncryptionKeyHex)
|
||||
if err == nil && len(b) == 32 {
|
||||
copy(cfg.EncryptionKey[:], b)
|
||||
}
|
||||
}
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
||||
func envOrDefault(key, def string) string {
|
||||
|
||||
225
internal/db/channels.sql.go
Normal file
225
internal/db/channels.sql.go
Normal file
@ -0,0 +1,225 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
// source: channels.sql
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const deleteChannelByTeam = `-- name: DeleteChannelByTeam :exec
|
||||
DELETE FROM channels WHERE id = $1 AND team_id = $2
|
||||
`
|
||||
|
||||
type DeleteChannelByTeamParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TeamID pgtype.UUID `json:"team_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) DeleteChannelByTeam(ctx context.Context, arg DeleteChannelByTeamParams) error {
|
||||
_, err := q.db.Exec(ctx, deleteChannelByTeam, arg.ID, arg.TeamID)
|
||||
return err
|
||||
}
|
||||
|
||||
const getChannelByTeam = `-- name: GetChannelByTeam :one
|
||||
SELECT id, team_id, name, provider, config, event_types, created_at, updated_at FROM channels WHERE id = $1 AND team_id = $2
|
||||
`
|
||||
|
||||
type GetChannelByTeamParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TeamID pgtype.UUID `json:"team_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetChannelByTeam(ctx context.Context, arg GetChannelByTeamParams) (Channel, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelByTeam, arg.ID, arg.TeamID)
|
||||
var i Channel
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.TeamID,
|
||||
&i.Name,
|
||||
&i.Provider,
|
||||
&i.Config,
|
||||
&i.EventTypes,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const insertChannel = `-- name: InsertChannel :one
|
||||
INSERT INTO channels (id, team_id, name, provider, config, event_types)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING id, team_id, name, provider, config, event_types, created_at, updated_at
|
||||
`
|
||||
|
||||
type InsertChannelParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TeamID pgtype.UUID `json:"team_id"`
|
||||
Name string `json:"name"`
|
||||
Provider string `json:"provider"`
|
||||
Config []byte `json:"config"`
|
||||
EventTypes []string `json:"event_types"`
|
||||
}
|
||||
|
||||
func (q *Queries) InsertChannel(ctx context.Context, arg InsertChannelParams) (Channel, error) {
|
||||
row := q.db.QueryRow(ctx, insertChannel,
|
||||
arg.ID,
|
||||
arg.TeamID,
|
||||
arg.Name,
|
||||
arg.Provider,
|
||||
arg.Config,
|
||||
arg.EventTypes,
|
||||
)
|
||||
var i Channel
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.TeamID,
|
||||
&i.Name,
|
||||
&i.Provider,
|
||||
&i.Config,
|
||||
&i.EventTypes,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const listChannelsByTeam = `-- name: ListChannelsByTeam :many
|
||||
SELECT id, team_id, name, provider, config, event_types, created_at, updated_at FROM channels WHERE team_id = $1 ORDER BY created_at DESC
|
||||
`
|
||||
|
||||
func (q *Queries) ListChannelsByTeam(ctx context.Context, teamID pgtype.UUID) ([]Channel, error) {
|
||||
rows, err := q.db.Query(ctx, listChannelsByTeam, teamID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []Channel
|
||||
for rows.Next() {
|
||||
var i Channel
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.TeamID,
|
||||
&i.Name,
|
||||
&i.Provider,
|
||||
&i.Config,
|
||||
&i.EventTypes,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listChannelsForEvent = `-- name: ListChannelsForEvent :many
|
||||
SELECT id, team_id, name, provider, config, event_types, created_at, updated_at FROM channels
|
||||
WHERE team_id = $1
|
||||
AND $2::text = ANY(event_types)
|
||||
ORDER BY created_at
|
||||
`
|
||||
|
||||
type ListChannelsForEventParams struct {
|
||||
TeamID pgtype.UUID `json:"team_id"`
|
||||
EventType string `json:"event_type"`
|
||||
}
|
||||
|
||||
func (q *Queries) ListChannelsForEvent(ctx context.Context, arg ListChannelsForEventParams) ([]Channel, error) {
|
||||
rows, err := q.db.Query(ctx, listChannelsForEvent, arg.TeamID, arg.EventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []Channel
|
||||
for rows.Next() {
|
||||
var i Channel
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.TeamID,
|
||||
&i.Name,
|
||||
&i.Provider,
|
||||
&i.Config,
|
||||
&i.EventTypes,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const updateChannel = `-- name: UpdateChannel :one
|
||||
UPDATE channels SET name = $3, event_types = $4, updated_at = NOW()
|
||||
WHERE id = $1 AND team_id = $2
|
||||
RETURNING id, team_id, name, provider, config, event_types, created_at, updated_at
|
||||
`
|
||||
|
||||
type UpdateChannelParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TeamID pgtype.UUID `json:"team_id"`
|
||||
Name string `json:"name"`
|
||||
EventTypes []string `json:"event_types"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateChannel(ctx context.Context, arg UpdateChannelParams) (Channel, error) {
|
||||
row := q.db.QueryRow(ctx, updateChannel,
|
||||
arg.ID,
|
||||
arg.TeamID,
|
||||
arg.Name,
|
||||
arg.EventTypes,
|
||||
)
|
||||
var i Channel
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.TeamID,
|
||||
&i.Name,
|
||||
&i.Provider,
|
||||
&i.Config,
|
||||
&i.EventTypes,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const updateChannelConfig = `-- name: UpdateChannelConfig :one
|
||||
UPDATE channels SET config = $3, updated_at = NOW()
|
||||
WHERE id = $1 AND team_id = $2
|
||||
RETURNING id, team_id, name, provider, config, event_types, created_at, updated_at
|
||||
`
|
||||
|
||||
type UpdateChannelConfigParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TeamID pgtype.UUID `json:"team_id"`
|
||||
Config []byte `json:"config"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateChannelConfig(ctx context.Context, arg UpdateChannelConfigParams) (Channel, error) {
|
||||
row := q.db.QueryRow(ctx, updateChannelConfig, arg.ID, arg.TeamID, arg.Config)
|
||||
var i Channel
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.TeamID,
|
||||
&i.Name,
|
||||
&i.Provider,
|
||||
&i.Config,
|
||||
&i.EventTypes,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@ -30,6 +30,17 @@ type AuditLog struct {
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type Channel struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TeamID pgtype.UUID `json:"team_id"`
|
||||
Name string `json:"name"`
|
||||
Provider string `json:"provider"`
|
||||
Config []byte `json:"config"`
|
||||
EventTypes []string `json:"event_types"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
type Host struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Type string `json:"type"`
|
||||
|
||||
73
internal/events/event.go
Normal file
73
internal/events/event.go
Normal file
@ -0,0 +1,73 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
// EventPublisher pushes events onto the notification stream.
|
||||
// Satisfied by *channels.Publisher.
|
||||
type EventPublisher interface {
|
||||
Publish(ctx context.Context, e Event)
|
||||
}
|
||||
|
||||
// ActorKind identifies what initiated an event.
|
||||
type ActorKind string
|
||||
|
||||
const (
|
||||
ActorUser ActorKind = "user"
|
||||
ActorAPIKey ActorKind = "api_key"
|
||||
ActorSystem ActorKind = "system"
|
||||
)
|
||||
|
||||
// Actor describes who triggered an event.
|
||||
type Actor struct {
|
||||
Type ActorKind `json:"type"`
|
||||
ID string `json:"id,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
}
|
||||
|
||||
// Resource identifies the object the event relates to.
|
||||
type Resource struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// Event is the canonical notification payload published to the Redis stream
|
||||
// and delivered to channel subscribers.
|
||||
type Event struct {
|
||||
Event string `json:"event"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
TeamID string `json:"team_id"`
|
||||
Actor Actor `json:"actor"`
|
||||
Resource Resource `json:"resource"`
|
||||
}
|
||||
|
||||
// Event type constants.
|
||||
const (
|
||||
CapsuleCreated = "capsule.created"
|
||||
CapsuleRunning = "capsule.running"
|
||||
CapsulePaused = "capsule.paused"
|
||||
CapsuleDestroyed = "capsule.destroyed"
|
||||
SnapshotCreated = "template.snapshot.created"
|
||||
SnapshotDeleted = "template.snapshot.deleted"
|
||||
HostUp = "host.up"
|
||||
HostDown = "host.down"
|
||||
)
|
||||
|
||||
// AllEventTypes is the complete set of valid event type strings.
|
||||
var AllEventTypes = []string{
|
||||
CapsuleCreated,
|
||||
CapsuleRunning,
|
||||
CapsulePaused,
|
||||
CapsuleDestroyed,
|
||||
SnapshotCreated,
|
||||
SnapshotDeleted,
|
||||
HostUp,
|
||||
HostDown,
|
||||
}
|
||||
|
||||
// Now returns the current time formatted for event timestamps.
|
||||
func Now() string {
|
||||
return time.Now().UTC().Format(time.RFC3339)
|
||||
}
|
||||
@ -35,6 +35,7 @@ func NewRefreshTokenID() pgtype.UUID { return newUUID() }
|
||||
func NewAuditLogID() pgtype.UUID { return newUUID() }
|
||||
func NewBuildID() pgtype.UUID { return newUUID() }
|
||||
func NewAdminPermissionID() pgtype.UUID { return newUUID() }
|
||||
func NewChannelID() pgtype.UUID { return newUUID() }
|
||||
|
||||
func NewTemplateID() pgtype.UUID { return newUUID() }
|
||||
|
||||
@ -75,6 +76,7 @@ const (
|
||||
PrefixAuditLog = "log-"
|
||||
PrefixBuild = "bld-"
|
||||
PrefixAdminPermission = "perm-"
|
||||
PrefixChannel = "ch-"
|
||||
)
|
||||
|
||||
// UUIDToBase36 encodes 16 UUID bytes as a 25-char base36 string (0-9a-z).
|
||||
@ -123,6 +125,7 @@ func FormatHostTokenID(id pgtype.UUID) string { return formatUUID(PrefixHostT
|
||||
func FormatRefreshTokenID(id pgtype.UUID) string { return formatUUID(PrefixRefreshToken, id) }
|
||||
func FormatAuditLogID(id pgtype.UUID) string { return formatUUID(PrefixAuditLog, id) }
|
||||
func FormatBuildID(id pgtype.UUID) string { return formatUUID(PrefixBuild, id) }
|
||||
func FormatChannelID(id pgtype.UUID) string { return formatUUID(PrefixChannel, id) }
|
||||
|
||||
// --- Parsing (prefixed string from API/RPC input → pgtype.UUID) ---
|
||||
|
||||
@ -145,6 +148,7 @@ func ParseHostID(s string) (pgtype.UUID, error) { return parseUUID(PrefixHo
|
||||
func ParseHostTokenID(s string) (pgtype.UUID, error) { return parseUUID(PrefixHostToken, s) }
|
||||
func ParseAuditLogID(s string) (pgtype.UUID, error) { return parseUUID(PrefixAuditLog, s) }
|
||||
func ParseBuildID(s string) (pgtype.UUID, error) { return parseUUID(PrefixBuild, s) }
|
||||
func ParseChannelID(s string) (pgtype.UUID, error) { return parseUUID(PrefixChannel, s) }
|
||||
|
||||
// --- Well-known IDs ---
|
||||
|
||||
|
||||
Reference in New Issue
Block a user