common: rabbit refactor

Generalize interface to allow processing of any events, not just
Gitea events.
This commit is contained in:
2025-07-21 22:30:04 +02:00
parent 69dac4ec31
commit 3d24dce5c0
11 changed files with 260 additions and 151 deletions

View File

@@ -59,6 +59,7 @@ type AutogitConfig struct {
Reviewers []string // only used by `pr` workflow Reviewers []string // only used by `pr` workflow
ReviewGroups []ReviewGroup ReviewGroups []ReviewGroup
Committers []string // group in addition to Reviewers and Maintainers that can order the bot around, mostly as helper for factory-maintainers Committers []string // group in addition to Reviewers and Maintainers that can order the bot around, mostly as helper for factory-maintainers
Subdirs []string // list of directories to sort submodules into. Needed b/c _manifest cannot list non-existent directories
ManualMergeOnly bool // only merge with "Merge OK" comment by Project Maintainers and/or Package Maintainers and/or reviewers ManualMergeOnly bool // only merge with "Merge OK" comment by Project Maintainers and/or Package Maintainers and/or reviewers
ManualMergeProject bool // require merge of ProjectGit PRs with "Merge OK" by ProjectMaintainers and/or reviewers ManualMergeProject bool // require merge of ProjectGit PRs with "Merge OK" by ProjectMaintainers and/or reviewers

View File

@@ -562,7 +562,21 @@ func (c *ObsClient) DeleteProject(project string) error {
} }
return nil return nil
}
func (c *ObsClient) BuildLog(prj, pkg, repo, arch string) (io.ReadCloser, error) {
url := c.baseUrl.JoinPath("build", prj, repo, arch, pkg, "_log")
query := url.Query()
query.Add("nostream", "1")
query.Add("start", "0")
url.RawQuery = query.Encode()
res, err := c.ObsRequestRaw("GET", url.String(), nil)
if err != nil {
return nil, err
}
return res.Body, nil
} }
type PackageBuildStatus struct { type PackageBuildStatus struct {

View File

@@ -22,55 +22,32 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net/url" "net/url"
"runtime/debug"
"slices"
"strings" "strings"
"time" "time"
rabbitmq "github.com/rabbitmq/amqp091-go" rabbitmq "github.com/rabbitmq/amqp091-go"
) )
const RequestType_CreateBrachTag = "create" type RabbitConnection struct {
const RequestType_DeleteBranchTag = "delete"
const RequestType_Fork = "fork"
const RequestType_Issue = "issues"
const RequestType_IssueAssign = "issue_assign"
const RequestType_IssueComment = "issue_comment"
const RequestType_IssueLabel = "issue_label"
const RequestType_IssueMilestone = "issue_milestone"
const RequestType_Push = "push"
const RequestType_Repository = "repository"
const RequestType_Release = "release"
const RequestType_PR = "pull_request"
const RequestType_PRAssign = "pull_request_assign"
const RequestType_PRLabel = "pull_request_label"
const RequestType_PRComment = "pull_request_comment"
const RequestType_PRMilestone = "pull_request_milestone"
const RequestType_PRSync = "pull_request_sync"
const RequestType_PRReviewAccepted = "pull_request_review_approved"
const RequestType_PRReviewRejected = "pull_request_review_rejected"
const RequestType_PRReviewRequest = "pull_request_review_request"
const RequestType_PRReviewComment = "pull_request_review_comment"
const RequestType_Wiki = "wiki"
type RequestProcessor interface {
ProcessFunc(*Request) error
}
type ListenDefinitions struct {
RabbitURL *url.URL // amqps://user:password@host/queue RabbitURL *url.URL // amqps://user:password@host/queue
GitAuthor string queueName string
Handlers map[string]RequestProcessor ch *rabbitmq.Channel
Orgs []string
topics []string topics []string
topicSubChanges chan string // +topic = subscribe, -topic = unsubscribe topicSubChanges chan string // +topic = subscribe, -topic = unsubscribe
} }
type RabbitProcessor interface {
GenerateTopics() []string
Connection() *RabbitConnection
ProcessRabbitMessage(msg RabbitMessage) error
}
type RabbitMessage rabbitmq.Delivery type RabbitMessage rabbitmq.Delivery
func (l *ListenDefinitions) processTopicChanges(ch *rabbitmq.Channel, queueName string) { func (l *RabbitConnection) ProcessTopicChanges() {
for { for {
topic, ok := <-l.topicSubChanges topic, ok := <-l.topicSubChanges
if !ok { if !ok {
@@ -80,11 +57,11 @@ func (l *ListenDefinitions) processTopicChanges(ch *rabbitmq.Channel, queueName
LogDebug(" topic change:", topic) LogDebug(" topic change:", topic)
switch topic[0] { switch topic[0] {
case '+': case '+':
if err := ch.QueueBind(queueName, topic[1:], "pubsub", false, nil); err != nil { if err := l.ch.QueueBind(l.queueName, topic[1:], "pubsub", false, nil); err != nil {
LogError(err) LogError(err)
} }
case '-': case '-':
if err := ch.QueueUnbind(queueName, topic[1:], "pubsub", nil); err != nil { if err := l.ch.QueueUnbind(l.queueName, topic[1:], "pubsub", nil); err != nil {
LogError(err) LogError(err)
} }
default: default:
@@ -93,7 +70,7 @@ func (l *ListenDefinitions) processTopicChanges(ch *rabbitmq.Channel, queueName
} }
} }
func (l *ListenDefinitions) processRabbitMQ(msgCh chan<- RabbitMessage) error { func (l *RabbitConnection) ProcessRabbitMQ(msgCh chan<- RabbitMessage) error {
queueName := l.RabbitURL.Path queueName := l.RabbitURL.Path
l.RabbitURL.Path = "" l.RabbitURL.Path = ""
@@ -152,7 +129,7 @@ func (l *ListenDefinitions) processRabbitMQ(msgCh chan<- RabbitMessage) error {
LogDebug(" -- listening to topics:") LogDebug(" -- listening to topics:")
l.topicSubChanges = make(chan string) l.topicSubChanges = make(chan string)
defer close(l.topicSubChanges) defer close(l.topicSubChanges)
go l.processTopicChanges(ch, q.Name) go l.ProcessTopicChanges()
for _, topic := range l.topics { for _, topic := range l.topics {
l.topicSubChanges <- "+" + topic l.topicSubChanges <- "+" + topic
@@ -174,18 +151,18 @@ func (l *ListenDefinitions) processRabbitMQ(msgCh chan<- RabbitMessage) error {
} }
} }
func (l *ListenDefinitions) connectAndProcessRabbitMQ(ch chan<- RabbitMessage) { func (l *RabbitConnection) ConnectAndProcessRabbitMQ(ch chan<- RabbitMessage) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
LogError(r) LogError(r)
LogError("'crash' RabbitMQ worker. Recovering... reconnecting...") LogError("'crash' RabbitMQ worker. Recovering... reconnecting...")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
go l.connectAndProcessRabbitMQ(ch) go l.ConnectAndProcessRabbitMQ(ch)
} }
}() }()
for { for {
err := l.processRabbitMQ(ch) err := l.ProcessRabbitMQ(ch)
if err != nil { if err != nil {
LogError("Error in RabbitMQ connection. %#v", err) LogError("Error in RabbitMQ connection. %#v", err)
LogInfo("Reconnecting in 2 seconds...") LogInfo("Reconnecting in 2 seconds...")
@@ -194,49 +171,20 @@ func (l *ListenDefinitions) connectAndProcessRabbitMQ(ch chan<- RabbitMessage) {
} }
} }
func (l *ListenDefinitions) connectToRabbitMQ() chan RabbitMessage { func (l *RabbitConnection) ConnectToRabbitMQ(processor RabbitProcessor) <-chan RabbitMessage {
LogInfo("RabbitMQ connection:", l.RabbitURL.String())
l.RabbitURL.User = url.UserPassword(rabbitUser, rabbitPassword)
l.topics = processor.GenerateTopics()
ch := make(chan RabbitMessage, 100) ch := make(chan RabbitMessage, 100)
go l.connectAndProcessRabbitMQ(ch) go l.ConnectAndProcessRabbitMQ(ch)
return ch return ch
} }
func ProcessEvent(f RequestProcessor, request *Request) { func (l *RabbitConnection) UpdateTopics(processor RabbitProcessor) {
defer func() { newTopics := processor.GenerateTopics()
if r := recover(); r != nil {
LogError("panic caught")
if err, ok := r.(error); !ok {
LogError(err)
}
LogError(string(debug.Stack()))
}
}()
if err := f.ProcessFunc(request); err != nil {
LogError(err)
}
}
func (l *ListenDefinitions) generateTopics() []string {
topics := make([]string, 0, len(l.Handlers)*len(l.Orgs))
scope := "suse"
if l.RabbitURL.Hostname() == "rabbit.opensuse.org" {
scope = "opensuse"
}
for _, org := range l.Orgs {
for requestType, _ := range l.Handlers {
topics = append(topics, fmt.Sprintf("%s.src.%s.%s.#", scope, org, requestType))
}
}
slices.Sort(topics)
return slices.Compact(topics)
}
func (l *ListenDefinitions) UpdateTopics() {
newTopics := l.generateTopics()
j := 0 j := 0
next_new_topic: next_new_topic:
@@ -273,14 +221,8 @@ next_new_topic:
l.topics = newTopics l.topics = newTopics
} }
func (l *ListenDefinitions) ProcessRabbitMQEvents() error { func ProcessRabbitMQEvents(processor RabbitProcessor) error {
LogInfo("RabbitMQ connection:", l.RabbitURL.String()) ch := processor.Connection().ConnectToRabbitMQ(processor)
LogDebug("# Handlers:", len(l.Handlers))
LogDebug("# Orgs:", len(l.Orgs))
l.RabbitURL.User = url.UserPassword(rabbitUser, rabbitPassword)
l.topics = l.generateTopics()
ch := l.connectToRabbitMQ()
for { for {
msg, ok := <-ch msg, ok := <-ch
@@ -289,36 +231,8 @@ func (l *ListenDefinitions) ProcessRabbitMQEvents() error {
} }
LogDebug("event:", msg.RoutingKey) LogDebug("event:", msg.RoutingKey)
if err := processor.ProcessRabbitMessage(msg); err != nil {
route := strings.Split(msg.RoutingKey, ".") LogError("Error processing", msg.RoutingKey, err)
if len(route) > 3 {
reqType := route[3]
org := route[2]
if !slices.Contains(l.Orgs, org) {
LogInfo("Got event for unhandeled org:", org)
continue
}
LogDebug("org:", org, "type:", reqType)
if handler, found := l.Handlers[reqType]; found {
/* h, err := CreateRequestHandler()
if err != nil {
log.Println("Cannot create request handler", err)
continue
}
*/
req, err := ParseRequestJSON(reqType, msg.Body)
if err != nil {
LogError("Error parsing request JSON:", err)
continue
} else {
LogDebug("processing req", req.Type)
// h.Request = req
ProcessEvent(handler, req)
}
}
} }
} }
} }

128
common/rabbitmq_gitea.go Normal file
View File

@@ -0,0 +1,128 @@
package common
/*
* This file is part of Autogits.
*
* Copyright © 2024 SUSE LLC
*
* Autogits is free software: you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free Software
* Foundation, either version 2 of the License, or (at your option) any later
* version.
*
* Autogits is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Foobar. If not, see <https://www.gnu.org/licenses/>.
*/
import (
"fmt"
"runtime/debug"
"slices"
"strings"
)
const RequestType_CreateBrachTag = "create"
const RequestType_DeleteBranchTag = "delete"
const RequestType_Fork = "fork"
const RequestType_Issue = "issues"
const RequestType_IssueAssign = "issue_assign"
const RequestType_IssueComment = "issue_comment"
const RequestType_IssueLabel = "issue_label"
const RequestType_IssueMilestone = "issue_milestone"
const RequestType_Push = "push"
const RequestType_Repository = "repository"
const RequestType_Release = "release"
const RequestType_PR = "pull_request"
const RequestType_PRAssign = "pull_request_assign"
const RequestType_PRLabel = "pull_request_label"
const RequestType_PRComment = "pull_request_comment"
const RequestType_PRMilestone = "pull_request_milestone"
const RequestType_PRSync = "pull_request_sync"
const RequestType_PRReviewAccepted = "pull_request_review_approved"
const RequestType_PRReviewRejected = "pull_request_review_rejected"
const RequestType_PRReviewRequest = "pull_request_review_request"
const RequestType_PRReviewComment = "pull_request_review_comment"
const RequestType_Wiki = "wiki"
type RequestProcessor interface {
ProcessFunc(*Request) error
}
type RabbitMQGiteaEventsProcessor struct {
Handlers map[string]RequestProcessor
Orgs []string
c *RabbitConnection
}
func (gitea *RabbitMQGiteaEventsProcessor) Connection() *RabbitConnection {
if gitea.c == nil {
gitea.c = &RabbitConnection{}
}
return gitea.c
}
func (gitea *RabbitMQGiteaEventsProcessor) GenerateTopics() []string {
topics := make([]string, 0, len(gitea.Handlers)*len(gitea.Orgs))
scope := "suse"
if gitea.c.RabbitURL.Hostname() == "rabbit.opensuse.org" {
scope = "opensuse"
}
for _, org := range gitea.Orgs {
for requestType, _ := range gitea.Handlers {
topics = append(topics, fmt.Sprintf("%s.src.%s.%s.#", scope, org, requestType))
}
}
slices.Sort(topics)
return slices.Compact(topics)
}
func (gitea *RabbitMQGiteaEventsProcessor) ProcessRabbitMessage(msg RabbitMessage) error {
route := strings.Split(msg.RoutingKey, ".")
if len(route) > 3 {
reqType := route[3]
org := route[2]
if !slices.Contains(gitea.Orgs, org) {
LogInfo("Got event for unhandeled org:", org)
return nil
}
LogDebug("org:", org, "type:", reqType)
if handler, found := gitea.Handlers[reqType]; found {
req, err := ParseRequestJSON(reqType, msg.Body)
if err != nil {
LogError("Error parsing request JSON:", err)
return nil
} else {
LogDebug("processing req", req.Type)
// h.Request = req
ProcessEvent(handler, req)
}
}
}
return fmt.Errorf("Invalid routing key: %s", route)
}
func ProcessEvent(f RequestProcessor, request *Request) {
defer func() {
if r := recover(); r != nil {
LogError("panic caught")
if err, ok := r.(error); !ok {
LogError(err)
}
LogError(string(debug.Stack()))
}
}()
if err := f.ProcessFunc(request); err != nil {
LogError(err)
}
}

20
common/rabbitmq_obs.go Normal file
View File

@@ -0,0 +1,20 @@
package common
type RabbitMQObsBuildStatusProcessor struct {
c *RabbitConnection
}
func (o *RabbitMQObsBuildStatusProcessor) GenerateTopics() []string {
}
func (o *RabbitMQObsBuildStatusProcessor) Connection() *RabbitConnection {
if o.c == nil {
o.c = &RabbitConnection{}
}
return o.c
}
func (o *RabbitMQObsBuildStatusProcessor) ProcessRabbitMessage(msg RabbitMessage) error {
}

View File

@@ -50,11 +50,13 @@ func TestListenDefinitionsTopicUpdate(t *testing.T) {
u, _ := url.Parse("amqps://rabbit.example.com") u, _ := url.Parse("amqps://rabbit.example.com")
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
l := ListenDefinitions{ l := &RabbitMQGiteaEventsProcessor{
Orgs: test.orgs1, Orgs: test.orgs1,
Handlers: make(map[string]RequestProcessor), Handlers: make(map[string]RequestProcessor),
topicSubChanges: make(chan string, len(test.topicDelta)*10), c: &RabbitConnection{
RabbitURL: u, RabbitURL: u,
topicSubChanges: make(chan string, len(test.topicDelta)*10),
},
} }
slices.Sort(test.topicDelta) slices.Sort(test.topicDelta)
@@ -64,11 +66,11 @@ func TestListenDefinitionsTopicUpdate(t *testing.T) {
} }
changes := []string{} changes := []string{}
l.UpdateTopics() l.c.UpdateTopics(l)
a: a:
for { for {
select { select {
case c := <-l.topicSubChanges: case c := <-l.c.topicSubChanges:
changes = append(changes, c) changes = append(changes, c)
default: default:
changes = []string{} changes = []string{}
@@ -78,13 +80,13 @@ func TestListenDefinitionsTopicUpdate(t *testing.T) {
l.Orgs = test.orgs2 l.Orgs = test.orgs2
l.UpdateTopics() l.c.UpdateTopics(l)
changes = []string{} changes = []string{}
b: b:
for { for {
select { select {
case c := <-l.topicSubChanges: case c := <-l.c.topicSubChanges:
changes = append(changes, c) changes = append(changes, c)
default: default:
slices.Sort(changes) slices.Sort(changes)

View File

@@ -113,6 +113,10 @@ func (s *Submodule) parseKeyValue(line string) error {
return nil return nil
} }
func (s *Submodule) ManifestSubmodulePath(manifest *Manifest) string {
return manifest.SubdirForPackage(s.Path)
}
func ParseSubmodulesFile(reader io.Reader) ([]Submodule, error) { func ParseSubmodulesFile(reader io.Reader) ([]Submodule, error) {
data, err := io.ReadAll(reader) data, err := io.ReadAll(reader)
if err != nil { if err != nil {

View File

@@ -22,6 +22,7 @@ import (
"bytes" "bytes"
"flag" "flag"
"fmt" "fmt"
"io"
"log" "log"
"net/http" "net/http"
@@ -78,7 +79,7 @@ func ProjectStatusSummarySvg(project string) []byte {
return ret.Bytes() return ret.Bytes()
} }
func PackageStatusSummarySvg(status common.PackageBuildStatus) []byte { func PackageStatusSummarySvg(status *common.PackageBuildStatus) []byte {
buildStatus, ok := common.ObsBuildStatusDetails[status.Code] buildStatus, ok := common.ObsBuildStatusDetails[status.Code]
if !ok { if !ok {
buildStatus = common.ObsBuildStatusDetails["error"] buildStatus = common.ObsBuildStatusDetails["error"]
@@ -108,10 +109,13 @@ func main() {
key := flag.String("key-file", "", "Private key for the TLS certificate") key := flag.String("key-file", "", "Private key for the TLS certificate")
listen := flag.String("listen", "[::1]:8080", "Listening string") listen := flag.String("listen", "[::1]:8080", "Listening string")
disableTls := flag.Bool("no-tls", false, "Disable TLS") disableTls := flag.Bool("no-tls", false, "Disable TLS")
obsHost := flag.String("obs-host", "api.opensuse.org", "OBS API endpoint for package status information") obsHost := flag.String("obs-host", "https://api.opensuse.org", "OBS API endpoint for package status information")
flag.BoolVar(&debug, "debug", false, "Enable debug logging") flag.BoolVar(&debug, "debug", false, "Enable debug logging")
RabbitMQHost := flag.String("rabbit-mq", "amqps://rabbit.opensuse.org", "RabbitMQ message bus server")
Topic := flag.String("topic", "opensuse.obs", "RabbitMQ topic prefix")
flag.Parse() flag.Parse()
common.PanicOnError(common.RequireObsSecretToken()) common.PanicOnError(common.RequireObsSecretToken())
var err error var err error
@@ -143,21 +147,25 @@ func main() {
res.Header().Add("content-type", "image/svg+xml") res.Header().Add("content-type", "image/svg+xml")
prjStatus := GetCurrentStatus(prj) status := GetDetailedBuildStatus(prj, pkg, repo, arch)
if prjStatus == nil {
return
}
for _, r := range prjStatus.Result {
if r.Arch == arch && r.Repository == repo {
for _, status := range r.Status {
if status.Package == pkg {
res.Write(PackageStatusSummarySvg(status)) res.Write(PackageStatusSummarySvg(status))
})
http.HandleFunc("GET /{Project}/{Package}/{Repository}/{Arch}/buildlog", func(res http.ResponseWriter, req *http.Request) {
prj := req.PathValue("Project")
pkg := req.PathValue("Package")
repo := req.PathValue("Repository")
arch := req.PathValue("Arch")
// status := GetDetailedBuildStatus(prj, pkg, repo, arch)
data, err := obs.BuildLog(prj, pkg, repo, arch)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
common.LogError("Failed to fetch build log for:", prj, pkg, repo, arch, err)
return return
} }
} defer data.Close()
}
} io.Copy(res, data)
}) })
go ProcessUpdates() go ProcessUpdates()

View File

@@ -34,6 +34,25 @@ func GetCurrentStatus(project string) *common.BuildResultList {
} }
} }
func GetDetailedBuildStatus(prj, pkg, repo, arch string) *common.PackageBuildStatus {
prjStatus := GetCurrentStatus(prj)
if prjStatus == nil {
return nil
}
for _, r := range prjStatus.Result {
if r.Arch == arch && r.Repository == repo {
for _, status := range r.Status {
if status.Package == pkg {
return &status
}
}
}
}
return nil
}
func ProcessUpdates() { func ProcessUpdates() {
for { for {
msg := <-StatusUpdateCh msg := <-StatusUpdateCh

View File

@@ -526,7 +526,7 @@ func main() {
log.Fatal(err) log.Fatal(err)
} }
var defs common.ListenDefinitions defs := &common.RabbitMQGiteaEventsProcessor{}
var err error var err error
if len(*basePath) == 0 { if len(*basePath) == 0 {
@@ -557,7 +557,7 @@ func main() {
} }
log.Println("*** Reconfiguring ***") log.Println("*** Reconfiguring ***")
updateConfiguration(*configFilename, &defs.Orgs) updateConfiguration(*configFilename, &defs.Orgs)
defs.UpdateTopics() defs.Connection().UpdateTopics(defs)
} }
}() }()
signal.Notify(signalChannel, syscall.SIGHUP) signal.Notify(signalChannel, syscall.SIGHUP)
@@ -573,18 +573,17 @@ func main() {
updateConfiguration(*configFilename, &defs.Orgs) updateConfiguration(*configFilename, &defs.Orgs)
defs.GitAuthor = GitAuthor defs.Connection().RabbitURL, err = url.Parse(*rabbitUrl)
defs.RabbitURL, err = url.Parse(*rabbitUrl)
if err != nil { if err != nil {
log.Panicf("cannot parse server URL. Err: %#v\n", err) log.Panicf("cannot parse server URL. Err: %#v\n", err)
} }
go consistencyCheckProcess() go consistencyCheckProcess()
log.Println("defs:", defs) log.Println("defs:", *defs)
defs.Handlers = make(map[string]common.RequestProcessor) defs.Handlers = make(map[string]common.RequestProcessor)
defs.Handlers[common.RequestType_Push] = &PushActionProcessor{} defs.Handlers[common.RequestType_Push] = &PushActionProcessor{}
defs.Handlers[common.RequestType_Repository] = &RepositoryActionProcessor{} defs.Handlers[common.RequestType_Repository] = &RepositoryActionProcessor{}
log.Fatal(defs.ProcessRabbitMQEvents()) log.Fatal(common.ProcessRabbitMQEvents(defs))
} }

View File

@@ -162,9 +162,9 @@ func main() {
checker := CreateDefaultStateChecker(*checkOnStart, req, Gitea, time.Duration(*checkIntervalHours)*time.Hour) checker := CreateDefaultStateChecker(*checkOnStart, req, Gitea, time.Duration(*checkIntervalHours)*time.Hour)
go checker.ConsistencyCheckProcess() go checker.ConsistencyCheckProcess()
listenDefs := common.ListenDefinitions{ listenDefs := &common.RabbitMQGiteaEventsProcessor{
Orgs: orgs, Orgs: orgs,
GitAuthor: GitAuthor, // GitAuthor: GitAuthor,
Handlers: map[string]common.RequestProcessor{ Handlers: map[string]common.RequestProcessor{
common.RequestType_PR: req, common.RequestType_PR: req,
common.RequestType_PRSync: req, common.RequestType_PRSync: req,
@@ -172,7 +172,7 @@ func main() {
common.RequestType_PRReviewRejected: req, common.RequestType_PRReviewRejected: req,
}, },
} }
listenDefs.RabbitURL, _ = url.Parse(*rabbitUrl) listenDefs.Connection().RabbitURL, _ = url.Parse(*rabbitUrl)
common.PanicOnError(listenDefs.ProcessRabbitMQEvents()) common.PanicOnError(common.ProcessRabbitMQEvents(listenDefs))
} }