800cb95821
TBD: Queue not converted yet Signed-off-by: Elliot Pahl <elliot.pahl@gmail.com>
261 lines
6.6 KiB
Go
261 lines
6.6 KiB
Go
package events
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// RetryingSink retries the write until success or an ErrSinkClosed is
|
|
// returned. Underlying sink must have p > 0 of succeeding or the sink will
|
|
// block. Retry is configured with a RetryStrategy. Concurrent calls to a
|
|
// retrying sink are serialized through the sink, meaning that if one is
|
|
// in-flight, another will not proceed.
|
|
type RetryingSink struct {
|
|
sink Sink
|
|
strategy RetryStrategy
|
|
closed chan struct{}
|
|
once sync.Once
|
|
}
|
|
|
|
// NewRetryingSink returns a sink that will retry writes to a sink, backing
|
|
// off on failure. Parameters threshold and backoff adjust the behavior of the
|
|
// circuit breaker.
|
|
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
|
|
rs := &RetryingSink{
|
|
sink: sink,
|
|
strategy: strategy,
|
|
closed: make(chan struct{}),
|
|
}
|
|
|
|
return rs
|
|
}
|
|
|
|
// Write attempts to flush the events to the downstream sink until it succeeds
|
|
// or the sink is closed.
|
|
func (rs *RetryingSink) Write(event Event) error {
|
|
logger := logrus.WithField("event", event)
|
|
|
|
retry:
|
|
select {
|
|
case <-rs.closed:
|
|
return ErrSinkClosed
|
|
default:
|
|
}
|
|
|
|
if backoff := rs.strategy.Proceed(event); backoff > 0 {
|
|
select {
|
|
case <-time.After(backoff):
|
|
// TODO(stevvooe): This branch holds up the next try. Before, we
|
|
// would simply break to the "retry" label and then possibly wait
|
|
// again. However, this requires all retry strategies to have a
|
|
// large probability of probing the sync for success, rather than
|
|
// just backing off and sending the request.
|
|
case <-rs.closed:
|
|
return ErrSinkClosed
|
|
}
|
|
}
|
|
|
|
if err := rs.sink.Write(event); err != nil {
|
|
if err == ErrSinkClosed {
|
|
// terminal!
|
|
return err
|
|
}
|
|
|
|
logger := logger.WithError(err) // shadow!!
|
|
|
|
if rs.strategy.Failure(event, err) {
|
|
logger.Errorf("retryingsink: dropped event")
|
|
return nil
|
|
}
|
|
|
|
logger.Errorf("retryingsink: error writing event, retrying")
|
|
goto retry
|
|
}
|
|
|
|
rs.strategy.Success(event)
|
|
return nil
|
|
}
|
|
|
|
// Close closes the sink and the underlying sink.
|
|
func (rs *RetryingSink) Close() error {
|
|
rs.once.Do(func() {
|
|
close(rs.closed)
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (rs *RetryingSink) String() string {
|
|
// Serialize a copy of the RetryingSink without the sync.Once, to avoid
|
|
// a data race.
|
|
rs2 := map[string]interface{}{
|
|
"sink": rs.sink,
|
|
"strategy": rs.strategy,
|
|
"closed": rs.closed,
|
|
}
|
|
return fmt.Sprint(rs2)
|
|
}
|
|
|
|
// RetryStrategy defines a strategy for retrying event sink writes.
|
|
//
|
|
// All methods should be goroutine safe.
|
|
type RetryStrategy interface {
|
|
// Proceed is called before every event send. If proceed returns a
|
|
// positive, non-zero integer, the retryer will back off by the provided
|
|
// duration.
|
|
//
|
|
// An event is provided, by may be ignored.
|
|
Proceed(event Event) time.Duration
|
|
|
|
// Failure reports a failure to the strategy. If this method returns true,
|
|
// the event should be dropped.
|
|
Failure(event Event, err error) bool
|
|
|
|
// Success should be called when an event is sent successfully.
|
|
Success(event Event)
|
|
}
|
|
|
|
// Breaker implements a circuit breaker retry strategy.
|
|
//
|
|
// The current implementation never drops events.
|
|
type Breaker struct {
|
|
threshold int
|
|
recent int
|
|
last time.Time
|
|
backoff time.Duration // time after which we retry after failure.
|
|
mu sync.Mutex
|
|
}
|
|
|
|
var _ RetryStrategy = &Breaker{}
|
|
|
|
// NewBreaker returns a breaker that will backoff after the threshold has been
|
|
// tripped. A Breaker is thread safe and may be shared by many goroutines.
|
|
func NewBreaker(threshold int, backoff time.Duration) *Breaker {
|
|
return &Breaker{
|
|
threshold: threshold,
|
|
backoff: backoff,
|
|
}
|
|
}
|
|
|
|
// Proceed checks the failures against the threshold.
|
|
func (b *Breaker) Proceed(event Event) time.Duration {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
if b.recent < b.threshold {
|
|
return 0
|
|
}
|
|
|
|
return b.last.Add(b.backoff).Sub(time.Now())
|
|
}
|
|
|
|
// Success resets the breaker.
|
|
func (b *Breaker) Success(event Event) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
b.recent = 0
|
|
b.last = time.Time{}
|
|
}
|
|
|
|
// Failure records the failure and latest failure time.
|
|
func (b *Breaker) Failure(event Event, err error) bool {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
b.recent++
|
|
b.last = time.Now().UTC()
|
|
return false // never drop events.
|
|
}
|
|
|
|
var (
|
|
// DefaultExponentialBackoffConfig provides a default configuration for
|
|
// exponential backoff.
|
|
DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
|
|
Base: time.Second,
|
|
Factor: time.Second,
|
|
Max: 20 * time.Second,
|
|
}
|
|
)
|
|
|
|
// ExponentialBackoffConfig configures backoff parameters.
//
// Note that these parameters operate on the upper bound for choosing a random
// value. After n consecutive failures the bound is Base + Factor*2^(n-1),
// limited to Max, and the actual backoff is drawn uniformly from [0, bound).
type ExponentialBackoffConfig struct {
	// Base is the fixed part of the upper bound; it is added to the
	// exponentially growing term after every failure.
	Base time.Duration

	// Factor sets the amount of time by which the backoff bound grows; it is
	// doubled with each additional consecutive failure. A value that is not
	// positive is replaced by the default factor.
	Factor time.Duration

	// Max is the absolute maximum bound for a single backoff. A value that is
	// not positive is replaced by the default maximum.
	Max time.Duration
}
|
|
|
|
// ExponentialBackoff implements random backoff with exponentially increasing
|
|
// bounds as the number consecutive failures increase.
|
|
type ExponentialBackoff struct {
|
|
failures uint64 // consecutive failure counter (needs to be 64-bit aligned)
|
|
config ExponentialBackoffConfig
|
|
}
|
|
|
|
// NewExponentialBackoff returns an exponential backoff strategy with the
|
|
// desired config. If config is nil, the default is returned.
|
|
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
|
|
return &ExponentialBackoff{
|
|
config: config,
|
|
}
|
|
}
|
|
|
|
// Proceed returns the next randomly bound exponential backoff time.
|
|
func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
|
|
return b.backoff(atomic.LoadUint64(&b.failures))
|
|
}
|
|
|
|
// Success resets the failures counter.
|
|
func (b *ExponentialBackoff) Success(event Event) {
|
|
atomic.StoreUint64(&b.failures, 0)
|
|
}
|
|
|
|
// Failure increments the failure counter.
|
|
func (b *ExponentialBackoff) Failure(event Event, err error) bool {
|
|
atomic.AddUint64(&b.failures, 1)
|
|
return false
|
|
}
|
|
|
|
// backoff calculates the amount of time to wait based on the number of
|
|
// consecutive failures.
|
|
func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
|
|
if failures <= 0 {
|
|
// proceed normally when there are no failures.
|
|
return 0
|
|
}
|
|
|
|
factor := b.config.Factor
|
|
if factor <= 0 {
|
|
factor = DefaultExponentialBackoffConfig.Factor
|
|
}
|
|
|
|
backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
|
|
|
|
max := b.config.Max
|
|
if max <= 0 {
|
|
max = DefaultExponentialBackoffConfig.Max
|
|
}
|
|
|
|
if backoff > max || backoff < 0 {
|
|
backoff = max
|
|
}
|
|
|
|
// Choose a uniformly distributed value from [0, backoff).
|
|
return time.Duration(rand.Int63n(int64(backoff)))
|
|
}
|