From 5df21570a76e08b7ca47d37f9e22b8e338dadb27 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 14 Dec 2015 18:34:18 -0800 Subject: [PATCH 1/3] Adds cross-repository blob mounting behavior Extends blob upload POST endpoint to support mount and from query parameters as described in #634 Signed-off-by: Brian Bland --- blobs.go | 4 + docs/spec/api.md | 234 +++++++++++++++++++++++- docs/spec/api.md.tmpl | 32 +++- manifest/schema1/config_builder_test.go | 4 + manifest/schema2/builder_test.go | 4 + registry/api/v2/descriptors.go | 64 +++++++ registry/client/repository.go | 56 ++++++ registry/client/repository_test.go | 55 ++++++ registry/handlers/app.go | 5 + registry/handlers/blobupload.go | 62 ++++++- registry/proxy/proxyblobstore.go | 4 + registry/proxy/proxyblobstore_test.go | 8 + registry/storage/blob_test.go | 148 +++++++++++++++ registry/storage/linkedblobstore.go | 23 +++ registry/storage/registry.go | 1 + 15 files changed, 688 insertions(+), 16 deletions(-) diff --git a/blobs.go b/blobs.go index 40cd8295..bd5f0bc9 100644 --- a/blobs.go +++ b/blobs.go @@ -155,6 +155,10 @@ type BlobIngester interface { // Resume attempts to resume a write to a blob, identified by an id. Resume(ctx context.Context, id string) (BlobWriter, error) + + // Mount adds a blob to this service from another source repository, + // identified by a digest. + Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (Descriptor, error) } // BlobWriter provides a handle for inserting data into a blob store. diff --git a/docs/spec/api.md b/docs/spec/api.md index cdf225ae..042cbd8f 100644 --- a/docs/spec/api.md +++ b/docs/spec/api.md @@ -582,7 +582,7 @@ the uploads endpoint, including the "size" and "digest" parameters: POST /v2//blobs/uploads/?digest= Content-Length: Content-Type: application/octet-stream - + ``` @@ -594,7 +594,7 @@ a place to continue the download. The single `POST` method is provided for convenience and most clients should implement `POST` + `PUT` to support reliable resume of uploads. - + ##### Chunked Upload To carry out an upload of a chunk, the client can specify a range header and @@ -707,6 +707,34 @@ registry server will dump all intermediate data. While uploads will time out if not completed, clients should issue this request if they encounter a fatal error but still have the ability to issue an http request. +##### Cross Repository Blob Mount + +A blob may be mounted from another repository that the client has read access +to, removing the need to upload a blob already known to the registry. To issue +a blob mount instead of an upload, a POST request should be issued in the +following format: + +``` +POST /v2//blobs/uploads/?mount=&from= +Content-Length: 0 +``` + +If the blob is successfully mounted, the client will receive a `201 Created` +response: + +``` +201 Created +Location: /v2//blobs/ +Content-Length: 0 +Docker-Content-Digest: +``` + +The `Location` header will contain the registry URL to access the accepted +layer file. The `Docker-Content-Digest` header returns the canonical digest of +the uploaded blob which may differ from the provided digest. Most clients may +ignore the value but if it is used, the client should verify the value against +the uploaded blob data. + ##### Errors If an 502, 503 or 504 error is received, the client should assume that the @@ -1023,7 +1051,7 @@ A list of methods and URIs are covered in the table below: |------|----|------|-----------| | GET | `/v2/` | Base | Check that the endpoint implements Docker Registry API V2. | | GET | `/v2//tags/list` | Tags | Fetch the tags under the repository identified by `name`. | -| GET | `/v2//manifests/` | Manifest | Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest. | +| GET | `/v2//manifests/` | Manifest | Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest. A `HEAD` request can also be issued to this endpoint to obtain resource information without receiving all data. | | PUT | `/v2//manifests/` | Manifest | Put the manifest identified by `name` and `reference` where `reference` can be a tag or digest. | | DELETE | `/v2//manifests/` | Manifest | Delete the manifest identified by `name` and `reference`. Note that a manifest can _only_ be deleted by `digest`. | | GET | `/v2//blobs/` | Blob | Retrieve the blob from the registry identified by `digest`. A `HEAD` request can also be issued to this endpoint to obtain resource information without receiving all data. | @@ -1500,7 +1528,7 @@ Create, update, delete and retrieve manifests. #### GET Manifest -Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest. +Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest. A `HEAD` request can also be issued to this endpoint to obtain resource information without receiving all data. @@ -3313,6 +3341,204 @@ The error codes that may be included in the response body are enumerated below: +##### Mount Blob + +``` +POST /v2//blobs/uploads/?mount=&from= +Host: +Authorization: +Content-Length: 0 +``` + +Mount a blob identified by the `mount` parameter from another repository. + + +The following parameters should be specified on the request: + +|Name|Kind|Description| +|----|----|-----------| +|`Host`|header|Standard HTTP Host Header. Should be set to the registry host.| +|`Authorization`|header|An RFC7235 compliant authorization header.| +|`Content-Length`|header|The `Content-Length` header must be zero and the body must be empty.| +|`name`|path|Name of the target repository.| +|`mount`|query|Digest of blob to mount from the source repository.| +|`from`|query|Name of the source repository.| + + + + +###### On Success: Created + +``` +201 Created +Location: +Content-Length: 0 +Docker-Upload-UUID: +``` + +The blob has been mounted in the repository and is available at the provided location. + +The following headers will be returned with the response: + +|Name|Description| +|----|-----------| +|`Location`|| +|`Content-Length`|The `Content-Length` header must be zero and the body must be empty.| +|`Docker-Upload-UUID`|Identifies the docker upload uuid for the current request.| + + + + +###### On Failure: Invalid Name or Digest + +``` +400 Bad Request +``` + + + + + +The error codes that may be included in the response body are enumerated below: + +|Code|Message|Description| +|----|-------|-----------| +| `DIGEST_INVALID` | provided digest did not match uploaded content | When a blob is uploaded, the registry will check that the content matches the digest provided by the client. The error may include a detail structure with the key "digest", including the invalid digest string. This error may also be returned when a manifest includes an invalid layer digest. | +| `NAME_INVALID` | invalid repository name | Invalid repository name encountered either during manifest validation or any API operation. | + + + +###### On Failure: Not allowed + +``` +405 Method Not Allowed +``` + +Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason + + + +The error codes that may be included in the response body are enumerated below: + +|Code|Message|Description| +|----|-------|-----------| +| `UNSUPPORTED` | The operation is unsupported. | The operation was unsupported due to a missing implementation or invalid set of parameters. | + + + +###### On Failure: Authentication Required + +``` +401 Unauthorized +WWW-Authenticate: realm="", ..." +Content-Length: +Content-Type: application/json; charset=utf-8 + +{ + "errors:" [ + { + "code": , + "message": "", + "detail": ... + }, + ... + ] +} +``` + +The client is not authenticated. + +The following headers will be returned on the response: + +|Name|Description| +|----|-----------| +|`WWW-Authenticate`|An RFC7235 compliant authentication challenge header.| +|`Content-Length`|Length of the JSON response body.| + + + +The error codes that may be included in the response body are enumerated below: + +|Code|Message|Description| +|----|-------|-----------| +| `UNAUTHORIZED` | authentication required | The access controller was unable to authenticate the client. Often this will be accompanied by a Www-Authenticate HTTP response header indicating how to authenticate. | + + + +###### On Failure: No Such Repository Error + +``` +404 Not Found +Content-Length: +Content-Type: application/json; charset=utf-8 + +{ + "errors:" [ + { + "code": , + "message": "", + "detail": ... + }, + ... + ] +} +``` + +The repository is not known to the registry. + +The following headers will be returned on the response: + +|Name|Description| +|----|-----------| +|`Content-Length`|Length of the JSON response body.| + + + +The error codes that may be included in the response body are enumerated below: + +|Code|Message|Description| +|----|-------|-----------| +| `NAME_UNKNOWN` | repository name not known to registry | This is returned if the name used during an operation is unknown to the registry. | + + + +###### On Failure: Access Denied + +``` +403 Forbidden +Content-Length: +Content-Type: application/json; charset=utf-8 + +{ + "errors:" [ + { + "code": , + "message": "", + "detail": ... + }, + ... + ] +} +``` + +The client does not have required access to the repository. + +The following headers will be returned on the response: + +|Name|Description| +|----|-----------| +|`Content-Length`|Length of the JSON response body.| + + + +The error codes that may be included in the response body are enumerated below: + +|Code|Message|Description| +|----|-------|-----------| +| `DENIED` | requested access to the resource is denied | The access controller denied access for the operation on a resource. | + + + ### Blob Upload diff --git a/docs/spec/api.md.tmpl b/docs/spec/api.md.tmpl index da778acf..ab14d7a2 100644 --- a/docs/spec/api.md.tmpl +++ b/docs/spec/api.md.tmpl @@ -582,7 +582,7 @@ the uploads endpoint, including the "size" and "digest" parameters: POST /v2//blobs/uploads/?digest= Content-Length: Content-Type: application/octet-stream - + ``` @@ -594,7 +594,7 @@ a place to continue the download. The single `POST` method is provided for convenience and most clients should implement `POST` + `PUT` to support reliable resume of uploads. - + ##### Chunked Upload To carry out an upload of a chunk, the client can specify a range header and @@ -707,6 +707,34 @@ registry server will dump all intermediate data. While uploads will time out if not completed, clients should issue this request if they encounter a fatal error but still have the ability to issue an http request. +##### Cross Repository Blob Mount + +A blob may be mounted from another repository that the client has read access +to, removing the need to upload a blob already known to the registry. To issue +a blob mount instead of an upload, a POST request should be issued in the +following format: + +``` +POST /v2//blobs/uploads/?mount=&from= +Content-Length: 0 +``` + +If the blob is successfully mounted, the client will receive a `201 Created` +response: + +``` +201 Created +Location: /v2//blobs/ +Content-Length: 0 +Docker-Content-Digest: +``` + +The `Location` header will contain the registry URL to access the accepted +layer file. The `Docker-Content-Digest` header returns the canonical digest of +the uploaded blob which may differ from the provided digest. Most clients may +ignore the value but if it is used, the client should verify the value against +the uploaded blob data. + ##### Errors If an 502, 503 or 504 error is received, the client should assume that the diff --git a/manifest/schema1/config_builder_test.go b/manifest/schema1/config_builder_test.go index 18024d6c..3e219141 100644 --- a/manifest/schema1/config_builder_test.go +++ b/manifest/schema1/config_builder_test.go @@ -50,6 +50,10 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution. panic("not implemented") } +func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + panic("not implemented") +} + func TestEmptyTar(t *testing.T) { // Confirm that gzippedEmptyTar expands to 1024 NULL bytes. var decompressed [2048]byte diff --git a/manifest/schema2/builder_test.go b/manifest/schema2/builder_test.go index ffe51f42..53175e45 100644 --- a/manifest/schema2/builder_test.go +++ b/manifest/schema2/builder_test.go @@ -46,6 +46,10 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution. panic("not implemented") } +func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + panic("not implemented") +} + func TestBuilder(t *testing.T) { imgJSON := []byte(`{ "architecture": "amd64", diff --git a/registry/api/v2/descriptors.go b/registry/api/v2/descriptors.go index 52c725dc..ad3da3ef 100644 --- a/registry/api/v2/descriptors.go +++ b/registry/api/v2/descriptors.go @@ -1041,6 +1041,70 @@ var routeDescriptors = []RouteDescriptor{ deniedResponseDescriptor, }, }, + { + Name: "Mount Blob", + Description: "Mount a blob identified by the `mount` parameter from another repository.", + Headers: []ParameterDescriptor{ + hostHeader, + authHeader, + contentLengthZeroHeader, + }, + PathParameters: []ParameterDescriptor{ + nameParameterDescriptor, + }, + QueryParameters: []ParameterDescriptor{ + { + Name: "mount", + Type: "query", + Format: "", + Regexp: digest.DigestRegexp, + Description: `Digest of blob to mount from the source repository.`, + }, + { + Name: "from", + Type: "query", + Format: "", + Regexp: reference.NameRegexp, + Description: `Name of the source repository.`, + }, + }, + Successes: []ResponseDescriptor{ + { + Description: "The blob has been mounted in the repository and is available at the provided location.", + StatusCode: http.StatusCreated, + Headers: []ParameterDescriptor{ + { + Name: "Location", + Type: "url", + Format: "", + }, + contentLengthZeroHeader, + dockerUploadUUIDHeader, + }, + }, + }, + Failures: []ResponseDescriptor{ + { + Name: "Invalid Name or Digest", + StatusCode: http.StatusBadRequest, + ErrorCodes: []errcode.ErrorCode{ + ErrorCodeDigestInvalid, + ErrorCodeNameInvalid, + }, + }, + { + Name: "Not allowed", + Description: "Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason", + StatusCode: http.StatusMethodNotAllowed, + ErrorCodes: []errcode.ErrorCode{ + errcode.ErrorCodeUnsupported, + }, + }, + unauthorizedResponseDescriptor, + repositoryNotFoundResponseDescriptor, + deniedResponseDescriptor, + }, + }, }, }, }, diff --git a/registry/client/repository.go b/registry/client/repository.go index 758c6e5e..8f30b4f1 100644 --- a/registry/client/repository.go +++ b/registry/client/repository.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "time" "github.com/docker/distribution" @@ -499,6 +500,9 @@ type blobs struct { statter distribution.BlobDescriptorService distribution.BlobDeleter + + cacheLock sync.Mutex + cachedBlobUpload distribution.BlobWriter } func sanitizeLocation(location, base string) (string, error) { @@ -573,7 +577,20 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut } func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { + bs.cacheLock.Lock() + if bs.cachedBlobUpload != nil { + upload := bs.cachedBlobUpload + bs.cachedBlobUpload = nil + bs.cacheLock.Unlock() + + return upload, nil + } + bs.cacheLock.Unlock() + u, err := bs.ub.BuildBlobUploadURL(bs.name) + if err != nil { + return nil, err + } resp, err := bs.client.Post(u, "", nil) if err != nil { @@ -604,6 +621,45 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter panic("not implemented") } +func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}}) + if err != nil { + return distribution.Descriptor{}, err + } + + resp, err := bs.client.Post(u, "", nil) + if err != nil { + return distribution.Descriptor{}, err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusCreated: + return bs.Stat(ctx, dgst) + case http.StatusAccepted: + // Triggered a blob upload (legacy behavior), so cache the creation info + uuid := resp.Header.Get("Docker-Upload-UUID") + location, err := sanitizeLocation(resp.Header.Get("Location"), u) + if err != nil { + return distribution.Descriptor{}, err + } + + bs.cacheLock.Lock() + bs.cachedBlobUpload = &httpBlobUpload{ + statter: bs.statter, + client: bs.client, + uuid: uuid, + startedAt: time.Now(), + location: location, + } + bs.cacheLock.Unlock() + + return distribution.Descriptor{}, HandleErrorResponse(resp) + default: + return distribution.Descriptor{}, HandleErrorResponse(resp) + } +} + func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error { return bs.statter.Clear(ctx, dgst) } diff --git a/registry/client/repository_test.go b/registry/client/repository_test.go index c1032ec1..8a7fc1c9 100644 --- a/registry/client/repository_test.go +++ b/registry/client/repository_test.go @@ -466,6 +466,61 @@ func TestBlobUploadMonolithic(t *testing.T) { } } +func TestBlobMount(t *testing.T) { + dgst, content := newRandomBlob(1024) + var m testutil.RequestResponseMap + repo := "test.example.com/uploadrepo" + sourceRepo := "test.example.com/sourcerepo" + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "POST", + Route: "/v2/" + repo + "/blobs/uploads/", + QueryParams: map[string][]string{"from": {sourceRepo}, "mount": {dgst.String()}}, + }, + Response: testutil.Response{ + StatusCode: http.StatusCreated, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Location": {"/v2/" + repo + "/blobs/" + dgst.String()}, + "Docker-Content-Digest": {dgst.String()}, + }), + }, + }) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "HEAD", + Route: "/v2/" + repo + "/blobs/" + dgst.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(content))}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) + + e, c := testServer(m) + defer c() + + ctx := context.Background() + r, err := NewRepository(ctx, repo, e, nil) + if err != nil { + t.Fatal(err) + } + + l := r.Blobs(ctx) + + stat, err := l.Mount(ctx, sourceRepo, dgst) + if err != nil { + t.Fatal(err) + } + + if stat.Digest != dgst { + t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst) + } +} + func newRandomSchemaV1Manifest(name, tag string, blobCount int) (*schema1.SignedManifest, digest.Digest, []byte) { blobs := make([]schema1.FSLayer, blobCount) history := make([]schema1.History, blobCount) diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 8ea89da8..23225493 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -710,6 +710,11 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont if repo != "" { accessRecords = appendAccessRecords(accessRecords, r.Method, repo) + if fromRepo := r.FormValue("from"); fromRepo != "" { + // mounting a blob from one repository to another requires pull (GET) + // access to the source repository. + accessRecords = appendAccessRecords(accessRecords, "GET", fromRepo) + } } else { // Only allow the name not to be set on the base route. if app.nameRequired(r) { diff --git a/registry/handlers/blobupload.go b/registry/handlers/blobupload.go index 1bd33d33..c5638c83 100644 --- a/registry/handlers/blobupload.go +++ b/registry/handlers/blobupload.go @@ -116,8 +116,16 @@ type blobUploadHandler struct { } // StartBlobUpload begins the blob upload process and allocates a server-side -// blob writer session. +// blob writer session, optionally mounting the blob from a separate repository. func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) { + fromRepo := r.FormValue("from") + mountDigest := r.FormValue("mount") + + if mountDigest != "" && fromRepo != "" { + buh.mountBlob(w, fromRepo, mountDigest) + return + } + blobs := buh.Repository.Blobs(buh) upload, err := blobs.Create(buh) @@ -254,18 +262,10 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht return } - - // Build our canonical blob url - blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest) - if err != nil { + if err := buh.writeBlobCreatedHeaders(w, desc); err != nil { buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) return } - - w.Header().Set("Location", blobURL) - w.Header().Set("Content-Length", "0") - w.Header().Set("Docker-Content-Digest", desc.Digest.String()) - w.WriteHeader(http.StatusCreated) } // CancelBlobUpload cancels an in-progress upload of a blob. @@ -335,3 +335,45 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http. return nil } + +// mountBlob attempts to mount a blob from another repository by its digest. If +// successful, the blob is linked into the blob store and 201 Created is +// returned with the canonical url of the blob. +func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) { + dgst, err := digest.ParseDigest(mountDigest) + if err != nil { + buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err)) + return + } + + blobs := buh.Repository.Blobs(buh) + desc, err := blobs.Mount(buh, fromRepo, dgst) + if err != nil { + if err == distribution.ErrBlobUnknown { + buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst)) + } else { + buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) + } + return + } + if err := buh.writeBlobCreatedHeaders(w, desc); err != nil { + buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) + return + } +} + +// writeBlobCreatedHeaders writes the standard headers describing a newly +// created blob. A 201 Created is written as well as the canonical URL and +// blob digest. +func (buh *blobUploadHandler) writeBlobCreatedHeaders(w http.ResponseWriter, desc distribution.Descriptor) error { + blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest) + if err != nil { + return err + } + + w.Header().Set("Location", blobURL) + w.Header().Set("Content-Length", "0") + w.Header().Set("Docker-Content-Digest", desc.Digest.String()) + w.WriteHeader(http.StatusCreated) + return nil +} diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index 976dc8d7..ca39f9f8 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -169,6 +169,10 @@ func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution. return nil, distribution.ErrUnsupported } +func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + return distribution.Descriptor{}, distribution.ErrUnsupported +} + func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { return nil, distribution.ErrUnsupported } diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index eb623197..5c5015a0 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -58,6 +58,14 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B return sbs.blobs.Resume(ctx, id) } +func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + sbsMu.Lock() + sbs.stats["mount"]++ + sbsMu.Unlock() + + return sbs.blobs.Mount(ctx, sourceRepo, dgst) +} + func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { sbsMu.Lock() sbs.stats["open"]++ diff --git a/registry/storage/blob_test.go b/registry/storage/blob_test.go index c6cfbcda..b89814c7 100644 --- a/registry/storage/blob_test.go +++ b/registry/storage/blob_test.go @@ -310,6 +310,154 @@ func TestSimpleBlobRead(t *testing.T) { } } +// TestBlobMount covers the blob mount process, exercising common +// error paths that might be seen during a mount. +func TestBlobMount(t *testing.T) { + randomDataReader, dgst, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating random reader: %v", err) + } + + ctx := context.Background() + imageName := "foo/bar" + sourceImageName := "foo/source" + driver := inmemory.New() + registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect) + if err != nil { + t.Fatalf("error creating registry: %v", err) + } + + repository, err := registry.Repository(ctx, imageName) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + sourceRepository, err := registry.Repository(ctx, sourceImageName) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + + sbs := sourceRepository.Blobs(ctx) + + blobUpload, err := sbs.Create(ctx) + + if err != nil { + t.Fatalf("unexpected error starting layer upload: %s", err) + } + + // Get the size of our random tarfile + randomDataSize, err := seekerSize(randomDataReader) + if err != nil { + t.Fatalf("error getting seeker size of random data: %v", err) + } + + nn, err := io.Copy(blobUpload, randomDataReader) + if err != nil { + t.Fatalf("unexpected error uploading layer data: %v", err) + } + + desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst}) + if err != nil { + t.Fatalf("unexpected error finishing layer upload: %v", err) + } + + // Test for existence. + statDesc, err := sbs.Stat(ctx, desc.Digest) + if err != nil { + t.Fatalf("unexpected error checking for existence: %v, %#v", err, sbs) + } + + if statDesc != desc { + t.Fatalf("descriptors not equal: %v != %v", statDesc, desc) + } + + bs := repository.Blobs(ctx) + // Test destination for existence. + statDesc, err = bs.Stat(ctx, desc.Digest) + if err == nil { + t.Fatalf("unexpected non-error stating unmounted blob: %v", desc) + } + + mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest) + if err != nil { + t.Fatalf("unexpected error mounting layer: %v", err) + } + + if mountDesc != desc { + t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc) + } + + // Test for existence. + statDesc, err = bs.Stat(ctx, desc.Digest) + if err != nil { + t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs) + } + + if statDesc != desc { + t.Fatalf("descriptors not equal: %v != %v", statDesc, desc) + } + + rc, err := bs.Open(ctx, desc.Digest) + if err != nil { + t.Fatalf("unexpected error opening blob for read: %v", err) + } + defer rc.Close() + + h := sha256.New() + nn, err = io.Copy(h, rc) + if err != nil { + t.Fatalf("error reading layer: %v", err) + } + + if nn != randomDataSize { + t.Fatalf("incorrect read length") + } + + if digest.NewDigest("sha256", h) != dgst { + t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), dgst) + } + + // Delete the blob from the source repo + err = sbs.Delete(ctx, desc.Digest) + if err != nil { + t.Fatalf("Unexpected error deleting blob") + } + + d, err := bs.Stat(ctx, desc.Digest) + if err != nil { + t.Fatalf("unexpected error stating blob deleted from source repository: %v", err) + } + + d, err = sbs.Stat(ctx, desc.Digest) + if err == nil { + t.Fatalf("unexpected non-error stating deleted blob: %v", d) + } + + switch err { + case distribution.ErrBlobUnknown: + break + default: + t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err) + } + + // Delete the blob from the dest repo + err = bs.Delete(ctx, desc.Digest) + if err != nil { + t.Fatalf("Unexpected error deleting blob") + } + + d, err = bs.Stat(ctx, desc.Digest) + if err == nil { + t.Fatalf("unexpected non-error stating deleted blob: %v", d) + } + + switch err { + case distribution.ErrBlobUnknown: + break + default: + t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err) + } +} + // TestLayerUploadZeroLength uploads zero-length func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go index 430da1ca..8b7f9f51 100644 --- a/registry/storage/linkedblobstore.go +++ b/registry/storage/linkedblobstore.go @@ -20,6 +20,7 @@ type linkPathFunc func(name string, dgst digest.Digest) (string, error) // that grant access to the global blob store. type linkedBlobStore struct { *blobStore + registry *registry blobServer distribution.BlobServer blobAccessController distribution.BlobDescriptorService repository distribution.Repository @@ -185,6 +186,28 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro return nil } +func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + repo, err := lbs.registry.Repository(ctx, sourceRepo) + if err != nil { + return distribution.Descriptor{}, err + } + stat, err := repo.Blobs(ctx).Stat(ctx, dgst) + if err != nil { + return distribution.Descriptor{}, err + } + + desc := distribution.Descriptor{ + Size: stat.Size, + + // NOTE(stevvooe): The central blob store firewalls media types from + // other users. The caller should look this up and override the value + // for the specific repository. + MediaType: "application/octet-stream", + Digest: dgst, + } + return desc, lbs.linkBlob(ctx, desc) +} + // newBlobUpload allocates a new upload controller with the given state. func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) { fw, err := newFileWriter(ctx, lbs.driver, path) diff --git a/registry/storage/registry.go b/registry/storage/registry.go index b3810676..869895dd 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -233,6 +233,7 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore { } return &linkedBlobStore{ + registry: repo.registry, blobStore: repo.blobStore, blobServer: repo.blobServer, blobAccessController: statter, From 3a35a2d95351e2907de84e0e105cdbeb483cf52f Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 5 Jan 2016 11:13:27 -0800 Subject: [PATCH 2/3] Allows token authentication handler to request additional scopes When an auth request provides the "from" query parameter, the token handler will add a "pull" scope for the provided repository, refreshing the token if the overall scope has increased Signed-off-by: Brian Bland --- registry/client/auth/session.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/registry/client/auth/session.go b/registry/client/auth/session.go index 9819b3cb..6b483c62 100644 --- a/registry/client/auth/session.go +++ b/registry/client/auth/session.go @@ -108,6 +108,8 @@ type tokenHandler struct { tokenLock sync.Mutex tokenCache string tokenExpiration time.Time + + additionalScopes map[string]struct{} } // tokenScope represents the scope at which a token will be requested. @@ -145,6 +147,7 @@ func newTokenHandler(transport http.RoundTripper, creds CredentialStore, c clock Scope: scope, Actions: actions, }, + additionalScopes: map[string]struct{}{}, } } @@ -160,7 +163,15 @@ func (th *tokenHandler) Scheme() string { } func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error { - if err := th.refreshToken(params); err != nil { + var additionalScopes []string + if fromParam := req.URL.Query().Get("from"); fromParam != "" { + additionalScopes = append(additionalScopes, tokenScope{ + Resource: "repository", + Scope: fromParam, + Actions: []string{"pull"}, + }.String()) + } + if err := th.refreshToken(params, additionalScopes...); err != nil { return err } @@ -169,11 +180,18 @@ func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]st return nil } -func (th *tokenHandler) refreshToken(params map[string]string) error { +func (th *tokenHandler) refreshToken(params map[string]string, additionalScopes ...string) error { th.tokenLock.Lock() defer th.tokenLock.Unlock() + var addedScopes bool + for _, scope := range additionalScopes { + if _, ok := th.additionalScopes[scope]; !ok { + th.additionalScopes[scope] = struct{}{} + addedScopes = true + } + } now := th.clock.Now() - if now.After(th.tokenExpiration) { + if now.After(th.tokenExpiration) || addedScopes { tr, err := th.fetchToken(params) if err != nil { return err @@ -223,6 +241,10 @@ func (th *tokenHandler) fetchToken(params map[string]string) (token *tokenRespon reqParams.Add("scope", scopeField) } + for scope := range th.additionalScopes { + reqParams.Add("scope", scope) + } + if th.creds != nil { username, password := th.creds.Basic(realmURL) if username != "" && password != "" { From 613cfc861dbb44fa3cf46298e9fdd2f95e89d366 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Wed, 6 Jan 2016 15:57:35 -0800 Subject: [PATCH 3/3] Fires a new Mount event when blobs are cross-repo mounted Adds an optional "fromRepository" field to the event target Signed-off-by: Brian Bland --- notifications/bridge.go | 9 +++++++++ notifications/event.go | 5 +++++ notifications/listener.go | 12 ++++++++++++ notifications/listener_test.go | 5 +++++ 4 files changed, 31 insertions(+) diff --git a/notifications/bridge.go b/notifications/bridge.go index 93a2362a..efaa3ac3 100644 --- a/notifications/bridge.go +++ b/notifications/bridge.go @@ -72,6 +72,15 @@ func (b *bridge) BlobPulled(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionPull, repo, desc) } +func (b *bridge) BlobMounted(repo string, desc distribution.Descriptor, fromRepo string) error { + event, err := b.createBlobEvent(EventActionMount, repo, desc) + if err != nil { + return err + } + event.Target.FromRepository = fromRepo + return b.sink.Write(*event) +} + func (b *bridge) BlobDeleted(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionDelete, repo, desc) } diff --git a/notifications/event.go b/notifications/event.go index 97030026..19d6a776 100644 --- a/notifications/event.go +++ b/notifications/event.go @@ -11,6 +11,7 @@ import ( const ( EventActionPull = "pull" EventActionPush = "push" + EventActionMount = "mount" EventActionDelete = "delete" ) @@ -61,6 +62,10 @@ type Event struct { // Repository identifies the named repository. Repository string `json:"repository,omitempty"` + // FromRepository identifies the named repository which a blob was mounted + // from if appropriate. + FromRepository string `json:"fromRepository,omitempty"` + // URL provides a direct link to the content. URL string `json:"url,omitempty"` } `json:"target,omitempty"` diff --git a/notifications/listener.go b/notifications/listener.go index baecbdbd..d4e1abe7 100644 --- a/notifications/listener.go +++ b/notifications/listener.go @@ -24,6 +24,7 @@ type ManifestListener interface { type BlobListener interface { BlobPushed(repo string, desc distribution.Descriptor) error BlobPulled(repo string, desc distribution.Descriptor) error + BlobMounted(repo string, desc distribution.Descriptor, fromRepo string) error // TODO(stevvooe): Please note that delete support is still a little shaky // and we'll need to propagate these in the future. @@ -169,6 +170,17 @@ func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribu return bsl.decorateWriter(wr), err } +func (bsl *blobServiceListener) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) { + desc, err := bsl.BlobStore.Mount(ctx, sourceRepo, dgst) + if err == nil { + if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), desc, sourceRepo); err != nil { + context.GetLogger(ctx).Errorf("error dispatching layer mount to listener: %v", err) + } + } + + return desc, err +} + func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter { return &blobWriterListener{ BlobWriter: wr, diff --git a/notifications/listener_test.go b/notifications/listener_test.go index 319406c3..17ffa288 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -81,6 +81,11 @@ func (tl *testListener) BlobPulled(repo string, desc distribution.Descriptor) er return nil } +func (tl *testListener) BlobMounted(repo string, desc distribution.Descriptor, fromRepo string) error { + tl.ops["layer:mount"]++ + return nil +} + func (tl *testListener) BlobDeleted(repo string, desc distribution.Descriptor) error { tl.ops["layer:delete"]++ return nil