package common

/*
 * This file is part of Autogits.
 *
 * Copyright © 2024 SUSE LLC
 *
 * Autogits is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free Software
 * Foundation, either version 2 of the License, or (at your option) any later
 * version.
 *
 * Autogits is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * Autogits. If not, see <https://www.gnu.org/licenses/>.
 */

import (
	"crypto/tls"
	"fmt"
	"log"
	"net/url"
	"slices"
	"strings"
	"time"

	rabbitmq "github.com/rabbitmq/amqp091-go"
)

// Request type identifiers. ProcessRabbitMQEvents matches the keys of
// ListenDefinitions.Handlers against the request-type segment of a message's
// routing key; these constants are presumably the values used for those keys.
//
// NOTE(review): RequestType_CreateBrachTag is misspelled ("Brach"), but the
// name is exported and is kept as-is for compatibility.
const (
	RequestType_CreateBrachTag   = "create"
	RequestType_DeleteBranchTag  = "delete"
	RequestType_Fork             = "fork"
	RequestType_Issue            = "issues"
	RequestType_IssueAssign      = "issue_assign"
	RequestType_IssueComment     = "issue_comment"
	RequestType_IssueLabel       = "issue_label"
	RequestType_IssueMilestone   = "issue_milestone"
	RequestType_Push             = "push"
	RequestType_Repository       = "repository"
	RequestType_Release          = "release"
	RequestType_PR               = "pull_request"
	RequestType_PRAssign         = "pull_request_assign"
	RequestType_PRLabel          = "pull_request_label"
	RequestType_PRComment        = "pull_request_comment"
	RequestType_PRMilestone      = "pull_request_milestone"
	RequestType_PRSync           = "pull_request_sync"
	RequestType_PRReviewAccepted = "pull_request_review_approved"
	RequestType_PRReviewRejected = "pull_request_review_rejected"
	RequestType_PRReviewRequest  = "pull_request_review_request"
	RequestType_PRReviewComment  = "pull_request_review_comment"
	RequestType_Wiki             = "wiki"
)

// RequestProcessor handles one event. The parsed request is available on the
// RequestHandler passed in (see ProcessRabbitMQEvents, which sets h.Request).
type RequestProcessor func(*RequestHandler) error

// ListenDefinitions describes what ProcessRabbitMQEvents listens to and which
// handler runs for each request type.
type ListenDefinitions struct {
	RabbitURL string // amqps://user:password@host/queue
	GitAuthor string // not read anywhere in this file
	Handlers  map[string]RequestProcessor
}

// RabbitMessage is a delivery received from the RabbitMQ consumer.
type RabbitMessage rabbitmq.Delivery

// processRabbitMQ connects to server over TLS, binds a queue to every entry of
// topics on the "pubsub" exchange and forwards each delivery to msgCh.
//
// The path of server (minus the leading '/') names the queue. With an empty
// name a server-named temporary queue is declared. Otherwise the named queue
// is looked up passively, then declared, and only if both fail a temporary
// queue is used instead.
//
// The function only returns on failure, so the returned error is never nil.
func processRabbitMQ(msgCh chan<- RabbitMessage, server url.URL, topics []string) error {
	// server is a copy, so clearing Path here does not affect the caller.
	queueName := strings.TrimPrefix(server.Path, "/")
	server.Path = ""

	connection, err := rabbitmq.DialTLS(server.String(), &tls.Config{
		ServerName: server.Hostname(),
	})
	if err != nil {
		return fmt.Errorf("Cannot connect to %s . Err: %w", server.Hostname(), err)
	}
	defer connection.Close()

	ch, err := connection.Channel()
	if err != nil {
		return fmt.Errorf("Cannot create a channel. Err: %w", err)
	}
	// Bound to the initial channel only; a channel re-opened below is
	// presumably torn down together with the connection.
	defer ch.Close()

	if err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil); err != nil {
		return fmt.Errorf("Cannot find pubsub exchange? Err: %w", err)
	}

	// reopenIfClosed replaces ch with a fresh channel when the current one
	// has been closed (e.g. after a failed declare). It returns an error only
	// if a new channel really could not be opened.
	reopenIfClosed := func() error {
		if !ch.IsClosed() {
			return nil
		}
		fresh, openErr := connection.Channel()
		if openErr != nil {
			return fmt.Errorf("Channel cannot be re-opened. Err: %w", openErr)
		}
		ch = fresh
		return nil
	}

	var q rabbitmq.Queue
	if len(queueName) == 0 {
		q, err = ch.QueueDeclare("", false, true, true, false, nil)
	} else {
		q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil)
		if err != nil {
			log.Printf("queue not found .. trying to create it: %v\n", err)
			if reopenErr := reopenIfClosed(); reopenErr != nil {
				return reopenErr
			}
			q, err = ch.QueueDeclare(queueName, true, false, true, false, nil)
			if err != nil {
				log.Printf("can't create persistent queue ... falling back to temporaty queue: %v\n", err)
				// Previously this branch returned unconditionally after
				// re-opening the channel, so the fallback below was never
				// reached when the channel had been closed.
				if reopenErr := reopenIfClosed(); reopenErr != nil {
					return reopenErr
				}
				q, err = ch.QueueDeclare("", false, true, true, false, nil)
			}
		}
	}
	if err != nil {
		return fmt.Errorf("Cannot declare queue. Err: %w", err)
	}
	// log.Printf("queue: %s:%d", q.Name, q.Consumers)

	log.Println(" -- listening to topics:")
	for _, topic := range topics {
		err = ch.QueueBind(q.Name, topic, "pubsub", false, nil)
		log.Println(" +", topic)
		if err != nil {
			return fmt.Errorf("Cannot find queue to exchange with topic %s. Err: %w", topic, err)
		}
	}

	msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
	if err != nil {
		return fmt.Errorf("Cannot start consumer. Err: %w", err)
	}
	// log.Printf("queue: %s:%d", q.Name, q.Consumers)

	// Forward deliveries until the delivery channel is closed.
	for msg := range msgs {
		msgCh <- RabbitMessage(msg)
	}
	return fmt.Errorf("channel/connection closed?\n")
}

