Merge pull request #387 from dmcgowan/registry-client-update

Add client implementation of distribution interface
This commit is contained in:
Stephen Day 2015-05-20 15:55:20 -07:00
commit ea54a5560a
21 changed files with 2747 additions and 1698 deletions

View File

@ -0,0 +1,174 @@
package client
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
)
type httpBlobUpload struct {
statter distribution.BlobStatter
client *http.Client
uuid string
startedAt time.Time
location string // always the last value of the location header.
offset int64
closed bool
}
func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
if resp.StatusCode == http.StatusNotFound {
return distribution.ErrBlobUploadUnknown
}
return handleErrorResponse(resp)
}
func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
req, err := http.NewRequest("PATCH", hbu.location, ioutil.NopCloser(r))
if err != nil {
return 0, err
}
defer req.Body.Close()
resp, err := hbu.client.Do(req)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusAccepted {
return 0, hbu.handleErrorResponse(resp)
}
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
if err != nil {
return 0, err
}
rng := resp.Header.Get("Range")
var start, end int64
if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
return 0, err
} else if n != 2 || end < start {
return 0, fmt.Errorf("bad range format: %s", rng)
}
return (end - start + 1), nil
}
func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
req, err := http.NewRequest("PATCH", hbu.location, bytes.NewReader(p))
if err != nil {
return 0, err
}
req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := hbu.client.Do(req)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusAccepted {
return 0, hbu.handleErrorResponse(resp)
}
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
if err != nil {
return 0, err
}
rng := resp.Header.Get("Range")
var start, end int
if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
return 0, err
} else if n != 2 || end < start {
return 0, fmt.Errorf("bad range format: %s", rng)
}
return (end - start + 1), nil
}
func (hbu *httpBlobUpload) Seek(offset int64, whence int) (int64, error) {
newOffset := hbu.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset += int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
hbu.offset = newOffset
return hbu.offset, nil
}
func (hbu *httpBlobUpload) ID() string {
return hbu.uuid
}
func (hbu *httpBlobUpload) StartedAt() time.Time {
return hbu.startedAt
}
func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
// TODO(dmcgowan): Check if already finished, if so just fetch
req, err := http.NewRequest("PUT", hbu.location, nil)
if err != nil {
return distribution.Descriptor{}, err
}
values := req.URL.Query()
values.Set("digest", desc.Digest.String())
req.URL.RawQuery = values.Encode()
resp, err := hbu.client.Do(req)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
return distribution.Descriptor{}, hbu.handleErrorResponse(resp)
}
return hbu.statter.Stat(ctx, desc.Digest)
}
func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
req, err := http.NewRequest("DELETE", hbu.location, nil)
if err != nil {
return err
}
resp, err := hbu.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusNoContent, http.StatusNotFound:
return nil
default:
return hbu.handleErrorResponse(resp)
}
}
func (hbu *httpBlobUpload) Close() error {
hbu.closed = true
return nil
}

View File

@ -0,0 +1,207 @@
package client
import (
"bytes"
"fmt"
"net/http"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/testutil"
)
// Test implements distribution.BlobWriter
var _ distribution.BlobWriter = &httpBlobUpload{}
func TestUploadReadFrom(t *testing.T) {
_, b := newRandomBlob(64)
repo := "test/upload/readfrom"
locationPath := fmt.Sprintf("/v2/%s/uploads/testid", repo)
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/",
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Docker-Distribution-API-Version": {"registry/2.0"},
}),
},
},
// Test Valid case
{
Request: testutil.Request{
Method: "PATCH",
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {locationPath},
"Range": {"0-63"},
}),
},
},
// Test invalid range
{
Request: testutil.Request{
Method: "PATCH",
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {locationPath},
"Range": {""},
}),
},
},
// Test 404
{
Request: testutil.Request{
Method: "PATCH",
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusNotFound,
},
},
// Test 400 valid json
{
Request: testutil.Request{
Method: "PATCH",
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusBadRequest,
Body: []byte(`
{
"errors": [
{
"code": "BLOB_UPLOAD_INVALID",
"message": "invalid upload identifier",
"detail": "more detail"
}
]
}`),
},
},
// Test 400 invalid json
{
Request: testutil.Request{
Method: "PATCH",
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusBadRequest,
Body: []byte("something bad happened"),
},
},
// Test 500
{
Request: testutil.Request{
Method: "PATCH",
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusInternalServerError,
},
},
})
e, c := testServer(m)
defer c()
blobUpload := &httpBlobUpload{
client: &http.Client{},
}
// Valid case
blobUpload.location = e + locationPath
n, err := blobUpload.ReadFrom(bytes.NewReader(b))
if err != nil {
t.Fatalf("Error calling ReadFrom: %s", err)
}
if n != 64 {
t.Fatalf("Wrong length returned from ReadFrom: %d, expected 64", n)
}
// Bad range
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when bad range received")
}
// 404
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if err != distribution.ErrBlobUploadUnknown {
t.Fatalf("Wrong error thrown: %s, expected %s", err, distribution.ErrBlobUploadUnknown)
}
// 400 valid json
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(*v2.Errors); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else if len(uploadErr.Errors) != 1 {
t.Fatalf("Unexpected number of errors: %d, expected 1", len(uploadErr.Errors))
} else {
v2Err := uploadErr.Errors[0]
if v2Err.Code != v2.ErrorCodeBlobUploadInvalid {
t.Fatalf("Unexpected error code: %s, expected %s", v2Err.Code.String(), v2.ErrorCodeBlobUploadInvalid.String())
}
if expected := "invalid upload identifier"; v2Err.Message != expected {
t.Fatalf("Unexpected error message: %s, expected %s", v2Err.Message, expected)
}
if expected := "more detail"; v2Err.Detail.(string) != expected {
t.Fatalf("Unexpected error message: %s, expected %s", v2Err.Detail.(string), expected)
}
}
// 400 invalid json
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(*UnexpectedHTTPResponseError); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else {
respStr := string(uploadErr.Response)
if expected := "something bad happened"; respStr != expected {
t.Fatalf("Unexpected response string: %s, expected: %s", respStr, expected)
}
}
// 500
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(*UnexpectedHTTPStatusError); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else if expected := "500 " + http.StatusText(http.StatusInternalServerError); uploadErr.Status != expected {
t.Fatalf("Unexpected response status: %s, expected %s", uploadErr.Status, expected)
}
}

View File

