package common /* * This file is part of Autogits. * * Copyright © 2024 SUSE LLC * * Autogits is free software: you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free Software * Foundation, either version 2 of the License, or (at your option) any later * version. * * Autogits is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A * PARTICULAR PURPOSE. See the GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along with * Foobar. If not, see . */ import ( "crypto/tls" "fmt" "net/url" "strings" "time" rabbitmq "github.com/rabbitmq/amqp091-go" ) type RabbitConnection struct { RabbitURL *url.URL // amqps://user:password@host/queue queueName string ch *rabbitmq.Channel topics []string topicSubChanges chan string // +topic = subscribe, -topic = unsubscribe } type RabbitProcessor interface { GenerateTopics() []string Connection() *RabbitConnection ProcessRabbitMessage(msg RabbitMessage) error } type RabbitMessage rabbitmq.Delivery func (l *RabbitConnection) ProcessTopicChanges() { for { topic, ok := <-l.topicSubChanges if !ok { return } LogDebug(" topic change:", topic) switch topic[0] { case '+': if err := l.ch.QueueBind(l.queueName, topic[1:], "pubsub", false, nil); err != nil { LogError(err) } case '-': if err := l.ch.QueueUnbind(l.queueName, topic[1:], "pubsub", nil); err != nil { LogError(err) } default: LogInfo("Ignoring unknown topic change:", topic) } } } func (l *RabbitConnection) ProcessRabbitMQ(msgCh chan<- RabbitMessage) error { queueName := l.RabbitURL.Path l.RabbitURL.Path = "" if len(queueName) > 0 && queueName[0] == '/' { queueName = queueName[1:] } connection, err := rabbitmq.DialTLS(l.RabbitURL.String(), &tls.Config{ ServerName: l.RabbitURL.Hostname(), }) if err != nil { return fmt.Errorf("Cannot connect to %s . Err: %w", l.RabbitURL.Hostname(), err) } defer connection.Close() l.ch, err = connection.Channel() if err != nil { return fmt.Errorf("Cannot create a channel. Err: %w", err) } defer l.ch.Close() if err = l.ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil); err != nil { return fmt.Errorf("Cannot find pubsub exchange? Err: %w", err) } var q rabbitmq.Queue if len(queueName) == 0 { q, err = l.ch.QueueDeclare("", false, true, true, false, nil) } else { q, err = l.ch.QueueDeclarePassive(queueName, true, false, true, false, nil) if err != nil { LogInfo("queue not found .. trying to create it:", err) if l.ch.IsClosed() { l.ch, err = connection.Channel() if err != nil { return fmt.Errorf("Channel cannot be re-opened. Err: %w", err) } } q, err = l.ch.QueueDeclare(queueName, true, false, true, false, nil) if err != nil { LogInfo("can't create persistent queue ... falling back to temporaty queue:", err) if l.ch.IsClosed() { l.ch, err = connection.Channel() return fmt.Errorf("Channel cannot be re-opened. Err: %w", err) } q, err = l.ch.QueueDeclare("", false, true, true, false, nil) } } } if err != nil { return fmt.Errorf("Cannot declare queue. Err: %w", err) } // log.Printf("queue: %s:%d", q.Name, q.Consumers) LogDebug(" -- listening to topics:") l.topicSubChanges = make(chan string) defer close(l.topicSubChanges) go l.ProcessTopicChanges() for _, topic := range l.topics { l.topicSubChanges <- "+" + topic } msgs, err := l.ch.Consume(q.Name, "", true, true, false, false, nil) if err != nil { return fmt.Errorf("Cannot start consumer. Err: %w", err) } // log.Printf("queue: %s:%d", q.Name, q.Consumers) for { msg, ok := <-msgs if !ok { return fmt.Errorf("channel/connection closed?\n") } msgCh <- RabbitMessage(msg) } } func (l *RabbitConnection) ConnectAndProcessRabbitMQ(ch chan<- RabbitMessage) { defer func() { if r := recover(); r != nil { LogError(r) LogError("'crash' RabbitMQ worker. Recovering... reconnecting...") time.Sleep(5 * time.Second) go l.ConnectAndProcessRabbitMQ(ch) } }() for { err := l.ProcessRabbitMQ(ch) if err != nil { LogError("Error in RabbitMQ connection. %#v", err) LogInfo("Reconnecting in 2 seconds...") time.Sleep(2 * time.Second) } } } func (l *RabbitConnection) ConnectToRabbitMQ(processor RabbitProcessor) <-chan RabbitMessage { LogInfo("RabbitMQ connection:", l.RabbitURL.String()) l.RabbitURL.User = url.UserPassword(rabbitUser, rabbitPassword) l.topics = processor.GenerateTopics() ch := make(chan RabbitMessage, 100) go l.ConnectAndProcessRabbitMQ(ch) return ch } func (l *RabbitConnection) UpdateTopics(processor RabbitProcessor) { newTopics := processor.GenerateTopics() j := 0 next_new_topic: for i := 0; i < len(newTopics); i++ { topic := newTopics[i] for j < len(l.topics) { cmp := strings.Compare(topic, l.topics[j]) if cmp == 0 { j++ continue next_new_topic } if cmp < 0 { l.topicSubChanges <- "+" + topic break } l.topicSubChanges <- "-" + l.topics[j] j++ } if j == len(l.topics) { l.topicSubChanges <- "+" + topic } } for j < len(l.topics) { l.topicSubChanges <- "-" + l.topics[j] j++ } l.topics = newTopics } func ProcessRabbitMQEvents(processor RabbitProcessor) error { ch := processor.Connection().ConnectToRabbitMQ(processor) for { msg, ok := <-ch if !ok { return nil } LogDebug("event:", msg.RoutingKey) if err := processor.ProcessRabbitMessage(msg); err != nil { LogError("Error processing", msg.RoutingKey, err) } } }