Add redis pool to registry webapp

Redis has been integrated with the web application for use with various
services. The configuraiton exposes connection details, timeouts and pool
parameters. Documentation has been updated accordingly.

A few convenience methods have been added to the context package to get loggers
with certain fields, exposing some missing functionality from logrus.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-04-01 16:27:24 -07:00
parent 9ee35877e3
commit 38ae1cb461

View File

@ -1,10 +1,12 @@
package handlers
import (
"expvar"
"fmt"
"net"
"net/http"
"os"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution"
@ -19,6 +21,7 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/factory"
storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
"github.com/garyburd/redigo/redis"
"github.com/gorilla/mux"
"golang.org/x/net/context"
)
@ -44,6 +47,8 @@ type App struct {
sink notifications.Sink
source notifications.SourceRecord
}
redis *redis.Pool
}
// Value intercepts calls context.Context.Value, returning the current app id,
@ -95,6 +100,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
}
app.configureEvents(&configuration)
app.configureRedis(&configuration)
app.registry = storage.NewRegistryWithDriver(app.driver)
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
@ -174,6 +180,83 @@ func (app *App) configureEvents(configuration *configuration.Configuration) {
}
}
func (app *App) configureRedis(configuration *configuration.Configuration) {
if configuration.Redis.Addr == "" {
ctxu.GetLogger(app).Infof("redis not configured")
return
}
pool := &redis.Pool{
Dial: func() (redis.Conn, error) {
// TODO(stevvooe): Yet another use case for contextual timing.
ctx := context.WithValue(app, "redis.connect.startedat", time.Now())
done := func(err error) {
logger := ctxu.GetLoggerWithField(ctx, "redis.connect.duration",
ctxu.Since(ctx, "redis.connect.startedat"))
if err != nil {
logger.Errorf("redis: error connecting: %v", err)
} else {
logger.Infof("redis: connect %v", configuration.Redis.Addr)
}
}
conn, err := redis.DialTimeout("tcp",
configuration.Redis.Addr,
configuration.Redis.DialTimeout,
configuration.Redis.ReadTimeout,
configuration.Redis.WriteTimeout)
if err != nil {
ctxu.GetLogger(app).Errorf("error connecting to redis instance %s: %v",
configuration.Redis.Addr, err)
done(err)
return nil, err
}
// authorize the connection
if configuration.Redis.Password != "" {
if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil {
defer conn.Close()
done(err)
return nil, err
}
}
// select the database to use
if configuration.Redis.DB != 0 {
if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil {
defer conn.Close()
done(err)
return nil, err
}
}
done(nil)
return conn, nil
},
MaxIdle: configuration.Redis.Pool.MaxIdle,
MaxActive: configuration.Redis.Pool.MaxActive,
IdleTimeout: configuration.Redis.Pool.IdleTimeout,
TestOnBorrow: func(c redis.Conn, t time.Time) error {
// TODO(stevvooe): We can probably do something more interesting
// here with the health package.
_, err := c.Do("PING")
return err
},
Wait: false, // if a connection is not avialable, proceed without cache.
}
app.redis = pool
expvar.Publish("redis", expvar.Func(func() interface{} {
return map[string]interface{}{
"Config": configuration.Redis,
"Active": app.redis.ActiveCount(),
}
}))
}
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() // ensure that request body is always closed.