@ -1,573 +0,0 @@
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"regexp"
"strconv"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/api/v2"
)
// Client implements the client interface to the registry http api
type Client interface {
// GetImageManifest returns an image manifest for the image at the given
// name, tag pair.
GetImageManifest(name, tag string) (*manifest.SignedManifest, error)
// PutImageManifest uploads an image manifest for the image at the given
// name, tag pair.
PutImageManifest(name, tag string, imageManifest *manifest.SignedManifest) error
// DeleteImage removes the image at the given name, tag pair.
DeleteImage(name, tag string) error
// ListImageTags returns a list of all image tags with the given repository
// name.
ListImageTags(name string) ([]string, error)
// BlobLength returns the length of the blob stored at the given name,
// digest pair.
// Returns a length value of -1 on error or if the blob does not exist.
BlobLength(name string, dgst digest.Digest) (int, error)
// GetBlob returns the blob stored at the given name, digest pair in the
// form of an io.ReadCloser with the length of this blob.
// A nonzero byteOffset can be provided to receive a partial blob beginning
// at the given offset.
GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error)
// InitiateBlobUpload starts a blob upload in the given repository namespace
// and returns a unique location url to use for other blob upload methods.
InitiateBlobUpload(name string) (string, error)
// GetBlobUploadStatus returns the byte offset and length of the blob at the
// given upload location.
GetBlobUploadStatus(location string) (int, int, error)
// UploadBlob uploads a full blob to the registry.
UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error
// UploadBlobChunk uploads a blob chunk with a given length and startByte to
// the registry.
// FinishChunkedBlobUpload must be called to finalize this upload.
UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error
// FinishChunkedBlobUpload completes a chunked blob upload at a given
// location.
FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error
// CancelBlobUpload deletes all content at the unfinished blob upload
// location and invalidates any future calls to this blob upload.
CancelBlobUpload(location string) error
}
var (
patternRangeHeader = regexp.MustCompile("bytes=0-(\\d+)/(\\d+)")
)
// New returns a new Client which operates against a registry with the
// given base endpoint
// This endpoint should not include /v2/ or any part of the url after this.
func New(endpoint string) (Client, error) {
ub, err := v2.NewURLBuilderFromString(endpoint)
if err != nil {
return nil, err
}
return &clientImpl{
endpoint: endpoint,
ub: ub,
}, nil
}
// clientImpl is the default implementation of the Client interface
type clientImpl struct {
endpoint string
ub *v2.URLBuilder
}
// TODO(bbland): use consistent route generation between server and client
func (r *clientImpl) GetImageManifest(name, tag string) (*manifest.SignedManifest, error) {
manifestURL, err := r.ub.BuildManifestURL(name, tag)
if err != nil {
return nil, err
}
response, err := http.Get(manifestURL)
if err != nil {
return nil, err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusOK:
break
case response.StatusCode == http.StatusNotFound:
return nil, &ImageManifestNotFoundError{Name: name, Tag: tag}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return nil, err
}
return nil, &errs
default:
return nil, &UnexpectedHTTPStatusError{Status: response.Status}
}
decoder := json.NewDecoder(response.Body)
manifest := new(manifest.SignedManifest)
err = decoder.Decode(manifest)
if err != nil {
return nil, err
}
return manifest, nil
}
func (r *clientImpl) PutImageManifest(name, tag string, manifest *manifest.SignedManifest) error {
manifestURL, err := r.ub.BuildManifestURL(name, tag)
if err != nil {
return err
}
putRequest, err := http.NewRequest("PUT", manifestURL, bytes.NewReader(manifest.Raw))
if err != nil {
return err
}
response, err := http.DefaultClient.Do(putRequest)
if err != nil {
return err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusOK || response.StatusCode == http.StatusAccepted:
return nil
case response.StatusCode >= 400 && response.StatusCode < 500:
var errors v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errors)
if err != nil {
return err
}
return &errors
default:
return &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) DeleteImage(name, tag string) error {
manifestURL, err := r.ub.BuildManifestURL(name, tag)
if err != nil {
return err
}
deleteRequest, err := http.NewRequest("DELETE", manifestURL, nil)
if err != nil {
return err
}
response, err := http.DefaultClient.Do(deleteRequest)
if err != nil {
return err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusNoContent:
break
case response.StatusCode == http.StatusNotFound:
return &ImageManifestNotFoundError{Name: name, Tag: tag}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return err
}
return &errs
default:
return &UnexpectedHTTPStatusError{Status: response.Status}
}
return nil
}
func (r *clientImpl) ListImageTags(name string) ([]string, error) {
tagsURL, err := r.ub.BuildTagsURL(name)
if err != nil {
return nil, err
}
response, err := http.Get(tagsURL)
if err != nil {
return nil, err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusOK:
break
case response.StatusCode == http.StatusNotFound:
return nil, &RepositoryNotFoundError{Name: name}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return nil, err
}
return nil, &errs
default:
return nil, &UnexpectedHTTPStatusError{Status: response.Status}
}
tags := struct {
Tags []string `json:"tags"`
}{}
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&tags)
if err != nil {
return nil, err
}
return tags.Tags, nil
}
func (r *clientImpl) BlobLength(name string, dgst digest.Digest) (int, error) {
blobURL, err := r.ub.BuildBlobURL(name, dgst)
if err != nil {
return -1, err
}
response, err := http.Head(blobURL)
if err != nil {
return -1, err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusOK:
lengthHeader := response.Header.Get("Content-Length")
length, err := strconv.ParseInt(lengthHeader, 10, 64)
if err != nil {
return -1, err
}
return int(length), nil
case response.StatusCode == http.StatusNotFound:
return -1, nil
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return -1, err
}
return -1, &errs
default:
return -1, &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error) {
blobURL, err := r.ub.BuildBlobURL(name, dgst)
if err != nil {
return nil, 0, err
}
getRequest, err := http.NewRequest("GET", blobURL, nil)
if err != nil {
return nil, 0, err
}
getRequest.Header.Add("Range", fmt.Sprintf("%d-", byteOffset))
response, err := http.DefaultClient.Do(getRequest)
if err != nil {
return nil, 0, err
}
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusOK:
lengthHeader := response.Header.Get("Content-Length")
length, err := strconv.ParseInt(lengthHeader, 10, 0)
if err != nil {
return nil, 0, err
}
return response.Body, int(length), nil
case response.StatusCode == http.StatusNotFound:
response.Body.Close()
return nil, 0, &BlobNotFoundError{Name: name, Digest: dgst}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return nil, 0, err
}
return nil, 0, &errs
default:
response.Body.Close()
return nil, 0, &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) InitiateBlobUpload(name string) (string, error) {
uploadURL, err := r.ub.BuildBlobUploadURL(name)
if err != nil {
return "", err
}
postRequest, err := http.NewRequest("POST", uploadURL, nil)
if err != nil {
return "", err
}
response, err := http.DefaultClient.Do(postRequest)
if err != nil {
return "", err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusAccepted:
return response.Header.Get("Location"), nil
// case response.StatusCode == http.StatusNotFound:
// return
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return "", err
}
return "", &errs
default:
return "", &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) GetBlobUploadStatus(location string) (int, int, error) {
response, err := http.Get(location)
if err != nil {
return 0, 0, err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusNoContent:
return parseRangeHeader(response.Header.Get("Range"))
case response.StatusCode == http.StatusNotFound:
return 0, 0, &BlobUploadNotFoundError{Location: location}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return 0, 0, err
}
return 0, 0, &errs
default:
return 0, 0, &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error {
defer blob.Close()
putRequest, err := http.NewRequest("PUT", location, blob)
if err != nil {
return err
}
values := putRequest.URL.Query()
values.Set("digest", dgst.String())
putRequest.URL.RawQuery = values.Encode()
putRequest.Header.Set("Content-Type", "application/octet-stream")
putRequest.Header.Set("Content-Length", fmt.Sprint(length))
response, err := http.DefaultClient.Do(putRequest)
if err != nil {
return err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusCreated:
return nil
case response.StatusCode == http.StatusNotFound:
return &BlobUploadNotFoundError{Location: location}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return err
}
return &errs
default:
return &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error {
defer blobChunk.Close()
putRequest, err := http.NewRequest("PUT", location, blobChunk)
if err != nil {
return err
}
endByte := startByte + length
putRequest.Header.Set("Content-Type", "application/octet-stream")
putRequest.Header.Set("Content-Length", fmt.Sprint(length))
putRequest.Header.Set("Content-Range",
fmt.Sprintf("%d-%d/%d", startByte, endByte, endByte))
response, err := http.DefaultClient.Do(putRequest)
if err != nil {
return err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusAccepted:
return nil
case response.StatusCode == http.StatusRequestedRangeNotSatisfiable:
lastValidRange, blobSize, err := parseRangeHeader(response.Header.Get("Range"))
if err != nil {
return err
}
return &BlobUploadInvalidRangeError{
Location: location,
LastValidRange: lastValidRange,
BlobSize: blobSize,
}
case response.StatusCode == http.StatusNotFound:
return &BlobUploadNotFoundError{Location: location}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return err
}
return &errs
default:
return &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error {
putRequest, err := http.NewRequest("PUT", location, nil)
if err != nil {
return err
}
values := putRequest.URL.Query()
values.Set("digest", dgst.String())
putRequest.URL.RawQuery = values.Encode()
putRequest.Header.Set("Content-Type", "application/octet-stream")
putRequest.Header.Set("Content-Length", "0")
putRequest.Header.Set("Content-Range",
fmt.Sprintf("%d-%d/%d", length, length, length))
response, err := http.DefaultClient.Do(putRequest)
if err != nil {
return err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusCreated:
return nil
case response.StatusCode == http.StatusNotFound:
return &BlobUploadNotFoundError{Location: location}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return err
}
return &errs
default:
return &UnexpectedHTTPStatusError{Status: response.Status}
}
}
func (r *clientImpl) CancelBlobUpload(location string) error {
deleteRequest, err := http.NewRequest("DELETE", location, nil)
if err != nil {
return err
}
response, err := http.DefaultClient.Do(deleteRequest)
if err != nil {
return err
}
defer response.Body.Close()
// TODO(bbland): handle other status codes, like 5xx errors
switch {
case response.StatusCode == http.StatusNoContent:
return nil
case response.StatusCode == http.StatusNotFound:
return &BlobUploadNotFoundError{Location: location}
case response.StatusCode >= 400 && response.StatusCode < 500:
var errs v2.Errors
decoder := json.NewDecoder(response.Body)
err = decoder.Decode(&errs)
if err != nil {
return err
}
return &errs
default:
return &UnexpectedHTTPStatusError{Status: response.Status}
}
}
// parseRangeHeader parses out the offset and length from a returned Range
// header
func parseRangeHeader(byteRangeHeader string) (int, int, error) {
submatches := patternRangeHeader.FindStringSubmatch(byteRangeHeader)
if submatches == nil || len(submatches) < 3 {
return 0, 0, fmt.Errorf("Malformed Range header")
}
offset, err := strconv.Atoi(submatches[1])
if err != nil {
return 0, 0, err
}
length, err := strconv.Atoi(submatches[2])
if err != nil {
return 0, 0, err
}
return offset, length, nil
}

View File

@ -1,440 +0,0 @@
package client
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/testutil"
)
type testBlob struct {
digest digest.Digest
contents []byte
}
func TestRangeHeaderParser(t *testing.T) {
const (
malformedRangeHeader = "bytes=0-A/C"
emptyRangeHeader = ""
rFirst = 100
rSecond = 200
)
var (
wellformedRangeHeader = fmt.Sprintf("bytes=0-%d/%d", rFirst, rSecond)
)
if _, _, err := parseRangeHeader(malformedRangeHeader); err == nil {
t.Fatalf("malformedRangeHeader: error expected, got nil")
}
if _, _, err := parseRangeHeader(emptyRangeHeader); err == nil {
t.Fatalf("emptyRangeHeader: error expected, got nil")
}
first, second, err := parseRangeHeader(wellformedRangeHeader)
if err != nil {
t.Fatalf("wellformedRangeHeader: unexpected error %v", err)
}
if first != rFirst || second != rSecond {
t.Fatalf("Range has been parsed unproperly: %d/%d", first, second)
}
}
func TestPush(t *testing.T) {
name := "hello/world"
tag := "sometag"
testBlobs := []testBlob{
{
digest: "tarsum.v2+sha256:12345",
contents: []byte("some contents"),
},
{
digest: "tarsum.v2+sha256:98765",
contents: []byte("some other contents"),
},
}
uploadLocations := make([]string, len(testBlobs))
blobs := make([]manifest.FSLayer, len(testBlobs))
history := make([]manifest.History, len(testBlobs))
for i, blob := range testBlobs {
// TODO(bbland): this is returning the same location for all uploads,
// because we can't know which blob will get which location.
// It's sort of okay because we're using unique digests, but this needs
// to change at some point.
uploadLocations[i] = fmt.Sprintf("/v2/%s/blobs/test-uuid", name)
blobs[i] = manifest.FSLayer{BlobSum: blob.digest}
history[i] = manifest.History{V1Compatibility: blob.digest.String()}
}
m := &manifest.SignedManifest{
Manifest: manifest.Manifest{
Name: name,
Tag: tag,
Architecture: "x86",
FSLayers: blobs,
History: history,
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
},
}
var err error
m.Raw, err = json.Marshal(m)
blobRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs))
for i, blob := range testBlobs {
blobRequestResponseMappings[2*i] = testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "POST",
Route: "/v2/" + name + "/blobs/uploads/",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Location": {uploadLocations[i]},
}),
},
}
blobRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "PUT",
Route: uploadLocations[i],
QueryParams: map[string][]string{
"digest": {blob.digest.String()},
},
Body: blob.contents,
},
Response: testutil.Response{
StatusCode: http.StatusCreated,
},
}
}
handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "PUT",
Route: "/v2/" + name + "/manifests/" + tag,
Body: m.Raw,
},
Response: testutil.Response{
StatusCode: http.StatusOK,
},
}))
var server *httptest.Server
// HACK(stevvooe): Super hack to follow: the request response map approach
// above does not let us correctly format the location header to the
// server url. This handler intercepts and re-writes the location header
// to the server url.
hack := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w = &headerInterceptingResponseWriter{ResponseWriter: w, serverURL: server.URL}
handler.ServeHTTP(w, r)
})
server = httptest.NewServer(hack)
client, err := New(server.URL)
if err != nil {
t.Fatalf("error creating client: %v", err)
}
objectStore := &memoryObjectStore{
mutex: new(sync.Mutex),
manifestStorage: make(map[string]*manifest.SignedManifest),
layerStorage: make(map[digest.Digest]Layer),
}
for _, blob := range testBlobs {
l, err := objectStore.Layer(blob.digest)
if err != nil {
t.Fatal(err)
}
writer, err := l.Writer()
if err != nil {
t.Fatal(err)
}
writer.SetSize(len(blob.contents))
writer.Write(blob.contents)
writer.Close()
}
objectStore.WriteManifest(name, tag, m)
err = Push(client, objectStore, name, tag)
if err != nil {
t.Fatal(err)
}
}
func TestPull(t *testing.T) {
name := "hello/world"
tag := "sometag"
testBlobs := []testBlob{
{
digest: "tarsum.v2+sha256:12345",
contents: []byte("some contents"),
},
{
digest: "tarsum.v2+sha256:98765",
contents: []byte("some other contents"),
},
}
blobs := make([]manifest.FSLayer, len(testBlobs))
history := make([]manifest.History, len(testBlobs))
for i, blob := range testBlobs {
blobs[i] = manifest.FSLayer{BlobSum: blob.digest}
history[i] = manifest.History{V1Compatibility: blob.digest.String()}
}
m := &manifest.SignedManifest{
Manifest: manifest.Manifest{
Name: name,
Tag: tag,
Architecture: "x86",
FSLayers: blobs,
History: history,
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
},
}
manifestBytes, err := json.Marshal(m)
blobRequestResponseMappings := make([]testutil.RequestResponseMapping, len(testBlobs))
for i, blob := range testBlobs {
blobRequestResponseMappings[i] = testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/blobs/" + blob.digest.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: blob.contents,
},
}
}
handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifests/" + tag,
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: manifestBytes,
},
}))
server := httptest.NewServer(handler)
client, err := New(server.URL)
if err != nil {
t.Fatalf("error creating client: %v", err)
}
objectStore := &memoryObjectStore{
mutex: new(sync.Mutex),
manifestStorage: make(map[string]*manifest.SignedManifest),
layerStorage: make(map[digest.Digest]Layer),
}
err = Pull(client, objectStore, name, tag)
if err != nil {
t.Fatal(err)
}
m, err = objectStore.Manifest(name, tag)
if err != nil {
t.Fatal(err)
}
mBytes, err := json.Marshal(m)
if err != nil {
t.Fatal(err)
}
if string(mBytes) != string(manifestBytes) {
t.Fatal("Incorrect manifest")
}
for _, blob := range testBlobs {
l, err := objectStore.Layer(blob.digest)
if err != nil {
t.Fatal(err)
}
reader, err := l.Reader()
if err != nil {
t.Fatal(err)
}
defer reader.Close()
blobBytes, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
if string(blobBytes) != string(blob.contents) {
t.Fatal("Incorrect blob")
}
}
}
func TestPullResume(t *testing.T) {
name := "hello/world"
tag := "sometag"
testBlobs := []testBlob{
{
digest: "tarsum.v2+sha256:12345",
contents: []byte("some contents"),
},
{
digest: "tarsum.v2+sha256:98765",
contents: []byte("some other contents"),
},
}
layers := make([]manifest.FSLayer, len(testBlobs))
history := make([]manifest.History, len(testBlobs))
for i, layer := range testBlobs {
layers[i] = manifest.FSLayer{BlobSum: layer.digest}
history[i] = manifest.History{V1Compatibility: layer.digest.String()}
}
m := &manifest.Manifest{
Name: name,
Tag: tag,
Architecture: "x86",
FSLayers: layers,
History: history,
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
}
manifestBytes, err := json.Marshal(m)
layerRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs))
for i, blob := range testBlobs {
layerRequestResponseMappings[2*i] = testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/blobs/" + blob.digest.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: blob.contents[:len(blob.contents)/2],
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(blob.contents))},
}),
},
}
layerRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/blobs/" + blob.digest.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: blob.contents[len(blob.contents)/2:],
},
}
}
for i := 0; i < 3; i++ {
layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifests/" + tag,
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: manifestBytes,
},
})
}
handler := testutil.NewHandler(layerRequestResponseMappings)
server := httptest.NewServer(handler)
client, err := New(server.URL)
if err != nil {
t.Fatalf("error creating client: %v", err)
}
objectStore := &memoryObjectStore{
mutex: new(sync.Mutex),
manifestStorage: make(map[string]*manifest.SignedManifest),
layerStorage: make(map[digest.Digest]Layer),
}
for attempts := 0; attempts < 3; attempts++ {
err = Pull(client, objectStore, name, tag)
if err == nil {
break
}
}
if err != nil {
t.Fatal(err)
}
sm, err := objectStore.Manifest(name, tag)
if err != nil {
t.Fatal(err)
}
mBytes, err := json.Marshal(sm)
if err != nil {
t.Fatal(err)
}
if string(mBytes) != string(manifestBytes) {
t.Fatal("Incorrect manifest")
}
for _, blob := range testBlobs {
l, err := objectStore.Layer(blob.digest)
if err != nil {
t.Fatal(err)
}
reader, err := l.Reader()
if err != nil {
t.Fatal(err)
}
defer reader.Close()
layerBytes, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
if string(layerBytes) != string(blob.contents) {
t.Fatal("Incorrect blob")
}
}
}
// headerInterceptingResponseWriter is a hacky workaround to re-write the
// location header to have the server url.
type headerInterceptingResponseWriter struct {
http.ResponseWriter
serverURL string
}
func (hirw *headerInterceptingResponseWriter) WriteHeader(status int) {
location := hirw.Header().Get("Location")
if location != "" {
hirw.Header().Set("Location", hirw.serverURL+location)
}
hirw.ResponseWriter.WriteHeader(status)
}

