Make Storage Driver API calls context aware.

- Change driver interface to take a context as its first argument
     - Make newFileReader take a context as its first argument
     - Make newFileWriter take a context as its first argument
     - Make blobstore exists and delete take a context as a first argument
     - Pass the layerreader's context to the storage layer
     - Pass the app's context to purgeuploads
     - Store the app's context into the blobstore (was previously null)
     - Pass the trace'd context to the storage drivers

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
This commit is contained in:
Richard 2015-04-27 15:58:58 -07:00
parent c0d297c011
commit 5d9105bd25
30 changed files with 383 additions and 343 deletions

View File

@ -73,7 +73,6 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
var err error
app.driver, err = factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters())
if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected
// method, where this is created lazily. Its status can be queried via
@ -92,7 +91,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
}
startUploadPurger(app.driver, ctxu.GetLogger(app), purgeConfig)
startUploadPurger(app, app.driver, ctxu.GetLogger(app), purgeConfig)
app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"])
if err != nil {
@ -109,10 +108,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.redis == nil {
panic("redis configuration required to use for layerinfo cache")
}
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis))
app.registry = storage.NewRegistryWithDriver(app, app.driver, cache.NewRedisLayerInfoCache(app.redis))
ctxu.GetLogger(app).Infof("using redis layerinfo cache")
case "inmemory":
app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache())
app.registry = storage.NewRegistryWithDriver(app, app.driver, cache.NewInMemoryLayerInfoCache())
ctxu.GetLogger(app).Infof("using inmemory layerinfo cache")
default:
if cc["layerinfo"] != "" {
@ -123,7 +122,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App
if app.registry == nil {
// configure the registry if no cache section is available.
app.registry = storage.NewRegistryWithDriver(app.driver, nil)
app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil)
}
app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"])
@ -365,7 +364,6 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
}
dispatch(context, r).ServeHTTP(w, r)
// Automated error response handling here. Handlers may return their
// own errors if they need different behavior (such as range errors
// for layer upload).
@ -597,7 +595,7 @@ func badPurgeUploadConfig(reason string) {
// startUploadPurger schedules a goroutine which will periodically
// check upload directories for old files and delete them
func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) {
func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log ctxu.Logger, config map[interface{}]interface{}) {
if config["enabled"] == false {
return
}
@ -652,7 +650,7 @@ func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logge
time.Sleep(jitter)
for {
storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
log.Infof("Starting upload purge in %s", intervalDuration)
time.Sleep(intervalDuration)
}

View File

@ -24,12 +24,13 @@ import (
// tested individually.
func TestAppDispatcher(t *testing.T) {
driver := inmemory.New()
ctx := context.Background()
app := &App{
Config: configuration.Configuration{},
Context: context.Background(),
Context: ctx,
router: v2.Router(),
driver: driver,
registry: storage.NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()),
registry: storage.NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache()),
}
server := httptest.NewServer(app)
router := v2.Router()

View File

@ -4,7 +4,7 @@ import (
"net/http"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/api/v2"
"github.com/gorilla/handlers"
@ -48,7 +48,7 @@ type layerHandler struct {
// GetLayer fetches the binary data from backend storage returns it in the
// response.
func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
ctxu.GetLogger(lh).Debug("GetImageLayer")
context.GetLogger(lh).Debug("GetImageLayer")
layers := lh.Repository.Layers()
layer, err := layers.Fetch(lh.Digest)
@ -65,7 +65,7 @@ func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
handler, err := layer.Handler(r)
if err != nil {
ctxu.GetLogger(lh).Debugf("unexpected error getting layer HTTP handler: %s", err)
context.GetLogger(lh).Debugf("unexpected error getting layer HTTP handler: %s", err)
lh.Errors.Push(v2.ErrorCodeUnknown, err)
return
}

View File

@ -3,10 +3,9 @@ package storage
import (
"fmt"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"golang.org/x/net/context"
)
// TODO(stevvooe): Currently, the blobStore implementation used by the
@ -32,7 +31,7 @@ func (bs *blobStore) exists(dgst digest.Digest) (bool, error) {
return false, err
}
ok, err := exists(bs.driver, path)
ok, err := exists(bs.ctx, bs.driver, path)
if err != nil {
return false, err
}
@ -48,7 +47,7 @@ func (bs *blobStore) get(dgst digest.Digest) ([]byte, error) {
return nil, err
}
return bs.driver.GetContent(bp)
return bs.driver.GetContent(bs.ctx, bp)
}
// link links the path to the provided digest by writing the digest into the
@ -62,7 +61,7 @@ func (bs *blobStore) link(path string, dgst digest.Digest) error {
// The contents of the "link" file are the exact string contents of the
// digest, which is specified in that package.
return bs.driver.PutContent(path, []byte(dgst))
return bs.driver.PutContent(bs.ctx, path, []byte(dgst))
}
// linked reads the link at path and returns the content.
@ -77,7 +76,7 @@ func (bs *blobStore) linked(path string) ([]byte, error) {
// readlink returns the linked digest at path.
func (bs *blobStore) readlink(path string) (digest.Digest, error) {
content, err := bs.driver.GetContent(path)
content, err := bs.driver.GetContent(bs.ctx, path)
if err != nil {
return "", err
}
@ -112,7 +111,7 @@ func (bs *blobStore) resolve(path string) (string, error) {
func (bs *blobStore) put(p []byte) (digest.Digest, error) {
dgst, err := digest.FromBytes(p)
if err != nil {
ctxu.GetLogger(bs.ctx).Errorf("error digesting content: %v, %s", err, string(p))
context.GetLogger(bs.ctx).Errorf("error digesting content: %v, %s", err, string(p))
return "", err
}
@ -128,7 +127,7 @@ func (bs *blobStore) put(p []byte) (digest.Digest, error) {
return dgst, nil
}
return dgst, bs.driver.PutContent(bp, p)
return dgst, bs.driver.PutContent(bs.ctx, bp, p)
}
// path returns the canonical path for the blob identified by digest. The blob
@ -145,9 +144,9 @@ func (bs *blobStore) path(dgst digest.Digest) (string, error) {
return bp, nil
}
// exists provides a utility method to test whether or not
func exists(driver storagedriver.StorageDriver, path string) (bool, error) {
if _, err := driver.Stat(path); err != nil {
// exists provides a utility method to test whether or not a path exists
func exists(ctx context.Context, driver storagedriver.StorageDriver, path string) (bool, error) {
if _, err := driver.Stat(ctx, path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return false, nil

View File

@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
@ -99,7 +100,7 @@ func (d *driver) Name() string {
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
blob, err := d.client.GetBlob(d.container, path)
if err != nil {
if is404(err) {
@ -112,13 +113,13 @@ func (d *driver) GetContent(path string) ([]byte, error) {
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(path string, contents []byte) error {
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
return d.client.PutBlockBlob(d.container, path, ioutil.NopCloser(bytes.NewReader(contents)))
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
if ok, err := d.client.BlobExists(d.container, path); err != nil {
return nil, err
} else if !ok {
@ -145,7 +146,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) {
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) {
if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
return 0, err
} else if !blobExists {
@ -166,7 +167,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (int64
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
// Check if the path is a blob
if ok, err := d.client.BlobExists(d.container, path); err != nil {
return nil, err
@ -215,7 +216,7 @@ func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
// List returns a list of the objects that are direct descendants of the given
// path.
func (d *driver) List(path string) ([]string, error) {
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
if path == "/" {
path = ""
}
@ -231,7 +232,7 @@ func (d *driver) List(path string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(sourcePath string, destPath string) error {
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
sourceBlobURL := d.client.GetBlobUrl(d.container, sourcePath)
err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
if err != nil {
@ -245,7 +246,7 @@ func (d *driver) Move(sourcePath string, destPath string) error {
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(path string) error {
func (d *driver) Delete(ctx context.Context, path string) error {
ok, err := d.client.DeleteBlobIfExists(d.container, path)
if err != nil {
return err
@ -275,7 +276,7 @@ func (d *driver) Delete(path string) error {
// URLFor returns a publicly accessible URL for the blob stored at given path
// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) {
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration
expires, ok := options["expiry"]
if ok {

View File

@ -51,32 +51,32 @@ type Base struct {
}
// GetContent wraps GetContent of underlying storage driver.
func (base *Base) GetContent(path string) ([]byte, error) {
_, done := context.WithTrace(context.Background())
func (base *Base) GetContent(ctx context.Context, path string) ([]byte, error) {
ctx, done := context.WithTrace(ctx)
defer done("%s.GetContent(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.GetContent(path)
return base.StorageDriver.GetContent(ctx, path)
}
// PutContent wraps PutContent of underlying storage driver.
func (base *Base) PutContent(path string, content []byte) error {
_, done := context.WithTrace(context.Background())
func (base *Base) PutContent(ctx context.Context, path string, content []byte) error {
ctx, done := context.WithTrace(context.Background())
defer done("%s.PutContent(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.PutContent(path, content)
return base.StorageDriver.PutContent(ctx, path, content)
}
// ReadStream wraps ReadStream of underlying storage driver.
func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) {
_, done := context.WithTrace(context.Background())
func (base *Base) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
ctx, done := context.WithTrace(context.Background())
defer done("%s.ReadStream(%q, %d)", base.Name(), path, offset)
if offset < 0 {
@ -87,12 +87,12 @@ func (base *Base) ReadStream(path string, offset int64) (io.ReadCloser, error) {
return nil, storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.ReadStream(path, offset)
return base.StorageDriver.ReadStream(ctx, path, offset)
}
// WriteStream wraps WriteStream of underlying storage driver.
func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) {
_, done := context.WithTrace(context.Background())
func (base *Base) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) {
ctx, done := context.WithTrace(ctx)
defer done("%s.WriteStream(%q, %d)", base.Name(), path, offset)
if offset < 0 {
@ -103,36 +103,36 @@ func (base *Base) WriteStream(path string, offset int64, reader io.Reader) (nn i
return 0, storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.WriteStream(path, offset, reader)
return base.StorageDriver.WriteStream(ctx, path, offset, reader)
}
// Stat wraps Stat of underlying storage driver.
func (base *Base) Stat(path string) (storagedriver.FileInfo, error) {
_, done := context.WithTrace(context.Background())
func (base *Base) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
ctx, done := context.WithTrace(ctx)
defer done("%s.Stat(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return nil, storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.Stat(path)
return base.StorageDriver.Stat(ctx, path)
}
// List wraps List of underlying storage driver.
func (base *Base) List(path string) ([]string, error) {
_, done := context.WithTrace(context.Background())
func (base *Base) List(ctx context.Context, path string) ([]string, error) {
ctx, done := context.WithTrace(ctx)
defer done("%s.List(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
return nil, storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.List(path)
return base.StorageDriver.List(ctx, path)
}
// Move wraps Move of underlying storage driver.
func (base *Base) Move(sourcePath string, destPath string) error {
_, done := context.WithTrace(context.Background())
func (base *Base) Move(ctx context.Context, sourcePath string, destPath string) error {
ctx, done := context.WithTrace(ctx)
defer done("%s.Move(%q, %q", base.Name(), sourcePath, destPath)
if !storagedriver.PathRegexp.MatchString(sourcePath) {
@ -141,29 +141,29 @@ func (base *Base) Move(sourcePath string, destPath string) error {
return storagedriver.InvalidPathError{Path: destPath}
}
return base.StorageDriver.Move(sourcePath, destPath)
return base.StorageDriver.Move(ctx, sourcePath, destPath)
}
// Delete wraps Delete of underlying storage driver.
func (base *Base) Delete(path string) error {
_, done := context.WithTrace(context.Background())
func (base *Base) Delete(ctx context.Context, path string) error {
ctx, done := context.WithTrace(ctx)
defer done("%s.Delete(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.Delete(path)
return base.StorageDriver.Delete(ctx, path)
}
// URLFor wraps URLFor of underlying storage driver.
func (base *Base) URLFor(path string, options map[string]interface{}) (string, error) {
_, done := context.WithTrace(context.Background())
func (base *Base) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
ctx, done := context.WithTrace(ctx)
defer done("%s.URLFor(%q)", base.Name(), path)
if !storagedriver.PathRegexp.MatchString(path) {
return "", storagedriver.InvalidPathError{Path: path}
}
return base.StorageDriver.URLFor(path, options)
return base.StorageDriver.URLFor(ctx, path, options)
}

View File

@ -9,6 +9,7 @@ import (
"path"
"time"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
@ -76,8 +77,8 @@ func (d *driver) Name() string {
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
rc, err := d.ReadStream(path, 0)
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
rc, err := d.ReadStream(ctx, path, 0)
if err != nil {
return nil, err
}
@ -92,8 +93,8 @@ func (d *driver) GetContent(path string) ([]byte, error) {
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(subPath string, contents []byte) error {
if _, err := d.WriteStream(subPath, 0, bytes.NewReader(contents)); err != nil {
func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error {
if _, err := d.WriteStream(ctx, subPath, 0, bytes.NewReader(contents)); err != nil {
return err
}
@ -102,7 +103,7 @@ func (d *driver) PutContent(subPath string, contents []byte) error {
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
if err != nil {
if os.IsNotExist(err) {
@ -126,7 +127,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
// WriteStream stores the contents of the provided io.Reader at a location
// designated by the given path.
func (d *driver) WriteStream(subPath string, offset int64, reader io.Reader) (nn int64, err error) {
func (d *driver) WriteStream(ctx context.Context, subPath string, offset int64, reader io.Reader) (nn int64, err error) {
// TODO(stevvooe): This needs to be a requirement.
// if !path.IsAbs(subPath) {
// return fmt.Errorf("absolute path required: %q", subPath)
@ -162,7 +163,7 @@ func (d *driver) WriteStream(subPath string, offset int64, reader io.Reader) (nn
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(subPath string) (storagedriver.FileInfo, error) {
func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) {
fullPath := d.fullPath(subPath)
fi, err := os.Stat(fullPath)
@ -182,7 +183,7 @@ func (d *driver) Stat(subPath string) (storagedriver.FileInfo, error) {
// List returns a list of the objects that are direct descendants of the given
// path.
func (d *driver) List(subPath string) ([]string, error) {
func (d *driver) List(ctx context.Context, subPath string) ([]string, error) {
if subPath[len(subPath)-1] != '/' {
subPath += "/"
}
@ -213,7 +214,7 @@ func (d *driver) List(subPath string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(sourcePath string, destPath string) error {
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
source := d.fullPath(sourcePath)
dest := d.fullPath(destPath)
@ -230,7 +231,7 @@ func (d *driver) Move(sourcePath string, destPath string) error {
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(subPath string) error {
func (d *driver) Delete(ctx context.Context, subPath string) error {
fullPath := d.fullPath(subPath)
_, err := os.Stat(fullPath)
@ -246,7 +247,7 @@ func (d *driver) Delete(subPath string) error {
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) {
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
return "", storagedriver.ErrUnsupportedMethod
}

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
@ -69,11 +70,11 @@ func (d *driver) Name() string {
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
rc, err := d.ReadStream(path, 0)
rc, err := d.ReadStream(ctx, path, 0)
if err != nil {
return nil, err
}
@ -83,7 +84,7 @@ func (d *driver) GetContent(path string) ([]byte, error) {
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(p string, contents []byte) error {
func (d *driver) PutContent(ctx context.Context, p string, contents []byte) error {
d.mutex.Lock()
defer d.mutex.Unlock()
@ -102,7 +103,7 @@ func (d *driver) PutContent(p string, contents []byte) error {
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
@ -126,7 +127,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) {
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) {
d.mutex.Lock()
defer d.mutex.Unlock()
@ -167,7 +168,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (nn in
}
// Stat returns info about the provided path.
func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
@ -193,7 +194,7 @@ func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
// List returns a list of the objects that are direct descendants of the given
// path.
func (d *driver) List(path string) ([]string, error) {
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
@ -223,7 +224,7 @@ func (d *driver) List(path string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(sourcePath string, destPath string) error {
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
d.mutex.Lock()
defer d.mutex.Unlock()
@ -239,7 +240,7 @@ func (d *driver) Move(sourcePath string, destPath string) error {
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(path string) error {
func (d *driver) Delete(ctx context.Context, path string) error {
d.mutex.Lock()
defer d.mutex.Unlock()
@ -256,6 +257,6 @@ func (d *driver) Delete(path string) error {
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) {
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
return "", storagedriver.ErrUnsupportedMethod
}

View File

@ -98,12 +98,12 @@ type S3BucketKeyer interface {
// Resolve returns an http.Handler which can serve the contents of the given
// Layer, or an error if not supported by the storagedriver.
func (lh *cloudFrontStorageMiddleware) URLFor(path string, options map[string]interface{}) (string, error) {
func (lh *cloudFrontStorageMiddleware) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
// TODO(endophage): currently only supports S3
keyer, ok := lh.StorageDriver.(S3BucketKeyer)
if !ok {
context.GetLogger(context.Background()).Warn("the CloudFront middleware does not support this backend storage driver")
return lh.StorageDriver.URLFor(path, options)
context.GetLogger(ctx).Warn("the CloudFront middleware does not support this backend storage driver")
return lh.StorageDriver.URLFor(ctx, path, options)
}
cfURL, err := lh.cloudfront.CannedSignedURL(keyer.S3BucketKey(path), "", time.Now().Add(lh.duration))

View File

@ -29,6 +29,8 @@ import (
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
@ -267,7 +269,7 @@ func (d *driver) Name() string {
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(path string) ([]byte, error) {
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
content, err := d.Bucket.Get(d.s3Path(path))
if err != nil {
return nil, parseError(path, err)
@ -276,13 +278,13 @@ func (d *driver) GetContent(path string) ([]byte, error) {
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(path string, contents []byte) error {
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
return parseError(path, d.Bucket.Put(d.s3Path(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
headers := make(http.Header)
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
@ -304,7 +306,7 @@ func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (totalRead int64, err error) {
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
partNumber := 1
bytesRead := 0
var putErrChan chan error
@ -348,7 +350,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
// Fills from 0 to total from current
fromSmallCurrent := func(total int64) error {
current, err := d.ReadStream(path, 0)
current, err := d.ReadStream(ctx, path, 0)
if err != nil {
return err
}
@ -628,7 +630,7 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
listResponse, err := d.Bucket.List(d.s3Path(path), "", "", 1)
if err != nil {
return nil, err
@ -661,7 +663,7 @@ func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
}
// List returns a list of the objects that are direct descendants of the given path.
func (d *driver) List(path string) ([]string, error) {
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
if path != "/" && path[len(path)-1] != '/' {
path = path + "/"
}
@ -706,7 +708,7 @@ func (d *driver) List(path string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(sourcePath string, destPath string) error {
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
/* This is terrible, but aws doesn't have an actual move. */
_, err := d.Bucket.PutCopy(d.s3Path(destPath), getPermissions(),
s3.CopyOptions{Options: d.getOptions(), ContentType: d.getContentType()}, d.Bucket.Name+"/"+d.s3Path(sourcePath))
@ -714,11 +716,11 @@ func (d *driver) Move(sourcePath string, destPath string) error {
return parseError(sourcePath, err)
}
return d.Delete(sourcePath)
return d.Delete(ctx, sourcePath)
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(path string) error {
func (d *driver) Delete(ctx context.Context, path string) error {
listResponse, err := d.Bucket.List(d.s3Path(path), "", "", listMax)
if err != nil || len(listResponse.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path}
@ -747,7 +749,7 @@ func (d *driver) Delete(path string) error {
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) {
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
methodString := "GET"
method, ok := options["method"]
if ok {

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/AdRoll/goamz/aws"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/testsuites"
@ -134,16 +135,17 @@ func (suite *S3DriverSuite) TestEmptyRootList(c *check.C) {
filename := "/test"
contents := []byte("contents")
err = rootedDriver.PutContent(filename, contents)
ctx := context.Background()
err = rootedDriver.PutContent(ctx, filename, contents)
c.Assert(err, check.IsNil)
defer rootedDriver.Delete(filename)
defer rootedDriver.Delete(ctx, filename)
keys, err := emptyRootDriver.List("/")
keys, err := emptyRootDriver.List(ctx, "/")
for _, path := range keys {
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
}
keys, err = slashRootDriver.List("/")
keys, err = slashRootDriver.List(ctx, "/")
for _, path := range keys {
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
}

View File

@ -7,6 +7,8 @@ import (
"regexp"
"strconv"
"strings"
"github.com/docker/distribution/context"
)
// Version is a string representing the storage driver version, of the form
@ -42,45 +44,45 @@ type StorageDriver interface {
// GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects.
GetContent(path string) ([]byte, error)
GetContent(ctx context.Context, path string) ([]byte, error)
// PutContent stores the []byte content at a location designated by "path".
// This should primarily be used for small objects.
PutContent(path string, content []byte) error
PutContent(ctx context.Context, path string, content []byte) error
// ReadStream retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset.
ReadStream(path string, offset int64) (io.ReadCloser, error)
ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error)
// WriteStream stores the contents of the provided io.ReadCloser at a
// location designated by the given path.
// May be used to resume writing a stream by providing a nonzero offset.
// The offset must be no larger than the CurrentSize for this path.
WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error)
WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error)
// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
Stat(path string) (FileInfo, error)
Stat(ctx context.Context, path string) (FileInfo, error)
// List returns a list of the objects that are direct descendants of the
//given path.
List(path string) ([]string, error)
List(ctx context.Context, path string) ([]string, error)
// Move moves an object stored at sourcePath to destPath, removing the
// original object.
// Note: This may be no more efficient than a copy followed by a delete for
// many implementations.
Move(sourcePath string, destPath string) error
Move(ctx context.Context, sourcePath string, destPath string) error
// Delete recursively deletes all objects stored at "path" and its subpaths.
Delete(path string) error
Delete(ctx context.Context, path string) error
// URLFor returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
// May return an ErrUnsupportedMethod in certain StorageDriver
// implementations.
URLFor(path string, options map[string]interface{}) (string, error)
URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error)
}
// PathRegexp is the regular expression which each file path must match. A

View File

@ -14,6 +14,7 @@ import (
"testing"
"time"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"gopkg.in/check.v1"
)
@ -27,6 +28,7 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC
check.Suite(&DriverSuite{
Constructor: driverConstructor,
SkipCheck: skipCheck,
ctx: context.Background(),
})
}
@ -88,6 +90,7 @@ type DriverSuite struct {
Teardown DriverTeardown
SkipCheck
storagedriver.StorageDriver
ctx context.Context
}
// SetUpSuite sets up the gocheck test suite.
@ -112,7 +115,7 @@ func (suite *DriverSuite) TearDownSuite(c *check.C) {
// This causes the suite to abort if any files are left around in the storage
// driver.
func (suite *DriverSuite) TearDownTest(c *check.C) {
files, _ := suite.StorageDriver.List("/")
files, _ := suite.StorageDriver.List(suite.ctx, "/")
if len(files) > 0 {
c.Fatalf("Storage driver did not clean up properly. Offending files: %#v", files)
}
@ -141,11 +144,11 @@ func (suite *DriverSuite) TestValidPaths(c *check.C) {
"/Abc/Cba"}
for _, filename := range validFiles {
err := suite.StorageDriver.PutContent(filename, contents)
defer suite.StorageDriver.Delete(firstPart(filename))
err := suite.StorageDriver.PutContent(suite.ctx, filename, contents)
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
c.Assert(err, check.IsNil)
received, err := suite.StorageDriver.GetContent(filename)
received, err := suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, contents)
}
@ -164,12 +167,12 @@ func (suite *DriverSuite) TestInvalidPaths(c *check.C) {
"/abc_123/"}
for _, filename := range invalidFiles {
err := suite.StorageDriver.PutContent(filename, contents)
defer suite.StorageDriver.Delete(firstPart(filename))
err := suite.StorageDriver.PutContent(suite.ctx, filename, contents)
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.InvalidPathError{})
_, err = suite.StorageDriver.GetContent(filename)
_, err = suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.InvalidPathError{})
}
@ -225,7 +228,7 @@ func (suite *DriverSuite) TestTruncate(c *check.C) {
// TestReadNonexistent tests reading content from an empty path.
func (suite *DriverSuite) TestReadNonexistent(c *check.C) {
filename := randomPath(32)
_, err := suite.StorageDriver.GetContent(filename)
_, err := suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
@ -277,17 +280,17 @@ func (suite *DriverSuite) TestWriteReadLargeStreams(c *check.C) {
}
filename := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
checksum := sha1.New()
var fileSize int64 = 5 * 1024 * 1024 * 1024
contents := newRandReader(fileSize)
written, err := suite.StorageDriver.WriteStream(filename, 0, io.TeeReader(contents, checksum))
written, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, io.TeeReader(contents, checksum))
c.Assert(err, check.IsNil)
c.Assert(written, check.Equals, fileSize)
reader, err := suite.StorageDriver.ReadStream(filename, 0)
reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0)
c.Assert(err, check.IsNil)
writtenChecksum := sha1.New()
@ -300,7 +303,7 @@ func (suite *DriverSuite) TestWriteReadLargeStreams(c *check.C) {
// reading with a given offset.
func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
filename := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
chunkSize := int64(32)
@ -308,10 +311,10 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
contentsChunk2 := randomContents(chunkSize)
contentsChunk3 := randomContents(chunkSize)
err := suite.StorageDriver.PutContent(filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...))
err := suite.StorageDriver.PutContent(suite.ctx, filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...))
c.Assert(err, check.IsNil)
reader, err := suite.StorageDriver.ReadStream(filename, 0)
reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0)
c.Assert(err, check.IsNil)
defer reader.Close()
@ -320,7 +323,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
c.Assert(readContents, check.DeepEquals, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...))
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize)
reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize)
c.Assert(err, check.IsNil)
defer reader.Close()
@ -329,7 +332,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
c.Assert(readContents, check.DeepEquals, append(contentsChunk2, contentsChunk3...))
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*2)
reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*2)
c.Assert(err, check.IsNil)
defer reader.Close()
@ -338,7 +341,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
c.Assert(readContents, check.DeepEquals, contentsChunk3)
// Ensure we get invalid offest for negative offsets.
reader, err = suite.StorageDriver.ReadStream(filename, -1)
reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, -1)
c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{})
c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1))
c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename)
@ -346,7 +349,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
// Read past the end of the content and make sure we get a reader that
// returns 0 bytes and io.EOF
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3)
reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*3)
c.Assert(err, check.IsNil)
defer reader.Close()
@ -356,7 +359,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
c.Assert(n, check.Equals, 0)
// Check the N-1 boundary condition, ensuring we get 1 byte then io.EOF.
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3-1)
reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*3-1)
c.Assert(err, check.IsNil)
defer reader.Close()
@ -389,7 +392,7 @@ func (suite *DriverSuite) TestContinueStreamAppendSmall(c *check.C) {
func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) {
filename := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
contentsChunk1 := randomContents(chunkSize)
contentsChunk2 := randomContents(chunkSize)
@ -399,39 +402,39 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64)
fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)
nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contentsChunk1))
nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(contentsChunk1))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(contentsChunk1)))
fi, err := suite.StorageDriver.Stat(filename)
fi, err := suite.StorageDriver.Stat(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Size(), check.Equals, int64(len(contentsChunk1)))
nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(contentsChunk2))
nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size(), bytes.NewReader(contentsChunk2))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(contentsChunk2)))
fi, err = suite.StorageDriver.Stat(filename)
fi, err = suite.StorageDriver.Stat(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Size(), check.Equals, 2*chunkSize)
// Test re-writing the last chunk
nn, err = suite.StorageDriver.WriteStream(filename, fi.Size()-chunkSize, bytes.NewReader(contentsChunk2))
nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size()-chunkSize, bytes.NewReader(contentsChunk2))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(contentsChunk2)))
fi, err = suite.StorageDriver.Stat(filename)
fi, err = suite.StorageDriver.Stat(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Size(), check.Equals, 2*chunkSize)
nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():]))
nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():]))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(fullContents[fi.Size():])))
received, err := suite.StorageDriver.GetContent(filename)
received, err := suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, fullContents)
@ -443,16 +446,16 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64)
fullContents = append(fullContents, zeroChunk...)
fullContents = append(fullContents, contentsChunk4...)
nn, err = suite.StorageDriver.WriteStream(filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4))
nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, chunkSize)
fi, err = suite.StorageDriver.Stat(filename)
fi, err = suite.StorageDriver.Stat(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Size(), check.Equals, int64(len(fullContents)))
received, err = suite.StorageDriver.GetContent(filename)
received, err = suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(len(received), check.Equals, len(fullContents))
c.Assert(received[chunkSize*3:chunkSize*4], check.DeepEquals, zeroChunk)
@ -460,7 +463,7 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64)
c.Assert(received, check.DeepEquals, fullContents)
// Ensure that negative offsets return correct error.
nn, err = suite.StorageDriver.WriteStream(filename, -1, bytes.NewReader(zeroChunk))
nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, -1, bytes.NewReader(zeroChunk))
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{})
c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename)
@ -472,11 +475,11 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64)
func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) {
filename := randomPath(32)
_, err := suite.StorageDriver.ReadStream(filename, 0)
_, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
_, err = suite.StorageDriver.ReadStream(filename, 64)
_, err = suite.StorageDriver.ReadStream(suite.ctx, filename, 64)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
@ -484,27 +487,27 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) {
// TestList checks the returned list of keys after populating a directory tree.
func (suite *DriverSuite) TestList(c *check.C) {
rootDirectory := "/" + randomFilename(int64(8+rand.Intn(8)))
defer suite.StorageDriver.Delete(rootDirectory)
defer suite.StorageDriver.Delete(suite.ctx, rootDirectory)
parentDirectory := rootDirectory + "/" + randomFilename(int64(8+rand.Intn(8)))
childFiles := make([]string, 50)
for i := 0; i < len(childFiles); i++ {
childFile := parentDirectory + "/" + randomFilename(int64(8+rand.Intn(8)))
childFiles[i] = childFile
err := suite.StorageDriver.PutContent(childFile, randomContents(32))
err := suite.StorageDriver.PutContent(suite.ctx, childFile, randomContents(32))
c.Assert(err, check.IsNil)
}
sort.Strings(childFiles)
keys, err := suite.StorageDriver.List("/")
keys, err := suite.StorageDriver.List(suite.ctx, "/")
c.Assert(err, check.IsNil)
c.Assert(keys, check.DeepEquals, []string{rootDirectory})
keys, err = suite.StorageDriver.List(rootDirectory)
keys, err = suite.StorageDriver.List(suite.ctx, rootDirectory)
c.Assert(err, check.IsNil)
c.Assert(keys, check.DeepEquals, []string{parentDirectory})
keys, err = suite.StorageDriver.List(parentDirectory)
keys, err = suite.StorageDriver.List(suite.ctx, parentDirectory)
c.Assert(err, check.IsNil)
sort.Strings(keys)
@ -523,20 +526,20 @@ func (suite *DriverSuite) TestMove(c *check.C) {
sourcePath := randomPath(32)
destPath := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(sourcePath))
defer suite.StorageDriver.Delete(firstPart(destPath))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(sourcePath))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(destPath))
err := suite.StorageDriver.PutContent(sourcePath, contents)
err := suite.StorageDriver.PutContent(suite.ctx, sourcePath, contents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.Move(sourcePath, destPath)
err = suite.StorageDriver.Move(suite.ctx, sourcePath, destPath)
c.Assert(err, check.IsNil)
received, err := suite.StorageDriver.GetContent(destPath)
received, err := suite.StorageDriver.GetContent(suite.ctx, destPath)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, contents)
_, err = suite.StorageDriver.GetContent(sourcePath)
_, err = suite.StorageDriver.GetContent(suite.ctx, sourcePath)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
@ -549,23 +552,23 @@ func (suite *DriverSuite) TestMoveOverwrite(c *check.C) {
sourceContents := randomContents(32)
destContents := randomContents(64)
defer suite.StorageDriver.Delete(firstPart(sourcePath))
defer suite.StorageDriver.Delete(firstPart(destPath))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(sourcePath))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(destPath))
err := suite.StorageDriver.PutContent(sourcePath, sourceContents)
err := suite.StorageDriver.PutContent(suite.ctx, sourcePath, sourceContents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.PutContent(destPath, destContents)
err = suite.StorageDriver.PutContent(suite.ctx, destPath, destContents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.Move(sourcePath, destPath)
err = suite.StorageDriver.Move(suite.ctx, sourcePath, destPath)
c.Assert(err, check.IsNil)
received, err := suite.StorageDriver.GetContent(destPath)
received, err := suite.StorageDriver.GetContent(suite.ctx, destPath)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, sourceContents)
_, err = suite.StorageDriver.GetContent(sourcePath)
_, err = suite.StorageDriver.GetContent(suite.ctx, sourcePath)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
@ -577,16 +580,16 @@ func (suite *DriverSuite) TestMoveNonexistent(c *check.C) {
sourcePath := randomPath(32)
destPath := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(destPath))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(destPath))
err := suite.StorageDriver.PutContent(destPath, contents)
err := suite.StorageDriver.PutContent(suite.ctx, destPath, contents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.Move(sourcePath, destPath)
err = suite.StorageDriver.Move(suite.ctx, sourcePath, destPath)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
received, err := suite.StorageDriver.GetContent(destPath)
received, err := suite.StorageDriver.GetContent(suite.ctx, destPath)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, contents)
}
@ -596,12 +599,12 @@ func (suite *DriverSuite) TestMoveInvalid(c *check.C) {
contents := randomContents(32)
// Create a regular file.
err := suite.StorageDriver.PutContent("/notadir", contents)
err := suite.StorageDriver.PutContent(suite.ctx, "/notadir", contents)
c.Assert(err, check.IsNil)
defer suite.StorageDriver.Delete("/notadir")
defer suite.StorageDriver.Delete(suite.ctx, "/notadir")
// Now try to move a non-existent file under it.
err = suite.StorageDriver.Move("/notadir/foo", "/notadir/bar")
err = suite.StorageDriver.Move(suite.ctx, "/notadir/foo", "/notadir/bar")
c.Assert(err, check.NotNil) // non-nil error
}
@ -611,15 +614,15 @@ func (suite *DriverSuite) TestDelete(c *check.C) {
filename := randomPath(32)
contents := randomContents(32)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
err := suite.StorageDriver.PutContent(filename, contents)
err := suite.StorageDriver.PutContent(suite.ctx, filename, contents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.Delete(filename)
err = suite.StorageDriver.Delete(suite.ctx, filename)
c.Assert(err, check.IsNil)
_, err = suite.StorageDriver.GetContent(filename)
_, err = suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
@ -630,12 +633,12 @@ func (suite *DriverSuite) TestURLFor(c *check.C) {
filename := randomPath(32)
contents := randomContents(32)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
err := suite.StorageDriver.PutContent(filename, contents)
err := suite.StorageDriver.PutContent(suite.ctx, filename, contents)
c.Assert(err, check.IsNil)
url, err := suite.StorageDriver.URLFor(filename, nil)
url, err := suite.StorageDriver.URLFor(suite.ctx, filename, nil)
if err == storagedriver.ErrUnsupportedMethod {
return
}
@ -649,7 +652,7 @@ func (suite *DriverSuite) TestURLFor(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(read, check.DeepEquals, contents)
url, err = suite.StorageDriver.URLFor(filename, map[string]interface{}{"method": "HEAD"})
url, err = suite.StorageDriver.URLFor(suite.ctx, filename, map[string]interface{}{"method": "HEAD"})
if err == storagedriver.ErrUnsupportedMethod {
return
}
@ -663,7 +666,7 @@ func (suite *DriverSuite) TestURLFor(c *check.C) {
// TestDeleteNonexistent checks that removing a nonexistent key fails.
func (suite *DriverSuite) TestDeleteNonexistent(c *check.C) {
filename := randomPath(32)
err := suite.StorageDriver.Delete(filename)
err := suite.StorageDriver.Delete(suite.ctx, filename)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
@ -676,42 +679,42 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) {
filename3 := randomPath(32)
contents := randomContents(32)
defer suite.StorageDriver.Delete(firstPart(dirname))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(dirname))
err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents)
err := suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename1), contents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.PutContent(path.Join(dirname, filename2), contents)
err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename2), contents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.PutContent(path.Join(dirname, filename3), contents)
err = suite.StorageDriver.PutContent(suite.ctx, path.Join(dirname, filename3), contents)
c.Assert(err, check.IsNil)
err = suite.StorageDriver.Delete(path.Join(dirname, filename1))
err = suite.StorageDriver.Delete(suite.ctx, path.Join(dirname, filename1))
c.Assert(err, check.IsNil)
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename1))
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename1))
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename2))
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename2))
c.Assert(err, check.IsNil)
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename3))
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename3))
c.Assert(err, check.IsNil)
err = suite.StorageDriver.Delete(dirname)
err = suite.StorageDriver.Delete(suite.ctx, dirname)
c.Assert(err, check.IsNil)
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename1))
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename1))
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename2))
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename2))
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename3))
_, err = suite.StorageDriver.GetContent(suite.ctx, path.Join(dirname, filename3))
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
@ -723,24 +726,24 @@ func (suite *DriverSuite) TestStatCall(c *check.C) {
fileName := randomFilename(32)
filePath := path.Join(dirPath, fileName)
defer suite.StorageDriver.Delete(firstPart(dirPath))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(dirPath))
// Call on non-existent file/dir, check error.
fi, err := suite.StorageDriver.Stat(dirPath)
fi, err := suite.StorageDriver.Stat(suite.ctx, dirPath)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
c.Assert(fi, check.IsNil)
fi, err = suite.StorageDriver.Stat(filePath)
fi, err = suite.StorageDriver.Stat(suite.ctx, filePath)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
c.Assert(fi, check.IsNil)
err = suite.StorageDriver.PutContent(filePath, content)
err = suite.StorageDriver.PutContent(suite.ctx, filePath, content)
c.Assert(err, check.IsNil)
// Call on regular file, check results
fi, err = suite.StorageDriver.Stat(filePath)
fi, err = suite.StorageDriver.Stat(suite.ctx, filePath)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Path(), check.Equals, filePath)
@ -751,9 +754,9 @@ func (suite *DriverSuite) TestStatCall(c *check.C) {
// Sleep and modify the file
time.Sleep(time.Second * 10)
content = randomContents(4096)
err = suite.StorageDriver.PutContent(filePath, content)
err = suite.StorageDriver.PutContent(suite.ctx, filePath, content)
c.Assert(err, check.IsNil)
fi, err = suite.StorageDriver.Stat(filePath)
fi, err = suite.StorageDriver.Stat(suite.ctx, filePath)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
time.Sleep(time.Second * 5) // allow changes to propagate (eventual consistency)
@ -768,7 +771,7 @@ func (suite *DriverSuite) TestStatCall(c *check.C) {
}
// Call on directory (do not check ModTime as dirs don't need to support it)
fi, err = suite.StorageDriver.Stat(dirPath)
fi, err = suite.StorageDriver.Stat(suite.ctx, dirPath)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Path(), check.Equals, dirPath)
@ -784,15 +787,15 @@ func (suite *DriverSuite) TestPutContentMultipleTimes(c *check.C) {
filename := randomPath(32)
contents := randomContents(4096)
defer suite.StorageDriver.Delete(firstPart(filename))
err := suite.StorageDriver.PutContent(filename, contents)
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
err := suite.StorageDriver.PutContent(suite.ctx, filename, contents)
c.Assert(err, check.IsNil)
contents = randomContents(2048) // upload a different, smaller file
err = suite.StorageDriver.PutContent(filename, contents)
err = suite.StorageDriver.PutContent(suite.ctx, filename, contents)
c.Assert(err, check.IsNil)
readContents, err := suite.StorageDriver.GetContent(filename)
readContents, err := suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(readContents, check.DeepEquals, contents)
}
@ -810,9 +813,9 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) {
filename := randomPath(32)
contents := randomContents(filesize)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
err := suite.StorageDriver.PutContent(filename, contents)
err := suite.StorageDriver.PutContent(suite.ctx, filename, contents)
c.Assert(err, check.IsNil)
var wg sync.WaitGroup
@ -820,7 +823,7 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) {
readContents := func() {
defer wg.Done()
offset := rand.Int63n(int64(len(contents)))
reader, err := suite.StorageDriver.ReadStream(filename, offset)
reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, offset)
c.Assert(err, check.IsNil)
readContents, err := ioutil.ReadAll(reader)
@ -872,7 +875,7 @@ func (suite *DriverSuite) TestEventualConsistency(c *check.C) {
}
filename := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
var offset int64
var misswrites int
@ -880,17 +883,17 @@ func (suite *DriverSuite) TestEventualConsistency(c *check.C) {
for i := 0; i < 1024; i++ {
contents := randomContents(chunkSize)
read, err := suite.StorageDriver.WriteStream(filename, offset, bytes.NewReader(contents))
read, err := suite.StorageDriver.WriteStream(suite.ctx, filename, offset, bytes.NewReader(contents))
c.Assert(err, check.IsNil)
fi, err := suite.StorageDriver.Stat(filename)
fi, err := suite.StorageDriver.Stat(suite.ctx, filename)
c.Assert(err, check.IsNil)
// We are most concerned with being able to read data as soon as Stat declares
// it is uploaded. This is the strongest guarantee that some drivers (that guarantee
// at best eventual consistency) absolutely need to provide.
if fi.Size() == offset+chunkSize {
reader, err := suite.StorageDriver.ReadStream(filename, offset)
reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, offset)
c.Assert(err, check.IsNil)
readContents, err := ioutil.ReadAll(reader)
@ -937,15 +940,15 @@ func (suite *DriverSuite) benchmarkPutGetFiles(c *check.C, size int64) {
parentDir := randomPath(8)
defer func() {
c.StopTimer()
suite.StorageDriver.Delete(firstPart(parentDir))
suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir))
}()
for i := 0; i < c.N; i++ {
filename := path.Join(parentDir, randomPath(32))
err := suite.StorageDriver.PutContent(filename, randomContents(size))
err := suite.StorageDriver.PutContent(suite.ctx, filename, randomContents(size))
c.Assert(err, check.IsNil)
_, err = suite.StorageDriver.GetContent(filename)
_, err = suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.IsNil)
}
}
@ -975,16 +978,16 @@ func (suite *DriverSuite) benchmarkStreamFiles(c *check.C, size int64) {
parentDir := randomPath(8)
defer func() {
c.StopTimer()
suite.StorageDriver.Delete(firstPart(parentDir))
suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir))
}()
for i := 0; i < c.N; i++ {
filename := path.Join(parentDir, randomPath(32))
written, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(randomContents(size)))
written, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(randomContents(size)))
c.Assert(err, check.IsNil)
c.Assert(written, check.Equals, size)
rc, err := suite.StorageDriver.ReadStream(filename, 0)
rc, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0)
c.Assert(err, check.IsNil)
rc.Close()
}
@ -1004,17 +1007,17 @@ func (suite *DriverSuite) benchmarkListFiles(c *check.C, numFiles int64) {
parentDir := randomPath(8)
defer func() {
c.StopTimer()
suite.StorageDriver.Delete(firstPart(parentDir))
suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir))
}()
for i := int64(0); i < numFiles; i++ {
err := suite.StorageDriver.PutContent(path.Join(parentDir, randomPath(32)), nil)
err := suite.StorageDriver.PutContent(suite.ctx, path.Join(parentDir, randomPath(32)), nil)
c.Assert(err, check.IsNil)
}
c.ResetTimer()
for i := 0; i < c.N; i++ {
files, err := suite.StorageDriver.List(parentDir)
files, err := suite.StorageDriver.List(suite.ctx, parentDir)
c.Assert(err, check.IsNil)
c.Assert(int64(len(files)), check.Equals, numFiles)
}
@ -1033,17 +1036,17 @@ func (suite *DriverSuite) BenchmarkDelete50Files(c *check.C) {
func (suite *DriverSuite) benchmarkDeleteFiles(c *check.C, numFiles int64) {
for i := 0; i < c.N; i++ {
parentDir := randomPath(8)
defer suite.StorageDriver.Delete(firstPart(parentDir))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir))
c.StopTimer()
for j := int64(0); j < numFiles; j++ {
err := suite.StorageDriver.PutContent(path.Join(parentDir, randomPath(32)), nil)
err := suite.StorageDriver.PutContent(suite.ctx, path.Join(parentDir, randomPath(32)), nil)
c.Assert(err, check.IsNil)
}
c.StartTimer()
// This is the operation we're benchmarking
err := suite.StorageDriver.Delete(firstPart(parentDir))
err := suite.StorageDriver.Delete(suite.ctx, firstPart(parentDir))
c.Assert(err, check.IsNil)
}
}
@ -1055,7 +1058,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
defer tf.Close()
filename := randomPath(32)
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
contents := randomContents(size)
@ -1065,11 +1068,11 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
tf.Sync()
tf.Seek(0, os.SEEK_SET)
nn, err := suite.StorageDriver.WriteStream(filename, 0, tf)
nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, tf)
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, size)
reader, err := suite.StorageDriver.ReadStream(filename, 0)
reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0)
c.Assert(err, check.IsNil)
defer reader.Close()
@ -1080,25 +1083,25 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
}
func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents []byte) {
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
err := suite.StorageDriver.PutContent(filename, contents)
err := suite.StorageDriver.PutContent(suite.ctx, filename, contents)
c.Assert(err, check.IsNil)
readContents, err := suite.StorageDriver.GetContent(filename)
readContents, err := suite.StorageDriver.GetContent(suite.ctx, filename)
c.Assert(err, check.IsNil)
c.Assert(readContents, check.DeepEquals, contents)
}
func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) {
defer suite.StorageDriver.Delete(firstPart(filename))
defer suite.StorageDriver.Delete(suite.ctx, firstPart(filename))
nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contents))
nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(contents))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(contents)))
reader, err := suite.StorageDriver.ReadStream(filename, 0)
reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0)
c.Assert(err, check.IsNil)
defer reader.Close()

