autogits/gitea-events-rabbitmq-publisher/rabbitmq.go
2024-08-14 17:43:56 +02:00

112 lines
2.3 KiB
Go

package main
import (
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/url"
	"time"

	_ "github.com/mattn/go-sqlite3"
	rabbitmq "github.com/rabbitmq/amqp091-go"
)
// failOnError aborts the current goroutine with a logged panic when err is
// non-nil. The panic text is the error followed by msg, separated by ": ".
// A nil err is a no-op.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", err, msg)
}
// Message is one event waiting in messageQueue to be published.
type Message struct {
	// Topic is passed as the routing key when the message is published.
	Topic string
	// Body is the raw payload sent as the publishing body.
	Body []byte
}
// messageQueue carries messages from PublicMessage to the publishing loop in
// ConnectToExchangeForPublish. It is nil until ConnectToExchangeForPublish
// creates it; PublicMessage returns an error while it is nil.
var messageQueue chan Message
// PublicMessage reads a webhook payload and queues it for publishing.
//
// The payload is read in full from msgBody and must be syntactically valid
// JSON. It is queued on messageQueue under the topic
// "opensuse.gitea.<giteaOrg>.<giteaWebhookType>".
//
// An error is returned when messageQueue is still nil, when msgBody cannot be
// read (the read error is wrapped, so errors.Is and errors.As reach it), or
// when the payload is not valid JSON. Nothing is queued in any of those cases.
//
// The send on messageQueue is a plain channel send, so the call blocks while
// the queue is full.
func PublicMessage(giteaWebhookType, giteaOrg string, msgBody io.Reader) error {
	if messageQueue == nil {
		return fmt.Errorf("Queue not initialized")
	}

	data, err := io.ReadAll(msgBody)
	if err != nil {
		// %w renders the same text as %v but keeps the cause inspectable.
		return fmt.Errorf("Error reading JSON data. Err: %w", err)
	}

	if !json.Valid(data) {
		return fmt.Errorf("Invalid JSON in request")
	}

	messageQueue <- Message{
		Topic: fmt.Sprintf("opensuse.gitea.%s.%s", giteaOrg, giteaWebhookType),
		Body:  data,
	}
	return nil
}
// ConnectToExchangeForPublish dials the broker at host over TLS (amqps) and
// publishes every message received from messageQueue to the "pubsub" topic
// exchange, using the message's Topic as routing key.
//
// The call blocks until messageQueue is closed, so it is meant to run in its
// own goroutine. messageQueue is created here if it is still nil. Credentials
// are added to the connection URI only when both username and password are
// non-empty.
//
// Failure handling:
//   - A setup failure (dial, channel creation, exchange lookup) panics through
//     failOnError. The deferred handler recovers, logs the panic value, waits
//     and restarts this function in a new goroutine.
//   - A publish failure with the connection or the channel closed puts the
//     message back on messageQueue if there is room (otherwise it is lost),
//     waits, restarts this function in a new goroutine and returns.
//   - Any other publish failure is logged and the message is dropped.
func ConnectToExchangeForPublish(host, username, password string) {
	const reconnectDelay = 5 * time.Second

	defer func() {
		if r := recover(); r != nil {
			// Include the panic value so unexpected panics are not retried blindly.
			log.Printf("recovering from %v... reconnecting...", r)
			time.Sleep(reconnectDelay)
			go ConnectToExchangeForPublish(host, username, password)
		}
	}()

	if messageQueue == nil {
		messageQueue = make(chan Message, 10240)
	}

	// Percent-encode the credentials: a raw '@', '/', ':' or '#' in either
	// part would otherwise change how the URI is parsed.
	auth := ""
	if len(username) > 0 && len(password) > 0 {
		auth = url.UserPassword(username, password).String() + "@"
	}

	// ServerName is matched against the certificate, so it must not carry a
	// ":port" suffix. A host without a port makes SplitHostPort fail and is
	// used unchanged.
	serverName := host
	if bare, _, splitErr := net.SplitHostPort(host); splitErr == nil {
		serverName = bare
	}

	connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
		ServerName: serverName,
	})
	failOnError(err, "Cannot connect to "+host)
	defer connection.Close()

	ch, err := connection.Channel()
	failOnError(err, "Cannot create a channel")
	defer ch.Close()

	// Passive declare: only checks that the exchange exists, never creates it.
	err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
	failOnError(err, "Cannot find pubsub exchange")

	for msg := range messageQueue {
		err = ch.Publish("pubsub", msg.Topic, false, false, rabbitmq.Publishing{
			ContentType: "application/json",
			AppId:       RabbitForwarderPath,
			Body:        msg.Body,
		})
		if err == nil {
			continue
		}

		if !connection.IsClosed() && !ch.IsClosed() {
			log.Printf("Error sending request. %v", err)
			continue
		}

		// The connection or channel is gone, so nothing more can be published
		// here. Give the message back without blocking, then start over.
		select {
		case messageQueue <- msg:
			log.Printf("requed ...")
		default:
			log.Printf("queue full... message lost")
		}

		log.Printf("reconnecting...")
		time.Sleep(reconnectDelay)
		go ConnectToExchangeForPublish(host, username, password)
		return
	}

	// messageQueue was closed: no more messages will arrive.
	log.Printf("Shutdown ... \n")
}