View File

@ -1,73 +1,15 @@
package client package client
import ( import (
"encoding/json"
"fmt" "fmt"
"io"
"io/ioutil"
"net/http"
"github.com/docker/distribution/digest" "github.com/docker/distribution/registry/api/v2"
) )
// RepositoryNotFoundError is returned when making an operation against a
// repository that does not exist in the registry.
type RepositoryNotFoundError struct {
Name string
}
func (e *RepositoryNotFoundError) Error() string {
return fmt.Sprintf("No repository found with Name: %s", e.Name)
}
// ImageManifestNotFoundError is returned when making an operation against a
// given image manifest that does not exist in the registry.
type ImageManifestNotFoundError struct {
Name string
Tag string
}
func (e *ImageManifestNotFoundError) Error() string {
return fmt.Sprintf("No manifest found with Name: %s, Tag: %s",
e.Name, e.Tag)
}
// BlobNotFoundError is returned when making an operation against a given image
// layer that does not exist in the registry.
type BlobNotFoundError struct {
Name string
Digest digest.Digest
}
func (e *BlobNotFoundError) Error() string {
return fmt.Sprintf("No blob found with Name: %s, Digest: %s",
e.Name, e.Digest)
}
// BlobUploadNotFoundError is returned when making a blob upload operation against an
// invalid blob upload location url.
// This may be the result of using a cancelled, completed, or stale upload
// location.
type BlobUploadNotFoundError struct {
Location string
}
func (e *BlobUploadNotFoundError) Error() string {
return fmt.Sprintf("No blob upload found at Location: %s", e.Location)
}
// BlobUploadInvalidRangeError is returned when attempting to upload an image
// blob chunk that is out of order.
// This provides the known BlobSize and LastValidRange which can be used to
// resume the upload.
type BlobUploadInvalidRangeError struct {
Location string
LastValidRange int
BlobSize int
}
func (e *BlobUploadInvalidRangeError) Error() string {
return fmt.Sprintf(
"Invalid range provided for upload at Location: %s. Last Valid Range: %d, Blob Size: %d",
e.Location, e.LastValidRange, e.BlobSize)
}
// UnexpectedHTTPStatusError is returned when an unexpected HTTP status is // UnexpectedHTTPStatusError is returned when an unexpected HTTP status is
// returned when making a registry api call. // returned when making a registry api call.
type UnexpectedHTTPStatusError struct { type UnexpectedHTTPStatusError struct {
@ -77,3 +19,48 @@ type UnexpectedHTTPStatusError struct {
func (e *UnexpectedHTTPStatusError) Error() string { func (e *UnexpectedHTTPStatusError) Error() string {
return fmt.Sprintf("Received unexpected HTTP status: %s", e.Status) return fmt.Sprintf("Received unexpected HTTP status: %s", e.Status)
} }
// UnexpectedHTTPResponseError is returned when an expected HTTP status code
// is returned, but the content was unexpected and failed to be parsed.
type UnexpectedHTTPResponseError struct {
ParseErr error
Response []byte
}
func (e *UnexpectedHTTPResponseError) Error() string {
return fmt.Sprintf("Error parsing HTTP response: %s: %q", e.ParseErr.Error(), string(e.Response))
}
func parseHTTPErrorResponse(r io.Reader) error {
var errors v2.Errors
body, err := ioutil.ReadAll(r)
if err != nil {
return err
}
if err := json.Unmarshal(body, &errors); err != nil {
return &UnexpectedHTTPResponseError{
ParseErr: err,
Response: body,
}
}
return &errors
}
func handleErrorResponse(resp *http.Response) error {
if resp.StatusCode == 401 {
err := parseHTTPErrorResponse(resp.Body)
if uErr, ok := err.(*UnexpectedHTTPResponseError); ok {
return &v2.Error{
Code: v2.ErrorCodeUnauthorized,
Message: "401 Unauthorized",
Detail: uErr.Response,
}
}
return err
}
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
return parseHTTPErrorResponse(resp.Body)
}
return &UnexpectedHTTPStatusError{Status: resp.Status}
}

View File

@ -1,239 +0,0 @@
package client
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
)
var (
// ErrLayerAlreadyExists is returned when attempting to create a layer with
// a tarsum that is already in use.
ErrLayerAlreadyExists = fmt.Errorf("Layer already exists")
// ErrLayerLocked is returned when attempting to write to a layer which is
// currently being written to.
ErrLayerLocked = fmt.Errorf("Layer locked")
)
// ObjectStore is an interface which is designed to approximate the docker
// engine storage. This interface is subject to change to conform to the
// future requirements of the engine.
type ObjectStore interface {
// Manifest retrieves the image manifest stored at the given repository name
// and tag
Manifest(name, tag string) (*manifest.SignedManifest, error)
// WriteManifest stores an image manifest at the given repository name and
// tag
WriteManifest(name, tag string, manifest *manifest.SignedManifest) error
// Layer returns a handle to a layer for reading and writing
Layer(dgst digest.Digest) (Layer, error)
}
// Layer is a generic image layer interface.
// A Layer may not be written to if it is already complete.
type Layer interface {
// Reader returns a LayerReader or an error if the layer has not been
// written to or is currently being written to.
Reader() (LayerReader, error)
// Writer returns a LayerWriter or an error if the layer has been fully
// written to or is currently being written to.
Writer() (LayerWriter, error)
// Wait blocks until the Layer can be read from.
Wait() error
}
// LayerReader is a read-only handle to a Layer, which exposes the CurrentSize
// and full Size in addition to implementing the io.ReadCloser interface.
type LayerReader interface {
io.ReadCloser
// CurrentSize returns the number of bytes written to the underlying Layer
CurrentSize() int
// Size returns the full size of the underlying Layer
Size() int
}
// LayerWriter is a write-only handle to a Layer, which exposes the CurrentSize
// and full Size in addition to implementing the io.WriteCloser interface.
// SetSize must be called on this LayerWriter before it can be written to.
type LayerWriter interface {
io.WriteCloser
// CurrentSize returns the number of bytes written to the underlying Layer
CurrentSize() int
// Size returns the full size of the underlying Layer
Size() int
// SetSize sets the full size of the underlying Layer.
// This must be called before any calls to Write
SetSize(int) error
}
// memoryObjectStore is an in-memory implementation of the ObjectStore interface
type memoryObjectStore struct {
mutex *sync.Mutex
manifestStorage map[string]*manifest.SignedManifest
layerStorage map[digest.Digest]Layer
}
func (objStore *memoryObjectStore) Manifest(name, tag string) (*manifest.SignedManifest, error) {
objStore.mutex.Lock()
defer objStore.mutex.Unlock()
manifest, ok := objStore.manifestStorage[name+":"+tag]
if !ok {
return nil, fmt.Errorf("No manifest found with Name: %q, Tag: %q", name, tag)
}
return manifest, nil
}
func (objStore *memoryObjectStore) WriteManifest(name, tag string, manifest *manifest.SignedManifest) error {
objStore.mutex.Lock()
defer objStore.mutex.Unlock()
objStore.manifestStorage[name+":"+tag] = manifest
return nil
}
func (objStore *memoryObjectStore) Layer(dgst digest.Digest) (Layer, error) {
objStore.mutex.Lock()
defer objStore.mutex.Unlock()
layer, ok := objStore.layerStorage[dgst]
if !ok {
layer = &memoryLayer{cond: sync.NewCond(new(sync.Mutex))}
objStore.layerStorage[dgst] = layer
}
return layer, nil
}
type memoryLayer struct {
cond *sync.Cond
contents []byte
expectedSize int
writing bool
}
func (ml *memoryLayer) Reader() (LayerReader, error) {
ml.cond.L.Lock()
defer ml.cond.L.Unlock()
if ml.contents == nil {
return nil, fmt.Errorf("Layer has not been written to yet")
}
if ml.writing {
return nil, ErrLayerLocked
}
return &memoryLayerReader{ml: ml, reader: bytes.NewReader(ml.contents)}, nil
}
func (ml *memoryLayer) Writer() (LayerWriter, error) {
ml.cond.L.Lock()
defer ml.cond.L.Unlock()
if ml.contents != nil {
if ml.writing {
return nil, ErrLayerLocked
}
if ml.expectedSize == len(ml.contents) {
return nil, ErrLayerAlreadyExists
}
} else {
ml.contents = make([]byte, 0)
}
ml.writing = true
return &memoryLayerWriter{ml: ml, buffer: bytes.NewBuffer(ml.contents)}, nil
}
func (ml *memoryLayer) Wait() error {
ml.cond.L.Lock()
defer ml.cond.L.Unlock()
if ml.contents == nil {
return fmt.Errorf("No writer to wait on")
}
for ml.writing {
ml.cond.Wait()
}
return nil
}
type memoryLayerReader struct {
ml *memoryLayer
reader *bytes.Reader
}
func (mlr *memoryLayerReader) Read(p []byte) (int, error) {
return mlr.reader.Read(p)
}
func (mlr *memoryLayerReader) Close() error {
return nil
}
func (mlr *memoryLayerReader) CurrentSize() int {
return len(mlr.ml.contents)
}
func (mlr *memoryLayerReader) Size() int {
return mlr.ml.expectedSize
}
type memoryLayerWriter struct {
ml *memoryLayer
buffer *bytes.Buffer
}
func (mlw *memoryLayerWriter) Write(p []byte) (int, error) {
if mlw.ml.expectedSize == 0 {
return 0, fmt.Errorf("Must set size before writing to layer")
}
wrote, err := mlw.buffer.Write(p)
mlw.ml.contents = mlw.buffer.Bytes()
return wrote, err
}
func (mlw *memoryLayerWriter) Close() error {
mlw.ml.cond.L.Lock()
defer mlw.ml.cond.L.Unlock()
return mlw.close()
}
func (mlw *memoryLayerWriter) close() error {
mlw.ml.writing = false
mlw.ml.cond.Broadcast()
return nil
}
func (mlw *memoryLayerWriter) CurrentSize() int {
return len(mlw.ml.contents)
}
func (mlw *memoryLayerWriter) Size() int {
return mlw.ml.expectedSize
}
func (mlw *memoryLayerWriter) SetSize(size int) error {
if !mlw.ml.writing {
return fmt.Errorf("Layer is closed for writing")
}
mlw.ml.expectedSize = size
return nil
}

View File

@ -1,151 +0,0 @@
package client
import (
"fmt"
"io"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/manifest"
)
// simultaneousLayerPullWindow is the size of the parallel layer pull window.
// A layer may not be pulled until the layer preceeding it by the length of the
// pull window has been successfully pulled.
const simultaneousLayerPullWindow = 4
// Pull implements a client pull workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer
// storage
func Pull(c Client, objectStore ObjectStore, name, tag string) error {
manifest, err := c.GetImageManifest(name, tag)
if err != nil {
return err
}
log.WithField("manifest", manifest).Info("Pulled manifest")
if len(manifest.FSLayers) != len(manifest.History) {
return fmt.Errorf("Length of history not equal to number of layers")
}
if len(manifest.FSLayers) == 0 {
return fmt.Errorf("Image has no layers")
}
errChans := make([]chan error, len(manifest.FSLayers))
for i := range manifest.FSLayers {
errChans[i] = make(chan error)
}
// To avoid leak of goroutines we must notify
// pullLayer goroutines about a cancelation,
// otherwise they will lock forever.
cancelCh := make(chan struct{})
// Iterate over each layer in the manifest, simultaneously pulling no more
// than simultaneousLayerPullWindow layers at a time. If an error is
// received from a layer pull, we abort the push.
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ {
dependentLayer := i - simultaneousLayerPullWindow
if dependentLayer >= 0 {
err := <-errChans[dependentLayer]
if err != nil {
log.WithField("error", err).Warn("Pull aborted")
close(cancelCh)
return err
}
}
if i < len(manifest.FSLayers) {
go func(i int) {
select {
case errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i]):
case <-cancelCh: // no chance to recv until cancelCh's closed
}
}(i)
}
}
err = objectStore.WriteManifest(name, tag, manifest)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"manifest": manifest,
}).Warn("Unable to write image manifest")
return err
}
return nil
}
func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
log.WithField("layer", fsLayer).Info("Pulling layer")
layer, err := objectStore.Layer(fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to write local layer")
return err
}
layerWriter, err := layer.Writer()
if err == ErrLayerAlreadyExists {
log.WithField("layer", fsLayer).Info("Layer already exists")
return nil
}
if err == ErrLayerLocked {
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
layer.Wait()
return nil
}
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to write local layer")
return err
}
defer layerWriter.Close()
if layerWriter.CurrentSize() > 0 {
log.WithFields(log.Fields{
"layer": fsLayer,
"currentSize": layerWriter.CurrentSize(),
"size": layerWriter.Size(),
}).Info("Layer partially downloaded, resuming")
}
layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize())
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
defer layerReader.Close()
layerWriter.SetSize(layerWriter.CurrentSize() + length)
_, err = io.Copy(layerWriter, layerReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
if layerWriter.CurrentSize() != layerWriter.Size() {
log.WithFields(log.Fields{
"size": layerWriter.Size(),
"currentSize": layerWriter.CurrentSize(),
"layer": fsLayer,
}).Warn("Layer invalid size")
return fmt.Errorf(
"Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d",
fsLayer, layerWriter.Size(), layerWriter.CurrentSize(),
)
}
return nil
}