View File

@ -9,6 +9,7 @@ import (
"os"
"time"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
@ -25,6 +26,8 @@ const fileReaderBufferSize = 4 << 20
type fileReader struct {
driver storagedriver.StorageDriver
ctx context.Context
// identifying fields
path string
size int64 // size is the total size, must be set.
@ -40,14 +43,15 @@ type fileReader struct {
// newFileReader initializes a file reader for the remote file. The read takes
// on the offset and size at the time the reader is created. If the underlying
// file changes, one must create a new fileReader.
func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
func newFileReader(ctx context.Context, driver storagedriver.StorageDriver, path string) (*fileReader, error) {
rd := &fileReader{
driver: driver,
path: path,
ctx: ctx,
}
// Grab the size of the layer file, ensuring existence.
if fi, err := driver.Stat(path); err != nil {
if fi, err := driver.Stat(ctx, path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): We really don't care if the file is not
@ -141,7 +145,7 @@ func (fr *fileReader) reader() (io.Reader, error) {
}
// If we don't have a reader, open one up.
rc, err := fr.driver.ReadStream(fr.path, fr.offset)
rc, err := fr.driver.ReadStream(fr.ctx, fr.path, fr.offset)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:

View File

@ -8,12 +8,13 @@ import (
"os"
"testing"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
func TestSimpleRead(t *testing.T) {
ctx := context.Background()
content := make([]byte, 1<<20)
n, err := rand.Read(content)
if err != nil {
@ -21,7 +22,7 @@ func TestSimpleRead(t *testing.T) {
}
if n != len(content) {
t.Fatalf("random read did't fill buffer")
t.Fatalf("random read didn't fill buffer")
}
dgst, err := digest.FromReader(bytes.NewReader(content))
@ -32,11 +33,11 @@ func TestSimpleRead(t *testing.T) {
driver := inmemory.New()
path := "/random"
if err := driver.PutContent(path, content); err != nil {
if err := driver.PutContent(ctx, path, content); err != nil {
t.Fatalf("error putting patterned content: %v", err)
}
fr, err := newFileReader(driver, path)
fr, err := newFileReader(ctx, driver, path)
if err != nil {
t.Fatalf("error allocating file reader: %v", err)
}
@ -59,12 +60,13 @@ func TestFileReaderSeek(t *testing.T) {
repititions := 1024
path := "/patterned"
content := bytes.Repeat([]byte(pattern), repititions)
ctx := context.Background()
if err := driver.PutContent(path, content); err != nil {
if err := driver.PutContent(ctx, path, content); err != nil {
t.Fatalf("error putting patterned content: %v", err)
}
fr, err := newFileReader(driver, path)
fr, err := newFileReader(ctx, driver, path)
if err != nil {
t.Fatalf("unexpected error creating file reader: %v", err)
@ -160,7 +162,7 @@ func TestFileReaderSeek(t *testing.T) {
// read method, with an io.EOF error.
func TestFileReaderNonExistentFile(t *testing.T) {
driver := inmemory.New()
fr, err := newFileReader(driver, "/doesnotexist")
fr, err := newFileReader(context.Background(), driver, "/doesnotexist")
if err != nil {
t.Fatalf("unexpected error initializing reader: %v", err)
}

View File

@ -7,6 +7,7 @@ import (
"io"
"os"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
@ -18,6 +19,8 @@ const (
type fileWriter struct {
driver storagedriver.StorageDriver
ctx context.Context
// identifying fields
path string
@ -45,13 +48,14 @@ var _ fileWriterInterface = &fileWriter{}
// newFileWriter returns a prepared fileWriter for the driver and path. This
// could be considered similar to an "open" call on a regular filesystem.
func newFileWriter(driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) {
func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) {
fw := fileWriter{
driver: driver,
path: path,
ctx: ctx,
}
if fi, err := driver.Stat(path); err != nil {
if fi, err := driver.Stat(ctx, path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// ignore, offset is zero
@ -179,7 +183,7 @@ func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error)
updateOffset = true
}
nn, err := fw.driver.WriteStream(fw.path, offset, r)
nn, err := fw.driver.WriteStream(fw.ctx, fw.path, offset, r)
if updateOffset {
// We should forward the offset, whether or not there was an error.

View File

@ -7,6 +7,7 @@ import (
"os"
"testing"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
@ -32,8 +33,9 @@ func TestSimpleWrite(t *testing.T) {
driver := inmemory.New()
path := "/random"
ctx := context.Background()
fw, err := newFileWriter(driver, path)
fw, err := newFileWriter(ctx, driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileWriter: %v", err)
}
@ -49,7 +51,7 @@ func TestSimpleWrite(t *testing.T) {
t.Fatalf("unexpected write length: %d != %d", n, len(content))
}
fr, err := newFileReader(driver, path)
fr, err := newFileReader(ctx, driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
@ -92,7 +94,7 @@ func TestSimpleWrite(t *testing.T) {
t.Fatalf("writeat was short: %d != %d", n, len(content))
}
fr, err = newFileReader(driver, path)
fr, err = newFileReader(ctx, driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
@ -122,13 +124,13 @@ func TestSimpleWrite(t *testing.T) {
// Now, we copy from one path to another, running the data through the
// fileReader to fileWriter, rather than the driver.Move command to ensure
// everything is working correctly.
fr, err = newFileReader(driver, path)
fr, err = newFileReader(ctx, driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
fw, err = newFileWriter(driver, "/copied")
fw, err = newFileWriter(ctx, driver, "/copied")
if err != nil {
t.Fatalf("unexpected error creating fileWriter: %v", err)
}
@ -143,7 +145,7 @@ func TestSimpleWrite(t *testing.T) {
t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled))
}
fr, err = newFileReader(driver, "/copied")
fr, err = newFileReader(ctx, driver, "/copied")
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
@ -162,7 +164,8 @@ func TestSimpleWrite(t *testing.T) {
}
func TestBufferedFileWriter(t *testing.T) {
writer, err := newFileWriter(inmemory.New(), "/random")
ctx := context.Background()
writer, err := newFileWriter(ctx, inmemory.New(), "/random")
if err != nil {
t.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())
@ -203,8 +206,8 @@ func BenchmarkFileWriter(b *testing.B) {
driver: inmemory.New(),
path: "/random",
}
if fi, err := fw.driver.Stat(fw.path); err != nil {
ctx := context.Background()
if fi, err := fw.driver.Stat(ctx, fw.path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// ignore, offset is zero
@ -236,8 +239,9 @@ func BenchmarkFileWriter(b *testing.B) {
func BenchmarkBufferedFileWriter(b *testing.B) {
b.StopTimer() // not sure how long setup above will take
ctx := context.Background()
for i := 0; i < b.N; i++ {
bfw, err := newFileWriter(inmemory.New(), "/random")
bfw, err := newFileWriter(ctx, inmemory.New(), "/random")
if err != nil {
b.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())

View File

@ -10,12 +10,12 @@ import (
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage/cache"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
"golang.org/x/net/context"
)
// TestSimpleLayerUpload covers the layer upload process, exercising common
@ -36,7 +36,7 @@ func TestSimpleLayerUpload(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache())
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -144,7 +144,7 @@ func TestSimpleLayerRead(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache())
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -253,7 +253,7 @@ func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache())
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
@ -353,7 +353,8 @@ func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper,
digest: dgst,
})
if err := driver.PutContent(blobPath, p); err != nil {
ctx := context.Background()
if err := driver.PutContent(ctx, blobPath, p); err != nil {
return "", err
}
@ -370,7 +371,7 @@ func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper,
return "", err
}
if err := driver.PutContent(layerLinkPath, []byte(dgst)); err != nil {
if err := driver.PutContent(ctx, layerLinkPath, []byte(dgst)); err != nil {
return "", nil
}

View File

@ -54,7 +54,7 @@ func (lr *layerReader) Close() error {
func (lr *layerReader) Handler(r *http.Request) (h http.Handler, err error) {
var handlerFunc http.HandlerFunc
redirectURL, err := lr.fileReader.driver.URLFor(lr.path, map[string]interface{}{"method": r.Method})
redirectURL, err := lr.fileReader.driver.URLFor(lr.ctx, lr.path, map[string]interface{}{"method": r.Method})
switch err {
case nil:

View File

@ -5,7 +5,7 @@ import (
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
storagedriver "github.com/docker/distribution/registry/storage/driver"
@ -16,7 +16,7 @@ type layerStore struct {
}
func (ls *layerStore) Exists(digest digest.Digest) (bool, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Exists")
context.GetLogger(ls.repository.ctx).Debug("(*layerStore).Exists")
// Because this implementation just follows blob links, an existence check
// is pretty cheap by starting and closing a fetch.
@ -35,13 +35,14 @@ func (ls *layerStore) Exists(digest digest.Digest) (bool, error) {
}
func (ls *layerStore) Fetch(dgst digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Fetch")
ctx := ls.repository.ctx
context.GetLogger(ctx).Debug("(*layerStore).Fetch")
bp, err := ls.path(dgst)
if err != nil {
return nil, err
}
fr, err := newFileReader(ls.repository.driver, bp)
fr, err := newFileReader(ctx, ls.repository.driver, bp)
if err != nil {
return nil, err
}
@ -56,7 +57,8 @@ func (ls *layerStore) Fetch(dgst digest.Digest) (distribution.Layer, error) {
// is already in progress or the layer has already been uploaded, this
// will return an error.
func (ls *layerStore) Upload() (distribution.LayerUpload, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Upload")
ctx := ls.repository.ctx
context.GetLogger(ctx).Debug("(*layerStore).Upload")
// NOTE(stevvooe): Consider the issues with allowing concurrent upload of
// the same two layers. Should it be disallowed? For now, we allow both
@ -84,7 +86,7 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) {
}
// Write a startedat file for this upload
if err := ls.repository.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
if err := ls.repository.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
return nil, err
}
@ -94,7 +96,9 @@ func (ls *layerStore) Upload() (distribution.LayerUpload, error) {
// Resume continues an in progress layer upload, returning the current
// state of the upload.
func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume")
ctx := ls.repository.ctx
context.GetLogger(ctx).Debug("(*layerStore).Resume")
startedAtPath, err := ls.repository.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(),
uuid: uuid,
@ -104,7 +108,7 @@ func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) {
return nil, err
}
startedAtBytes, err := ls.repository.driver.GetContent(startedAtPath)
startedAtBytes, err := ls.repository.driver.GetContent(ctx, startedAtPath)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
@ -133,7 +137,7 @@ func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) {
// newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (distribution.LayerUpload, error) {
fw, err := newFileWriter(ls.repository.driver, path)
fw, err := newFileWriter(ls.repository.ctx, ls.repository.driver, path)
if err != nil {
return nil, err
}

View File

@ -10,7 +10,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
@ -47,7 +47,7 @@ func (lw *layerWriter) StartedAt() time.Time {
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish")
context.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Finish")
if err := lw.bufferedFileWriter.Close(); err != nil {
return nil, err
@ -67,7 +67,7 @@ func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) {
break
}
ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "retries", retries).
context.GetLoggerWithField(lw.layerStore.repository.ctx, "retries", retries).
Errorf("error validating layer: %v", err)
if retries < 3 {
@ -98,7 +98,7 @@ func (lw *layerWriter) Finish(dgst digest.Digest) (distribution.Layer, error) {
// Cancel the layer upload process.
func (lw *layerWriter) Cancel() error {
ctxu.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel")
context.GetLogger(lw.layerStore.repository.ctx).Debug("(*layerWriter).Cancel")
if err := lw.removeResources(); err != nil {
return err
}
@ -168,7 +168,7 @@ func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) {
return nil, err
}
paths, err := lw.driver.List(uploadHashStatePathPrefix)
paths, err := lw.driver.List(lw.layerStore.repository.ctx, uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
@ -214,6 +214,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
ctx := lw.layerStore.repository.ctx
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
@ -229,7 +230,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error {
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// an error here as a fatal error, but only log it.
if err := lw.driver.Delete(hashState.path); err != nil {
if err := lw.driver.Delete(ctx, hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
@ -239,7 +240,7 @@ func (lw *layerWriter) resumeHashAt(offset int64) error {
// No need to load any state, just reset the hasher.
lw.resumableDigester.Reset()
} else {
storedState, err := lw.driver.GetContent(hashStateMatch.path)
storedState, err := lw.driver.GetContent(ctx, hashStateMatch.path)
if err != nil {
return err
}
@ -251,9 +252,8 @@ func (lw *layerWriter) resumeHashAt(offset int64) error {
// Mind the gap.
if gapLen := offset - int64(lw.resumableDigester.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired
// offset.
fr, err := newFileReader(lw.driver, lw.path)
// Need to read content from the upload to catch up to the desired offset.
fr, err := newFileReader(ctx, lw.driver, lw.path)
if err != nil {
return err
}
@ -286,7 +286,7 @@ func (lw *layerWriter) storeHashState() error {
return err
}
return lw.driver.PutContent(uploadHashStatePath, hashState)
return lw.driver.PutContent(lw.layerStore.repository.ctx, uploadHashStatePath, hashState)
}
// validateLayer checks the layer data against the digest, returning an error
@ -329,7 +329,7 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error)
}
// Read the file from the backend driver and validate it.
fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path)
fr, err := newFileReader(lw.layerStore.repository.ctx, lw.bufferedFileWriter.driver, lw.path)
if err != nil {
return "", err
}
@ -345,7 +345,7 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error)
}
if !verified {
ctxu.GetLoggerWithField(lw.layerStore.repository.ctx, "canonical", dgst).
context.GetLoggerWithField(lw.layerStore.repository.ctx, "canonical", dgst).
Errorf("canonical digest does match provided digest")
return "", distribution.ErrLayerInvalidDigest{
Digest: dgst,
@ -368,8 +368,9 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error {
return err
}
ctx := lw.layerStore.repository.ctx
// Check for existence
if _, err := lw.driver.Stat(blobPath); err != nil {
if _, err := lw.driver.Stat(ctx, blobPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist.
@ -388,7 +389,7 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error {
// the size here and write a zero-length file to blobPath if this is the
// case. For the most part, this should only ever happen with zero-length
// tars.
if _, err := lw.driver.Stat(lw.path); err != nil {
if _, err := lw.driver.Stat(ctx, lw.path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// HACK(stevvooe): This is slightly dangerous: if we verify above,
@ -397,7 +398,7 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error {
// prevent this horrid thing, we employ the hack of only allowing
// to this happen for the zero tarsum.
if dgst == digest.DigestSha256EmptyTar {
return lw.driver.PutContent(blobPath, []byte{})
return lw.driver.PutContent(ctx, blobPath, []byte{})
}
// We let this fail during the move below.
@ -409,7 +410,7 @@ func (lw *layerWriter) moveLayer(dgst digest.Digest) error {
}
}
return lw.driver.Move(lw.path, blobPath)
return lw.driver.Move(ctx, lw.path, blobPath)
}
// linkLayer links a valid, written layer blob into the registry under the
@ -435,7 +436,8 @@ func (lw *layerWriter) linkLayer(canonical digest.Digest, aliases ...digest.Dige
return err
}
if err := lw.layerStore.repository.driver.PutContent(layerLinkPath, []byte(canonical)); err != nil {
ctx := lw.layerStore.repository.ctx
if err := lw.layerStore.repository.driver.PutContent(ctx, layerLinkPath, []byte(canonical)); err != nil {
return err
}
}
@ -459,8 +461,7 @@ func (lw *layerWriter) removeResources() error {
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := lw.driver.Delete(dirPath); err != nil {
if err := lw.driver.Delete(lw.layerStore.repository.ctx, dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!

View File

@ -30,7 +30,7 @@ type manifestStoreTestEnv struct {
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
ctx := context.Background()
driver := inmemory.New()
registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache())
registry := NewRegistryWithDriver(ctx, driver, cache.NewInMemoryLayerInfoCache())
repo, err := registry.Repository(ctx, name)
if err != nil {

View File

@ -7,6 +7,7 @@ import (
"code.google.com/p/go-uuid/uuid"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/context"
storageDriver "github.com/docker/distribution/registry/storage/driver"
)
@ -28,9 +29,9 @@ func newUploadData() uploadData {
// PurgeUploads deletes files from the upload directory
// created before olderThan. The list of files deleted and errors
// encountered are returned
func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) {
func PurgeUploads(ctx context.Context, driver storageDriver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) {
log.Infof("PurgeUploads starting: olderThan=%s, actuallyDelete=%t", olderThan, actuallyDelete)
uploadData, errors := getOutstandingUploads(driver)
uploadData, errors := getOutstandingUploads(ctx, driver)
var deleted []string
for _, uploadData := range uploadData {
if uploadData.startedAt.Before(olderThan) {
@ -38,7 +39,7 @@ func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actua
log.Infof("Upload files in %s have older date (%s) than purge date (%s). Removing upload directory.",
uploadData.containingDir, uploadData.startedAt, olderThan)
if actuallyDelete {
err = driver.Delete(uploadData.containingDir)
err = driver.Delete(ctx, uploadData.containingDir)
}
if err == nil {
deleted = append(deleted, uploadData.containingDir)
@ -56,7 +57,7 @@ func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actua
// which could be eligible for deletion. The only reliable way to
// classify the age of a file is with the date stored in the startedAt
// file, so gather files by UUID with a date from startedAt.
func getOutstandingUploads(driver storageDriver.StorageDriver) (map[string]uploadData, []error) {
func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriver) (map[string]uploadData, []error) {
var errors []error
uploads := make(map[string]uploadData, 0)
@ -65,7 +66,7 @@ func getOutstandingUploads(driver storageDriver.StorageDriver) (map[string]uploa
if err != nil {
return uploads, append(errors, err)
}
err = Walk(driver, root, func(fileInfo storageDriver.FileInfo) error {
err = Walk(ctx, driver, root, func(fileInfo storageDriver.FileInfo) error {
filePath := fileInfo.Path()
_, file := path.Split(filePath)
if file[0] == '_' {
@ -124,7 +125,8 @@ func uUIDFromPath(path string) (string, bool) {
// readStartedAtFile reads the date from an upload's startedAtFile
func readStartedAtFile(driver storageDriver.StorageDriver, path string) (time.Time, error) {
startedAtBytes, err := driver.GetContent(path)
// todo:(richardscothern) - pass in a context
startedAtBytes, err := driver.GetContent(context.Background(), path)
if err != nil {
return time.Now(), err
}

View File

@ -7,26 +7,28 @@ import (
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
var pm = defaultPathMapper
func testUploadFS(t *testing.T, numUploads int, repoName string, startedAt time.Time) driver.StorageDriver {
func testUploadFS(t *testing.T, numUploads int, repoName string, startedAt time.Time) (driver.StorageDriver, context.Context) {
d := inmemory.New()
ctx := context.Background()
for i := 0; i < numUploads; i++ {
addUploads(t, d, uuid.New(), repoName, startedAt)
addUploads(ctx, t, d, uuid.New(), repoName, startedAt)
}
return d
return d, ctx
}
func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, startedAt time.Time) {
func addUploads(ctx context.Context, t *testing.T, d driver.StorageDriver, uploadID, repo string, startedAt time.Time) {
dataPath, err := pm.path(uploadDataPathSpec{name: repo, uuid: uploadID})
if err != nil {
t.Fatalf("Unable to resolve path")
}
if err := d.PutContent(dataPath, []byte("")); err != nil {
if err := d.PutContent(ctx, dataPath, []byte("")); err != nil {
t.Fatalf("Unable to write data file")
}
@ -35,7 +37,7 @@ func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, sta
t.Fatalf("Unable to resolve path")
}
if d.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
if d.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
t.Fatalf("Unable to write startedAt file")
}
@ -43,8 +45,8 @@ func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, sta
func TestPurgeGather(t *testing.T) {
uploadCount := 5
fs := testUploadFS(t, uploadCount, "test-repo", time.Now())
uploadData, errs := getOutstandingUploads(fs)
fs, ctx := testUploadFS(t, uploadCount, "test-repo", time.Now())
uploadData, errs := getOutstandingUploads(ctx, fs)
if len(errs) != 0 {
t.Errorf("Unexepected errors: %q", errs)
}
@ -54,9 +56,9 @@ func TestPurgeGather(t *testing.T) {
}
func TestPurgeNone(t *testing.T) {
fs := testUploadFS(t, 10, "test-repo", time.Now())
fs, ctx := testUploadFS(t, 10, "test-repo", time.Now())
oneHourAgo := time.Now().Add(-1 * time.Hour)
deleted, errs := PurgeUploads(fs, oneHourAgo, true)
deleted, errs := PurgeUploads(ctx, fs, oneHourAgo, true)
if len(errs) != 0 {
t.Error("Unexpected errors", errs)
}
@ -68,13 +70,13 @@ func TestPurgeNone(t *testing.T) {
func TestPurgeAll(t *testing.T) {
uploadCount := 10
oneHourAgo := time.Now().Add(-1 * time.Hour)
fs := testUploadFS(t, uploadCount, "test-repo", oneHourAgo)
fs, ctx := testUploadFS(t, uploadCount, "test-repo", oneHourAgo)
// Ensure > 1 repos are purged
addUploads(t, fs, uuid.New(), "test-repo2", oneHourAgo)
addUploads(ctx, t, fs, uuid.New(), "test-repo2", oneHourAgo)
uploadCount++
deleted, errs := PurgeUploads(fs, time.Now(), true)
deleted, errs := PurgeUploads(ctx, fs, time.Now(), true)
if len(errs) != 0 {
t.Error("Unexpected errors:", errs)
}
@ -88,15 +90,15 @@ func TestPurgeAll(t *testing.T) {
func TestPurgeSome(t *testing.T) {
oldUploadCount := 5
oneHourAgo := time.Now().Add(-1 * time.Hour)
fs := testUploadFS(t, oldUploadCount, "library/test-repo", oneHourAgo)
fs, ctx := testUploadFS(t, oldUploadCount, "library/test-repo", oneHourAgo)
newUploadCount := 4
for i := 0; i < newUploadCount; i++ {
addUploads(t, fs, uuid.New(), "test-repo", time.Now().Add(1*time.Hour))
addUploads(ctx, t, fs, uuid.New(), "test-repo", time.Now().Add(1*time.Hour))
}
deleted, errs := PurgeUploads(fs, time.Now(), true)
deleted, errs := PurgeUploads(ctx, fs, time.Now(), true)
if len(errs) != 0 {
t.Error("Unexpected errors:", errs)
}
@ -109,7 +111,7 @@ func TestPurgeSome(t *testing.T) {
func TestPurgeOnlyUploads(t *testing.T) {
oldUploadCount := 5
oneHourAgo := time.Now().Add(-1 * time.Hour)
fs := testUploadFS(t, oldUploadCount, "test-repo", oneHourAgo)
fs, ctx := testUploadFS(t, oldUploadCount, "test-repo", oneHourAgo)
// Create a directory tree outside _uploads and ensure
// these files aren't deleted.
@ -123,11 +125,11 @@ func TestPurgeOnlyUploads(t *testing.T) {
}
nonUploadFile := path.Join(nonUploadPath, "file")
if err = fs.PutContent(nonUploadFile, []byte("")); err != nil {
if err = fs.PutContent(ctx, nonUploadFile, []byte("")); err != nil {
t.Fatalf("Unable to write data file")
}
deleted, errs := PurgeUploads(fs, time.Now(), true)
deleted, errs := PurgeUploads(ctx, fs, time.Now(), true)
if len(errs) != 0 {
t.Error("Unexpected errors", errs)
}
@ -140,13 +142,14 @@ func TestPurgeOnlyUploads(t *testing.T) {
func TestPurgeMissingStartedAt(t *testing.T) {
oneHourAgo := time.Now().Add(-1 * time.Hour)
fs := testUploadFS(t, 1, "test-repo", oneHourAgo)
err := Walk(fs, "/", func(fileInfo driver.FileInfo) error {
fs, ctx := testUploadFS(t, 1, "test-repo", oneHourAgo)
err := Walk(ctx, fs, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
_, file := path.Split(filePath)
if file == "startedat" {
if err := fs.Delete(filePath); err != nil {
if err := fs.Delete(ctx, filePath); err != nil {
t.Fatalf("Unable to delete startedat file: %s", filePath)
}
}
@ -155,7 +158,7 @@ func TestPurgeMissingStartedAt(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error during Walk: %s ", err.Error())
}
deleted, errs := PurgeUploads(fs, time.Now(), true)
deleted, errs := PurgeUploads(ctx, fs, time.Now(), true)
if len(errs) > 0 {
t.Errorf("Unexpected errors")
}

View File

@ -20,10 +20,11 @@ type registry struct {
// NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate.
func NewRegistryWithDriver(driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Namespace {
func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Namespace {
bs := &blobStore{
driver: driver,
pm: defaultPathMapper,
ctx: ctx,
}
return &registry{

View File

@ -26,7 +26,7 @@ func (rs *revisionStore) exists(revision digest.Digest) (bool, error) {
return false, err
}
exists, err := exists(rs.driver, revpath)
exists, err := exists(rs.repository.ctx, rs.driver, revpath)
if err != nil {
return false, err
}
@ -121,7 +121,7 @@ func (rs *revisionStore) link(revision digest.Digest) error {
return err
}
if exists, err := exists(rs.driver, revisionPath); err != nil {
if exists, err := exists(rs.repository.ctx, rs.driver, revisionPath); err != nil {
return err
} else if exists {
// Revision has already been linked!
@ -142,5 +142,5 @@ func (rs *revisionStore) delete(revision digest.Digest) error {
return err
}
return rs.driver.Delete(revisionPath)
return rs.driver.Delete(rs.repository.ctx, revisionPath)
}

View File

@ -30,7 +30,7 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
// can be eliminated by implementing listAll on drivers.
signaturesPath = path.Join(signaturesPath, "sha256")
signaturePaths, err := s.driver.List(signaturesPath)
signaturePaths, err := s.driver.List(s.repository.ctx, signaturesPath)
if err != nil {
return nil, err
}

View File

@ -4,6 +4,7 @@ import (
"path"
"github.com/docker/distribution"
// "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
@ -23,7 +24,7 @@ func (ts *tagStore) tags() ([]string, error) {
}
var tags []string
entries, err := ts.driver.List(p)
entries, err := ts.driver.List(ts.repository.ctx, p)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
@ -52,7 +53,7 @@ func (ts *tagStore) exists(tag string) (bool, error) {
return false, err
}
exists, err := exists(ts.driver, tagPath)
exists, err := exists(ts.repository.ctx, ts.driver, tagPath)
if err != nil {
return false, err
}
@ -102,7 +103,7 @@ func (ts *tagStore) resolve(tag string) (digest.Digest, error) {
return "", err
}
if exists, err := exists(ts.driver, currentPath); err != nil {
if exists, err := exists(ts.repository.ctx, ts.driver, currentPath); err != nil {
return "", err
} else if !exists {
return "", distribution.ErrManifestUnknown{Name: ts.Name(), Tag: tag}
@ -130,7 +131,7 @@ func (ts *tagStore) revisions(tag string) ([]digest.Digest, error) {
// TODO(stevvooe): Need to append digest alg to get listing of revisions.
manifestTagIndexPath = path.Join(manifestTagIndexPath, "sha256")
entries, err := ts.driver.List(manifestTagIndexPath)
entries, err := ts.driver.List(ts.repository.ctx, manifestTagIndexPath)
if err != nil {
return nil, err
}
@ -154,5 +155,5 @@ func (ts *tagStore) delete(tag string) error {
return err
}
return ts.driver.Delete(tagPath)
return ts.driver.Delete(ts.repository.ctx, tagPath)
}

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"github.com/docker/distribution/context"
storageDriver "github.com/docker/distribution/registry/storage/driver"
)
@ -20,13 +21,13 @@ type WalkFn func(fileInfo storageDriver.FileInfo) error
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
func Walk(driver storageDriver.StorageDriver, from string, f WalkFn) error {
children, err := driver.List(from)
func Walk(ctx context.Context, driver storageDriver.StorageDriver, from string, f WalkFn) error {
children, err := driver.List(ctx, from)
if err != nil {
return err
}
for _, child := range children {
fileInfo, err := driver.Stat(child)
fileInfo, err := driver.Stat(ctx, child)
if err != nil {
return err
}
@ -37,7 +38,7 @@ func Walk(driver storageDriver.StorageDriver, from string, f WalkFn) error {
}
if fileInfo.IsDir() && !skipDir {
Walk(driver, child, f)
Walk(ctx, driver, child, f)
}
}
return nil

View File

@ -4,17 +4,19 @@ import (
"fmt"
"testing"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
)
func testFS(t *testing.T) (driver.StorageDriver, map[string]string) {
func testFS(t *testing.T) (driver.StorageDriver, map[string]string, context.Context) {
d := inmemory.New()
c := []byte("")
if err := d.PutContent("/a/b/c/d", c); err != nil {
ctx := context.Background()
if err := d.PutContent(ctx, "/a/b/c/d", c); err != nil {
t.Fatalf("Unable to put to inmemory fs")
}
if err := d.PutContent("/a/b/c/e", c); err != nil {
if err := d.PutContent(ctx, "/a/b/c/e", c); err != nil {
t.Fatalf("Unable to put to inmemory fs")
}
@ -26,20 +28,20 @@ func testFS(t *testing.T) (driver.StorageDriver, map[string]string) {
"/a/b/c/e": "file",
}
return d, expected
return d, expected, ctx
}
func TestWalkErrors(t *testing.T) {
d, expected := testFS(t)
d, expected, ctx := testFS(t)
fileCount := len(expected)
err := Walk(d, "", func(fileInfo driver.FileInfo) error {
err := Walk(ctx, d, "", func(fileInfo driver.FileInfo) error {
return nil
})
if err == nil {
t.Error("Expected invalid root err")
}
err = Walk(d, "/", func(fileInfo driver.FileInfo) error {
err = Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
// error on the 2nd file
if fileInfo.Path() == "/a/b" {
return fmt.Errorf("Early termination")
@ -54,7 +56,7 @@ func TestWalkErrors(t *testing.T) {
t.Error(err.Error())
}
err = Walk(d, "/nonexistant", func(fileInfo driver.FileInfo) error {
err = Walk(ctx, d, "/nonexistant", func(fileInfo driver.FileInfo) error {
return nil
})
if err == nil {
@ -64,8 +66,8 @@ func TestWalkErrors(t *testing.T) {
}
func TestWalk(t *testing.T) {
d, expected := testFS(t)
err := Walk(d, "/", func(fileInfo driver.FileInfo) error {
d, expected, ctx := testFS(t)
err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
filetype, ok := expected[filePath]
if !ok {
@ -93,8 +95,8 @@ func TestWalk(t *testing.T) {
}
func TestWalkSkipDir(t *testing.T) {
d, expected := testFS(t)
err := Walk(d, "/", func(fileInfo driver.FileInfo) error {
d, expected, ctx := testFS(t)
err := Walk(ctx, d, "/", func(fileInfo driver.FileInfo) error {
filePath := fileInfo.Path()
if filePath == "/a/b" {
// skip processing /a/b/c and /a/b/c/d