// connectAndProcessRabbitMQ runs processRabbitMQ forever, reconnecting two
// seconds after every failure. If the worker panics, the panic is logged and
// a replacement goroutine is started after five seconds.
func connectAndProcessRabbitMQ(log *log.Logger, ch chan<- RabbitMessage, server url.URL, topics []string) {
	defer func() {
		if r := recover(); r != nil {
			log.Println(r)
			log.Println("'crash' RabbitMQ worker. Recovering... reconnecting...")
			time.Sleep(5 * time.Second)
			go connectAndProcessRabbitMQ(log, ch, server, topics)
		}
	}()

	for {
		if err := processRabbitMQ(ch, server, topics); err != nil {
			log.Printf("Error in RabbitMQ connection. %#v", err)
			log.Println("Reconnecting in 2 seconds...")
			time.Sleep(2 * time.Second)
		}
	}
}

// connectToRabbitMQ starts the background worker and returns the channel
// (buffered, capacity 100) on which received messages are delivered. Nothing
// in this file closes the returned channel.
func connectToRabbitMQ(log *log.Logger, server url.URL, topics []string) chan RabbitMessage {
	ch := make(chan RabbitMessage, 100)
	go connectAndProcessRabbitMQ(log, ch, server, topics)

	return ch
}

// ProcessEvent calls f(h) and logs, rather than propagates, a returned error
// or a panic raised by f.
func ProcessEvent(f RequestProcessor, h *RequestHandler) {
	defer func() {
		if r := recover(); r != nil {
			log.Println(r)
		}
	}()

	if err := f(h); err != nil {
		log.Println(err)
	}
}

// ProcessRabbitMQEvents subscribes to "<scope>.src.<org>.<type>.#" for every
// combination of orgs and keys of listenDefs.Handlers, then dispatches each
// received message to the handler registered for its request type.
//
// The scope is "opensuse" when the host is rabbit.opensuse.org and "suse"
// otherwise. Credentials come from the package-level rabbitUser and
// rabbitPassword (defined outside this file's view) and replace any userinfo
// in listenDefs.RabbitURL.
//
// The function panics if RabbitURL cannot be parsed. It returns nil only when
// the message channel is closed.
func ProcessRabbitMQEvents(listenDefs ListenDefinitions, orgs []string) error {
	server, err := url.Parse(listenDefs.RabbitURL)
	if err != nil {
		log.Panicf("cannot parse server URL. Err: %#v\n", err)
	}

	// Redacted keeps a password embedded in RabbitURL out of the log.
	log.Println("RabbitMQ connection:", server.Redacted())
	topics := make([]string, 0, len(listenDefs.Handlers)*len(orgs))
	log.Println(len(listenDefs.Handlers), len(orgs))

	server.User = url.UserPassword(rabbitUser, rabbitPassword)

	scope := "suse"
	if server.Hostname() == "rabbit.opensuse.org" {
		scope = "opensuse"
	}

	for _, org := range orgs {
		for k := range listenDefs.Handlers {
			topics = append(topics, fmt.Sprintf("%s.src.%s.%s.#", scope, org, k))
		}
	}

	// Sort and de-duplicate (duplicate orgs would yield duplicate topics).
	slices.Sort(topics)
	topics = slices.Compact(topics)

	ch := connectToRabbitMQ(log.Default(), *server, topics)

	for msg := range ch {
		log.Println("event:", msg.RoutingKey)

		// Routing key layout: <scope>.src.<org>.<type>...
		route := strings.Split(msg.RoutingKey, ".")
		if len(route) <= 3 {
			continue
		}
		org, reqType := route[2], route[3]

		if !slices.Contains(orgs, org) {
			log.Println("Got even for unhandeled org:", org)
			continue
		}

		log.Println("org:", org, "type:", reqType)
		handler, found := listenDefs.Handlers[reqType]
		if !found {
			continue
		}

		h, err := CreateRequestHandler()
		if err != nil {
			log.Println("Cannot create request handler", err)
			continue
		}

		req, err := ParseRequestJSON(reqType, msg.Body)
		if err != nil {
			log.Println("Error parsing request JSON:", err)
			continue
		}

		log.Println("processing req", req.Type)
		h.Request = req
		ProcessEvent(handler, h)
	}

	return nil
}