View File

@ -1,137 +0,0 @@
package client
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/manifest"
)
// simultaneousLayerPushWindow is the size of the parallel layer push window.
// A layer may not be pushed until the layer preceeding it by the length of the
// push window has been successfully pushed.
const simultaneousLayerPushWindow = 4
type pushFunction func(fsLayer manifest.FSLayer) error
// Push implements a client push workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer
// storage
func Push(c Client, objectStore ObjectStore, name, tag string) error {
manifest, err := objectStore.Manifest(name, tag)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"name": name,
"tag": tag,
}).Info("No image found")
return err
}
errChans := make([]chan error, len(manifest.FSLayers))
for i := range manifest.FSLayers {
errChans[i] = make(chan error)
}
cancelCh := make(chan struct{})
// Iterate over each layer in the manifest, simultaneously pushing no more
// than simultaneousLayerPushWindow layers at a time. If an error is
// received from a layer push, we abort the push.
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ {
dependentLayer := i - simultaneousLayerPushWindow
if dependentLayer >= 0 {
err := <-errChans[dependentLayer]
if err != nil {
log.WithField("error", err).Warn("Push aborted")
close(cancelCh)
return err
}
}
if i < len(manifest.FSLayers) {
go func(i int) {
select {
case errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i]):
case <-cancelCh: // recv broadcast notification about cancelation
}
}(i)
}
}
err = c.PutImageManifest(name, tag, manifest)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"manifest": manifest,
}).Warn("Unable to upload manifest")
return err
}
return nil
}
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
log.WithField("layer", fsLayer).Info("Pushing layer")
layer, err := objectStore.Layer(fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
layerReader, err := layer.Reader()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
defer layerReader.Close()
if layerReader.CurrentSize() != layerReader.Size() {
log.WithFields(log.Fields{
"layer": fsLayer,
"currentSize": layerReader.CurrentSize(),
"size": layerReader.Size(),
}).Warn("Local layer incomplete")
return fmt.Errorf("Local layer incomplete")
}
length, err := c.BlobLength(name, fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to check existence of remote layer")
return err
}
if length >= 0 {
log.WithField("layer", fsLayer).Info("Layer already exists")
return nil
}
location, err := c.InitiateBlobUpload(name)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
return nil
}

View File

@ -0,0 +1,412 @@
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client/transport"
"github.com/docker/distribution/registry/storage/cache"
)
// NewRepository creates a new Repository for the given repository name and base URL
func NewRepository(ctx context.Context, name, baseURL string, transport http.RoundTripper) (distribution.Repository, error) {
if err := v2.ValidateRespositoryName(name); err != nil {
return nil, err
}
ub, err := v2.NewURLBuilderFromString(baseURL)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: transport,
Timeout: 1 * time.Minute,
// TODO(dmcgowan): create cookie jar
}
return &repository{
client: client,
ub: ub,
name: name,
context: ctx,
}, nil
}
type repository struct {
client *http.Client
ub *v2.URLBuilder
context context.Context
name string
}
func (r *repository) Name() string {
return r.name
}
func (r *repository) Blobs(ctx context.Context) distribution.BlobStore {
statter := &blobStatter{
name: r.Name(),
ub: r.ub,
client: r.client,
}
return &blobs{
name: r.Name(),
ub: r.ub,
client: r.client,
statter: cache.NewCachedBlobStatter(cache.NewInMemoryBlobDescriptorCacheProvider(), statter),
}
}
func (r *repository) Manifests() distribution.ManifestService {
return &manifests{
name: r.Name(),
ub: r.ub,
client: r.client,
}
}
func (r *repository) Signatures() distribution.SignatureService {
return &signatures{
manifests: r.Manifests(),
}
}
type signatures struct {
manifests distribution.ManifestService
}
func (s *signatures) Get(dgst digest.Digest) ([][]byte, error) {
m, err := s.manifests.Get(dgst)
if err != nil {
return nil, err
}
return m.Signatures()
}
func (s *signatures) Put(dgst digest.Digest, signatures ...[]byte) error {
panic("not implemented")
}
type manifests struct {
name string
ub *v2.URLBuilder
client *http.Client
}
func (ms *manifests) Tags() ([]string, error) {
u, err := ms.ub.BuildTagsURL(ms.name)
if err != nil {
return nil, err
}
resp, err := ms.client.Get(u)
if err != nil {
return nil, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
tagsResponse := struct {
Tags []string `json:"tags"`
}{}
if err := json.Unmarshal(b, &tagsResponse); err != nil {
return nil, err
}
return tagsResponse.Tags, nil
case http.StatusNotFound:
return nil, nil
default:
return nil, handleErrorResponse(resp)
}
}
func (ms *manifests) Exists(dgst digest.Digest) (bool, error) {
// Call by Tag endpoint since the API uses the same
// URL endpoint for tags and digests.
return ms.ExistsByTag(dgst.String())
}
func (ms *manifests) ExistsByTag(tag string) (bool, error) {
u, err := ms.ub.BuildManifestURL(ms.name, tag)
if err != nil {
return false, err
}
resp, err := ms.client.Head(u)
if err != nil {
return false, err
}
switch resp.StatusCode {
case http.StatusOK:
return true, nil
case http.StatusNotFound:
return false, nil
default:
return false, handleErrorResponse(resp)
}
}
func (ms *manifests) Get(dgst digest.Digest) (*manifest.SignedManifest, error) {
// Call by Tag endpoint since the API uses the same
// URL endpoint for tags and digests.
return ms.GetByTag(dgst.String())
}
func (ms *manifests) GetByTag(tag string) (*manifest.SignedManifest, error) {
u, err := ms.ub.BuildManifestURL(ms.name, tag)
if err != nil {
return nil, err
}
resp, err := ms.client.Get(u)
if err != nil {
return nil, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
var sm manifest.SignedManifest
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&sm); err != nil {
return nil, err
}
return &sm, nil
default:
return nil, handleErrorResponse(resp)
}
}
func (ms *manifests) Put(m *manifest.SignedManifest) error {
manifestURL, err := ms.ub.BuildManifestURL(ms.name, m.Tag)
if err != nil {
return err
}
putRequest, err := http.NewRequest("PUT", manifestURL, bytes.NewReader(m.Raw))
if err != nil {
return err
}
resp, err := ms.client.Do(putRequest)
if err != nil {
return err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusAccepted:
// TODO(dmcgowan): make use of digest header
return nil
default:
return handleErrorResponse(resp)
}
}
func (ms *manifests) Delete(dgst digest.Digest) error {
u, err := ms.ub.BuildManifestURL(ms.name, dgst.String())
if err != nil {
return err
}
req, err := http.NewRequest("DELETE", u, nil)
if err != nil {
return err
}
resp, err := ms.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
return nil
default:
return handleErrorResponse(resp)
}
}
type blobs struct {
name string
ub *v2.URLBuilder
client *http.Client
statter distribution.BlobStatter
}
func sanitizeLocation(location, source string) (string, error) {
locationURL, err := url.Parse(location)
if err != nil {
return "", err
}
if locationURL.Scheme == "" {
sourceURL, err := url.Parse(source)
if err != nil {
return "", err
}
locationURL = &url.URL{
Scheme: sourceURL.Scheme,
Host: sourceURL.Host,
Path: location,
}
location = locationURL.String()
}
return location, nil
}
func (bs *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
return bs.statter.Stat(ctx, dgst)
}
func (bs *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
desc, err := bs.Stat(ctx, dgst)
if err != nil {
return nil, err
}
reader, err := bs.Open(ctx, desc.Digest)
if err != nil {
return nil, err
}
defer reader.Close()
return ioutil.ReadAll(reader)
}
func (bs *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
stat, err := bs.statter.Stat(ctx, dgst)
if err != nil {
return nil, err
}
blobURL, err := bs.ub.BuildBlobURL(bs.name, stat.Digest)
if err != nil {
return nil, err
}
return transport.NewHTTPReadSeeker(bs.client, blobURL, stat.Length), nil
}
func (bs *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
panic("not implemented")
}
func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
writer, err := bs.Create(ctx)
if err != nil {
return distribution.Descriptor{}, err
}
dgstr := digest.NewCanonicalDigester()
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr))
if err != nil {
return distribution.Descriptor{}, err
}
if n < int64(len(p)) {
return distribution.Descriptor{}, fmt.Errorf("short copy: wrote %d of %d", n, len(p))
}
desc := distribution.Descriptor{
MediaType: mediaType,
Length: int64(len(p)),
Digest: dgstr.Digest(),
}
return writer.Commit(ctx, desc)
}
func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
u, err := bs.ub.BuildBlobUploadURL(bs.name)
resp, err := bs.client.Post(u, "", nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusAccepted:
// TODO(dmcgowan): Check for invalid UUID
uuid := resp.Header.Get("Docker-Upload-UUID")
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
if err != nil {
return nil, err
}
return &httpBlobUpload{
statter: bs.statter,
client: bs.client,
uuid: uuid,
startedAt: time.Now(),
location: location,
}, nil
default:
return nil, handleErrorResponse(resp)
}
}
func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
panic("not implemented")
}
type blobStatter struct {
name string
ub *v2.URLBuilder
client *http.Client
}
func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
u, err := bs.ub.BuildBlobURL(bs.name, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
resp, err := bs.client.Head(u)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
lengthHeader := resp.Header.Get("Content-Length")
length, err := strconv.ParseInt(lengthHeader, 10, 64)
if err != nil {
return distribution.Descriptor{}, fmt.Errorf("error parsing content-length: %v", err)
}
return distribution.Descriptor{
MediaType: resp.Header.Get("Content-Type"),
Length: length,
Digest: dgst,
}, nil
case http.StatusNotFound:
return distribution.Descriptor{}, distribution.ErrBlobUnknown
default:
return distribution.Descriptor{}, handleErrorResponse(resp)
}
}

