package common

/*
 * This file is part of Autogits.
 *
 * Copyright © 2024 SUSE LLC
 *
 * Autogits is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free Software
 * Foundation, either version 2 of the License, or (at your option) any later
 * version.
 *
 * Autogits is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * Foobar. If not, see <https://www.gnu.org/licenses/>.
 */

import (
	"crypto/tls"
	"fmt"
	"net/url"
	"runtime/debug"
	"slices"
	"strings"
	"time"

	rabbitmq "github.com/rabbitmq/amqp091-go"
)

// Request type identifiers. ProcessRabbitMQEvents compares the fourth
// dot-separated segment of a message routing key against the keys of
// ListenDefinitions.Handlers, and generateTopics embeds those keys into the
// topic patterns it builds.
//
// NOTE: the spelling of RequestType_CreateBrachTag is kept as is because the
// name is exported.
const RequestType_CreateBrachTag = "create"
const RequestType_DeleteBranchTag = "delete"
const RequestType_Fork = "fork"
const RequestType_Issue = "issues"
const RequestType_IssueAssign = "issue_assign"
const RequestType_IssueComment = "issue_comment"
const RequestType_IssueLabel = "issue_label"
const RequestType_IssueMilestone = "issue_milestone"
const RequestType_Push = "push"
const RequestType_Repository = "repository"
const RequestType_Release = "release"
const RequestType_PR = "pull_request"
const RequestType_PRAssign = "pull_request_assign"
const RequestType_PRLabel = "pull_request_label"
const RequestType_PRComment = "pull_request_comment"
const RequestType_PRMilestone = "pull_request_milestone"
const RequestType_PRSync = "pull_request_sync"
const RequestType_PRReviewAccepted = "pull_request_review_approved"
const RequestType_PRReviewRejected = "pull_request_review_rejected"
const RequestType_PRReviewRequest = "pull_request_review_request"
const RequestType_PRReviewComment = "pull_request_review_comment"
const RequestType_Wiki = "wiki"

// RequestProcessor handles a single parsed request. A non-nil error returned
// from ProcessFunc is logged by ProcessEvent.
type RequestProcessor interface {
	ProcessFunc(*Request) error
}

// ListenDefinitions describes what to listen for on RabbitMQ and which
// handler processes each request type.
type ListenDefinitions struct {
	// RabbitURL has the form amqps://user:password@host/queue. The path
	// component is used as the queue name, not as part of the dial URL.
	RabbitURL *url.URL

	// GitAuthor is not referenced by the code in this file.
	GitAuthor string

	// Handlers maps a request type (routing key segment) to its processor.
	Handlers map[string]RequestProcessor

	// Orgs lists the organizations whose events are subscribed to and accepted.
	Orgs []string

	// topics is the sorted set of currently requested topic patterns.
	topics []string

	// topicSubChanges carries subscription changes to processTopicChanges:
	// +topic = subscribe, -topic = unsubscribe
	topicSubChanges chan string
}

// RabbitMessage is a delivery received from RabbitMQ.
type RabbitMessage rabbitmq.Delivery

// processTopicChanges applies subscription changes read from
// l.topicSubChanges to queueName on the "pubsub" exchange until that channel
// is closed. A leading '+' binds the topic, a leading '-' unbinds it; bind
// and unbind failures are logged and otherwise ignored.
func (l *ListenDefinitions) processTopicChanges(ch *rabbitmq.Channel, queueName string) {
	for {
		topic, ok := <-l.topicSubChanges
		if !ok {
			return
		}

		LogDebug(" topic change:", topic)
		if len(topic) == 0 {
			// Indexing topic[0] below would panic on an empty string.
			continue
		}

		switch topic[0] {
		case '+':
			if err := ch.QueueBind(queueName, topic[1:], "pubsub", false, nil); err != nil {
				LogError(err)
			}
		case '-':
			if err := ch.QueueUnbind(queueName, topic[1:], "pubsub", nil); err != nil {
				LogError(err)
			}
		default:
			LogInfo("Ignoring unknown topic change:", topic)
		}
	}
}

