package main import ( "crypto/tls" "encoding/json" "fmt" "io" "log" "time" _ "github.com/mattn/go-sqlite3" rabbitmq "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", err, msg) } } type Message struct { Topic string Body string } var messageQueue chan Messag func PublicMessage(giteaWebhookType string, msgBody io.Reader) error { data, err := io.ReadAll(msgBody) if !json.Valid(data) { return fmt.Errorf("Invalid JSON in request") } messageQueue <- Message { Topic: "opensuse.gitea." + giteaWebhookType + "." + } } func ConnectToExchangeForPublish(host, username, password string) { defer func() { if r := recover(); r != nil { log.Print("recovering... reconnecting...\n") time.Sleep(5 * time.Second) go ConnectToExchangeForPublish(host, username, password) } }() if messageQueue == nil { messageQueue = make(chan Message, 10240) } auth := "" if len(username) > 0 && len(password) > 0 { auth = username + ":" + password + "@" } connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{ ServerName: host, }) failOnError(err, "Cannot connect to rabbit.opensuse.org") defer connection.Close() ch, err := connection.Channel() failOnError(err, "Cannot create a channel") defer ch.Close() err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil) failOnError(err, "Cannot find pubsub exchange") for { msg, ok := <-messageQueue if !ok { log.Printf("channel/connection closed?\n") if connection.IsClosed() { // reconnect log.Printf("reconnecting...") time.Sleep(5 * time.Second) go ConnectToExchangeForPublish(host, username, password) } return } ch.Publish("pubsub", msg.Topic, false, false, rabbitmq.Publishing { ContentType: "application/json", AppId: RabbitForwarderPath, Body: []byte(msg.Body), }) } }