diff --git a/storage/notifications/bridge.go b/storage/notifications/bridge.go new file mode 100644 index 00000000..2ff0dff6 --- /dev/null +++ b/storage/notifications/bridge.go @@ -0,0 +1,139 @@ +package notifications + +import ( + "time" + + "github.com/docker/distribution/manifest" + + "code.google.com/p/go-uuid/uuid" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storage" +) + +type bridge struct { + ub URLBuilder + actor ActorRecord + source SourceRecord + sink Sink +} + +var _ Listener = &bridge{} + +// URLBuilder defines a subset of url builder to be used by the event listener. +type URLBuilder interface { + BuildManifestURL(name, tag string) (string, error) + BuildBlobURL(name string, dgst digest.Digest) (string, error) +} + +// NewBridge returns a notification listener that writes records to sink, +// using the actor and source. Any urls populated in the events created by +// this bridge will be created using the URLBuilder. +func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, sink Sink) Listener { + return &bridge{ + ub: ub, + actor: actor, + source: source, + sink: sink, + } +} + +func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPush, repo, sm) +} + +func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPull, repo, sm) +} + +func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionDelete, repo, sm) +} + +func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest()) +} + +func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest()) +} + +func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest()) +} + +func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error { + event, err := b.createManifestEvent(action, repo, sm) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = "manifest" + event.Target.Name = repo.Name() + event.Target.Tag = sm.Tag + + p, err := sm.Payload() + if err != nil { + return nil, err + } + + event.Target.Digest, err = digest.FromBytes(p) + if err != nil { + return nil, err + } + + // TODO(stevvooe): Currently, the is the "tag" url: once the digest url is + // implemented, this should be replaced. + event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag) + if err != nil { + return nil, err + } + + return event, nil +} + +func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error { + event, err := b.createLayerEvent(action, repo, dgst) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = "layer" + event.Target.Name = repo.Name() + event.Target.Digest = dgst + + var err error + event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst) + if err != nil { + return nil, err + } + + return event, nil +} + +// createEvent creates an event with actor and source populated. +func (b *bridge) createEvent(action string) *Event { + event := createEvent(action) + event.Source = b.source + event.Actor = b.actor + + return event +} + +// createEvent returns a new event, timestamped, with the specified action. +func createEvent(action string) *Event { + return &Event{ + ID: uuid.New(), + Timestamp: time.Now(), + Action: action, + } +} diff --git a/storage/notifications/endpoint.go b/storage/notifications/endpoint.go new file mode 100644 index 00000000..dfdb111c --- /dev/null +++ b/storage/notifications/endpoint.go @@ -0,0 +1,86 @@ +package notifications + +import ( + "net/http" + "time" +) + +// EndpointConfig covers the optional configuration parameters for an active +// endpoint. +type EndpointConfig struct { + Headers http.Header + Timeout time.Duration + Threshold int + Backoff time.Duration +} + +// defaults set any zero-valued fields to a reasonable default. +func (ec *EndpointConfig) defaults() { + if ec.Timeout <= 0 { + ec.Timeout = time.Second + } + + if ec.Threshold <= 0 { + ec.Threshold = 10 + } + + if ec.Backoff <= 0 { + ec.Backoff = time.Second + } +} + +// Endpoint is a reliable, queued, thread-safe sink that notify external http +// services when events are written. Writes are non-blocking and always +// succeed for callers but events may be queued internally. +type Endpoint struct { + Sink + url string + name string + + EndpointConfig + + metrics *safeMetrics +} + +// NewEndpoint returns a running endpoint, ready to receive events. +func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { + var endpoint Endpoint + endpoint.name = name + endpoint.url = url + endpoint.EndpointConfig = config + endpoint.defaults() + endpoint.metrics = newSafeMetrics() + + // Configures the inmemory queue, retry, http pipeline. + endpoint.Sink = newHTTPSink( + endpoint.url, endpoint.Timeout, endpoint.Headers, + endpoint.metrics.httpStatusListener()) + endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) + endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) + + register(&endpoint) + return &endpoint +} + +// Name returns the name of the endpoint, generally used for debugging. +func (e *Endpoint) Name() string { + return e.name +} + +// URL returns the url of the endpoint. +func (e *Endpoint) URL() string { + return e.url +} + +// ReadMetrics populates em with metrics from the endpoint. +func (e *Endpoint) ReadMetrics(em *EndpointMetrics) { + e.metrics.Lock() + defer e.metrics.Unlock() + + *em = e.metrics.EndpointMetrics + // Map still need to copied in a threadsafe manner. + em.Statuses = make(map[string]int) + for k, v := range e.metrics.Statuses { + em.Statuses[k] = v + } +} diff --git a/storage/notifications/event.go b/storage/notifications/event.go index 3c000dc2..f03920ca 100644 --- a/storage/notifications/event.go +++ b/storage/notifications/event.go @@ -36,7 +36,7 @@ type Envelope struct { // Event provides the fields required to describe a registry event. type Event struct { // ID provides a unique identifier for the event. - ID string `json:"uuid,omitempty"` + ID string `json:"id,omitempty"` // Timestamp is the time at which the event occurred. Timestamp time.Time `json:"timestamp,omitempty"` @@ -74,6 +74,8 @@ type Event struct { // ActorRecord specifies the agent that initiated the event. For most // situations, this could be from the authorizaton context of the request. +// Data in this record can refer to both the initiating client and the +// generating request. type ActorRecord struct { // Name corresponds to the subject or username associated with the // request context that generated the event. @@ -82,6 +84,22 @@ type ActorRecord struct { // Addr contains the ip or hostname and possibly port of the client // connection that initiated the event. Addr string `json:"addr,omitempty"` + + // Host is the externally accessible host name of the registry instance, + // as specified by the http host header on incoming requests. + Host string `json:"host,omitempty"` + + // RequestID uniquely identifies the registry request that generated the + // event. + RequestID string `json:"requestID,omitempty"` + + // TODO(stevvooe): Look into setting a session cookie to get this + // without docker daemon. + // SessionID + + // TODO(stevvooe): Push the "Docker-Command" header to replace cookie and + // get the actual command. + // Command } // SourceRecord identifies the registry node that generated the event. Put @@ -93,12 +111,9 @@ type SourceRecord struct { // os.Hostname() along with the running port. Addr string `json:"addr,omitempty"` - // Host is the dns name of the registry cluster, as configured. - Host string `json:"host,omitempty"` - - // RequestID uniquely identifies the registry request that generated the - // event. - RequestID string `json:"request_id,omitempty"` + // InstanceID identifies a running instance of an application. Changes + // after each restart. + InstanceID string `json:"instanceID,omitempty"` } var ( diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go index 37e43c66..77bd19f9 100644 --- a/storage/notifications/event_test.go +++ b/storage/notifications/event_test.go @@ -15,7 +15,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { { "events": [ { - "uuid": "asdf-asdf-asdf-asdf-0", + "id": "asdf-asdf-asdf-asdf-0", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -27,16 +27,16 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } }, { - "uuid": "asdf-asdf-asdf-asdf-1", + "id": "asdf-asdf-asdf-asdf-1", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -47,16 +47,16 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } }, { - "uuid": "asdf-asdf-asdf-asdf-2", + "id": "asdf-asdf-asdf-asdf-2", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -67,12 +67,12 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } } ] @@ -87,11 +87,11 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { var prototype Event prototype.Action = "push" prototype.Timestamp = tm - prototype.Actor.Addr = "hostname.local" + prototype.Actor.Addr = "client.local" prototype.Actor.Name = "test-actor" - prototype.Source.Addr = "hostname.local" - prototype.Source.Host = "registrycluster.local" - prototype.Source.RequestID = "asdfasdf" + prototype.Actor.RequestID = "asdfasdf" + prototype.Actor.Host = "registrycluster.local" + prototype.Source.Addr = "hostname.local:port" var manifestPush Event manifestPush = prototype diff --git a/storage/notifications/http.go b/storage/notifications/http.go new file mode 100644 index 00000000..15b3574c --- /dev/null +++ b/storage/notifications/http.go @@ -0,0 +1,145 @@ +package notifications + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" +) + +// httpSink implements a single-flight, http notification endpoint. This is +// very lightweight in that it only makes an attempt at an http request. +// Reliability should be provided by the caller. +type httpSink struct { + url string + + mu sync.Mutex + closed bool + client *http.Client + listeners []httpStatusListener + + // TODO(stevvooe): Allow one to configure the media type accepted by this + // sink and choose the serialization based on that. +} + +// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other +// sinks for increased reliability. +func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink { + return &httpSink{ + url: u, + listeners: listeners, + client: &http.Client{ + Transport: &headerRoundTripper{ + Transport: http.DefaultTransport.(*http.Transport), + headers: headers, + }, + Timeout: timeout, + }, + } +} + +// httpStatusListener is called on various outcomes of sending notifications. +type httpStatusListener interface { + success(status int, events ...Event) + failure(status int, events ...Event) + err(err error, events ...Event) +} + +// Accept makes an attempt to notify the endpoint, returning an error if it +// fails. It is the caller's responsibility to retry on error. The events are +// accepted or rejected as a group. +func (hs *httpSink) Write(events ...Event) error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return ErrSinkClosed + } + + envelope := Envelope{ + Events: events, + } + + // TODO(stevvooe): It is not ideal to keep re-encoding the request body on + // retry but we are going to do it to keep the code simple. It is likely + // we could change the event struct to manage its own buffer. + + p, err := json.MarshalIndent(envelope, "", " ") + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err) + } + + body := bytes.NewReader(p) + resp, err := hs.client.Post(hs.url, EventsMediaType, body) + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + + return fmt.Errorf("%v: error posting: %v", hs, err) + } + + // The notifier will treat any 2xx or 3xx response as accepted by the + // endpoint. + switch { + case resp.StatusCode >= 200 && resp.StatusCode < 400: + for _, listener := range hs.listeners { + listener.success(resp.StatusCode, events...) + } + + // TODO(stevvooe): This is a little accepting: we may want to support + // unsupported media type responses with retries using the correct + // media type. There may also be cases that will never work. + + return nil + default: + for _, listener := range hs.listeners { + listener.failure(resp.StatusCode, events...) + } + return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status) + } +} + +// Close the endpoint +func (hs *httpSink) Close() error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return fmt.Errorf("httpsink: already closed") + } + + hs.closed = true + return nil +} + +func (hs *httpSink) String() string { + return fmt.Sprintf("httpSink{%s}", hs.url) +} + +type headerRoundTripper struct { + *http.Transport // must be transport to support CancelRequest + headers http.Header +} + +func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + var nreq http.Request + nreq = *req + nreq.Header = make(http.Header) + + merge := func(headers http.Header) { + for k, v := range headers { + nreq.Header[k] = append(nreq.Header[k], v...) + } + } + + merge(req.Header) + merge(hrt.headers) + + return hrt.Transport.RoundTrip(&nreq) +} diff --git a/storage/notifications/http_test.go b/storage/notifications/http_test.go new file mode 100644 index 00000000..c2cfbc02 --- /dev/null +++ b/storage/notifications/http_test.go @@ -0,0 +1,155 @@ +package notifications + +import ( + "encoding/json" + "fmt" + "mime" + "net/http" + "net/http/httptest" + "reflect" + "strconv" + "testing" +) + +// TestHTTPSink mocks out an http endpoint and notifies it under a couple of +// conditions, ensuring correct behavior. +func TestHTTPSink(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + t.Fatalf("unexpected request method: %v", r.Method) + return + } + + // Extract the content type and make sure it matches + contentType := r.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentType) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType) + return + } + + if mediaType != EventsMediaType { + w.WriteHeader(http.StatusUnsupportedMediaType) + t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType) + return + } + + var envelope Envelope + dec := json.NewDecoder(r.Body) + if err := dec.Decode(&envelope); err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error decoding request body: %v", err) + return + } + + // Let caller choose the status + status, err := strconv.Atoi(r.FormValue("status")) + if err != nil { + t.Logf("error parsing status: %v", err) + + // May just be empty, set status to 200 + status = http.StatusOK + } + + w.WriteHeader(status) + })) + + metrics := newSafeMetrics() + sink := newHTTPSink(server.URL, 0, nil, + &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) + + var expectedMetrics EndpointMetrics + expectedMetrics.Statuses = make(map[string]int) + + for _, tc := range []struct { + events []Event // events to send + url string + failure bool // true if there should be a failure. + statusCode int // if not set, no status code should be incremented. + }{ + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest")}, + }, + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest"), + createTestEvent("push", "library/test", "layer"), + createTestEvent("push", "library/test", "layer"), + }, + }, + { + statusCode: http.StatusTemporaryRedirect, + }, + { + statusCode: http.StatusBadRequest, + failure: true, + }, + { + // Case where connection never goes through. + url: "http://shoudlntresolve/", + failure: true, + }, + } { + + if tc.failure { + expectedMetrics.Failures += len(tc.events) + } else { + expectedMetrics.Successes += len(tc.events) + } + + if tc.statusCode > 0 { + expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events) + } + + url := tc.url + if url == "" { + url = server.URL + "/" + } + // setup endpoint to respond with expected status code. + url += fmt.Sprintf("?status=%v", tc.statusCode) + sink.url = url + + t.Logf("testcase: %v, fail=%v", url, tc.failure) + // Try a simple event emission. + err := sink.Write(tc.events...) + + if !tc.failure { + if err != nil { + t.Fatalf("unexpected error send event: %v", err) + } + } else { + if err == nil { + t.Fatalf("the endpoint should have rejected the request") + } + } + + if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) { + t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics) + } + } + + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing http sink: %v", err) + } + + // double close returns error + if err := sink.Close(); err == nil { + t.Fatalf("second close should have returned error: %v", err) + } + +} + +func createTestEvent(action, repo, typ string) Event { + event := createEvent(action) + + event.Target.Type = typ + event.Target.Name = repo + + return *event +} diff --git a/storage/notifications/listener.go b/storage/notifications/listener.go new file mode 100644 index 00000000..2d7bb112 --- /dev/null +++ b/storage/notifications/listener.go @@ -0,0 +1,140 @@ +package notifications + +import ( + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storage" +) + +// ManifestListener describes a set of methods for listening to events related to manifests. +type ManifestListener interface { + ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error + ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error +} + +// LayerListener describes a listener that can respond to layer related events. +type LayerListener interface { + LayerPushed(repo storage.Repository, layer storage.Layer) error + LayerPulled(repo storage.Repository, layer storage.Layer) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + LayerDeleted(repo storage.Repository, layer storage.Layer) error +} + +// Listener combines all repository events into a single interface. +type Listener interface { + ManifestListener + LayerListener +} + +type repositoryListener struct { + storage.Repository + listener Listener +} + +// Listen dispatches events on the repository to the listener. +func Listen(repo storage.Repository, listener Listener) storage.Repository { + return &repositoryListener{ + Repository: repo, + listener: listener, + } +} + +func (rl *repositoryListener) Manifests() storage.ManifestService { + return &manifestServiceListener{ + ManifestService: rl.Repository.Manifests(), + parent: rl, + } +} + +func (rl *repositoryListener) Layers() storage.LayerService { + return &layerServiceListener{ + LayerService: rl.Repository.Layers(), + parent: rl, + } +} + +type manifestServiceListener struct { + storage.ManifestService + parent *repositoryListener +} + +func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) { + sm, err := msl.ManifestService.Get(tag) + if err == nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest pull to listener: %v", err) + } + } + + return sm, err +} + +func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error { + err := msl.ManifestService.Put(tag, sm) + + if err == nil { + if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest push to listener: %v", err) + } + } + + return err +} + +type layerServiceListener struct { + storage.LayerService + parent *repositoryListener +} + +func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) { + layer, err := lsl.LayerService.Fetch(dgst) + if err == nil { + if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer pull to listener: %v", err) + } + } + + return layer, err +} + +func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Upload() + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Resume(uuid) + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload { + return &layerUploadListener{ + LayerUpload: lu, + parent: lsl, + } +} + +type layerUploadListener struct { + storage.LayerUpload + parent *layerServiceListener +} + +func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) { + layer, err := lul.LayerUpload.Finish(dgst) + if err == nil { + if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer push to listener: %v", err) + } + } + + return layer, err +} diff --git a/storage/notifications/listener_test.go b/storage/notifications/listener_test.go new file mode 100644 index 00000000..17af8b15 --- /dev/null +++ b/storage/notifications/listener_test.go @@ -0,0 +1,151 @@ +package notifications + +import ( + "io" + "reflect" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storage" + "github.com/docker/distribution/storagedriver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" +) + +func TestListener(t *testing.T) { + registry := storage.NewRegistryWithDriver(inmemory.New()) + tl := &testListener{ + ops: make(map[string]int), + } + repository := Listen(registry.Repository("foo/bar"), tl) + + // Now take the registry through a number of operations + checkExerciseRepository(t, repository) + + expectedOps := map[string]int{ + "manifest:push": 1, + "manifest:pull": 1, + // "manifest:delete": 0, // deletes not supported for now + "layer:push": 2, + "layer:pull": 2, + // "layer:delete": 0, // deletes not supported for now + } + + if !reflect.DeepEqual(tl.ops, expectedOps) { + t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps) + } + +} + +type testListener struct { + ops map[string]int +} + +func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:push"]++ + + return nil +} + +func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:pull"]++ + return nil +} + +func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:delete"]++ + return nil +} + +func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:push"]++ + return nil +} + +func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:pull"]++ + return nil +} + +func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:delete"]++ + return nil +} + +// checkExerciseRegistry takes the registry through all of its operations, +// carrying out generic checks. +func checkExerciseRepository(t *testing.T, repository storage.Repository) { + // TODO(stevvooe): This would be a nice testutil function. Basically, it + // takes the registry through a common set of operations. This could be + // used to make cross-cutting updates by changing internals that affect + // update counts. Basically, it would make writing tests a lot easier. + + tag := "thetag" + m := manifest.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: repository.Name(), + Tag: tag, + } + + layers := repository.Layers() + for i := 0; i < 2; i++ { + rs, ds, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating test layer: %v", err) + } + dgst := digest.Digest(ds) + upload, err := layers.Upload() + if err != nil { + t.Fatalf("error creating layer upload: %v", err) + } + + // Use the resumes, as well! + upload, err = layers.Resume(upload.UUID()) + if err != nil { + t.Fatalf("error resuming layer upload: %v", err) + } + + io.Copy(upload, rs) + + if _, err := upload.Finish(dgst); err != nil { + t.Fatalf("unexpected error finishing upload: %v", err) + } + + m.FSLayers = append(m.FSLayers, manifest.FSLayer{ + BlobSum: dgst, + }) + + // Then fetch the layers + if _, err := layers.Fetch(dgst); err != nil { + t.Fatalf("error fetching layer: %v", err) + } + } + + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("unexpected error generating key: %v", err) + } + + sm, err := manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("unexpected error signing manifest: %v", err) + } + + manifests := repository.Manifests() + + if err := manifests.Put(tag, sm); err != nil { + t.Fatalf("unexpected error putting the manifest: %v", err) + } + + fetched, err := manifests.Get(tag) + if err != nil { + t.Fatalf("unexpected error fetching manifest: %v", err) + } + + if fetched.Tag != fetched.Tag { + t.Fatalf("retrieved unexpected manifest: %v", err) + } +} diff --git a/storage/notifications/metrics.go b/storage/notifications/metrics.go new file mode 100644 index 00000000..2a8ffcbd --- /dev/null +++ b/storage/notifications/metrics.go @@ -0,0 +1,152 @@ +package notifications + +import ( + "expvar" + "fmt" + "net/http" + "sync" +) + +// EndpointMetrics track various actions taken by the endpoint, typically by +// number of events. The goal of this to export it via expvar but we may find +// some other future solution to be better. +type EndpointMetrics struct { + Pending int // events pending in queue + Events int // total events incoming + Successes int // total events written successfully + Failures int // total events failed + Errors int // total events errored + Statuses map[string]int // status code histogram, per call event +} + +// safeMetrics guards the metrics implementation with a lock and provides a +// safe update function. +type safeMetrics struct { + EndpointMetrics + sync.Mutex // protects statuses map +} + +// newSafeMetrics returns safeMetrics with map allocated. +func newSafeMetrics() *safeMetrics { + var sm safeMetrics + sm.Statuses = make(map[string]int) + return &sm +} + +// httpStatusListener returns the listener for the http sink that updates the +// relevent counters. +func (sm *safeMetrics) httpStatusListener() httpStatusListener { + return &endpointMetricsHTTPStatusListener{ + safeMetrics: sm, + } +} + +// eventQueueListener returns a listener that maintains queue related counters. +func (sm *safeMetrics) eventQueueListener() eventQueueListener { + return &endpointMetricsEventQueueListener{ + safeMetrics: sm, + } +} + +// endpointMetricsHTTPStatusListener increments counters related to http sinks +// for the relevent events. +type endpointMetricsHTTPStatusListener struct { + *safeMetrics +} + +var _ httpStatusListener = &endpointMetricsHTTPStatusListener{} + +func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Successes += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Failures += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Errors += len(events) +} + +// endpointMetricsEventQueueListener maintains the incoming events counter and +// the queues pending count. +type endpointMetricsEventQueueListener struct { + *safeMetrics +} + +func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Events += len(events) + eqc.Pending += len(events) +} + +func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Pending -= len(events) +} + +// endpoints is global registry of endpoints used to report metrics to expvar +var endpoints struct { + registered []*Endpoint + mu sync.Mutex +} + +// register places the endpoint into expvar so that stats are tracked. +func register(e *Endpoint) { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + endpoints.registered = append(endpoints.registered, e) +} + +func init() { + // NOTE(stevvooe): Setup registry metrics structure to report to expvar. + // Ideally, we do more metrics through logging but we need some nice + // realtime metrics for queue state for now. + + registry := expvar.Get("registry") + + if registry == nil { + registry = expvar.NewMap("registry") + } + + var notifications expvar.Map + notifications.Init() + notifications.Set("endpoints", expvar.Func(func() interface{} { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + var names []interface{} + for _, v := range endpoints.registered { + var epjson struct { + Name string `json:"name"` + URL string `json:"url"` + EndpointConfig + + Metrics EndpointMetrics + } + + epjson.Name = v.Name() + epjson.URL = v.URL() + epjson.EndpointConfig = v.EndpointConfig + + v.ReadMetrics(&epjson.Metrics) + + names = append(names, epjson) + } + + return names + })) + + registry.(*expvar.Map).Set("notifications", ¬ifications) +} diff --git a/storage/notifications/sinks.go b/storage/notifications/sinks.go new file mode 100644 index 00000000..2bf63e2d --- /dev/null +++ b/storage/notifications/sinks.go @@ -0,0 +1,337 @@ +package notifications + +import ( + "container/list" + "fmt" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// NOTE(stevvooe): This file contains definitions for several utility sinks. +// Typically, the broadcaster is the only sink that should be required +// externally, but others are suitable for export if the need arises. Albeit, +// the tight integration with endpoint metrics should be removed. + +// Broadcaster sends events to multiple, reliable Sinks. The goal of this +// component is to dispatch events to configured endpoints. Reliability can be +// provided by wrapping incoming sinks. +type Broadcaster struct { + sinks []Sink + events chan []Event + closed chan chan struct{} +} + +// NewBroadcaster ... +// Add appends one or more sinks to the list of sinks. The broadcaster +// behavior will be affected by the properties of the sink. Generally, the +// sink should accept all messages and deal with reliability on its own. Use +// of EventQueue and RetryingSink should be used here. +func NewBroadcaster(sinks ...Sink) *Broadcaster { + b := Broadcaster{ + sinks: sinks, + events: make(chan []Event), + closed: make(chan chan struct{}), + } + + // Start the broadcaster + go b.run() + + return &b +} + +// Write accepts a block of events to be dispatched to all sinks. This method +// will never fail and should never block (hopefully!). The caller cedes the +// slice memory to the broadcaster and should not modify it after calling +// write. +func (b *Broadcaster) Write(events ...Event) error { + select { + case b.events <- events: + case <-b.closed: + return ErrSinkClosed + } + return nil +} + +// Close the broadcaster, ensuring that all messages are flushed to the +// underlying sink before returning. +func (b *Broadcaster) Close() error { + logrus.Infof("broadcaster: closing") + select { + case <-b.closed: + // already closed + return fmt.Errorf("broadcaster: already closed") + default: + // do a little chan handoff dance to synchronize closing + closed := make(chan struct{}) + b.closed <- closed + close(b.closed) + <-closed + return nil + } +} + +// run is the main broadcast loop, started when the broadcaster is created. +// Under normal conditions, it waits for events on the event channel. After +// Close is called, this goroutine will exit. +func (b *Broadcaster) run() { + for { + select { + case block := <-b.events: + for _, sink := range b.sinks { + if err := sink.Write(block...); err != nil { + logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err) + } + } + case closing := <-b.closed: + + // close all the underlying sinks + for _, sink := range b.sinks { + if err := sink.Close(); err != nil { + logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err) + } + } + closing <- struct{}{} + + logrus.Debugf("broadcaster: closed") + return + } + } +} + +// eventQueue accepts all messages into a queue for asynchronous consumption +// by a sink. It is unbounded and thread safe but the sink must be reliable or +// events will be dropped. +type eventQueue struct { + sink Sink + events *list.List + listeners []eventQueueListener + cond *sync.Cond + mu sync.Mutex + closed bool +} + +// eventQueueListener is called when various events happen on the queue. +type eventQueueListener interface { + ingress(events ...Event) + egress(events ...Event) +} + +// newEventQueue returns a queue to the provided sink. If the updater is non- +// nil, it will be called to update pending metrics on ingress and egress. +func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue { + eq := eventQueue{ + sink: sink, + events: list.New(), + listeners: listeners, + } + + eq.cond = sync.NewCond(&eq.mu) + go eq.run() + return &eq +} + +// Write accepts the events into the queue, only failing if the queue has +// beend closed. +func (eq *eventQueue) Write(events ...Event) error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return ErrSinkClosed + } + + for _, listener := range eq.listeners { + listener.ingress(events...) + } + eq.events.PushBack(events) + eq.cond.Signal() // signal waiters + + return nil +} + +// Close shutsdown the event queue, flushing +func (eq *eventQueue) Close() error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return fmt.Errorf("eventqueue: already closed") + } + + // set closed flag + eq.closed = true + eq.cond.Signal() // signal flushes queue + eq.cond.Wait() // wait for signal from last flush + + return eq.sink.Close() +} + +// run is the main goroutine to flush events to the target sink. +func (eq *eventQueue) run() { + for { + block := eq.next() + + if block == nil { + return // nil block means event queue is closed. + } + + if err := eq.sink.Write(block...); err != nil { + logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err) + } + + for _, listener := range eq.listeners { + listener.egress(block...) + } + } +} + +// next encompasses the critical section of the run loop. When the queue is +// empty, it will block on the condition. If new data arrives, it will wake +// and return a block. When closed, a nil slice will be returned. +func (eq *eventQueue) next() []Event { + eq.mu.Lock() + defer eq.mu.Unlock() + + for eq.events.Len() < 1 { + if eq.closed { + eq.cond.Broadcast() + return nil + } + + eq.cond.Wait() + } + + front := eq.events.Front() + block := front.Value.([]Event) + eq.events.Remove(front) + + return block +} + +// retryingSink retries the write until success or an ErrSinkClosed is +// returned. Underlying sink must have p > 0 of succeeding or the sink will +// block. Internally, it is a circuit breaker retries to manage reset. +// Concurrent calls to a retrying sink are serialized through the sink, +// meaning that if one is in-flight, another will not proceed. +type retryingSink struct { + mu sync.Mutex + sink Sink + closed bool + + // circuit breaker hueristics + failures struct { + threshold int + recent int + last time.Time + backoff time.Duration // time after which we retry after failure. + } +} + +type retryingSinkListener interface { + active(events ...Event) + retry(events ...Event) +} + +// TODO(stevvooe): We are using circuit break here, which actually doesn't +// make a whole lot of sense for this use case, since we always retry. Move +// this to use bounded exponential backoff. + +// newRetryingSink returns a sink that will retry writes to a sink, backing +// off on failure. Parameters threshold and backoff adjust the behavior of the +// circuit breaker. +func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink { + rs := &retryingSink{ + sink: sink, + } + rs.failures.threshold = threshold + rs.failures.backoff = backoff + + return rs +} + +// Write attempts to flush the events to the downstream sink until it succeeds +// or the sink is closed. +func (rs *retryingSink) Write(events ...Event) error { + rs.mu.Lock() + defer rs.mu.Unlock() + +retry: + + if rs.closed { + return ErrSinkClosed + } + + if !rs.proceed() { + logrus.Warnf("%v encountered too many errors, backing off", rs.sink) + rs.wait(rs.failures.backoff) + goto retry + } + + if err := rs.write(events...); err != nil { + if err == ErrSinkClosed { + // terminal! + return err + } + + logrus.Errorf("retryingsink: error writing events: %v, retrying", err) + goto retry + } + + return nil +} + +// Close closes the sink and the underlying sink. +func (rs *retryingSink) Close() error { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.closed { + return fmt.Errorf("retryingsink: already closed") + } + + rs.closed = true + return rs.sink.Close() +} + +// write provides a helper that dispatches failure and success properly. Used +// by write as the single-flight write call. +func (rs *retryingSink) write(events ...Event) error { + if err := rs.sink.Write(events...); err != nil { + rs.failure() + return err + } + + rs.reset() + return nil +} + +// wait backoff time against the sink, unlocking so others can proceed. Should +// only be called by methods that currently have the mutex. +func (rs *retryingSink) wait(backoff time.Duration) { + rs.mu.Unlock() + defer rs.mu.Lock() + + // backoff here + time.Sleep(backoff) +} + +// reset marks a succesful call. +func (rs *retryingSink) reset() { + rs.failures.recent = 0 + rs.failures.last = time.Time{} +} + +// failure records a failure. +func (rs *retryingSink) failure() { + rs.failures.recent++ + rs.failures.last = time.Now().UTC() +} + +// proceed returns true if the call should proceed based on circuit breaker +// hueristics. +func (rs *retryingSink) proceed() bool { + return rs.failures.recent < rs.failures.threshold || + time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff)) +} diff --git a/storage/notifications/sinks_test.go b/storage/notifications/sinks_test.go new file mode 100644 index 00000000..89756a99 --- /dev/null +++ b/storage/notifications/sinks_test.go @@ -0,0 +1,223 @@ +package notifications + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/Sirupsen/logrus" + + "testing" +) + +func TestBroadcaster(t *testing.T) { + const nEvents = 1000 + var sinks []Sink + + for i := 0; i < 10; i++ { + sinks = append(sinks, &testSink{}) + } + + b := NewBroadcaster(sinks...) + + var block []Event + var wg sync.WaitGroup + for i := 1; i <= nEvents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := b.Write(block...); err != nil { + t.Fatalf("error writing block of length %d: %v", len(block), err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() // Wait until writes complete + checkClose(t, b) + + // Iterate through the sinks and check that they all have the expected length. + for _, sink := range sinks { + ts := sink.(*testSink) + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != nEvents { + t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + } + +} + +func TestEventQueue(t *testing.T) { + const nevents = 1000 + var ts testSink + metrics := newSafeMetrics() + eq := newEventQueue( + // delayed sync simulates destination slower than channel comms + &delayedSink{ + Sink: &ts, + delay: time.Millisecond * 1, + }, metrics.eventQueueListener()) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= nevents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := eq.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, eq) + + ts.mu.Lock() + defer ts.mu.Unlock() + metrics.Lock() + defer metrics.Unlock() + + if len(ts.events) != nevents { + t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + + if metrics.Events != nevents { + t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents) + } + + if metrics.Pending != 0 { + t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0) + } +} + +func TestRetryingSink(t *testing.T) { + + // Make a sync that fails most of the time, ensuring that all the events + // make it through. + var ts testSink + flaky := &flakySink{ + rate: 1.0, // start out always failing. + Sink: &ts, + } + s := newRetryingSink(flaky, 3, 10*time.Millisecond) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= 100; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + // Above 50, set the failure rate lower + if i > 50 { + s.mu.Lock() + flaky.rate = 0.90 + s.mu.Unlock() + } + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + defer wg.Done() + if err := s.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, s) + + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != 100 { + t.Fatalf("events not propagated: %d != %d", len(ts.events), 100) + } +} + +type testSink struct { + events []Event + mu sync.Mutex + closed bool +} + +func (ts *testSink) Write(events ...Event) error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.events = append(ts.events, events...) + return nil +} + +func (ts *testSink) Close() error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.closed = true + + logrus.Infof("closing testSink") + return nil +} + +type delayedSink struct { + Sink + delay time.Duration +} + +func (ds *delayedSink) Write(events ...Event) error { + time.Sleep(ds.delay) + return ds.Sink.Write(events...) +} + +type flakySink struct { + Sink + rate float64 +} + +func (fs *flakySink) Write(events ...Event) error { + if rand.Float64() < fs.rate { + return fmt.Errorf("error writing %d events", len(events)) + } + + return fs.Sink.Write(events...) +} + +func checkClose(t *testing.T, sink Sink) { + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + // second close should not crash but should return an error. + if err := sink.Close(); err == nil { + t.Fatalf("no error on double close") + } + + // Write after closed should be an error + if err := sink.Write([]Event{}...); err == nil { + t.Fatalf("write after closed did not have an error") + } else if err != ErrSinkClosed { + t.Fatalf("error should be ErrSinkClosed") + } +}