Merge pull request #113 from stevvooe/event-sink-endpoint
Webhook notification support in registry webapp
This commit is contained in:
commit
c1c7d3dabf
@ -5,3 +5,22 @@ storage:
|
|||||||
rootdirectory: /tmp/registry-dev
|
rootdirectory: /tmp/registry-dev
|
||||||
http:
|
http:
|
||||||
addr: :5000
|
addr: :5000
|
||||||
|
secret: asecretforlocaldevelopment
|
||||||
|
debug:
|
||||||
|
addr: localhost:5001
|
||||||
|
notifications:
|
||||||
|
endpoints:
|
||||||
|
- name: local-8082
|
||||||
|
url: http://localhost:5003/callback
|
||||||
|
headers:
|
||||||
|
Authorization: [Bearer <an example token>]
|
||||||
|
timeout: 1s
|
||||||
|
threshold: 10
|
||||||
|
backoff: 1s
|
||||||
|
disabled: true
|
||||||
|
- name: local-8083
|
||||||
|
url: http://localhost:8083/callback
|
||||||
|
timeout: 1s
|
||||||
|
threshold: 10
|
||||||
|
backoff: 1s
|
||||||
|
disabled: true
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
_ "expvar"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -47,6 +48,10 @@ func main() {
|
|||||||
handler = handlers.CombinedLoggingHandler(os.Stdout, handler)
|
handler = handlers.CombinedLoggingHandler(os.Stdout, handler)
|
||||||
log.SetLevel(logLevel(config.Loglevel))
|
log.SetLevel(logLevel(config.Loglevel))
|
||||||
|
|
||||||
|
if config.HTTP.Debug.Addr != "" {
|
||||||
|
go debugServer(config.HTTP.Debug.Addr)
|
||||||
|
}
|
||||||
|
|
||||||
if config.HTTP.TLS.Certificate == "" {
|
if config.HTTP.TLS.Certificate == "" {
|
||||||
log.Infof("listening on %v", config.HTTP.Addr)
|
log.Infof("listening on %v", config.HTTP.Addr)
|
||||||
if err := http.ListenAndServe(config.HTTP.Addr, handler); err != nil {
|
if err := http.ListenAndServe(config.HTTP.Addr, handler); err != nil {
|
||||||
@ -142,3 +147,13 @@ func configureReporting(app *registry.App) http.Handler {
|
|||||||
|
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// debugServer starts the debug server with pprof, expvar among other
|
||||||
|
// endpoints. The addr should not be exposed externally. For most of these to
|
||||||
|
// work, tls cannot be enabled on the endpoint, so it is generally separate.
|
||||||
|
func debugServer(addr string) {
|
||||||
|
log.Infof("debug server listening %v", addr)
|
||||||
|
if err := http.ListenAndServe(addr, nil); err != nil {
|
||||||
|
log.Fatalf("error listening on debug interface: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -4,8 +4,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Configuration is a versioned registry configuration, intended to be provided by a yaml file, and
|
// Configuration is a versioned registry configuration, intended to be provided by a yaml file, and
|
||||||
@ -54,7 +56,19 @@ type Configuration struct {
|
|||||||
// Certificate.
|
// Certificate.
|
||||||
Key string `yaml:"key"`
|
Key string `yaml:"key"`
|
||||||
} `yaml:"tls"`
|
} `yaml:"tls"`
|
||||||
|
|
||||||
|
// Debug configures the http debug interface, if specified. This can
|
||||||
|
// include services such as pprof, expvar and other data that should
|
||||||
|
// not be exposed externally. Left disabled by default.
|
||||||
|
Debug struct {
|
||||||
|
// Addr specifies the bind address for the debug server.
|
||||||
|
Addr string `yaml:"addr"`
|
||||||
|
} `yaml:"debug"`
|
||||||
} `yaml:"http"`
|
} `yaml:"http"`
|
||||||
|
|
||||||
|
// Notifications specifies configuration about various endpoint to which
|
||||||
|
// registry events are dispatched.
|
||||||
|
Notifications Notifications `yaml:"notifications"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// v0_1Configuration is a Version 0.1 Configuration struct
|
// v0_1Configuration is a Version 0.1 Configuration struct
|
||||||
@ -232,6 +246,26 @@ func (auth Auth) MarshalYAML() (interface{}, error) {
|
|||||||
return map[string]Parameters(auth), nil
|
return map[string]Parameters(auth), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notifications configures multiple http endpoints.
|
||||||
|
type Notifications struct {
|
||||||
|
// Endpoints is a list of http configurations for endpoints that
|
||||||
|
// respond to webhook notifications. In the future, we may allow other
|
||||||
|
// kinds of endpoints, such as external queues.
|
||||||
|
Endpoints []Endpoint `yaml:"endpoints"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Endpoint describes the configuration of an http webhook notification
|
||||||
|
// endpoint.
|
||||||
|
type Endpoint struct {
|
||||||
|
Name string `yaml:"name"` // identifies the endpoint in the registry instance.
|
||||||
|
Disabled bool `yaml:"disabled"` // disables the endpoint
|
||||||
|
URL string `yaml:"url"` // post url for the endpoint.
|
||||||
|
Headers http.Header `yaml:"headers"` // static headers that should be added to all requests
|
||||||
|
Timeout time.Duration `yaml:"timeout"` // HTTP timeout
|
||||||
|
Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure
|
||||||
|
Backoff time.Duration `yaml:"backoff"` // backoff duration
|
||||||
|
}
|
||||||
|
|
||||||
// Reporting defines error reporting methods.
|
// Reporting defines error reporting methods.
|
||||||
type Reporting struct {
|
type Reporting struct {
|
||||||
// Bugsnag configures error reporting for Bugsnag (bugsnag.com).
|
// Bugsnag configures error reporting for Bugsnag (bugsnag.com).
|
||||||
|
@ -2,6 +2,7 @@ package configuration
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -40,6 +41,17 @@ var configStruct = Configuration{
|
|||||||
APIKey: "BugsnagApiKey",
|
APIKey: "BugsnagApiKey",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
Notifications: Notifications{
|
||||||
|
Endpoints: []Endpoint{
|
||||||
|
{
|
||||||
|
Name: "endpoint-1",
|
||||||
|
URL: "http://example.com",
|
||||||
|
Headers: http.Header{
|
||||||
|
"Authorization": []string{"Bearer <example>"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// configYamlV0_1 is a Version 0.1 yaml document representing configStruct
|
// configYamlV0_1 is a Version 0.1 yaml document representing configStruct
|
||||||
@ -61,6 +73,12 @@ auth:
|
|||||||
silly:
|
silly:
|
||||||
realm: silly
|
realm: silly
|
||||||
service: silly
|
service: silly
|
||||||
|
notifications:
|
||||||
|
endpoints:
|
||||||
|
- name: endpoint-1
|
||||||
|
url: http://example.com
|
||||||
|
headers:
|
||||||
|
Authorization: [Bearer <example>]
|
||||||
reporting:
|
reporting:
|
||||||
bugsnag:
|
bugsnag:
|
||||||
apikey: BugsnagApiKey
|
apikey: BugsnagApiKey
|
||||||
@ -76,6 +94,12 @@ auth:
|
|||||||
silly:
|
silly:
|
||||||
realm: silly
|
realm: silly
|
||||||
service: silly
|
service: silly
|
||||||
|
notifications:
|
||||||
|
endpoints:
|
||||||
|
- name: endpoint-1
|
||||||
|
url: http://example.com
|
||||||
|
headers:
|
||||||
|
Authorization: [Bearer <example>]
|
||||||
`
|
`
|
||||||
|
|
||||||
type ConfigSuite struct {
|
type ConfigSuite struct {
|
||||||
@ -129,6 +153,7 @@ func (suite *ConfigSuite) TestParseIncomplete(c *C) {
|
|||||||
suite.expectedConfig.Storage = Storage{"filesystem": Parameters{"rootdirectory": "/tmp/testroot"}}
|
suite.expectedConfig.Storage = Storage{"filesystem": Parameters{"rootdirectory": "/tmp/testroot"}}
|
||||||
suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}}
|
suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}}
|
||||||
suite.expectedConfig.Reporting = Reporting{}
|
suite.expectedConfig.Reporting = Reporting{}
|
||||||
|
suite.expectedConfig.Notifications = Notifications{}
|
||||||
|
|
||||||
os.Setenv("REGISTRY_STORAGE", "filesystem")
|
os.Setenv("REGISTRY_STORAGE", "filesystem")
|
||||||
os.Setenv("REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY", "/tmp/testroot")
|
os.Setenv("REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY", "/tmp/testroot")
|
||||||
@ -292,5 +317,10 @@ func copyConfig(config Configuration) *Configuration {
|
|||||||
configCopy.Auth.setParameter(k, v)
|
configCopy.Auth.setParameter(k, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
configCopy.Notifications = Notifications{Endpoints: []Endpoint{}}
|
||||||
|
for _, v := range config.Notifications.Endpoints {
|
||||||
|
configCopy.Notifications.Endpoints = append(configCopy.Notifications.Endpoints, v)
|
||||||
|
}
|
||||||
|
|
||||||
return configCopy
|
return configCopy
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
|
"github.com/docker/libtrust"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Versioned provides a struct with just the manifest schemaVersion. Incoming
|
// Versioned provides a struct with just the manifest schemaVersion. Incoming
|
||||||
@ -62,6 +63,31 @@ func (sm *SignedManifest) UnmarshalJSON(b []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Payload returns the raw, signed content of the signed manifest. The
|
||||||
|
// contents can be used to calculate the content identifier.
|
||||||
|
func (sm *SignedManifest) Payload() ([]byte, error) {
|
||||||
|
jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve the payload in the manifest.
|
||||||
|
return jsig.Payload()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Signatures returns the signatures as provided by
|
||||||
|
// (*libtrust.JSONSignature).Signatures. The byte slices are opaque jws
|
||||||
|
// signatures.
|
||||||
|
func (sm *SignedManifest) Signatures() ([][]byte, error) {
|
||||||
|
jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve the payload in the manifest.
|
||||||
|
return jsig.Signatures()
|
||||||
|
}
|
||||||
|
|
||||||
// MarshalJSON returns the contents of raw. If Raw is nil, marshals the inner
|
// MarshalJSON returns the contents of raw. If Raw is nil, marshals the inner
|
||||||
// contents. Applications requiring a marshaled signed manifest should simply
|
// contents. Applications requiring a marshaled signed manifest should simply
|
||||||
// use Raw directly, since the the content produced by json.Marshal will be
|
// use Raw directly, since the the content produced by json.Marshal will be
|
||||||
|
119
registry/app.go
119
registry/app.go
@ -2,16 +2,19 @@ package registry
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"code.google.com/p/go-uuid/uuid"
|
||||||
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/distribution/api/v2"
|
"github.com/docker/distribution/api/v2"
|
||||||
"github.com/docker/distribution/auth"
|
"github.com/docker/distribution/auth"
|
||||||
"github.com/docker/distribution/configuration"
|
"github.com/docker/distribution/configuration"
|
||||||
"github.com/docker/distribution/storage"
|
"github.com/docker/distribution/storage"
|
||||||
|
"github.com/docker/distribution/storage/notifications"
|
||||||
"github.com/docker/distribution/storagedriver"
|
"github.com/docker/distribution/storagedriver"
|
||||||
"github.com/docker/distribution/storagedriver/factory"
|
"github.com/docker/distribution/storagedriver/factory"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -21,17 +24,22 @@ import (
|
|||||||
type App struct {
|
type App struct {
|
||||||
Config configuration.Configuration
|
Config configuration.Configuration
|
||||||
|
|
||||||
router *mux.Router
|
// InstanceID is a unique id assigned to the application on each creation.
|
||||||
|
// Provides information in the logs and context to identify restarts.
|
||||||
|
InstanceID string
|
||||||
|
|
||||||
// driver maintains the app global storage driver instance.
|
router *mux.Router // main application router, configured with dispatchers
|
||||||
driver storagedriver.StorageDriver
|
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
|
||||||
|
registry storage.Registry // registry is the primary registry backend for the app instance.
|
||||||
|
accessController auth.AccessController // main access controller for application
|
||||||
|
|
||||||
// registry is the primary registry backend for the app instance.
|
// events contains notification related configuration.
|
||||||
registry storage.Registry
|
events struct {
|
||||||
|
sink notifications.Sink
|
||||||
|
source notifications.SourceRecord
|
||||||
|
}
|
||||||
|
|
||||||
layerHandler storage.LayerHandler
|
layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider
|
||||||
|
|
||||||
accessController auth.AccessController
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewApp takes a configuration and returns a configured app, ready to serve
|
// NewApp takes a configuration and returns a configured app, ready to serve
|
||||||
@ -39,8 +47,9 @@ type App struct {
|
|||||||
// handlers accordingly.
|
// handlers accordingly.
|
||||||
func NewApp(configuration configuration.Configuration) *App {
|
func NewApp(configuration configuration.Configuration) *App {
|
||||||
app := &App{
|
app := &App{
|
||||||
Config: configuration,
|
Config: configuration,
|
||||||
router: v2.Router(),
|
InstanceID: uuid.New(),
|
||||||
|
router: v2.Router(),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register the handler dispatchers.
|
// Register the handler dispatchers.
|
||||||
@ -53,7 +62,8 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||||||
app.register(v2.RouteNameBlobUpload, layerUploadDispatcher)
|
app.register(v2.RouteNameBlobUpload, layerUploadDispatcher)
|
||||||
app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher)
|
app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher)
|
||||||
|
|
||||||
driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
|
var err error
|
||||||
|
app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(stevvooe): Move the creation of a service into a protected
|
// TODO(stevvooe): Move the creation of a service into a protected
|
||||||
@ -62,7 +72,7 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
app.driver = driver
|
app.configureEvents(&configuration)
|
||||||
app.registry = storage.NewRegistryWithDriver(app.driver)
|
app.registry = storage.NewRegistryWithDriver(app.driver)
|
||||||
authType := configuration.Auth.Type()
|
authType := configuration.Auth.Type()
|
||||||
|
|
||||||
@ -77,7 +87,7 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||||||
layerHandlerType := configuration.LayerHandler.Type()
|
layerHandlerType := configuration.LayerHandler.Type()
|
||||||
|
|
||||||
if layerHandlerType != "" {
|
if layerHandlerType != "" {
|
||||||
lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), driver)
|
lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err))
|
panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err))
|
||||||
}
|
}
|
||||||
@ -87,12 +97,6 @@ func NewApp(configuration configuration.Configuration) *App {
|
|||||||
return app
|
return app
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
// Set a header with the Docker Distribution API Version for all responses.
|
|
||||||
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
|
|
||||||
app.router.ServeHTTP(w, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
// register a handler with the application, by route name. The handler will be
|
// register a handler with the application, by route name. The handler will be
|
||||||
// passed through the application filters and context will be constructed at
|
// passed through the application filters and context will be constructed at
|
||||||
// request time.
|
// request time.
|
||||||
@ -107,6 +111,59 @@ func (app *App) register(routeName string, dispatch dispatchFunc) {
|
|||||||
app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
|
app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// configureEvents prepares the event sink for action.
|
||||||
|
func (app *App) configureEvents(configuration *configuration.Configuration) {
|
||||||
|
// Configure all of the endpoint sinks.
|
||||||
|
var sinks []notifications.Sink
|
||||||
|
for _, endpoint := range configuration.Notifications.Endpoints {
|
||||||
|
if endpoint.Disabled {
|
||||||
|
log.Infof("endpoint %s disabled, skipping", endpoint.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
|
||||||
|
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
|
||||||
|
Timeout: endpoint.Timeout,
|
||||||
|
Threshold: endpoint.Threshold,
|
||||||
|
Backoff: endpoint.Backoff,
|
||||||
|
Headers: endpoint.Headers,
|
||||||
|
})
|
||||||
|
|
||||||
|
sinks = append(sinks, endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE(stevvooe): Moving to a new queueing implementation is as easy as
|
||||||
|
// replacing broadcaster with a rabbitmq implementation. It's recommended
|
||||||
|
// that the registry instances also act as the workers to keep deployment
|
||||||
|
// simple.
|
||||||
|
app.events.sink = notifications.NewBroadcaster(sinks...)
|
||||||
|
|
||||||
|
// Populate registry event source
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
hostname = configuration.HTTP.Addr
|
||||||
|
} else {
|
||||||
|
// try to pick the port off the config
|
||||||
|
_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
|
||||||
|
if err == nil {
|
||||||
|
hostname = net.JoinHostPort(hostname, port)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
app.events.source = notifications.SourceRecord{
|
||||||
|
Addr: hostname,
|
||||||
|
InstanceID: app.InstanceID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
defer r.Body.Close() // ensure that request body is always closed.
|
||||||
|
|
||||||
|
// Set a header with the Docker Distribution API Version for all responses.
|
||||||
|
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
|
||||||
|
app.router.ServeHTTP(w, r)
|
||||||
|
}
|
||||||
|
|
||||||
// dispatchFunc takes a context and request and returns a constructed handler
|
// dispatchFunc takes a context and request and returns a constructed handler
|
||||||
// for the route. The dispatcher will use this to dynamically create request
|
// for the route. The dispatcher will use this to dynamically create request
|
||||||
// specific handlers for each endpoint without creating a new router for each
|
// specific handlers for each endpoint without creating a new router for each
|
||||||
@ -142,11 +199,14 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decorate the authorized repository with an event bridge.
|
||||||
|
context.Repository = notifications.Listen(
|
||||||
|
context.Repository, app.eventBridge(context, r))
|
||||||
|
|
||||||
context.log = log.WithField("name", context.Repository.Name())
|
context.log = log.WithField("name", context.Repository.Name())
|
||||||
handler := dispatch(context, r)
|
handler := dispatch(context, r)
|
||||||
|
|
||||||
ssrw := &singleStatusResponseWriter{ResponseWriter: w}
|
ssrw := &singleStatusResponseWriter{ResponseWriter: w}
|
||||||
context.log.Infoln("handler", resolveHandlerName(r.Method, handler))
|
|
||||||
handler.ServeHTTP(ssrw, r)
|
handler.ServeHTTP(ssrw, r)
|
||||||
|
|
||||||
// Automated error response handling here. Handlers may return their
|
// Automated error response handling here. Handlers may return their
|
||||||
@ -167,6 +227,7 @@ func (app *App) context(r *http.Request) *Context {
|
|||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
context := &Context{
|
context := &Context{
|
||||||
App: app,
|
App: app,
|
||||||
|
RequestID: uuid.New(),
|
||||||
urlBuilder: v2.NewURLBuilderFromRequest(r),
|
urlBuilder: v2.NewURLBuilderFromRequest(r),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -268,6 +329,20 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// eventBridge returns a bridge for the current request, configured with the
|
||||||
|
// correct actor and source.
|
||||||
|
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
|
||||||
|
// TODO(stevvooe): Need to extract user data from request context using
|
||||||
|
// auth system. Would prefer to do this during logging refactor and
|
||||||
|
// addition of user and google context type.
|
||||||
|
actor := notifications.ActorRecord{
|
||||||
|
Name: "--todo--",
|
||||||
|
}
|
||||||
|
request := notifications.NewRequestRecord(ctx.RequestID, r)
|
||||||
|
|
||||||
|
return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink)
|
||||||
|
}
|
||||||
|
|
||||||
// apiBase implements a simple yes-man for doing overall checks against the
|
// apiBase implements a simple yes-man for doing overall checks against the
|
||||||
// api. This can support auth roundtrips to support docker login.
|
// api. This can support auth roundtrips to support docker login.
|
||||||
func apiBase(w http.ResponseWriter, r *http.Request) {
|
func apiBase(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -13,6 +13,9 @@ type Context struct {
|
|||||||
// App points to the application structure that created this context.
|
// App points to the application structure that created this context.
|
||||||
*App
|
*App
|
||||||
|
|
||||||
|
// RequestID is the unique id of the request.
|
||||||
|
RequestID string
|
||||||
|
|
||||||
// Repository is the repository for the current request. All requests
|
// Repository is the repository for the current request. All requests
|
||||||
// should be scoped to a single repository. This field may be nil.
|
// should be scoped to a single repository. This field may be nil.
|
||||||
Repository storage.Repository
|
Repository storage.Repository
|
||||||
|
@ -1,27 +0,0 @@
|
|||||||
package registry
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
"reflect"
|
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"github.com/gorilla/handlers"
|
|
||||||
)
|
|
||||||
|
|
||||||
// functionName returns the name of the function fn.
|
|
||||||
func functionName(fn interface{}) string {
|
|
||||||
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
|
|
||||||
}
|
|
||||||
|
|
||||||
// resolveHandlerName attempts to resolve a nice, pretty name for the passed
|
|
||||||
// in handler.
|
|
||||||
func resolveHandlerName(method string, handler http.Handler) string {
|
|
||||||
switch v := handler.(type) {
|
|
||||||
case handlers.MethodHandler:
|
|
||||||
return functionName(v[method])
|
|
||||||
case http.HandlerFunc:
|
|
||||||
return functionName(v)
|
|
||||||
default:
|
|
||||||
return functionName(handler.ServeHTTP)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,185 +0,0 @@
|
|||||||
package decorator
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/docker/distribution/digest"
|
|
||||||
"github.com/docker/distribution/storage"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Decorator provides an interface for intercepting object creation within a
|
|
||||||
// registry. The single method accepts an registry storage object, such as a
|
|
||||||
// Layer, optionally replacing it upon with an alternative object or a
|
|
||||||
// wrapper.
|
|
||||||
//
|
|
||||||
// For example, if one wants to intercept the instantiation of a layer, an
|
|
||||||
// implementation might be as follows:
|
|
||||||
//
|
|
||||||
// func (md *DecoratorImplementation) Decorate(v interface{}) interface{} {
|
|
||||||
// switch v := v.(type) {
|
|
||||||
// case Layer:
|
|
||||||
// return wrapLayer(v)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Make sure to return the object or nil if the decorator doesn't require
|
|
||||||
// // replacement.
|
|
||||||
// return v
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// Such a decorator can be used to intercept calls to support implementing
|
|
||||||
// complex features outside of the storage package.
|
|
||||||
type Decorator interface {
|
|
||||||
Decorate(v interface{}) interface{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Func provides a shortcut handler for decorators that only need a
|
|
||||||
// function. Use is similar to http.HandlerFunc.
|
|
||||||
type Func func(v interface{}) interface{}
|
|
||||||
|
|
||||||
// Decorate allows DecoratorFunc to implement the Decorator interface.
|
|
||||||
func (df Func) Decorate(v interface{}) interface{} {
|
|
||||||
return df(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DecorateRegistry the provided registry with decorator. Registries may be
|
|
||||||
// decorated multiple times.
|
|
||||||
func DecorateRegistry(registry storage.Registry, decorator Decorator) storage.Registry {
|
|
||||||
return ®istryDecorator{
|
|
||||||
Registry: registry,
|
|
||||||
decorator: decorator,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// registryDecorator intercepts registry object creation with a decorator.
|
|
||||||
type registryDecorator struct {
|
|
||||||
storage.Registry
|
|
||||||
decorator Decorator
|
|
||||||
}
|
|
||||||
|
|
||||||
// Repository overrides the method of the same name on the Registry, replacing
|
|
||||||
// the returned instance with a decorator.
|
|
||||||
func (rd *registryDecorator) Repository(name string) storage.Repository {
|
|
||||||
delegate := rd.Registry.Repository(name)
|
|
||||||
decorated := rd.decorator.Decorate(delegate)
|
|
||||||
if decorated != nil {
|
|
||||||
repository, ok := decorated.(storage.Repository)
|
|
||||||
|
|
||||||
if ok {
|
|
||||||
delegate = repository
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &repositoryDecorator{
|
|
||||||
Repository: delegate,
|
|
||||||
decorator: rd.decorator,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// repositoryDecorator decorates a repository, intercepting calls to Layers
|
|
||||||
// and Manifests with injected variants.
|
|
||||||
type repositoryDecorator struct {
|
|
||||||
storage.Repository
|
|
||||||
decorator Decorator
|
|
||||||
}
|
|
||||||
|
|
||||||
// Layers overrides the Layers method of Repository.
|
|
||||||
func (rd *repositoryDecorator) Layers() storage.LayerService {
|
|
||||||
delegate := rd.Repository.Layers()
|
|
||||||
decorated := rd.decorator.Decorate(delegate)
|
|
||||||
|
|
||||||
if decorated != nil {
|
|
||||||
layers, ok := decorated.(storage.LayerService)
|
|
||||||
|
|
||||||
if ok {
|
|
||||||
delegate = layers
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &layerServiceDecorator{
|
|
||||||
LayerService: delegate,
|
|
||||||
decorator: rd.decorator,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Manifests overrides the Manifests method of Repository.
|
|
||||||
func (rd *repositoryDecorator) Manifests() storage.ManifestService {
|
|
||||||
delegate := rd.Repository.Manifests()
|
|
||||||
decorated := rd.decorator.Decorate(delegate)
|
|
||||||
|
|
||||||
if decorated != nil {
|
|
||||||
manifests, ok := decorated.(storage.ManifestService)
|
|
||||||
|
|
||||||
if ok {
|
|
||||||
delegate = manifests
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE(stevvooe): We do not have to intercept delegate calls to the
|
|
||||||
// manifest service since it doesn't produce any interfaces for which
|
|
||||||
// interception is supported.
|
|
||||||
return delegate
|
|
||||||
}
|
|
||||||
|
|
||||||
// layerServiceDecorator intercepts calls that generate Layer and LayerUpload
|
|
||||||
// instances, replacing them with instances from the decorator.
|
|
||||||
type layerServiceDecorator struct {
|
|
||||||
storage.LayerService
|
|
||||||
decorator Decorator
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch overrides the Fetch method of LayerService.
|
|
||||||
func (lsd *layerServiceDecorator) Fetch(digest digest.Digest) (storage.Layer, error) {
|
|
||||||
delegate, err := lsd.LayerService.Fetch(digest)
|
|
||||||
return decorateLayer(lsd.decorator, delegate), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Upload overrides the Upload method of LayerService.
|
|
||||||
func (lsd *layerServiceDecorator) Upload() (storage.LayerUpload, error) {
|
|
||||||
delegate, err := lsd.LayerService.Upload()
|
|
||||||
return decorateLayerUpload(lsd.decorator, delegate), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resume overrides the Resume method of LayerService.
|
|
||||||
func (lsd *layerServiceDecorator) Resume(uuid string) (storage.LayerUpload, error) {
|
|
||||||
delegate, err := lsd.LayerService.Resume(uuid)
|
|
||||||
return decorateLayerUpload(lsd.decorator, delegate), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// layerUploadDecorator intercepts calls that generate Layer instances,
|
|
||||||
// replacing them with instances from the decorator.
|
|
||||||
type layerUploadDecorator struct {
|
|
||||||
storage.LayerUpload
|
|
||||||
decorator Decorator
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lud *layerUploadDecorator) Finish(dgst digest.Digest) (storage.Layer, error) {
|
|
||||||
delegate, err := lud.LayerUpload.Finish(dgst)
|
|
||||||
return decorateLayer(lud.decorator, delegate), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// decorateLayer guarantees that a layer gets correctly decorated.
|
|
||||||
func decorateLayer(decorator Decorator, delegate storage.Layer) storage.Layer {
|
|
||||||
decorated := decorator.Decorate(delegate)
|
|
||||||
if decorated != nil {
|
|
||||||
layer, ok := decorated.(storage.Layer)
|
|
||||||
if ok {
|
|
||||||
delegate = layer
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return delegate
|
|
||||||
}
|
|
||||||
|
|
||||||
// decorateLayerUpload guarantees that an upload gets correctly decorated.
|
|
||||||
func decorateLayerUpload(decorator Decorator, delegate storage.LayerUpload) storage.LayerUpload {
|
|
||||||
decorated := decorator.Decorate(delegate)
|
|
||||||
if decorated != nil {
|
|
||||||
layerUpload, ok := decorated.(storage.LayerUpload)
|
|
||||||
if ok {
|
|
||||||
delegate = layerUpload
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &layerUploadDecorator{
|
|
||||||
LayerUpload: delegate,
|
|
||||||
decorator: decorator,
|
|
||||||
}
|
|
||||||
}
|
|
@ -94,6 +94,11 @@ func (ms *manifestStore) Get(tag string) (*manifest.SignedManifest, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error {
|
func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error {
|
||||||
|
// TODO(stevvooe): Add check here to see if the revision is already
|
||||||
|
// present in the repository. If it is, we should merge the signatures, do
|
||||||
|
// a shallow verify (or a full one, doesn't matter) and return an error
|
||||||
|
// indicating what happened.
|
||||||
|
|
||||||
// Verify the manifest.
|
// Verify the manifest.
|
||||||
if err := ms.verifyManifest(tag, manifest); err != nil {
|
if err := ms.verifyManifest(tag, manifest); err != nil {
|
||||||
return err
|
return err
|
||||||
|
156
storage/notifications/bridge.go
Normal file
156
storage/notifications/bridge.go
Normal file
@ -0,0 +1,156 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/distribution/manifest"
|
||||||
|
|
||||||
|
"code.google.com/p/go-uuid/uuid"
|
||||||
|
"github.com/docker/distribution/digest"
|
||||||
|
"github.com/docker/distribution/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type bridge struct {
|
||||||
|
ub URLBuilder
|
||||||
|
actor ActorRecord
|
||||||
|
source SourceRecord
|
||||||
|
request RequestRecord
|
||||||
|
sink Sink
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Listener = &bridge{}
|
||||||
|
|
||||||
|
// URLBuilder defines a subset of url builder to be used by the event listener.
|
||||||
|
type URLBuilder interface {
|
||||||
|
BuildManifestURL(name, tag string) (string, error)
|
||||||
|
BuildBlobURL(name string, dgst digest.Digest) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBridge returns a notification listener that writes records to sink,
|
||||||
|
// using the actor and source. Any urls populated in the events created by
|
||||||
|
// this bridge will be created using the URLBuilder.
|
||||||
|
// TODO(stevvooe): Update this to simply take a context.Context object.
|
||||||
|
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink) Listener {
|
||||||
|
return &bridge{
|
||||||
|
ub: ub,
|
||||||
|
actor: actor,
|
||||||
|
source: source,
|
||||||
|
request: request,
|
||||||
|
sink: sink,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRequestRecord builds a RequestRecord for use in NewBridge from an
|
||||||
|
// http.Request, associating it with a request id.
|
||||||
|
func NewRequestRecord(id string, r *http.Request) RequestRecord {
|
||||||
|
return RequestRecord{
|
||||||
|
ID: id,
|
||||||
|
Addr: r.RemoteAddr,
|
||||||
|
Host: r.Host,
|
||||||
|
Method: r.Method,
|
||||||
|
UserAgent: r.UserAgent(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error {
|
||||||
|
return b.createManifestEventAndWrite(EventActionPush, repo, sm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error {
|
||||||
|
return b.createManifestEventAndWrite(EventActionPull, repo, sm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error {
|
||||||
|
return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error {
|
||||||
|
return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error {
|
||||||
|
return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error {
|
||||||
|
return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error {
|
||||||
|
event, err := b.createManifestEvent(action, repo, sm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.sink.Write(*event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) {
|
||||||
|
event := b.createEvent(action)
|
||||||
|
event.Target.Type = EventTargetTypeManifest
|
||||||
|
event.Target.Name = repo.Name()
|
||||||
|
event.Target.Tag = sm.Tag
|
||||||
|
|
||||||
|
p, err := sm.Payload()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
event.Target.Digest, err = digest.FromBytes(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): Currently, the is the "tag" url: once the digest url is
|
||||||
|
// implemented, this should be replaced.
|
||||||
|
event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error {
|
||||||
|
event, err := b.createLayerEvent(action, repo, dgst)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.sink.Write(*event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) {
|
||||||
|
event := b.createEvent(action)
|
||||||
|
event.Target.Type = EventTargetTypeBlob
|
||||||
|
event.Target.Name = repo.Name()
|
||||||
|
event.Target.Digest = dgst
|
||||||
|
|
||||||
|
var err error
|
||||||
|
event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createEvent creates an event with actor and source populated.
|
||||||
|
func (b *bridge) createEvent(action string) *Event {
|
||||||
|
event := createEvent(action)
|
||||||
|
event.Source = b.source
|
||||||
|
event.Actor = b.actor
|
||||||
|
event.Request = b.request
|
||||||
|
|
||||||
|
return event
|
||||||
|
}
|
||||||
|
|
||||||
|
// createEvent returns a new event, timestamped, with the specified action.
|
||||||
|
func createEvent(action string) *Event {
|
||||||
|
return &Event{
|
||||||
|
ID: uuid.New(),
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
Action: action,
|
||||||
|
}
|
||||||
|
}
|
86
storage/notifications/endpoint.go
Normal file
86
storage/notifications/endpoint.go
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EndpointConfig covers the optional configuration parameters for an active
|
||||||
|
// endpoint.
|
||||||
|
type EndpointConfig struct {
|
||||||
|
Headers http.Header
|
||||||
|
Timeout time.Duration
|
||||||
|
Threshold int
|
||||||
|
Backoff time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaults set any zero-valued fields to a reasonable default.
|
||||||
|
func (ec *EndpointConfig) defaults() {
|
||||||
|
if ec.Timeout <= 0 {
|
||||||
|
ec.Timeout = time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
if ec.Threshold <= 0 {
|
||||||
|
ec.Threshold = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
if ec.Backoff <= 0 {
|
||||||
|
ec.Backoff = time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Endpoint is a reliable, queued, thread-safe sink that notify external http
|
||||||
|
// services when events are written. Writes are non-blocking and always
|
||||||
|
// succeed for callers but events may be queued internally.
|
||||||
|
type Endpoint struct {
|
||||||
|
Sink
|
||||||
|
url string
|
||||||
|
name string
|
||||||
|
|
||||||
|
EndpointConfig
|
||||||
|
|
||||||
|
metrics *safeMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEndpoint returns a running endpoint, ready to receive events.
|
||||||
|
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
|
||||||
|
var endpoint Endpoint
|
||||||
|
endpoint.name = name
|
||||||
|
endpoint.url = url
|
||||||
|
endpoint.EndpointConfig = config
|
||||||
|
endpoint.defaults()
|
||||||
|
endpoint.metrics = newSafeMetrics()
|
||||||
|
|
||||||
|
// Configures the inmemory queue, retry, http pipeline.
|
||||||
|
endpoint.Sink = newHTTPSink(
|
||||||
|
endpoint.url, endpoint.Timeout, endpoint.Headers,
|
||||||
|
endpoint.metrics.httpStatusListener())
|
||||||
|
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
|
||||||
|
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
|
||||||
|
|
||||||
|
register(&endpoint)
|
||||||
|
return &endpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name returns the name of the endpoint, generally used for debugging.
|
||||||
|
func (e *Endpoint) Name() string {
|
||||||
|
return e.name
|
||||||
|
}
|
||||||
|
|
||||||
|
// URL returns the url of the endpoint.
|
||||||
|
func (e *Endpoint) URL() string {
|
||||||
|
return e.url
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadMetrics populates em with metrics from the endpoint.
|
||||||
|
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
|
||||||
|
e.metrics.Lock()
|
||||||
|
defer e.metrics.Unlock()
|
||||||
|
|
||||||
|
*em = e.metrics.EndpointMetrics
|
||||||
|
// Map still need to copied in a threadsafe manner.
|
||||||
|
em.Statuses = make(map[string]int)
|
||||||
|
for k, v := range e.metrics.Statuses {
|
||||||
|
em.Statuses[k] = v
|
||||||
|
}
|
||||||
|
}
|
154
storage/notifications/event.go
Normal file
154
storage/notifications/event.go
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/distribution/digest"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventAction constants used in action field of Event.
|
||||||
|
const (
|
||||||
|
EventActionPull = "pull"
|
||||||
|
EventActionPush = "push"
|
||||||
|
EventActionDelete = "delete"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventTargetType constants used in Target section of Event.
|
||||||
|
const (
|
||||||
|
EventTargetTypeManifest = "manifest"
|
||||||
|
EventTargetTypeBlob = "blob"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventsMediaType is the mediatype for the json event envelope. If the Event,
|
||||||
|
// ActorRecord, SourceRecord or Envelope structs change, the version number
|
||||||
|
// should be incremented.
|
||||||
|
const EventsMediaType = "application/vnd.docker.distribution.events.v1+json"
|
||||||
|
|
||||||
|
// Envelope defines the fields of a json event envelope message that can hold
|
||||||
|
// one or more events.
|
||||||
|
type Envelope struct {
|
||||||
|
// Events make up the contents of the envelope. Events present in a single
|
||||||
|
// envelope are not necessarily related.
|
||||||
|
Events []Event `json:"events,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): The event type should be separate from the json format. It
|
||||||
|
// should be defined as an interface. Leaving as is for now since we don't
|
||||||
|
// need that at this time. If we make this change, the struct below would be
|
||||||
|
// called "EventRecord".
|
||||||
|
|
||||||
|
// Event provides the fields required to describe a registry event.
|
||||||
|
type Event struct {
|
||||||
|
// ID provides a unique identifier for the event.
|
||||||
|
ID string `json:"id,omitempty"`
|
||||||
|
|
||||||
|
// Timestamp is the time at which the event occurred.
|
||||||
|
Timestamp time.Time `json:"timestamp,omitempty"`
|
||||||
|
|
||||||
|
// Action indicates what action encompasses the provided event.
|
||||||
|
Action string `json:"action,omitempty"`
|
||||||
|
|
||||||
|
// Target uniquely describes the target of the event.
|
||||||
|
Target struct {
|
||||||
|
// Type should be "manifest" or "blob"
|
||||||
|
Type string `json:"type,omitempty"`
|
||||||
|
|
||||||
|
// Name identifies the named repository.
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
|
||||||
|
// Digest should identify the object in the repository.
|
||||||
|
Digest digest.Digest `json:"digest,omitempty"`
|
||||||
|
|
||||||
|
// Tag is present if the operation involved a tagged manifest.
|
||||||
|
Tag string `json:"tag,omitempty"`
|
||||||
|
|
||||||
|
// URL provides a link to the content on the relevant repository instance.
|
||||||
|
URL string `json:"url,omitempty"`
|
||||||
|
} `json:"target,omitempty"`
|
||||||
|
|
||||||
|
// Request covers the request that generated the event.
|
||||||
|
Request RequestRecord `json:"request,omitempty"`
|
||||||
|
|
||||||
|
// Actor specifies the agent that initiated the event. For most
|
||||||
|
// situations, this could be from the authorizaton context of the request.
|
||||||
|
Actor ActorRecord `json:"actor,omitempty"`
|
||||||
|
|
||||||
|
// Source identifies the registry node that generated the event. Put
|
||||||
|
// differently, while the actor "initiates" the event, the source
|
||||||
|
// "generates" it.
|
||||||
|
Source SourceRecord `json:"source,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ActorRecord specifies the agent that initiated the event. For most
|
||||||
|
// situations, this could be from the authorizaton context of the request.
|
||||||
|
// Data in this record can refer to both the initiating client and the
|
||||||
|
// generating request.
|
||||||
|
type ActorRecord struct {
|
||||||
|
// Name corresponds to the subject or username associated with the
|
||||||
|
// request context that generated the event.
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
|
||||||
|
// TODO(stevvooe): Look into setting a session cookie to get this
|
||||||
|
// without docker daemon.
|
||||||
|
// SessionID
|
||||||
|
|
||||||
|
// TODO(stevvooe): Push the "Docker-Command" header to replace cookie and
|
||||||
|
// get the actual command.
|
||||||
|
// Command
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestRecord covers the request that generated the event.
|
||||||
|
type RequestRecord struct {
|
||||||
|
// ID uniquely identifies the request that initiated the event.
|
||||||
|
ID string `json:"id"`
|
||||||
|
|
||||||
|
// Addr contains the ip or hostname and possibly port of the client
|
||||||
|
// connection that initiated the event. This is the RemoteAddr from
|
||||||
|
// the standard http request.
|
||||||
|
Addr string `json:"addr,omitempty"`
|
||||||
|
|
||||||
|
// Host is the externally accessible host name of the registry instance,
|
||||||
|
// as specified by the http host header on incoming requests.
|
||||||
|
Host string `json:"host,omitempty"`
|
||||||
|
|
||||||
|
// Method has the request method that generated the event.
|
||||||
|
Method string `json:"method"`
|
||||||
|
|
||||||
|
// UserAgent contains the user agent header of the request.
|
||||||
|
UserAgent string `json:"useragent"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SourceRecord identifies the registry node that generated the event. Put
|
||||||
|
// differently, while the actor "initiates" the event, the source "generates"
|
||||||
|
// it.
|
||||||
|
type SourceRecord struct {
|
||||||
|
// Addr contains the ip or hostname and the port of the registry node
|
||||||
|
// that generated the event. Generally, this will be resolved by
|
||||||
|
// os.Hostname() along with the running port.
|
||||||
|
Addr string `json:"addr,omitempty"`
|
||||||
|
|
||||||
|
// InstanceID identifies a running instance of an application. Changes
|
||||||
|
// after each restart.
|
||||||
|
InstanceID string `json:"instanceID,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrSinkClosed is returned if a write is issued to a sink that has been
|
||||||
|
// closed. If encountered, the error should be considered terminal and
|
||||||
|
// retries will not be successful.
|
||||||
|
ErrSinkClosed = fmt.Errorf("sink: closed")
|
||||||
|
)
|
||||||
|
|
||||||
|
// Sink accepts and sends events.
|
||||||
|
type Sink interface {
|
||||||
|
// Write writes one or more events to the sink. If no error is returned,
|
||||||
|
// the caller will assume that all events have been committed and will not
|
||||||
|
// try to send them again. If an error is received, the caller may retry
|
||||||
|
// sending the event. The caller should cede the slice of memory to the
|
||||||
|
// sink and not modify it after calling this method.
|
||||||
|
Write(events ...Event) error
|
||||||
|
|
||||||
|
// Close the sink, possibly waiting for pending events to flush.
|
||||||
|
Close() error
|
||||||
|
}
|
145
storage/notifications/event_test.go
Normal file
145
storage/notifications/event_test.go
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestEventJSONFormat provides silly test to detect if the event format or
|
||||||
|
// envelope has changed. If this code fails, the revision of the protocol may
|
||||||
|
// need to be incremented.
|
||||||
|
func TestEventEnvelopeJSONFormat(t *testing.T) {
|
||||||
|
var expected = strings.TrimSpace(`
|
||||||
|
{
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"id": "asdf-asdf-asdf-asdf-0",
|
||||||
|
"timestamp": "2006-01-02T15:04:05Z",
|
||||||
|
"action": "push",
|
||||||
|
"target": {
|
||||||
|
"type": "manifest",
|
||||||
|
"name": "library/test",
|
||||||
|
"digest": "sha256:0123456789abcdef0",
|
||||||
|
"tag": "latest",
|
||||||
|
"url": "http://example.com/v2/library/test/manifests/latest"
|
||||||
|
},
|
||||||
|
"request": {
|
||||||
|
"id": "asdfasdf",
|
||||||
|
"addr": "client.local",
|
||||||
|
"host": "registrycluster.local",
|
||||||
|
"method": "PUT",
|
||||||
|
"useragent": "test/0.1"
|
||||||
|
},
|
||||||
|
"actor": {
|
||||||
|
"name": "test-actor"
|
||||||
|
},
|
||||||
|
"source": {
|
||||||
|
"addr": "hostname.local:port"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "asdf-asdf-asdf-asdf-1",
|
||||||
|
"timestamp": "2006-01-02T15:04:05Z",
|
||||||
|
"action": "push",
|
||||||
|
"target": {
|
||||||
|
"type": "blob",
|
||||||
|
"name": "library/test",
|
||||||
|
"digest": "tarsum.v2+sha256:0123456789abcdef1",
|
||||||
|
"url": "http://example.com/v2/library/test/manifests/latest"
|
||||||
|
},
|
||||||
|
"request": {
|
||||||
|
"id": "asdfasdf",
|
||||||
|
"addr": "client.local",
|
||||||
|
"host": "registrycluster.local",
|
||||||
|
"method": "PUT",
|
||||||
|
"useragent": "test/0.1"
|
||||||
|
},
|
||||||
|
"actor": {
|
||||||
|
"name": "test-actor"
|
||||||
|
},
|
||||||
|
"source": {
|
||||||
|
"addr": "hostname.local:port"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "asdf-asdf-asdf-asdf-2",
|
||||||
|
"timestamp": "2006-01-02T15:04:05Z",
|
||||||
|
"action": "push",
|
||||||
|
"target": {
|
||||||
|
"type": "blob",
|
||||||
|
"name": "library/test",
|
||||||
|
"digest": "tarsum.v2+sha256:0123456789abcdef2",
|
||||||
|
"url": "http://example.com/v2/library/test/manifests/latest"
|
||||||
|
},
|
||||||
|
"request": {
|
||||||
|
"id": "asdfasdf",
|
||||||
|
"addr": "client.local",
|
||||||
|
"host": "registrycluster.local",
|
||||||
|
"method": "PUT",
|
||||||
|
"useragent": "test/0.1"
|
||||||
|
},
|
||||||
|
"actor": {
|
||||||
|
"name": "test-actor"
|
||||||
|
},
|
||||||
|
"source": {
|
||||||
|
"addr": "hostname.local:port"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
`)
|
||||||
|
|
||||||
|
tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error creating time: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var prototype Event
|
||||||
|
prototype.Action = "push"
|
||||||
|
prototype.Timestamp = tm
|
||||||
|
prototype.Actor.Name = "test-actor"
|
||||||
|
prototype.Request.ID = "asdfasdf"
|
||||||
|
prototype.Request.Addr = "client.local"
|
||||||
|
prototype.Request.Host = "registrycluster.local"
|
||||||
|
prototype.Request.Method = "PUT"
|
||||||
|
prototype.Request.UserAgent = "test/0.1"
|
||||||
|
prototype.Source.Addr = "hostname.local:port"
|
||||||
|
|
||||||
|
var manifestPush Event
|
||||||
|
manifestPush = prototype
|
||||||
|
manifestPush.ID = "asdf-asdf-asdf-asdf-0"
|
||||||
|
manifestPush.Target.Digest = "sha256:0123456789abcdef0"
|
||||||
|
manifestPush.Target.Type = EventTargetTypeManifest
|
||||||
|
manifestPush.Target.Name = "library/test"
|
||||||
|
manifestPush.Target.Tag = "latest"
|
||||||
|
manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest"
|
||||||
|
|
||||||
|
var layerPush0 Event
|
||||||
|
layerPush0 = prototype
|
||||||
|
layerPush0.ID = "asdf-asdf-asdf-asdf-1"
|
||||||
|
layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1"
|
||||||
|
layerPush0.Target.Type = EventTargetTypeBlob
|
||||||
|
layerPush0.Target.Name = "library/test"
|
||||||
|
layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest"
|
||||||
|
|
||||||
|
var layerPush1 Event
|
||||||
|
layerPush1 = prototype
|
||||||
|
layerPush1.ID = "asdf-asdf-asdf-asdf-2"
|
||||||
|
layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2"
|
||||||
|
layerPush1.Target.Type = EventTargetTypeBlob
|
||||||
|
layerPush1.Target.Name = "library/test"
|
||||||
|
layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"
|
||||||
|
|
||||||
|
var envelope Envelope
|
||||||
|
envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1)
|
||||||
|
|
||||||
|
p, err := json.MarshalIndent(envelope, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error marshaling envelope: %v", err)
|
||||||
|
}
|
||||||
|
if string(p) != expected {
|
||||||
|
t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected)
|
||||||
|
}
|
||||||
|
}
|
145
storage/notifications/http.go
Normal file
145
storage/notifications/http.go
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// httpSink implements a single-flight, http notification endpoint. This is
|
||||||
|
// very lightweight in that it only makes an attempt at an http request.
|
||||||
|
// Reliability should be provided by the caller.
|
||||||
|
type httpSink struct {
|
||||||
|
url string
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
closed bool
|
||||||
|
client *http.Client
|
||||||
|
listeners []httpStatusListener
|
||||||
|
|
||||||
|
// TODO(stevvooe): Allow one to configure the media type accepted by this
|
||||||
|
// sink and choose the serialization based on that.
|
||||||
|
}
|
||||||
|
|
||||||
|
// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
|
||||||
|
// sinks for increased reliability.
|
||||||
|
func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink {
|
||||||
|
return &httpSink{
|
||||||
|
url: u,
|
||||||
|
listeners: listeners,
|
||||||
|
client: &http.Client{
|
||||||
|
Transport: &headerRoundTripper{
|
||||||
|
Transport: http.DefaultTransport.(*http.Transport),
|
||||||
|
headers: headers,
|
||||||
|
},
|
||||||
|
Timeout: timeout,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// httpStatusListener is called on various outcomes of sending notifications.
|
||||||
|
type httpStatusListener interface {
|
||||||
|
success(status int, events ...Event)
|
||||||
|
failure(status int, events ...Event)
|
||||||
|
err(err error, events ...Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept makes an attempt to notify the endpoint, returning an error if it
|
||||||
|
// fails. It is the caller's responsibility to retry on error. The events are
|
||||||
|
// accepted or rejected as a group.
|
||||||
|
func (hs *httpSink) Write(events ...Event) error {
|
||||||
|
hs.mu.Lock()
|
||||||
|
defer hs.mu.Unlock()
|
||||||
|
|
||||||
|
if hs.closed {
|
||||||
|
return ErrSinkClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
envelope := Envelope{
|
||||||
|
Events: events,
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
|
||||||
|
// retry but we are going to do it to keep the code simple. It is likely
|
||||||
|
// we could change the event struct to manage its own buffer.
|
||||||
|
|
||||||
|
p, err := json.MarshalIndent(envelope, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
for _, listener := range hs.listeners {
|
||||||
|
listener.err(err, events...)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
body := bytes.NewReader(p)
|
||||||
|
resp, err := hs.client.Post(hs.url, EventsMediaType, body)
|
||||||
|
if err != nil {
|
||||||
|
for _, listener := range hs.listeners {
|
||||||
|
listener.err(err, events...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("%v: error posting: %v", hs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The notifier will treat any 2xx or 3xx response as accepted by the
|
||||||
|
// endpoint.
|
||||||
|
switch {
|
||||||
|
case resp.StatusCode >= 200 && resp.StatusCode < 400:
|
||||||
|
for _, listener := range hs.listeners {
|
||||||
|
listener.success(resp.StatusCode, events...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): This is a little accepting: we may want to support
|
||||||
|
// unsupported media type responses with retries using the correct
|
||||||
|
// media type. There may also be cases that will never work.
|
||||||
|
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
for _, listener := range hs.listeners {
|
||||||
|
listener.failure(resp.StatusCode, events...)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the endpoint
|
||||||
|
func (hs *httpSink) Close() error {
|
||||||
|
hs.mu.Lock()
|
||||||
|
defer hs.mu.Unlock()
|
||||||
|
|
||||||
|
if hs.closed {
|
||||||
|
return fmt.Errorf("httpsink: already closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
hs.closed = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hs *httpSink) String() string {
|
||||||
|
return fmt.Sprintf("httpSink{%s}", hs.url)
|
||||||
|
}
|
||||||
|
|
||||||
|
type headerRoundTripper struct {
|
||||||
|
*http.Transport // must be transport to support CancelRequest
|
||||||
|
headers http.Header
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
|
var nreq http.Request
|
||||||
|
nreq = *req
|
||||||
|
nreq.Header = make(http.Header)
|
||||||
|
|
||||||
|
merge := func(headers http.Header) {
|
||||||
|
for k, v := range headers {
|
||||||
|
nreq.Header[k] = append(nreq.Header[k], v...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
merge(req.Header)
|
||||||
|
merge(hrt.headers)
|
||||||
|
|
||||||
|
return hrt.Transport.RoundTrip(&nreq)
|
||||||
|
}
|
155
storage/notifications/http_test.go
Normal file
155
storage/notifications/http_test.go
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"mime"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestHTTPSink mocks out an http endpoint and notifies it under a couple of
|
||||||
|
// conditions, ensuring correct behavior.
|
||||||
|
func TestHTTPSink(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
defer r.Body.Close()
|
||||||
|
if r.Method != "POST" {
|
||||||
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
|
t.Fatalf("unexpected request method: %v", r.Method)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the content type and make sure it matches
|
||||||
|
contentType := r.Header.Get("Content-Type")
|
||||||
|
mediaType, _, err := mime.ParseMediaType(contentType)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if mediaType != EventsMediaType {
|
||||||
|
w.WriteHeader(http.StatusUnsupportedMediaType)
|
||||||
|
t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var envelope Envelope
|
||||||
|
dec := json.NewDecoder(r.Body)
|
||||||
|
if err := dec.Decode(&envelope); err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
t.Fatalf("error decoding request body: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let caller choose the status
|
||||||
|
status, err := strconv.Atoi(r.FormValue("status"))
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("error parsing status: %v", err)
|
||||||
|
|
||||||
|
// May just be empty, set status to 200
|
||||||
|
status = http.StatusOK
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(status)
|
||||||
|
}))
|
||||||
|
|
||||||
|
metrics := newSafeMetrics()
|
||||||
|
sink := newHTTPSink(server.URL, 0, nil,
|
||||||
|
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
|
||||||
|
|
||||||
|
var expectedMetrics EndpointMetrics
|
||||||
|
expectedMetrics.Statuses = make(map[string]int)
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
events []Event // events to send
|
||||||
|
url string
|
||||||
|
failure bool // true if there should be a failure.
|
||||||
|
statusCode int // if not set, no status code should be incremented.
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
statusCode: http.StatusOK,
|
||||||
|
events: []Event{
|
||||||
|
createTestEvent("push", "library/test", "manifest")},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
statusCode: http.StatusOK,
|
||||||
|
events: []Event{
|
||||||
|
createTestEvent("push", "library/test", "manifest"),
|
||||||
|
createTestEvent("push", "library/test", "layer"),
|
||||||
|
createTestEvent("push", "library/test", "layer"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
statusCode: http.StatusTemporaryRedirect,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
statusCode: http.StatusBadRequest,
|
||||||
|
failure: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Case where connection never goes through.
|
||||||
|
url: "http://shoudlntresolve/",
|
||||||
|
failure: true,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
|
||||||
|
if tc.failure {
|
||||||
|
expectedMetrics.Failures += len(tc.events)
|
||||||
|
} else {
|
||||||
|
expectedMetrics.Successes += len(tc.events)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.statusCode > 0 {
|
||||||
|
expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events)
|
||||||
|
}
|
||||||
|
|
||||||
|
url := tc.url
|
||||||
|
if url == "" {
|
||||||
|
url = server.URL + "/"
|
||||||
|
}
|
||||||
|
// setup endpoint to respond with expected status code.
|
||||||
|
url += fmt.Sprintf("?status=%v", tc.statusCode)
|
||||||
|
sink.url = url
|
||||||
|
|
||||||
|
t.Logf("testcase: %v, fail=%v", url, tc.failure)
|
||||||
|
// Try a simple event emission.
|
||||||
|
err := sink.Write(tc.events...)
|
||||||
|
|
||||||
|
if !tc.failure {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error send event: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("the endpoint should have rejected the request")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) {
|
||||||
|
t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sink.Close(); err != nil {
|
||||||
|
t.Fatalf("unexpected error closing http sink: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// double close returns error
|
||||||
|
if err := sink.Close(); err == nil {
|
||||||
|
t.Fatalf("second close should have returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTestEvent(action, repo, typ string) Event {
|
||||||
|
event := createEvent(action)
|
||||||
|
|
||||||
|
event.Target.Type = typ
|
||||||
|
event.Target.Name = repo
|
||||||
|
|
||||||
|
return *event
|
||||||
|
}
|
140
storage/notifications/listener.go
Normal file
140
storage/notifications/listener.go
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/docker/distribution/digest"
|
||||||
|
"github.com/docker/distribution/manifest"
|
||||||
|
"github.com/docker/distribution/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ManifestListener describes a set of methods for listening to events related to manifests.
|
||||||
|
type ManifestListener interface {
|
||||||
|
ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error
|
||||||
|
ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error
|
||||||
|
|
||||||
|
// TODO(stevvooe): Please note that delete support is still a little shaky
|
||||||
|
// and we'll need to propagate these in the future.
|
||||||
|
|
||||||
|
ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// LayerListener describes a listener that can respond to layer related events.
|
||||||
|
type LayerListener interface {
|
||||||
|
LayerPushed(repo storage.Repository, layer storage.Layer) error
|
||||||
|
LayerPulled(repo storage.Repository, layer storage.Layer) error
|
||||||
|
|
||||||
|
// TODO(stevvooe): Please note that delete support is still a little shaky
|
||||||
|
// and we'll need to propagate these in the future.
|
||||||
|
|
||||||
|
LayerDeleted(repo storage.Repository, layer storage.Layer) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listener combines all repository events into a single interface.
|
||||||
|
type Listener interface {
|
||||||
|
ManifestListener
|
||||||
|
LayerListener
|
||||||
|
}
|
||||||
|
|
||||||
|
type repositoryListener struct {
|
||||||
|
storage.Repository
|
||||||
|
listener Listener
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listen dispatches events on the repository to the listener.
|
||||||
|
func Listen(repo storage.Repository, listener Listener) storage.Repository {
|
||||||
|
return &repositoryListener{
|
||||||
|
Repository: repo,
|
||||||
|
listener: listener,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rl *repositoryListener) Manifests() storage.ManifestService {
|
||||||
|
return &manifestServiceListener{
|
||||||
|
ManifestService: rl.Repository.Manifests(),
|
||||||
|
parent: rl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rl *repositoryListener) Layers() storage.LayerService {
|
||||||
|
return &layerServiceListener{
|
||||||
|
LayerService: rl.Repository.Layers(),
|
||||||
|
parent: rl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type manifestServiceListener struct {
|
||||||
|
storage.ManifestService
|
||||||
|
parent *repositoryListener
|
||||||
|
}
|
||||||
|
|
||||||
|
func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) {
|
||||||
|
sm, err := msl.ManifestService.Get(tag)
|
||||||
|
if err == nil {
|
||||||
|
if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil {
|
||||||
|
logrus.Errorf("error dispatching manifest pull to listener: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sm, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error {
|
||||||
|
err := msl.ManifestService.Put(tag, sm)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil {
|
||||||
|
logrus.Errorf("error dispatching manifest push to listener: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type layerServiceListener struct {
|
||||||
|
storage.LayerService
|
||||||
|
parent *repositoryListener
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) {
|
||||||
|
layer, err := lsl.LayerService.Fetch(dgst)
|
||||||
|
if err == nil {
|
||||||
|
if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil {
|
||||||
|
logrus.Errorf("error dispatching layer pull to listener: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return layer, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) {
|
||||||
|
lu, err := lsl.LayerService.Upload()
|
||||||
|
return lsl.decorateUpload(lu), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) {
|
||||||
|
lu, err := lsl.LayerService.Resume(uuid)
|
||||||
|
return lsl.decorateUpload(lu), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload {
|
||||||
|
return &layerUploadListener{
|
||||||
|
LayerUpload: lu,
|
||||||
|
parent: lsl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type layerUploadListener struct {
|
||||||
|
storage.LayerUpload
|
||||||
|
parent *layerServiceListener
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) {
|
||||||
|
layer, err := lul.LayerUpload.Finish(dgst)
|
||||||
|
if err == nil {
|
||||||
|
if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil {
|
||||||
|
logrus.Errorf("error dispatching layer push to listener: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return layer, err
|
||||||
|
}
|
@ -1,79 +1,92 @@
|
|||||||
package decorator
|
package notifications
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/docker/libtrust"
|
|
||||||
|
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
"github.com/docker/distribution/manifest"
|
"github.com/docker/distribution/manifest"
|
||||||
"github.com/docker/distribution/storage"
|
"github.com/docker/distribution/storage"
|
||||||
"github.com/docker/distribution/storagedriver/inmemory"
|
"github.com/docker/distribution/storagedriver/inmemory"
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
|
"github.com/docker/libtrust"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRegistryDecorator(t *testing.T) {
|
func TestListener(t *testing.T) {
|
||||||
// Initialize the expected decorations. Call counting is a horrible way to
|
|
||||||
// test this but should keep this code from being atrocious.
|
|
||||||
expected := map[string]int{
|
|
||||||
"repository": 1,
|
|
||||||
"manifestservice": 1,
|
|
||||||
"layerservice": 1,
|
|
||||||
"layer": 4,
|
|
||||||
"layerupload": 4,
|
|
||||||
}
|
|
||||||
decorated := map[string]int{}
|
|
||||||
|
|
||||||
decorator := Func(func(v interface{}) interface{} {
|
|
||||||
switch v := v.(type) {
|
|
||||||
case storage.Repository:
|
|
||||||
t.Logf("decorate repository: %T", v)
|
|
||||||
decorated["repository"]++
|
|
||||||
case storage.ManifestService:
|
|
||||||
t.Logf("decorate manifestservice: %T", v)
|
|
||||||
decorated["manifestservice"]++
|
|
||||||
case storage.LayerService:
|
|
||||||
t.Logf("decorate layerservice: %T", v)
|
|
||||||
decorated["layerservice"]++
|
|
||||||
case storage.Layer:
|
|
||||||
t.Logf("decorate layer: %T", v)
|
|
||||||
decorated["layer"]++
|
|
||||||
case storage.LayerUpload:
|
|
||||||
t.Logf("decorate layerupload: %T", v)
|
|
||||||
decorated["layerupload"]++
|
|
||||||
default:
|
|
||||||
t.Fatalf("unexpected object decorated: %v", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return v
|
|
||||||
})
|
|
||||||
|
|
||||||
registry := storage.NewRegistryWithDriver(inmemory.New())
|
registry := storage.NewRegistryWithDriver(inmemory.New())
|
||||||
registry = DecorateRegistry(registry, decorator)
|
tl := &testListener{
|
||||||
|
ops: make(map[string]int),
|
||||||
|
}
|
||||||
|
repository := Listen(registry.Repository("foo/bar"), tl)
|
||||||
|
|
||||||
// Now take the registry through a number of operations
|
// Now take the registry through a number of operations
|
||||||
checkExerciseRegistry(t, registry)
|
checkExerciseRepository(t, repository)
|
||||||
|
|
||||||
for component, calls := range expected {
|
expectedOps := map[string]int{
|
||||||
if decorated[component] != calls {
|
"manifest:push": 1,
|
||||||
t.Fatalf("%v was not decorated expected number of times: %d != %d", component, decorated[component], calls)
|
"manifest:pull": 1,
|
||||||
}
|
// "manifest:delete": 0, // deletes not supported for now
|
||||||
|
"layer:push": 2,
|
||||||
|
"layer:pull": 2,
|
||||||
|
// "layer:delete": 0, // deletes not supported for now
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(tl.ops, expectedOps) {
|
||||||
|
t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testListener struct {
|
||||||
|
ops map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error {
|
||||||
|
tl.ops["manifest:push"]++
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error {
|
||||||
|
tl.ops["manifest:pull"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error {
|
||||||
|
tl.ops["manifest:delete"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error {
|
||||||
|
tl.ops["layer:push"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error {
|
||||||
|
tl.ops["layer:pull"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error {
|
||||||
|
tl.ops["layer:delete"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// checkExerciseRegistry takes the registry through all of its operations,
|
// checkExerciseRegistry takes the registry through all of its operations,
|
||||||
// carrying out generic checks.
|
// carrying out generic checks.
|
||||||
func checkExerciseRegistry(t *testing.T, registry storage.Registry) {
|
func checkExerciseRepository(t *testing.T, repository storage.Repository) {
|
||||||
name := "foo/bar"
|
// TODO(stevvooe): This would be a nice testutil function. Basically, it
|
||||||
|
// takes the registry through a common set of operations. This could be
|
||||||
|
// used to make cross-cutting updates by changing internals that affect
|
||||||
|
// update counts. Basically, it would make writing tests a lot easier.
|
||||||
|
|
||||||
tag := "thetag"
|
tag := "thetag"
|
||||||
repository := registry.Repository(name)
|
|
||||||
m := manifest.Manifest{
|
m := manifest.Manifest{
|
||||||
Versioned: manifest.Versioned{
|
Versioned: manifest.Versioned{
|
||||||
SchemaVersion: 1,
|
SchemaVersion: 1,
|
||||||
},
|
},
|
||||||
Name: name,
|
Name: repository.Name(),
|
||||||
Tag: tag,
|
Tag: tag,
|
||||||
}
|
}
|
||||||
|
|
152
storage/notifications/metrics.go
Normal file
152
storage/notifications/metrics.go
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"expvar"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EndpointMetrics track various actions taken by the endpoint, typically by
|
||||||
|
// number of events. The goal of this to export it via expvar but we may find
|
||||||
|
// some other future solution to be better.
|
||||||
|
type EndpointMetrics struct {
|
||||||
|
Pending int // events pending in queue
|
||||||
|
Events int // total events incoming
|
||||||
|
Successes int // total events written successfully
|
||||||
|
Failures int // total events failed
|
||||||
|
Errors int // total events errored
|
||||||
|
Statuses map[string]int // status code histogram, per call event
|
||||||
|
}
|
||||||
|
|
||||||
|
// safeMetrics guards the metrics implementation with a lock and provides a
|
||||||
|
// safe update function.
|
||||||
|
type safeMetrics struct {
|
||||||
|
EndpointMetrics
|
||||||
|
sync.Mutex // protects statuses map
|
||||||
|
}
|
||||||
|
|
||||||
|
// newSafeMetrics returns safeMetrics with map allocated.
|
||||||
|
func newSafeMetrics() *safeMetrics {
|
||||||
|
var sm safeMetrics
|
||||||
|
sm.Statuses = make(map[string]int)
|
||||||
|
return &sm
|
||||||
|
}
|
||||||
|
|
||||||
|
// httpStatusListener returns the listener for the http sink that updates the
|
||||||
|
// relevent counters.
|
||||||
|
func (sm *safeMetrics) httpStatusListener() httpStatusListener {
|
||||||
|
return &endpointMetricsHTTPStatusListener{
|
||||||
|
safeMetrics: sm,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// eventQueueListener returns a listener that maintains queue related counters.
|
||||||
|
func (sm *safeMetrics) eventQueueListener() eventQueueListener {
|
||||||
|
return &endpointMetricsEventQueueListener{
|
||||||
|
safeMetrics: sm,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// endpointMetricsHTTPStatusListener increments counters related to http sinks
|
||||||
|
// for the relevent events.
|
||||||
|
type endpointMetricsHTTPStatusListener struct {
|
||||||
|
*safeMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
|
||||||
|
|
||||||
|
func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
|
||||||
|
emsl.safeMetrics.Lock()
|
||||||
|
defer emsl.safeMetrics.Unlock()
|
||||||
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
|
||||||
|
emsl.Successes += len(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
|
||||||
|
emsl.safeMetrics.Lock()
|
||||||
|
defer emsl.safeMetrics.Unlock()
|
||||||
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
|
||||||
|
emsl.Failures += len(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
|
||||||
|
emsl.safeMetrics.Lock()
|
||||||
|
defer emsl.safeMetrics.Unlock()
|
||||||
|
emsl.Errors += len(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
// endpointMetricsEventQueueListener maintains the incoming events counter and
|
||||||
|
// the queues pending count.
|
||||||
|
type endpointMetricsEventQueueListener struct {
|
||||||
|
*safeMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
|
||||||
|
eqc.Lock()
|
||||||
|
defer eqc.Unlock()
|
||||||
|
eqc.Events += len(events)
|
||||||
|
eqc.Pending += len(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
|
||||||
|
eqc.Lock()
|
||||||
|
defer eqc.Unlock()
|
||||||
|
eqc.Pending -= len(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
// endpoints is global registry of endpoints used to report metrics to expvar
|
||||||
|
var endpoints struct {
|
||||||
|
registered []*Endpoint
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// register places the endpoint into expvar so that stats are tracked.
|
||||||
|
func register(e *Endpoint) {
|
||||||
|
endpoints.mu.Lock()
|
||||||
|
defer endpoints.mu.Unlock()
|
||||||
|
|
||||||
|
endpoints.registered = append(endpoints.registered, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
|
||||||
|
// Ideally, we do more metrics through logging but we need some nice
|
||||||
|
// realtime metrics for queue state for now.
|
||||||
|
|
||||||
|
registry := expvar.Get("registry")
|
||||||
|
|
||||||
|
if registry == nil {
|
||||||
|
registry = expvar.NewMap("registry")
|
||||||
|
}
|
||||||
|
|
||||||
|
var notifications expvar.Map
|
||||||
|
notifications.Init()
|
||||||
|
notifications.Set("endpoints", expvar.Func(func() interface{} {
|
||||||
|
endpoints.mu.Lock()
|
||||||
|
defer endpoints.mu.Unlock()
|
||||||
|
|
||||||
|
var names []interface{}
|
||||||
|
for _, v := range endpoints.registered {
|
||||||
|
var epjson struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
URL string `json:"url"`
|
||||||
|
EndpointConfig
|
||||||
|
|
||||||
|
Metrics EndpointMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
epjson.Name = v.Name()
|
||||||
|
epjson.URL = v.URL()
|
||||||
|
epjson.EndpointConfig = v.EndpointConfig
|
||||||
|
|
||||||
|
v.ReadMetrics(&epjson.Metrics)
|
||||||
|
|
||||||
|
names = append(names, epjson)
|
||||||
|
}
|
||||||
|
|
||||||
|
return names
|
||||||
|
}))
|
||||||
|
|
||||||
|
registry.(*expvar.Map).Set("notifications", ¬ifications)
|
||||||
|
}
|
337
storage/notifications/sinks.go
Normal file
337
storage/notifications/sinks.go
Normal file
@ -0,0 +1,337 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NOTE(stevvooe): This file contains definitions for several utility sinks.
|
||||||
|
// Typically, the broadcaster is the only sink that should be required
|
||||||
|
// externally, but others are suitable for export if the need arises. Albeit,
|
||||||
|
// the tight integration with endpoint metrics should be removed.
|
||||||
|
|
||||||
|
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
|
||||||
|
// component is to dispatch events to configured endpoints. Reliability can be
|
||||||
|
// provided by wrapping incoming sinks.
|
||||||
|
type Broadcaster struct {
|
||||||
|
sinks []Sink
|
||||||
|
events chan []Event
|
||||||
|
closed chan chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBroadcaster ...
|
||||||
|
// Add appends one or more sinks to the list of sinks. The broadcaster
|
||||||
|
// behavior will be affected by the properties of the sink. Generally, the
|
||||||
|
// sink should accept all messages and deal with reliability on its own. Use
|
||||||
|
// of EventQueue and RetryingSink should be used here.
|
||||||
|
func NewBroadcaster(sinks ...Sink) *Broadcaster {
|
||||||
|
b := Broadcaster{
|
||||||
|
sinks: sinks,
|
||||||
|
events: make(chan []Event),
|
||||||
|
closed: make(chan chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the broadcaster
|
||||||
|
go b.run()
|
||||||
|
|
||||||
|
return &b
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write accepts a block of events to be dispatched to all sinks. This method
|
||||||
|
// will never fail and should never block (hopefully!). The caller cedes the
|
||||||
|
// slice memory to the broadcaster and should not modify it after calling
|
||||||
|
// write.
|
||||||
|
func (b *Broadcaster) Write(events ...Event) error {
|
||||||
|
select {
|
||||||
|
case b.events <- events:
|
||||||
|
case <-b.closed:
|
||||||
|
return ErrSinkClosed
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the broadcaster, ensuring that all messages are flushed to the
|
||||||
|
// underlying sink before returning.
|
||||||
|
func (b *Broadcaster) Close() error {
|
||||||
|
logrus.Infof("broadcaster: closing")
|
||||||
|
select {
|
||||||
|
case <-b.closed:
|
||||||
|
// already closed
|
||||||
|
return fmt.Errorf("broadcaster: already closed")
|
||||||
|
default:
|
||||||
|
// do a little chan handoff dance to synchronize closing
|
||||||
|
closed := make(chan struct{})
|
||||||
|
b.closed <- closed
|
||||||
|
close(b.closed)
|
||||||
|
<-closed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is the main broadcast loop, started when the broadcaster is created.
|
||||||
|
// Under normal conditions, it waits for events on the event channel. After
|
||||||
|
// Close is called, this goroutine will exit.
|
||||||
|
func (b *Broadcaster) run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case block := <-b.events:
|
||||||
|
for _, sink := range b.sinks {
|
||||||
|
if err := sink.Write(block...); err != nil {
|
||||||
|
logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case closing := <-b.closed:
|
||||||
|
|
||||||
|
// close all the underlying sinks
|
||||||
|
for _, sink := range b.sinks {
|
||||||
|
if err := sink.Close(); err != nil {
|
||||||
|
logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
closing <- struct{}{}
|
||||||
|
|
||||||
|
logrus.Debugf("broadcaster: closed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// eventQueue accepts all messages into a queue for asynchronous consumption
|
||||||
|
// by a sink. It is unbounded and thread safe but the sink must be reliable or
|
||||||
|
// events will be dropped.
|
||||||
|
type eventQueue struct {
|
||||||
|
sink Sink
|
||||||
|
events *list.List
|
||||||
|
listeners []eventQueueListener
|
||||||
|
cond *sync.Cond
|
||||||
|
mu sync.Mutex
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// eventQueueListener is called when various events happen on the queue.
|
||||||
|
type eventQueueListener interface {
|
||||||
|
ingress(events ...Event)
|
||||||
|
egress(events ...Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEventQueue returns a queue to the provided sink. If the updater is non-
|
||||||
|
// nil, it will be called to update pending metrics on ingress and egress.
|
||||||
|
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
|
||||||
|
eq := eventQueue{
|
||||||
|
sink: sink,
|
||||||
|
events: list.New(),
|
||||||
|
listeners: listeners,
|
||||||
|
}
|
||||||
|
|
||||||
|
eq.cond = sync.NewCond(&eq.mu)
|
||||||
|
go eq.run()
|
||||||
|
return &eq
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write accepts the events into the queue, only failing if the queue has
|
||||||
|
// beend closed.
|
||||||
|
func (eq *eventQueue) Write(events ...Event) error {
|
||||||
|
eq.mu.Lock()
|
||||||
|
defer eq.mu.Unlock()
|
||||||
|
|
||||||
|
if eq.closed {
|
||||||
|
return ErrSinkClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, listener := range eq.listeners {
|
||||||
|
listener.ingress(events...)
|
||||||
|
}
|
||||||
|
eq.events.PushBack(events)
|
||||||
|
eq.cond.Signal() // signal waiters
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close shutsdown the event queue, flushing
|
||||||
|
func (eq *eventQueue) Close() error {
|
||||||
|
eq.mu.Lock()
|
||||||
|
defer eq.mu.Unlock()
|
||||||
|
|
||||||
|
if eq.closed {
|
||||||
|
return fmt.Errorf("eventqueue: already closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// set closed flag
|
||||||
|
eq.closed = true
|
||||||
|
eq.cond.Signal() // signal flushes queue
|
||||||
|
eq.cond.Wait() // wait for signal from last flush
|
||||||
|
|
||||||
|
return eq.sink.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// run is the main goroutine to flush events to the target sink.
|
||||||
|
func (eq *eventQueue) run() {
|
||||||
|
for {
|
||||||
|
block := eq.next()
|
||||||
|
|
||||||
|
if block == nil {
|
||||||
|
return // nil block means event queue is closed.
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := eq.sink.Write(block...); err != nil {
|
||||||
|
logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, listener := range eq.listeners {
|
||||||
|
listener.egress(block...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// next encompasses the critical section of the run loop. When the queue is
|
||||||
|
// empty, it will block on the condition. If new data arrives, it will wake
|
||||||
|
// and return a block. When closed, a nil slice will be returned.
|
||||||
|
func (eq *eventQueue) next() []Event {
|
||||||
|
eq.mu.Lock()
|
||||||
|
defer eq.mu.Unlock()
|
||||||
|
|
||||||
|
for eq.events.Len() < 1 {
|
||||||
|
if eq.closed {
|
||||||
|
eq.cond.Broadcast()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
eq.cond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
front := eq.events.Front()
|
||||||
|
block := front.Value.([]Event)
|
||||||
|
eq.events.Remove(front)
|
||||||
|
|
||||||
|
return block
|
||||||
|
}
|
||||||
|
|
||||||
|
// retryingSink retries the write until success or an ErrSinkClosed is
|
||||||
|
// returned. Underlying sink must have p > 0 of succeeding or the sink will
|
||||||
|
// block. Internally, it is a circuit breaker retries to manage reset.
|
||||||
|
// Concurrent calls to a retrying sink are serialized through the sink,
|
||||||
|
// meaning that if one is in-flight, another will not proceed.
|
||||||
|
type retryingSink struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
sink Sink
|
||||||
|
closed bool
|
||||||
|
|
||||||
|
// circuit breaker hueristics
|
||||||
|
failures struct {
|
||||||
|
threshold int
|
||||||
|
recent int
|
||||||
|
last time.Time
|
||||||
|
backoff time.Duration // time after which we retry after failure.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type retryingSinkListener interface {
|
||||||
|
active(events ...Event)
|
||||||
|
retry(events ...Event)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): We are using circuit break here, which actually doesn't
|
||||||
|
// make a whole lot of sense for this use case, since we always retry. Move
|
||||||
|
// this to use bounded exponential backoff.
|
||||||
|
|
||||||
|
// newRetryingSink returns a sink that will retry writes to a sink, backing
|
||||||
|
// off on failure. Parameters threshold and backoff adjust the behavior of the
|
||||||
|
// circuit breaker.
|
||||||
|
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
|
||||||
|
rs := &retryingSink{
|
||||||
|
sink: sink,
|
||||||
|
}
|
||||||
|
rs.failures.threshold = threshold
|
||||||
|
rs.failures.backoff = backoff
|
||||||
|
|
||||||
|
return rs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write attempts to flush the events to the downstream sink until it succeeds
|
||||||
|
// or the sink is closed.
|
||||||
|
func (rs *retryingSink) Write(events ...Event) error {
|
||||||
|
rs.mu.Lock()
|
||||||
|
defer rs.mu.Unlock()
|
||||||
|
|
||||||
|
retry:
|
||||||
|
|
||||||
|
if rs.closed {
|
||||||
|
return ErrSinkClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rs.proceed() {
|
||||||
|
logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
|
||||||
|
rs.wait(rs.failures.backoff)
|
||||||
|
goto retry
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rs.write(events...); err != nil {
|
||||||
|
if err == ErrSinkClosed {
|
||||||
|
// terminal!
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
|
||||||
|
goto retry
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the sink and the underlying sink.
|
||||||
|
func (rs *retryingSink) Close() error {
|
||||||
|
rs.mu.Lock()
|
||||||
|
defer rs.mu.Unlock()
|
||||||
|
|
||||||
|
if rs.closed {
|
||||||
|
return fmt.Errorf("retryingsink: already closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
rs.closed = true
|
||||||
|
return rs.sink.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// write provides a helper that dispatches failure and success properly. Used
|
||||||
|
// by write as the single-flight write call.
|
||||||
|
func (rs *retryingSink) write(events ...Event) error {
|
||||||
|
if err := rs.sink.Write(events...); err != nil {
|
||||||
|
rs.failure()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
rs.reset()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait backoff time against the sink, unlocking so others can proceed. Should
|
||||||
|
// only be called by methods that currently have the mutex.
|
||||||
|
func (rs *retryingSink) wait(backoff time.Duration) {
|
||||||
|
rs.mu.Unlock()
|
||||||
|
defer rs.mu.Lock()
|
||||||
|
|
||||||
|
// backoff here
|
||||||
|
time.Sleep(backoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset marks a succesful call.
|
||||||
|
func (rs *retryingSink) reset() {
|
||||||
|
rs.failures.recent = 0
|
||||||
|
rs.failures.last = time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// failure records a failure.
|
||||||
|
func (rs *retryingSink) failure() {
|
||||||
|
rs.failures.recent++
|
||||||
|
rs.failures.last = time.Now().UTC()
|
||||||
|
}
|
||||||
|
|
||||||
|
// proceed returns true if the call should proceed based on circuit breaker
|
||||||
|
// hueristics.
|
||||||
|
func (rs *retryingSink) proceed() bool {
|
||||||
|
return rs.failures.recent < rs.failures.threshold ||
|
||||||
|
time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
|
||||||
|
}
|
223
storage/notifications/sinks_test.go
Normal file
223
storage/notifications/sinks_test.go
Normal file
@ -0,0 +1,223 @@
|
|||||||
|
package notifications
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBroadcaster(t *testing.T) {
|
||||||
|
const nEvents = 1000
|
||||||
|
var sinks []Sink
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
sinks = append(sinks, &testSink{})
|
||||||
|
}
|
||||||
|
|
||||||
|
b := NewBroadcaster(sinks...)
|
||||||
|
|
||||||
|
var block []Event
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 1; i <= nEvents; i++ {
|
||||||
|
block = append(block, createTestEvent("push", "library/test", "blob"))
|
||||||
|
|
||||||
|
if i%10 == 0 && i > 0 {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(block ...Event) {
|
||||||
|
if err := b.Write(block...); err != nil {
|
||||||
|
t.Fatalf("error writing block of length %d: %v", len(block), err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}(block...)
|
||||||
|
|
||||||
|
block = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait() // Wait until writes complete
|
||||||
|
checkClose(t, b)
|
||||||
|
|
||||||
|
// Iterate through the sinks and check that they all have the expected length.
|
||||||
|
for _, sink := range sinks {
|
||||||
|
ts := sink.(*testSink)
|
||||||
|
ts.mu.Lock()
|
||||||
|
defer ts.mu.Unlock()
|
||||||
|
|
||||||
|
if len(ts.events) != nEvents {
|
||||||
|
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ts.closed {
|
||||||
|
t.Fatalf("sink should have been closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventQueue(t *testing.T) {
|
||||||
|
const nevents = 1000
|
||||||
|
var ts testSink
|
||||||
|
metrics := newSafeMetrics()
|
||||||
|
eq := newEventQueue(
|
||||||
|
// delayed sync simulates destination slower than channel comms
|
||||||
|
&delayedSink{
|
||||||
|
Sink: &ts,
|
||||||
|
delay: time.Millisecond * 1,
|
||||||
|
}, metrics.eventQueueListener())
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var block []Event
|
||||||
|
for i := 1; i <= nevents; i++ {
|
||||||
|
block = append(block, createTestEvent("push", "library/test", "blob"))
|
||||||
|
if i%10 == 0 && i > 0 {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(block ...Event) {
|
||||||
|
if err := eq.Write(block...); err != nil {
|
||||||
|
t.Fatalf("error writing event block: %v", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}(block...)
|
||||||
|
|
||||||
|
block = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
checkClose(t, eq)
|
||||||
|
|
||||||
|
ts.mu.Lock()
|
||||||
|
defer ts.mu.Unlock()
|
||||||
|
metrics.Lock()
|
||||||
|
defer metrics.Unlock()
|
||||||
|
|
||||||
|
if len(ts.events) != nevents {
|
||||||
|
t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ts.closed {
|
||||||
|
t.Fatalf("sink should have been closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if metrics.Events != nevents {
|
||||||
|
t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
|
||||||
|
}
|
||||||
|
|
||||||
|
if metrics.Pending != 0 {
|
||||||
|
t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRetryingSink(t *testing.T) {
|
||||||
|
|
||||||
|
// Make a sync that fails most of the time, ensuring that all the events
|
||||||
|
// make it through.
|
||||||
|
var ts testSink
|
||||||
|
flaky := &flakySink{
|
||||||
|
rate: 1.0, // start out always failing.
|
||||||
|
Sink: &ts,
|
||||||
|
}
|
||||||
|
s := newRetryingSink(flaky, 3, 10*time.Millisecond)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var block []Event
|
||||||
|
for i := 1; i <= 100; i++ {
|
||||||
|
block = append(block, createTestEvent("push", "library/test", "blob"))
|
||||||
|
|
||||||
|
// Above 50, set the failure rate lower
|
||||||
|
if i > 50 {
|
||||||
|
s.mu.Lock()
|
||||||
|
flaky.rate = 0.90
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
if i%10 == 0 && i > 0 {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(block ...Event) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := s.Write(block...); err != nil {
|
||||||
|
t.Fatalf("error writing event block: %v", err)
|
||||||
|
}
|
||||||
|
}(block...)
|
||||||
|
|
||||||
|
block = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
checkClose(t, s)
|
||||||
|
|
||||||
|
ts.mu.Lock()
|
||||||
|
defer ts.mu.Unlock()
|
||||||
|
|
||||||
|
if len(ts.events) != 100 {
|
||||||
|
t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testSink struct {
|
||||||
|
events []Event
|
||||||
|
mu sync.Mutex
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *testSink) Write(events ...Event) error {
|
||||||
|
ts.mu.Lock()
|
||||||
|
defer ts.mu.Unlock()
|
||||||
|
ts.events = append(ts.events, events...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *testSink) Close() error {
|
||||||
|
ts.mu.Lock()
|
||||||
|
defer ts.mu.Unlock()
|
||||||
|
ts.closed = true
|
||||||
|
|
||||||
|
logrus.Infof("closing testSink")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type delayedSink struct {
|
||||||
|
Sink
|
||||||
|
delay time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ds *delayedSink) Write(events ...Event) error {
|
||||||
|
time.Sleep(ds.delay)
|
||||||
|
return ds.Sink.Write(events...)
|
||||||
|
}
|
||||||
|
|
||||||
|
type flakySink struct {
|
||||||
|
Sink
|
||||||
|
rate float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *flakySink) Write(events ...Event) error {
|
||||||
|
if rand.Float64() < fs.rate {
|
||||||
|
return fmt.Errorf("error writing %d events", len(events))
|
||||||
|
}
|
||||||
|
|
||||||
|
return fs.Sink.Write(events...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkClose(t *testing.T, sink Sink) {
|
||||||
|
if err := sink.Close(); err != nil {
|
||||||
|
t.Fatalf("unexpected error closing: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// second close should not crash but should return an error.
|
||||||
|
if err := sink.Close(); err == nil {
|
||||||
|
t.Fatalf("no error on double close")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write after closed should be an error
|
||||||
|
if err := sink.Write([]Event{}...); err == nil {
|
||||||
|
t.Fatalf("write after closed did not have an error")
|
||||||
|
} else if err != ErrSinkClosed {
|
||||||
|
t.Fatalf("error should be ErrSinkClosed")
|
||||||
|
}
|
||||||
|
}
|
@ -79,13 +79,8 @@ func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest,
|
|||||||
// put stores the manifest in the repository, if not already present. Any
|
// put stores the manifest in the repository, if not already present. Any
|
||||||
// updated signatures will be stored, as well.
|
// updated signatures will be stored, as well.
|
||||||
func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) {
|
func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) {
|
||||||
jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resolve the payload in the manifest.
|
// Resolve the payload in the manifest.
|
||||||
payload, err := jsig.Payload()
|
payload, err := sm.Payload()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -103,7 +98,7 @@ func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Grab each json signature and store them.
|
// Grab each json signature and store them.
|
||||||
signatures, err := jsig.Signatures()
|
signatures, err := sm.Signatures()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user