View File

@ -0,0 +1,681 @@
package client
import (
"bytes"
"crypto/rand"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/testutil"
)
func testServer(rrm testutil.RequestResponseMap) (string, func()) {
h := testutil.NewHandler(rrm)
s := httptest.NewServer(h)
return s.URL, s.Close
}
func newRandomBlob(size int) (digest.Digest, []byte) {
b := make([]byte, size)
if n, err := rand.Read(b); err != nil {
panic(err)
} else if n != size {
panic("unable to read enough bytes")
}
dgst, err := digest.FromBytes(b)
if err != nil {
panic(err)
}
return dgst, b
}
func addTestFetch(repo string, dgst digest.Digest, content []byte, m *testutil.RequestResponseMap) {
*m = append(*m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: content,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(content))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
*m = append(*m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "HEAD",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(content))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
}
func TestBlobFetch(t *testing.T) {
d1, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap
addTestFetch("test.example.com/repo1", d1, b1, &m)
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, "test.example.com/repo1", e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
b, err := l.Get(ctx, d1)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(b, b1) != 0 {
t.Fatalf("Wrong bytes values fetched: [%d]byte != [%d]byte", len(b), len(b1))
}
// TODO(dmcgowan): Test for unknown blob case
}
func TestBlobExists(t *testing.T) {
d1, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap
addTestFetch("test.example.com/repo1", d1, b1, &m)
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, "test.example.com/repo1", e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
stat, err := l.Stat(ctx, d1)
if err != nil {
t.Fatal(err)
}
if stat.Digest != d1 {
t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, d1)
}
if stat.Length != int64(len(b1)) {
t.Fatalf("Unexpected length: %d, expected %d", stat.Length, len(b1))
}
// TODO(dmcgowan): Test error cases and ErrBlobUnknown case
}
func TestBlobUploadChunked(t *testing.T) {
dgst, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap
chunks := [][]byte{
b1[0:256],
b1[256:512],
b1[512:513],
b1[513:1024],
}
repo := "test.example.com/uploadrepo"
uuids := []string{uuid.New()}
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "POST",
Route: "/v2/" + repo + "/blobs/uploads/",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Location": {"/v2/" + repo + "/blobs/uploads/" + uuids[0]},
"Docker-Upload-UUID": {uuids[0]},
"Range": {"0-0"},
}),
},
})
offset := 0
for i, chunk := range chunks {
uuids = append(uuids, uuid.New())
newOffset := offset + len(chunk)
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "PATCH",
Route: "/v2/" + repo + "/blobs/uploads/" + uuids[i],
Body: chunk,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Location": {"/v2/" + repo + "/blobs/uploads/" + uuids[i+1]},
"Docker-Upload-UUID": {uuids[i+1]},
"Range": {fmt.Sprintf("%d-%d", offset, newOffset-1)},
}),
},
})
offset = newOffset
}
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "PUT",
Route: "/v2/" + repo + "/blobs/uploads/" + uuids[len(uuids)-1],
QueryParams: map[string][]string{
"digest": {dgst.String()},
},
},
Response: testutil.Response{
StatusCode: http.StatusCreated,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Docker-Content-Digest": {dgst.String()},
"Content-Range": {fmt.Sprintf("0-%d", offset-1)},
}),
},
})
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "HEAD",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(offset)},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, repo, e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
upload, err := l.Create(ctx)
if err != nil {
t.Fatal(err)
}
if upload.ID() != uuids[0] {
log.Fatalf("Unexpected UUID %s; expected %s", upload.ID(), uuids[0])
}
for _, chunk := range chunks {
n, err := upload.Write(chunk)
if err != nil {
t.Fatal(err)
}
if n != len(chunk) {
t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, len(chunk))
}
}
blob, err := upload.Commit(ctx, distribution.Descriptor{
Digest: dgst,
Length: int64(len(b1)),
})
if err != nil {
t.Fatal(err)
}
if blob.Length != int64(len(b1)) {
t.Fatalf("Unexpected blob size: %d; expected: %d", blob.Length, len(b1))
}
}
func TestBlobUploadMonolithic(t *testing.T) {
dgst, b1 := newRandomBlob(1024)
var m testutil.RequestResponseMap
repo := "test.example.com/uploadrepo"
uploadID := uuid.New()
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "POST",
Route: "/v2/" + repo + "/blobs/uploads/",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Location": {"/v2/" + repo + "/blobs/uploads/" + uploadID},
"Docker-Upload-UUID": {uploadID},
"Range": {"0-0"},
}),
},
})
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "PATCH",
Route: "/v2/" + repo + "/blobs/uploads/" + uploadID,
Body: b1,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Location": {"/v2/" + repo + "/blobs/uploads/" + uploadID},
"Docker-Upload-UUID": {uploadID},
"Content-Length": {"0"},
"Docker-Content-Digest": {dgst.String()},
"Range": {fmt.Sprintf("0-%d", len(b1)-1)},
}),
},
})
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "PUT",
Route: "/v2/" + repo + "/blobs/uploads/" + uploadID,
QueryParams: map[string][]string{
"digest": {dgst.String()},
},
},
Response: testutil.Response{
StatusCode: http.StatusCreated,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Docker-Content-Digest": {dgst.String()},
"Content-Range": {fmt.Sprintf("0-%d", len(b1)-1)},
}),
},
})
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "HEAD",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(b1))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, repo, e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
upload, err := l.Create(ctx)
if err != nil {
t.Fatal(err)
}
if upload.ID() != uploadID {
log.Fatalf("Unexpected UUID %s; expected %s", upload.ID(), uploadID)
}
n, err := upload.ReadFrom(bytes.NewReader(b1))
if err != nil {
t.Fatal(err)
}
if n != int64(len(b1)) {
t.Fatalf("Unexpected ReadFrom length: %d; expected: %d", n, len(b1))
}
blob, err := upload.Commit(ctx, distribution.Descriptor{
Digest: dgst,
Length: int64(len(b1)),
})
if err != nil {
t.Fatal(err)
}
if blob.Length != int64(len(b1)) {
t.Fatalf("Unexpected blob size: %d; expected: %d", blob.Length, len(b1))
}
}
func newRandomSchemaV1Manifest(name, tag string, blobCount int) (*manifest.SignedManifest, digest.Digest) {
blobs := make([]manifest.FSLayer, blobCount)
history := make([]manifest.History, blobCount)
for i := 0; i < blobCount; i++ {
dgst, blob := newRandomBlob((i % 5) * 16)
blobs[i] = manifest.FSLayer{BlobSum: dgst}
history[i] = manifest.History{V1Compatibility: fmt.Sprintf("{\"Hex\": \"%x\"}", blob)}
}
m := &manifest.SignedManifest{
Manifest: manifest.Manifest{
Name: name,
Tag: tag,
Architecture: "x86",
FSLayers: blobs,
History: history,
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
},
}
manifestBytes, err := json.Marshal(m)
if err != nil {
panic(err)
}
dgst, err := digest.FromBytes(manifestBytes)
if err != nil {
panic(err)
}
m.Raw = manifestBytes
return m, dgst
}
func addTestManifest(repo, reference string, content []byte, m *testutil.RequestResponseMap) {
*m = append(*m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + repo + "/manifests/" + reference,
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: content,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(content))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
*m = append(*m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "HEAD",
Route: "/v2/" + repo + "/manifests/" + reference,
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(content))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
}
func checkEqualManifest(m1, m2 *manifest.SignedManifest) error {
if m1.Name != m2.Name {
return fmt.Errorf("name does not match %q != %q", m1.Name, m2.Name)
}
if m1.Tag != m2.Tag {
return fmt.Errorf("tag does not match %q != %q", m1.Tag, m2.Tag)
}
if len(m1.FSLayers) != len(m2.FSLayers) {
return fmt.Errorf("fs blob length does not match %d != %d", len(m1.FSLayers), len(m2.FSLayers))
}
for i := range m1.FSLayers {
if m1.FSLayers[i].BlobSum != m2.FSLayers[i].BlobSum {
return fmt.Errorf("blobsum does not match %q != %q", m1.FSLayers[i].BlobSum, m2.FSLayers[i].BlobSum)
}
}
if len(m1.History) != len(m2.History) {
return fmt.Errorf("history length does not match %d != %d", len(m1.History), len(m2.History))
}
for i := range m1.History {
if m1.History[i].V1Compatibility != m2.History[i].V1Compatibility {
return fmt.Errorf("blobsum does not match %q != %q", m1.History[i].V1Compatibility, m2.History[i].V1Compatibility)
}
}
return nil
}
func TestManifestFetch(t *testing.T) {
repo := "test.example.com/repo"
m1, dgst := newRandomSchemaV1Manifest(repo, "latest", 6)
var m testutil.RequestResponseMap
addTestManifest(repo, dgst.String(), m1.Raw, &m)
e, c := testServer(m)
defer c()
r, err := NewRepository(context.Background(), repo, e, nil)
if err != nil {
t.Fatal(err)
}
ms := r.Manifests()
ok, err := ms.Exists(dgst)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("Manifest does not exist")
}
manifest, err := ms.Get(dgst)
if err != nil {
t.Fatal(err)
}
if err := checkEqualManifest(manifest, m1); err != nil {
t.Fatal(err)
}
}
func TestManifestFetchByTag(t *testing.T) {
repo := "test.example.com/repo/by/tag"
m1, _ := newRandomSchemaV1Manifest(repo, "latest", 6)
var m testutil.RequestResponseMap
addTestManifest(repo, "latest", m1.Raw, &m)
e, c := testServer(m)
defer c()
r, err := NewRepository(context.Background(), repo, e, nil)
if err != nil {
t.Fatal(err)
}
ms := r.Manifests()
ok, err := ms.ExistsByTag("latest")
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("Manifest does not exist")
}
manifest, err := ms.GetByTag("latest")
if err != nil {
t.Fatal(err)
}
if err := checkEqualManifest(manifest, m1); err != nil {
t.Fatal(err)
}
}
func TestManifestDelete(t *testing.T) {
repo := "test.example.com/repo/delete"
_, dgst1 := newRandomSchemaV1Manifest(repo, "latest", 6)
_, dgst2 := newRandomSchemaV1Manifest(repo, "latest", 6)
var m testutil.RequestResponseMap
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "DELETE",
Route: "/v2/" + repo + "/manifests/" + dgst1.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
}),
},
})
e, c := testServer(m)
defer c()
r, err := NewRepository(context.Background(), repo, e, nil)
if err != nil {
t.Fatal(err)
}
ms := r.Manifests()
if err := ms.Delete(dgst1); err != nil {
t.Fatal(err)
}
if err := ms.Delete(dgst2); err == nil {
t.Fatal("Expected error deleting unknown manifest")
}
// TODO(dmcgowan): Check for specific unknown error
}
func TestManifestPut(t *testing.T) {
repo := "test.example.com/repo/delete"
m1, dgst := newRandomSchemaV1Manifest(repo, "other", 6)
var m testutil.RequestResponseMap
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "PUT",
Route: "/v2/" + repo + "/manifests/other",
Body: m1.Raw,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Docker-Content-Digest": {dgst.String()},
}),
},
})
e, c := testServer(m)
defer c()
r, err := NewRepository(context.Background(), repo, e, nil)
if err != nil {
t.Fatal(err)
}
ms := r.Manifests()
if err := ms.Put(m1); err != nil {
t.Fatal(err)
}
// TODO(dmcgowan): Check for invalid input error
}
func TestManifestTags(t *testing.T) {
repo := "test.example.com/repo/tags/list"
tagsList := []byte(strings.TrimSpace(`
{
"name": "test.example.com/repo/tags/list",
"tags": [
"tag1",
"tag2",
"funtag"
]
}
`))
var m testutil.RequestResponseMap
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + repo + "/tags/list",
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: tagsList,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(tagsList))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
e, c := testServer(m)
defer c()
r, err := NewRepository(context.Background(), repo, e, nil)
if err != nil {
t.Fatal(err)
}
ms := r.Manifests()
tags, err := ms.Tags()
if err != nil {
t.Fatal(err)
}
if len(tags) != 3 {
t.Fatalf("Wrong number of tags returned: %d, expected 3", len(tags))
}
// TODO(dmcgowan): Check array
// TODO(dmcgowan): Check for error cases
}
func TestManifestUnauthorized(t *testing.T) {
repo := "test.example.com/repo"
_, dgst := newRandomSchemaV1Manifest(repo, "latest", 6)
var m testutil.RequestResponseMap
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + repo + "/manifests/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusUnauthorized,
Body: []byte("<html>garbage</html>"),
},
})
e, c := testServer(m)
defer c()
r, err := NewRepository(context.Background(), repo, e, nil)
if err != nil {
t.Fatal(err)
}
ms := r.Manifests()
_, err = ms.Get(dgst)
if err == nil {
t.Fatal("Expected error fetching manifest")
}
v2Err, ok := err.(*v2.Error)
if !ok {
t.Fatalf("Unexpected error type: %#v", err)
}
if v2Err.Code != v2.ErrorCodeUnauthorized {
t.Fatalf("Unexpected error code: %s", v2Err.Code.String())
}
if expected := "401 Unauthorized"; v2Err.Message != expected {
t.Fatalf("Unexpected message value: %s, expected %s", v2Err.Message, expected)
}
}

