autogits/gitea-events-rabbitmq-publisher/rabbitmq.go

127 lines
2.7 KiB
Go
Raw Normal View History

2024-08-13 16:42:20 +02:00
package main
import (
2024-08-14 18:24:02 +02:00
"context"
2024-08-13 16:42:20 +02:00
"crypto/tls"
"encoding/json"
2024-08-15 10:47:59 +02:00
"errors"
2024-08-13 16:42:20 +02:00
"fmt"
"log"
"time"
rabbitmq "github.com/rabbitmq/amqp091-go"
)
// failOnError logs and panics when err is non-nil; it does nothing otherwise.
//
// msg describes what was being attempted and is printed first, followed by
// the underlying error, so the log line reads "<context>: <cause>".
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %v", msg, err)
	}
}
// Message is a single event waiting to be published to the exchange.
type Message struct {
	// Topic is used as the routing key when the message is published.
	Topic string
	// Body is the payload, sent as-is as the message body.
	Body []byte
}
2024-08-14 17:43:56 +02:00
// messageQueue buffers messages between PublishMessage, which sends to it
// without blocking, and the publish loop in ConnectToExchangeForPublish, which
// receives from it. It is nil until ConnectToExchangeForPublish creates it.
var messageQueue chan Message
2024-08-27 11:42:48 +02:00
func PublishMessage(giteaOrg, giteaWebhookType, action string, data []byte) error {
2024-08-14 17:43:56 +02:00
if messageQueue == nil {
return fmt.Errorf("Queue not initialized")
}
2024-08-13 16:42:20 +02:00
if !json.Valid(data) {
2024-08-15 10:47:59 +02:00
return errors.New("Invalid JSON in request")
2024-08-13 16:42:20 +02:00
}
2024-08-24 13:32:39 +02:00
var msg Message
if len(action) > 0 {
msg = Message{
Topic: fmt.Sprintf("opensuse.gitea.%s.%s.%s", giteaOrg, giteaWebhookType, action),
Body: data,
}
} else {
msg = Message{
Topic: fmt.Sprintf("opensuse.gitea.%s.%s", giteaOrg, giteaWebhookType),
Body: data,
}
}
select {
case messageQueue <- msg:
2024-08-15 10:47:59 +02:00
default:
2024-09-10 17:34:16 +02:00
return errors.New("Unable to queue message. Possibly queue full.")
2024-08-13 16:42:20 +02:00
}
2024-08-14 17:43:56 +02:00
return nil
2024-08-13 16:42:20 +02:00
}
func ConnectToExchangeForPublish(host, username, password string) {
defer func() {
if r := recover(); r != nil {
log.Print("recovering... reconnecting...\n")
time.Sleep(5 * time.Second)
go ConnectToExchangeForPublish(host, username, password)
}
}()
2024-08-14 17:43:56 +02:00
2024-08-13 16:42:20 +02:00
if messageQueue == nil {
messageQueue = make(chan Message, 10240)
}
auth := ""
if len(username) > 0 && len(password) > 0 {
auth = username + ":" + password + "@"
}
connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
ServerName: host,
})
failOnError(err, "Cannot connect to rabbit.opensuse.org")
defer connection.Close()
ch, err := connection.Channel()
failOnError(err, "Cannot create a channel")
defer ch.Close()
err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
failOnError(err, "Cannot find pubsub exchange")
for {
msg, ok := <-messageQueue
if !ok {
2024-08-15 10:47:59 +02:00
log.Println("Shutdown ...")
2024-08-14 17:43:56 +02:00
return
}
2024-08-13 16:42:20 +02:00
2024-08-14 18:24:02 +02:00
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(ctx, "pubsub", msg.Topic, false, false, rabbitmq.Publishing{
2024-08-14 18:16:55 +02:00
ContentType: "application/json",
2024-08-15 10:47:59 +02:00
AppId: AppName,
2024-08-14 18:16:55 +02:00
Body: []byte(msg.Body),
Timestamp: time.Now(),
DeliveryMode: 2,
ContentEncoding: "application/json",
2024-08-14 17:43:56 +02:00
})
if err != nil {
2024-08-14 18:24:02 +02:00
if ch.IsClosed() || connection.IsClosed() {
2024-08-14 17:43:56 +02:00
select {
case messageQueue <- msg:
2024-08-19 12:05:43 +02:00
log.Println("requeued ...")
2024-08-14 17:43:56 +02:00
default:
2024-08-15 10:47:59 +02:00
log.Println("queue full... message lost")
2024-08-14 17:43:56 +02:00
}
2024-08-13 16:42:20 +02:00
// reconnect
2024-08-15 10:47:59 +02:00
log.Println("reconnecting...")
2024-08-13 16:42:20 +02:00
time.Sleep(5 * time.Second)
go ConnectToExchangeForPublish(host, username, password)
2024-08-14 17:43:56 +02:00
return
} else {
2024-08-15 10:47:59 +02:00
log.Printf("Error sending request. %v\n", err)
2024-08-13 16:42:20 +02:00
}
}
}
}