// processRabbitMQ connects to the broker, declares the queue, subscribes to
// all topics in l.topics and forwards every delivery to msgCh.
//
// The queue name is taken from the path of l.RabbitURL. When it is empty an
// anonymous queue is declared. Otherwise the named queue is looked up, then
// created if missing, and as a last resort an anonymous queue is used.
//
// The function only returns on failure, so the returned error is never nil.
func (l *ListenDefinitions) processRabbitMQ(msgCh chan<- RabbitMessage) error {
	queueName := strings.TrimPrefix(l.RabbitURL.Path, "/")

	// Dial using a copy so that the queue name stored in l.RabbitURL.Path
	// is still available when this function is called again on reconnect.
	dialURL := *l.RabbitURL
	dialURL.Path = ""

	connection, err := rabbitmq.DialTLS(dialURL.String(), &tls.Config{
		ServerName: dialURL.Hostname(),
	})
	if err != nil {
		return fmt.Errorf("Cannot connect to %s . Err: %w", dialURL.Hostname(), err)
	}
	defer connection.Close()

	ch, err := connection.Channel()
	if err != nil {
		return fmt.Errorf("Cannot create a channel. Err: %w", err)
	}
	// Close whichever channel is current at return time; ch may be replaced
	// below when a failed declare closes it.
	defer func() { ch.Close() }()

	// reopenIfClosed replaces ch with a fresh channel when the previous
	// operation closed it. ch is left untouched when re-opening fails.
	reopenIfClosed := func() error {
		if !ch.IsClosed() {
			return nil
		}
		newCh, err := connection.Channel()
		if err != nil {
			return fmt.Errorf("Channel cannot be re-opened. Err: %w", err)
		}
		ch = newCh
		return nil
	}

	if err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil); err != nil {
		return fmt.Errorf("Cannot find pubsub exchange? Err: %w", err)
	}

	var q rabbitmq.Queue
	if len(queueName) == 0 {
		q, err = ch.QueueDeclare("", false, true, true, false, nil)
	} else {
		q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil)
		if err != nil {
			LogInfo("queue not found .. trying to create it:", err)
			if err = reopenIfClosed(); err != nil {
				return err
			}
			q, err = ch.QueueDeclare(queueName, true, false, true, false, nil)

			if err != nil {
				LogInfo("can't create persistent queue ... falling back to temporaty queue:", err)
				if err = reopenIfClosed(); err != nil {
					return err
				}
				q, err = ch.QueueDeclare("", false, true, true, false, nil)
			}
		}
	}
	if err != nil {
		return fmt.Errorf("Cannot declare queue. Err: %w", err)
	}

	LogDebug(" -- listening to topics:")
	l.topicSubChanges = make(chan string)
	// The argument is evaluated now, so the channel created above is the
	// one that gets closed, which in turn stops processTopicChanges.
	defer close(l.topicSubChanges)
	go l.processTopicChanges(ch, q.Name)

	for _, topic := range l.topics {
		l.topicSubChanges <- "+" + topic
	}

	msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
	if err != nil {
		return fmt.Errorf("Cannot start consumer. Err: %w", err)
	}

	for msg := range msgs {
		msgCh <- RabbitMessage(msg)
	}
	return fmt.Errorf("channel/connection closed?\n")
}

// connectAndProcessRabbitMQ runs processRabbitMQ in an endless loop, waiting
// two seconds after each failure. If the worker panics, the panic is logged
// and a replacement goroutine is started after five seconds.
func (l *ListenDefinitions) connectAndProcessRabbitMQ(ch chan<- RabbitMessage) {
	defer func() {
		if r := recover(); r != nil {
			LogError(r)
			LogError("'crash' RabbitMQ worker. Recovering... reconnecting...")
			time.Sleep(5 * time.Second)
			go l.connectAndProcessRabbitMQ(ch)
		}
	}()

	for {
		err := l.processRabbitMQ(ch)
		if err != nil {
			// NOTE(review): other LogError calls in this file pass plain
			// values, so the format verb is presumably printed literally;
			// confirm against LogError.
			LogError("Error in RabbitMQ connection. %#v", err)
			LogInfo("Reconnecting in 2 seconds...")
			time.Sleep(2 * time.Second)
		}
	}
}

// connectToRabbitMQ starts the background RabbitMQ worker and returns the
// channel (buffered for 100 messages) on which deliveries arrive.
func (l *ListenDefinitions) connectToRabbitMQ() chan RabbitMessage {
	ch := make(chan RabbitMessage, 100)
	go l.connectAndProcessRabbitMQ(ch)

	return ch
}