View File

@ -0,0 +1,150 @@
package transport
import (
"net/http"
"strings"
)
// Octet types from RFC 2616.
type octetType byte
// authorizationChallenge carries information
// from a WWW-Authenticate response header.
type authorizationChallenge struct {
Scheme string
Parameters map[string]string
}
var octetTypes [256]octetType
const (
isToken octetType = 1 << iota
isSpace
)
func init() {
// OCTET = <any 8-bit sequence of data>
// CHAR = <any US-ASCII character (octets 0 - 127)>
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
// CR = <US-ASCII CR, carriage return (13)>
// LF = <US-ASCII LF, linefeed (10)>
// SP = <US-ASCII SP, space (32)>
// HT = <US-ASCII HT, horizontal-tab (9)>
// <"> = <US-ASCII double-quote mark (34)>
// CRLF = CR LF
// LWS = [CRLF] 1*( SP | HT )
// TEXT = <any OCTET except CTLs, but including LWS>
// separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <">
// | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT
// token = 1*<any CHAR except CTLs or separators>
// qdtext = <any TEXT except <">>
for c := 0; c < 256; c++ {
var t octetType
isCtl := c <= 31 || c == 127
isChar := 0 <= c && c <= 127
isSeparator := strings.IndexRune(" \t\"(),/:;<=>?@[]\\{}", rune(c)) >= 0
if strings.IndexRune(" \t\r\n", rune(c)) >= 0 {
t |= isSpace
}
if isChar && !isCtl && !isSeparator {
t |= isToken
}
octetTypes[c] = t
}
}
func parseAuthHeader(header http.Header) map[string]authorizationChallenge {
challenges := map[string]authorizationChallenge{}
for _, h := range header[http.CanonicalHeaderKey("WWW-Authenticate")] {
v, p := parseValueAndParams(h)
if v != "" {
challenges[v] = authorizationChallenge{Scheme: v, Parameters: p}
}
}
return challenges
}
func parseValueAndParams(header string) (value string, params map[string]string) {
params = make(map[string]string)
value, s := expectToken(header)
if value == "" {
return
}
value = strings.ToLower(value)
s = "," + skipSpace(s)
for strings.HasPrefix(s, ",") {
var pkey string
pkey, s = expectToken(skipSpace(s[1:]))
if pkey == "" {
return
}
if !strings.HasPrefix(s, "=") {
return
}
var pvalue string
pvalue, s = expectTokenOrQuoted(s[1:])
if pvalue == "" {
return
}
pkey = strings.ToLower(pkey)
params[pkey] = pvalue
s = skipSpace(s)
}
return
}
func skipSpace(s string) (rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isSpace == 0 {
break
}
}
return s[i:]
}
func expectToken(s string) (token, rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isToken == 0 {
break
}
}
return s[:i], s[i:]
}
func expectTokenOrQuoted(s string) (value string, rest string) {
if !strings.HasPrefix(s, "\"") {
return expectToken(s)
}
s = s[1:]
for i := 0; i < len(s); i++ {
switch s[i] {
case '"':
return s[:i], s[i+1:]
case '\\':
p := make([]byte, len(s)-1)
j := copy(p, s[:i])
escape := true
for i = i + 1; i < len(s); i++ {
b := s[i]
switch {
case escape:
escape = false
p[j] = b
j++
case b == '\\':
escape = true
case b == '"':
return string(p[:j]), s[i+1:]
default:
p[j] = b
j++
}
}
return "", ""
}
}
return "", ""
}

View File

@ -0,0 +1,38 @@
package transport
import (
"net/http"
"testing"
)
func TestAuthChallengeParse(t *testing.T) {
header := http.Header{}
header.Add("WWW-Authenticate", `Bearer realm="https://auth.example.com/token",service="registry.example.com",other=fun,slashed="he\"\l\lo"`)
challenges := parseAuthHeader(header)
if len(challenges) != 1 {
t.Fatalf("Unexpected number of auth challenges: %d, expected 1", len(challenges))
}
challenge := challenges["bearer"]
if expected := "bearer"; challenge.Scheme != expected {
t.Fatalf("Unexpected scheme: %s, expected: %s", challenge.Scheme, expected)
}
if expected := "https://auth.example.com/token"; challenge.Parameters["realm"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["realm"], expected)
}
if expected := "registry.example.com"; challenge.Parameters["service"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["service"], expected)
}
if expected := "fun"; challenge.Parameters["other"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["other"], expected)
}
if expected := "he\"llo"; challenge.Parameters["slashed"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["slashed"], expected)
}
}

View File

@ -0,0 +1,172 @@
package transport
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
)
// ReadSeekCloser combines io.ReadSeeker with io.Closer.
type ReadSeekCloser interface {
io.ReadSeeker
io.Closer
}
// NewHTTPReadSeeker handles reading from an HTTP endpoint using a GET
// request. When seeking and starting a read from a non-zero offset
// the a "Range" header will be added which sets the offset.
// TODO(dmcgowan): Move this into a separate utility package
func NewHTTPReadSeeker(client *http.Client, url string, size int64) ReadSeekCloser {
return &httpReadSeeker{
client: client,
url: url,
size: size,
}
}
type httpReadSeeker struct {
client *http.Client
url string
size int64
rc io.ReadCloser // remote read closer
brd *bufio.Reader // internal buffered io
offset int64
err error
}
func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
if hrs.err != nil {
return 0, hrs.err
}
rd, err := hrs.reader()
if err != nil {
return 0, err
}
n, err = rd.Read(p)
hrs.offset += int64(n)
// Simulate io.EOF error if we reach filesize.
if err == nil && hrs.offset >= hrs.size {
err = io.EOF
}
return n, err
}
func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
if hrs.err != nil {
return 0, hrs.err
}
var err error
newOffset := hrs.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = hrs.size + int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
if newOffset < 0 {
err = errors.New("cannot seek to negative position")
} else {
if hrs.offset != newOffset {
hrs.reset()
}
// No problems, set the offset.
hrs.offset = newOffset
}
return hrs.offset, err
}
func (hrs *httpReadSeeker) Close() error {
if hrs.err != nil {
return hrs.err
}
// close and release reader chain
if hrs.rc != nil {
hrs.rc.Close()
}
hrs.rc = nil
hrs.brd = nil
hrs.err = errors.New("httpLayer: closed")
return nil
}
func (hrs *httpReadSeeker) reset() {
if hrs.err != nil {
return
}
if hrs.rc != nil {
hrs.rc.Close()
hrs.rc = nil
}
}
func (hrs *httpReadSeeker) reader() (io.Reader, error) {
if hrs.err != nil {
return nil, hrs.err
}
if hrs.rc != nil {
return hrs.brd, nil
}
// If the offset is great than or equal to size, return a empty, noop reader.
if hrs.offset >= hrs.size {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
}
req, err := http.NewRequest("GET", hrs.url, nil)
if err != nil {
return nil, err
}
if hrs.offset > 0 {
// TODO(stevvooe): Get this working correctly.
// If we are at different offset, issue a range request from there.
req.Header.Add("Range", "1-")
// TODO: get context in here
// context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range"))
}
resp, err := hrs.client.Do(req)
if err != nil {
return nil, err
}
switch {
case resp.StatusCode == 200:
hrs.rc = resp.Body
default:
defer resp.Body.Close()
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
}
if hrs.brd == nil {
hrs.brd = bufio.NewReader(hrs.rc)
} else {
hrs.brd.Reset(hrs.rc)
}
return hrs.brd, nil
}

