package main import ( "context" "crypto/tls" "encoding/json" "errors" "fmt" "log" "time" rabbitmq "github.com/rabbitmq/amqp091-go" "src.opensuse.org/autogits/common" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s\n", err, msg) } } type Message struct { Topic string Body []byte } var messageQueue chan Message func PublishMessage(giteaOrg, giteaWebhookType, action string, data []byte) error { if messageQueue == nil { return fmt.Errorf("Queue not initialized") } if !json.Valid(data) { return errors.New("Invalid JSON in request") } var msg Message if len(action) > 0 { msg = Message{ Topic: fmt.Sprintf("%s.%s.%s.%s.%s", topicScope, common.TopicApp, giteaOrg, giteaWebhookType, action), Body: data, } } else { msg = Message{ Topic: fmt.Sprintf("%s.%s.%s.%s", topicScope, common.TopicApp, giteaOrg, giteaWebhookType), Body: data, } } select { case messageQueue <- msg: default: return errors.New("Unable to queue message. Possibly queue full.") } return nil } func ConnectToExchangeForPublish(host, username, password string) { defer func() { if r := recover(); r != nil { log.Print("recovering... reconnecting...\n") time.Sleep(5 * time.Second) go ConnectToExchangeForPublish(host, username, password) } }() if messageQueue == nil { messageQueue = make(chan Message, 10240) } auth := "" if len(username) > 0 && len(password) > 0 { auth = username + ":" + password + "@" } connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{ ServerName: host, }) failOnError(err, "Cannot connect to rabbit.opensuse.org") defer connection.Close() ch, err := connection.Channel() failOnError(err, "Cannot create a channel") defer ch.Close() err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil) failOnError(err, "Cannot find pubsub exchange") for { msg, ok := <-messageQueue if !ok { log.Println("Shutdown ...") return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err = ch.PublishWithContext(ctx, "pubsub", msg.Topic, false, false, rabbitmq.Publishing{ ContentType: "application/json", AppId: AppName, Body: []byte(msg.Body), Timestamp: time.Now(), DeliveryMode: 2, ContentEncoding: "application/json", }) if err != nil { if ch.IsClosed() || connection.IsClosed() { select { case messageQueue <- msg: log.Println("requeued ...") default: log.Println("queue full... message lost") } // reconnect log.Println("reconnecting...") time.Sleep(5 * time.Second) go ConnectToExchangeForPublish(host, username, password) return } else { log.Printf("Error sending request. %v\n", err) } } } }