distribution/storage/notifications/endpoint.go
Stephen J Day 9f0c8d6616 Implement notification endpoint webhook dispatch
This changeset implements webhook notification endpoints for dispatching
registry events. Repository instances can be decorated by a listener that
converts calls into context-aware events, using a bridge. Events generated in
the bridge are written to a sink. Sink implementations include a broadcaster
and an endpoint sink, which can be used to configure event dispatch. An
Endpoint represents a webhook notification target, with queueing and retries
built in. Endpoints can be added to a Broadcaster, a simple sink that writes a
block of events to several sinks, to provide a complete dispatch mechanism.
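
The fan-out described above can be illustrated with a minimal sketch. It
assumes a Sink interface with a single variadic Write method; Event, memorySink
and broadcaster here are hypothetical stand-ins, not the package's real types,
and the write is done synchronously for brevity.

```go
package main

import "fmt"

// Event is a hypothetical, cut-down event record used only for this sketch.
type Event struct{ Action string }

// Sink is an assumed interface: anything that accepts a block of events.
type Sink interface {
	Write(events ...Event) error
}

// memorySink records every event it receives, standing in for an endpoint.
type memorySink struct {
	events []Event
}

func (m *memorySink) Write(events ...Event) error {
	m.events = append(m.events, events...)
	return nil
}

// broadcaster is itself a Sink that forwards each block of events to all of
// its configured sinks.
type broadcaster struct {
	sinks []Sink
}

func (b *broadcaster) Write(events ...Event) error {
	for _, s := range b.sinks {
		if err := s.Write(events...); err != nil {
			// In this sketch a failing sink is reported and skipped so
			// that the remaining sinks still receive the events.
			fmt.Printf("sink write failed: %v\n", err)
		}
	}
	return nil
}

func main() {
	first, second := &memorySink{}, &memorySink{}
	bc := &broadcaster{sinks: []Sink{first, second}}

	_ = bc.Write(Event{Action: "push"}, Event{Action: "pull"})
	fmt.Println(len(first.events), len(second.events))
}
```

Because the broadcaster satisfies the same interface as the sinks it wraps, it
can be handed to anything that expects a single sink.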

The main caveat to the current approach is that all unsent notifications are
held in memory. Best effort is made to ensure that notifications are not
dropped, to the point where queues may back up on faulty endpoints. Once the
endpoint is fixed, the queued events will be retried and all messages will go
through.

Internally, this functionality is all made up of Sink objects. The queuing
functionality is implemented with an eventQueue sink and retries are
implemented with retryingSink. Replacing the in-memory queuing with something
persistent should be as simple as replacing the broadcaster with a remote queue
and setting up the sinks as local workers listening to that remote queue.

Metrics are kept for each endpoint and exported via expvar. This may not be a
permanent approach but should provide enough information for troubleshooting
notification problems.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-02-03 13:30:20 -08:00

package notifications

import (
	"net/http"
	"time"
)

// EndpointConfig covers the optional configuration parameters for an active
// endpoint.
type EndpointConfig struct {
	Headers   http.Header
	Timeout   time.Duration
	Threshold int
	Backoff   time.Duration
}

// defaults sets any zero-valued (or negative) fields to a reasonable default.
func (ec *EndpointConfig) defaults() {
	if ec.Timeout <= 0 {
		ec.Timeout = time.Second
	}

	if ec.Threshold <= 0 {
		ec.Threshold = 10
	}

	if ec.Backoff <= 0 {
		ec.Backoff = time.Second
	}
}

// Endpoint is a reliable, queued, thread-safe sink that notifies external http
// services when events are written. Writes are non-blocking and always
// succeed for callers but events may be queued internally.
type Endpoint struct {
	Sink
	url  string
	name string

	EndpointConfig

	metrics *safeMetrics
}

// NewEndpoint returns a running endpoint, ready to receive events.
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
	var endpoint Endpoint
	endpoint.name = name
	endpoint.url = url
	endpoint.EndpointConfig = config
	endpoint.defaults()
	endpoint.metrics = newSafeMetrics()

	// Configure the in-memory queue, retry, and http pipeline. Each layer
	// wraps the previous one, so writes pass through the queue, then the
	// retrying sink, then the http sink.
	endpoint.Sink = newHTTPSink(
		endpoint.url, endpoint.Timeout, endpoint.Headers,
		endpoint.metrics.httpStatusListener())
	endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())

	register(&endpoint)
	return &endpoint
}

// Name returns the name of the endpoint, generally used for debugging.
func (e *Endpoint) Name() string {
	return e.name
}

// URL returns the url of the endpoint.
func (e *Endpoint) URL() string {
	return e.url
}

// ReadMetrics populates em with metrics from the endpoint.
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
	e.metrics.Lock()
	defer e.metrics.Unlock()

	*em = e.metrics.EndpointMetrics

	// The struct assignment above copies only the map reference, so the map
	// still needs to be copied in a thread-safe manner.
	em.Statuses = make(map[string]int)
	for k, v := range e.metrics.Statuses {
		em.Statuses[k] = v
	}
}
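
The explicit map copy in ReadMetrics matters because assigning a struct copies
a map field by reference. The standalone sketch below, using a hypothetical
metrics type rather than the package's EndpointMetrics, shows the difference
between the plain assignment and the copy loop.

```go
package main

import "fmt"

// metrics is a hypothetical stand-in for a metrics struct with a map field.
type metrics struct {
	Events   int
	Statuses map[string]int
}

// shallowCopy copies the struct value only; the returned Statuses map is the
// same map as the source's.
func shallowCopy(src *metrics) metrics {
	return *src
}

// deepCopy copies the struct and then rebuilds the map, so later writes to
// the source are not visible through the copy.
func deepCopy(src *metrics) metrics {
	dst := *src
	dst.Statuses = make(map[string]int, len(src.Statuses))
	for k, v := range src.Statuses {
		dst.Statuses[k] = v
	}
	return dst
}

func main() {
	src := &metrics{Events: 1, Statuses: map[string]int{"200 OK": 1}}

	shallow := shallowCopy(src)
	deep := deepCopy(src)

	// Simulate the endpoint recording another response after the read.
	src.Statuses["200 OK"]++

	fmt.Println("shallow:", shallow.Statuses["200 OK"])
	fmt.Println("deep:", deep.Statuses["200 OK"])
}
```

With only the struct assignment, a caller would keep reading (and racing on) a
map that the endpoint continues to mutate after the lock is released.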