package main

import (
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/url"
	"time"

	_ "github.com/mattn/go-sqlite3"
	rabbitmq "github.com/rabbitmq/amqp091-go"
)

// failOnError logs err together with msg and panics when err is non-nil.
// It does nothing when err is nil. Note that the log line puts the error
// first and the descriptive message second.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", err, msg)
	}
}

// Message is a single payload waiting to be published, together with the
// routing key (Topic) it is published under.
type Message struct {
	Topic string
	Body  []byte
}

// messageQueue buffers messages between PublicMessage (producer) and
// ConnectToExchangeForPublish (consumer). It stays nil until
// ConnectToExchangeForPublish has run for the first time.
var messageQueue chan Message

// PublicMessage reads msgBody completely, verifies that it is valid JSON and
// enqueues it for publishing under the topic
// "opensuse.gitea.<giteaOrg>.<giteaWebhookType>".
//
// It returns an error when the queue has not been created yet, when reading
// msgBody fails, or when the payload is not valid JSON. The send on the queue
// blocks while the queue's buffer is full.
func PublicMessage(giteaWebhookType, giteaOrg string, msgBody io.Reader) error {
	if messageQueue == nil {
		return fmt.Errorf("Queue not initialized")
	}

	data, err := io.ReadAll(msgBody)
	if err != nil {
		return fmt.Errorf("Error reading JSON data. Err: %v", err)
	}

	if !json.Valid(data) {
		return fmt.Errorf("Invalid JSON in request")
	}

	messageQueue <- Message{
		Topic: fmt.Sprintf("opensuse.gitea.%s.%s", giteaOrg, giteaWebhookType),
		Body:  data,
	}
	return nil
}

// ConnectToExchangeForPublish connects to the AMQP broker at host over TLS,
// checks that the durable "pubsub" topic exchange exists and then publishes
// every message received from messageQueue until that channel is closed.
//
// username and password are only used when both are non-empty. Any panic
// raised while connecting (see failOnError) is recovered; after a 5 second
// pause a fresh goroutine retries with the same arguments. The same
// reconnect path is taken when publishing fails because the connection or
// the AMQP channel has been closed; the message that failed is put back on
// the queue if there is room for it.
func ConnectToExchangeForPublish(host, username, password string) {
	defer func() {
		if r := recover(); r != nil {
			log.Print("recovering... reconnecting...\n")
			time.Sleep(5 * time.Second)
			go ConnectToExchangeForPublish(host, username, password)
		}
	}()

	if messageQueue == nil {
		messageQueue = make(chan Message, 10240)
	}

	auth := ""
	if len(username) > 0 && len(password) > 0 {
		// Percent-encode the credentials so that reserved characters such as
		// '@', ':' or '/' cannot corrupt the URI. Plain alphanumeric
		// credentials are emitted unchanged.
		auth = url.UserPassword(username, password).String() + "@"
	}

	connection, err := rabbitmq.DialTLS("amqps://"+auth+host, &tls.Config{
		ServerName: host,
	})
	failOnError(err, "Cannot connect to rabbit.opensuse.org")
	defer connection.Close()

	ch, err := connection.Channel()
	failOnError(err, "Cannot create a channel")
	defer ch.Close()

	err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil)
	failOnError(err, "Cannot find pubsub exchange")

	for {
		msg, ok := <-messageQueue
		if !ok {
			log.Printf("Shutdown ... \n")
			return
		}

		err = ch.Publish("pubsub", msg.Topic, false, false, rabbitmq.Publishing{
			ContentType:     "application/json",
			AppId:           RabbitForwarderPath,
			Body:            []byte(msg.Body),
			Timestamp:       time.Now(),
			DeliveryMode:    2,
			ContentEncoding: "application/json",
		})
		if err == nil {
			continue
		}

		// A closed connection is not the only unrecoverable state: the AMQP
		// channel alone can be closed while the connection is still open, in
		// which case Publish reports ErrClosed. Without reconnecting, every
		// later message would fail on the dead channel and be dropped.
		if !connection.IsClosed() && !errors.Is(err, rabbitmq.ErrClosed) {
			log.Printf("Error sending request. %v", err)
			continue
		}

		// Try to keep the message for the next connection without blocking.
		select {
		case messageQueue <- msg:
			log.Printf("requed ...")
		default:
			log.Printf("queue full... message lost")
		}

		// reconnect
		log.Printf("reconnecting...")
		time.Sleep(5 * time.Second)
		go ConnectToExchangeForPublish(host, username, password)
		return
	}
}