Adds functional options arguments to the Blobs Create method

Removes the Mount operation and instead implements this behavior as part
of Create a From option is provided, which in turn returns a rich
ErrBlobMounted indicating that a blob upload session was not initiated,
but instead the blob was mounted from another repository

Signed-off-by: Brian Bland <brian.bland@docker.com>
This commit is contained in:
Brian Bland 2016-01-13 11:44:42 -08:00
parent 4378e1aa85
commit ce88d8a6f4
13 changed files with 217 additions and 120 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
)
var (
@ -40,6 +41,18 @@ func (err ErrBlobInvalidDigest) Error() string {
err.Digest, err.Reason)
}
// ErrBlobMounted returned when a blob is mounted from another repository
// instead of initiating an upload session.
type ErrBlobMounted struct {
From reference.Canonical
Descriptor Descriptor
}
func (err ErrBlobMounted) Error() string {
return fmt.Sprintf("blob mounted from: %v to: %v",
err.From, err.Descriptor)
}
// Descriptor describes targeted content. Used in conjunction with a blob
// store, a descriptor can be used to fetch, store and target any kind of
// blob. The struct also describes the wire protocol format. Fields should
@ -151,14 +164,19 @@ type BlobIngester interface {
// returned handle can be written to and later resumed using an opaque
// identifier. With this approach, one can Close and Resume a BlobWriter
// multiple times until the BlobWriter is committed or cancelled.
Create(ctx context.Context) (BlobWriter, error)
Create(ctx context.Context, options ...BlobCreateOption) (BlobWriter, error)
// Resume attempts to resume a write to a blob, identified by an id.
Resume(ctx context.Context, id string) (BlobWriter, error)
}
// Mount adds a blob to this service from another source repository,
// identified by a digest.
Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (Descriptor, error)
// BlobCreateOption is a general extensible function argument for blob creation
// methods. A BlobIngester may choose to honor any or none of the given
// BlobCreateOptions, which can be specific to the implementation of the
// BlobIngester receiving them.
// TODO (brianbland): unify this with ManifestServiceOption in the future
type BlobCreateOption interface {
Apply(interface{}) error
}
// BlobWriter provides a handle for inserting data into a blob store.

View File

@ -735,6 +735,25 @@ the uploaded blob which may differ from the provided digest. Most clients may
ignore the value but if it is used, the client should verify the value against
the uploaded blob data.
If a mount fails due to invalid repository or digest arguments, the registry
will fall back to the standard upload behavior and return a `202 Accepted` with
the upload URL in the `Location` header:
```
202 Accepted
Location: /v2/<name>/blobs/uploads/<uuid>
Range: bytes=0-<offset>
Content-Length: 0
Docker-Upload-UUID: <uuid>
```
This behavior is consistent with older versions of the registry, which do not
recognize the repository mount query parameters.
Note: a client may issue a HEAD request to check existence of a blob in a source
repository to distinguish between the registry not supporting blob mounts and
the blob not existing in the expected repository.
##### Errors
If an 502, 503 or 504 error is received, the client should assume that the

View File

@ -735,6 +735,25 @@ the uploaded blob which may differ from the provided digest. Most clients may
ignore the value but if it is used, the client should verify the value against
the uploaded blob data.
If a mount fails due to invalid repository or digest arguments, the registry
will fall back to the standard upload behavior and return a `202 Accepted` with
the upload URL in the `Location` header:
```
202 Accepted
Location: /v2/<name>/blobs/uploads/<uuid>
Range: bytes=0-<offset>
Content-Length: 0
Docker-Upload-UUID: <uuid>
```
This behavior is consistent with older versions of the registry, which do not
recognize the repository mount query parameters.
Note: a client may issue a HEAD request to check existence of a blob in a source
repository to distinguish between the registry not supporting blob mounts and
the blob not existing in the expected repository.
##### Errors
If an 502, 503 or 504 error is received, the client should assume that the

