Fix crash on connection
Setting channel object in Rabbit listener object.
This commit is contained in:
@@ -86,38 +86,38 @@ func (l *RabbitConnection) ProcessRabbitMQ(msgCh chan<- RabbitMessage) error {
|
|||||||
}
|
}
|
||||||
defer connection.Close()
|
defer connection.Close()
|
||||||
|
|
||||||
ch, err := connection.Channel()
|
l.ch, err = connection.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Cannot create a channel. Err: %w", err)
|
return fmt.Errorf("Cannot create a channel. Err: %w", err)
|
||||||
}
|
}
|
||||||
defer ch.Close()
|
defer l.ch.Close()
|
||||||
|
|
||||||
if err = ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil); err != nil {
|
if err = l.ch.ExchangeDeclarePassive("pubsub", "topic", true, false, false, false, nil); err != nil {
|
||||||
return fmt.Errorf("Cannot find pubsub exchange? Err: %w", err)
|
return fmt.Errorf("Cannot find pubsub exchange? Err: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var q rabbitmq.Queue
|
var q rabbitmq.Queue
|
||||||
if len(queueName) == 0 {
|
if len(queueName) == 0 {
|
||||||
q, err = ch.QueueDeclare("", false, true, true, false, nil)
|
q, err = l.ch.QueueDeclare("", false, true, true, false, nil)
|
||||||
} else {
|
} else {
|
||||||
q, err = ch.QueueDeclarePassive(queueName, true, false, true, false, nil)
|
q, err = l.ch.QueueDeclarePassive(queueName, true, false, true, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
LogInfo("queue not found .. trying to create it:", err)
|
LogInfo("queue not found .. trying to create it:", err)
|
||||||
if ch.IsClosed() {
|
if l.ch.IsClosed() {
|
||||||
ch, err = connection.Channel()
|
l.ch, err = connection.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Channel cannot be re-opened. Err: %w", err)
|
return fmt.Errorf("Channel cannot be re-opened. Err: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
q, err = ch.QueueDeclare(queueName, true, false, true, false, nil)
|
q, err = l.ch.QueueDeclare(queueName, true, false, true, false, nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
LogInfo("can't create persistent queue ... falling back to temporaty queue:", err)
|
LogInfo("can't create persistent queue ... falling back to temporaty queue:", err)
|
||||||
if ch.IsClosed() {
|
if l.ch.IsClosed() {
|
||||||
ch, err = connection.Channel()
|
l.ch, err = connection.Channel()
|
||||||
return fmt.Errorf("Channel cannot be re-opened. Err: %w", err)
|
return fmt.Errorf("Channel cannot be re-opened. Err: %w", err)
|
||||||
}
|
}
|
||||||
q, err = ch.QueueDeclare("", false, true, true, false, nil)
|
q, err = l.ch.QueueDeclare("", false, true, true, false, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -135,7 +135,7 @@ func (l *RabbitConnection) ProcessRabbitMQ(msgCh chan<- RabbitMessage) error {
|
|||||||
l.topicSubChanges <- "+" + topic
|
l.topicSubChanges <- "+" + topic
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
|
msgs, err := l.ch.Consume(q.Name, "", true, true, false, false, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Cannot start consumer. Err: %w", err)
|
return fmt.Errorf("Cannot start consumer. Err: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user