autogits/obs-staging-bot/rabbitmq.go

102 lines
2.4 KiB
Go
Raw Normal View History

2024-07-17 17:20:24 +02:00
package main
import (
	"crypto/tls"
	"encoding/json"
	"log"
	"net/url"
	"strings"
	"time"

	rabbitmq "github.com/rabbitmq/amqp091-go"
)
// BuildNotification is the in-memory form of a single build event.
//
// processObsMessage fills BuildSuccess from the message routing key and
// decodes the JSON message body into the struct. No json tags are declared,
// so encoding/json matches body keys against the field names
// (case-insensitively).
type BuildNotification struct {
	// BuildSuccess reports whether the routing key named a successful
	// ("success" or "unchanged") build result.
	BuildSuccess bool

	// Identification of the build, as delivered in the message body.
	Project    string
	Package    string
	Repository string
	Arch       string
	Release    string
	Rev        string
	Buildtype  string
	Workerid   string

	// Timing information from the message body.
	// NOTE(review): presumably Unix timestamps in seconds; confirm against
	// the producer of these messages.
	Starttime uint64
	Endtime   uint64
	Readytime uint64
}
// failOnError aborts the current goroutine via log.Panicf when err is
// non-nil; a nil err is a no-op.
//
// The logged/panicked text is "<err>: <msg>", i.e. the error comes first and
// the caller-supplied context second.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", err, msg)
}
// processObsMessage decodes one delivery into a BuildNotification and logs
// it.
//
// The routing key is split into at most four dot-separated segments. Only
// keys whose fourth segment has the form "build_<result>" are handled; any
// other key is ignored silently. A result of "success" or "unchanged" counts
// as a successful build, "fail" as a failed one, and any other result is
// logged and dropped.
//
// Messages whose body is not valid JSON for BuildNotification are logged
// (together with the decoding error) and dropped.
func processObsMessage(msg *rabbitmq.Delivery) {
	const buildPrefix = "build_"

	key := strings.SplitN(msg.RoutingKey, ".", 4)
	// Require a non-empty result after the prefix; a bare "build_" is ignored.
	if len(key) != 4 || len(key[3]) <= len(buildPrefix) || !strings.HasPrefix(key[3], buildPrefix) {
		return
	}

	var buildSuccess bool
	switch strings.TrimPrefix(key[3], buildPrefix) {
	case "success", "unchanged":
		buildSuccess = true
	case "fail":
		buildSuccess = false
	default:
		log.Printf("unknown build_ logging message: %s\n", msg.RoutingKey)
		return
	}

	notification := &BuildNotification{}
	if err := json.Unmarshal(msg.Body, notification); err != nil {
		log.Printf("Cannot unmarshall json object: %v: %s\n", err, msg.Body)
		return
	}
	// Set the result only after decoding: encoding/json matches keys to
	// exported fields case-insensitively, so a body carrying a
	// "buildsuccess" key would otherwise overwrite the value derived from
	// the routing key.
	notification.BuildSuccess = buildSuccess

	log.Printf("%v\n", notification)
}
// startProcessingObsMessages connects to the AMQP broker at host over TLS,
// binds a fresh server-named queue to the "pubsub" topic exchange with the
// pattern "*.obs.package.*", and feeds every delivery to processObsMessage.
//
// username and password are used only when both are non-empty; otherwise the
// connection URI carries no credentials.
//
// The call blocks while deliveries arrive. Any setup failure panics through
// failOnError. When the delivery channel is closed the function returns; if
// the connection itself is closed it first waits five seconds and starts a
// replacement consumer in a new goroutine.
func startProcessingObsMessages(host, username, password string) {
	auth := ""
	if len(username) > 0 && len(password) > 0 {
		// Percent-encode the credentials so that characters reserved in
		// URIs ('@', ':', '/', '%', ...) cannot corrupt the AMQP URI.
		auth = url.UserPassword(username, password).String() + "@"
	}

	// NOTE(review): host is used verbatim as the TLS ServerName, so this
	// assumes callers pass a bare hostname without ":port" — confirm.
	connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
		ServerName: host,
	})
	failOnError(err, "Cannot connect to "+host)
	defer connection.Close()

	ch, err := connection.Channel()
	failOnError(err, "Cannot create a channel")
	defer ch.Close()

	// Passive declare: only verifies that the exchange already exists.
	err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
	failOnError(err, "Cannot find pubsub exchange")

	// Empty name: the broker picks the queue name. Flags: non-durable,
	// auto-delete, non-exclusive, wait for the broker's reply.
	q, err := ch.QueueDeclare("", false, true, false, false, nil)
	failOnError(err, "Cannot declare queue")
	log.Printf("queue: %s:%d", q.Name, q.Consumers)

	err = ch.QueueBind(q.Name, "*.obs.package.*", "pubsub", false, nil)
	failOnError(err, "Cannot bind queue to exchange")

	// Flags: auto-ack deliveries, exclusive consumer.
	msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
	failOnError(err, "Cannot start consumer")
	log.Printf("queue: %s:%d", q.Name, q.Consumers)

	// TODO: add recovery for NotifyClose and related
	for {
		msg, ok := <-msgs
		if !ok {
			log.Printf("channel closed?\n")
			if connection.IsClosed() {
				// reconnect
				log.Printf("reconnecting...")
				time.Sleep(5 * time.Second)
				go startProcessingObsMessages(host, username, password)
			}
			return
		}
		processObsMessage(&msg)
	}
}