distribution/storage/notifications/http.go

146 lines
3.6 KiB
Go
Raw Normal View History

Implement notification endpoint webhook dispatch This changeset implements webhook notification endpoints for dispatching registry events. Repository instances can be decorated by a listener that converts calls into context-aware events, using a bridge. Events generated in the bridge are written to a sink. Implementations of sink include a broadcast and endpoint sink which can be used to configure event dispatch. Endpoints represent a webhook notification target, with queueing and retries built in. They can be added to a Broadcaster, which is a simple sink that writes a block of events to several sinks, to provide a complete dispatch mechanism. The main caveat to the current approach is that all unsent notifications are inmemory. Best effort is made to ensure that notifications are not dropped, to the point where queues may back up on faulty endpoints. If the endpoint is fixed, the events will be retried and all messages will go through. Internally, this functionality is all made up of Sink objects. The queuing functionality is implemented with an eventQueue sink and retries are implemented with retryingSink. Replacing the inmemory queuing with something persistent should be as simple as replacing broadcaster with a remote queue and that sets up the sinks to be local workers listening to that remote queue. Metrics are kept for each endpoint and exported via expvar. This may not be a permanent appraoch but should provide enough information for troubleshooting notification problems. Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-01-28 08:27:46 +01:00
package notifications
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// httpSink implements a single-flight, http notification endpoint. This is
// very lightweight in that it only makes an attempt at an http request.
// Reliability should be provided by the caller.
type httpSink struct {
url string
mu sync.Mutex
closed bool
client *http.Client
listeners []httpStatusListener
// TODO(stevvooe): Allow one to configure the media type accepted by this
// sink and choose the serialization based on that.
}
// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
// sinks for increased reliability.
func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink {
return &httpSink{
url: u,
listeners: listeners,
client: &http.Client{
Transport: &headerRoundTripper{
Transport: http.DefaultTransport.(*http.Transport),
headers: headers,
},
Timeout: timeout,
},
}
}
// httpStatusListener is called on various outcomes of sending notifications.
type httpStatusListener interface {
success(status int, events ...Event)
failure(status int, events ...Event)
err(err error, events ...Event)
}
// Accept makes an attempt to notify the endpoint, returning an error if it
// fails. It is the caller's responsibility to retry on error. The events are
// accepted or rejected as a group.
func (hs *httpSink) Write(events ...Event) error {
hs.mu.Lock()
defer hs.mu.Unlock()
if hs.closed {
return ErrSinkClosed
}
envelope := Envelope{
Events: events,
}
// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
// retry but we are going to do it to keep the code simple. It is likely
// we could change the event struct to manage its own buffer.
p, err := json.MarshalIndent(envelope, "", " ")
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
}
body := bytes.NewReader(p)
resp, err := hs.client.Post(hs.url, EventsMediaType, body)
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error posting: %v", hs, err)
}
// The notifier will treat any 2xx or 3xx response as accepted by the
// endpoint.
switch {
case resp.StatusCode >= 200 && resp.StatusCode < 400:
for _, listener := range hs.listeners {
listener.success(resp.StatusCode, events...)
}
// TODO(stevvooe): This is a little accepting: we may want to support
// unsupported media type responses with retries using the correct
// media type. There may also be cases that will never work.
return nil
default:
for _, listener := range hs.listeners {
listener.failure(resp.StatusCode, events...)
}
return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
}
}
// Close the endpoint
func (hs *httpSink) Close() error {
hs.mu.Lock()
defer hs.mu.Unlock()
if hs.closed {
return fmt.Errorf("httpsink: already closed")
}
hs.closed = true
return nil
}
func (hs *httpSink) String() string {
return fmt.Sprintf("httpSink{%s}", hs.url)
}
type headerRoundTripper struct {
*http.Transport // must be transport to support CancelRequest
headers http.Header
}
func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
var nreq http.Request
nreq = *req
nreq.Header = make(http.Header)
merge := func(headers http.Header) {
for k, v := range headers {
nreq.Header[k] = append(nreq.Header[k], v...)
}
}
merge(req.Header)
merge(hrt.headers)
return hrt.Transport.RoundTrip(&nreq)
}