// ProcessEvent runs f.ProcessFunc on request and logs a returned error. A
// panic inside the handler is recovered and logged together with the panic
// value and a stack trace, so a single failing handler does not stop the
// caller.
func ProcessEvent(f RequestProcessor, request *Request) {
	defer func() {
		if r := recover(); r != nil {
			LogError("panic caught")
			// Log the value whatever its type; not every panic carries
			// an error.
			LogError(r)
			LogError(string(debug.Stack()))
		}
	}()

	if err := f.ProcessFunc(request); err != nil {
		LogError(err)
	}
}

// generateTopics returns the sorted, de-duplicated topic patterns for every
// combination of organization and handled request type, in the form
// "<scope>.src.<org>.<type>.#". The scope is "opensuse" when the broker host
// is rabbit.opensuse.org and "suse" otherwise.
func (l *ListenDefinitions) generateTopics() []string {
	topics := make([]string, 0, len(l.Handlers)*len(l.Orgs))
	scope := "suse"
	if l.RabbitURL.Hostname() == "rabbit.opensuse.org" {
		scope = "opensuse"
	}

	for _, org := range l.Orgs {
		for requestType := range l.Handlers {
			topics = append(topics, fmt.Sprintf("%s.src.%s.%s.#", scope, org, requestType))
		}
	}

	slices.Sort(topics)
	return slices.Compact(topics)
}

// UpdateTopics recomputes the topic list from the current Orgs and Handlers
// and sends the difference to l.topicSubChanges: "+topic" for every newly
// required topic and "-topic" for every topic that is no longer required.
// Both lists are sorted, so the difference is computed in a single merge
// pass.
//
// l.topicSubChanges is created and closed by processRabbitMQ. Sending on a
// nil channel blocks and sending on a closed one panics, so this must only be
// called while a connection is being served.
func (l *ListenDefinitions) UpdateTopics() {
	newTopics := l.generateTopics()

	j := 0
nextNewTopic:
	for i := 0; i < len(newTopics); i++ {
		topic := newTopics[i]

		for j < len(l.topics) {
			cmp := strings.Compare(topic, l.topics[j])

			if cmp == 0 {
				// Already subscribed; nothing to do.
				j++
				continue nextNewTopic
			}

			if cmp < 0 {
				// topic sorts before every remaining old topic: it is new.
				l.topicSubChanges <- "+" + topic
				break
			}

			// The old topic sorts before topic and was not matched: drop it.
			l.topicSubChanges <- "-" + l.topics[j]
			j++
		}

		if j == len(l.topics) {
			// Old topics are exhausted; everything left is new.
			l.topicSubChanges <- "+" + topic
		}
	}

	// Any old topics left over have no counterpart in the new list.
	for j < len(l.topics) {
		l.topicSubChanges <- "-" + l.topics[j]
		j++
	}

	l.topics = newTopics
}

// ProcessRabbitMQEvents connects to RabbitMQ and dispatches incoming events
// to the registered handlers. The routing key is split on '.'; the third
// segment is the organization and the fourth the request type. Events for
// organizations outside l.Orgs or for types without a handler are skipped.
//
// It returns nil only when the message channel is closed; nothing in this
// file closes it, so in practice the call blocks indefinitely.
func (l *ListenDefinitions) ProcessRabbitMQEvents() error {
	// Redacted hides any password embedded in the URL from the log.
	LogInfo("RabbitMQ connection:", l.RabbitURL.Redacted())
	LogDebug("# Handlers:", len(l.Handlers))
	LogDebug("# Orgs:", len(l.Orgs))

	l.RabbitURL.User = url.UserPassword(rabbitUser, rabbitPassword)
	l.topics = l.generateTopics()
	ch := l.connectToRabbitMQ()

	for {
		msg, ok := <-ch
		if !ok {
			return nil
		}

		LogDebug("event:", msg.RoutingKey)

		route := strings.Split(msg.RoutingKey, ".")
		if len(route) <= 3 {
			continue
		}
		reqType := route[3]
		org := route[2]

		if !slices.Contains(l.Orgs, org) {
			LogInfo("Got event for unhandeled org:", org)
			continue
		}

		LogDebug("org:", org, "type:", reqType)
		handler, found := l.Handlers[reqType]
		if !found {
			continue
		}

		req, err := ParseRequestJSON(reqType, msg.Body)
		if err != nil {
			LogError("Error parsing request JSON:", err)
			continue
		}

		LogDebug("processing req", req.Type)
		ProcessEvent(handler, req)
	}
}