autogits/gitea-events-rabbitmq-publisher/rabbitmq.go
2024-08-24 13:32:39 +02:00

139 lines
2.9 KiB
Go

package main
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"time"
rabbitmq "github.com/rabbitmq/amqp091-go"
)
// failOnError is a no-op when err is nil. Otherwise it logs and panics (via
// log.Panicf) with the text "<err>: <msg>".
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s\n", err, msg)
}
// Message is a single event queued for publishing to the message broker.
type Message struct {
// Topic is passed as the routing key when the message is published.
Topic string
// Body is the raw payload; PublicActionMessage only enqueues valid JSON.
Body []byte
}
// messageQueue hands messages from PublicActionMessage (producer) to
// ConnectToExchangeForPublish (consumer). It stays nil until
// ConnectToExchangeForPublish creates it.
var messageQueue chan Message
// PublicActionMessage validates data as JSON and enqueues it on messageQueue
// for publishing.
//
// The topic is "opensuse.gitea.<giteaOrg>.<giteaWebhookType>", with
// ".<action>" appended exactly once when action is non-empty.
//
// The call never blocks. It returns an error when the queue has not been
// created yet, when data is not valid JSON, or when the queue buffer is full.
func PublicActionMessage(giteaOrg, giteaWebhookType, action string, data []byte) error {
	if messageQueue == nil {
		return errors.New("Queue not initialized")
	}
	if !json.Valid(data) {
		return errors.New("Invalid JSON in request")
	}

	// Build the topic in one place so the action suffix cannot be added twice.
	topic := fmt.Sprintf("opensuse.gitea.%s.%s", giteaOrg, giteaWebhookType)
	if len(action) > 0 {
		topic += "." + action
	}

	select {
	case messageQueue <- Message{Topic: topic, Body: data}:
		return nil
	default:
		// Non-blocking send: taken when the send would block, i.e. the
		// buffer is full.
		return errors.New("Unable to queue message. Possibly queue full.")
	}
}
// PublishMessage queues data for an event that has no action component by
// delegating to PublicActionMessage with an empty action.
//
// Note the parameter order: the webhook type comes first, then the
// organization, which is the reverse of PublicActionMessage.
func PublishMessage(giteaWebhookType, giteaOrg string, data []byte) error {
	const noAction = ""
	return PublicActionMessage(giteaOrg, giteaWebhookType, noAction, data)
}
// ConnectToExchangeForPublish dials the AMQP broker at host over TLS and
// publishes every Message received from messageQueue to the "pubsub" topic
// exchange, using Message.Topic as the routing key.
//
// Credentials are put into the URL only when both username and password are
// non-empty. messageQueue is created here (buffer of 10240) if it is nil.
//
// The function blocks while the connection is usable and returns when:
//   - messageQueue is closed (logged as shutdown), or
//   - a publish fails and the channel or connection is closed; the failed
//     message is put back on the queue if there is room, and after a 5 second
//     pause a new goroutine running this function is started.
//
// Setup failures panic through failOnError; the deferred recover turns such a
// panic into the same 5 second pause followed by a retry in a new goroutine.
func ConnectToExchangeForPublish(host, username, password string) {
	defer func() {
		if r := recover(); r != nil {
			log.Print("recovering... reconnecting...\n")
			time.Sleep(5 * time.Second)
			go ConnectToExchangeForPublish(host, username, password)
		}
	}()

	if messageQueue == nil {
		messageQueue = make(chan Message, 10240)
	}

	auth := ""
	if len(username) > 0 && len(password) > 0 {
		auth = username + ":" + password + "@"
	}

	connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
		ServerName: host,
	})
	failOnError(err, "Cannot connect to rabbit.opensuse.org")
	defer connection.Close()

	ch, err := connection.Channel()
	failOnError(err, "Cannot create a channel")
	defer ch.Close()

	// Passive declare: only checks that the exchange exists, never creates it.
	err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
	failOnError(err, "Cannot find pubsub exchange")

	for {
		msg, ok := <-messageQueue
		if !ok {
			log.Println("Shutdown ...")
			return
		}

		if DebugMode {
			log.Println(msg)
		}

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err = ch.PublishWithContext(ctx, "pubsub", msg.Topic, false, false, rabbitmq.Publishing{
			ContentType:     "application/json",
			AppId:           AppName,
			Body:            msg.Body,
			Timestamp:       time.Now(),
			DeliveryMode:    2, // persistent
			ContentEncoding: "application/json",
		})
		// Release the context now. A defer here would only run when this
		// function returns, so every published message would leave a pending
		// cancel (and its timer) behind for the lifetime of the connection.
		cancel()

		if err == nil {
			continue
		}

		if !ch.IsClosed() && !connection.IsClosed() {
			// The link is still up; log the failure and move on to the next
			// message. The failed message is not retried.
			log.Printf("Error sending request. %v\n", err)
			continue
		}

		// The link is gone: try to keep the message for the next connection
		// without blocking on a full queue.
		select {
		case messageQueue <- msg:
			log.Println("requeued ...")
		default:
			log.Println("queue full... message lost")
		}

		// reconnect
		log.Println("reconnecting...")
		time.Sleep(5 * time.Second)
		go ConnectToExchangeForPublish(host, username, password)
		return
	}
}