View File

@ -0,0 +1,297 @@
package transport
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
// AuthenticationHandler is an interface for authorizing a request from
// params from a "WWW-Authenicate" header for a single scheme.
type AuthenticationHandler interface {
// Scheme returns the scheme as expected from the "WWW-Authenicate" header.
Scheme() string
// AuthorizeRequest adds the authorization header to a request (if needed)
// using the parameters from "WWW-Authenticate" method. The parameters
// values depend on the scheme.
AuthorizeRequest(req *http.Request, params map[string]string) error
}
// CredentialStore is an interface for getting credentials for
// a given URL
type CredentialStore interface {
// Basic returns basic auth for the given URL
Basic(*url.URL) (string, string)
}
// NewAuthorizer creates an authorizer which can handle multiple authentication
// schemes. The handlers are tried in order, the higher priority authentication
// methods should be first.
func NewAuthorizer(transport http.RoundTripper, handlers ...AuthenticationHandler) RequestModifier {
return &tokenAuthorizer{
challenges: map[string]map[string]authorizationChallenge{},
handlers: handlers,
transport: transport,
}
}
type tokenAuthorizer struct {
challenges map[string]map[string]authorizationChallenge
handlers []AuthenticationHandler
transport http.RoundTripper
}
func (ta *tokenAuthorizer) ping(endpoint string) (map[string]authorizationChallenge, error) {
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: ta.transport,
// Ping should fail fast
Timeout: 5 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// TODO(dmcgowan): Add version string which would allow skipping this section
var supportsV2 bool
HeaderLoop:
for _, supportedVersions := range resp.Header[http.CanonicalHeaderKey("Docker-Distribution-API-Version")] {
for _, versionName := range strings.Fields(supportedVersions) {
if versionName == "registry/2.0" {
supportsV2 = true
break HeaderLoop
}
}
}
if !supportsV2 {
return nil, fmt.Errorf("%s does not appear to be a v2 registry endpoint", endpoint)
}
if resp.StatusCode == http.StatusUnauthorized {
// Parse the WWW-Authenticate Header and store the challenges
// on this endpoint object.
return parseAuthHeader(resp.Header), nil
} else if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unable to get valid ping response: %d", resp.StatusCode)
}
return nil, nil
}
func (ta *tokenAuthorizer) ModifyRequest(req *http.Request) error {
v2Root := strings.Index(req.URL.Path, "/v2/")
// Test if /v2/ does not exist or not at beginning
// TODO(dmcgowan) support v2 endpoints which have a prefix before /v2/
if v2Root == -1 || v2Root > 0 {
return nil
}
ping := url.URL{
Host: req.URL.Host,
Scheme: req.URL.Scheme,
Path: req.URL.Path[:v2Root+4],
}
pingEndpoint := ping.String()
challenges, ok := ta.challenges[pingEndpoint]
if !ok {
var err error
challenges, err = ta.ping(pingEndpoint)
if err != nil {
return err
}
ta.challenges[pingEndpoint] = challenges
}
for _, handler := range ta.handlers {
challenge, ok := challenges[handler.Scheme()]
if ok {
if err := handler.AuthorizeRequest(req, challenge.Parameters); err != nil {
return err
}
}
}
return nil
}
type tokenHandler struct {
header http.Header
creds CredentialStore
scope TokenScope
transport http.RoundTripper
tokenLock sync.Mutex
tokenCache string
tokenExpiration time.Time
}
// TokenScope represents the scope at which a token will be requested.
// This represents a specific action on a registry resource.
type TokenScope struct {
Resource string
Scope string
Actions []string
}
func (ts TokenScope) String() string {
return fmt.Sprintf("%s:%s:%s", ts.Resource, ts.Scope, strings.Join(ts.Actions, ","))
}
// NewTokenHandler creates a new AuthenicationHandler which supports
// fetching tokens from a remote token server.
func NewTokenHandler(transport http.RoundTripper, creds CredentialStore, scope TokenScope) AuthenticationHandler {
return &tokenHandler{
transport: transport,
creds: creds,
scope: scope,
}
}
func (th *tokenHandler) client() *http.Client {
return &http.Client{
Transport: th.transport,
Timeout: 15 * time.Second,
}
}
func (th *tokenHandler) Scheme() string {
return "bearer"
}
func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
if err := th.refreshToken(params); err != nil {
return err
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", th.tokenCache))
return nil
}
func (th *tokenHandler) refreshToken(params map[string]string) error {
th.tokenLock.Lock()
defer th.tokenLock.Unlock()
now := time.Now()
if now.After(th.tokenExpiration) {
token, err := th.fetchToken(params)
if err != nil {
return err
}
th.tokenCache = token
th.tokenExpiration = now.Add(time.Minute)
}
return nil
}
type tokenResponse struct {
Token string `json:"token"`
}
func (th *tokenHandler) fetchToken(params map[string]string) (token string, err error) {
//log.Debugf("Getting bearer token with %s for %s", challenge.Parameters, ta.auth.Username)
realm, ok := params["realm"]
if !ok {
return "", errors.New("no realm specified for token auth challenge")
}
// TODO(dmcgowan): Handle empty scheme
realmURL, err := url.Parse(realm)
if err != nil {
return "", fmt.Errorf("invalid token auth challenge realm: %s", err)
}
req, err := http.NewRequest("GET", realmURL.String(), nil)
if err != nil {
return "", err
}
reqParams := req.URL.Query()
service := params["service"]
scope := th.scope.String()
if service != "" {
reqParams.Add("service", service)
}
for _, scopeField := range strings.Fields(scope) {
reqParams.Add("scope", scopeField)
}
if th.creds != nil {
username, password := th.creds.Basic(realmURL)
if username != "" && password != "" {
reqParams.Add("account", username)
req.SetBasicAuth(username, password)
}
}
req.URL.RawQuery = reqParams.Encode()
resp, err := th.client().Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token auth attempt for registry: %s request failed with status: %d %s", req.URL, resp.StatusCode, http.StatusText(resp.StatusCode))
}
decoder := json.NewDecoder(resp.Body)
tr := new(tokenResponse)
if err = decoder.Decode(tr); err != nil {
return "", fmt.Errorf("unable to decode token response: %s", err)
}
if tr.Token == "" {
return "", errors.New("authorization server did not include a token in the response")
}
return tr.Token, nil
}
type basicHandler struct {
creds CredentialStore
}
// NewBasicHandler creaters a new authentiation handler which adds
// basic authentication credentials to a request.
func NewBasicHandler(creds CredentialStore) AuthenticationHandler {
return &basicHandler{
creds: creds,
}
}
func (*basicHandler) Scheme() string {
return "basic"
}
func (bh *basicHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
if bh.creds != nil {
username, password := bh.creds.Basic(req.URL)
if username != "" && password != "" {
req.SetBasicAuth(username, password)
return nil
}
}
return errors.New("no basic auth credentials")
}

View File

