forked from wrenn/wrenn
feat: add notification channels with provider integrations and retry
Implement a channels system for notifying teams via external providers
(Discord, Slack, Teams, Google Chat, Telegram, Matrix, webhook) when
lifecycle events occur (capsule/template/host state changes).
- Channel CRUD API under /v1/channels (JWT-only auth)
- Test endpoint to verify config before saving (POST /v1/channels/test)
- Secret rotation endpoint (PUT /v1/channels/{id}/config)
- AES-256-GCM encryption for provider secrets (WRENN_ENCRYPTION_KEY)
- Redis stream event publishing from audit logger
- Background dispatcher with consumer group and retry (10s, 30s)
- Webhook delivery with HMAC-SHA256 signing (X-WRENN-SIGNATURE)
- shoutrrr integration for chat providers
- Secrets never exposed in API responses
This commit is contained in:
183
internal/channels/dispatcher.go
Normal file
183
internal/channels/dispatcher.go
Normal file
@ -0,0 +1,183 @@
|
||||
package channels
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"git.omukk.dev/wrenn/sandbox/internal/db"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/events"
|
||||
"git.omukk.dev/wrenn/sandbox/internal/id"
|
||||
)
|
||||
|
||||
const (
|
||||
groupName = "wrenn-channels-v1"
|
||||
consumerName = "cp-0"
|
||||
)
|
||||
|
||||
// Dispatcher consumes events from the Redis stream and delivers them
|
||||
// to matching notification channels.
|
||||
type Dispatcher struct {
|
||||
rdb *redis.Client
|
||||
db *db.Queries
|
||||
encKey [32]byte
|
||||
webhook *WebhookDelivery
|
||||
}
|
||||
|
||||
// NewDispatcher constructs an event dispatcher.
|
||||
func NewDispatcher(rdb *redis.Client, queries *db.Queries, encKey [32]byte) *Dispatcher {
|
||||
return &Dispatcher{
|
||||
rdb: rdb,
|
||||
db: queries,
|
||||
encKey: encKey,
|
||||
webhook: NewWebhookDelivery(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the consumer goroutine. Returns when ctx is cancelled.
|
||||
func (d *Dispatcher) Start(ctx context.Context) {
|
||||
go d.run(ctx)
|
||||
}
|
||||
|
||||
func (d *Dispatcher) run(ctx context.Context) {
|
||||
// Create consumer group idempotently. "$" means only new messages.
|
||||
err := d.rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "$").Err()
|
||||
if err != nil && !isGroupExistsError(err) {
|
||||
slog.Error("channels: failed to create consumer group", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
streams, err := d.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: groupName,
|
||||
Consumer: consumerName,
|
||||
Streams: []string{streamKey, ">"},
|
||||
Count: 10,
|
||||
Block: 5 * time.Second,
|
||||
}).Result()
|
||||
|
||||
if err != nil {
|
||||
if err == redis.Nil || ctx.Err() != nil {
|
||||
continue
|
||||
}
|
||||
slog.Warn("channels: xreadgroup error", "error", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, msg := range stream.Messages {
|
||||
d.handleMessage(ctx, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) handleMessage(ctx context.Context, msg redis.XMessage) {
|
||||
defer func() {
|
||||
if err := d.rdb.XAck(ctx, streamKey, groupName, msg.ID).Err(); err != nil {
|
||||
slog.Warn("channels: xack failed", "id", msg.ID, "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
payload, ok := msg.Values["payload"].(string)
|
||||
if !ok {
|
||||
slog.Warn("channels: message missing payload", "id", msg.ID)
|
||||
return
|
||||
}
|
||||
|
||||
var event events.Event
|
||||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||||
slog.Warn("channels: failed to unmarshal event", "id", msg.ID, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
teamID, err := id.ParseTeamID(event.TeamID)
|
||||
if err != nil {
|
||||
slog.Warn("channels: invalid team ID in event", "team_id", event.TeamID, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
channels, err := d.db.ListChannelsForEvent(ctx, db.ListChannelsForEventParams{
|
||||
TeamID: teamID,
|
||||
EventType: event.Event,
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("channels: failed to list channels for event", "event", event.Event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, ch := range channels {
|
||||
d.dispatch(ctx, ch, event)
|
||||
}
|
||||
}
|
||||
|
||||
// retryDelays defines the wait durations before each retry attempt.
|
||||
var retryDelays = []time.Duration{10 * time.Second, 30 * time.Second}
|
||||
|
||||
func (d *Dispatcher) dispatch(ctx context.Context, ch db.Channel, e events.Event) {
|
||||
config, err := d.decryptConfig(ch.Config)
|
||||
if err != nil {
|
||||
slog.Warn("channels: failed to decrypt config",
|
||||
"channel_id", id.FormatChannelID(ch.ID), "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
chID := id.FormatChannelID(ch.ID)
|
||||
|
||||
if err := Deliver(ctx, ch.Provider, config, e); err != nil {
|
||||
slog.Warn("channels: delivery failed, scheduling retries",
|
||||
"channel_id", chID, "provider", ch.Provider, "error", err)
|
||||
go d.retryDeliver(ctx, ch.Provider, config, e, chID)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) retryDeliver(ctx context.Context, provider string, config map[string]string, e events.Event, chID string) {
|
||||
for i, delay := range retryDelays {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(delay):
|
||||
}
|
||||
|
||||
if err := Deliver(ctx, provider, config, e); err != nil {
|
||||
slog.Warn("channels: retry delivery failed",
|
||||
"channel_id", chID, "provider", provider,
|
||||
"attempt", i+2, "error", err)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
slog.Error("channels: delivery failed after all retries",
|
||||
"channel_id", chID, "provider", provider, "event", e.Event)
|
||||
}
|
||||
|
||||
func (d *Dispatcher) decryptConfig(configJSON []byte) (map[string]string, error) {
|
||||
var encrypted map[string]string
|
||||
if err := json.Unmarshal(configJSON, &encrypted); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
decrypted := make(map[string]string, len(encrypted))
|
||||
for k, v := range encrypted {
|
||||
plaintext, err := DecryptSecret(d.encKey, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
decrypted[k] = plaintext
|
||||
}
|
||||
return decrypted, nil
|
||||
}
|
||||
|
||||
func isGroupExistsError(err error) bool {
|
||||
return err != nil && err.Error() == "BUSYGROUP Consumer Group name already exists"
|
||||
}
|
||||
Reference in New Issue
Block a user