131 lines
2.8 KiB
Go
131 lines
2.8 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
rabbitmq "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
// failOnError is a no-op when err is nil. Otherwise it logs err followed by
// msg (in that order, separated by ": ") and panics with the same text.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s\n", err, msg)
}
|
|
|
|
// Message is one unit of work handed to the publisher through messageQueue.
type Message struct {
	// Topic is used as the routing key when the message is published.
	Topic string
	// Body is the payload, sent as-is.
	Body []byte
}
|
|
|
|
// messageQueue holds messages waiting to be published. It stays nil until
// ConnectToExchangeForPublish allocates it; PublishActionMessage performs a
// non-blocking send into it and ConnectToExchangeForPublish receives from it.
var messageQueue chan Message
|
|
|
|
func PublishActionMessage(giteaOrg, giteaWebhookType, action string, data []byte) error {
|
|
if messageQueue == nil {
|
|
return fmt.Errorf("Queue not initialized")
|
|
}
|
|
|
|
if !json.Valid(data) {
|
|
return errors.New("Invalid JSON in request")
|
|
}
|
|
|
|
var msg Message
|
|
if len(action) > 0 {
|
|
msg = Message{
|
|
Topic: fmt.Sprintf("opensuse.gitea.%s.%s.%s", giteaOrg, giteaWebhookType, action),
|
|
Body: data,
|
|
}
|
|
} else {
|
|
msg = Message{
|
|
Topic: fmt.Sprintf("opensuse.gitea.%s.%s", giteaOrg, giteaWebhookType),
|
|
Body: data,
|
|
}
|
|
}
|
|
|
|
select {
|
|
case messageQueue <- msg:
|
|
default:
|
|
return errors.New("Enable to queue message. Possibly queue full.")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func PublishMessage(giteaOrg, giteaWebhookType string, data []byte) error {
|
|
return PublishActionMessage(giteaOrg, giteaWebhookType, "", data)
|
|
}
|
|
|
|
func ConnectToExchangeForPublish(host, username, password string) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Print("recovering... reconnecting...\n")
|
|
time.Sleep(5 * time.Second)
|
|
go ConnectToExchangeForPublish(host, username, password)
|
|
}
|
|
}()
|
|
|
|
if messageQueue == nil {
|
|
messageQueue = make(chan Message, 10240)
|
|
}
|
|
|
|
auth := ""
|
|
if len(username) > 0 && len(password) > 0 {
|
|
auth = username + ":" + password + "@"
|
|
}
|
|
|
|
connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
|
|
ServerName: host,
|
|
})
|
|
failOnError(err, "Cannot connect to rabbit.opensuse.org")
|
|
defer connection.Close()
|
|
|
|
ch, err := connection.Channel()
|
|
failOnError(err, "Cannot create a channel")
|
|
defer ch.Close()
|
|
|
|
err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
|
|
failOnError(err, "Cannot find pubsub exchange")
|
|
|
|
for {
|
|
msg, ok := <-messageQueue
|
|
if !ok {
|
|
log.Println("Shutdown ...")
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
err = ch.PublishWithContext(ctx, "pubsub", msg.Topic, false, false, rabbitmq.Publishing{
|
|
ContentType: "application/json",
|
|
AppId: AppName,
|
|
Body: []byte(msg.Body),
|
|
Timestamp: time.Now(),
|
|
DeliveryMode: 2,
|
|
ContentEncoding: "application/json",
|
|
})
|
|
|
|
if err != nil {
|
|
if ch.IsClosed() || connection.IsClosed() {
|
|
select {
|
|
case messageQueue <- msg:
|
|
log.Println("requeued ...")
|
|
default:
|
|
log.Println("queue full... message lost")
|
|
}
|
|
// reconnect
|
|
log.Println("reconnecting...")
|
|
time.Sleep(5 * time.Second)
|
|
go ConnectToExchangeForPublish(host, username, password)
|
|
return
|
|
} else {
|
|
log.Printf("Error sending request. %v\n", err)
|
|
}
|
|
}
|
|
}
|
|
}
|