Define and implement layer info cache
This changeset defines the interface for layer info caches. Layer info caches speed up access to layer meta data accessed in storage driver backends. The two main operations are tests for repository membership and resolving path and size information for backend blobs. Two implementations are available. The main implementation leverages redis to store layer info. An alternative implementation simply caches layer info in maps, which should speed up resolution for less sophisticated implementations. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
3cad3c7b6a
commit
b1f616cbff
98
registry/storage/cache/cache.go
vendored
Normal file
98
registry/storage/cache/cache.go
vendored
Normal file
@ -0,0 +1,98 @@
|
||||
// Package cache provides facilities to speed up access to the storage
|
||||
// backend. Typically cache implementations deal with internal implementation
|
||||
// details at the backend level, rather than generalized caches for
|
||||
// distribution related interfaces. In other words, unless the cache is
|
||||
// specific to the storage package, it belongs in another package.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/docker/distribution/digest"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// ErrNotFound is returned when a meta item is not found.
|
||||
var ErrNotFound = fmt.Errorf("not found")
|
||||
|
||||
// LayerMeta describes the backend location and length of layer data.
|
||||
type LayerMeta struct {
|
||||
Path string
|
||||
Length int64
|
||||
}
|
||||
|
||||
// LayerInfoCache is a driver-aware cache of layer metadata. Basically, it
|
||||
// provides a fast cache for checks against repository metadata, avoiding
|
||||
// round trips to backend storage. Note that this is different from a pure
|
||||
// layer cache, which would also provide access to backing data, as well. Such
|
||||
// a cache should be implemented as a middleware, rather than integrated with
|
||||
// the storage backend.
|
||||
//
|
||||
// Note that most implementations rely on the caller to do strict checks on on
|
||||
// repo and dgst arguments, since these are mostly used behind existing
|
||||
// implementations.
|
||||
type LayerInfoCache interface {
|
||||
// Contains returns true if the repository with name contains the layer.
|
||||
Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error)
|
||||
|
||||
// Add includes the layer in the given repository cache.
|
||||
Add(ctx context.Context, repo string, dgst digest.Digest) error
|
||||
|
||||
// Meta provides the location of the layer on the backend and its size. Membership of a
|
||||
// repository should be tested before using the result, if required.
|
||||
Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error)
|
||||
|
||||
// SetMeta sets the meta data for the given layer.
|
||||
SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error
|
||||
}
|
||||
|
||||
// base implements common checks between cache implementations. Note that
|
||||
// these are not full checks of input, since that should be done by the
|
||||
// caller.
|
||||
type base struct {
|
||||
LayerInfoCache
|
||||
}
|
||||
|
||||
func (b *base) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) {
|
||||
if repo == "" {
|
||||
return false, fmt.Errorf("cache: cannot check for empty repository name")
|
||||
}
|
||||
|
||||
if dgst == "" {
|
||||
return false, fmt.Errorf("cache: cannot check for empty digests")
|
||||
}
|
||||
|
||||
return b.LayerInfoCache.Contains(ctx, repo, dgst)
|
||||
}
|
||||
|
||||
func (b *base) Add(ctx context.Context, repo string, dgst digest.Digest) error {
|
||||
if repo == "" {
|
||||
return fmt.Errorf("cache: cannot add empty repository name")
|
||||
}
|
||||
|
||||
if dgst == "" {
|
||||
return fmt.Errorf("cache: cannot add empty digest")
|
||||
}
|
||||
|
||||
return b.LayerInfoCache.Add(ctx, repo, dgst)
|
||||
}
|
||||
|
||||
func (b *base) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) {
|
||||
if dgst == "" {
|
||||
return LayerMeta{}, fmt.Errorf("cache: cannot get meta for empty digest")
|
||||
}
|
||||
|
||||
return b.LayerInfoCache.Meta(ctx, dgst)
|
||||
}
|
||||
|
||||
func (b *base) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error {
|
||||
if dgst == "" {
|
||||
return fmt.Errorf("cache: cannot set meta for empty digest")
|
||||
}
|
||||
|
||||
if meta.Path == "" {
|
||||
return fmt.Errorf("cache: cannot set empty path for meta")
|
||||
}
|
||||
|
||||
return b.LayerInfoCache.SetMeta(ctx, dgst, meta)
|
||||
}
|
86
registry/storage/cache/cache_test.go
vendored
Normal file
86
registry/storage/cache/cache_test.go
vendored
Normal file
@ -0,0 +1,86 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// checkLayerInfoCache takes a cache implementation through a common set of
|
||||
// operations. If adding new tests, please add them here so new
|
||||
// implementations get the benefit.
|
||||
func checkLayerInfoCache(t *testing.T, lic LayerInfoCache) {
|
||||
ctx := context.Background()
|
||||
|
||||
exists, err := lic.Contains(ctx, "", "fake:abc")
|
||||
if err == nil {
|
||||
t.Fatalf("expected error checking for cache item with empty repo")
|
||||
}
|
||||
|
||||
exists, err = lic.Contains(ctx, "foo/bar", "")
|
||||
if err == nil {
|
||||
t.Fatalf("expected error checking for cache item with empty digest")
|
||||
}
|
||||
|
||||
exists, err = lic.Contains(ctx, "foo/bar", "fake:abc")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error checking for cache item: %v", err)
|
||||
}
|
||||
|
||||
if exists {
|
||||
t.Fatalf("item should not exist")
|
||||
}
|
||||
|
||||
if err := lic.Add(ctx, "", "fake:abc"); err == nil {
|
||||
t.Fatalf("expected error adding cache item with empty name")
|
||||
}
|
||||
|
||||
if err := lic.Add(ctx, "foo/bar", ""); err == nil {
|
||||
t.Fatalf("expected error adding cache item with empty digest")
|
||||
}
|
||||
|
||||
if err := lic.Add(ctx, "foo/bar", "fake:abc"); err != nil {
|
||||
t.Fatalf("unexpected error adding item: %v", err)
|
||||
}
|
||||
|
||||
exists, err = lic.Contains(ctx, "foo/bar", "fake:abc")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error checking for cache item: %v", err)
|
||||
}
|
||||
|
||||
if !exists {
|
||||
t.Fatalf("item should exist")
|
||||
}
|
||||
|
||||
_, err = lic.Meta(ctx, "")
|
||||
if err == nil || err == ErrNotFound {
|
||||
t.Fatalf("expected error getting meta for cache item with empty digest")
|
||||
}
|
||||
|
||||
_, err = lic.Meta(ctx, "fake:abc")
|
||||
if err != ErrNotFound {
|
||||
t.Fatalf("expected unknown layer error getting meta for cache item with empty digest")
|
||||
}
|
||||
|
||||
if err = lic.SetMeta(ctx, "", LayerMeta{}); err == nil {
|
||||
t.Fatalf("expected error setting meta for cache item with empty digest")
|
||||
}
|
||||
|
||||
if err = lic.SetMeta(ctx, "foo/bar", LayerMeta{}); err == nil {
|
||||
t.Fatalf("expected error setting meta for cache item with empty meta")
|
||||
}
|
||||
|
||||
expected := LayerMeta{Path: "/foo/bar", Length: 20}
|
||||
if err := lic.SetMeta(ctx, "foo/bar", expected); err != nil {
|
||||
t.Fatalf("unexpected error setting meta: %v", err)
|
||||
}
|
||||
|
||||
meta, err := lic.Meta(ctx, "foo/bar")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting meta: %v", err)
|
||||
}
|
||||
|
||||
if meta != expected {
|
||||
t.Fatalf("retrieved meta data did not match: %v", err)
|
||||
}
|
||||
}
|
63
registry/storage/cache/memory.go
vendored
Normal file
63
registry/storage/cache/memory.go
vendored
Normal file
@ -0,0 +1,63 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"github.com/docker/distribution/digest"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// inmemoryLayerInfoCache is a map-based implementation of LayerInfoCache.
|
||||
type inmemoryLayerInfoCache struct {
|
||||
membership map[string]map[digest.Digest]struct{}
|
||||
meta map[digest.Digest]LayerMeta
|
||||
}
|
||||
|
||||
// NewInMemoryLayerInfoCache provides an implementation of LayerInfoCache that
|
||||
// stores results in memory.
|
||||
func NewInMemoryLayerInfoCache() LayerInfoCache {
|
||||
return &base{&inmemoryLayerInfoCache{
|
||||
membership: make(map[string]map[digest.Digest]struct{}),
|
||||
meta: make(map[digest.Digest]LayerMeta),
|
||||
}}
|
||||
}
|
||||
|
||||
func (ilic *inmemoryLayerInfoCache) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) {
|
||||
members, ok := ilic.membership[repo]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
_, ok = members[dgst]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
// Add adds the layer to the redis repository blob set.
|
||||
func (ilic *inmemoryLayerInfoCache) Add(ctx context.Context, repo string, dgst digest.Digest) error {
|
||||
members, ok := ilic.membership[repo]
|
||||
if !ok {
|
||||
members = make(map[digest.Digest]struct{})
|
||||
ilic.membership[repo] = members
|
||||
}
|
||||
|
||||
members[dgst] = struct{}{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Meta retrieves the layer meta data from the redis hash, returning
|
||||
// ErrUnknownLayer if not found.
|
||||
func (ilic *inmemoryLayerInfoCache) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) {
|
||||
meta, ok := ilic.meta[dgst]
|
||||
if !ok {
|
||||
return LayerMeta{}, ErrNotFound
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
// SetMeta sets the meta data for the given digest using a redis hash. A hash
|
||||
// is used here since we may store unrelated fields about a layer in the
|
||||
// future.
|
||||
func (ilic *inmemoryLayerInfoCache) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error {
|
||||
ilic.meta[dgst] = meta
|
||||
return nil
|
||||
}
|
9
registry/storage/cache/memory_test.go
vendored
Normal file
9
registry/storage/cache/memory_test.go
vendored
Normal file
@ -0,0 +1,9 @@
|
||||
package cache
|
||||
|
||||
import "testing"
|
||||
|
||||
// TestInMemoryLayerInfoCache checks the in memory implementation is working
|
||||
// correctly.
|
||||
func TestInMemoryLayerInfoCache(t *testing.T) {
|
||||
checkLayerInfoCache(t, NewInMemoryLayerInfoCache())
|
||||
}
|
98
registry/storage/cache/redis.go
vendored
Normal file
98
registry/storage/cache/redis.go
vendored
Normal file
@ -0,0 +1,98 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
ctxu "github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/digest"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// redisLayerInfoCache provides an implementation of storage.LayerInfoCache
|
||||
// based on redis. Layer info is stored in two parts. The first provide fast
|
||||
// access to repository membership through a redis set for each repo. The
|
||||
// second is a redis hash keyed by the digest of the layer, providing path and
|
||||
// length information. Note that there is no implied relationship between
|
||||
// these two caches. The layer may exist in one, both or none and the code
|
||||
// must be written this way.
|
||||
type redisLayerInfoCache struct {
|
||||
pool *redis.Pool
|
||||
|
||||
// TODO(stevvooe): We use a pool because we don't have great control over
|
||||
// the cache lifecycle to manage connections. A new connection if fetched
|
||||
// for each operation. Once we have better lifecycle management of the
|
||||
// request objects, we can change this to a connection.
|
||||
}
|
||||
|
||||
// NewRedisLayerInfoCache returns a new redis-based LayerInfoCache using the
|
||||
// provided redis connection pool.
|
||||
func NewRedisLayerInfoCache(pool *redis.Pool) LayerInfoCache {
|
||||
return &base{&redisLayerInfoCache{
|
||||
pool: pool,
|
||||
}}
|
||||
}
|
||||
|
||||
// Contains does a membership check on the repository blob set in redis. This
|
||||
// is used as an access check before looking up global path information. If
|
||||
// false is returned, the caller should still check the backend to if it
|
||||
// exists elsewhere.
|
||||
func (rlic *redisLayerInfoCache) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) {
|
||||
conn := rlic.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
ctxu.GetLogger(ctx).Debugf("(*redisLayerInfoCache).Contains(%q, %q)", repo, dgst)
|
||||
return redis.Bool(conn.Do("SISMEMBER", rlic.repositoryBlobSetKey(repo), dgst))
|
||||
}
|
||||
|
||||
// Add adds the layer to the redis repository blob set.
|
||||
func (rlic *redisLayerInfoCache) Add(ctx context.Context, repo string, dgst digest.Digest) error {
|
||||
conn := rlic.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
ctxu.GetLogger(ctx).Debugf("(*redisLayerInfoCache).Add(%q, %q)", repo, dgst)
|
||||
_, err := conn.Do("SADD", rlic.repositoryBlobSetKey(repo), dgst)
|
||||
return err
|
||||
}
|
||||
|
||||
// Meta retrieves the layer meta data from the redis hash, returning
|
||||
// ErrUnknownLayer if not found.
|
||||
func (rlic *redisLayerInfoCache) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) {
|
||||
conn := rlic.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
reply, err := redis.Values(conn.Do("HMGET", rlic.blobMetaHashKey(dgst), "path", "length"))
|
||||
if err != nil {
|
||||
return LayerMeta{}, err
|
||||
}
|
||||
|
||||
if len(reply) < 2 || reply[0] == nil || reply[1] == nil {
|
||||
return LayerMeta{}, ErrNotFound
|
||||
}
|
||||
|
||||
var meta LayerMeta
|
||||
if _, err := redis.Scan(reply, &meta.Path, &meta.Length); err != nil {
|
||||
return LayerMeta{}, err
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
// SetMeta sets the meta data for the given digest using a redis hash. A hash
|
||||
// is used here since we may store unrelated fields about a layer in the
|
||||
// future.
|
||||
func (rlic *redisLayerInfoCache) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error {
|
||||
conn := rlic.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
_, err := conn.Do("HMSET", rlic.blobMetaHashKey(dgst), "path", meta.Path, "length", meta.Length)
|
||||
return err
|
||||
}
|
||||
|
||||
// repositoryBlobSetKey returns the key for the blob set in the cache.
|
||||
func (rlic *redisLayerInfoCache) repositoryBlobSetKey(repo string) string {
|
||||
return "repository::" + repo + "::blobs"
|
||||
}
|
||||
|
||||
// blobMetaHashKey returns the cache key for immutable blob meta data.
|
||||
func (rlic *redisLayerInfoCache) blobMetaHashKey(dgst digest.Digest) string {
|
||||
return "blobs::" + dgst.String()
|
||||
}
|
50
registry/storage/cache/redis_test.go
vendored
Normal file
50
registry/storage/cache/redis_test.go
vendored
Normal file
@ -0,0 +1,50 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
)
|
||||
|
||||
var redisAddr string
|
||||
|
||||
func init() {
|
||||
flag.StringVar(&redisAddr, "test.registry.storage.cache.redis.addr", "", "configure the address of a test instance of redis")
|
||||
}
|
||||
|
||||
// TestRedisLayerInfoCache exercises a live redis instance using the cache
|
||||
// implementation.
|
||||
func TestRedisLayerInfoCache(t *testing.T) {
|
||||
if redisAddr == "" {
|
||||
// fallback to an environement variable
|
||||
redisAddr = os.Getenv("TEST_REGISTRY_STORAGE_CACHE_REDIS_ADDR")
|
||||
}
|
||||
|
||||
if redisAddr == "" {
|
||||
// skip if still not set
|
||||
t.Skip("please set -registry.storage.cache.redis to test layer info cache against redis")
|
||||
}
|
||||
|
||||
pool := &redis.Pool{
|
||||
Dial: func() (redis.Conn, error) {
|
||||
return redis.Dial("tcp", redisAddr)
|
||||
},
|
||||
MaxIdle: 1,
|
||||
MaxActive: 2,
|
||||
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
},
|
||||
Wait: false, // if a connection is not avialable, proceed without cache.
|
||||
}
|
||||
|
||||
// Clear the database
|
||||
if _, err := pool.Get().Do("FLUSHDB"); err != nil {
|
||||
t.Fatalf("unexpected error flushing redis db: %v", err)
|
||||
}
|
||||
|
||||
checkLayerInfoCache(t, NewRedisLayerInfoCache(pool))
|
||||
}
|
Loading…
Reference in New Issue
Block a user