diff --git a/docs/client/client.go b/docs/client/client.go new file mode 100644 index 00000000..c697e01c --- /dev/null +++ b/docs/client/client.go @@ -0,0 +1,574 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "regexp" + "strconv" + + "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" +) + +// Client implements the client interface to the registry http api +type Client interface { + // GetImageManifest returns an image manifest for the image at the given + // name, tag pair. + GetImageManifest(name, tag string) (*manifest.SignedManifest, error) + + // PutImageManifest uploads an image manifest for the image at the given + // name, tag pair. + PutImageManifest(name, tag string, imageManifest *manifest.SignedManifest) error + + // DeleteImage removes the image at the given name, tag pair. + DeleteImage(name, tag string) error + + // ListImageTags returns a list of all image tags with the given repository + // name. + ListImageTags(name string) ([]string, error) + + // BlobLength returns the length of the blob stored at the given name, + // digest pair. + // Returns a length value of -1 on error or if the blob does not exist. + BlobLength(name string, dgst digest.Digest) (int, error) + + // GetBlob returns the blob stored at the given name, digest pair in the + // form of an io.ReadCloser with the length of this blob. + // A nonzero byteOffset can be provided to receive a partial blob beginning + // at the given offset. + GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error) + + // InitiateBlobUpload starts a blob upload in the given repository namespace + // and returns a unique location url to use for other blob upload methods. + InitiateBlobUpload(name string) (string, error) + + // GetBlobUploadStatus returns the byte offset and length of the blob at the + // given upload location. + GetBlobUploadStatus(location string) (int, int, error) + + // UploadBlob uploads a full blob to the registry. + UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error + + // UploadBlobChunk uploads a blob chunk with a given length and startByte to + // the registry. + // FinishChunkedBlobUpload must be called to finalize this upload. + UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error + + // FinishChunkedBlobUpload completes a chunked blob upload at a given + // location. + FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error + + // CancelBlobUpload deletes all content at the unfinished blob upload + // location and invalidates any future calls to this blob upload. + CancelBlobUpload(location string) error +} + +var ( + patternRangeHeader = regexp.MustCompile("bytes=0-(\\d+)/(\\d+)") +) + +// New returns a new Client which operates against a registry with the +// given base endpoint +// This endpoint should not include /v2/ or any part of the url after this. +func New(endpoint string) (Client, error) { + ub, err := v2.NewURLBuilderFromString(endpoint) + if err != nil { + return nil, err + } + + return &clientImpl{ + endpoint: endpoint, + ub: ub, + }, nil +} + +// clientImpl is the default implementation of the Client interface +type clientImpl struct { + endpoint string + ub *v2.URLBuilder +} + +// TODO(bbland): use consistent route generation between server and client + +func (r *clientImpl) GetImageManifest(name, tag string) (*manifest.SignedManifest, error) { + manifestURL, err := r.ub.BuildManifestURL(name, tag) + if err != nil { + return nil, err + } + + response, err := http.Get(manifestURL) + if err != nil { + return nil, err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusOK: + break + case response.StatusCode == http.StatusNotFound: + return nil, &ImageManifestNotFoundError{Name: name, Tag: tag} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return nil, err + } + return nil, &errs + default: + return nil, &UnexpectedHTTPStatusError{Status: response.Status} + } + + decoder := json.NewDecoder(response.Body) + + manifest := new(manifest.SignedManifest) + err = decoder.Decode(manifest) + if err != nil { + return nil, err + } + return manifest, nil +} + +func (r *clientImpl) PutImageManifest(name, tag string, manifest *manifest.SignedManifest) error { + manifestURL, err := r.ub.BuildManifestURL(name, tag) + if err != nil { + return err + } + + putRequest, err := http.NewRequest("PUT", manifestURL, bytes.NewReader(manifest.Raw)) + if err != nil { + return err + } + + response, err := http.DefaultClient.Do(putRequest) + if err != nil { + return err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusOK || response.StatusCode == http.StatusAccepted: + return nil + case response.StatusCode >= 400 && response.StatusCode < 500: + var errors v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errors) + if err != nil { + return err + } + + return &errors + default: + return &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) DeleteImage(name, tag string) error { + manifestURL, err := r.ub.BuildManifestURL(name, tag) + if err != nil { + return err + } + + deleteRequest, err := http.NewRequest("DELETE", manifestURL, nil) + if err != nil { + return err + } + + response, err := http.DefaultClient.Do(deleteRequest) + if err != nil { + return err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusNoContent: + break + case response.StatusCode == http.StatusNotFound: + return &ImageManifestNotFoundError{Name: name, Tag: tag} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return err + } + return &errs + default: + return &UnexpectedHTTPStatusError{Status: response.Status} + } + + return nil +} + +func (r *clientImpl) ListImageTags(name string) ([]string, error) { + tagsURL, err := r.ub.BuildTagsURL(name) + if err != nil { + return nil, err + } + + response, err := http.Get(tagsURL) + if err != nil { + return nil, err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusOK: + break + case response.StatusCode == http.StatusNotFound: + return nil, &RepositoryNotFoundError{Name: name} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return nil, err + } + return nil, &errs + default: + return nil, &UnexpectedHTTPStatusError{Status: response.Status} + } + + tags := struct { + Tags []string `json:"tags"` + }{} + + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&tags) + if err != nil { + return nil, err + } + + return tags.Tags, nil +} + +func (r *clientImpl) BlobLength(name string, dgst digest.Digest) (int, error) { + blobURL, err := r.ub.BuildBlobURL(name, dgst) + if err != nil { + return -1, err + } + + response, err := http.Head(blobURL) + if err != nil { + return -1, err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusOK: + lengthHeader := response.Header.Get("Content-Length") + length, err := strconv.ParseInt(lengthHeader, 10, 64) + if err != nil { + return -1, err + } + return int(length), nil + case response.StatusCode == http.StatusNotFound: + return -1, nil + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return -1, err + } + return -1, &errs + default: + response.Body.Close() + return -1, &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) GetBlob(name string, dgst digest.Digest, byteOffset int) (io.ReadCloser, int, error) { + blobURL, err := r.ub.BuildBlobURL(name, dgst) + if err != nil { + return nil, 0, err + } + + getRequest, err := http.NewRequest("GET", blobURL, nil) + if err != nil { + return nil, 0, err + } + + getRequest.Header.Add("Range", fmt.Sprintf("%d-", byteOffset)) + response, err := http.DefaultClient.Do(getRequest) + if err != nil { + return nil, 0, err + } + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusOK: + lengthHeader := response.Header.Get("Content-Length") + length, err := strconv.ParseInt(lengthHeader, 10, 0) + if err != nil { + return nil, 0, err + } + return response.Body, int(length), nil + case response.StatusCode == http.StatusNotFound: + response.Body.Close() + return nil, 0, &BlobNotFoundError{Name: name, Digest: dgst} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return nil, 0, err + } + return nil, 0, &errs + default: + response.Body.Close() + return nil, 0, &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) InitiateBlobUpload(name string) (string, error) { + uploadURL, err := r.ub.BuildBlobUploadURL(name) + if err != nil { + return "", err + } + + postRequest, err := http.NewRequest("POST", uploadURL, nil) + if err != nil { + return "", err + } + + response, err := http.DefaultClient.Do(postRequest) + if err != nil { + return "", err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusAccepted: + return response.Header.Get("Location"), nil + // case response.StatusCode == http.StatusNotFound: + // return + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return "", err + } + return "", &errs + default: + return "", &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) GetBlobUploadStatus(location string) (int, int, error) { + response, err := http.Get(location) + if err != nil { + return 0, 0, err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusNoContent: + return parseRangeHeader(response.Header.Get("Range")) + case response.StatusCode == http.StatusNotFound: + return 0, 0, &BlobUploadNotFoundError{Location: location} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return 0, 0, err + } + return 0, 0, &errs + default: + return 0, 0, &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) UploadBlob(location string, blob io.ReadCloser, length int, dgst digest.Digest) error { + defer blob.Close() + + putRequest, err := http.NewRequest("PUT", location, blob) + if err != nil { + return err + } + + values := putRequest.URL.Query() + values.Set("digest", dgst.String()) + putRequest.URL.RawQuery = values.Encode() + + putRequest.Header.Set("Content-Type", "application/octet-stream") + putRequest.Header.Set("Content-Length", fmt.Sprint(length)) + + response, err := http.DefaultClient.Do(putRequest) + if err != nil { + return err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusCreated: + return nil + case response.StatusCode == http.StatusNotFound: + return &BlobUploadNotFoundError{Location: location} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return err + } + return &errs + default: + return &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) UploadBlobChunk(location string, blobChunk io.ReadCloser, length, startByte int) error { + defer blobChunk.Close() + + putRequest, err := http.NewRequest("PUT", location, blobChunk) + if err != nil { + return err + } + + endByte := startByte + length + + putRequest.Header.Set("Content-Type", "application/octet-stream") + putRequest.Header.Set("Content-Length", fmt.Sprint(length)) + putRequest.Header.Set("Content-Range", + fmt.Sprintf("%d-%d/%d", startByte, endByte, endByte)) + + response, err := http.DefaultClient.Do(putRequest) + if err != nil { + return err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusAccepted: + return nil + case response.StatusCode == http.StatusRequestedRangeNotSatisfiable: + lastValidRange, blobSize, err := parseRangeHeader(response.Header.Get("Range")) + if err != nil { + return err + } + return &BlobUploadInvalidRangeError{ + Location: location, + LastValidRange: lastValidRange, + BlobSize: blobSize, + } + case response.StatusCode == http.StatusNotFound: + return &BlobUploadNotFoundError{Location: location} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return err + } + return &errs + default: + return &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) FinishChunkedBlobUpload(location string, length int, dgst digest.Digest) error { + putRequest, err := http.NewRequest("PUT", location, nil) + if err != nil { + return err + } + + values := putRequest.URL.Query() + values.Set("digest", dgst.String()) + putRequest.URL.RawQuery = values.Encode() + + putRequest.Header.Set("Content-Type", "application/octet-stream") + putRequest.Header.Set("Content-Length", "0") + putRequest.Header.Set("Content-Range", + fmt.Sprintf("%d-%d/%d", length, length, length)) + + response, err := http.DefaultClient.Do(putRequest) + if err != nil { + return err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusCreated: + return nil + case response.StatusCode == http.StatusNotFound: + return &BlobUploadNotFoundError{Location: location} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return err + } + return &errs + default: + return &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +func (r *clientImpl) CancelBlobUpload(location string) error { + deleteRequest, err := http.NewRequest("DELETE", location, nil) + if err != nil { + return err + } + + response, err := http.DefaultClient.Do(deleteRequest) + if err != nil { + return err + } + defer response.Body.Close() + + // TODO(bbland): handle other status codes, like 5xx errors + switch { + case response.StatusCode == http.StatusNoContent: + return nil + case response.StatusCode == http.StatusNotFound: + return &BlobUploadNotFoundError{Location: location} + case response.StatusCode >= 400 && response.StatusCode < 500: + var errs v2.Errors + decoder := json.NewDecoder(response.Body) + err = decoder.Decode(&errs) + if err != nil { + return err + } + return &errs + default: + return &UnexpectedHTTPStatusError{Status: response.Status} + } +} + +// parseRangeHeader parses out the offset and length from a returned Range +// header +func parseRangeHeader(byteRangeHeader string) (int, int, error) { + submatches := patternRangeHeader.FindStringSubmatch(byteRangeHeader) + if submatches == nil || len(submatches) < 3 { + return 0, 0, fmt.Errorf("Malformed Range header") + } + + offset, err := strconv.Atoi(submatches[1]) + if err != nil { + return 0, 0, err + } + length, err := strconv.Atoi(submatches[2]) + if err != nil { + return 0, 0, err + } + return offset, length, nil +} diff --git a/docs/client/client_test.go b/docs/client/client_test.go new file mode 100644 index 00000000..2c1d1cc2 --- /dev/null +++ b/docs/client/client_test.go @@ -0,0 +1,440 @@ +package client + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "sync" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/testutil" +) + +type testBlob struct { + digest digest.Digest + contents []byte +} + +func TestRangeHeaderParser(t *testing.T) { + const ( + malformedRangeHeader = "bytes=0-A/C" + emptyRangeHeader = "" + rFirst = 100 + rSecond = 200 + ) + + var ( + wellformedRangeHeader = fmt.Sprintf("bytes=0-%d/%d", rFirst, rSecond) + ) + + if _, _, err := parseRangeHeader(malformedRangeHeader); err == nil { + t.Fatalf("malformedRangeHeader: error expected, got nil") + } + + if _, _, err := parseRangeHeader(emptyRangeHeader); err == nil { + t.Fatalf("emptyRangeHeader: error expected, got nil") + } + + first, second, err := parseRangeHeader(wellformedRangeHeader) + if err != nil { + t.Fatalf("wellformedRangeHeader: unexpected error %v", err) + } + + if first != rFirst || second != rSecond { + t.Fatalf("Range has been parsed unproperly: %d/%d", first, second) + } + +} + +func TestPush(t *testing.T) { + name := "hello/world" + tag := "sometag" + testBlobs := []testBlob{ + { + digest: "tarsum.v2+sha256:12345", + contents: []byte("some contents"), + }, + { + digest: "tarsum.v2+sha256:98765", + contents: []byte("some other contents"), + }, + } + uploadLocations := make([]string, len(testBlobs)) + blobs := make([]manifest.FSLayer, len(testBlobs)) + history := make([]manifest.History, len(testBlobs)) + + for i, blob := range testBlobs { + // TODO(bbland): this is returning the same location for all uploads, + // because we can't know which blob will get which location. + // It's sort of okay because we're using unique digests, but this needs + // to change at some point. + uploadLocations[i] = fmt.Sprintf("/v2/%s/blobs/test-uuid", name) + blobs[i] = manifest.FSLayer{BlobSum: blob.digest} + history[i] = manifest.History{V1Compatibility: blob.digest.String()} + } + + m := &manifest.SignedManifest{ + Manifest: manifest.Manifest{ + Name: name, + Tag: tag, + Architecture: "x86", + FSLayers: blobs, + History: history, + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + }, + } + var err error + m.Raw, err = json.Marshal(m) + + blobRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs)) + for i, blob := range testBlobs { + blobRequestResponseMappings[2*i] = testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "POST", + Route: "/v2/" + name + "/blobs/uploads/", + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Location": {uploadLocations[i]}, + }), + }, + } + blobRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "PUT", + Route: uploadLocations[i], + QueryParams: map[string][]string{ + "digest": {blob.digest.String()}, + }, + Body: blob.contents, + }, + Response: testutil.Response{ + StatusCode: http.StatusCreated, + }, + } + } + + handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "PUT", + Route: "/v2/" + name + "/manifests/" + tag, + Body: m.Raw, + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + }, + })) + var server *httptest.Server + + // HACK(stevvooe): Super hack to follow: the request response map approach + // above does not let us correctly format the location header to the + // server url. This handler intercepts and re-writes the location header + // to the server url. + + hack := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w = &headerInterceptingResponseWriter{ResponseWriter: w, serverURL: server.URL} + handler.ServeHTTP(w, r) + }) + + server = httptest.NewServer(hack) + client, err := New(server.URL) + if err != nil { + t.Fatalf("error creating client: %v", err) + } + objectStore := &memoryObjectStore{ + mutex: new(sync.Mutex), + manifestStorage: make(map[string]*manifest.SignedManifest), + layerStorage: make(map[digest.Digest]Layer), + } + + for _, blob := range testBlobs { + l, err := objectStore.Layer(blob.digest) + if err != nil { + t.Fatal(err) + } + + writer, err := l.Writer() + if err != nil { + t.Fatal(err) + } + + writer.SetSize(len(blob.contents)) + writer.Write(blob.contents) + writer.Close() + } + + objectStore.WriteManifest(name, tag, m) + + err = Push(client, objectStore, name, tag) + if err != nil { + t.Fatal(err) + } +} + +func TestPull(t *testing.T) { + name := "hello/world" + tag := "sometag" + testBlobs := []testBlob{ + { + digest: "tarsum.v2+sha256:12345", + contents: []byte("some contents"), + }, + { + digest: "tarsum.v2+sha256:98765", + contents: []byte("some other contents"), + }, + } + blobs := make([]manifest.FSLayer, len(testBlobs)) + history := make([]manifest.History, len(testBlobs)) + + for i, blob := range testBlobs { + blobs[i] = manifest.FSLayer{BlobSum: blob.digest} + history[i] = manifest.History{V1Compatibility: blob.digest.String()} + } + + m := &manifest.SignedManifest{ + Manifest: manifest.Manifest{ + Name: name, + Tag: tag, + Architecture: "x86", + FSLayers: blobs, + History: history, + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + }, + } + manifestBytes, err := json.Marshal(m) + + blobRequestResponseMappings := make([]testutil.RequestResponseMapping, len(testBlobs)) + for i, blob := range testBlobs { + blobRequestResponseMappings[i] = testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/blobs/" + blob.digest.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: blob.contents, + }, + } + } + + handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/manifests/" + tag, + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: manifestBytes, + }, + })) + server := httptest.NewServer(handler) + client, err := New(server.URL) + if err != nil { + t.Fatalf("error creating client: %v", err) + } + objectStore := &memoryObjectStore{ + mutex: new(sync.Mutex), + manifestStorage: make(map[string]*manifest.SignedManifest), + layerStorage: make(map[digest.Digest]Layer), + } + + err = Pull(client, objectStore, name, tag) + if err != nil { + t.Fatal(err) + } + + m, err = objectStore.Manifest(name, tag) + if err != nil { + t.Fatal(err) + } + + mBytes, err := json.Marshal(m) + if err != nil { + t.Fatal(err) + } + + if string(mBytes) != string(manifestBytes) { + t.Fatal("Incorrect manifest") + } + + for _, blob := range testBlobs { + l, err := objectStore.Layer(blob.digest) + if err != nil { + t.Fatal(err) + } + + reader, err := l.Reader() + if err != nil { + t.Fatal(err) + } + defer reader.Close() + + blobBytes, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + + if string(blobBytes) != string(blob.contents) { + t.Fatal("Incorrect blob") + } + } +} + +func TestPullResume(t *testing.T) { + name := "hello/world" + tag := "sometag" + testBlobs := []testBlob{ + { + digest: "tarsum.v2+sha256:12345", + contents: []byte("some contents"), + }, + { + digest: "tarsum.v2+sha256:98765", + contents: []byte("some other contents"), + }, + } + layers := make([]manifest.FSLayer, len(testBlobs)) + history := make([]manifest.History, len(testBlobs)) + + for i, layer := range testBlobs { + layers[i] = manifest.FSLayer{BlobSum: layer.digest} + history[i] = manifest.History{V1Compatibility: layer.digest.String()} + } + + m := &manifest.Manifest{ + Name: name, + Tag: tag, + Architecture: "x86", + FSLayers: layers, + History: history, + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + } + manifestBytes, err := json.Marshal(m) + + layerRequestResponseMappings := make([]testutil.RequestResponseMapping, 2*len(testBlobs)) + for i, blob := range testBlobs { + layerRequestResponseMappings[2*i] = testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/blobs/" + blob.digest.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: blob.contents[:len(blob.contents)/2], + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(blob.contents))}, + }), + }, + } + layerRequestResponseMappings[2*i+1] = testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/blobs/" + blob.digest.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: blob.contents[len(blob.contents)/2:], + }, + } + } + + for i := 0; i < 3; i++ { + layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + name + "/manifests/" + tag, + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: manifestBytes, + }, + }) + } + + handler := testutil.NewHandler(layerRequestResponseMappings) + server := httptest.NewServer(handler) + client, err := New(server.URL) + if err != nil { + t.Fatalf("error creating client: %v", err) + } + objectStore := &memoryObjectStore{ + mutex: new(sync.Mutex), + manifestStorage: make(map[string]*manifest.SignedManifest), + layerStorage: make(map[digest.Digest]Layer), + } + + for attempts := 0; attempts < 3; attempts++ { + err = Pull(client, objectStore, name, tag) + if err == nil { + break + } + } + + if err != nil { + t.Fatal(err) + } + + sm, err := objectStore.Manifest(name, tag) + if err != nil { + t.Fatal(err) + } + + mBytes, err := json.Marshal(sm) + if err != nil { + t.Fatal(err) + } + + if string(mBytes) != string(manifestBytes) { + t.Fatal("Incorrect manifest") + } + + for _, blob := range testBlobs { + l, err := objectStore.Layer(blob.digest) + if err != nil { + t.Fatal(err) + } + + reader, err := l.Reader() + if err != nil { + t.Fatal(err) + } + defer reader.Close() + + layerBytes, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + + if string(layerBytes) != string(blob.contents) { + t.Fatal("Incorrect blob") + } + } +} + +// headerInterceptingResponseWriter is a hacky workaround to re-write the +// location header to have the server url. +type headerInterceptingResponseWriter struct { + http.ResponseWriter + serverURL string +} + +func (hirw *headerInterceptingResponseWriter) WriteHeader(status int) { + location := hirw.Header().Get("Location") + if location != "" { + hirw.Header().Set("Location", hirw.serverURL+location) + } + + hirw.ResponseWriter.WriteHeader(status) +} diff --git a/docs/client/errors.go b/docs/client/errors.go new file mode 100644 index 00000000..3e89e674 --- /dev/null +++ b/docs/client/errors.go @@ -0,0 +1,79 @@ +package client + +import ( + "fmt" + + "github.com/docker/distribution/digest" +) + +// RepositoryNotFoundError is returned when making an operation against a +// repository that does not exist in the registry. +type RepositoryNotFoundError struct { + Name string +} + +func (e *RepositoryNotFoundError) Error() string { + return fmt.Sprintf("No repository found with Name: %s", e.Name) +} + +// ImageManifestNotFoundError is returned when making an operation against a +// given image manifest that does not exist in the registry. +type ImageManifestNotFoundError struct { + Name string + Tag string +} + +func (e *ImageManifestNotFoundError) Error() string { + return fmt.Sprintf("No manifest found with Name: %s, Tag: %s", + e.Name, e.Tag) +} + +// BlobNotFoundError is returned when making an operation against a given image +// layer that does not exist in the registry. +type BlobNotFoundError struct { + Name string + Digest digest.Digest +} + +func (e *BlobNotFoundError) Error() string { + return fmt.Sprintf("No blob found with Name: %s, Digest: %s", + e.Name, e.Digest) +} + +// BlobUploadNotFoundError is returned when making a blob upload operation against an +// invalid blob upload location url. +// This may be the result of using a cancelled, completed, or stale upload +// location. +type BlobUploadNotFoundError struct { + Location string +} + +func (e *BlobUploadNotFoundError) Error() string { + return fmt.Sprintf("No blob upload found at Location: %s", e.Location) +} + +// BlobUploadInvalidRangeError is returned when attempting to upload an image +// blob chunk that is out of order. +// This provides the known BlobSize and LastValidRange which can be used to +// resume the upload. +type BlobUploadInvalidRangeError struct { + Location string + LastValidRange int + BlobSize int +} + +func (e *BlobUploadInvalidRangeError) Error() string { + return fmt.Sprintf( + "Invalid range provided for upload at Location: %s. Last Valid Range: %d, Blob Size: %d", + e.Location, e.LastValidRange, e.BlobSize) +} + +// UnexpectedHTTPStatusError is returned when an unexpected HTTP status is +// returned when making a registry api call. +type UnexpectedHTTPStatusError struct { + Status string +} + +func (e *UnexpectedHTTPStatusError) Error() string { + return fmt.Sprintf("Received unexpected HTTP status: %s", e.Status) +} diff --git a/docs/client/objectstore.go b/docs/client/objectstore.go new file mode 100644 index 00000000..5969c9d2 --- /dev/null +++ b/docs/client/objectstore.go @@ -0,0 +1,239 @@ +package client + +import ( + "bytes" + "fmt" + "io" + "sync" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" +) + +var ( + // ErrLayerAlreadyExists is returned when attempting to create a layer with + // a tarsum that is already in use. + ErrLayerAlreadyExists = fmt.Errorf("Layer already exists") + + // ErrLayerLocked is returned when attempting to write to a layer which is + // currently being written to. + ErrLayerLocked = fmt.Errorf("Layer locked") +) + +// ObjectStore is an interface which is designed to approximate the docker +// engine storage. This interface is subject to change to conform to the +// future requirements of the engine. +type ObjectStore interface { + // Manifest retrieves the image manifest stored at the given repository name + // and tag + Manifest(name, tag string) (*manifest.SignedManifest, error) + + // WriteManifest stores an image manifest at the given repository name and + // tag + WriteManifest(name, tag string, manifest *manifest.SignedManifest) error + + // Layer returns a handle to a layer for reading and writing + Layer(dgst digest.Digest) (Layer, error) +} + +// Layer is a generic image layer interface. +// A Layer may not be written to if it is already complete. +type Layer interface { + // Reader returns a LayerReader or an error if the layer has not been + // written to or is currently being written to. + Reader() (LayerReader, error) + + // Writer returns a LayerWriter or an error if the layer has been fully + // written to or is currently being written to. + Writer() (LayerWriter, error) + + // Wait blocks until the Layer can be read from. + Wait() error +} + +// LayerReader is a read-only handle to a Layer, which exposes the CurrentSize +// and full Size in addition to implementing the io.ReadCloser interface. +type LayerReader interface { + io.ReadCloser + + // CurrentSize returns the number of bytes written to the underlying Layer + CurrentSize() int + + // Size returns the full size of the underlying Layer + Size() int +} + +// LayerWriter is a write-only handle to a Layer, which exposes the CurrentSize +// and full Size in addition to implementing the io.WriteCloser interface. +// SetSize must be called on this LayerWriter before it can be written to. +type LayerWriter interface { + io.WriteCloser + + // CurrentSize returns the number of bytes written to the underlying Layer + CurrentSize() int + + // Size returns the full size of the underlying Layer + Size() int + + // SetSize sets the full size of the underlying Layer. + // This must be called before any calls to Write + SetSize(int) error +} + +// memoryObjectStore is an in-memory implementation of the ObjectStore interface +type memoryObjectStore struct { + mutex *sync.Mutex + manifestStorage map[string]*manifest.SignedManifest + layerStorage map[digest.Digest]Layer +} + +func (objStore *memoryObjectStore) Manifest(name, tag string) (*manifest.SignedManifest, error) { + objStore.mutex.Lock() + defer objStore.mutex.Unlock() + + manifest, ok := objStore.manifestStorage[name+":"+tag] + if !ok { + return nil, fmt.Errorf("No manifest found with Name: %q, Tag: %q", name, tag) + } + return manifest, nil +} + +func (objStore *memoryObjectStore) WriteManifest(name, tag string, manifest *manifest.SignedManifest) error { + objStore.mutex.Lock() + defer objStore.mutex.Unlock() + + objStore.manifestStorage[name+":"+tag] = manifest + return nil +} + +func (objStore *memoryObjectStore) Layer(dgst digest.Digest) (Layer, error) { + objStore.mutex.Lock() + defer objStore.mutex.Unlock() + + layer, ok := objStore.layerStorage[dgst] + if !ok { + layer = &memoryLayer{cond: sync.NewCond(new(sync.Mutex))} + objStore.layerStorage[dgst] = layer + } + + return layer, nil +} + +type memoryLayer struct { + cond *sync.Cond + contents []byte + expectedSize int + writing bool +} + +func (ml *memoryLayer) Reader() (LayerReader, error) { + ml.cond.L.Lock() + defer ml.cond.L.Unlock() + + if ml.contents == nil { + return nil, fmt.Errorf("Layer has not been written to yet") + } + if ml.writing { + return nil, ErrLayerLocked + } + + return &memoryLayerReader{ml: ml, reader: bytes.NewReader(ml.contents)}, nil +} + +func (ml *memoryLayer) Writer() (LayerWriter, error) { + ml.cond.L.Lock() + defer ml.cond.L.Unlock() + + if ml.contents != nil { + if ml.writing { + return nil, ErrLayerLocked + } + if ml.expectedSize == len(ml.contents) { + return nil, ErrLayerAlreadyExists + } + } else { + ml.contents = make([]byte, 0) + } + + ml.writing = true + return &memoryLayerWriter{ml: ml, buffer: bytes.NewBuffer(ml.contents)}, nil +} + +func (ml *memoryLayer) Wait() error { + ml.cond.L.Lock() + defer ml.cond.L.Unlock() + + if ml.contents == nil { + return fmt.Errorf("No writer to wait on") + } + + for ml.writing { + ml.cond.Wait() + } + + return nil +} + +type memoryLayerReader struct { + ml *memoryLayer + reader *bytes.Reader +} + +func (mlr *memoryLayerReader) Read(p []byte) (int, error) { + return mlr.reader.Read(p) +} + +func (mlr *memoryLayerReader) Close() error { + return nil +} + +func (mlr *memoryLayerReader) CurrentSize() int { + return len(mlr.ml.contents) +} + +func (mlr *memoryLayerReader) Size() int { + return mlr.ml.expectedSize +} + +type memoryLayerWriter struct { + ml *memoryLayer + buffer *bytes.Buffer +} + +func (mlw *memoryLayerWriter) Write(p []byte) (int, error) { + if mlw.ml.expectedSize == 0 { + return 0, fmt.Errorf("Must set size before writing to layer") + } + wrote, err := mlw.buffer.Write(p) + mlw.ml.contents = mlw.buffer.Bytes() + return wrote, err +} + +func (mlw *memoryLayerWriter) Close() error { + mlw.ml.cond.L.Lock() + defer mlw.ml.cond.L.Unlock() + + return mlw.close() +} + +func (mlw *memoryLayerWriter) close() error { + mlw.ml.writing = false + mlw.ml.cond.Broadcast() + return nil +} + +func (mlw *memoryLayerWriter) CurrentSize() int { + return len(mlw.ml.contents) +} + +func (mlw *memoryLayerWriter) Size() int { + return mlw.ml.expectedSize +} + +func (mlw *memoryLayerWriter) SetSize(size int) error { + if !mlw.ml.writing { + return fmt.Errorf("Layer is closed for writing") + } + mlw.ml.expectedSize = size + return nil +} diff --git a/docs/client/pull.go b/docs/client/pull.go new file mode 100644 index 00000000..385158db --- /dev/null +++ b/docs/client/pull.go @@ -0,0 +1,151 @@ +package client + +import ( + "fmt" + "io" + + log "github.com/Sirupsen/logrus" + + "github.com/docker/distribution/manifest" +) + +// simultaneousLayerPullWindow is the size of the parallel layer pull window. +// A layer may not be pulled until the layer preceeding it by the length of the +// pull window has been successfully pulled. +const simultaneousLayerPullWindow = 4 + +// Pull implements a client pull workflow for the image defined by the given +// name and tag pair, using the given ObjectStore for local manifest and layer +// storage +func Pull(c Client, objectStore ObjectStore, name, tag string) error { + manifest, err := c.GetImageManifest(name, tag) + if err != nil { + return err + } + log.WithField("manifest", manifest).Info("Pulled manifest") + + if len(manifest.FSLayers) != len(manifest.History) { + return fmt.Errorf("Length of history not equal to number of layers") + } + if len(manifest.FSLayers) == 0 { + return fmt.Errorf("Image has no layers") + } + + errChans := make([]chan error, len(manifest.FSLayers)) + for i := range manifest.FSLayers { + errChans[i] = make(chan error) + } + + // To avoid leak of goroutines we must notify + // pullLayer goroutines about a cancelation, + // otherwise they will lock forever. + cancelCh := make(chan struct{}) + + // Iterate over each layer in the manifest, simultaneously pulling no more + // than simultaneousLayerPullWindow layers at a time. If an error is + // received from a layer pull, we abort the push. + for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ { + dependentLayer := i - simultaneousLayerPullWindow + if dependentLayer >= 0 { + err := <-errChans[dependentLayer] + if err != nil { + log.WithField("error", err).Warn("Pull aborted") + close(cancelCh) + return err + } + } + + if i < len(manifest.FSLayers) { + go func(i int) { + select { + case errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i]): + case <-cancelCh: // no chance to recv until cancelCh's closed + } + }(i) + } + } + + err = objectStore.WriteManifest(name, tag, manifest) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "manifest": manifest, + }).Warn("Unable to write image manifest") + return err + } + + return nil +} + +func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error { + log.WithField("layer", fsLayer).Info("Pulling layer") + + layer, err := objectStore.Layer(fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to write local layer") + return err + } + + layerWriter, err := layer.Writer() + if err == ErrLayerAlreadyExists { + log.WithField("layer", fsLayer).Info("Layer already exists") + return nil + } + if err == ErrLayerLocked { + log.WithField("layer", fsLayer).Info("Layer download in progress, waiting") + layer.Wait() + return nil + } + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to write local layer") + return err + } + defer layerWriter.Close() + + if layerWriter.CurrentSize() > 0 { + log.WithFields(log.Fields{ + "layer": fsLayer, + "currentSize": layerWriter.CurrentSize(), + "size": layerWriter.Size(), + }).Info("Layer partially downloaded, resuming") + } + + layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize()) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to download layer") + return err + } + defer layerReader.Close() + + layerWriter.SetSize(layerWriter.CurrentSize() + length) + + _, err = io.Copy(layerWriter, layerReader) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to download layer") + return err + } + if layerWriter.CurrentSize() != layerWriter.Size() { + log.WithFields(log.Fields{ + "size": layerWriter.Size(), + "currentSize": layerWriter.CurrentSize(), + "layer": fsLayer, + }).Warn("Layer invalid size") + return fmt.Errorf( + "Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d", + fsLayer, layerWriter.Size(), layerWriter.CurrentSize(), + ) + } + return nil +} diff --git a/docs/client/push.go b/docs/client/push.go new file mode 100644 index 00000000..c26bd174 --- /dev/null +++ b/docs/client/push.go @@ -0,0 +1,137 @@ +package client + +import ( + "fmt" + + log "github.com/Sirupsen/logrus" + "github.com/docker/distribution/manifest" +) + +// simultaneousLayerPushWindow is the size of the parallel layer push window. +// A layer may not be pushed until the layer preceeding it by the length of the +// push window has been successfully pushed. +const simultaneousLayerPushWindow = 4 + +type pushFunction func(fsLayer manifest.FSLayer) error + +// Push implements a client push workflow for the image defined by the given +// name and tag pair, using the given ObjectStore for local manifest and layer +// storage +func Push(c Client, objectStore ObjectStore, name, tag string) error { + manifest, err := objectStore.Manifest(name, tag) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "name": name, + "tag": tag, + }).Info("No image found") + return err + } + + errChans := make([]chan error, len(manifest.FSLayers)) + for i := range manifest.FSLayers { + errChans[i] = make(chan error) + } + + cancelCh := make(chan struct{}) + + // Iterate over each layer in the manifest, simultaneously pushing no more + // than simultaneousLayerPushWindow layers at a time. If an error is + // received from a layer push, we abort the push. + for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ { + dependentLayer := i - simultaneousLayerPushWindow + if dependentLayer >= 0 { + err := <-errChans[dependentLayer] + if err != nil { + log.WithField("error", err).Warn("Push aborted") + close(cancelCh) + return err + } + } + + if i < len(manifest.FSLayers) { + go func(i int) { + select { + case errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i]): + case <-cancelCh: // recv broadcast notification about cancelation + } + }(i) + } + } + + err = c.PutImageManifest(name, tag, manifest) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "manifest": manifest, + }).Warn("Unable to upload manifest") + return err + } + + return nil +} + +func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error { + log.WithField("layer", fsLayer).Info("Pushing layer") + + layer, err := objectStore.Layer(fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + + layerReader, err := layer.Reader() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to read local layer") + return err + } + defer layerReader.Close() + + if layerReader.CurrentSize() != layerReader.Size() { + log.WithFields(log.Fields{ + "layer": fsLayer, + "currentSize": layerReader.CurrentSize(), + "size": layerReader.Size(), + }).Warn("Local layer incomplete") + return fmt.Errorf("Local layer incomplete") + } + + length, err := c.BlobLength(name, fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to check existence of remote layer") + return err + } + if length >= 0 { + log.WithField("layer", fsLayer).Info("Layer already exists") + return nil + } + + location, err := c.InitiateBlobUpload(name) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to upload layer") + return err + } + + err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "layer": fsLayer, + }).Warn("Unable to upload layer") + return err + } + + return nil +}