@ -0,0 +1,271 @@
package transport
import (
"encoding/base64"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/docker/distribution/testutil"
)
func testServer(rrm testutil.RequestResponseMap) (string, func()) {
h := testutil.NewHandler(rrm)
s := httptest.NewServer(h)
return s.URL, s.Close
}
type testAuthenticationWrapper struct {
headers http.Header
authCheck func(string) bool
next http.Handler
}
func (w *testAuthenticationWrapper) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
auth := r.Header.Get("Authorization")
if auth == "" || !w.authCheck(auth) {
h := rw.Header()
for k, values := range w.headers {
h[k] = values
}
rw.WriteHeader(http.StatusUnauthorized)
return
}
w.next.ServeHTTP(rw, r)
}
func testServerWithAuth(rrm testutil.RequestResponseMap, authenticate string, authCheck func(string) bool) (string, func()) {
h := testutil.NewHandler(rrm)
wrapper := &testAuthenticationWrapper{
headers: http.Header(map[string][]string{
"Docker-Distribution-API-Version": {"registry/2.0"},
"WWW-Authenticate": {authenticate},
}),
authCheck: authCheck,
next: h,
}
s := httptest.NewServer(wrapper)
return s.URL, s.Close
}
type testCredentialStore struct {
username string
password string
}
func (tcs *testCredentialStore) Basic(*url.URL) (string, string) {
return tcs.username, tcs.password
}
func TestEndpointAuthorizeToken(t *testing.T) {
service := "localhost.localdomain"
repo1 := "some/registry"
repo2 := "other/registry"
scope1 := fmt.Sprintf("repository:%s:pull,push", repo1)
scope2 := fmt.Sprintf("repository:%s:pull,push", repo2)
tokenScope1 := TokenScope{
Resource: "repository",
Scope: repo1,
Actions: []string{"pull", "push"},
}
tokenScope2 := TokenScope{
Resource: "repository",
Scope: repo2,
Actions: []string{"pull", "push"},
}
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: fmt.Sprintf("/token?scope=%s&service=%s", url.QueryEscape(scope1), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"token":"statictoken"}`),
},
},
{
Request: testutil.Request{
Method: "GET",
Route: fmt.Sprintf("/token?scope=%s&service=%s", url.QueryEscape(scope2), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"token":"badtoken"}`),
},
},
})
te, tc := testServer(tokenMap)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
validCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate, validCheck)
defer c()
transport1 := NewTransport(nil, NewAuthorizer(nil, NewTokenHandler(nil, nil, tokenScope1)))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest("GET", e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
badCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e2, c2 := testServerWithAuth(m, authenicate, badCheck)
defer c2()
transport2 := NewTransport(nil, NewAuthorizer(nil, NewTokenHandler(nil, nil, tokenScope2)))
client2 := &http.Client{Transport: transport2}
req, _ = http.NewRequest("GET", e2+"/v2/hello", nil)
resp, err = client2.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusUnauthorized)
}
}
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
func TestEndpointAuthorizeTokenBasic(t *testing.T) {
service := "localhost.localdomain"
repo := "some/fun/registry"
scope := fmt.Sprintf("repository:%s:pull,push", repo)
username := "tokenuser"
password := "superSecretPa$$word"
tokenScope := TokenScope{
Resource: "repository",
Scope: repo,
Actions: []string{"pull", "push"},
}
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: fmt.Sprintf("/token?account=%s&scope=%s&service=%s", username, url.QueryEscape(scope), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"token":"statictoken"}`),
},
},
})
authenicate1 := fmt.Sprintf("Basic realm=localhost")
basicCheck := func(a string) bool {
return a == fmt.Sprintf("Basic %s", basicAuth(username, password))
}
te, tc := testServerWithAuth(tokenMap, authenicate1, basicCheck)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate2 := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
bearerCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate2, bearerCheck)
defer c()
creds := &testCredentialStore{
username: username,
password: password,
}
transport1 := NewTransport(nil, NewAuthorizer(nil, NewTokenHandler(nil, creds, tokenScope), NewBasicHandler(creds)))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest("GET", e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
}
func TestEndpointAuthorizeBasic(t *testing.T) {
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
username := "user1"
password := "funSecretPa$$word"
authenicate := fmt.Sprintf("Basic realm=localhost")
validCheck := func(a string) bool {
return a == fmt.Sprintf("Basic %s", basicAuth(username, password))
}
e, c := testServerWithAuth(m, authenicate, validCheck)
defer c()
creds := &testCredentialStore{
username: username,
password: password,
}
transport1 := NewTransport(nil, NewAuthorizer(nil, NewBasicHandler(creds)))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest("GET", e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
}

View File

@ -0,0 +1,147 @@
package transport
import (
"io"
"net/http"
"sync"
)
// RequestModifier represents an object which will do an inplace
// modification of an HTTP request.
type RequestModifier interface {
ModifyRequest(*http.Request) error
}
type headerModifier http.Header
// NewHeaderRequestModifier returns a new RequestModifier which will
// add the given headers to a request.
func NewHeaderRequestModifier(header http.Header) RequestModifier {
return headerModifier(header)
}
func (h headerModifier) ModifyRequest(req *http.Request) error {
for k, s := range http.Header(h) {
req.Header[k] = append(req.Header[k], s...)
}
return nil
}
// NewTransport creates a new transport which will apply modifiers to
// the request on a RoundTrip call.
func NewTransport(base http.RoundTripper, modifiers ...RequestModifier) http.RoundTripper {
return &transport{
Modifiers: modifiers,
Base: base,
}
}
// transport is an http.RoundTripper that makes HTTP requests after
// copying and modifying the request
type transport struct {
Modifiers []RequestModifier
Base http.RoundTripper
mu sync.Mutex // guards modReq
modReq map[*http.Request]*http.Request // original -> modified
}
// RoundTrip authorizes and authenticates the request with an
// access token. If no token exists or token is expired,
// tries to refresh/fetch a new token.
func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
req2 := cloneRequest(req)
for _, modifier := range t.Modifiers {
if err := modifier.ModifyRequest(req2); err != nil {
return nil, err
}
}
t.setModReq(req, req2)
res, err := t.base().RoundTrip(req2)
if err != nil {
t.setModReq(req, nil)
return nil, err
}
res.Body = &onEOFReader{
rc: res.Body,
fn: func() { t.setModReq(req, nil) },
}
return res, nil
}
// CancelRequest cancels an in-flight request by closing its connection.
func (t *transport) CancelRequest(req *http.Request) {
type canceler interface {
CancelRequest(*http.Request)
}
if cr, ok := t.base().(canceler); ok {
t.mu.Lock()
modReq := t.modReq[req]
delete(t.modReq, req)
t.mu.Unlock()
cr.CancelRequest(modReq)
}
}
func (t *transport) base() http.RoundTripper {
if t.Base != nil {
return t.Base
}
return http.DefaultTransport
}
func (t *transport) setModReq(orig, mod *http.Request) {
t.mu.Lock()
defer t.mu.Unlock()
if t.modReq == nil {
t.modReq = make(map[*http.Request]*http.Request)
}
if mod == nil {
delete(t.modReq, orig)
} else {
t.modReq[orig] = mod
}
}
// cloneRequest returns a clone of the provided *http.Request.
// The clone is a shallow copy of the struct and its Header map.
func cloneRequest(r *http.Request) *http.Request {
// shallow copy of the struct
r2 := new(http.Request)
*r2 = *r
// deep copy of the Header
r2.Header = make(http.Header, len(r.Header))
for k, s := range r.Header {
r2.Header[k] = append([]string(nil), s...)
}
return r2
}
type onEOFReader struct {
rc io.ReadCloser
fn func()
}
func (r *onEOFReader) Read(p []byte) (n int, err error) {
n, err = r.rc.Read(p)
if err == io.EOF {
r.runFunc()
}
return
}
func (r *onEOFReader) Close() error {
err := r.rc.Close()
r.runFunc()
return err
}
func (r *onEOFReader) runFunc() {
if fn := r.fn; fn != nil {
fn()
r.fn = nil
}
}

View File

@ -0,0 +1,60 @@
package storage
import (
"expvar"
"sync/atomic"
"github.com/docker/distribution/registry/storage/cache"
)
type blobStatCollector struct {
metrics cache.Metrics
}
func (bsc *blobStatCollector) Hit() {
atomic.AddUint64(&bsc.metrics.Requests, 1)
atomic.AddUint64(&bsc.metrics.Hits, 1)
}
func (bsc *blobStatCollector) Miss() {
atomic.AddUint64(&bsc.metrics.Requests, 1)
atomic.AddUint64(&bsc.metrics.Misses, 1)
}
func (bsc *blobStatCollector) Metrics() cache.Metrics {
return bsc.metrics
}
// blobStatterCacheMetrics keeps track of cache metrics for blob descriptor
// cache requests. Note this is kept globally and made available via expvar.
// For more detailed metrics, its recommend to instrument a particular cache
// implementation.
var blobStatterCacheMetrics cache.MetricsTracker = &blobStatCollector{}
func init() {
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
cache := registry.(*expvar.Map).Get("cache")
if cache == nil {
cache = &expvar.Map{}
cache.(*expvar.Map).Init()
registry.(*expvar.Map).Set("cache", cache)
}
storage := cache.(*expvar.Map).Get("storage")
if storage == nil {
storage = &expvar.Map{}
storage.(*expvar.Map).Init()
cache.(*expvar.Map).Set("storage", storage)
}
storage.(*expvar.Map).Set("blobdescriptor", expvar.Func(func() interface{} {
// no need for synchronous access: the increments are atomic and
// during reading, we don't care if the data is up to date. The
// numbers will always *eventually* be reported correctly.
return blobStatterCacheMetrics
}))
}

View File

@ -0,0 +1,80 @@
package cache
import (
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution"
)
// Metrics is used to hold metric counters
// related to the number of times a cache was
// hit or missed.
type Metrics struct {
Requests uint64
Hits uint64
Misses uint64
}
// MetricsTracker represents a metric tracker
// which simply counts the number of hits and misses.
type MetricsTracker interface {
Hit()
Miss()
Metrics() Metrics
}
type cachedBlobStatter struct {
cache distribution.BlobDescriptorService
backend distribution.BlobStatter
tracker MetricsTracker
}
// NewCachedBlobStatter creates a new statter which prefers a cache and
// falls back to a backend.
func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend distribution.BlobStatter) distribution.BlobStatter {
return &cachedBlobStatter{
cache: cache,
backend: backend,
}
}
// NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and
// falls back to a backend. Hits and misses will send to the tracker.
func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobStatter, tracker MetricsTracker) distribution.BlobStatter {
return &cachedBlobStatter{
cache: cache,
backend: backend,
tracker: tracker,
}
}
func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := cbds.cache.Stat(ctx, dgst)
if err != nil {
if err != distribution.ErrBlobUnknown {
context.GetLogger(ctx).Errorf("error retrieving descriptor from cache: %v", err)
}
goto fallback
}
if cbds.tracker != nil {
cbds.tracker.Hit()
}
return desc, nil
fallback:
if cbds.tracker != nil {
cbds.tracker.Miss()
}
desc, err = cbds.backend.Stat(ctx, dgst)
if err != nil {
return desc, err
}
if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
context.GetLogger(ctx).Errorf("error adding descriptor %v to cache: %v", desc.Digest, err)
}
return desc, err
}

View File

@ -1,84 +0,0 @@
package storage
import (
"expvar"
"sync/atomic"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution"
)
type cachedBlobStatter struct {
cache distribution.BlobDescriptorService
backend distribution.BlobStatter
}
func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
atomic.AddUint64(&blobStatterCacheMetrics.Stat.Requests, 1)
desc, err := cbds.cache.Stat(ctx, dgst)
if err != nil {
if err != distribution.ErrBlobUnknown {
context.GetLogger(ctx).Errorf("error retrieving descriptor from cache: %v", err)
}
goto fallback
}
atomic.AddUint64(&blobStatterCacheMetrics.Stat.Hits, 1)
return desc, nil
fallback:
atomic.AddUint64(&blobStatterCacheMetrics.Stat.Misses, 1)
desc, err = cbds.backend.Stat(ctx, dgst)
if err != nil {
return desc, err
}
if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil {
context.GetLogger(ctx).Errorf("error adding descriptor %v to cache: %v", desc.Digest, err)
}
return desc, err
}
// blobStatterCacheMetrics keeps track of cache metrics for blob descriptor
// cache requests. Note this is kept globally and made available via expvar.
// For more detailed metrics, its recommend to instrument a particular cache
// implementation.
var blobStatterCacheMetrics struct {
// Stat tracks calls to the caches.
Stat struct {
Requests uint64
Hits uint64
Misses uint64
}
}
func init() {
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
cache := registry.(*expvar.Map).Get("cache")
if cache == nil {
cache = &expvar.Map{}
cache.(*expvar.Map).Init()
registry.(*expvar.Map).Set("cache", cache)
}
storage := cache.(*expvar.Map).Get("storage")
if storage == nil {
storage = &expvar.Map{}
storage.(*expvar.Map).Init()
cache.(*expvar.Map).Set("storage", storage)
}
storage.(*expvar.Map).Set("blobdescriptor", expvar.Func(func() interface{} {
// no need for synchronous access: the increments are atomic and
// during reading, we don't care if the data is up to date. The
// numbers will always *eventually* be reported correctly.
return blobStatterCacheMetrics
}))
}

View File

@ -29,10 +29,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv
} }
if blobDescriptorCacheProvider != nil { if blobDescriptorCacheProvider != nil {
statter = &cachedBlobStatter{ statter = cache.NewCachedBlobStatter(blobDescriptorCacheProvider, statter)
cache: blobDescriptorCacheProvider,
backend: statter,
}
} }
bs := &blobStore{ bs := &blobStore{
@ -143,10 +140,7 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
} }
if repo.descriptorCache != nil { if repo.descriptorCache != nil {
statter = &cachedBlobStatter{ statter = cache.NewCachedBlobStatter(repo.descriptorCache, statter)
cache: repo.descriptorCache,
backend: statter,
}
} }
return &linkedBlobStore{ return &linkedBlobStore{

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url"
"sort" "sort"
"strings" "strings"
) )
@ -40,16 +41,18 @@ type Request struct {
func (r Request) String() string { func (r Request) String() string {
queryString := "" queryString := ""
if len(r.QueryParams) > 0 { if len(r.QueryParams) > 0 {
queryString = "?"
keys := make([]string, 0, len(r.QueryParams)) keys := make([]string, 0, len(r.QueryParams))
queryParts := make([]string, 0, len(r.QueryParams))
for k := range r.QueryParams { for k := range r.QueryParams {
keys = append(keys, k) keys = append(keys, k)
} }
sort.Strings(keys) sort.Strings(keys)
for _, k := range keys { for _, k := range keys {
queryString += strings.Join(r.QueryParams[k], "&") + "&" for _, val := range r.QueryParams[k] {
queryParts = append(queryParts, fmt.Sprintf("%s=%s", k, url.QueryEscape(val)))
}
} }
queryString = queryString[:len(queryString)-1] queryString = "?" + strings.Join(queryParts, "&")
} }
return fmt.Sprintf("%s %s%s\n%s", r.Method, r.Route, queryString, r.Body) return fmt.Sprintf("%s %s%s\n%s", r.Method, r.Route, queryString, r.Body)
} }