distribution/registry/handlers/app.go
Stephen J Day 7420a3de10 Instantiate http context before dispatch
Ensure that the status is logged in the context by instantiating before the
request is routed to handlers. While this requires some level of hacking to
acheive, the result is that the context value of "http.request.status" is as
accurate as possible for each request.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-04-15 19:53:36 -07:00

552 lines
18 KiB
Go

package handlers
import (
"expvar"
"fmt"
"net"
"net/http"
"os"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/configuration"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/notifications"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/auth"
registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/factory"
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
"github.com/garyburd/redigo/redis"
"github.com/gorilla/mux"
"golang.org/x/net/context"
)
// App is a global registry application object. Shared resources can be placed
// on this object that will be accessible from all requests. Any writable
// fields should be protected.
type App struct {
context.Context
Config configuration.Configuration
router *mux.Router // main application router, configured with dispatchers
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
registry distribution.Namespace // registry is the primary registry backend for the app instance.
accessController auth.AccessController // main access controller for application
// events contains notification related configuration.
events struct {
sink notifications.Sink
source notifications.SourceRecord
}
redis *redis.Pool
}
// NewApp takes a configuration and returns a configured app, ready to serve
// requests. The app only implements ServeHTTP and can be wrapped in other
// handlers accordingly.
func NewApp(ctx context.Context, configuration configuration.Configuration) *App {
app := &App{
Config: configuration,
Context: ctx,
router: v2.RouterWithPrefix(configuration.HTTP.Prefix),
}
app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id"))
// Register the handler dispatchers.
app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
return http.HandlerFunc(apiBase)
})
app.register(v2.RouteNameManifest, imageManifestDispatcher)
app.register(v2.RouteNameTags, tagsDispatcher)
app.register(v2.RouteNameBlob, layerDispatcher)
app.register(v2.RouteNameBlobUpload, layerUploadDispatcher)
app.register(v2.RouteNameBlobUploadChunk, layerUploadDispatcher)
var err error
app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected
// method, where this is created lazily. Its status can be queried via
// a health check.
panic(err)
}
app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
if err != nil {
panic(err)
}
app.configureEvents(&configuration)
app.configureRedis(&configuration)
// configure storage caches
if cc, ok := configuration.Storage["cache"]; ok {
switch cc["layerinfo"] {
case "redis":
if app.redis == nil {
panic("redis configuration required to use for layerinfo cache")
}
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis))
ctxu.GetLogger(app).Infof("using redis layerinfo cache")
case "inmemory":
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache())
ctxu.GetLogger(app).Infof("using inmemory layerinfo cache")
default:
if cc["layerinfo"] != "" {
ctxu.GetLogger(app).Warnf("unkown cache type %q, caching disabled", configuration.Storage["cache"])
}
}
}
if app.registry == nil {
// configure the registry if no cache section is available.
app.registry = storage.NewRegistryWithDriver(app.driver, nil)
}
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
if err != nil {
panic(err)
}
authType := configuration.Auth.Type()
if authType != "" {
accessController, err := auth.GetAccessController(configuration.Auth.Type(), configuration.Auth.Parameters())
if err != nil {
panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
}
app.accessController = accessController
}
return app
}
// register a handler with the application, by route name. The handler will be
// passed through the application filters and context will be constructed at
// request time.
func (app *App) register(routeName string, dispatch dispatchFunc) {
// TODO(stevvooe): This odd dispatcher/route registration is by-product of
// some limitations in the gorilla/mux router. We are using it to keep
// routing consistent between the client and server, but we may want to
// replace it with manual routing and structure-based dispatch for better
// control over the request execution.
app.router.GetRoute(routeName).Handler(app.dispatcher(dispatch))
}
// configureEvents prepares the event sink for action.
func (app *App) configureEvents(configuration *configuration.Configuration) {
// Configure all of the endpoint sinks.
var sinks []notifications.Sink
for _, endpoint := range configuration.Notifications.Endpoints {
if endpoint.Disabled {
ctxu.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name)
continue
}
ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
Timeout: endpoint.Timeout,
Threshold: endpoint.Threshold,
Backoff: endpoint.Backoff,
Headers: endpoint.Headers,
})
sinks = append(sinks, endpoint)
}
// NOTE(stevvooe): Moving to a new queueing implementation is as easy as
// replacing broadcaster with a rabbitmq implementation. It's recommended
// that the registry instances also act as the workers to keep deployment
// simple.
app.events.sink = notifications.NewBroadcaster(sinks...)
// Populate registry event source
hostname, err := os.Hostname()
if err != nil {
hostname = configuration.HTTP.Addr
} else {
// try to pick the port off the config
_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
if err == nil {
hostname = net.JoinHostPort(hostname, port)
}
}
app.events.source = notifications.SourceRecord{
Addr: hostname,
InstanceID: ctxu.GetStringValue(app, "instance.id"),
}
}
func (app *App) configureRedis(configuration *configuration.Configuration) {
if configuration.Redis.Addr == "" {
ctxu.GetLogger(app).Infof("redis not configured")
return
}
pool := &redis.Pool{
Dial: func() (redis.Conn, error) {
// TODO(stevvooe): Yet another use case for contextual timing.
ctx := context.WithValue(app, "redis.connect.startedat", time.Now())
done := func(err error) {
logger := ctxu.GetLoggerWithField(ctx, "redis.connect.duration",
ctxu.Since(ctx, "redis.connect.startedat"))
if err != nil {
logger.Errorf("redis: error connecting: %v", err)
} else {
logger.Infof("redis: connect %v", configuration.Redis.Addr)
}
}
conn, err := redis.DialTimeout("tcp",
configuration.Redis.Addr,
configuration.Redis.DialTimeout,
configuration.Redis.ReadTimeout,
configuration.Redis.WriteTimeout)
if err != nil {
ctxu.GetLogger(app).Errorf("error connecting to redis instance %s: %v",
configuration.Redis.Addr, err)
done(err)
return nil, err
}
// authorize the connection
if configuration.Redis.Password != "" {
if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil {
defer conn.Close()
done(err)
return nil, err
}
}
// select the database to use
if configuration.Redis.DB != 0 {
if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil {
defer conn.Close()
done(err)
return nil, err
}
}
done(nil)
return conn, nil
},
MaxIdle: configuration.Redis.Pool.MaxIdle,
MaxActive: configuration.Redis.Pool.MaxActive,
IdleTimeout: configuration.Redis.Pool.IdleTimeout,
TestOnBorrow: func(c redis.Conn, t time.Time) error {
// TODO(stevvooe): We can probably do something more interesting
// here with the health package.
_, err := c.Do("PING")
return err
},
Wait: false, // if a connection is not avialable, proceed without cache.
}
app.redis = pool
// setup expvar
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} {
return map[string]interface{}{
"Config": configuration.Redis,
"Active": app.redis.ActiveCount(),
}
}))
}
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() // ensure that request body is always closed.
// Instantiate an http context here so we can track the error codes
// returned by the request router.
ctx := defaultContextManager.context(app, w, r)
defer func() {
ctxu.GetResponseLogger(ctx).Infof("response completed")
}()
defer defaultContextManager.release(ctx)
// NOTE(stevvooe): Total hack to get instrumented responsewriter from context.
var err error
w, err = ctxu.GetResponseWriter(ctx)
if err != nil {
ctxu.GetLogger(ctx).Warnf("response writer not found in context")
}
// Set a header with the Docker Distribution API Version for all responses.
w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
app.router.ServeHTTP(w, r)
}
// dispatchFunc takes a context and request and returns a constructed handler
// for the route. The dispatcher will use this to dynamically create request
// specific handlers for each endpoint without creating a new router for each
// request.
type dispatchFunc func(ctx *Context, r *http.Request) http.Handler
// TODO(stevvooe): dispatchers should probably have some validation error
// chain with proper error reporting.
// dispatcher returns a handler that constructs a request specific context and
// handler, using the dispatch factory function.
func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
context := app.context(w, r)
if err := app.authorized(w, r, context); err != nil {
ctxu.GetLogger(context).Errorf("error authorizing context: %v", err)
return
}
// Add username to request logging
context.Context = ctxu.WithLogger(context.Context, ctxu.GetLogger(context.Context, "auth.user.name"))
if app.nameRequired(r) {
repository, err := app.registry.Repository(context, getName(context))
if err != nil {
ctxu.GetLogger(context).Errorf("error resolving repository: %v", err)
switch err := err.(type) {
case distribution.ErrRepositoryUnknown:
context.Errors.Push(v2.ErrorCodeNameUnknown, err)
case distribution.ErrRepositoryNameInvalid:
context.Errors.Push(v2.ErrorCodeNameInvalid, err)
}
w.WriteHeader(http.StatusBadRequest)
serveJSON(w, context.Errors)
return
}
// assign and decorate the authorized repository with an event bridge.
context.Repository = notifications.Listen(
repository,
app.eventBridge(context, r))
context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"])
if err != nil {
ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
context.Errors.Push(v2.ErrorCodeUnknown, err)
w.WriteHeader(http.StatusInternalServerError)
serveJSON(w, context.Errors)
return
}
}
dispatch(context, r).ServeHTTP(w, r)
// Automated error response handling here. Handlers may return their
// own errors if they need different behavior (such as range errors
// for layer upload).
if context.Errors.Len() > 0 {
if context.Value("http.response.status") == 0 {
// TODO(stevvooe): Getting this value from the context is a
// bit of a hack. We can further address with some of our
// future refactoring.
w.WriteHeader(http.StatusBadRequest)
}
serveJSON(w, context.Errors)
}
})
}
// context constructs the context object for the application. This only be
// called once per request.
func (app *App) context(w http.ResponseWriter, r *http.Request) *Context {
ctx := defaultContextManager.context(app, w, r)
ctx = ctxu.WithVars(ctx, r)
ctx = ctxu.WithLogger(ctx, ctxu.GetLogger(ctx,
"vars.name",
"vars.reference",
"vars.digest",
"vars.uuid"))
context := &Context{
App: app,
Context: ctx,
urlBuilder: v2.NewURLBuilderFromRequest(r),
}
return context
}
// authorized checks if the request can proceed with access to the requested
// repository. If it succeeds, the context may access the requested
// repository. An error will be returned if access is not available.
func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context) error {
ctxu.GetLogger(context).Debug("authorizing request")
repo := getName(context)
if app.accessController == nil {
return nil // access controller is not enabled.
}
var accessRecords []auth.Access
if repo != "" {
accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
} else {
// Only allow the name not to be set on the base route.
if app.nameRequired(r) {
// For this to be properly secured, repo must always be set for a
// resource that may make a modification. The only condition under
// which name is not set and we still allow access is when the
// base route is accessed. This section prevents us from making
// that mistake elsewhere in the code, allowing any operation to
// proceed.
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusForbidden)
var errs v2.Errors
errs.Push(v2.ErrorCodeUnauthorized)
serveJSON(w, errs)
return fmt.Errorf("forbidden: no repository name")
}
}
ctx, err := app.accessController.Authorized(context.Context, accessRecords...)
if err != nil {
switch err := err.(type) {
case auth.Challenge:
w.Header().Set("Content-Type", "application/json; charset=utf-8")
err.ServeHTTP(w, r)
var errs v2.Errors
errs.Push(v2.ErrorCodeUnauthorized, accessRecords)
serveJSON(w, errs)
default:
// This condition is a potential security problem either in
// the configuration or whatever is backing the access
// controller. Just return a bad request with no information
// to avoid exposure. The request should not proceed.
ctxu.GetLogger(context).Errorf("error checking authorization: %v", err)
w.WriteHeader(http.StatusBadRequest)
}
return err
}
// TODO(stevvooe): This pattern needs to be cleaned up a bit. One context
// should be replaced by another, rather than replacing the context on a
// mutable object.
context.Context = ctx
return nil
}
// eventBridge returns a bridge for the current request, configured with the
// correct actor and source.
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
actor := notifications.ActorRecord{
Name: getUserName(ctx, r),
}
request := notifications.NewRequestRecord(ctxu.GetRequestID(ctx), r)
return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink)
}
// nameRequired returns true if the route requires a name.
func (app *App) nameRequired(r *http.Request) bool {
route := mux.CurrentRoute(r)
return route == nil || route.GetName() != v2.RouteNameBase
}
// apiBase implements a simple yes-man for doing overall checks against the
// api. This can support auth roundtrips to support docker login.
func apiBase(w http.ResponseWriter, r *http.Request) {
const emptyJSON = "{}"
// Provide a simple /v2/ 200 OK response with empty json response.
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("Content-Length", fmt.Sprint(len(emptyJSON)))
fmt.Fprint(w, emptyJSON)
}
// appendAccessRecords checks the method and adds the appropriate Access records to the records list.
func appendAccessRecords(records []auth.Access, method string, repo string) []auth.Access {
resource := auth.Resource{
Type: "repository",
Name: repo,
}
switch method {
case "GET", "HEAD":
records = append(records,
auth.Access{
Resource: resource,
Action: "pull",
})
case "POST", "PUT", "PATCH":
records = append(records,
auth.Access{
Resource: resource,
Action: "pull",
},
auth.Access{
Resource: resource,
Action: "push",
})
case "DELETE":
// DELETE access requires full admin rights, which is represented
// as "*". This may not be ideal.
records = append(records,
auth.Access{
Resource: resource,
Action: "*",
})
}
return records
}
// applyRegistryMiddleware wraps a registry instance with the configured middlewares
func applyRegistryMiddleware(registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
for _, mw := range middlewares {
rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry)
if err != nil {
return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
}
registry = rmw
}
return registry, nil
}
// applyRepoMiddleware wraps a repository with the configured middlewares
func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
for _, mw := range middlewares {
rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository)
if err != nil {
return nil, err
}
repository = rmw
}
return repository, nil
}
// applyStorageMiddleware wraps a storage driver with the configured middlewares
func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) {
for _, mw := range middlewares {
smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver)
if err != nil {
return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err)
}
driver = smw
}
return driver, nil
}