package common

import (
	"crypto/tls"
	"fmt"
	"log"
	"net/url"
	"strings"
	"time"

	rabbitmq "github.com/rabbitmq/amqp091-go"
)

// Request type names. ProcessRabbitMQEvents reads the request type from the
// fourth dot-separated segment of a message's routing key and uses it to pick
// a handler from ListenDefinitions.Handlers.
// NOTE(review): presumably these are the values callers use as Handlers keys
// -- confirm against the callers.
const (
	RequestType_CreateBrachTag   = "create"
	RequestType_DeleteBranchTag  = "delete"
	RequestType_Fork             = "fork"
	RequestType_Issue            = "issues"
	RequestType_IssueAssign      = "issue_assign"
	RequestType_IssueComment     = "issue_comment"
	RequestType_IssueLabel       = "issue_label"
	RequestType_IssueMilestone   = "issue_milestone"
	RequestType_Push             = "push"
	RequestType_Repository       = "repository"
	RequestType_Release          = "release"
	RequestType_PR               = "pull_request"
	RequestType_PRAssign         = "pull_request_assign"
	RequestType_PRLabel          = "pull_request_label"
	RequestType_PRComment        = "pull_request_comment"
	RequestType_PRMilestone      = "pull_request_milestone"
	RequestType_PRSync           = "pull_request_sync"
	RequestType_PRReviewAccepted = "pull_request_review_approved"
	RequestType_PRReviewRejected = "pull_request_review_rejected"
	RequestType_PRReviewRequest  = "pull_request_review_request"
	RequestType_Wiki             = "wiki"
)

// RequestProcessor handles a single request whose parsed payload is stored in
// the handler's Request field.
type RequestProcessor func(*RequestHandler) error

// ListenDefinitions configures ProcessRabbitMQEvents.
type ListenDefinitions struct {
	// RabbitURL is the broker URL; its path, if any, names the queue to use.
	RabbitURL string // amqps://user:password@host/queue
	// GitAuthor is passed to CreateRequestHandler for every message.
	GitAuthor string
	// Handlers maps a request type (see the RequestType_* constants) to the
	// processor invoked for messages of that type.
	Handlers map[string]RequestProcessor
}

// RabbitMessage is a delivery received from the broker.
type RabbitMessage rabbitmq.Delivery

// processRabbitMQ connects to server over TLS, binds a queue to the "pubsub"
// exchange for every entry in topics and forwards each delivery to msgCh.
//
// The queue name is taken from the URL path. With an empty path a
// server-named temporary queue is declared. Otherwise the named queue is
// looked up, then created, and as a last resort replaced by a temporary
// queue.
//
// The function only returns on failure (including the delivery channel being
// closed), always with a non-nil error.
func processRabbitMQ(msgCh chan<- RabbitMessage, server url.URL, topics []string) error {
	// server is a copy, so clearing Path only affects the URL dialed below.
	// TrimPrefix also copes with an empty path, which indexing [0] does not.
	queueName := strings.TrimPrefix(server.Path, "/")
	server.Path = ""

	connection, err := rabbitmq.DialTLS(server.String(), &tls.Config{
		ServerName: server.Hostname(),
	})
	if err != nil {
		return fmt.Errorf("Cannot connect to %s . Err: %w", server.Hostname(), err)
	}
	defer connection.Close()

	ch, err := connection.Channel()
	if err != nil {
		return fmt.Errorf("Cannot create a channel. Err: %w", err)
	}
	defer ch.Close()

	if err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil); err != nil {
		return fmt.Errorf("Cannot find pubsub exchange? Err: %w", err)
	}

	var q rabbitmq.Queue
	if len(queueName) == 0 {
		q, err = ch.QueueDeclare("", false, true, true, false, nil)
	} else {
		q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil)
		if err != nil {
			log.Printf("queue not found .. trying to create it: %v\n", err)
			// A failed declare may have closed the channel; reopen it
			// before retrying.
			if ch.IsClosed() {
				ch, err = connection.Channel()
				if err != nil {
					return fmt.Errorf("Channel cannot be re-opened. Err: %w", err)
				}
			}
			q, err = ch.QueueDeclare(queueName, true, false, true, false, nil)
			if err != nil {
				log.Printf("can't create persistent queue ... falling back to temporaty queue: %v\n", err)
				if ch.IsClosed() {
					ch, err = connection.Channel()
					// Only give up when re-opening actually failed;
					// otherwise continue with the temporary queue.
					if err != nil {
						return fmt.Errorf("Channel cannot be re-opened. Err: %w", err)
					}
				}
				q, err = ch.QueueDeclare("", false, true, true, false, nil)
			}
		}
	}
	if err != nil {
		return fmt.Errorf("Cannot declare queue. Err: %w", err)
	}
	// log.Printf("queue: %s:%d", q.Name, q.Consumers)

	for _, topic := range topics {
		err = ch.QueueBind(q.Name, topic, "pubsub", false, nil)
		if err != nil {
			return fmt.Errorf("Cannot find queue to exchange with topic %s. Err: %w", topic, err)
		}
	}

	msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
	if err != nil {
		return fmt.Errorf("Cannot start consumer. Err: %w", err)
	}
	// log.Printf("queue: %s:%d", q.Name, q.Consumers)

	for {
		msg, ok := <-msgs
		if !ok {
			return fmt.Errorf("channel/connection closed?\n")
		}
		msgCh <- RabbitMessage(msg)
	}
}

