ad6bb66faf
The Hub registry generates a large volume of notifications, many of which are uninteresting based on target media type. Discarding them within the notification endpoint consumes considerable resources that could be saved by discarding them within the registry. To that end, this change adds registry configuration options to restrict the notifications sent to an endpoint based on target media type. Signed-off-by: Noah Treuhaft <noah.treuhaft@docker.com>
94 lines
2.3 KiB
Go
94 lines
2.3 KiB
Go
package notifications
|
|
|
|
import (
|
|
"net/http"
|
|
"time"
|
|
)
|
|
|
|
// EndpointConfig covers the optional configuration parameters for an active
|
|
// endpoint.
|
|
type EndpointConfig struct {
|
|
Headers http.Header
|
|
Timeout time.Duration
|
|
Threshold int
|
|
Backoff time.Duration
|
|
IgnoredMediaTypes []string
|
|
Transport *http.Transport
|
|
}
|
|
|
|
// defaults set any zero-valued fields to a reasonable default.
|
|
func (ec *EndpointConfig) defaults() {
|
|
if ec.Timeout <= 0 {
|
|
ec.Timeout = time.Second
|
|
}
|
|
|
|
if ec.Threshold <= 0 {
|
|
ec.Threshold = 10
|
|
}
|
|
|
|
if ec.Backoff <= 0 {
|
|
ec.Backoff = time.Second
|
|
}
|
|
|
|
if ec.Transport == nil {
|
|
ec.Transport = http.DefaultTransport.(*http.Transport)
|
|
}
|
|
}
|
|
|
|
// Endpoint is a reliable, queued, thread-safe sink that notify external http
|
|
// services when events are written. Writes are non-blocking and always
|
|
// succeed for callers but events may be queued internally.
|
|
type Endpoint struct {
|
|
Sink
|
|
url string
|
|
name string
|
|
|
|
EndpointConfig
|
|
|
|
metrics *safeMetrics
|
|
}
|
|
|
|
// NewEndpoint returns a running endpoint, ready to receive events.
|
|
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
|
|
var endpoint Endpoint
|
|
endpoint.name = name
|
|
endpoint.url = url
|
|
endpoint.EndpointConfig = config
|
|
endpoint.defaults()
|
|
endpoint.metrics = newSafeMetrics()
|
|
|
|
// Configures the inmemory queue, retry, http pipeline.
|
|
endpoint.Sink = newHTTPSink(
|
|
endpoint.url, endpoint.Timeout, endpoint.Headers,
|
|
endpoint.Transport, endpoint.metrics.httpStatusListener())
|
|
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
|
|
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
|
|
endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes)
|
|
|
|
register(&endpoint)
|
|
return &endpoint
|
|
}
|
|
|
|
// Name returns the name of the endpoint, generally used for debugging.
|
|
func (e *Endpoint) Name() string {
|
|
return e.name
|
|
}
|
|
|
|
// URL returns the url of the endpoint.
|
|
func (e *Endpoint) URL() string {
|
|
return e.url
|
|
}
|
|
|
|
// ReadMetrics populates em with metrics from the endpoint.
|
|
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
|
|
e.metrics.Lock()
|
|
defer e.metrics.Unlock()
|
|
|
|
*em = e.metrics.EndpointMetrics
|
|
// Map still need to copied in a threadsafe manner.
|
|
em.Statuses = make(map[string]int)
|
|
for k, v := range e.metrics.Statuses {
|
|
em.Statuses[k] = v
|
|
}
|
|
}
|