From ce614b6de8b2f760bbb6b426447f159821e0ea5d Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 17 Apr 2015 13:32:51 -0700 Subject: [PATCH] Add client implementation of distribution interface Adds functionality to create a Repository client which connects to a remote endpoint. Signed-off-by: Derek McGowan (github: dmcgowan) --- registry/client/authchallenge.go | 150 +++++++ registry/client/endpoint.go | 266 ++++++++++++ registry/client/errors.go | 37 ++ registry/client/repository.go | 657 +++++++++++++++++++++++++++++ registry/client/repository_test.go | 605 ++++++++++++++++++++++++++ registry/client/token.go | 78 ++++ 6 files changed, 1793 insertions(+) create mode 100644 registry/client/authchallenge.go create mode 100644 registry/client/endpoint.go create mode 100644 registry/client/repository.go create mode 100644 registry/client/repository_test.go create mode 100644 registry/client/token.go diff --git a/registry/client/authchallenge.go b/registry/client/authchallenge.go new file mode 100644 index 00000000..0485f42d --- /dev/null +++ b/registry/client/authchallenge.go @@ -0,0 +1,150 @@ +package client + +import ( + "net/http" + "strings" +) + +// Octet types from RFC 2616. +type octetType byte + +// AuthorizationChallenge carries information +// from a WWW-Authenticate response header. +type AuthorizationChallenge struct { + Scheme string + Parameters map[string]string +} + +var octetTypes [256]octetType + +const ( + isToken octetType = 1 << iota + isSpace +) + +func init() { + // OCTET = + // CHAR = + // CTL = + // CR = + // LF = + // SP = + // HT = + // <"> = + // CRLF = CR LF + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = + // separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <"> + // | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT + // token = 1* + // qdtext = > + + for c := 0; c < 256; c++ { + var t octetType + isCtl := c <= 31 || c == 127 + isChar := 0 <= c && c <= 127 + isSeparator := strings.IndexRune(" \t\"(),/:;<=>?@[]\\{}", rune(c)) >= 0 + if strings.IndexRune(" \t\r\n", rune(c)) >= 0 { + t |= isSpace + } + if isChar && !isCtl && !isSeparator { + t |= isToken + } + octetTypes[c] = t + } +} + +func parseAuthHeader(header http.Header) []AuthorizationChallenge { + var challenges []AuthorizationChallenge + for _, h := range header[http.CanonicalHeaderKey("WWW-Authenticate")] { + v, p := parseValueAndParams(h) + if v != "" { + challenges = append(challenges, AuthorizationChallenge{Scheme: v, Parameters: p}) + } + } + return challenges +} + +func parseValueAndParams(header string) (value string, params map[string]string) { + params = make(map[string]string) + value, s := expectToken(header) + if value == "" { + return + } + value = strings.ToLower(value) + s = "," + skipSpace(s) + for strings.HasPrefix(s, ",") { + var pkey string + pkey, s = expectToken(skipSpace(s[1:])) + if pkey == "" { + return + } + if !strings.HasPrefix(s, "=") { + return + } + var pvalue string + pvalue, s = expectTokenOrQuoted(s[1:]) + if pvalue == "" { + return + } + pkey = strings.ToLower(pkey) + params[pkey] = pvalue + s = skipSpace(s) + } + return +} + +func skipSpace(s string) (rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isSpace == 0 { + break + } + } + return s[i:] +} + +func expectToken(s string) (token, rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isToken == 0 { + break + } + } + return s[:i], s[i:] +} + +func expectTokenOrQuoted(s string) (value string, rest string) { + if !strings.HasPrefix(s, "\"") { + return expectToken(s) + } + s = s[1:] + for i := 0; i < len(s); i++ { + switch s[i] { + case '"': + return s[:i], s[i+1:] + case '\\': + p := make([]byte, len(s)-1) + j := copy(p, s[:i]) + escape := true + for i = i + i; i < len(s); i++ { + b := s[i] + switch { + case escape: + escape = false + p[j] = b + j++ + case b == '\\': + escape = true + case b == '"': + return string(p[:j]), s[i+1:] + default: + p[j] = b + j++ + } + } + return "", "" + } + } + return "", "" +} diff --git a/registry/client/endpoint.go b/registry/client/endpoint.go new file mode 100644 index 00000000..83d3d991 --- /dev/null +++ b/registry/client/endpoint.go @@ -0,0 +1,266 @@ +package client + +import ( + "fmt" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/registry/api/v2" +) + +// Authorizer is used to apply Authorization to an HTTP request +type Authorizer interface { + // Authorizer updates an HTTP request with the needed authorization + Authorize(req *http.Request) error +} + +// CredentialStore is an interface for getting credentials for +// a given URL +type CredentialStore interface { + // Basic returns basic auth for the given URL + Basic(*url.URL) (string, string) +} + +// RepositoryEndpoint represents a single host endpoint serving up +// the distribution API. +type RepositoryEndpoint struct { + Endpoint string + Mirror bool + + Header http.Header + Credentials CredentialStore + + ub *v2.URLBuilder +} + +type nullAuthorizer struct{} + +func (na nullAuthorizer) Authorize(req *http.Request) error { + return nil +} + +type repositoryTransport struct { + Transport http.RoundTripper + Header http.Header + Authorizer Authorizer +} + +func (rt *repositoryTransport) RoundTrip(req *http.Request) (*http.Response, error) { + reqCopy := new(http.Request) + *reqCopy = *req + + // Copy existing headers then static headers + reqCopy.Header = make(http.Header, len(req.Header)+len(rt.Header)) + for k, s := range req.Header { + reqCopy.Header[k] = append([]string(nil), s...) + } + for k, s := range rt.Header { + reqCopy.Header[k] = append(reqCopy.Header[k], s...) + } + + if rt.Authorizer != nil { + if err := rt.Authorizer.Authorize(reqCopy); err != nil { + return nil, err + } + } + + logrus.Debugf("HTTP: %s %s", req.Method, req.URL) + + if rt.Transport != nil { + return rt.Transport.RoundTrip(reqCopy) + } + return http.DefaultTransport.RoundTrip(reqCopy) +} + +type authTransport struct { + Transport http.RoundTripper + Header http.Header +} + +func (rt *authTransport) RoundTrip(req *http.Request) (*http.Response, error) { + reqCopy := new(http.Request) + *reqCopy = *req + + // Copy existing headers then static headers + reqCopy.Header = make(http.Header, len(req.Header)+len(rt.Header)) + for k, s := range req.Header { + reqCopy.Header[k] = append([]string(nil), s...) + } + for k, s := range rt.Header { + reqCopy.Header[k] = append(reqCopy.Header[k], s...) + } + + logrus.Debugf("HTTP: %s %s", req.Method, req.URL) + + if rt.Transport != nil { + return rt.Transport.RoundTrip(reqCopy) + } + return http.DefaultTransport.RoundTrip(reqCopy) +} + +// URLBuilder returns a new URL builder +func (e *RepositoryEndpoint) URLBuilder() (*v2.URLBuilder, error) { + if e.ub == nil { + var err error + e.ub, err = v2.NewURLBuilderFromString(e.Endpoint) + if err != nil { + return nil, err + } + } + + return e.ub, nil +} + +// HTTPClient returns a new HTTP client configured for this endpoint +func (e *RepositoryEndpoint) HTTPClient(name string) (*http.Client, error) { + transport := &repositoryTransport{ + Header: e.Header, + } + client := &http.Client{ + Transport: transport, + } + + challenges, err := e.ping(client) + if err != nil { + return nil, err + } + actions := []string{"pull"} + if !e.Mirror { + actions = append(actions, "push") + } + + transport.Authorizer = &endpointAuthorizer{ + client: &http.Client{Transport: &authTransport{Header: e.Header}}, + challenges: challenges, + creds: e.Credentials, + resource: "repository", + scope: name, + actions: actions, + } + + return client, nil +} + +func (e *RepositoryEndpoint) ping(client *http.Client) ([]AuthorizationChallenge, error) { + ub, err := e.URLBuilder() + if err != nil { + return nil, err + } + u, err := ub.BuildBaseURL() + if err != nil { + return nil, err + } + + req, err := http.NewRequest("GET", u, nil) + if err != nil { + return nil, err + } + req.Header = make(http.Header, len(e.Header)) + for k, s := range e.Header { + req.Header[k] = append([]string(nil), s...) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var supportsV2 bool +HeaderLoop: + for _, supportedVersions := range resp.Header[http.CanonicalHeaderKey("Docker-Distribution-API-Version")] { + for _, versionName := range strings.Fields(supportedVersions) { + if versionName == "registry/2.0" { + supportsV2 = true + break HeaderLoop + } + } + } + + if !supportsV2 { + return nil, fmt.Errorf("%s does not appear to be a v2 registry endpoint", e.Endpoint) + } + + if resp.StatusCode == http.StatusUnauthorized { + // Parse the WWW-Authenticate Header and store the challenges + // on this endpoint object. + return parseAuthHeader(resp.Header), nil + } else if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unable to get valid ping response: %d", resp.StatusCode) + } + + return nil, nil +} + +type endpointAuthorizer struct { + client *http.Client + challenges []AuthorizationChallenge + creds CredentialStore + + resource string + scope string + actions []string + + tokenLock sync.Mutex + tokenCache string + tokenExpiration time.Time +} + +func (ta *endpointAuthorizer) Authorize(req *http.Request) error { + token, err := ta.getToken() + if err != nil { + return err + } + if token != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + } else if ta.creds != nil { + username, password := ta.creds.Basic(req.URL) + if username != "" && password != "" { + req.SetBasicAuth(username, password) + } + } + return nil +} + +func (ta *endpointAuthorizer) getToken() (string, error) { + ta.tokenLock.Lock() + defer ta.tokenLock.Unlock() + now := time.Now() + if now.Before(ta.tokenExpiration) { + //log.Debugf("Using cached token for %q", ta.auth.Username) + return ta.tokenCache, nil + } + + for _, challenge := range ta.challenges { + switch strings.ToLower(challenge.Scheme) { + case "basic": + // no token necessary + case "bearer": + //log.Debugf("Getting bearer token with %s for %s", challenge.Parameters, ta.auth.Username) + params := map[string]string{} + for k, v := range challenge.Parameters { + params[k] = v + } + params["scope"] = fmt.Sprintf("%s:%s:%s", ta.resource, ta.scope, strings.Join(ta.actions, ",")) + token, err := getToken(ta.creds, params, ta.client) + if err != nil { + return "", err + } + ta.tokenCache = token + ta.tokenExpiration = now.Add(time.Minute) + + return token, nil + default: + //log.Infof("Unsupported auth scheme: %q", challenge.Scheme) + } + } + + // Do not expire cache since there are no challenges which use a token + ta.tokenExpiration = time.Now().Add(time.Hour * 24) + + return "", nil +} diff --git a/registry/client/errors.go b/registry/client/errors.go index 3e89e674..4ef2cc23 100644 --- a/registry/client/errors.go +++ b/registry/client/errors.go @@ -1,9 +1,14 @@ package client import ( + "bytes" + "encoding/json" "fmt" + "io/ioutil" + "net/http" "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/api/v2" ) // RepositoryNotFoundError is returned when making an operation against a @@ -77,3 +82,35 @@ type UnexpectedHTTPStatusError struct { func (e *UnexpectedHTTPStatusError) Error() string { return fmt.Sprintf("Received unexpected HTTP status: %s", e.Status) } + +// UnexpectedHTTPResponseError is returned when an expected HTTP status code +// is returned, but the content was unexpected and failed to be parsed. +type UnexpectedHTTPResponseError struct { + ParseErr error + Response []byte +} + +func (e *UnexpectedHTTPResponseError) Error() string { + shortenedResponse := string(e.Response) + if len(shortenedResponse) > 15 { + shortenedResponse = shortenedResponse[:12] + "..." + } + return fmt.Sprintf("Error parsing HTTP response: %s: %q", e.ParseErr.Error(), shortenedResponse) +} + +func parseHTTPErrorResponse(response *http.Response) error { + var errors v2.Errors + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + decoder := json.NewDecoder(bytes.NewReader(body)) + err = decoder.Decode(&errors) + if err != nil { + return &UnexpectedHTTPResponseError{ + ParseErr: err, + Response: body, + } + } + return &errors +} diff --git a/registry/client/repository.go b/registry/client/repository.go new file mode 100644 index 00000000..a96390fa --- /dev/null +++ b/registry/client/repository.go @@ -0,0 +1,657 @@ +package client + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "strconv" + "time" + + ctxu "github.com/docker/distribution/context" + + "github.com/docker/distribution/manifest" + + "github.com/docker/distribution/digest" + + "github.com/docker/distribution" + "github.com/docker/distribution/registry/api/v2" + "golang.org/x/net/context" +) + +// NewRepositoryClient creates a new Repository for the given repository name and endpoint +func NewRepositoryClient(ctx context.Context, name string, endpoint *RepositoryEndpoint) (distribution.Repository, error) { + if err := v2.ValidateRespositoryName(name); err != nil { + return nil, err + } + + ub, err := endpoint.URLBuilder() + if err != nil { + return nil, err + } + + client, err := endpoint.HTTPClient(name) + if err != nil { + return nil, err + } + + return &repository{ + client: client, + ub: ub, + name: name, + context: ctx, + mirror: endpoint.Mirror, + }, nil +} + +type repository struct { + client *http.Client + ub *v2.URLBuilder + context context.Context + name string + mirror bool +} + +func (r *repository) Name() string { + return r.name +} + +func (r *repository) Layers() distribution.LayerService { + return &layers{ + repository: r, + } +} + +func (r *repository) Manifests() distribution.ManifestService { + return &manifests{ + repository: r, + } +} + +func (r *repository) Signatures() distribution.SignatureService { + return &signatures{ + repository: r, + } +} + +type signatures struct { + *repository +} + +func (s *signatures) Get(dgst digest.Digest) ([][]byte, error) { + panic("not implemented") +} + +func (s *signatures) Put(dgst digest.Digest, signatures ...[]byte) error { + panic("not implemented") +} + +type manifests struct { + *repository +} + +func (ms *manifests) Tags() ([]string, error) { + panic("not implemented") +} + +func (ms *manifests) Exists(dgst digest.Digest) (bool, error) { + return ms.ExistsByTag(dgst.String()) +} + +func (ms *manifests) ExistsByTag(tag string) (bool, error) { + u, err := ms.ub.BuildManifestURL(ms.name, tag) + if err != nil { + return false, err + } + + resp, err := ms.client.Head(u) + if err != nil { + return false, err + } + + switch { + case resp.StatusCode == http.StatusOK: + return true, nil + case resp.StatusCode == http.StatusNotFound: + return false, nil + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return false, parseHTTPErrorResponse(resp) + default: + return false, &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +func (ms *manifests) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { + return ms.GetByTag(dgst.String()) +} + +func (ms *manifests) GetByTag(tag string) (*manifest.SignedManifest, error) { + u, err := ms.ub.BuildManifestURL(ms.name, tag) + if err != nil { + return nil, err + } + + resp, err := ms.client.Get(u) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusOK: + var sm manifest.SignedManifest + decoder := json.NewDecoder(resp.Body) + + if err := decoder.Decode(&sm); err != nil { + return nil, err + } + + return &sm, nil + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return nil, parseHTTPErrorResponse(resp) + default: + return nil, &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +func (ms *manifests) Put(m *manifest.SignedManifest) error { + manifestURL, err := ms.ub.BuildManifestURL(ms.name, m.Tag) + if err != nil { + return err + } + + putRequest, err := http.NewRequest("PUT", manifestURL, bytes.NewReader(m.Raw)) + if err != nil { + return err + } + + resp, err := ms.client.Do(putRequest) + if err != nil { + return err + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusAccepted: + // TODO(dmcgowan): Use or check digest header + return nil + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return parseHTTPErrorResponse(resp) + default: + return &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +func (ms *manifests) Delete(dgst digest.Digest) error { + u, err := ms.ub.BuildManifestURL(ms.name, dgst.String()) + if err != nil { + return err + } + req, err := http.NewRequest("DELETE", u, nil) + if err != nil { + return err + } + + resp, err := ms.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusOK: + return nil + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return parseHTTPErrorResponse(resp) + default: + return &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +type layers struct { + *repository +} + +func sanitizeLocation(location, source string) (string, error) { + locationURL, err := url.Parse(location) + if err != nil { + return "", err + } + + if locationURL.Scheme == "" { + sourceURL, err := url.Parse(source) + if err != nil { + return "", err + } + locationURL = &url.URL{ + Scheme: sourceURL.Scheme, + Host: sourceURL.Host, + Path: location, + } + location = locationURL.String() + } + return location, nil +} + +func (ls *layers) Exists(dgst digest.Digest) (bool, error) { + _, err := ls.fetchLayer(dgst) + if err != nil { + switch err := err.(type) { + case distribution.ErrUnknownLayer: + return false, nil + default: + return false, err + } + } + + return true, nil +} + +func (ls *layers) Fetch(dgst digest.Digest) (distribution.Layer, error) { + return ls.fetchLayer(dgst) +} + +func (ls *layers) Upload() (distribution.LayerUpload, error) { + u, err := ls.ub.BuildBlobUploadURL(ls.name) + + resp, err := ls.client.Post(u, "", nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusAccepted: + // TODO(dmcgowan): Check for invalid UUID + uuid := resp.Header.Get("Docker-Upload-UUID") + location, err := sanitizeLocation(resp.Header.Get("Location"), u) + if err != nil { + return nil, err + } + + return &httpLayerUpload{ + layers: ls, + uuid: uuid, + startedAt: time.Now(), + location: location, + }, nil + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return nil, parseHTTPErrorResponse(resp) + default: + return nil, &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +func (ls *layers) Resume(uuid string) (distribution.LayerUpload, error) { + panic("not implemented") +} + +func (ls *layers) fetchLayer(dgst digest.Digest) (distribution.Layer, error) { + u, err := ls.ub.BuildBlobURL(ls.name, dgst) + if err != nil { + return nil, err + } + + resp, err := ls.client.Head(u) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusOK: + lengthHeader := resp.Header.Get("Content-Length") + length, err := strconv.ParseInt(lengthHeader, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing content-length: %v", err) + } + + var t time.Time + lastModified := resp.Header.Get("Last-Modified") + if lastModified != "" { + t, err = http.ParseTime(lastModified) + if err != nil { + return nil, fmt.Errorf("error parsing last-modified: %v", err) + } + } + + return &httpLayer{ + layers: ls, + size: length, + digest: dgst, + createdAt: t, + }, nil + case resp.StatusCode == http.StatusNotFound: + return nil, distribution.ErrUnknownLayer{ + FSLayer: manifest.FSLayer{ + BlobSum: dgst, + }, + } + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return nil, parseHTTPErrorResponse(resp) + default: + return nil, &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +type httpLayer struct { + *layers + + size int64 + digest digest.Digest + createdAt time.Time + + rc io.ReadCloser // remote read closer + brd *bufio.Reader // internal buffered io + offset int64 + err error +} + +func (hl *httpLayer) CreatedAt() time.Time { + return hl.createdAt +} + +func (hl *httpLayer) Digest() digest.Digest { + return hl.digest +} + +func (hl *httpLayer) Read(p []byte) (n int, err error) { + if hl.err != nil { + return 0, hl.err + } + + rd, err := hl.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + hl.offset += int64(n) + + // Simulate io.EOR error if we reach filesize. + if err == nil && hl.offset >= hl.size { + err = io.EOF + } + + return n, err +} + +func (hl *httpLayer) Seek(offset int64, whence int) (int64, error) { + if hl.err != nil { + return 0, hl.err + } + + var err error + newOffset := hl.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = hl.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else { + if hl.offset != newOffset { + hl.reset() + } + + // No problems, set the offset. + hl.offset = newOffset + } + + return hl.offset, err +} + +func (hl *httpLayer) Close() error { + if hl.err != nil { + return hl.err + } + + // close and release reader chain + if hl.rc != nil { + hl.rc.Close() + } + + hl.rc = nil + hl.brd = nil + + hl.err = fmt.Errorf("httpLayer: closed") + + return nil +} + +func (hl *httpLayer) reset() { + if hl.err != nil { + return + } + if hl.rc != nil { + hl.rc.Close() + hl.rc = nil + } +} + +func (hl *httpLayer) reader() (io.Reader, error) { + if hl.err != nil { + return nil, hl.err + } + + if hl.rc != nil { + return hl.brd, nil + } + + // If the offset is great than or equal to size, return a empty, noop reader. + if hl.offset >= hl.size { + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + } + + blobURL, err := hl.ub.BuildBlobURL(hl.name, hl.digest) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("GET", blobURL, nil) + if err != nil { + return nil, err + } + + if hl.offset > 0 { + // TODO(stevvooe): Get this working correctly. + + // If we are at different offset, issue a range request from there. + req.Header.Add("Range", fmt.Sprintf("1-")) + ctxu.GetLogger(hl.context).Infof("Range: %s", req.Header.Get("Range")) + } + + resp, err := hl.client.Do(req) + if err != nil { + return nil, err + } + + switch { + case resp.StatusCode == 200: + hl.rc = resp.Body + default: + defer resp.Body.Close() + return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) + } + + if hl.brd == nil { + hl.brd = bufio.NewReader(hl.rc) + } else { + hl.brd.Reset(hl.rc) + } + + return hl.brd, nil +} + +func (hl *httpLayer) Length() int64 { + return hl.size +} + +func (hl *httpLayer) Handler(r *http.Request) (http.Handler, error) { + panic("Not implemented") +} + +type httpLayerUpload struct { + *layers + + uuid string + startedAt time.Time + + location string // always the last value of the location header. + offset int64 + closed bool +} + +var _ distribution.LayerUpload = &httpLayerUpload{} + +func (hlu *httpLayerUpload) ReadFrom(r io.Reader) (n int64, err error) { + req, err := http.NewRequest("PATCH", hlu.location, r) + if err != nil { + return 0, err + } + defer req.Body.Close() + + resp, err := hlu.client.Do(req) + if err != nil { + return 0, err + } + + switch { + case resp.StatusCode == http.StatusAccepted: + // TODO(dmcgowan): Validate headers + hlu.uuid = resp.Header.Get("Docker-Upload-UUID") + hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location) + if err != nil { + return 0, err + } + rng := resp.Header.Get("Range") + var start, end int64 + if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil { + return 0, err + } else if n != 2 || end < start { + return 0, fmt.Errorf("bad range format: %s", rng) + } + + return (end - start + 1), nil + case resp.StatusCode == http.StatusNotFound: + return 0, &BlobUploadNotFoundError{Location: hlu.location} + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return 0, parseHTTPErrorResponse(resp) + default: + return 0, &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +func (hlu *httpLayerUpload) Write(p []byte) (n int, err error) { + req, err := http.NewRequest("PATCH", hlu.location, bytes.NewReader(p)) + if err != nil { + return 0, err + } + req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hlu.offset, hlu.offset+int64(len(p)-1))) + req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p))) + req.Header.Set("Content-Type", "application/octet-stream") + + resp, err := hlu.client.Do(req) + if err != nil { + return 0, err + } + + switch { + case resp.StatusCode == http.StatusAccepted: + // TODO(dmcgowan): Validate headers + hlu.uuid = resp.Header.Get("Docker-Upload-UUID") + hlu.location, err = sanitizeLocation(resp.Header.Get("Location"), hlu.location) + if err != nil { + return 0, err + } + rng := resp.Header.Get("Range") + var start, end int + if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil { + return 0, err + } else if n != 2 || end < start { + return 0, fmt.Errorf("bad range format: %s", rng) + } + + return (end - start + 1), nil + case resp.StatusCode == http.StatusNotFound: + return 0, &BlobUploadNotFoundError{Location: hlu.location} + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return 0, parseHTTPErrorResponse(resp) + default: + return 0, &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +func (hlu *httpLayerUpload) Seek(offset int64, whence int) (int64, error) { + newOffset := hlu.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + return newOffset, errors.New("Cannot seek from end on incomplete upload") + case os.SEEK_SET: + newOffset = int64(offset) + } + + hlu.offset = newOffset + + return hlu.offset, nil +} + +func (hlu *httpLayerUpload) UUID() string { + return hlu.uuid +} + +func (hlu *httpLayerUpload) StartedAt() time.Time { + return hlu.startedAt +} + +func (hlu *httpLayerUpload) Finish(digest digest.Digest) (distribution.Layer, error) { + // TODO(dmcgowan): Check if already finished, if so just fetch + req, err := http.NewRequest("PUT", hlu.location, nil) + if err != nil { + return nil, err + } + + values := req.URL.Query() + values.Set("digest", digest.String()) + req.URL.RawQuery = values.Encode() + + resp, err := hlu.client.Do(req) + if err != nil { + return nil, err + } + + switch { + case resp.StatusCode == http.StatusCreated: + return hlu.Layers().Fetch(digest) + case resp.StatusCode == http.StatusNotFound: + return nil, &BlobUploadNotFoundError{Location: hlu.location} + case resp.StatusCode >= 400 && resp.StatusCode < 500: + return nil, parseHTTPErrorResponse(resp) + default: + return nil, &UnexpectedHTTPStatusError{Status: resp.Status} + } +} + +func (hlu *httpLayerUpload) Cancel() error { + panic("not implemented") +} + +func (hlu *httpLayerUpload) Close() error { + hlu.closed = true + return nil +} diff --git a/registry/client/repository_test.go b/registry/client/repository_test.go new file mode 100644 index 00000000..67138db6 --- /dev/null +++ b/registry/client/repository_test.go @@ -0,0 +1,605 @@ +package client + +import ( + "bytes" + "crypto/rand" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/http/httptest" + "testing" + "time" + + "code.google.com/p/go-uuid/uuid" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/testutil" + "golang.org/x/net/context" +) + +func testServer(rrm testutil.RequestResponseMap) (*RepositoryEndpoint, func()) { + h := testutil.NewHandler(rrm) + s := httptest.NewServer(h) + e := RepositoryEndpoint{Endpoint: s.URL, Mirror: false} + return &e, s.Close +} + +func newRandomBlob(size int) (digest.Digest, []byte) { + b := make([]byte, size) + if n, err := rand.Read(b); err != nil { + panic(err) + } else if n != size { + panic("unable to read enough bytes") + } + + dgst, err := digest.FromBytes(b) + if err != nil { + panic(err) + } + + return dgst, b +} + +func addTestFetch(repo string, dgst digest.Digest, content []byte, m *testutil.RequestResponseMap) { + *m = append(*m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + repo + "/blobs/" + dgst.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: content, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(content))}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) + *m = append(*m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "HEAD", + Route: "/v2/" + repo + "/blobs/" + dgst.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(content))}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) +} + +func addPing(m *testutil.RequestResponseMap) { + *m = append(*m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/", + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Docker-Distribution-API-Version": {"registry/2.0"}, + }), + }, + }) +} + +func TestLayerFetch(t *testing.T) { + d1, b1 := newRandomBlob(1024) + var m testutil.RequestResponseMap + addTestFetch("test.example.com/repo1", d1, b1, &m) + addPing(&m) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), "test.example.com/repo1", e) + if err != nil { + t.Fatal(err) + } + l := r.Layers() + + layer, err := l.Fetch(d1) + if err != nil { + t.Fatal(err) + } + b, err := ioutil.ReadAll(layer) + if err != nil { + t.Fatal(err) + } + if bytes.Compare(b, b1) != 0 { + t.Fatalf("Wrong bytes values fetched: [%d]byte != [%d]byte", len(b), len(b1)) + } + + // TODO(dmcgowan): Test error cases +} + +func TestLayerExists(t *testing.T) { + d1, b1 := newRandomBlob(1024) + var m testutil.RequestResponseMap + addTestFetch("test.example.com/repo1", d1, b1, &m) + addPing(&m) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), "test.example.com/repo1", e) + if err != nil { + t.Fatal(err) + } + l := r.Layers() + + ok, err := l.Exists(d1) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatalf("Blob does not exist: %s", d1) + } + + // TODO(dmcgowan): Test error cases +} + +func TestLayerUploadChunked(t *testing.T) { + dgst, b1 := newRandomBlob(1024) + var m testutil.RequestResponseMap + addPing(&m) + chunks := [][]byte{ + b1[0:256], + b1[256:512], + b1[512:513], + b1[513:1024], + } + repo := "test.example.com/uploadrepo" + uuids := []string{uuid.New()} + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "POST", + Route: "/v2/" + repo + "/blobs/uploads/", + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Location": {"/v2/" + repo + "/blobs/uploads/" + uuids[0]}, + "Docker-Upload-UUID": {uuids[0]}, + "Range": {"0-0"}, + }), + }, + }) + offset := 0 + for i, chunk := range chunks { + uuids = append(uuids, uuid.New()) + newOffset := offset + len(chunk) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "PATCH", + Route: "/v2/" + repo + "/blobs/uploads/" + uuids[i], + Body: chunk, + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Location": {"/v2/" + repo + "/blobs/uploads/" + uuids[i+1]}, + "Docker-Upload-UUID": {uuids[i+1]}, + "Range": {fmt.Sprintf("%d-%d", offset, newOffset-1)}, + }), + }, + }) + offset = newOffset + } + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "PUT", + Route: "/v2/" + repo + "/blobs/uploads/" + uuids[len(uuids)-1], + QueryParams: map[string][]string{ + "digest": {dgst.String()}, + }, + }, + Response: testutil.Response{ + StatusCode: http.StatusCreated, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Docker-Content-Digest": {dgst.String()}, + "Content-Range": {fmt.Sprintf("0-%d", offset-1)}, + }), + }, + }) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "HEAD", + Route: "/v2/" + repo + "/blobs/" + dgst.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(offset)}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), repo, e) + if err != nil { + t.Fatal(err) + } + l := r.Layers() + + upload, err := l.Upload() + if err != nil { + t.Fatal(err) + } + + if upload.UUID() != uuids[0] { + log.Fatalf("Unexpected UUID %s; expected %s", upload.UUID(), uuids[0]) + } + + for _, chunk := range chunks { + n, err := upload.Write(chunk) + if err != nil { + t.Fatal(err) + } + if n != len(chunk) { + t.Fatalf("Unexpected length returned from write: %d; expected: %d", n, len(chunk)) + } + } + + layer, err := upload.Finish(dgst) + if err != nil { + t.Fatal(err) + } + + if layer.Length() != int64(len(b1)) { + t.Fatalf("Unexpected layer size: %d; expected: %d", layer.Length(), len(b1)) + } +} + +func TestLayerUploadMonolithic(t *testing.T) { + dgst, b1 := newRandomBlob(1024) + var m testutil.RequestResponseMap + addPing(&m) + repo := "test.example.com/uploadrepo" + uploadID := uuid.New() + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "POST", + Route: "/v2/" + repo + "/blobs/uploads/", + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Location": {"/v2/" + repo + "/blobs/uploads/" + uploadID}, + "Docker-Upload-UUID": {uploadID}, + "Range": {"0-0"}, + }), + }, + }) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "PATCH", + Route: "/v2/" + repo + "/blobs/uploads/" + uploadID, + Body: b1, + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Location": {"/v2/" + repo + "/blobs/uploads/" + uploadID}, + "Docker-Upload-UUID": {uploadID}, + "Content-Length": {"0"}, + "Docker-Content-Digest": {dgst.String()}, + "Range": {fmt.Sprintf("0-%d", len(b1)-1)}, + }), + }, + }) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "PUT", + Route: "/v2/" + repo + "/blobs/uploads/" + uploadID, + QueryParams: map[string][]string{ + "digest": {dgst.String()}, + }, + }, + Response: testutil.Response{ + StatusCode: http.StatusCreated, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Docker-Content-Digest": {dgst.String()}, + "Content-Range": {fmt.Sprintf("0-%d", len(b1)-1)}, + }), + }, + }) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "HEAD", + Route: "/v2/" + repo + "/blobs/" + dgst.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(b1))}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), repo, e) + if err != nil { + t.Fatal(err) + } + l := r.Layers() + + upload, err := l.Upload() + if err != nil { + t.Fatal(err) + } + + if upload.UUID() != uploadID { + log.Fatalf("Unexpected UUID %s; expected %s", upload.UUID(), uploadID) + } + + n, err := upload.ReadFrom(bytes.NewReader(b1)) + if err != nil { + t.Fatal(err) + } + if n != int64(len(b1)) { + t.Fatalf("Unexpected ReadFrom length: %d; expected: %d", n, len(b1)) + } + + layer, err := upload.Finish(dgst) + if err != nil { + t.Fatal(err) + } + + if layer.Length() != int64(len(b1)) { + t.Fatalf("Unexpected layer size: %d; expected: %d", layer.Length(), len(b1)) + } +} + +func TestLayerUploadResume(t *testing.T) { + // TODO(dmcgowan): implement +} + +func newRandomSchema1Manifest(name, tag string, blobCount int) (*manifest.SignedManifest, digest.Digest) { + blobs := make([]manifest.FSLayer, blobCount) + history := make([]manifest.History, blobCount) + + for i := 0; i < blobCount; i++ { + dgst, blob := newRandomBlob((i % 5) * 16) + + blobs[i] = manifest.FSLayer{BlobSum: dgst} + history[i] = manifest.History{V1Compatibility: fmt.Sprintf("{\"Hex\": \"%x\"}", blob)} + } + + m := &manifest.SignedManifest{ + Manifest: manifest.Manifest{ + Name: name, + Tag: tag, + Architecture: "x86", + FSLayers: blobs, + History: history, + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + }, + } + manifestBytes, err := json.Marshal(m) + if err != nil { + panic(err) + } + dgst, err := digest.FromBytes(manifestBytes) + if err != nil { + panic(err) + } + + m.Raw = manifestBytes + + return m, dgst +} + +func addTestManifest(repo, reference string, content []byte, m *testutil.RequestResponseMap) { + *m = append(*m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "GET", + Route: "/v2/" + repo + "/manifests/" + reference, + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Body: content, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(content))}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) + *m = append(*m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "HEAD", + Route: "/v2/" + repo + "/manifests/" + reference, + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Content-Length": {fmt.Sprint(len(content))}, + "Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)}, + }), + }, + }) + +} + +func checkEqualManifest(m1, m2 *manifest.SignedManifest) error { + if m1.Name != m2.Name { + return fmt.Errorf("name does not match %q != %q", m1.Name, m2.Name) + } + if m1.Tag != m2.Tag { + return fmt.Errorf("tag does not match %q != %q", m1.Tag, m2.Tag) + } + if len(m1.FSLayers) != len(m2.FSLayers) { + return fmt.Errorf("fs layer length does not match %d != %d", len(m1.FSLayers), len(m2.FSLayers)) + } + for i := range m1.FSLayers { + if m1.FSLayers[i].BlobSum != m2.FSLayers[i].BlobSum { + return fmt.Errorf("blobsum does not match %q != %q", m1.FSLayers[i].BlobSum, m2.FSLayers[i].BlobSum) + } + } + if len(m1.History) != len(m2.History) { + return fmt.Errorf("history length does not match %d != %d", len(m1.History), len(m2.History)) + } + for i := range m1.History { + if m1.History[i].V1Compatibility != m2.History[i].V1Compatibility { + return fmt.Errorf("blobsum does not match %q != %q", m1.History[i].V1Compatibility, m2.History[i].V1Compatibility) + } + } + return nil +} + +func TestManifestFetch(t *testing.T) { + repo := "test.example.com/repo" + m1, dgst := newRandomSchema1Manifest(repo, "latest", 6) + var m testutil.RequestResponseMap + addPing(&m) + addTestManifest(repo, dgst.String(), m1.Raw, &m) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), repo, e) + if err != nil { + t.Fatal(err) + } + ms := r.Manifests() + + ok, err := ms.Exists(dgst) + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("Manifest does not exist") + } + + manifest, err := ms.Get(dgst) + if err != nil { + t.Fatal(err) + } + if err := checkEqualManifest(manifest, m1); err != nil { + t.Fatal(err) + } +} + +func TestManifestFetchByTag(t *testing.T) { + repo := "test.example.com/repo/by/tag" + m1, _ := newRandomSchema1Manifest(repo, "latest", 6) + var m testutil.RequestResponseMap + addPing(&m) + addTestManifest(repo, "latest", m1.Raw, &m) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), repo, e) + if err != nil { + t.Fatal(err) + } + + ms := r.Manifests() + ok, err := ms.ExistsByTag("latest") + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("Manifest does not exist") + } + + manifest, err := ms.GetByTag("latest") + if err != nil { + t.Fatal(err) + } + if err := checkEqualManifest(manifest, m1); err != nil { + t.Fatal(err) + } +} + +func TestManifestDelete(t *testing.T) { + repo := "test.example.com/repo/delete" + _, dgst1 := newRandomSchema1Manifest(repo, "latest", 6) + _, dgst2 := newRandomSchema1Manifest(repo, "latest", 6) + var m testutil.RequestResponseMap + addPing(&m) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "DELETE", + Route: "/v2/" + repo + "/manifests/" + dgst1.String(), + }, + Response: testutil.Response{ + StatusCode: http.StatusOK, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + }), + }, + }) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), repo, e) + if err != nil { + t.Fatal(err) + } + + ms := r.Manifests() + if err := ms.Delete(dgst1); err != nil { + t.Fatal(err) + } + if err := ms.Delete(dgst2); err == nil { + t.Fatal("Expected error deleting unknown manifest") + } + // TODO(dmcgowan): Check for specific unknown error +} + +func TestManifestPut(t *testing.T) { + repo := "test.example.com/repo/delete" + m1, dgst := newRandomSchema1Manifest(repo, "other", 6) + var m testutil.RequestResponseMap + addPing(&m) + m = append(m, testutil.RequestResponseMapping{ + Request: testutil.Request{ + Method: "PUT", + Route: "/v2/" + repo + "/manifests/other", + Body: m1.Raw, + }, + Response: testutil.Response{ + StatusCode: http.StatusAccepted, + Headers: http.Header(map[string][]string{ + "Content-Length": {"0"}, + "Docker-Content-Digest": {dgst.String()}, + }), + }, + }) + + e, c := testServer(m) + defer c() + + r, err := NewRepositoryClient(context.Background(), repo, e) + if err != nil { + t.Fatal(err) + } + + ms := r.Manifests() + if err := ms.Put(m1); err != nil { + t.Fatal(err) + } + + // TODO(dmcgowan): Check for error cases +} diff --git a/registry/client/token.go b/registry/client/token.go new file mode 100644 index 00000000..6439e01e --- /dev/null +++ b/registry/client/token.go @@ -0,0 +1,78 @@ +package client + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strings" +) + +type tokenResponse struct { + Token string `json:"token"` +} + +func getToken(creds CredentialStore, params map[string]string, client *http.Client) (token string, err error) { + realm, ok := params["realm"] + if !ok { + return "", errors.New("no realm specified for token auth challenge") + } + + realmURL, err := url.Parse(realm) + if err != nil { + return "", fmt.Errorf("invalid token auth challenge realm: %s", err) + } + + // TODO(dmcgowan): Handle empty scheme + + req, err := http.NewRequest("GET", realmURL.String(), nil) + if err != nil { + return "", err + } + + reqParams := req.URL.Query() + service := params["service"] + scope := params["scope"] + + if service != "" { + reqParams.Add("service", service) + } + + for _, scopeField := range strings.Fields(scope) { + reqParams.Add("scope", scopeField) + } + + if creds != nil { + username, password := creds.Basic(realmURL) + if username != "" && password != "" { + reqParams.Add("account", username) + req.SetBasicAuth(username, password) + } + } + + req.URL.RawQuery = reqParams.Encode() + + resp, err := client.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("token auth attempt for registry: %s request failed with status: %d %s", req.URL, resp.StatusCode, http.StatusText(resp.StatusCode)) + } + + decoder := json.NewDecoder(resp.Body) + + tr := new(tokenResponse) + if err = decoder.Decode(tr); err != nil { + return "", fmt.Errorf("unable to decode token response: %s", err) + } + + if tr.Token == "" { + return "", errors.New("authorization server did not include a token in the response") + } + + return tr.Token, nil +}