diff --git a/docs/app.go b/docs/app.go index b5cb6776..e7c96b74 100644 --- a/docs/app.go +++ b/docs/app.go @@ -2,16 +2,19 @@ package registry import ( "fmt" + "net" "net/http" + "os" + "code.google.com/p/go-uuid/uuid" + log "github.com/Sirupsen/logrus" "github.com/docker/distribution/api/v2" "github.com/docker/distribution/auth" "github.com/docker/distribution/configuration" "github.com/docker/distribution/storage" + "github.com/docker/distribution/storage/notifications" "github.com/docker/distribution/storagedriver" "github.com/docker/distribution/storagedriver/factory" - - log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" ) @@ -21,17 +24,22 @@ import ( type App struct { Config configuration.Configuration - router *mux.Router + // InstanceID is a unique id assigned to the application on each creation. + // Provides information in the logs and context to identify restarts. + InstanceID string - // driver maintains the app global storage driver instance. - driver storagedriver.StorageDriver + router *mux.Router // main application router, configured with dispatchers + driver storagedriver.StorageDriver // driver maintains the app global storage driver instance. + registry storage.Registry // registry is the primary registry backend for the app instance. + accessController auth.AccessController // main access controller for application - // registry is the primary registry backend for the app instance. - registry storage.Registry + // events contains notification related configuration. + events struct { + sink notifications.Sink + source notifications.SourceRecord + } - layerHandler storage.LayerHandler - - accessController auth.AccessController + layerHandler storage.LayerHandler // allows dispatch of layer serving to external provider } // NewApp takes a configuration and returns a configured app, ready to serve @@ -39,8 +47,9 @@ type App struct { // handlers accordingly. func NewApp(configuration configuration.Configuration) *App { app := &App{ - Config: configuration, - router: v2.Router(), + Config: configuration, + InstanceID: uuid.New(), + router: v2.Router(), } // Register the handler dispatchers. @@ -53,7 +62,8 @@ func NewApp(configuration configuration.Configuration) *App { app.register(v2.RouteNameBlobUpload, layerUploadDispatcher) app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher) - driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) + var err error + app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) if err != nil { // TODO(stevvooe): Move the creation of a service into a protected @@ -62,7 +72,7 @@ func NewApp(configuration configuration.Configuration) *App { panic(err) } - app.driver = driver + app.configureEvents(&configuration) app.registry = storage.NewRegistryWithDriver(app.driver) authType := configuration.Auth.Type() @@ -77,7 +87,7 @@ func NewApp(configuration configuration.Configuration) *App { layerHandlerType := configuration.LayerHandler.Type() if layerHandlerType != "" { - lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), driver) + lh, err := storage.GetLayerHandler(layerHandlerType, configuration.LayerHandler.Parameters(), app.driver) if err != nil { panic(fmt.Sprintf("unable to configure layer handler (%s): %v", layerHandlerType, err)) } @@ -87,12 +97,6 @@ func NewApp(configuration configuration.Configuration) *App { return app } -func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Set a header with the Docker Distribution API Version for all responses. - w.Header().Add("Docker-Distribution-API-Version", "registry/2.0") - app.router.ServeHTTP(w, r) -} - // register a handler with the application, by route name. The handler will be // passed through the application filters and context will be constructed at // request time. @@ -107,6 +111,59 @@ func (app *App) register(routeName string, dispatch dispatchFunc) { app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch)) } +// configureEvents prepares the event sink for action. +func (app *App) configureEvents(configuration *configuration.Configuration) { + // Configure all of the endpoint sinks. + var sinks []notifications.Sink + for _, endpoint := range configuration.Notifications.Endpoints { + if endpoint.Disabled { + log.Infof("endpoint %s disabled, skipping", endpoint.Name) + continue + } + + log.Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers) + endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{ + Timeout: endpoint.Timeout, + Threshold: endpoint.Threshold, + Backoff: endpoint.Backoff, + Headers: endpoint.Headers, + }) + + sinks = append(sinks, endpoint) + } + + // NOTE(stevvooe): Moving to a new queueing implementation is as easy as + // replacing broadcaster with a rabbitmq implementation. It's recommended + // that the registry instances also act as the workers to keep deployment + // simple. + app.events.sink = notifications.NewBroadcaster(sinks...) + + // Populate registry event source + hostname, err := os.Hostname() + if err != nil { + hostname = configuration.HTTP.Addr + } else { + // try to pick the port off the config + _, port, err := net.SplitHostPort(configuration.HTTP.Addr) + if err == nil { + hostname = net.JoinHostPort(hostname, port) + } + } + + app.events.source = notifications.SourceRecord{ + Addr: hostname, + InstanceID: app.InstanceID, + } +} + +func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() // ensure that request body is always closed. + + // Set a header with the Docker Distribution API Version for all responses. + w.Header().Add("Docker-Distribution-API-Version", "registry/2.0") + app.router.ServeHTTP(w, r) +} + // dispatchFunc takes a context and request and returns a constructed handler // for the route. The dispatcher will use this to dynamically create request // specific handlers for each endpoint without creating a new router for each @@ -142,11 +199,14 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { return } + // decorate the authorized repository with an event bridge. + context.Repository = notifications.Listen( + context.Repository, app.eventBridge(context, r)) + context.log = log.WithField("name", context.Repository.Name()) handler := dispatch(context, r) ssrw := &singleStatusResponseWriter{ResponseWriter: w} - context.log.Infoln("handler", resolveHandlerName(r.Method, handler)) handler.ServeHTTP(ssrw, r) // Automated error response handling here. Handlers may return their @@ -167,6 +227,7 @@ func (app *App) context(r *http.Request) *Context { vars := mux.Vars(r) context := &Context{ App: app, + RequestID: uuid.New(), urlBuilder: v2.NewURLBuilderFromRequest(r), } @@ -268,6 +329,22 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont return nil } +// eventBridge returns a bridge for the current request, configured with the +// correct actor and source. +func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener { + // TODO(stevvooe): Need to extract user data from request context using + // auth system. Would prefer to do this during logging refactor and + // addition of user and google context type. + actor := notifications.ActorRecord{ + Name: "--todo--", + Addr: r.RemoteAddr, + Host: r.Host, + RequestID: ctx.RequestID, + } + + return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, app.events.sink) +} + // apiBase implements a simple yes-man for doing overall checks against the // api. This can support auth roundtrips to support docker login. func apiBase(w http.ResponseWriter, r *http.Request) { diff --git a/docs/context.go b/docs/context.go index 8e8d0fed..eaa603a8 100644 --- a/docs/context.go +++ b/docs/context.go @@ -13,6 +13,9 @@ type Context struct { // App points to the application structure that created this context. *App + // RequestID is the unique id of the request. + RequestID string + // Repository is the repository for the current request. All requests // should be scoped to a single repository. This field may be nil. Repository storage.Repository diff --git a/docs/util.go b/docs/util.go deleted file mode 100644 index 976ddf31..00000000 --- a/docs/util.go +++ /dev/null @@ -1,27 +0,0 @@ -package registry - -import ( - "net/http" - "reflect" - "runtime" - - "github.com/gorilla/handlers" -) - -// functionName returns the name of the function fn. -func functionName(fn interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() -} - -// resolveHandlerName attempts to resolve a nice, pretty name for the passed -// in handler. -func resolveHandlerName(method string, handler http.Handler) string { - switch v := handler.(type) { - case handlers.MethodHandler: - return functionName(v[method]) - case http.HandlerFunc: - return functionName(v) - default: - return functionName(handler.ServeHTTP) - } -}