vendor: move vendored sources in-tree
This should make it easier to see changes instead of just a blob
This commit is contained in:
169
vendor/github.com/rabbitmq/amqp091-go/consumers.go
generated
vendored
Normal file
169
vendor/github.com/rabbitmq/amqp091-go/consumers.go
generated
vendored
Normal file
@@ -0,0 +1,169 @@
|
||||
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
|
||||
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package amqp091
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
var consumerSeq uint64
|
||||
|
||||
const consumerTagLengthMax = 0xFF // see writeShortstr
|
||||
|
||||
func uniqueConsumerTag() string {
|
||||
return commandNameBasedUniqueConsumerTag(os.Args[0])
|
||||
}
|
||||
|
||||
func commandNameBasedUniqueConsumerTag(commandName string) string {
|
||||
tagPrefix := "ctag-"
|
||||
tagInfix := commandName
|
||||
tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10)
|
||||
|
||||
if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
|
||||
tagInfix = "streadway/amqp"
|
||||
}
|
||||
|
||||
return tagPrefix + tagInfix + tagSuffix
|
||||
}
|
||||
|
||||
type consumerBuffers map[string]chan *Delivery
|
||||
|
||||
// Concurrent type that manages the consumerTag ->
|
||||
// ingress consumerBuffer mapping
|
||||
type consumers struct {
|
||||
sync.WaitGroup // one for buffer
|
||||
closed chan struct{} // signal buffer
|
||||
|
||||
sync.Mutex // protects below
|
||||
chans consumerBuffers
|
||||
}
|
||||
|
||||
func makeConsumers() *consumers {
|
||||
return &consumers{
|
||||
closed: make(chan struct{}),
|
||||
chans: make(consumerBuffers),
|
||||
}
|
||||
}
|
||||
|
||||
func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
|
||||
defer close(out)
|
||||
defer subs.Done()
|
||||
|
||||
var inflight = in
|
||||
var queue []*Delivery
|
||||
|
||||
for delivery := range in {
|
||||
queue = append(queue, delivery)
|
||||
|
||||
for len(queue) > 0 {
|
||||
select {
|
||||
case <-subs.closed:
|
||||
// closed before drained, drop in-flight
|
||||
return
|
||||
|
||||
case delivery, consuming := <-inflight:
|
||||
if consuming {
|
||||
queue = append(queue, delivery)
|
||||
} else {
|
||||
inflight = nil
|
||||
}
|
||||
|
||||
case out <- *queue[0]:
|
||||
/*
|
||||
* https://github.com/rabbitmq/amqp091-go/issues/179
|
||||
* https://github.com/rabbitmq/amqp091-go/pull/180
|
||||
*
|
||||
* Comment from @lars-t-hansen:
|
||||
*
|
||||
* Given Go's slice semantics, and barring any information
|
||||
* available to the compiler that proves that queue is the only
|
||||
* pointer to the memory it references, the only meaning that
|
||||
* queue = queue[1:] can have is basically queue += sizeof(queue
|
||||
* element), ie, it bumps a pointer. Looking at the generated
|
||||
* code for a simple example (on ARM64 in this case) bears this
|
||||
* out. So what we're left with is an array that we have a
|
||||
* pointer into the middle of. When the GC traces this pointer,
|
||||
* it too does not know whether the array has multiple
|
||||
* referents, and so its only sensible choice is to find the
|
||||
* beginning of the array, and if the array is not already
|
||||
* visited, mark every element in it, including the "dead"
|
||||
* pointer.
|
||||
*
|
||||
* (Depending on the program dynamics, an element may eventually
|
||||
* be appended to the queue when the queue is at capacity, and
|
||||
* in this case the live elements are copied into a new array
|
||||
* and the old array is left to be GC'd eventually, along with
|
||||
* the dead object. But that can take time.)
|
||||
*/
|
||||
queue[0] = nil
|
||||
queue = queue[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// On key conflict, close the previous channel.
|
||||
func (subs *consumers) add(tag string, consumer chan Delivery) {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
if prev, found := subs.chans[tag]; found {
|
||||
close(prev)
|
||||
}
|
||||
|
||||
in := make(chan *Delivery)
|
||||
subs.chans[tag] = in
|
||||
|
||||
subs.Add(1)
|
||||
go subs.buffer(in, consumer)
|
||||
}
|
||||
|
||||
func (subs *consumers) cancel(tag string) (found bool) {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
ch, found := subs.chans[tag]
|
||||
|
||||
if found {
|
||||
delete(subs.chans, tag)
|
||||
close(ch)
|
||||
}
|
||||
|
||||
return found
|
||||
}
|
||||
|
||||
func (subs *consumers) close() {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
close(subs.closed)
|
||||
|
||||
for tag, ch := range subs.chans {
|
||||
delete(subs.chans, tag)
|
||||
close(ch)
|
||||
}
|
||||
|
||||
subs.Wait()
|
||||
}
|
||||
|
||||
// Sends a delivery to a the consumer identified by `tag`.
|
||||
// If unbuffered channels are used for Consume this method
|
||||
// could block all deliveries until the consumer
|
||||
// receives on the other end of the channel.
|
||||
func (subs *consumers) send(tag string, msg *Delivery) bool {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
buffer, found := subs.chans[tag]
|
||||
if found {
|
||||
buffer <- msg
|
||||
}
|
||||
|
||||
return found
|
||||
}
|
||||
Reference in New Issue
Block a user