// connectAndProcessRabbitMQ runs processRabbitMQ in an endless loop, waiting
// two seconds before reconnecting after every error. If the worker panics,
// the panic is recovered and, after five seconds, a replacement goroutine is
// started with the same arguments.
func connectAndProcessRabbitMQ(log *log.Logger, ch chan<- RabbitMessage, server url.URL, topics []string) {
	defer func() {
		if r := recover(); r != nil {
			log.Println("'crash' RabbitMQ worker. Recovering... reconnecting...")
			time.Sleep(5 * time.Second)
			go connectAndProcessRabbitMQ(log, ch, server, topics)
		}
	}()

	for {
		err := processRabbitMQ(ch, server, topics)
		if err != nil {
			log.Printf("Error in RabbitMQ connection. %#v", err)
			log.Println("Reconnecting in 2 seconds...")
			time.Sleep(2 * time.Second)
		}
	}
}

// connectToRabbitMQ starts the background worker and returns the channel
// (buffered, capacity 100) on which it publishes received messages.
func connectToRabbitMQ(log *log.Logger, server url.URL, topics []string) chan RabbitMessage {
	ch := make(chan RabbitMessage, 100)
	go connectAndProcessRabbitMQ(log, ch, server, topics)

	return ch
}

// ProcessEvent invokes f with h. A panic raised by f is recovered so that one
// faulty handler cannot take down the event loop; both the panic and a
// non-nil error returned by f are logged instead of being silently dropped.
func ProcessEvent(f RequestProcessor, h *RequestHandler) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("panic while processing event: %v\n", r)
		}
	}()

	if err := f(h); err != nil {
		log.Printf("error processing event: %v\n", err)
	}
}

// ProcessRabbitMQEvents subscribes to one topic per (org, handler key) pair
// and dispatches every received message to the handler registered for the
// message's request type.
//
// Routing keys are split on "."; the third segment is the organization and
// the fourth is the request type. Messages with fewer than four segments, or
// without a registered handler for their request type, are ignored.
//
// It returns an error when listenDefs.RabbitURL cannot be parsed, and nil
// when the message channel is closed.
func ProcessRabbitMQEvents(listenDefs ListenDefinitions, orgs []string) error {
	server, err := url.Parse(listenDefs.RabbitURL)
	if err != nil {
		return fmt.Errorf("cannot parse server URL. Err: %w", err)
	}

	topics := make([]string, 0, len(listenDefs.Handlers)*len(orgs))
	for _, org := range orgs {
		for k := range listenDefs.Handlers {
			topics = append(topics, fmt.Sprintf("*suse.gitea.%s.%s#", org, k))
		}
	}

	ch := connectToRabbitMQ(log.Default(), *server, topics)

	for msg := range ch {
		route := strings.Split(msg.RoutingKey, ".")
		if len(route) <= 3 {
			continue
		}

		// route[2] is the organization, route[3] the request type. Handlers
		// are keyed by request type, matching how the topics above are built.
		reqType := route[3]
		handler, found := listenDefs.Handlers[reqType]
		if !found {
			continue
		}

		// NOTE(review): GitAuthor is passed for both arguments -- confirm
		// this is what CreateRequestHandler expects.
		h, err := CreateRequestHandler(listenDefs.GitAuthor, listenDefs.GitAuthor)
		if err != nil {
			log.Printf("Cannot create request handler: %v\n", err)
			continue
		}

		req, err := ParseRequestJSON(reqType, msg.Body)
		if err != nil {
			log.Printf("Error parsing request JSON: %v\n", err)
			continue
		}

		h.Request = req
		ProcessEvent(handler, h)
	}

	return nil
}