autogits/gitea-events-rabbitmq-publisher/rabbitmq.go
2024-09-10 18:24:41 +02:00

146 lines
3.5 KiB
Go

package main
/*
* This file is part of Autogits.
*
* Copyright © 2024 SUSE LLC
*
* Autogits is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 2 of the License, or (at your option) any later
* version.
*
* Autogits is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Foobar. If not, see <https://www.gnu.org/licenses/>.
*/
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"time"
rabbitmq "github.com/rabbitmq/amqp091-go"
"src.opensuse.org/autogits/common"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s\n", err, msg)
}
}
type Message struct {
Topic string
Body []byte
}
var messageQueue chan Message
func PublishMessage(giteaOrg, giteaWebhookType, action string, data []byte) error {
if messageQueue == nil {
return fmt.Errorf("Queue not initialized")
}
if !json.Valid(data) {
return errors.New("Invalid JSON in request")
}
var msg Message
if len(action) > 0 {
msg = Message{
Topic: fmt.Sprintf("%s.%s.%s.%s.%s", topicScope, common.TopicApp, giteaOrg, giteaWebhookType, action),
Body: data,
}
} else {
msg = Message{
Topic: fmt.Sprintf("%s.%s.%s.%s", topicScope, common.TopicApp, giteaOrg, giteaWebhookType),
Body: data,
}
}
select {
case messageQueue <- msg:
default:
return errors.New("Unable to queue message. Possibly queue full.")
}
return nil
}
func ConnectToExchangeForPublish(host, username, password string) {
defer func() {
if r := recover(); r != nil {
log.Print("recovering... reconnecting...\n")
time.Sleep(5 * time.Second)
go ConnectToExchangeForPublish(host, username, password)
}
}()
if messageQueue == nil {
messageQueue = make(chan Message, 10240)
}
auth := ""
if len(username) > 0 && len(password) > 0 {
auth = username + ":" + password + "@"
}
connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
ServerName: host,
})
failOnError(err, "Cannot connect to rabbit.opensuse.org")
defer connection.Close()
ch, err := connection.Channel()
failOnError(err, "Cannot create a channel")
defer ch.Close()
err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
failOnError(err, "Cannot find pubsub exchange")
for {
msg, ok := <-messageQueue
if !ok {
log.Println("Shutdown ...")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(ctx, "pubsub", msg.Topic, false, false, rabbitmq.Publishing{
ContentType: "application/json",
AppId: AppName,
Body: []byte(msg.Body),
Timestamp: time.Now(),
DeliveryMode: 2,
ContentEncoding: "application/json",
})
if err != nil {
if ch.IsClosed() || connection.IsClosed() {
select {
case messageQueue <- msg:
log.Println("requeued ...")
default:
log.Println("queue full... message lost")
}
// reconnect
log.Println("reconnecting...")
time.Sleep(5 * time.Second)
go ConnectToExchangeForPublish(host, username, password)
return
} else {
log.Printf("Error sending request. %v\n", err)
}
}
}
}