View File

@ -42,7 +42,7 @@ func (bs *mockBlobService) Put(ctx context.Context, mediaType string, p []byte)
return d, nil
}
func (bs *mockBlobService) Create(ctx context.Context) (distribution.BlobWriter, error) {
func (bs *mockBlobService) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
panic("not implemented")
}
@ -50,10 +50,6 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution.
panic("not implemented")
}
func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
panic("not implemented")
}
func TestEmptyTar(t *testing.T) {
// Confirm that gzippedEmptyTar expands to 1024 NULL bytes.
var decompressed [2048]byte

View File

@ -38,7 +38,7 @@ func (bs *mockBlobService) Put(ctx context.Context, mediaType string, p []byte)
return d, nil
}
func (bs *mockBlobService) Create(ctx context.Context) (distribution.BlobWriter, error) {
func (bs *mockBlobService) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
panic("not implemented")
}
@ -46,10 +46,6 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution.
panic("not implemented")
}
func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
panic("not implemented")
}
func TestBuilder(t *testing.T) {
imgJSON := []byte(`{
"architecture": "amd64",

View File

@ -160,8 +160,15 @@ func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []b
return desc, err
}
func (bsl *blobServiceListener) Create(ctx context.Context) (distribution.BlobWriter, error) {
wr, err := bsl.BlobStore.Create(ctx)
func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
wr, err := bsl.BlobStore.Create(ctx, options...)
switch err := err.(type) {
case distribution.ErrBlobMounted:
if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), err.Descriptor, err.From.Name()); err != nil {
context.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
}
return nil, err
}
return bsl.decorateWriter(wr), err
}
@ -170,17 +177,6 @@ func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribu
return bsl.decorateWriter(wr), err
}
func (bsl *blobServiceListener) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := bsl.BlobStore.Mount(ctx, sourceRepo, dgst)
if err == nil {
if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), desc, sourceRepo); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer mount to listener: %v", err)
}
}
return desc, err
}
func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
return &blobWriterListener{
BlobWriter: wr,

View File

@ -10,7 +10,6 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
"github.com/docker/distribution"
@ -19,6 +18,7 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache"
"github.com/docker/distribution/registry/storage/cache/memory"
)
@ -500,9 +500,6 @@ type blobs struct {
statter distribution.BlobDescriptorService
distribution.BlobDeleter
cacheLock sync.Mutex
cachedBlobUpload distribution.BlobWriter
}
func sanitizeLocation(location, base string) (string, error) {
@ -576,18 +573,23 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
return writer.Commit(ctx, desc)
}
func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
bs.cacheLock.Lock()
if bs.cachedBlobUpload != nil {
upload := bs.cachedBlobUpload
bs.cachedBlobUpload = nil
bs.cacheLock.Unlock()
func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
var opts storage.CreateOptions
return upload, nil
for _, option := range options {
err := option.Apply(&opts)
if err != nil {
return nil, err
}
}
bs.cacheLock.Unlock()
u, err := bs.ub.BuildBlobUploadURL(bs.name)
var values []url.Values
if opts.Mount.ShouldMount {
values = append(values, url.Values{"from": {opts.Mount.From.Name()}, "mount": {opts.Mount.From.Digest().String()}})
}
u, err := bs.ub.BuildBlobUploadURL(bs.name, values...)
if err != nil {
return nil, err
}
@ -598,7 +600,14 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
}
defer resp.Body.Close()
if SuccessStatus(resp.StatusCode) {
switch resp.StatusCode {
case http.StatusCreated:
desc, err := bs.statter.Stat(ctx, opts.Mount.From.Digest())
if err != nil {
return nil, err
}
return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
case http.StatusAccepted:
// TODO(dmcgowan): Check for invalid UUID
uuid := resp.Header.Get("Docker-Upload-UUID")
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
@ -613,53 +622,15 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
startedAt: time.Now(),
location: location,
}, nil
default:
return nil, HandleErrorResponse(resp)
}
return nil, HandleErrorResponse(resp)
}
func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
panic("not implemented")
}
func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}})
if err != nil {
return distribution.Descriptor{}, err
}
resp, err := bs.client.Post(u, "", nil)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusCreated:
return bs.Stat(ctx, dgst)
case http.StatusAccepted:
// Triggered a blob upload (legacy behavior), so cache the creation info
uuid := resp.Header.Get("Docker-Upload-UUID")
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
if err != nil {
return distribution.Descriptor{}, err
}
bs.cacheLock.Lock()
bs.cachedBlobUpload = &httpBlobUpload{
statter: bs.statter,
client: bs.client,
uuid: uuid,
startedAt: time.Now(),
location: location,
}
bs.cacheLock.Unlock()
return distribution.Descriptor{}, HandleErrorResponse(resp)
default:
return distribution.Descriptor{}, HandleErrorResponse(resp)
}
}
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
return bs.statter.Clear(ctx, dgst)
}

View File

@ -18,7 +18,9 @@ import (
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/testutil"
"github.com/docker/distribution/uuid"
"github.com/docker/libtrust"
@ -471,6 +473,16 @@ func TestBlobMount(t *testing.T) {
var m testutil.RequestResponseMap
repo := "test.example.com/uploadrepo"
sourceRepo := "test.example.com/sourcerepo"
namedRef, err := reference.ParseNamed(sourceRepo)
if err != nil {
t.Fatal(err)
}
canonicalRef, err := reference.WithDigest(namedRef, dgst)
if err != nil {
t.Fatal(err)
}
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "POST",
@ -511,13 +523,20 @@ func TestBlobMount(t *testing.T) {
l := r.Blobs(ctx)
stat, err := l.Mount(ctx, sourceRepo, dgst)
if err != nil {
t.Fatal(err)
bw, err := l.Create(ctx, storage.WithMountFrom(canonicalRef))
if bw != nil {
t.Fatalf("Expected blob writer to be nil, was %v", bw)
}
if stat.Digest != dgst {
t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst)
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
if ebm.From.Digest() != dgst {
t.Fatalf("Unexpected digest: %s, expected %s", ebm.From.Digest(), dgst)
}
if ebm.From.Name() != sourceRepo {
t.Fatalf("Unexpected from: %s, expected %s", ebm.From.Name(), sourceRepo)
}
} else {
t.Fatalf("Unexpected error: %v, expected an ErrBlobMounted", err)
}
}

View File

@ -9,8 +9,10 @@ import (
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers"
)
@ -118,19 +120,27 @@ type blobUploadHandler struct {
// StartBlobUpload begins the blob upload process and allocates a server-side
// blob writer session, optionally mounting the blob from a separate repository.
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
var options []distribution.BlobCreateOption
fromRepo := r.FormValue("from")
mountDigest := r.FormValue("mount")
if mountDigest != "" && fromRepo != "" {
buh.mountBlob(w, fromRepo, mountDigest)
return
opt, err := buh.createBlobMountOption(fromRepo, mountDigest)
if err != nil {
options = append(options, opt)
}
}
blobs := buh.Repository.Blobs(buh)
upload, err := blobs.Create(buh)
upload, err := blobs.Create(buh, options...)
if err != nil {
if err == distribution.ErrUnsupported {
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
} else if err == distribution.ErrUnsupported {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
} else {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
@ -339,27 +349,23 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
// mountBlob attempts to mount a blob from another repository by its digest. If
// successful, the blob is linked into the blob store and 201 Created is
// returned with the canonical url of the blob.
func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) {
func (buh *blobUploadHandler) createBlobMountOption(fromRepo, mountDigest string) (distribution.BlobCreateOption, error) {
dgst, err := digest.ParseDigest(mountDigest)
if err != nil {
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
return
return nil, err
}
blobs := buh.Repository.Blobs(buh)
desc, err := blobs.Mount(buh, fromRepo, dgst)
ref, err := reference.ParseNamed(fromRepo)
if err != nil {
if err == distribution.ErrBlobUnknown {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst))
} else {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
return
return nil, err
}
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
canonical, err := reference.WithDigest(ref, dgst)
if err != nil {
return nil, err
}
return storage.WithMountFrom(canonical), nil
}
// writeBlobCreatedHeaders writes the standard headers describing a newly

View File

@ -161,7 +161,7 @@ func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte)
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
}

View File

@ -42,12 +42,12 @@ func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte,
return sbs.blobs.Get(ctx, dgst)
}
func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
func (sbs statsBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
sbsMu.Lock()
sbs.stats["create"]++
sbsMu.Unlock()
return sbs.blobs.Create(ctx)
return sbs.blobs.Create(ctx, options...)
}
func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
@ -58,14 +58,6 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B
return sbs.blobs.Resume(ctx, id)
}
func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
sbsMu.Lock()
sbs.stats["mount"]++
sbsMu.Unlock()
return sbs.blobs.Mount(ctx, sourceRepo, dgst)
}
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
sbsMu.Lock()
sbs.stats["open"]++

View File

@ -12,6 +12,7 @@ import (
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
@ -377,13 +378,27 @@ func TestBlobMount(t *testing.T) {
t.Fatalf("unexpected non-error stating unmounted blob: %v", desc)
}
mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest)
namedRef, err := reference.ParseNamed(sourceRepository.Name())
if err != nil {
t.Fatal(err)
}
canonicalRef, err := reference.WithDigest(namedRef, desc.Digest)
if err != nil {
t.Fatal(err)
}
bw, err := bs.Create(ctx, WithMountFrom(canonicalRef))
if bw != nil {
t.Fatal("unexpected blobwriter returned from Create call, should mount instead")
}
ebm, ok := err.(distribution.ErrBlobMounted)
if !ok {
t.Fatalf("unexpected error mounting layer: %v", err)
}
if mountDesc != desc {
t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc)
if ebm.Descriptor != desc {
t.Fatalf("descriptors not equal: %v != %v", ebm.Descriptor, desc)
}
// Test for existence.

View File

@ -1,12 +1,14 @@
package storage
import (
"fmt"
"net/http"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/uuid"
)
@ -95,10 +97,58 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte)
return desc, lbs.linkBlob(ctx, desc)
}
// CreateOptions is a collection of blob creation modifiers relevant to general
// blob storage intended to be configured by the BlobCreateOption.Apply method.
type CreateOptions struct {
Mount struct {
ShouldMount bool
From reference.Canonical
}
}
type optionFunc func(interface{}) error
func (f optionFunc) Apply(v interface{}) error {
return f(v)
}
// WithMountFrom returns a BlobCreateOption which designates that the blob should be
// mounted from the given canonical reference.
func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
return optionFunc(func(v interface{}) error {
opts, ok := v.(*CreateOptions)
if !ok {
return fmt.Errorf("unexpected options type: %T", v)
}
opts.Mount.ShouldMount = true
opts.Mount.From = ref
return nil
})
}
// Writer begins a blob write session, returning a handle.
func (lbs *linkedBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer")
var opts CreateOptions
for _, option := range options {
err := option.Apply(&opts)
if err != nil {
return nil, err
}
}
if opts.Mount.ShouldMount {
desc, err := lbs.mount(ctx, opts.Mount.From.Name(), opts.Mount.From.Digest())
if err == nil {
// Mount successful, no need to initiate an upload session
return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
}
}
uuid := uuid.Generate().String()
startedAt := time.Now().UTC()
@ -186,7 +236,7 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro
return nil
}
func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
repo, err := lbs.registry.Repository(ctx, sourceRepo)
if err != nil {
return distribution.Descriptor{}, err