From 71e7ac33cac7d71ffcf492e618cc3b7a139a7656 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 10 Feb 2015 17:41:09 -0800 Subject: [PATCH] Move storage package under registry package Signed-off-by: Stephen J Day --- docs/handlers/app.go | 4 +- docs/handlers/app_test.go | 2 +- docs/handlers/context.go | 2 +- docs/handlers/images.go | 2 +- docs/handlers/layer.go | 2 +- docs/handlers/layerupload.go | 2 +- docs/handlers/tags.go | 2 +- docs/storage/blobstore.go | 159 +++++++ docs/storage/cloudfrontlayerhandler.go | 122 ++++++ docs/storage/delegatelayerhandler.go | 94 ++++ docs/storage/doc.go | 3 + docs/storage/filereader.go | 201 +++++++++ docs/storage/filereader_test.go | 193 +++++++++ docs/storage/filewriter.go | 150 +++++++ docs/storage/filewriter_test.go | 148 +++++++ docs/storage/layer.go | 90 ++++ docs/storage/layer_test.go | 364 ++++++++++++++++ docs/storage/layerhandler.go | 50 +++ docs/storage/layerreader.go | 30 ++ docs/storage/layerstore.go | 168 +++++++ docs/storage/layerupload.go | 238 ++++++++++ docs/storage/manifeststore.go | 190 ++++++++ docs/storage/manifeststore_test.go | 233 ++++++++++ docs/storage/notifications/bridge.go | 156 +++++++ docs/storage/notifications/endpoint.go | 86 ++++ docs/storage/notifications/event.go | 154 +++++++ docs/storage/notifications/event_test.go | 145 +++++++ docs/storage/notifications/http.go | 145 +++++++ docs/storage/notifications/http_test.go | 155 +++++++ docs/storage/notifications/listener.go | 140 ++++++ docs/storage/notifications/listener_test.go | 153 +++++++ docs/storage/notifications/metrics.go | 152 +++++++ docs/storage/notifications/sinks.go | 337 ++++++++++++++ docs/storage/notifications/sinks_test.go | 223 ++++++++++ docs/storage/paths.go | 458 ++++++++++++++++++++ docs/storage/paths_test.go | 138 ++++++ docs/storage/registry.go | 80 ++++ docs/storage/revisionstore.go | 207 +++++++++ docs/storage/services.go | 84 ++++ docs/storage/tagstore.go | 157 +++++++ 40 files changed, 5411 insertions(+), 8 deletions(-) create mode 100644 docs/storage/blobstore.go create mode 100644 docs/storage/cloudfrontlayerhandler.go create mode 100644 docs/storage/delegatelayerhandler.go create mode 100644 docs/storage/doc.go create mode 100644 docs/storage/filereader.go create mode 100644 docs/storage/filereader_test.go create mode 100644 docs/storage/filewriter.go create mode 100644 docs/storage/filewriter_test.go create mode 100644 docs/storage/layer.go create mode 100644 docs/storage/layer_test.go create mode 100644 docs/storage/layerhandler.go create mode 100644 docs/storage/layerreader.go create mode 100644 docs/storage/layerstore.go create mode 100644 docs/storage/layerupload.go create mode 100644 docs/storage/manifeststore.go create mode 100644 docs/storage/manifeststore_test.go create mode 100644 docs/storage/notifications/bridge.go create mode 100644 docs/storage/notifications/endpoint.go create mode 100644 docs/storage/notifications/event.go create mode 100644 docs/storage/notifications/event_test.go create mode 100644 docs/storage/notifications/http.go create mode 100644 docs/storage/notifications/http_test.go create mode 100644 docs/storage/notifications/listener.go create mode 100644 docs/storage/notifications/listener_test.go create mode 100644 docs/storage/notifications/metrics.go create mode 100644 docs/storage/notifications/sinks.go create mode 100644 docs/storage/notifications/sinks_test.go create mode 100644 docs/storage/paths.go create mode 100644 docs/storage/paths_test.go create mode 100644 docs/storage/registry.go create mode 100644 docs/storage/revisionstore.go create mode 100644 docs/storage/services.go create mode 100644 docs/storage/tagstore.go diff --git a/docs/handlers/app.go b/docs/handlers/app.go index 5f433e95..09c0c621 100644 --- a/docs/handlers/app.go +++ b/docs/handlers/app.go @@ -11,8 +11,8 @@ import ( "github.com/docker/distribution/registry/auth" "github.com/docker/distribution/configuration" ctxu "github.com/docker/distribution/context" - "github.com/docker/distribution/storage" - "github.com/docker/distribution/storage/notifications" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/notifications" "github.com/docker/distribution/storagedriver" "github.com/docker/distribution/storagedriver/factory" "github.com/gorilla/mux" diff --git a/docs/handlers/app_test.go b/docs/handlers/app_test.go index 8da285a3..b27c788a 100644 --- a/docs/handlers/app_test.go +++ b/docs/handlers/app_test.go @@ -10,7 +10,7 @@ import ( "github.com/docker/distribution/registry/api/v2" _ "github.com/docker/distribution/registry/auth/silly" "github.com/docker/distribution/configuration" - "github.com/docker/distribution/storage" + "github.com/docker/distribution/registry/storage" "github.com/docker/distribution/storagedriver/inmemory" "golang.org/x/net/context" ) diff --git a/docs/handlers/context.go b/docs/handlers/context.go index c940d8f4..a49253ee 100644 --- a/docs/handlers/context.go +++ b/docs/handlers/context.go @@ -7,7 +7,7 @@ import ( "github.com/docker/distribution/registry/api/v2" ctxu "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/storage" + "github.com/docker/distribution/registry/storage" "golang.org/x/net/context" ) diff --git a/docs/handlers/images.go b/docs/handlers/images.go index 9c2dfa43..6a0e9a40 100644 --- a/docs/handlers/images.go +++ b/docs/handlers/images.go @@ -9,7 +9,7 @@ import ( ctxu "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" - "github.com/docker/distribution/storage" + "github.com/docker/distribution/registry/storage" "github.com/gorilla/handlers" ) diff --git a/docs/handlers/layer.go b/docs/handlers/layer.go index 73641dde..8214fbf0 100644 --- a/docs/handlers/layer.go +++ b/docs/handlers/layer.go @@ -6,7 +6,7 @@ import ( "github.com/docker/distribution/registry/api/v2" ctxu "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/storage" + "github.com/docker/distribution/registry/storage" "github.com/gorilla/handlers" ) diff --git a/docs/handlers/layerupload.go b/docs/handlers/layerupload.go index 190cf86b..83ef6fb6 100644 --- a/docs/handlers/layerupload.go +++ b/docs/handlers/layerupload.go @@ -10,7 +10,7 @@ import ( "github.com/docker/distribution/registry/api/v2" ctxu "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/storage" + "github.com/docker/distribution/registry/storage" "github.com/gorilla/handlers" ) diff --git a/docs/handlers/tags.go b/docs/handlers/tags.go index 1e8e43d5..0a764693 100644 --- a/docs/handlers/tags.go +++ b/docs/handlers/tags.go @@ -5,7 +5,7 @@ import ( "net/http" "github.com/docker/distribution/registry/api/v2" - "github.com/docker/distribution/storage" + "github.com/docker/distribution/registry/storage" "github.com/gorilla/handlers" ) diff --git a/docs/storage/blobstore.go b/docs/storage/blobstore.go new file mode 100644 index 00000000..ac123f44 --- /dev/null +++ b/docs/storage/blobstore.go @@ -0,0 +1,159 @@ +package storage + +import ( + "fmt" + + ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver" + "golang.org/x/net/context" +) + +// TODO(stevvooe): Currently, the blobStore implementation used by the +// manifest store. The layer store should be refactored to better leverage the +// blobStore, reducing duplicated code. + +// blobStore implements a generalized blob store over a driver, supporting the +// read side and link management. This object is intentionally a leaky +// abstraction, providing utility methods that support creating and traversing +// backend links. +type blobStore struct { + *registry + ctx context.Context +} + +// exists reports whether or not the path exists. If the driver returns error +// other than storagedriver.PathNotFound, an error may be returned. +func (bs *blobStore) exists(dgst digest.Digest) (bool, error) { + path, err := bs.path(dgst) + + if err != nil { + return false, err + } + + ok, err := exists(bs.driver, path) + if err != nil { + return false, err + } + + return ok, nil +} + +// get retrieves the blob by digest, returning it a byte slice. This should +// only be used for small objects. +func (bs *blobStore) get(dgst digest.Digest) ([]byte, error) { + bp, err := bs.path(dgst) + if err != nil { + return nil, err + } + + return bs.driver.GetContent(bp) +} + +// link links the path to the provided digest by writing the digest into the +// target file. +func (bs *blobStore) link(path string, dgst digest.Digest) error { + if exists, err := bs.exists(dgst); err != nil { + return err + } else if !exists { + return fmt.Errorf("cannot link non-existent blob") + } + + // The contents of the "link" file are the exact string contents of the + // digest, which is specified in that package. + return bs.driver.PutContent(path, []byte(dgst)) +} + +// linked reads the link at path and returns the content. +func (bs *blobStore) linked(path string) ([]byte, error) { + linked, err := bs.readlink(path) + if err != nil { + return nil, err + } + + return bs.get(linked) +} + +// readlink returns the linked digest at path. +func (bs *blobStore) readlink(path string) (digest.Digest, error) { + content, err := bs.driver.GetContent(path) + if err != nil { + return "", err + } + + linked, err := digest.ParseDigest(string(content)) + if err != nil { + return "", err + } + + if exists, err := bs.exists(linked); err != nil { + return "", err + } else if !exists { + return "", fmt.Errorf("link %q invalid: blob %s does not exist", path, linked) + } + + return linked, nil +} + +// resolve reads the digest link at path and returns the blob store link. +func (bs *blobStore) resolve(path string) (string, error) { + dgst, err := bs.readlink(path) + if err != nil { + return "", err + } + + return bs.path(dgst) +} + +// put stores the content p in the blob store, calculating the digest. If the +// content is already present, only the digest will be returned. This should +// only be used for small objects, such as manifests. +func (bs *blobStore) put(p []byte) (digest.Digest, error) { + dgst, err := digest.FromBytes(p) + if err != nil { + ctxu.GetLogger(bs.ctx).Errorf("error digesting content: %v, %s", err, string(p)) + return "", err + } + + bp, err := bs.path(dgst) + if err != nil { + return "", err + } + + // If the content already exists, just return the digest. + if exists, err := bs.exists(dgst); err != nil { + return "", err + } else if exists { + return dgst, nil + } + + return dgst, bs.driver.PutContent(bp, p) +} + +// path returns the canonical path for the blob identified by digest. The blob +// may or may not exist. +func (bs *blobStore) path(dgst digest.Digest) (string, error) { + bp, err := bs.pm.path(blobDataPathSpec{ + digest: dgst, + }) + + if err != nil { + return "", err + } + + return bp, nil +} + +// exists provides a utility method to test whether or not +func exists(driver storagedriver.StorageDriver, path string) (bool, error) { + if _, err := driver.Stat(path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + return false, nil + default: + return false, err + } + } + + return true, nil +} diff --git a/docs/storage/cloudfrontlayerhandler.go b/docs/storage/cloudfrontlayerhandler.go new file mode 100644 index 00000000..fa420cc7 --- /dev/null +++ b/docs/storage/cloudfrontlayerhandler.go @@ -0,0 +1,122 @@ +package storage + +import ( + "crypto/x509" + "encoding/pem" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + "github.com/AdRoll/goamz/cloudfront" + "github.com/docker/distribution/storagedriver" +) + +// cloudFrontLayerHandler provides an simple implementation of layerHandler that +// constructs temporary signed CloudFront URLs from the storagedriver layer URL, +// then issues HTTP Temporary Redirects to this CloudFront content URL. +type cloudFrontLayerHandler struct { + cloudfront *cloudfront.CloudFront + delegateLayerHandler *delegateLayerHandler + duration time.Duration +} + +var _ LayerHandler = &cloudFrontLayerHandler{} + +// newCloudFrontLayerHandler constructs and returns a new CloudFront +// LayerHandler implementation. +// Required options: baseurl, privatekey, keypairid +func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) { + base, ok := options["baseurl"] + if !ok { + return nil, fmt.Errorf("No baseurl provided") + } + baseURL, ok := base.(string) + if !ok { + return nil, fmt.Errorf("baseurl must be a string") + } + pk, ok := options["privatekey"] + if !ok { + return nil, fmt.Errorf("No privatekey provided") + } + pkPath, ok := pk.(string) + if !ok { + return nil, fmt.Errorf("privatekey must be a string") + } + kpid, ok := options["keypairid"] + if !ok { + return nil, fmt.Errorf("No keypairid provided") + } + keypairID, ok := kpid.(string) + if !ok { + return nil, fmt.Errorf("keypairid must be a string") + } + + pkBytes, err := ioutil.ReadFile(pkPath) + if err != nil { + return nil, fmt.Errorf("Failed to read privatekey file: %s", err) + } + + block, _ := pem.Decode([]byte(pkBytes)) + if block == nil { + return nil, fmt.Errorf("Failed to decode private key as an rsa private key") + } + privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes) + if err != nil { + return nil, err + } + + lh, err := newDelegateLayerHandler(storageDriver, options) + if err != nil { + return nil, err + } + dlh := lh.(*delegateLayerHandler) + + cf := cloudfront.New(baseURL, privateKey, keypairID) + + duration := 20 * time.Minute + d, ok := options["duration"] + if ok { + switch d := d.(type) { + case time.Duration: + duration = d + case string: + dur, err := time.ParseDuration(d) + if err != nil { + return nil, fmt.Errorf("Invalid duration: %s", err) + } + duration = dur + } + } + + return &cloudFrontLayerHandler{cloudfront: cf, delegateLayerHandler: dlh, duration: duration}, nil +} + +// Resolve returns an http.Handler which can serve the contents of the given +// Layer, or an error if not supported by the storagedriver. +func (lh *cloudFrontLayerHandler) Resolve(layer Layer) (http.Handler, error) { + layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil) + if err != nil { + return nil, err + } + + layerURL, err := url.Parse(layerURLStr) + if err != nil { + return nil, err + } + + cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration)) + if err != nil { + return nil, err + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, cfURL, http.StatusTemporaryRedirect) + }), nil +} + +// init registers the cloudfront layerHandler backend. +func init() { + RegisterLayerHandler("cloudfront", LayerHandlerInitFunc(newCloudFrontLayerHandler)) +} diff --git a/docs/storage/delegatelayerhandler.go b/docs/storage/delegatelayerhandler.go new file mode 100644 index 00000000..7ed6d87b --- /dev/null +++ b/docs/storage/delegatelayerhandler.go @@ -0,0 +1,94 @@ +package storage + +import ( + "fmt" + "net/http" + "time" + + "github.com/docker/distribution/storagedriver" +) + +// delegateLayerHandler provides a simple implementation of layerHandler that +// simply issues HTTP Temporary Redirects to the URL provided by the +// storagedriver for a given Layer. +type delegateLayerHandler struct { + storageDriver storagedriver.StorageDriver + pathMapper *pathMapper + duration time.Duration +} + +var _ LayerHandler = &delegateLayerHandler{} + +func newDelegateLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) { + duration := 20 * time.Minute + d, ok := options["duration"] + if ok { + switch d := d.(type) { + case time.Duration: + duration = d + case string: + dur, err := time.ParseDuration(d) + if err != nil { + return nil, fmt.Errorf("Invalid duration: %s", err) + } + duration = dur + } + } + + return &delegateLayerHandler{storageDriver: storageDriver, pathMapper: defaultPathMapper, duration: duration}, nil +} + +// Resolve returns an http.Handler which can serve the contents of the given +// Layer, or an error if not supported by the storagedriver. +func (lh *delegateLayerHandler) Resolve(layer Layer) (http.Handler, error) { + // TODO(bbland): This is just a sanity check to ensure that the + // storagedriver supports url generation. It would be nice if we didn't have + // to do this twice for non-GET requests. + layerURL, err := lh.urlFor(layer, map[string]interface{}{"method": "GET"}) + if err != nil { + return nil, err + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + layerURL, err = lh.urlFor(layer, map[string]interface{}{"method": r.Method}) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + http.Redirect(w, r, layerURL, http.StatusTemporaryRedirect) + }), nil +} + +// urlFor returns a download URL for the given layer, or the empty string if +// unsupported. +func (lh *delegateLayerHandler) urlFor(layer Layer, options map[string]interface{}) (string, error) { + // Crack open the layer to get at the layerStore + layerRd, ok := layer.(*layerReader) + if !ok { + // TODO(stevvooe): We probably want to find a better way to get at the + // underlying filesystem path for a given layer. Perhaps, the layer + // handler should have its own layer store but right now, it is not + // request scoped. + return "", fmt.Errorf("unsupported layer type: cannot resolve blob path: %v", layer) + } + + if options == nil { + options = make(map[string]interface{}) + } + options["expiry"] = time.Now().Add(lh.duration) + + layerURL, err := lh.storageDriver.URLFor(layerRd.path, options) + if err != nil { + return "", err + } + + return layerURL, nil +} + +// init registers the delegate layerHandler backend. +func init() { + RegisterLayerHandler("delegate", LayerHandlerInitFunc(newDelegateLayerHandler)) +} diff --git a/docs/storage/doc.go b/docs/storage/doc.go new file mode 100644 index 00000000..387d9234 --- /dev/null +++ b/docs/storage/doc.go @@ -0,0 +1,3 @@ +// Package storage contains storage services for use in the registry +// application. It should be considered an internal package, as of Go 1.4. +package storage diff --git a/docs/storage/filereader.go b/docs/storage/filereader.go new file mode 100644 index 00000000..9bc09afe --- /dev/null +++ b/docs/storage/filereader.go @@ -0,0 +1,201 @@ +package storage + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "time" + + "github.com/docker/distribution/storagedriver" +) + +// TODO(stevvooe): Set an optimal buffer size here. We'll have to +// understand the latency characteristics of the underlying network to +// set this correctly, so we may want to leave it to the driver. For +// out of process drivers, we'll have to optimize this buffer size for +// local communication. +const fileReaderBufferSize = 4 << 20 + +// remoteFileReader provides a read seeker interface to files stored in +// storagedriver. Used to implement part of layer interface and will be used +// to implement read side of LayerUpload. +type fileReader struct { + driver storagedriver.StorageDriver + + // identifying fields + path string + size int64 // size is the total layer size, must be set. + modtime time.Time + + // mutable fields + rc io.ReadCloser // remote read closer + brd *bufio.Reader // internal buffered io + offset int64 // offset is the current read offset + err error // terminal error, if set, reader is closed +} + +// newFileReader initializes a file reader for the remote file. The read takes +// on the offset and size at the time the reader is created. If the underlying +// file changes, one must create a new fileReader. +func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { + rd := &fileReader{ + driver: driver, + path: path, + } + + // Grab the size of the layer file, ensuring existence. + if fi, err := driver.Stat(path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // NOTE(stevvooe): We really don't care if the file is not + // actually present for the reader. If the caller needs to know + // whether or not the file exists, they should issue a stat call + // on the path. There is still no guarantee, since the file may be + // gone by the time the reader is created. The only correct + // behavior is to return a reader that immediately returns EOF. + default: + // Any other error we want propagated up the stack. + return nil, err + } + } else { + if fi.IsDir() { + return nil, fmt.Errorf("cannot read a directory") + } + + // Fill in file information + rd.size = fi.Size() + rd.modtime = fi.ModTime() + } + + return rd, nil +} + +func (fr *fileReader) Read(p []byte) (n int, err error) { + if fr.err != nil { + return 0, fr.err + } + + rd, err := fr.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + fr.offset += int64(n) + + // Simulate io.EOR error if we reach filesize. + if err == nil && fr.offset >= fr.size { + err = io.EOF + } + + return n, err +} + +func (fr *fileReader) Seek(offset int64, whence int) (int64, error) { + if fr.err != nil { + return 0, fr.err + } + + var err error + newOffset := fr.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = fr.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else { + if fr.offset != newOffset { + fr.reset() + } + + // No problems, set the offset. + fr.offset = newOffset + } + + return fr.offset, err +} + +// Close the layer. Should be called when the resource is no longer needed. +func (fr *fileReader) Close() error { + if fr.err != nil { + return fr.err + } + + fr.err = ErrLayerClosed + + // close and release reader chain + if fr.rc != nil { + fr.rc.Close() + } + + fr.rc = nil + fr.brd = nil + + return fr.err +} + +// reader prepares the current reader at the lrs offset, ensuring its buffered +// and ready to go. +func (fr *fileReader) reader() (io.Reader, error) { + if fr.err != nil { + return nil, fr.err + } + + if fr.rc != nil { + return fr.brd, nil + } + + // If we don't have a reader, open one up. + rc, err := fr.driver.ReadStream(fr.path, fr.offset) + if err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // NOTE(stevvooe): If the path is not found, we simply return a + // reader that returns io.EOF. However, we do not set fr.rc, + // allowing future attempts at getting a reader to possibly + // succeed if the file turns up later. + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + default: + return nil, err + } + } + + fr.rc = rc + + if fr.brd == nil { + // TODO(stevvooe): Set an optimal buffer size here. We'll have to + // understand the latency characteristics of the underlying network to + // set this correctly, so we may want to leave it to the driver. For + // out of process drivers, we'll have to optimize this buffer size for + // local communication. + fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize) + } else { + fr.brd.Reset(fr.rc) + } + + return fr.brd, nil +} + +// resetReader resets the reader, forcing the read method to open up a new +// connection and rebuild the buffered reader. This should be called when the +// offset and the reader will become out of sync, such as during a seek +// operation. +func (fr *fileReader) reset() { + if fr.err != nil { + return + } + if fr.rc != nil { + fr.rc.Close() + fr.rc = nil + } +} diff --git a/docs/storage/filereader_test.go b/docs/storage/filereader_test.go new file mode 100644 index 00000000..53dd6c9a --- /dev/null +++ b/docs/storage/filereader_test.go @@ -0,0 +1,193 @@ +package storage + +import ( + "bytes" + "crypto/rand" + "io" + mrand "math/rand" + "os" + "testing" + + "github.com/docker/distribution/digest" + + "github.com/docker/distribution/storagedriver/inmemory" +) + +func TestSimpleRead(t *testing.T) { + content := make([]byte, 1<<20) + n, err := rand.Read(content) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + + if n != len(content) { + t.Fatalf("random read did't fill buffer") + } + + dgst, err := digest.FromReader(bytes.NewReader(content)) + if err != nil { + t.Fatalf("unexpected error digesting random content: %v", err) + } + + driver := inmemory.New() + path := "/random" + + if err := driver.PutContent(path, content); err != nil { + t.Fatalf("error putting patterned content: %v", err) + } + + fr, err := newFileReader(driver, path) + if err != nil { + t.Fatalf("error allocating file reader: %v", err) + } + + verifier := digest.NewDigestVerifier(dgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify read data") + } +} + +func TestFileReaderSeek(t *testing.T) { + driver := inmemory.New() + pattern := "01234567890ab" // prime length block + repititions := 1024 + path := "/patterned" + content := bytes.Repeat([]byte(pattern), repititions) + + if err := driver.PutContent(path, content); err != nil { + t.Fatalf("error putting patterned content: %v", err) + } + + fr, err := newFileReader(driver, path) + + if err != nil { + t.Fatalf("unexpected error creating file reader: %v", err) + } + + // Seek all over the place, in blocks of pattern size and make sure we get + // the right data. + for _, repitition := range mrand.Perm(repititions - 1) { + targetOffset := int64(len(pattern) * repitition) + // Seek to a multiple of pattern size and read pattern size bytes + offset, err := fr.Seek(targetOffset, os.SEEK_SET) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if offset != targetOffset { + t.Fatalf("did not seek to correct offset: %d != %d", offset, targetOffset) + } + + p := make([]byte, len(pattern)) + + n, err := fr.Read(p) + if err != nil { + t.Fatalf("error reading pattern: %v", err) + } + + if n != len(pattern) { + t.Fatalf("incorrect read length: %d != %d", n, len(pattern)) + } + + if string(p) != pattern { + t.Fatalf("incorrect read content: %q != %q", p, pattern) + } + + // Check offset + current, err := fr.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("error checking current offset: %v", err) + } + + if current != targetOffset+int64(len(pattern)) { + t.Fatalf("unexpected offset after read: %v", err) + } + } + + start, err := fr.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatalf("error seeking to start: %v", err) + } + + if start != 0 { + t.Fatalf("expected to seek to start: %v != 0", start) + } + + end, err := fr.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("error checking current offset: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("expected to seek to end: %v != %v", end, len(content)) + } + + // 4. Seek before start, ensure error. + + // seek before start + before, err := fr.Seek(-1, os.SEEK_SET) + if err == nil { + t.Fatalf("error expected, returned offset=%v", before) + } + + // 5. Seek after end, + after, err := fr.Seek(1, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error expected, returned offset=%v", after) + } + + p := make([]byte, 16) + n, err := fr.Read(p) + + if n != 0 { + t.Fatalf("bytes reads %d != %d", n, 0) + } + + if err != io.EOF { + t.Fatalf("expected io.EOF, got %v", err) + } +} + +// TestFileReaderNonExistentFile ensures the reader behaves as expected with a +// missing or zero-length remote file. While the file may not exist, the +// reader should not error out on creation and should return 0-bytes from the +// read method, with an io.EOF error. +func TestFileReaderNonExistentFile(t *testing.T) { + driver := inmemory.New() + fr, err := newFileReader(driver, "/doesnotexist") + if err != nil { + t.Fatalf("unexpected error initializing reader: %v", err) + } + + var buf [1024]byte + + n, err := fr.Read(buf[:]) + if n != 0 { + t.Fatalf("non-zero byte read reported: %d != 0", n) + } + + if err != io.EOF { + t.Fatalf("read on missing file should return io.EOF, got %v", err) + } +} + +// TestLayerReadErrors covers the various error return type for different +// conditions that can arise when reading a layer. +func TestFileReaderErrors(t *testing.T) { + // TODO(stevvooe): We need to cover error return types, driven by the + // errors returned via the HTTP API. For now, here is a incomplete list: + // + // 1. Layer Not Found: returned when layer is not found or access is + // denied. + // 2. Layer Unavailable: returned when link references are unresolved, + // but layer is known to the registry. + // 3. Layer Invalid: This may more split into more errors, but should be + // returned when name or tarsum does not reference a valid error. We + // may also need something to communication layer verification errors + // for the inline tarsum check. + // 4. Timeout: timeouts to backend. Need to better understand these + // failure cases and how the storage driver propagates these errors + // up the stack. +} diff --git a/docs/storage/filewriter.go b/docs/storage/filewriter.go new file mode 100644 index 00000000..5037f160 --- /dev/null +++ b/docs/storage/filewriter.go @@ -0,0 +1,150 @@ +package storage + +import ( + "bytes" + "fmt" + "io" + "os" + + "github.com/docker/distribution/storagedriver" +) + +// fileWriter implements a remote file writer backed by a storage driver. +type fileWriter struct { + driver storagedriver.StorageDriver + + // identifying fields + path string + + // mutable fields + size int64 // size of the file, aka the current end + offset int64 // offset is the current write offset + err error // terminal error, if set, reader is closed +} + +// fileWriterInterface makes the desired io compliant interface that the +// filewriter should implement. +type fileWriterInterface interface { + io.WriteSeeker + io.WriterAt + io.ReaderFrom + io.Closer +} + +var _ fileWriterInterface = &fileWriter{} + +// newFileWriter returns a prepared fileWriter for the driver and path. This +// could be considered similar to an "open" call on a regular filesystem. +func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) { + fw := fileWriter{ + driver: driver, + path: path, + } + + if fi, err := driver.Stat(path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // ignore, offset is zero + default: + return nil, err + } + } else { + if fi.IsDir() { + return nil, fmt.Errorf("cannot write to a directory") + } + + fw.size = fi.Size() + } + + return &fw, nil +} + +// Write writes the buffer p at the current write offset. +func (fw *fileWriter) Write(p []byte) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), -1) + return int(nn), err +} + +// WriteAt writes p at the specified offset. The underlying offset does not +// change. +func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) { + nn, err := fw.readFromAt(bytes.NewReader(p), offset) + return int(nn), err +} + +// ReadFrom reads reader r until io.EOF writing the contents at the current +// offset. +func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) { + return fw.readFromAt(r, -1) +} + +// Seek moves the write position do the requested offest based on the whence +// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET. +func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) { + if fw.err != nil { + return 0, fw.err + } + + var err error + newOffset := fw.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = fw.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else { + // No problems, set the offset. + fw.offset = newOffset + } + + return fw.offset, err +} + +// Close closes the fileWriter for writing. +func (fw *fileWriter) Close() error { + if fw.err != nil { + return fw.err + } + + fw.err = fmt.Errorf("filewriter@%v: closed", fw.path) + + return fw.err +} + +// readFromAt writes to fw from r at the specified offset. If offset is less +// than zero, the value of fw.offset is used and updated after the operation. +func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) { + if fw.err != nil { + return 0, fw.err + } + + var updateOffset bool + if offset < 0 { + offset = fw.offset + updateOffset = true + } + + nn, err := fw.driver.WriteStream(fw.path, offset, r) + + if updateOffset { + // We should forward the offset, whether or not there was an error. + // Basically, we keep the filewriter in sync with the reader's head. If an + // error is encountered, the whole thing should be retried but we proceed + // from an expected offset, even if the data didn't make it to the + // backend. + fw.offset += nn + + if fw.offset > fw.size { + fw.size = fw.offset + } + } + + return nn, err +} diff --git a/docs/storage/filewriter_test.go b/docs/storage/filewriter_test.go new file mode 100644 index 00000000..2235462f --- /dev/null +++ b/docs/storage/filewriter_test.go @@ -0,0 +1,148 @@ +package storage + +import ( + "bytes" + "crypto/rand" + "io" + "os" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver/inmemory" +) + +// TestSimpleWrite takes the fileWriter through common write operations +// ensuring data integrity. +func TestSimpleWrite(t *testing.T) { + content := make([]byte, 1<<20) + n, err := rand.Read(content) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + + if n != len(content) { + t.Fatalf("random read did't fill buffer") + } + + dgst, err := digest.FromReader(bytes.NewReader(content)) + if err != nil { + t.Fatalf("unexpected error digesting random content: %v", err) + } + + driver := inmemory.New() + path := "/random" + + fw, err := newFileWriter(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + n, err = fw.Write(content) + if err != nil { + t.Fatalf("unexpected error writing content: %v", err) + } + + if n != len(content) { + t.Fatalf("unexpected write length: %d != %d", n, len(content)) + } + + fr, err := newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier := digest.NewDigestVerifier(dgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check the seek position is equal to the content length + end, err := fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Double the content, but use the WriteAt method + doubled := append(content, content...) + doubledgst, err := digest.FromReader(bytes.NewReader(doubled)) + if err != nil { + t.Fatalf("unexpected error digesting doubled content: %v", err) + } + + n, err = fw.WriteAt(content, end) + if err != nil { + t.Fatalf("unexpected error writing content at %d: %v", end, err) + } + + if n != len(content) { + t.Fatalf("writeat was short: %d != %d", n, len(content)) + } + + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } + + // Check that WriteAt didn't update the offset. + end, err = fw.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("write did not advance offset: %d != %d", end, len(content)) + } + + // Now, we copy from one path to another, running the data through the + // fileReader to fileWriter, rather than the driver.Move command to ensure + // everything is working correctly. + fr, err = newFileReader(driver, path) + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + fw, err = newFileWriter(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileWriter: %v", err) + } + defer fw.Close() + + nn, err := io.Copy(fw, fr) + if err != nil { + t.Fatalf("unexpected error copying data: %v", err) + } + + if nn != int64(len(doubled)) { + t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled)) + } + + fr, err = newFileReader(driver, "/copied") + if err != nil { + t.Fatalf("unexpected error creating fileReader: %v", err) + } + defer fr.Close() + + verifier = digest.NewDigestVerifier(doubledgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify write data") + } +} diff --git a/docs/storage/layer.go b/docs/storage/layer.go new file mode 100644 index 00000000..5e12f43e --- /dev/null +++ b/docs/storage/layer.go @@ -0,0 +1,90 @@ +package storage + +import ( + "fmt" + "io" + "time" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" +) + +// Layer provides a readable and seekable layer object. Typically, +// implementations are *not* goroutine safe. +type Layer interface { + // http.ServeContent requires an efficient implementation of + // ReadSeeker.Seek(0, os.SEEK_END). + io.ReadSeeker + io.Closer + + // Name returns the repository under which this layer is linked. + Name() string // TODO(stevvooe): struggling with nomenclature: should this be "repo" or "name"? + + // Digest returns the unique digest of the blob, which is the tarsum for + // layers. + Digest() digest.Digest + + // CreatedAt returns the time this layer was created. + CreatedAt() time.Time +} + +// LayerUpload provides a handle for working with in-progress uploads. +// Instances can be obtained from the LayerService.Upload and +// LayerService.Resume. +type LayerUpload interface { + io.WriteSeeker + io.ReaderFrom + io.Closer + + // Name of the repository under which the layer will be linked. + Name() string + + // UUID returns the identifier for this upload. + UUID() string + + // StartedAt returns the time this layer upload was started. + StartedAt() time.Time + + // Finish marks the upload as completed, returning a valid handle to the + // uploaded layer. The digest is validated against the contents of the + // uploaded layer. + Finish(digest digest.Digest) (Layer, error) + + // Cancel the layer upload process. + Cancel() error +} + +var ( + // ErrLayerExists returned when layer already exists + ErrLayerExists = fmt.Errorf("layer exists") + + // ErrLayerTarSumVersionUnsupported when tarsum is unsupported version. + ErrLayerTarSumVersionUnsupported = fmt.Errorf("unsupported tarsum version") + + // ErrLayerUploadUnknown returned when upload is not found. + ErrLayerUploadUnknown = fmt.Errorf("layer upload unknown") + + // ErrLayerClosed returned when an operation is attempted on a closed + // Layer or LayerUpload. + ErrLayerClosed = fmt.Errorf("layer closed") +) + +// ErrUnknownLayer returned when layer cannot be found. +type ErrUnknownLayer struct { + FSLayer manifest.FSLayer +} + +func (err ErrUnknownLayer) Error() string { + return fmt.Sprintf("unknown layer %v", err.FSLayer.BlobSum) +} + +// ErrLayerInvalidDigest returned when tarsum check fails. +type ErrLayerInvalidDigest struct { + Digest digest.Digest + Reason error +} + +func (err ErrLayerInvalidDigest) Error() string { + return fmt.Sprintf("invalid digest for referenced layer: %v, %v", + err.Digest, err.Reason) +} diff --git a/docs/storage/layer_test.go b/docs/storage/layer_test.go new file mode 100644 index 00000000..c7d64b79 --- /dev/null +++ b/docs/storage/layer_test.go @@ -0,0 +1,364 @@ +package storage + +import ( + "bytes" + "crypto/sha256" + "fmt" + "io" + "io/ioutil" + "os" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver" + "github.com/docker/distribution/storagedriver/inmemory" + "github.com/docker/distribution/testutil" + "golang.org/x/net/context" +) + +// TestSimpleLayerUpload covers the layer upload process, exercising common +// error paths that might be seen during an upload. +func TestSimpleLayerUpload(t *testing.T) { + randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile() + + if err != nil { + t.Fatalf("error creating random reader: %v", err) + } + + dgst := digest.Digest(tarSumStr) + + if err != nil { + t.Fatalf("error allocating upload store: %v", err) + } + + ctx := context.Background() + imageName := "foo/bar" + driver := inmemory.New() + registry := NewRegistryWithDriver(driver) + ls := registry.Repository(ctx, imageName).Layers() + + h := sha256.New() + rd := io.TeeReader(randomDataReader, h) + + layerUpload, err := ls.Upload() + + if err != nil { + t.Fatalf("unexpected error starting layer upload: %s", err) + } + + // Cancel the upload then restart it + if err := layerUpload.Cancel(); err != nil { + t.Fatalf("unexpected error during upload cancellation: %v", err) + } + + // Do a resume, get unknown upload + layerUpload, err = ls.Resume(layerUpload.UUID()) + if err != ErrLayerUploadUnknown { + t.Fatalf("unexpected error resuming upload, should be unkown: %v", err) + } + + // Restart! + layerUpload, err = ls.Upload() + if err != nil { + t.Fatalf("unexpected error starting layer upload: %s", err) + } + + // Get the size of our random tarfile + randomDataSize, err := seekerSize(randomDataReader) + if err != nil { + t.Fatalf("error getting seeker size of random data: %v", err) + } + + nn, err := io.Copy(layerUpload, rd) + if err != nil { + t.Fatalf("unexpected error uploading layer data: %v", err) + } + + if nn != randomDataSize { + t.Fatalf("layer data write incomplete") + } + + offset, err := layerUpload.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("unexpected error seeking layer upload: %v", err) + } + + if offset != nn { + t.Fatalf("layerUpload not updated with correct offset: %v != %v", offset, nn) + } + layerUpload.Close() + + // Do a resume, for good fun + layerUpload, err = ls.Resume(layerUpload.UUID()) + if err != nil { + t.Fatalf("unexpected error resuming upload: %v", err) + } + + sha256Digest := digest.NewDigest("sha256", h) + layer, err := layerUpload.Finish(dgst) + + if err != nil { + t.Fatalf("unexpected error finishing layer upload: %v", err) + } + + // After finishing an upload, it should no longer exist. + if _, err := ls.Resume(layerUpload.UUID()); err != ErrLayerUploadUnknown { + t.Fatalf("expected layer upload to be unknown, got %v", err) + } + + // Test for existence. + exists, err := ls.Exists(layer.Digest()) + if err != nil { + t.Fatalf("unexpected error checking for existence: %v", err) + } + + if !exists { + t.Fatalf("layer should now exist") + } + + h.Reset() + nn, err = io.Copy(h, layer) + if err != nil { + t.Fatalf("error reading layer: %v", err) + } + + if nn != randomDataSize { + t.Fatalf("incorrect read length") + } + + if digest.NewDigest("sha256", h) != sha256Digest { + t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest) + } +} + +// TestSimpleLayerRead just creates a simple layer file and ensures that basic +// open, read, seek, read works. More specific edge cases should be covered in +// other tests. +func TestSimpleLayerRead(t *testing.T) { + ctx := context.Background() + imageName := "foo/bar" + driver := inmemory.New() + registry := NewRegistryWithDriver(driver) + ls := registry.Repository(ctx, imageName).Layers() + + randomLayerReader, tarSumStr, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating random data: %v", err) + } + + dgst := digest.Digest(tarSumStr) + + // Test for existence. + exists, err := ls.Exists(dgst) + if err != nil { + t.Fatalf("unexpected error checking for existence: %v", err) + } + + if exists { + t.Fatalf("layer should not exist") + } + + // Try to get the layer and make sure we get a not found error + layer, err := ls.Fetch(dgst) + if err == nil { + t.Fatalf("error expected fetching unknown layer") + } + + switch err.(type) { + case ErrUnknownLayer: + err = nil + default: + t.Fatalf("unexpected error fetching non-existent layer: %v", err) + } + + randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader) + if err != nil { + t.Fatalf("unexpected error writing test layer: %v", err) + } + + randomLayerSize, err := seekerSize(randomLayerReader) + if err != nil { + t.Fatalf("error getting seeker size for random layer: %v", err) + } + + layer, err = ls.Fetch(dgst) + if err != nil { + t.Fatal(err) + } + defer layer.Close() + + // Now check the sha digest and ensure its the same + h := sha256.New() + nn, err := io.Copy(h, layer) + if err != nil && err != io.EOF { + t.Fatalf("unexpected error copying to hash: %v", err) + } + + if nn != randomLayerSize { + t.Fatalf("stored incorrect number of bytes in layer: %d != %d", nn, randomLayerSize) + } + + sha256Digest := digest.NewDigest("sha256", h) + if sha256Digest != randomLayerDigest { + t.Fatalf("fetched digest does not match: %q != %q", sha256Digest, randomLayerDigest) + } + + // Now seek back the layer, read the whole thing and check against randomLayerData + offset, err := layer.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatalf("error seeking layer: %v", err) + } + + if offset != 0 { + t.Fatalf("seek failed: expected 0 offset, got %d", offset) + } + + p, err := ioutil.ReadAll(layer) + if err != nil { + t.Fatalf("error reading all of layer: %v", err) + } + + if len(p) != int(randomLayerSize) { + t.Fatalf("layer data read has different length: %v != %v", len(p), randomLayerSize) + } + + // Reset the randomLayerReader and read back the buffer + _, err = randomLayerReader.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatalf("error resetting layer reader: %v", err) + } + + randomLayerData, err := ioutil.ReadAll(randomLayerReader) + if err != nil { + t.Fatalf("random layer read failed: %v", err) + } + + if !bytes.Equal(p, randomLayerData) { + t.Fatalf("layer data not equal") + } +} + +// TestLayerUploadZeroLength uploads zero-length +func TestLayerUploadZeroLength(t *testing.T) { + ctx := context.Background() + imageName := "foo/bar" + driver := inmemory.New() + registry := NewRegistryWithDriver(driver) + ls := registry.Repository(ctx, imageName).Layers() + + upload, err := ls.Upload() + if err != nil { + t.Fatalf("unexpected error starting upload: %v", err) + } + + io.Copy(upload, bytes.NewReader([]byte{})) + + dgst, err := digest.FromTarArchive(bytes.NewReader([]byte{})) + if err != nil { + t.Fatalf("error getting zero digest: %v", err) + } + + if dgst != digest.DigestTarSumV1EmptyTar { + // sanity check on zero digest + t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar) + } + + layer, err := upload.Finish(dgst) + if err != nil { + t.Fatalf("unexpected error finishing upload: %v", err) + } + + if layer.Digest() != dgst { + t.Fatalf("unexpected digest: %v != %v", layer.Digest(), dgst) + } +} + +// writeRandomLayer creates a random layer under name and tarSum using driver +// and pathMapper. An io.ReadSeeker with the data is returned, along with the +// sha256 hex digest. +func writeRandomLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string) (rs io.ReadSeeker, tarSum digest.Digest, sha256digest digest.Digest, err error) { + reader, tarSumStr, err := testutil.CreateRandomTarFile() + if err != nil { + return nil, "", "", err + } + + tarSum = digest.Digest(tarSumStr) + + // Now, actually create the layer. + randomLayerDigest, err := writeTestLayer(driver, pathMapper, name, tarSum, ioutil.NopCloser(reader)) + + if _, err := reader.Seek(0, os.SEEK_SET); err != nil { + return nil, "", "", err + } + + return reader, tarSum, randomLayerDigest, err +} + +// seekerSize seeks to the end of seeker, checks the size and returns it to +// the original state, returning the size. The state of the seeker should be +// treated as unknown if an error is returned. +func seekerSize(seeker io.ReadSeeker) (int64, error) { + current, err := seeker.Seek(0, os.SEEK_CUR) + if err != nil { + return 0, err + } + + end, err := seeker.Seek(0, os.SEEK_END) + if err != nil { + return 0, err + } + + resumed, err := seeker.Seek(current, os.SEEK_SET) + if err != nil { + return 0, err + } + + if resumed != current { + return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location") + } + + return end, nil +} + +// createTestLayer creates a simple test layer in the provided driver under +// tarsum dgst, returning the sha256 digest location. This is implemented +// peicemeal and should probably be replaced by the uploader when it's ready. +func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string, dgst digest.Digest, content io.Reader) (digest.Digest, error) { + h := sha256.New() + rd := io.TeeReader(content, h) + + p, err := ioutil.ReadAll(rd) + + if err != nil { + return "", nil + } + + blobDigestSHA := digest.NewDigest("sha256", h) + + blobPath, err := pathMapper.path(blobDataPathSpec{ + digest: dgst, + }) + + if err := driver.PutContent(blobPath, p); err != nil { + return "", err + } + + if err != nil { + return "", err + } + + layerLinkPath, err := pathMapper.path(layerLinkPathSpec{ + name: name, + digest: dgst, + }) + + if err != nil { + return "", err + } + + if err := driver.PutContent(layerLinkPath, []byte(dgst)); err != nil { + return "", nil + } + + return blobDigestSHA, err +} diff --git a/docs/storage/layerhandler.go b/docs/storage/layerhandler.go new file mode 100644 index 00000000..2755470e --- /dev/null +++ b/docs/storage/layerhandler.go @@ -0,0 +1,50 @@ +package storage + +import ( + "fmt" + "net/http" + + "github.com/docker/distribution/storagedriver" +) + +// LayerHandler provides middleware for serving the contents of a Layer. +type LayerHandler interface { + // Resolve returns an http.Handler which can serve the contents of a given + // Layer if possible, or nil and an error when unsupported. This may + // directly serve the contents of the layer or issue a redirect to another + // URL hosting the content. + Resolve(layer Layer) (http.Handler, error) +} + +// LayerHandlerInitFunc is the type of a LayerHandler factory function and is +// used to register the contsructor for different LayerHandler backends. +type LayerHandlerInitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) + +var layerHandlers map[string]LayerHandlerInitFunc + +// RegisterLayerHandler is used to register an LayerHandlerInitFunc for +// a LayerHandler backend with the given name. +func RegisterLayerHandler(name string, initFunc LayerHandlerInitFunc) error { + if layerHandlers == nil { + layerHandlers = make(map[string]LayerHandlerInitFunc) + } + if _, exists := layerHandlers[name]; exists { + return fmt.Errorf("name already registered: %s", name) + } + + layerHandlers[name] = initFunc + + return nil +} + +// GetLayerHandler constructs a LayerHandler +// with the given options using the named backend. +func GetLayerHandler(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (LayerHandler, error) { + if layerHandlers != nil { + if initFunc, exists := layerHandlers[name]; exists { + return initFunc(storageDriver, options) + } + } + + return nil, fmt.Errorf("no layer handler registered with name: %s", name) +} diff --git a/docs/storage/layerreader.go b/docs/storage/layerreader.go new file mode 100644 index 00000000..4510dd7d --- /dev/null +++ b/docs/storage/layerreader.go @@ -0,0 +1,30 @@ +package storage + +import ( + "time" + + "github.com/docker/distribution/digest" +) + +// layerReadSeeker implements Layer and provides facilities for reading and +// seeking. +type layerReader struct { + fileReader + + name string // repo name of this layer + digest digest.Digest +} + +var _ Layer = &layerReader{} + +func (lrs *layerReader) Name() string { + return lrs.name +} + +func (lrs *layerReader) Digest() digest.Digest { + return lrs.digest +} + +func (lrs *layerReader) CreatedAt() time.Time { + return lrs.modtime +} diff --git a/docs/storage/layerstore.go b/docs/storage/layerstore.go new file mode 100644 index 00000000..b6578792 --- /dev/null +++ b/docs/storage/layerstore.go @@ -0,0 +1,168 @@ +package storage + +import ( + "time" + + "code.google.com/p/go-uuid/uuid" + ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storagedriver" +) + +type layerStore struct { + repository *repository +} + +func (ls *layerStore) Exists(digest digest.Digest) (bool, error) { + ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Exists") + + // Because this implementation just follows blob links, an existence check + // is pretty cheap by starting and closing a fetch. + _, err := ls.Fetch(digest) + + if err != nil { + switch err.(type) { + case ErrUnknownLayer: + return false, nil + } + + return false, err + } + + return true, nil +} + +func (ls *layerStore) Fetch(dgst digest.Digest) (Layer, error) { + ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Fetch") + bp, err := ls.path(dgst) + if err != nil { + return nil, err + } + + fr, err := newFileReader(ls.repository.driver, bp) + if err != nil { + return nil, err + } + + return &layerReader{ + fileReader: *fr, + name: ls.repository.Name(), + digest: dgst, + }, nil +} + +// Upload begins a layer upload, returning a handle. If the layer upload +// is already in progress or the layer has already been uploaded, this +// will return an error. +func (ls *layerStore) Upload() (LayerUpload, error) { + ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Upload") + + // NOTE(stevvooe): Consider the issues with allowing concurrent upload of + // the same two layers. Should it be disallowed? For now, we allow both + // parties to proceed and the the first one uploads the layer. + + uuid := uuid.New() + startedAt := time.Now().UTC() + + path, err := ls.repository.registry.pm.path(uploadDataPathSpec{ + name: ls.repository.Name(), + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{ + name: ls.repository.Name(), + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + // Write a startedat file for this upload + if err := ls.repository.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { + return nil, err + } + + return ls.newLayerUpload(uuid, path, startedAt) +} + +// Resume continues an in progress layer upload, returning the current +// state of the upload. +func (ls *layerStore) Resume(uuid string) (LayerUpload, error) { + ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume") + startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{ + name: ls.repository.Name(), + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + startedAtBytes, err := ls.repository.driver.GetContent(startedAtPath) + if err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + return nil, ErrLayerUploadUnknown + default: + return nil, err + } + } + + startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes)) + if err != nil { + return nil, err + } + + path, err := ls.repository.pm.path(uploadDataPathSpec{ + name: ls.repository.Name(), + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + return ls.newLayerUpload(uuid, path, startedAt) +} + +// newLayerUpload allocates a new upload controller with the given state. +func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (LayerUpload, error) { + fw, err := newFileWriter(ls.repository.driver, path) + if err != nil { + return nil, err + } + + return &layerUploadController{ + layerStore: ls, + uuid: uuid, + startedAt: startedAt, + fileWriter: *fw, + }, nil +} + +func (ls *layerStore) path(dgst digest.Digest) (string, error) { + // We must traverse this path through the link to enforce ownership. + layerLinkPath, err := ls.repository.registry.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst}) + if err != nil { + return "", err + } + + blobPath, err := ls.repository.blobStore.resolve(layerLinkPath) + + if err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + return "", ErrUnknownLayer{manifest.FSLayer{BlobSum: dgst}} + default: + return "", err + } + } + + return blobPath, nil +} diff --git a/docs/storage/layerupload.go b/docs/storage/layerupload.go new file mode 100644 index 00000000..54860913 --- /dev/null +++ b/docs/storage/layerupload.go @@ -0,0 +1,238 @@ +package storage + +import ( + "fmt" + "io" + "path" + "time" + + "github.com/Sirupsen/logrus" + ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver" + "github.com/docker/docker/pkg/tarsum" +) + +// layerUploadController is used to control the various aspects of resumable +// layer upload. It implements the LayerUpload interface. +type layerUploadController struct { + layerStore *layerStore + + uuid string + startedAt time.Time + + fileWriter +} + +var _ LayerUpload = &layerUploadController{} + +// Name of the repository under which the layer will be linked. +func (luc *layerUploadController) Name() string { + return luc.layerStore.repository.Name() +} + +// UUID returns the identifier for this upload. +func (luc *layerUploadController) UUID() string { + return luc.uuid +} + +func (luc *layerUploadController) StartedAt() time.Time { + return luc.startedAt +} + +// Finish marks the upload as completed, returning a valid handle to the +// uploaded layer. The final size and checksum are validated against the +// contents of the uploaded layer. The checksum should be provided in the +// format :. +func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) { + ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish") + canonical, err := luc.validateLayer(digest) + if err != nil { + return nil, err + } + + if err := luc.moveLayer(canonical); err != nil { + // TODO(stevvooe): Cleanup? + return nil, err + } + + // Link the layer blob into the repository. + if err := luc.linkLayer(canonical); err != nil { + return nil, err + } + + if err := luc.removeResources(); err != nil { + return nil, err + } + + return luc.layerStore.Fetch(canonical) +} + +// Cancel the layer upload process. +func (luc *layerUploadController) Cancel() error { + ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Cancel") + if err := luc.removeResources(); err != nil { + return err + } + + luc.Close() + return nil +} + +// validateLayer checks the layer data against the digest, returning an error +// if it does not match. The canonical digest is returned. +func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) { + // First, check the incoming tarsum version of the digest. + version, err := tarsum.GetVersionFromTarsum(dgst.String()) + if err != nil { + return "", err + } + + // TODO(stevvooe): Should we push this down into the digest type? + switch version { + case tarsum.Version1: + default: + // version 0 and dev, for now. + return "", ErrLayerInvalidDigest{ + Digest: dgst, + Reason: ErrLayerTarSumVersionUnsupported, + } + } + + digestVerifier := digest.NewDigestVerifier(dgst) + + // TODO(stevvooe): Store resumable hash calculations in upload directory + // in driver. Something like a file at path /resumablehash/ + // with the hash state up to that point would be perfect. The hasher would + // then only have to fetch the difference. + + // Read the file from the backend driver and validate it. + fr, err := newFileReader(luc.fileWriter.driver, luc.path) + if err != nil { + return "", err + } + + tr := io.TeeReader(fr, digestVerifier) + + // TODO(stevvooe): This is one of the places we need a Digester write + // sink. Instead, its read driven. This might be okay. + + // Calculate an updated digest with the latest version. + canonical, err := digest.FromTarArchive(tr) + if err != nil { + return "", err + } + + if !digestVerifier.Verified() { + return "", ErrLayerInvalidDigest{ + Digest: dgst, + Reason: fmt.Errorf("content does not match digest"), + } + } + + return canonical, nil +} + +// moveLayer moves the data into its final, hash-qualified destination, +// identified by dgst. The layer should be validated before commencing the +// move. +func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { + blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{ + digest: dgst, + }) + + if err != nil { + return err + } + + // Check for existence + if _, err := luc.driver.Stat(blobPath); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + break // ensure that it doesn't exist. + default: + return err + } + } else { + // If the path exists, we can assume that the content has already + // been uploaded, since the blob storage is content-addressable. + // While it may be corrupted, detection of such corruption belongs + // elsewhere. + return nil + } + + // If no data was received, we may not actually have a file on disk. Check + // the size here and write a zero-length file to blobPath if this is the + // case. For the most part, this should only ever happen with zero-length + // tars. + if _, err := luc.driver.Stat(luc.path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // HACK(stevvooe): This is slightly dangerous: if we verify above, + // get a hash, then the underlying file is deleted, we risk moving + // a zero-length blob into a nonzero-length blob location. To + // prevent this horrid thing, we employ the hack of only allowing + // to this happen for the zero tarsum. + if dgst == digest.DigestTarSumV1EmptyTar { + return luc.driver.PutContent(blobPath, []byte{}) + } + + // We let this fail during the move below. + logrus. + WithField("upload.uuid", luc.UUID()). + WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest") + default: + return err // unrelated error + } + } + + return luc.driver.Move(luc.path, blobPath) +} + +// linkLayer links a valid, written layer blob into the registry under the +// named repository for the upload controller. +func (luc *layerUploadController) linkLayer(digest digest.Digest) error { + layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{ + name: luc.Name(), + digest: digest, + }) + + if err != nil { + return err + } + + return luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(digest)) +} + +// removeResources should clean up all resources associated with the upload +// instance. An error will be returned if the clean up cannot proceed. If the +// resources are already not present, no error will be returned. +func (luc *layerUploadController) removeResources() error { + dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{ + name: luc.Name(), + uuid: luc.uuid, + }) + + if err != nil { + return err + } + + // Resolve and delete the containing directory, which should include any + // upload related files. + dirPath := path.Dir(dataPath) + + if err := luc.driver.Delete(dirPath); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + break // already gone! + default: + // This should be uncommon enough such that returning an error + // should be okay. At this point, the upload should be mostly + // complete, but perhaps the backend became unaccessible. + logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err) + return err + } + } + + return nil +} diff --git a/docs/storage/manifeststore.go b/docs/storage/manifeststore.go new file mode 100644 index 00000000..1f798dde --- /dev/null +++ b/docs/storage/manifeststore.go @@ -0,0 +1,190 @@ +package storage + +import ( + "fmt" + "strings" + + ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/libtrust" +) + +// ErrUnknownRepository is returned if the named repository is not known by +// the registry. +type ErrUnknownRepository struct { + Name string +} + +func (err ErrUnknownRepository) Error() string { + return fmt.Sprintf("unknown respository name=%s", err.Name) +} + +// ErrUnknownManifest is returned if the manifest is not known by the +// registry. +type ErrUnknownManifest struct { + Name string + Tag string +} + +func (err ErrUnknownManifest) Error() string { + return fmt.Sprintf("unknown manifest name=%s tag=%s", err.Name, err.Tag) +} + +// ErrUnknownManifestRevision is returned when a manifest cannot be found by +// revision within a repository. +type ErrUnknownManifestRevision struct { + Name string + Revision digest.Digest +} + +func (err ErrUnknownManifestRevision) Error() string { + return fmt.Sprintf("unknown manifest name=%s revision=%s", err.Name, err.Revision) +} + +// ErrManifestUnverified is returned when the registry is unable to verify +// the manifest. +type ErrManifestUnverified struct{} + +func (ErrManifestUnverified) Error() string { + return fmt.Sprintf("unverified manifest") +} + +// ErrManifestVerification provides a type to collect errors encountered +// during manifest verification. Currently, it accepts errors of all types, +// but it may be narrowed to those involving manifest verification. +type ErrManifestVerification []error + +func (errs ErrManifestVerification) Error() string { + var parts []string + for _, err := range errs { + parts = append(parts, err.Error()) + } + + return fmt.Sprintf("errors verifying manifest: %v", strings.Join(parts, ",")) +} + +type manifestStore struct { + repository *repository + + revisionStore *revisionStore + tagStore *tagStore +} + +var _ ManifestService = &manifestStore{} + +// func (ms *manifestStore) Repository() Repository { +// return ms.repository +// } + +func (ms *manifestStore) Tags() ([]string, error) { + ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Tags") + return ms.tagStore.tags() +} + +func (ms *manifestStore) Exists(tag string) (bool, error) { + ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Exists") + return ms.tagStore.exists(tag) +} + +func (ms *manifestStore) Get(tag string) (*manifest.SignedManifest, error) { + ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Get") + dgst, err := ms.tagStore.resolve(tag) + if err != nil { + return nil, err + } + + return ms.revisionStore.get(dgst) +} + +func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error { + ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Put") + + // TODO(stevvooe): Add check here to see if the revision is already + // present in the repository. If it is, we should merge the signatures, do + // a shallow verify (or a full one, doesn't matter) and return an error + // indicating what happened. + + // Verify the manifest. + if err := ms.verifyManifest(tag, manifest); err != nil { + return err + } + + // Store the revision of the manifest + revision, err := ms.revisionStore.put(manifest) + if err != nil { + return err + } + + // Now, tag the manifest + return ms.tagStore.tag(tag, revision) +} + +// Delete removes all revisions of the given tag. We may want to change these +// semantics in the future, but this will maintain consistency. The underlying +// blobs are left alone. +func (ms *manifestStore) Delete(tag string) error { + ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Delete") + + revisions, err := ms.tagStore.revisions(tag) + if err != nil { + return err + } + + for _, revision := range revisions { + if err := ms.revisionStore.delete(revision); err != nil { + return err + } + } + + return ms.tagStore.delete(tag) +} + +// verifyManifest ensures that the manifest content is valid from the +// perspective of the registry. It ensures that the name and tag match and +// that the signature is valid for the enclosed payload. As a policy, the +// registry only tries to store valid content, leaving trust policies of that +// content up to consumers. +func (ms *manifestStore) verifyManifest(tag string, mnfst *manifest.SignedManifest) error { + var errs ErrManifestVerification + if mnfst.Name != ms.repository.Name() { + // TODO(stevvooe): This needs to be an exported error + errs = append(errs, fmt.Errorf("repository name does not match manifest name")) + } + + if mnfst.Tag != tag { + // TODO(stevvooe): This needs to be an exported error. + errs = append(errs, fmt.Errorf("tag does not match manifest tag")) + } + + if _, err := manifest.Verify(mnfst); err != nil { + switch err { + case libtrust.ErrMissingSignatureKey, libtrust.ErrInvalidJSONContent, libtrust.ErrMissingSignatureKey: + errs = append(errs, ErrManifestUnverified{}) + default: + if err.Error() == "invalid signature" { // TODO(stevvooe): This should be exported by libtrust + errs = append(errs, ErrManifestUnverified{}) + } else { + errs = append(errs, err) + } + } + } + + for _, fsLayer := range mnfst.FSLayers { + exists, err := ms.repository.Layers().Exists(fsLayer.BlobSum) + if err != nil { + errs = append(errs, err) + } + + if !exists { + errs = append(errs, ErrUnknownLayer{FSLayer: fsLayer}) + } + } + + if len(errs) != 0 { + // TODO(stevvooe): These need to be recoverable by a caller. + return errs + } + + return nil +} diff --git a/docs/storage/manifeststore_test.go b/docs/storage/manifeststore_test.go new file mode 100644 index 00000000..8284ce94 --- /dev/null +++ b/docs/storage/manifeststore_test.go @@ -0,0 +1,233 @@ +package storage + +import ( + "bytes" + "io" + "reflect" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storagedriver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" + "golang.org/x/net/context" +) + +func TestManifestStorage(t *testing.T) { + ctx := context.Background() + name := "foo/bar" + tag := "thetag" + driver := inmemory.New() + registry := NewRegistryWithDriver(driver) + repo := registry.Repository(ctx, name) + ms := repo.Manifests() + + exists, err := ms.Exists(tag) + if err != nil { + t.Fatalf("unexpected error checking manifest existence: %v", err) + } + + if exists { + t.Fatalf("manifest should not exist") + } + + if _, err := ms.Get(tag); true { + switch err.(type) { + case ErrUnknownManifest: + break + default: + t.Fatalf("expected manifest unknown error: %#v", err) + } + } + + m := manifest.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: name, + Tag: tag, + } + + // Build up some test layers and add them to the manifest, saving the + // readseekers for upload later. + testLayers := map[digest.Digest]io.ReadSeeker{} + for i := 0; i < 2; i++ { + rs, ds, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("unexpected error generating test layer file") + } + dgst := digest.Digest(ds) + + testLayers[digest.Digest(dgst)] = rs + m.FSLayers = append(m.FSLayers, manifest.FSLayer{ + BlobSum: dgst, + }) + } + + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("unexpected error generating private key: %v", err) + } + + sm, err := manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("error signing manifest: %v", err) + } + + err = ms.Put(tag, sm) + if err == nil { + t.Fatalf("expected errors putting manifest") + } + + // TODO(stevvooe): We expect errors describing all of the missing layers. + + // Now, upload the layers that were missing! + for dgst, rs := range testLayers { + upload, err := repo.Layers().Upload() + if err != nil { + t.Fatalf("unexpected error creating test upload: %v", err) + } + + if _, err := io.Copy(upload, rs); err != nil { + t.Fatalf("unexpected error copying to upload: %v", err) + } + + if _, err := upload.Finish(dgst); err != nil { + t.Fatalf("unexpected error finishing upload: %v", err) + } + } + + if err = ms.Put(tag, sm); err != nil { + t.Fatalf("unexpected error putting manifest: %v", err) + } + + exists, err = ms.Exists(tag) + if err != nil { + t.Fatalf("unexpected error checking manifest existence: %v", err) + } + + if !exists { + t.Fatalf("manifest should exist") + } + + fetchedManifest, err := ms.Get(tag) + if err != nil { + t.Fatalf("unexpected error fetching manifest: %v", err) + } + + if !reflect.DeepEqual(fetchedManifest, sm) { + t.Fatalf("fetched manifest not equal: %#v != %#v", fetchedManifest, sm) + } + + fetchedJWS, err := libtrust.ParsePrettySignature(fetchedManifest.Raw, "signatures") + if err != nil { + t.Fatalf("unexpected error parsing jws: %v", err) + } + + payload, err := fetchedJWS.Payload() + if err != nil { + t.Fatalf("unexpected error extracting payload: %v", err) + } + + sigs, err := fetchedJWS.Signatures() + if err != nil { + t.Fatalf("unable to extract signatures: %v", err) + } + + if len(sigs) != 1 { + t.Fatalf("unexpected number of signatures: %d != %d", len(sigs), 1) + } + + // Grabs the tags and check that this tagged manifest is present + tags, err := ms.Tags() + if err != nil { + t.Fatalf("unexpected error fetching tags: %v", err) + } + + if len(tags) != 1 { + t.Fatalf("unexpected tags returned: %v", tags) + } + + if tags[0] != tag { + t.Fatalf("unexpected tag found in tags: %v != %v", tags, []string{tag}) + } + + // Now, push the same manifest with a different key + pk2, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("unexpected error generating private key: %v", err) + } + + sm2, err := manifest.Sign(&m, pk2) + if err != nil { + t.Fatalf("unexpected error signing manifest: %v", err) + } + + jws2, err := libtrust.ParsePrettySignature(sm2.Raw, "signatures") + if err != nil { + t.Fatalf("error parsing signature: %v", err) + } + + sigs2, err := jws2.Signatures() + if err != nil { + t.Fatalf("unable to extract signatures: %v", err) + } + + if len(sigs2) != 1 { + t.Fatalf("unexpected number of signatures: %d != %d", len(sigs2), 1) + } + + if err = ms.Put(tag, sm2); err != nil { + t.Fatalf("unexpected error putting manifest: %v", err) + } + + fetched, err := ms.Get(tag) + if err != nil { + t.Fatalf("unexpected error fetching manifest: %v", err) + } + + if _, err := manifest.Verify(fetched); err != nil { + t.Fatalf("unexpected error verifying manifest: %v", err) + } + + // Assemble our payload and two signatures to get what we expect! + expectedJWS, err := libtrust.NewJSONSignature(payload, sigs[0], sigs2[0]) + if err != nil { + t.Fatalf("unexpected error merging jws: %v", err) + } + + expectedSigs, err := expectedJWS.Signatures() + if err != nil { + t.Fatalf("unexpected error getting expected signatures: %v", err) + } + + receivedJWS, err := libtrust.ParsePrettySignature(fetched.Raw, "signatures") + if err != nil { + t.Fatalf("unexpected error parsing jws: %v", err) + } + + receivedPayload, err := receivedJWS.Payload() + if err != nil { + t.Fatalf("unexpected error extracting received payload: %v", err) + } + + if !bytes.Equal(receivedPayload, payload) { + t.Fatalf("payloads are not equal") + } + + receivedSigs, err := receivedJWS.Signatures() + if err != nil { + t.Fatalf("error getting signatures: %v", err) + } + + for i, sig := range receivedSigs { + if !bytes.Equal(sig, expectedSigs[i]) { + t.Fatalf("mismatched signatures from remote: %v != %v", string(sig), string(expectedSigs[i])) + } + } + + if err := ms.Delete(tag); err != nil { + t.Fatalf("unexpected error deleting manifest: %v", err) + } +} diff --git a/docs/storage/notifications/bridge.go b/docs/storage/notifications/bridge.go new file mode 100644 index 00000000..217ee5bd --- /dev/null +++ b/docs/storage/notifications/bridge.go @@ -0,0 +1,156 @@ +package notifications + +import ( + "net/http" + "time" + + "github.com/docker/distribution/manifest" + + "code.google.com/p/go-uuid/uuid" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/storage" +) + +type bridge struct { + ub URLBuilder + actor ActorRecord + source SourceRecord + request RequestRecord + sink Sink +} + +var _ Listener = &bridge{} + +// URLBuilder defines a subset of url builder to be used by the event listener. +type URLBuilder interface { + BuildManifestURL(name, tag string) (string, error) + BuildBlobURL(name string, dgst digest.Digest) (string, error) +} + +// NewBridge returns a notification listener that writes records to sink, +// using the actor and source. Any urls populated in the events created by +// this bridge will be created using the URLBuilder. +// TODO(stevvooe): Update this to simply take a context.Context object. +func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink) Listener { + return &bridge{ + ub: ub, + actor: actor, + source: source, + request: request, + sink: sink, + } +} + +// NewRequestRecord builds a RequestRecord for use in NewBridge from an +// http.Request, associating it with a request id. +func NewRequestRecord(id string, r *http.Request) RequestRecord { + return RequestRecord{ + ID: id, + Addr: r.RemoteAddr, + Host: r.Host, + Method: r.Method, + UserAgent: r.UserAgent(), + } +} + +func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPush, repo, sm) +} + +func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPull, repo, sm) +} + +func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionDelete, repo, sm) +} + +func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest()) +} + +func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest()) +} + +func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest()) +} + +func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error { + event, err := b.createManifestEvent(action, repo, sm) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = EventTargetTypeManifest + event.Target.Name = repo.Name() + event.Target.Tag = sm.Tag + + p, err := sm.Payload() + if err != nil { + return nil, err + } + + event.Target.Digest, err = digest.FromBytes(p) + if err != nil { + return nil, err + } + + // TODO(stevvooe): Currently, the is the "tag" url: once the digest url is + // implemented, this should be replaced. + event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag) + if err != nil { + return nil, err + } + + return event, nil +} + +func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error { + event, err := b.createLayerEvent(action, repo, dgst) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = EventTargetTypeBlob + event.Target.Name = repo.Name() + event.Target.Digest = dgst + + var err error + event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst) + if err != nil { + return nil, err + } + + return event, nil +} + +// createEvent creates an event with actor and source populated. +func (b *bridge) createEvent(action string) *Event { + event := createEvent(action) + event.Source = b.source + event.Actor = b.actor + event.Request = b.request + + return event +} + +// createEvent returns a new event, timestamped, with the specified action. +func createEvent(action string) *Event { + return &Event{ + ID: uuid.New(), + Timestamp: time.Now(), + Action: action, + } +} diff --git a/docs/storage/notifications/endpoint.go b/docs/storage/notifications/endpoint.go new file mode 100644 index 00000000..dfdb111c --- /dev/null +++ b/docs/storage/notifications/endpoint.go @@ -0,0 +1,86 @@ +package notifications + +import ( + "net/http" + "time" +) + +// EndpointConfig covers the optional configuration parameters for an active +// endpoint. +type EndpointConfig struct { + Headers http.Header + Timeout time.Duration + Threshold int + Backoff time.Duration +} + +// defaults set any zero-valued fields to a reasonable default. +func (ec *EndpointConfig) defaults() { + if ec.Timeout <= 0 { + ec.Timeout = time.Second + } + + if ec.Threshold <= 0 { + ec.Threshold = 10 + } + + if ec.Backoff <= 0 { + ec.Backoff = time.Second + } +} + +// Endpoint is a reliable, queued, thread-safe sink that notify external http +// services when events are written. Writes are non-blocking and always +// succeed for callers but events may be queued internally. +type Endpoint struct { + Sink + url string + name string + + EndpointConfig + + metrics *safeMetrics +} + +// NewEndpoint returns a running endpoint, ready to receive events. +func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { + var endpoint Endpoint + endpoint.name = name + endpoint.url = url + endpoint.EndpointConfig = config + endpoint.defaults() + endpoint.metrics = newSafeMetrics() + + // Configures the inmemory queue, retry, http pipeline. + endpoint.Sink = newHTTPSink( + endpoint.url, endpoint.Timeout, endpoint.Headers, + endpoint.metrics.httpStatusListener()) + endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) + endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) + + register(&endpoint) + return &endpoint +} + +// Name returns the name of the endpoint, generally used for debugging. +func (e *Endpoint) Name() string { + return e.name +} + +// URL returns the url of the endpoint. +func (e *Endpoint) URL() string { + return e.url +} + +// ReadMetrics populates em with metrics from the endpoint. +func (e *Endpoint) ReadMetrics(em *EndpointMetrics) { + e.metrics.Lock() + defer e.metrics.Unlock() + + *em = e.metrics.EndpointMetrics + // Map still need to copied in a threadsafe manner. + em.Statuses = make(map[string]int) + for k, v := range e.metrics.Statuses { + em.Statuses[k] = v + } +} diff --git a/docs/storage/notifications/event.go b/docs/storage/notifications/event.go new file mode 100644 index 00000000..c23766fa --- /dev/null +++ b/docs/storage/notifications/event.go @@ -0,0 +1,154 @@ +package notifications + +import ( + "fmt" + "time" + + "github.com/docker/distribution/digest" +) + +// EventAction constants used in action field of Event. +const ( + EventActionPull = "pull" + EventActionPush = "push" + EventActionDelete = "delete" +) + +// EventTargetType constants used in Target section of Event. +const ( + EventTargetTypeManifest = "manifest" + EventTargetTypeBlob = "blob" +) + +// EventsMediaType is the mediatype for the json event envelope. If the Event, +// ActorRecord, SourceRecord or Envelope structs change, the version number +// should be incremented. +const EventsMediaType = "application/vnd.docker.distribution.events.v1+json" + +// Envelope defines the fields of a json event envelope message that can hold +// one or more events. +type Envelope struct { + // Events make up the contents of the envelope. Events present in a single + // envelope are not necessarily related. + Events []Event `json:"events,omitempty"` +} + +// TODO(stevvooe): The event type should be separate from the json format. It +// should be defined as an interface. Leaving as is for now since we don't +// need that at this time. If we make this change, the struct below would be +// called "EventRecord". + +// Event provides the fields required to describe a registry event. +type Event struct { + // ID provides a unique identifier for the event. + ID string `json:"id,omitempty"` + + // Timestamp is the time at which the event occurred. + Timestamp time.Time `json:"timestamp,omitempty"` + + // Action indicates what action encompasses the provided event. + Action string `json:"action,omitempty"` + + // Target uniquely describes the target of the event. + Target struct { + // Type should be "manifest" or "blob" + Type string `json:"type,omitempty"` + + // Name identifies the named repository. + Name string `json:"name,omitempty"` + + // Digest should identify the object in the repository. + Digest digest.Digest `json:"digest,omitempty"` + + // Tag is present if the operation involved a tagged manifest. + Tag string `json:"tag,omitempty"` + + // URL provides a link to the content on the relevant repository instance. + URL string `json:"url,omitempty"` + } `json:"target,omitempty"` + + // Request covers the request that generated the event. + Request RequestRecord `json:"request,omitempty"` + + // Actor specifies the agent that initiated the event. For most + // situations, this could be from the authorizaton context of the request. + Actor ActorRecord `json:"actor,omitempty"` + + // Source identifies the registry node that generated the event. Put + // differently, while the actor "initiates" the event, the source + // "generates" it. + Source SourceRecord `json:"source,omitempty"` +} + +// ActorRecord specifies the agent that initiated the event. For most +// situations, this could be from the authorizaton context of the request. +// Data in this record can refer to both the initiating client and the +// generating request. +type ActorRecord struct { + // Name corresponds to the subject or username associated with the + // request context that generated the event. + Name string `json:"name,omitempty"` + + // TODO(stevvooe): Look into setting a session cookie to get this + // without docker daemon. + // SessionID + + // TODO(stevvooe): Push the "Docker-Command" header to replace cookie and + // get the actual command. + // Command +} + +// RequestRecord covers the request that generated the event. +type RequestRecord struct { + // ID uniquely identifies the request that initiated the event. + ID string `json:"id"` + + // Addr contains the ip or hostname and possibly port of the client + // connection that initiated the event. This is the RemoteAddr from + // the standard http request. + Addr string `json:"addr,omitempty"` + + // Host is the externally accessible host name of the registry instance, + // as specified by the http host header on incoming requests. + Host string `json:"host,omitempty"` + + // Method has the request method that generated the event. + Method string `json:"method"` + + // UserAgent contains the user agent header of the request. + UserAgent string `json:"useragent"` +} + +// SourceRecord identifies the registry node that generated the event. Put +// differently, while the actor "initiates" the event, the source "generates" +// it. +type SourceRecord struct { + // Addr contains the ip or hostname and the port of the registry node + // that generated the event. Generally, this will be resolved by + // os.Hostname() along with the running port. + Addr string `json:"addr,omitempty"` + + // InstanceID identifies a running instance of an application. Changes + // after each restart. + InstanceID string `json:"instanceID,omitempty"` +} + +var ( + // ErrSinkClosed is returned if a write is issued to a sink that has been + // closed. If encountered, the error should be considered terminal and + // retries will not be successful. + ErrSinkClosed = fmt.Errorf("sink: closed") +) + +// Sink accepts and sends events. +type Sink interface { + // Write writes one or more events to the sink. If no error is returned, + // the caller will assume that all events have been committed and will not + // try to send them again. If an error is received, the caller may retry + // sending the event. The caller should cede the slice of memory to the + // sink and not modify it after calling this method. + Write(events ...Event) error + + // Close the sink, possibly waiting for pending events to flush. + Close() error +} diff --git a/docs/storage/notifications/event_test.go b/docs/storage/notifications/event_test.go new file mode 100644 index 00000000..cc2180ac --- /dev/null +++ b/docs/storage/notifications/event_test.go @@ -0,0 +1,145 @@ +package notifications + +import ( + "encoding/json" + "strings" + "testing" + "time" +) + +// TestEventJSONFormat provides silly test to detect if the event format or +// envelope has changed. If this code fails, the revision of the protocol may +// need to be incremented. +func TestEventEnvelopeJSONFormat(t *testing.T) { + var expected = strings.TrimSpace(` +{ + "events": [ + { + "id": "asdf-asdf-asdf-asdf-0", + "timestamp": "2006-01-02T15:04:05Z", + "action": "push", + "target": { + "type": "manifest", + "name": "library/test", + "digest": "sha256:0123456789abcdef0", + "tag": "latest", + "url": "http://example.com/v2/library/test/manifests/latest" + }, + "request": { + "id": "asdfasdf", + "addr": "client.local", + "host": "registrycluster.local", + "method": "PUT", + "useragent": "test/0.1" + }, + "actor": { + "name": "test-actor" + }, + "source": { + "addr": "hostname.local:port" + } + }, + { + "id": "asdf-asdf-asdf-asdf-1", + "timestamp": "2006-01-02T15:04:05Z", + "action": "push", + "target": { + "type": "blob", + "name": "library/test", + "digest": "tarsum.v2+sha256:0123456789abcdef1", + "url": "http://example.com/v2/library/test/manifests/latest" + }, + "request": { + "id": "asdfasdf", + "addr": "client.local", + "host": "registrycluster.local", + "method": "PUT", + "useragent": "test/0.1" + }, + "actor": { + "name": "test-actor" + }, + "source": { + "addr": "hostname.local:port" + } + }, + { + "id": "asdf-asdf-asdf-asdf-2", + "timestamp": "2006-01-02T15:04:05Z", + "action": "push", + "target": { + "type": "blob", + "name": "library/test", + "digest": "tarsum.v2+sha256:0123456789abcdef2", + "url": "http://example.com/v2/library/test/manifests/latest" + }, + "request": { + "id": "asdfasdf", + "addr": "client.local", + "host": "registrycluster.local", + "method": "PUT", + "useragent": "test/0.1" + }, + "actor": { + "name": "test-actor" + }, + "source": { + "addr": "hostname.local:port" + } + } + ] +} + `) + + tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5]) + if err != nil { + t.Fatalf("error creating time: %v", err) + } + + var prototype Event + prototype.Action = "push" + prototype.Timestamp = tm + prototype.Actor.Name = "test-actor" + prototype.Request.ID = "asdfasdf" + prototype.Request.Addr = "client.local" + prototype.Request.Host = "registrycluster.local" + prototype.Request.Method = "PUT" + prototype.Request.UserAgent = "test/0.1" + prototype.Source.Addr = "hostname.local:port" + + var manifestPush Event + manifestPush = prototype + manifestPush.ID = "asdf-asdf-asdf-asdf-0" + manifestPush.Target.Digest = "sha256:0123456789abcdef0" + manifestPush.Target.Type = EventTargetTypeManifest + manifestPush.Target.Name = "library/test" + manifestPush.Target.Tag = "latest" + manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest" + + var layerPush0 Event + layerPush0 = prototype + layerPush0.ID = "asdf-asdf-asdf-asdf-1" + layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1" + layerPush0.Target.Type = EventTargetTypeBlob + layerPush0.Target.Name = "library/test" + layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest" + + var layerPush1 Event + layerPush1 = prototype + layerPush1.ID = "asdf-asdf-asdf-asdf-2" + layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2" + layerPush1.Target.Type = EventTargetTypeBlob + layerPush1.Target.Name = "library/test" + layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest" + + var envelope Envelope + envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1) + + p, err := json.MarshalIndent(envelope, "", " ") + if err != nil { + t.Fatalf("unexpected error marshaling envelope: %v", err) + } + if string(p) != expected { + t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected) + } +} diff --git a/docs/storage/notifications/http.go b/docs/storage/notifications/http.go new file mode 100644 index 00000000..15b3574c --- /dev/null +++ b/docs/storage/notifications/http.go @@ -0,0 +1,145 @@ +package notifications + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" +) + +// httpSink implements a single-flight, http notification endpoint. This is +// very lightweight in that it only makes an attempt at an http request. +// Reliability should be provided by the caller. +type httpSink struct { + url string + + mu sync.Mutex + closed bool + client *http.Client + listeners []httpStatusListener + + // TODO(stevvooe): Allow one to configure the media type accepted by this + // sink and choose the serialization based on that. +} + +// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other +// sinks for increased reliability. +func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink { + return &httpSink{ + url: u, + listeners: listeners, + client: &http.Client{ + Transport: &headerRoundTripper{ + Transport: http.DefaultTransport.(*http.Transport), + headers: headers, + }, + Timeout: timeout, + }, + } +} + +// httpStatusListener is called on various outcomes of sending notifications. +type httpStatusListener interface { + success(status int, events ...Event) + failure(status int, events ...Event) + err(err error, events ...Event) +} + +// Accept makes an attempt to notify the endpoint, returning an error if it +// fails. It is the caller's responsibility to retry on error. The events are +// accepted or rejected as a group. +func (hs *httpSink) Write(events ...Event) error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return ErrSinkClosed + } + + envelope := Envelope{ + Events: events, + } + + // TODO(stevvooe): It is not ideal to keep re-encoding the request body on + // retry but we are going to do it to keep the code simple. It is likely + // we could change the event struct to manage its own buffer. + + p, err := json.MarshalIndent(envelope, "", " ") + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err) + } + + body := bytes.NewReader(p) + resp, err := hs.client.Post(hs.url, EventsMediaType, body) + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + + return fmt.Errorf("%v: error posting: %v", hs, err) + } + + // The notifier will treat any 2xx or 3xx response as accepted by the + // endpoint. + switch { + case resp.StatusCode >= 200 && resp.StatusCode < 400: + for _, listener := range hs.listeners { + listener.success(resp.StatusCode, events...) + } + + // TODO(stevvooe): This is a little accepting: we may want to support + // unsupported media type responses with retries using the correct + // media type. There may also be cases that will never work. + + return nil + default: + for _, listener := range hs.listeners { + listener.failure(resp.StatusCode, events...) + } + return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status) + } +} + +// Close the endpoint +func (hs *httpSink) Close() error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return fmt.Errorf("httpsink: already closed") + } + + hs.closed = true + return nil +} + +func (hs *httpSink) String() string { + return fmt.Sprintf("httpSink{%s}", hs.url) +} + +type headerRoundTripper struct { + *http.Transport // must be transport to support CancelRequest + headers http.Header +} + +func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + var nreq http.Request + nreq = *req + nreq.Header = make(http.Header) + + merge := func(headers http.Header) { + for k, v := range headers { + nreq.Header[k] = append(nreq.Header[k], v...) + } + } + + merge(req.Header) + merge(hrt.headers) + + return hrt.Transport.RoundTrip(&nreq) +} diff --git a/docs/storage/notifications/http_test.go b/docs/storage/notifications/http_test.go new file mode 100644 index 00000000..c2cfbc02 --- /dev/null +++ b/docs/storage/notifications/http_test.go @@ -0,0 +1,155 @@ +package notifications + +import ( + "encoding/json" + "fmt" + "mime" + "net/http" + "net/http/httptest" + "reflect" + "strconv" + "testing" +) + +// TestHTTPSink mocks out an http endpoint and notifies it under a couple of +// conditions, ensuring correct behavior. +func TestHTTPSink(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + t.Fatalf("unexpected request method: %v", r.Method) + return + } + + // Extract the content type and make sure it matches + contentType := r.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentType) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType) + return + } + + if mediaType != EventsMediaType { + w.WriteHeader(http.StatusUnsupportedMediaType) + t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType) + return + } + + var envelope Envelope + dec := json.NewDecoder(r.Body) + if err := dec.Decode(&envelope); err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error decoding request body: %v", err) + return + } + + // Let caller choose the status + status, err := strconv.Atoi(r.FormValue("status")) + if err != nil { + t.Logf("error parsing status: %v", err) + + // May just be empty, set status to 200 + status = http.StatusOK + } + + w.WriteHeader(status) + })) + + metrics := newSafeMetrics() + sink := newHTTPSink(server.URL, 0, nil, + &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) + + var expectedMetrics EndpointMetrics + expectedMetrics.Statuses = make(map[string]int) + + for _, tc := range []struct { + events []Event // events to send + url string + failure bool // true if there should be a failure. + statusCode int // if not set, no status code should be incremented. + }{ + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest")}, + }, + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest"), + createTestEvent("push", "library/test", "layer"), + createTestEvent("push", "library/test", "layer"), + }, + }, + { + statusCode: http.StatusTemporaryRedirect, + }, + { + statusCode: http.StatusBadRequest, + failure: true, + }, + { + // Case where connection never goes through. + url: "http://shoudlntresolve/", + failure: true, + }, + } { + + if tc.failure { + expectedMetrics.Failures += len(tc.events) + } else { + expectedMetrics.Successes += len(tc.events) + } + + if tc.statusCode > 0 { + expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events) + } + + url := tc.url + if url == "" { + url = server.URL + "/" + } + // setup endpoint to respond with expected status code. + url += fmt.Sprintf("?status=%v", tc.statusCode) + sink.url = url + + t.Logf("testcase: %v, fail=%v", url, tc.failure) + // Try a simple event emission. + err := sink.Write(tc.events...) + + if !tc.failure { + if err != nil { + t.Fatalf("unexpected error send event: %v", err) + } + } else { + if err == nil { + t.Fatalf("the endpoint should have rejected the request") + } + } + + if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) { + t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics) + } + } + + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing http sink: %v", err) + } + + // double close returns error + if err := sink.Close(); err == nil { + t.Fatalf("second close should have returned error: %v", err) + } + +} + +func createTestEvent(action, repo, typ string) Event { + event := createEvent(action) + + event.Target.Type = typ + event.Target.Name = repo + + return *event +} diff --git a/docs/storage/notifications/listener.go b/docs/storage/notifications/listener.go new file mode 100644 index 00000000..99a06f02 --- /dev/null +++ b/docs/storage/notifications/listener.go @@ -0,0 +1,140 @@ +package notifications + +import ( + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/registry/storage" +) + +// ManifestListener describes a set of methods for listening to events related to manifests. +type ManifestListener interface { + ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error + ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error +} + +// LayerListener describes a listener that can respond to layer related events. +type LayerListener interface { + LayerPushed(repo storage.Repository, layer storage.Layer) error + LayerPulled(repo storage.Repository, layer storage.Layer) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + LayerDeleted(repo storage.Repository, layer storage.Layer) error +} + +// Listener combines all repository events into a single interface. +type Listener interface { + ManifestListener + LayerListener +} + +type repositoryListener struct { + storage.Repository + listener Listener +} + +// Listen dispatches events on the repository to the listener. +func Listen(repo storage.Repository, listener Listener) storage.Repository { + return &repositoryListener{ + Repository: repo, + listener: listener, + } +} + +func (rl *repositoryListener) Manifests() storage.ManifestService { + return &manifestServiceListener{ + ManifestService: rl.Repository.Manifests(), + parent: rl, + } +} + +func (rl *repositoryListener) Layers() storage.LayerService { + return &layerServiceListener{ + LayerService: rl.Repository.Layers(), + parent: rl, + } +} + +type manifestServiceListener struct { + storage.ManifestService + parent *repositoryListener +} + +func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) { + sm, err := msl.ManifestService.Get(tag) + if err == nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest pull to listener: %v", err) + } + } + + return sm, err +} + +func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error { + err := msl.ManifestService.Put(tag, sm) + + if err == nil { + if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest push to listener: %v", err) + } + } + + return err +} + +type layerServiceListener struct { + storage.LayerService + parent *repositoryListener +} + +func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) { + layer, err := lsl.LayerService.Fetch(dgst) + if err == nil { + if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer pull to listener: %v", err) + } + } + + return layer, err +} + +func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Upload() + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Resume(uuid) + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload { + return &layerUploadListener{ + LayerUpload: lu, + parent: lsl, + } +} + +type layerUploadListener struct { + storage.LayerUpload + parent *layerServiceListener +} + +func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) { + layer, err := lul.LayerUpload.Finish(dgst) + if err == nil { + if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer push to listener: %v", err) + } + } + + return layer, err +} diff --git a/docs/storage/notifications/listener_test.go b/docs/storage/notifications/listener_test.go new file mode 100644 index 00000000..b62e7e7e --- /dev/null +++ b/docs/storage/notifications/listener_test.go @@ -0,0 +1,153 @@ +package notifications + +import ( + "io" + "reflect" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/storagedriver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" + "golang.org/x/net/context" +) + +func TestListener(t *testing.T) { + registry := storage.NewRegistryWithDriver(inmemory.New()) + tl := &testListener{ + ops: make(map[string]int), + } + ctx := context.Background() + repository := Listen(registry.Repository(ctx, "foo/bar"), tl) + + // Now take the registry through a number of operations + checkExerciseRepository(t, repository) + + expectedOps := map[string]int{ + "manifest:push": 1, + "manifest:pull": 1, + // "manifest:delete": 0, // deletes not supported for now + "layer:push": 2, + "layer:pull": 2, + // "layer:delete": 0, // deletes not supported for now + } + + if !reflect.DeepEqual(tl.ops, expectedOps) { + t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps) + } + +} + +type testListener struct { + ops map[string]int +} + +func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:push"]++ + + return nil +} + +func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:pull"]++ + return nil +} + +func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:delete"]++ + return nil +} + +func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:push"]++ + return nil +} + +func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:pull"]++ + return nil +} + +func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:delete"]++ + return nil +} + +// checkExerciseRegistry takes the registry through all of its operations, +// carrying out generic checks. +func checkExerciseRepository(t *testing.T, repository storage.Repository) { + // TODO(stevvooe): This would be a nice testutil function. Basically, it + // takes the registry through a common set of operations. This could be + // used to make cross-cutting updates by changing internals that affect + // update counts. Basically, it would make writing tests a lot easier. + + tag := "thetag" + m := manifest.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: repository.Name(), + Tag: tag, + } + + layers := repository.Layers() + for i := 0; i < 2; i++ { + rs, ds, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating test layer: %v", err) + } + dgst := digest.Digest(ds) + upload, err := layers.Upload() + if err != nil { + t.Fatalf("error creating layer upload: %v", err) + } + + // Use the resumes, as well! + upload, err = layers.Resume(upload.UUID()) + if err != nil { + t.Fatalf("error resuming layer upload: %v", err) + } + + io.Copy(upload, rs) + + if _, err := upload.Finish(dgst); err != nil { + t.Fatalf("unexpected error finishing upload: %v", err) + } + + m.FSLayers = append(m.FSLayers, manifest.FSLayer{ + BlobSum: dgst, + }) + + // Then fetch the layers + if _, err := layers.Fetch(dgst); err != nil { + t.Fatalf("error fetching layer: %v", err) + } + } + + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("unexpected error generating key: %v", err) + } + + sm, err := manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("unexpected error signing manifest: %v", err) + } + + manifests := repository.Manifests() + + if err := manifests.Put(tag, sm); err != nil { + t.Fatalf("unexpected error putting the manifest: %v", err) + } + + fetched, err := manifests.Get(tag) + if err != nil { + t.Fatalf("unexpected error fetching manifest: %v", err) + } + + if fetched.Tag != fetched.Tag { + t.Fatalf("retrieved unexpected manifest: %v", err) + } +} diff --git a/docs/storage/notifications/metrics.go b/docs/storage/notifications/metrics.go new file mode 100644 index 00000000..2a8ffcbd --- /dev/null +++ b/docs/storage/notifications/metrics.go @@ -0,0 +1,152 @@ +package notifications + +import ( + "expvar" + "fmt" + "net/http" + "sync" +) + +// EndpointMetrics track various actions taken by the endpoint, typically by +// number of events. The goal of this to export it via expvar but we may find +// some other future solution to be better. +type EndpointMetrics struct { + Pending int // events pending in queue + Events int // total events incoming + Successes int // total events written successfully + Failures int // total events failed + Errors int // total events errored + Statuses map[string]int // status code histogram, per call event +} + +// safeMetrics guards the metrics implementation with a lock and provides a +// safe update function. +type safeMetrics struct { + EndpointMetrics + sync.Mutex // protects statuses map +} + +// newSafeMetrics returns safeMetrics with map allocated. +func newSafeMetrics() *safeMetrics { + var sm safeMetrics + sm.Statuses = make(map[string]int) + return &sm +} + +// httpStatusListener returns the listener for the http sink that updates the +// relevent counters. +func (sm *safeMetrics) httpStatusListener() httpStatusListener { + return &endpointMetricsHTTPStatusListener{ + safeMetrics: sm, + } +} + +// eventQueueListener returns a listener that maintains queue related counters. +func (sm *safeMetrics) eventQueueListener() eventQueueListener { + return &endpointMetricsEventQueueListener{ + safeMetrics: sm, + } +} + +// endpointMetricsHTTPStatusListener increments counters related to http sinks +// for the relevent events. +type endpointMetricsHTTPStatusListener struct { + *safeMetrics +} + +var _ httpStatusListener = &endpointMetricsHTTPStatusListener{} + +func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Successes += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Failures += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Errors += len(events) +} + +// endpointMetricsEventQueueListener maintains the incoming events counter and +// the queues pending count. +type endpointMetricsEventQueueListener struct { + *safeMetrics +} + +func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Events += len(events) + eqc.Pending += len(events) +} + +func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Pending -= len(events) +} + +// endpoints is global registry of endpoints used to report metrics to expvar +var endpoints struct { + registered []*Endpoint + mu sync.Mutex +} + +// register places the endpoint into expvar so that stats are tracked. +func register(e *Endpoint) { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + endpoints.registered = append(endpoints.registered, e) +} + +func init() { + // NOTE(stevvooe): Setup registry metrics structure to report to expvar. + // Ideally, we do more metrics through logging but we need some nice + // realtime metrics for queue state for now. + + registry := expvar.Get("registry") + + if registry == nil { + registry = expvar.NewMap("registry") + } + + var notifications expvar.Map + notifications.Init() + notifications.Set("endpoints", expvar.Func(func() interface{} { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + var names []interface{} + for _, v := range endpoints.registered { + var epjson struct { + Name string `json:"name"` + URL string `json:"url"` + EndpointConfig + + Metrics EndpointMetrics + } + + epjson.Name = v.Name() + epjson.URL = v.URL() + epjson.EndpointConfig = v.EndpointConfig + + v.ReadMetrics(&epjson.Metrics) + + names = append(names, epjson) + } + + return names + })) + + registry.(*expvar.Map).Set("notifications", ¬ifications) +} diff --git a/docs/storage/notifications/sinks.go b/docs/storage/notifications/sinks.go new file mode 100644 index 00000000..2bf63e2d --- /dev/null +++ b/docs/storage/notifications/sinks.go @@ -0,0 +1,337 @@ +package notifications + +import ( + "container/list" + "fmt" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// NOTE(stevvooe): This file contains definitions for several utility sinks. +// Typically, the broadcaster is the only sink that should be required +// externally, but others are suitable for export if the need arises. Albeit, +// the tight integration with endpoint metrics should be removed. + +// Broadcaster sends events to multiple, reliable Sinks. The goal of this +// component is to dispatch events to configured endpoints. Reliability can be +// provided by wrapping incoming sinks. +type Broadcaster struct { + sinks []Sink + events chan []Event + closed chan chan struct{} +} + +// NewBroadcaster ... +// Add appends one or more sinks to the list of sinks. The broadcaster +// behavior will be affected by the properties of the sink. Generally, the +// sink should accept all messages and deal with reliability on its own. Use +// of EventQueue and RetryingSink should be used here. +func NewBroadcaster(sinks ...Sink) *Broadcaster { + b := Broadcaster{ + sinks: sinks, + events: make(chan []Event), + closed: make(chan chan struct{}), + } + + // Start the broadcaster + go b.run() + + return &b +} + +// Write accepts a block of events to be dispatched to all sinks. This method +// will never fail and should never block (hopefully!). The caller cedes the +// slice memory to the broadcaster and should not modify it after calling +// write. +func (b *Broadcaster) Write(events ...Event) error { + select { + case b.events <- events: + case <-b.closed: + return ErrSinkClosed + } + return nil +} + +// Close the broadcaster, ensuring that all messages are flushed to the +// underlying sink before returning. +func (b *Broadcaster) Close() error { + logrus.Infof("broadcaster: closing") + select { + case <-b.closed: + // already closed + return fmt.Errorf("broadcaster: already closed") + default: + // do a little chan handoff dance to synchronize closing + closed := make(chan struct{}) + b.closed <- closed + close(b.closed) + <-closed + return nil + } +} + +// run is the main broadcast loop, started when the broadcaster is created. +// Under normal conditions, it waits for events on the event channel. After +// Close is called, this goroutine will exit. +func (b *Broadcaster) run() { + for { + select { + case block := <-b.events: + for _, sink := range b.sinks { + if err := sink.Write(block...); err != nil { + logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err) + } + } + case closing := <-b.closed: + + // close all the underlying sinks + for _, sink := range b.sinks { + if err := sink.Close(); err != nil { + logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err) + } + } + closing <- struct{}{} + + logrus.Debugf("broadcaster: closed") + return + } + } +} + +// eventQueue accepts all messages into a queue for asynchronous consumption +// by a sink. It is unbounded and thread safe but the sink must be reliable or +// events will be dropped. +type eventQueue struct { + sink Sink + events *list.List + listeners []eventQueueListener + cond *sync.Cond + mu sync.Mutex + closed bool +} + +// eventQueueListener is called when various events happen on the queue. +type eventQueueListener interface { + ingress(events ...Event) + egress(events ...Event) +} + +// newEventQueue returns a queue to the provided sink. If the updater is non- +// nil, it will be called to update pending metrics on ingress and egress. +func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue { + eq := eventQueue{ + sink: sink, + events: list.New(), + listeners: listeners, + } + + eq.cond = sync.NewCond(&eq.mu) + go eq.run() + return &eq +} + +// Write accepts the events into the queue, only failing if the queue has +// beend closed. +func (eq *eventQueue) Write(events ...Event) error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return ErrSinkClosed + } + + for _, listener := range eq.listeners { + listener.ingress(events...) + } + eq.events.PushBack(events) + eq.cond.Signal() // signal waiters + + return nil +} + +// Close shutsdown the event queue, flushing +func (eq *eventQueue) Close() error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return fmt.Errorf("eventqueue: already closed") + } + + // set closed flag + eq.closed = true + eq.cond.Signal() // signal flushes queue + eq.cond.Wait() // wait for signal from last flush + + return eq.sink.Close() +} + +// run is the main goroutine to flush events to the target sink. +func (eq *eventQueue) run() { + for { + block := eq.next() + + if block == nil { + return // nil block means event queue is closed. + } + + if err := eq.sink.Write(block...); err != nil { + logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err) + } + + for _, listener := range eq.listeners { + listener.egress(block...) + } + } +} + +// next encompasses the critical section of the run loop. When the queue is +// empty, it will block on the condition. If new data arrives, it will wake +// and return a block. When closed, a nil slice will be returned. +func (eq *eventQueue) next() []Event { + eq.mu.Lock() + defer eq.mu.Unlock() + + for eq.events.Len() < 1 { + if eq.closed { + eq.cond.Broadcast() + return nil + } + + eq.cond.Wait() + } + + front := eq.events.Front() + block := front.Value.([]Event) + eq.events.Remove(front) + + return block +} + +// retryingSink retries the write until success or an ErrSinkClosed is +// returned. Underlying sink must have p > 0 of succeeding or the sink will +// block. Internally, it is a circuit breaker retries to manage reset. +// Concurrent calls to a retrying sink are serialized through the sink, +// meaning that if one is in-flight, another will not proceed. +type retryingSink struct { + mu sync.Mutex + sink Sink + closed bool + + // circuit breaker hueristics + failures struct { + threshold int + recent int + last time.Time + backoff time.Duration // time after which we retry after failure. + } +} + +type retryingSinkListener interface { + active(events ...Event) + retry(events ...Event) +} + +// TODO(stevvooe): We are using circuit break here, which actually doesn't +// make a whole lot of sense for this use case, since we always retry. Move +// this to use bounded exponential backoff. + +// newRetryingSink returns a sink that will retry writes to a sink, backing +// off on failure. Parameters threshold and backoff adjust the behavior of the +// circuit breaker. +func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink { + rs := &retryingSink{ + sink: sink, + } + rs.failures.threshold = threshold + rs.failures.backoff = backoff + + return rs +} + +// Write attempts to flush the events to the downstream sink until it succeeds +// or the sink is closed. +func (rs *retryingSink) Write(events ...Event) error { + rs.mu.Lock() + defer rs.mu.Unlock() + +retry: + + if rs.closed { + return ErrSinkClosed + } + + if !rs.proceed() { + logrus.Warnf("%v encountered too many errors, backing off", rs.sink) + rs.wait(rs.failures.backoff) + goto retry + } + + if err := rs.write(events...); err != nil { + if err == ErrSinkClosed { + // terminal! + return err + } + + logrus.Errorf("retryingsink: error writing events: %v, retrying", err) + goto retry + } + + return nil +} + +// Close closes the sink and the underlying sink. +func (rs *retryingSink) Close() error { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.closed { + return fmt.Errorf("retryingsink: already closed") + } + + rs.closed = true + return rs.sink.Close() +} + +// write provides a helper that dispatches failure and success properly. Used +// by write as the single-flight write call. +func (rs *retryingSink) write(events ...Event) error { + if err := rs.sink.Write(events...); err != nil { + rs.failure() + return err + } + + rs.reset() + return nil +} + +// wait backoff time against the sink, unlocking so others can proceed. Should +// only be called by methods that currently have the mutex. +func (rs *retryingSink) wait(backoff time.Duration) { + rs.mu.Unlock() + defer rs.mu.Lock() + + // backoff here + time.Sleep(backoff) +} + +// reset marks a succesful call. +func (rs *retryingSink) reset() { + rs.failures.recent = 0 + rs.failures.last = time.Time{} +} + +// failure records a failure. +func (rs *retryingSink) failure() { + rs.failures.recent++ + rs.failures.last = time.Now().UTC() +} + +// proceed returns true if the call should proceed based on circuit breaker +// hueristics. +func (rs *retryingSink) proceed() bool { + return rs.failures.recent < rs.failures.threshold || + time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff)) +} diff --git a/docs/storage/notifications/sinks_test.go b/docs/storage/notifications/sinks_test.go new file mode 100644 index 00000000..89756a99 --- /dev/null +++ b/docs/storage/notifications/sinks_test.go @@ -0,0 +1,223 @@ +package notifications + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/Sirupsen/logrus" + + "testing" +) + +func TestBroadcaster(t *testing.T) { + const nEvents = 1000 + var sinks []Sink + + for i := 0; i < 10; i++ { + sinks = append(sinks, &testSink{}) + } + + b := NewBroadcaster(sinks...) + + var block []Event + var wg sync.WaitGroup + for i := 1; i <= nEvents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := b.Write(block...); err != nil { + t.Fatalf("error writing block of length %d: %v", len(block), err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() // Wait until writes complete + checkClose(t, b) + + // Iterate through the sinks and check that they all have the expected length. + for _, sink := range sinks { + ts := sink.(*testSink) + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != nEvents { + t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + } + +} + +func TestEventQueue(t *testing.T) { + const nevents = 1000 + var ts testSink + metrics := newSafeMetrics() + eq := newEventQueue( + // delayed sync simulates destination slower than channel comms + &delayedSink{ + Sink: &ts, + delay: time.Millisecond * 1, + }, metrics.eventQueueListener()) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= nevents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := eq.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, eq) + + ts.mu.Lock() + defer ts.mu.Unlock() + metrics.Lock() + defer metrics.Unlock() + + if len(ts.events) != nevents { + t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + + if metrics.Events != nevents { + t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents) + } + + if metrics.Pending != 0 { + t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0) + } +} + +func TestRetryingSink(t *testing.T) { + + // Make a sync that fails most of the time, ensuring that all the events + // make it through. + var ts testSink + flaky := &flakySink{ + rate: 1.0, // start out always failing. + Sink: &ts, + } + s := newRetryingSink(flaky, 3, 10*time.Millisecond) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= 100; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + // Above 50, set the failure rate lower + if i > 50 { + s.mu.Lock() + flaky.rate = 0.90 + s.mu.Unlock() + } + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + defer wg.Done() + if err := s.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, s) + + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != 100 { + t.Fatalf("events not propagated: %d != %d", len(ts.events), 100) + } +} + +type testSink struct { + events []Event + mu sync.Mutex + closed bool +} + +func (ts *testSink) Write(events ...Event) error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.events = append(ts.events, events...) + return nil +} + +func (ts *testSink) Close() error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.closed = true + + logrus.Infof("closing testSink") + return nil +} + +type delayedSink struct { + Sink + delay time.Duration +} + +func (ds *delayedSink) Write(events ...Event) error { + time.Sleep(ds.delay) + return ds.Sink.Write(events...) +} + +type flakySink struct { + Sink + rate float64 +} + +func (fs *flakySink) Write(events ...Event) error { + if rand.Float64() < fs.rate { + return fmt.Errorf("error writing %d events", len(events)) + } + + return fs.Sink.Write(events...) +} + +func checkClose(t *testing.T, sink Sink) { + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + // second close should not crash but should return an error. + if err := sink.Close(); err == nil { + t.Fatalf("no error on double close") + } + + // Write after closed should be an error + if err := sink.Write([]Event{}...); err == nil { + t.Fatalf("write after closed did not have an error") + } else if err != ErrSinkClosed { + t.Fatalf("error should be ErrSinkClosed") + } +} diff --git a/docs/storage/paths.go b/docs/storage/paths.go new file mode 100644 index 00000000..9380dc65 --- /dev/null +++ b/docs/storage/paths.go @@ -0,0 +1,458 @@ +package storage + +import ( + "fmt" + "path" + "strings" + + "github.com/docker/distribution/digest" +) + +const storagePathVersion = "v2" + +// pathMapper maps paths based on "object names" and their ids. The "object +// names" mapped by pathMapper are internal to the storage system. +// +// The path layout in the storage backend is roughly as follows: +// +// /v2 +// -> repositories/ +// ->/ +// -> _manifests/ +// revisions +// -> +// -> link +// -> signatures +// //link +// tags/ +// -> current/link +// -> index +// -> //link +// -> _layers/ +// +// -> _uploads/ +// data +// startedat +// -> blob/ +// +// +// The storage backend layout is broken up into a content- addressable blob +// store and repositories. The content-addressable blob store holds most data +// throughout the backend, keyed by algorithm and digests of the underlying +// content. Access to the blob store is controled through links from the +// repository to blobstore. +// +// A repository is made up of layers, manifests and tags. The layers component +// is just a directory of layers which are "linked" into a repository. A layer +// can only be accessed through a qualified repository name if it is linked in +// the repository. Uploads of layers are managed in the uploads directory, +// which is key by upload uuid. When all data for an upload is received, the +// data is moved into the blob store and the upload directory is deleted. +// Abandoned uploads can be garbage collected by reading the startedat file +// and removing uploads that have been active for longer than a certain time. +// +// The third component of the repository directory is the manifests store, +// which is made up of a revision store and tag store. Manifests are stored in +// the blob store and linked into the revision store. Signatures are separated +// from the manifest payload data and linked into the blob store, as well. +// While the registry can save all revisions of a manifest, no relationship is +// implied as to the ordering of changes to a manifest. The tag store provides +// support for name, tag lookups of manifests, using "current/link" under a +// named tag directory. An index is maintained to support deletions of all +// revisions of a given manifest tag. +// +// We cover the path formats implemented by this path mapper below. +// +// Manifests: +// +// manifestRevisionPathSpec: /v2/repositories//_manifests/revisions/// +// manifestRevisionLinkPathSpec: /v2/repositories//_manifests/revisions///link +// manifestSignaturesPathSpec: /v2/repositories//_manifests/revisions///signatures/ +// manifestSignatureLinkPathSpec: /v2/repositories//_manifests/revisions///signatures///link +// +// Tags: +// +// manifestTagsPathSpec: /v2/repositories//_manifests/tags/ +// manifestTagPathSpec: /v2/repositories//_manifests/tags// +// manifestTagCurrentPathSpec: /v2/repositories//_manifests/tags//current/link +// manifestTagIndexPathSpec: /v2/repositories//_manifests/tags//index/ +// manifestTagIndexEntryPathSpec: /v2/repositories//_manifests/tags//index///link +// +// Layers: +// +// layerLinkPathSpec: /v2/repositories//_layers/tarsum////link +// +// Uploads: +// +// uploadDataPathSpec: /v2/repositories//_uploads//data +// uploadStartedAtPathSpec: /v2/repositories//_uploads//startedat +// +// Blob Store: +// +// blobPathSpec: /v2/blobs/// +// blobDataPathSpec: /v2/blobs////data +// +// For more information on the semantic meaning of each path and their +// contents, please see the path spec documentation. +type pathMapper struct { + root string + version string // should be a constant? +} + +var defaultPathMapper = &pathMapper{ + root: "/docker/registry/", + version: storagePathVersion, +} + +// path returns the path identified by spec. +func (pm *pathMapper) path(spec pathSpec) (string, error) { + + // Switch on the path object type and return the appropriate path. At + // first glance, one may wonder why we don't use an interface to + // accomplish this. By keep the formatting separate from the pathSpec, we + // keep separate the path generation componentized. These specs could be + // passed to a completely different mapper implementation and generate a + // different set of paths. + // + // For example, imagine migrating from one backend to the other: one could + // build a filesystem walker that converts a string path in one version, + // to an intermediate path object, than can be consumed and mapped by the + // other version. + + rootPrefix := []string{pm.root, pm.version} + repoPrefix := append(rootPrefix, "repositories") + + switch v := spec.(type) { + + case manifestRevisionPathSpec: + components, err := digestPathComponents(v.revision, false) + if err != nil { + return "", err + } + + return path.Join(append(append(repoPrefix, v.name, "_manifests", "revisions"), components...)...), nil + case manifestRevisionLinkPathSpec: + root, err := pm.path(manifestRevisionPathSpec{ + name: v.name, + revision: v.revision, + }) + + if err != nil { + return "", err + } + + return path.Join(root, "link"), nil + case manifestSignaturesPathSpec: + root, err := pm.path(manifestRevisionPathSpec{ + name: v.name, + revision: v.revision, + }) + + if err != nil { + return "", err + } + + return path.Join(root, "signatures"), nil + case manifestSignatureLinkPathSpec: + root, err := pm.path(manifestSignaturesPathSpec{ + name: v.name, + revision: v.revision, + }) + if err != nil { + return "", err + } + + signatureComponents, err := digestPathComponents(v.signature, false) + if err != nil { + return "", err + } + + return path.Join(root, path.Join(append(signatureComponents, "link")...)), nil + case manifestTagsPathSpec: + return path.Join(append(repoPrefix, v.name, "_manifests", "tags")...), nil + case manifestTagPathSpec: + root, err := pm.path(manifestTagsPathSpec{ + name: v.name, + }) + if err != nil { + return "", err + } + + return path.Join(root, v.tag), nil + case manifestTagCurrentPathSpec: + root, err := pm.path(manifestTagPathSpec{ + name: v.name, + tag: v.tag, + }) + if err != nil { + return "", err + } + + return path.Join(root, "current", "link"), nil + case manifestTagIndexPathSpec: + root, err := pm.path(manifestTagPathSpec{ + name: v.name, + tag: v.tag, + }) + if err != nil { + return "", err + } + + return path.Join(root, "index"), nil + case manifestTagIndexEntryPathSpec: + root, err := pm.path(manifestTagIndexPathSpec{ + name: v.name, + tag: v.tag, + }) + if err != nil { + return "", err + } + + components, err := digestPathComponents(v.revision, false) + if err != nil { + return "", err + } + + return path.Join(root, path.Join(append(components, "link")...)), nil + case layerLinkPathSpec: + components, err := digestPathComponents(v.digest, false) + if err != nil { + return "", err + } + + // For now, only map tarsum paths. + if components[0] != "tarsum" { + // Only tarsum is supported, for now + return "", fmt.Errorf("unsupported content digest: %v", v.digest) + } + + layerLinkPathComponents := append(repoPrefix, v.name, "_layers") + + return path.Join(path.Join(append(layerLinkPathComponents, components...)...), "link"), nil + case blobDataPathSpec: + components, err := digestPathComponents(v.digest, true) + if err != nil { + return "", err + } + + components = append(components, "data") + blobPathPrefix := append(rootPrefix, "blobs") + return path.Join(append(blobPathPrefix, components...)...), nil + + case uploadDataPathSpec: + return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "data")...), nil + case uploadStartedAtPathSpec: + return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "startedat")...), nil + default: + // TODO(sday): This is an internal error. Ensure it doesn't escape (panic?). + return "", fmt.Errorf("unknown path spec: %#v", v) + } +} + +// pathSpec is a type to mark structs as path specs. There is no +// implementation because we'd like to keep the specs and the mappers +// decoupled. +type pathSpec interface { + pathSpec() +} + +// manifestRevisionPathSpec describes the components of the directory path for +// a manifest revision. +type manifestRevisionPathSpec struct { + name string + revision digest.Digest +} + +func (manifestRevisionPathSpec) pathSpec() {} + +// manifestRevisionLinkPathSpec describes the path components required to look +// up the data link for a revision of a manifest. If this file is not present, +// the manifest blob is not available in the given repo. The contents of this +// file should just be the digest. +type manifestRevisionLinkPathSpec struct { + name string + revision digest.Digest +} + +func (manifestRevisionLinkPathSpec) pathSpec() {} + +// manifestSignaturesPathSpec decribes the path components for the directory +// containing all the signatures for the target blob. Entries are named with +// the underlying key id. +type manifestSignaturesPathSpec struct { + name string + revision digest.Digest +} + +func (manifestSignaturesPathSpec) pathSpec() {} + +// manifestSignatureLinkPathSpec decribes the path components used to look up +// a signature file by the hash of its blob. +type manifestSignatureLinkPathSpec struct { + name string + revision digest.Digest + signature digest.Digest +} + +func (manifestSignatureLinkPathSpec) pathSpec() {} + +// manifestTagsPathSpec describes the path elements required to point to the +// manifest tags directory. +type manifestTagsPathSpec struct { + name string +} + +func (manifestTagsPathSpec) pathSpec() {} + +// manifestTagPathSpec describes the path elements required to point to the +// manifest tag links files under a repository. These contain a blob id that +// can be used to look up the data and signatures. +type manifestTagPathSpec struct { + name string + tag string +} + +func (manifestTagPathSpec) pathSpec() {} + +// manifestTagCurrentPathSpec describes the link to the current revision for a +// given tag. +type manifestTagCurrentPathSpec struct { + name string + tag string +} + +func (manifestTagCurrentPathSpec) pathSpec() {} + +// manifestTagCurrentPathSpec describes the link to the index of revisions +// with the given tag. +type manifestTagIndexPathSpec struct { + name string + tag string +} + +func (manifestTagIndexPathSpec) pathSpec() {} + +// manifestTagIndexEntryPathSpec describes the link to a revisions of a +// manifest with given tag within the index. +type manifestTagIndexEntryPathSpec struct { + name string + tag string + revision digest.Digest +} + +func (manifestTagIndexEntryPathSpec) pathSpec() {} + +// layerLink specifies a path for a layer link, which is a file with a blob +// id. The layer link will contain a content addressable blob id reference +// into the blob store. The format of the contents is as follows: +// +// : +// +// The following example of the file contents is more illustrative: +// +// sha256:96443a84ce518ac22acb2e985eda402b58ac19ce6f91980bde63726a79d80b36 +// +// This says indicates that there is a blob with the id/digest, calculated via +// sha256 that can be fetched from the blob store. +type layerLinkPathSpec struct { + name string + digest digest.Digest +} + +func (layerLinkPathSpec) pathSpec() {} + +// blobAlgorithmReplacer does some very simple path sanitization for user +// input. Mostly, this is to provide some heirachry for tarsum digests. Paths +// should be "safe" before getting this far due to strict digest requirements +// but we can add further path conversion here, if needed. +var blobAlgorithmReplacer = strings.NewReplacer( + "+", "/", + ".", "/", + ";", "/", +) + +// // blobPathSpec contains the path for the registry global blob store. +// type blobPathSpec struct { +// digest digest.Digest +// } + +// func (blobPathSpec) pathSpec() {} + +// blobDataPathSpec contains the path for the registry global blob store. For +// now, this contains layer data, exclusively. +type blobDataPathSpec struct { + digest digest.Digest +} + +func (blobDataPathSpec) pathSpec() {} + +// uploadDataPathSpec defines the path parameters of the data file for +// uploads. +type uploadDataPathSpec struct { + name string + uuid string +} + +func (uploadDataPathSpec) pathSpec() {} + +// uploadDataPathSpec defines the path parameters for the file that stores the +// start time of an uploads. If it is missing, the upload is considered +// unknown. Admittedly, the presence of this file is an ugly hack to make sure +// we have a way to cleanup old or stalled uploads that doesn't rely on driver +// FileInfo behavior. If we come up with a more clever way to do this, we +// should remove this file immediately and rely on the startetAt field from +// the client to enforce time out policies. +type uploadStartedAtPathSpec struct { + name string + uuid string +} + +func (uploadStartedAtPathSpec) pathSpec() {} + +// digestPathComponents provides a consistent path breakdown for a given +// digest. For a generic digest, it will be as follows: +// +// / +// +// Most importantly, for tarsum, the layout looks like this: +// +// tarsum/// +// +// If multilevel is true, the first two bytes of the digest will separate +// groups of digest folder. It will be as follows: +// +// // +// +func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error) { + if err := dgst.Validate(); err != nil { + return nil, err + } + + algorithm := blobAlgorithmReplacer.Replace(dgst.Algorithm()) + hex := dgst.Hex() + prefix := []string{algorithm} + + var suffix []string + + if multilevel { + suffix = append(suffix, hex[:2]) + } + + suffix = append(suffix, hex) + + if tsi, err := digest.ParseTarSum(dgst.String()); err == nil { + // We have a tarsum! + version := tsi.Version + if version == "" { + version = "v0" + } + + prefix = []string{ + "tarsum", + version, + tsi.Algorithm, + } + } + + return append(prefix, suffix...), nil +} diff --git a/docs/storage/paths_test.go b/docs/storage/paths_test.go new file mode 100644 index 00000000..79410e75 --- /dev/null +++ b/docs/storage/paths_test.go @@ -0,0 +1,138 @@ +package storage + +import ( + "testing" + + "github.com/docker/distribution/digest" +) + +func TestPathMapper(t *testing.T) { + pm := &pathMapper{ + root: "/pathmapper-test", + } + + for _, testcase := range []struct { + spec pathSpec + expected string + err error + }{ + { + spec: manifestRevisionPathSpec{ + name: "foo/bar", + revision: "sha256:abcdef0123456789", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789", + }, + { + spec: manifestRevisionLinkPathSpec{ + name: "foo/bar", + revision: "sha256:abcdef0123456789", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789/link", + }, + { + spec: manifestSignatureLinkPathSpec{ + name: "foo/bar", + revision: "sha256:abcdef0123456789", + signature: "sha256:abcdef0123456789", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789/signatures/sha256/abcdef0123456789/link", + }, + { + spec: manifestSignaturesPathSpec{ + name: "foo/bar", + revision: "sha256:abcdef0123456789", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789/signatures", + }, + { + spec: manifestTagsPathSpec{ + name: "foo/bar", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags", + }, + { + spec: manifestTagPathSpec{ + name: "foo/bar", + tag: "thetag", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag", + }, + { + spec: manifestTagCurrentPathSpec{ + name: "foo/bar", + tag: "thetag", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag/current/link", + }, + { + spec: manifestTagIndexPathSpec{ + name: "foo/bar", + tag: "thetag", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag/index", + }, + { + spec: manifestTagIndexEntryPathSpec{ + name: "foo/bar", + tag: "thetag", + revision: "sha256:abcdef0123456789", + }, + expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag/index/sha256/abcdef0123456789/link", + }, + { + spec: layerLinkPathSpec{ + name: "foo/bar", + digest: "tarsum.v1+test:abcdef", + }, + expected: "/pathmapper-test/repositories/foo/bar/_layers/tarsum/v1/test/abcdef/link", + }, + { + spec: blobDataPathSpec{ + digest: digest.Digest("tarsum.dev+sha512:abcdefabcdefabcdef908909909"), + }, + expected: "/pathmapper-test/blobs/tarsum/dev/sha512/ab/abcdefabcdefabcdef908909909/data", + }, + { + spec: blobDataPathSpec{ + digest: digest.Digest("tarsum.v1+sha256:abcdefabcdefabcdef908909909"), + }, + expected: "/pathmapper-test/blobs/tarsum/v1/sha256/ab/abcdefabcdefabcdef908909909/data", + }, + + { + spec: uploadDataPathSpec{ + name: "foo/bar", + uuid: "asdf-asdf-asdf-adsf", + }, + expected: "/pathmapper-test/repositories/foo/bar/_uploads/asdf-asdf-asdf-adsf/data", + }, + { + spec: uploadStartedAtPathSpec{ + name: "foo/bar", + uuid: "asdf-asdf-asdf-adsf", + }, + expected: "/pathmapper-test/repositories/foo/bar/_uploads/asdf-asdf-asdf-adsf/startedat", + }, + } { + p, err := pm.path(testcase.spec) + if err != nil { + t.Fatalf("unexpected generating path (%T): %v", testcase.spec, err) + } + + if p != testcase.expected { + t.Fatalf("unexpected path generated (%T): %q != %q", testcase.spec, p, testcase.expected) + } + } + + // Add a few test cases to ensure we cover some errors + + // Specify a path that requires a revision and get a digest validation error. + badpath, err := pm.path(manifestSignaturesPathSpec{ + name: "foo/bar", + }) + if err == nil { + t.Fatalf("expected an error when mapping an invalid revision: %s", badpath) + } + +} diff --git a/docs/storage/registry.go b/docs/storage/registry.go new file mode 100644 index 00000000..ed865007 --- /dev/null +++ b/docs/storage/registry.go @@ -0,0 +1,80 @@ +package storage + +import ( + "github.com/docker/distribution/storagedriver" + "golang.org/x/net/context" +) + +// registry is the top-level implementation of Registry for use in the storage +// package. All instances should descend from this object. +type registry struct { + driver storagedriver.StorageDriver + pm *pathMapper + blobStore *blobStore +} + +// NewRegistryWithDriver creates a new registry instance from the provided +// driver. The resulting registry may be shared by multiple goroutines but is +// cheap to allocate. +func NewRegistryWithDriver(driver storagedriver.StorageDriver) Registry { + bs := &blobStore{} + + reg := ®istry{ + driver: driver, + blobStore: bs, + + // TODO(sday): This should be configurable. + pm: defaultPathMapper, + } + + reg.blobStore.registry = reg + + return reg +} + +// Repository returns an instance of the repository tied to the registry. +// Instances should not be shared between goroutines but are cheap to +// allocate. In general, they should be request scoped. +func (reg *registry) Repository(ctx context.Context, name string) Repository { + return &repository{ + ctx: ctx, + registry: reg, + name: name, + } +} + +// repository provides name-scoped access to various services. +type repository struct { + *registry + ctx context.Context + name string +} + +// Name returns the name of the repository. +func (repo *repository) Name() string { + return repo.name +} + +// Manifests returns an instance of ManifestService. Instantiation is cheap and +// may be context sensitive in the future. The instance should be used similar +// to a request local. +func (repo *repository) Manifests() ManifestService { + return &manifestStore{ + repository: repo, + revisionStore: &revisionStore{ + repository: repo, + }, + tagStore: &tagStore{ + repository: repo, + }, + } +} + +// Layers returns an instance of the LayerService. Instantiation is cheap and +// may be context sensitive in the future. The instance should be used similar +// to a request local. +func (repo *repository) Layers() LayerService { + return &layerStore{ + repository: repo, + } +} diff --git a/docs/storage/revisionstore.go b/docs/storage/revisionstore.go new file mode 100644 index 00000000..b3ecd711 --- /dev/null +++ b/docs/storage/revisionstore.go @@ -0,0 +1,207 @@ +package storage + +import ( + "encoding/json" + "path" + + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/libtrust" +) + +// revisionStore supports storing and managing manifest revisions. +type revisionStore struct { + *repository +} + +// exists returns true if the revision is available in the named repository. +func (rs *revisionStore) exists(revision digest.Digest) (bool, error) { + revpath, err := rs.pm.path(manifestRevisionPathSpec{ + name: rs.Name(), + revision: revision, + }) + + if err != nil { + return false, err + } + + exists, err := exists(rs.driver, revpath) + if err != nil { + return false, err + } + + return exists, nil +} + +// get retrieves the manifest, keyed by revision digest. +func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest, error) { + // Ensure that this revision is available in this repository. + if exists, err := rs.exists(revision); err != nil { + return nil, err + } else if !exists { + return nil, ErrUnknownManifestRevision{ + Name: rs.Name(), + Revision: revision, + } + } + + content, err := rs.blobStore.get(revision) + if err != nil { + return nil, err + } + + // Fetch the signatures for the manifest + signatures, err := rs.getSignatures(revision) + if err != nil { + return nil, err + } + + jsig, err := libtrust.NewJSONSignature(content, signatures...) + if err != nil { + return nil, err + } + + // Extract the pretty JWS + raw, err := jsig.PrettySignature("signatures") + if err != nil { + return nil, err + } + + var sm manifest.SignedManifest + if err := json.Unmarshal(raw, &sm); err != nil { + return nil, err + } + + return &sm, nil +} + +// put stores the manifest in the repository, if not already present. Any +// updated signatures will be stored, as well. +func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) { + // Resolve the payload in the manifest. + payload, err := sm.Payload() + if err != nil { + return "", err + } + + // Digest and store the manifest payload in the blob store. + revision, err := rs.blobStore.put(payload) + if err != nil { + logrus.Errorf("error putting payload into blobstore: %v", err) + return "", err + } + + // Link the revision into the repository. + if err := rs.link(revision); err != nil { + return "", err + } + + // Grab each json signature and store them. + signatures, err := sm.Signatures() + if err != nil { + return "", err + } + + for _, signature := range signatures { + if err := rs.putSignature(revision, signature); err != nil { + return "", err + } + } + + return revision, nil +} + +// link links the revision into the repository. +func (rs *revisionStore) link(revision digest.Digest) error { + revisionPath, err := rs.pm.path(manifestRevisionLinkPathSpec{ + name: rs.Name(), + revision: revision, + }) + + if err != nil { + return err + } + + if exists, err := exists(rs.driver, revisionPath); err != nil { + return err + } else if exists { + // Revision has already been linked! + return nil + } + + return rs.blobStore.link(revisionPath, revision) +} + +// delete removes the specified manifest revision from storage. +func (rs *revisionStore) delete(revision digest.Digest) error { + revisionPath, err := rs.pm.path(manifestRevisionPathSpec{ + name: rs.Name(), + revision: revision, + }) + + if err != nil { + return err + } + + return rs.driver.Delete(revisionPath) +} + +// getSignatures retrieves all of the signature blobs for the specified +// manifest revision. +func (rs *revisionStore) getSignatures(revision digest.Digest) ([][]byte, error) { + signaturesPath, err := rs.pm.path(manifestSignaturesPathSpec{ + name: rs.Name(), + revision: revision, + }) + + if err != nil { + return nil, err + } + + // Need to append signature digest algorithm to path to get all items. + // Perhaps, this should be in the pathMapper but it feels awkward. This + // can be eliminated by implementing listAll on drivers. + signaturesPath = path.Join(signaturesPath, "sha256") + + signaturePaths, err := rs.driver.List(signaturesPath) + if err != nil { + return nil, err + } + + var signatures [][]byte + for _, sigPath := range signaturePaths { + // Append the link portion + sigPath = path.Join(sigPath, "link") + + // TODO(stevvooe): These fetches should be parallelized for performance. + p, err := rs.blobStore.linked(sigPath) + if err != nil { + return nil, err + } + + signatures = append(signatures, p) + } + + return signatures, nil +} + +// putSignature stores the signature for the provided manifest revision. +func (rs *revisionStore) putSignature(revision digest.Digest, signature []byte) error { + signatureDigest, err := rs.blobStore.put(signature) + if err != nil { + return err + } + + signaturePath, err := rs.pm.path(manifestSignatureLinkPathSpec{ + name: rs.Name(), + revision: revision, + signature: signatureDigest, + }) + + if err != nil { + return err + } + + return rs.blobStore.link(signaturePath, signatureDigest) +} diff --git a/docs/storage/services.go b/docs/storage/services.go new file mode 100644 index 00000000..7e6ac476 --- /dev/null +++ b/docs/storage/services.go @@ -0,0 +1,84 @@ +package storage + +import ( + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "golang.org/x/net/context" +) + +// TODO(stevvooe): These types need to be moved out of the storage package. + +// Registry represents a collection of repositories, addressable by name. +type Registry interface { + // Repository should return a reference to the named repository. The + // registry may or may not have the repository but should always return a + // reference. + Repository(ctx context.Context, name string) Repository +} + +// Repository is a named collection of manifests and layers. +type Repository interface { + // Name returns the name of the repository. + Name() string + + // Manifests returns a reference to this repository's manifest service. + Manifests() ManifestService + + // Layers returns a reference to this repository's layers service. + Layers() LayerService +} + +// ManifestService provides operations on image manifests. +type ManifestService interface { + // Tags lists the tags under the named repository. + Tags() ([]string, error) + + // Exists returns true if the manifest exists. + Exists(tag string) (bool, error) + + // Get retrieves the named manifest, if it exists. + Get(tag string) (*manifest.SignedManifest, error) + + // Put creates or updates the named manifest. + // Put(tag string, manifest *manifest.SignedManifest) (digest.Digest, error) + Put(tag string, manifest *manifest.SignedManifest) error + + // Delete removes the named manifest, if it exists. + Delete(tag string) error + + // TODO(stevvooe): There are several changes that need to be done to this + // interface: + // + // 1. Get(tag string) should be GetByTag(tag string) + // 2. Put(tag string, manifest *manifest.SignedManifest) should be + // Put(manifest *manifest.SignedManifest). The method can read the + // tag on manifest to automatically tag it in the repository. + // 3. Need a GetByDigest(dgst digest.Digest) method. + // 4. Allow explicit tagging with Tag(digest digest.Digest, tag string) + // 5. Support reading tags with a re-entrant reader to avoid large + // allocations in the registry. + // 6. Long-term: Provide All() method that lets one scroll through all of + // the manifest entries. + // 7. Long-term: break out concept of signing from manifests. This is + // really a part of the distribution sprint. + // 8. Long-term: Manifest should be an interface. This code shouldn't + // really be concerned with the storage format. +} + +// LayerService provides operations on layer files in a backend storage. +type LayerService interface { + // Exists returns true if the layer exists. + Exists(digest digest.Digest) (bool, error) + + // Fetch the layer identifed by TarSum. + Fetch(digest digest.Digest) (Layer, error) + + // Upload begins a layer upload to repository identified by name, + // returning a handle. + Upload() (LayerUpload, error) + + // Resume continues an in progress layer upload, returning a handle to the + // upload. The caller should seek to the latest desired upload location + // before proceeding. + Resume(uuid string) (LayerUpload, error) +} diff --git a/docs/storage/tagstore.go b/docs/storage/tagstore.go new file mode 100644 index 00000000..f7b87a25 --- /dev/null +++ b/docs/storage/tagstore.go @@ -0,0 +1,157 @@ +package storage + +import ( + "path" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storagedriver" +) + +// tagStore provides methods to manage manifest tags in a backend storage driver. +type tagStore struct { + *repository +} + +// tags lists the manifest tags for the specified repository. +func (ts *tagStore) tags() ([]string, error) { + p, err := ts.pm.path(manifestTagPathSpec{ + name: ts.name, + }) + if err != nil { + return nil, err + } + + var tags []string + entries, err := ts.driver.List(p) + if err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + return nil, ErrUnknownRepository{Name: ts.name} + default: + return nil, err + } + } + + for _, entry := range entries { + _, filename := path.Split(entry) + + tags = append(tags, filename) + } + + return tags, nil +} + +// exists returns true if the specified manifest tag exists in the repository. +func (ts *tagStore) exists(tag string) (bool, error) { + tagPath, err := ts.pm.path(manifestTagCurrentPathSpec{ + name: ts.Name(), + tag: tag, + }) + if err != nil { + return false, err + } + + exists, err := exists(ts.driver, tagPath) + if err != nil { + return false, err + } + + return exists, nil +} + +// tag tags the digest with the given tag, updating the the store to point at +// the current tag. The digest must point to a manifest. +func (ts *tagStore) tag(tag string, revision digest.Digest) error { + indexEntryPath, err := ts.pm.path(manifestTagIndexEntryPathSpec{ + name: ts.Name(), + tag: tag, + revision: revision, + }) + + if err != nil { + return err + } + + currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{ + name: ts.Name(), + tag: tag, + }) + + if err != nil { + return err + } + + // Link into the index + if err := ts.blobStore.link(indexEntryPath, revision); err != nil { + return err + } + + // Overwrite the current link + return ts.blobStore.link(currentPath, revision) +} + +// resolve the current revision for name and tag. +func (ts *tagStore) resolve(tag string) (digest.Digest, error) { + currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{ + name: ts.Name(), + tag: tag, + }) + + if err != nil { + return "", err + } + + if exists, err := exists(ts.driver, currentPath); err != nil { + return "", err + } else if !exists { + return "", ErrUnknownManifest{Name: ts.Name(), Tag: tag} + } + + revision, err := ts.blobStore.readlink(currentPath) + if err != nil { + return "", err + } + + return revision, nil +} + +// revisions returns all revisions with the specified name and tag. +func (ts *tagStore) revisions(tag string) ([]digest.Digest, error) { + manifestTagIndexPath, err := ts.pm.path(manifestTagIndexPathSpec{ + name: ts.Name(), + tag: tag, + }) + + if err != nil { + return nil, err + } + + // TODO(stevvooe): Need to append digest alg to get listing of revisions. + manifestTagIndexPath = path.Join(manifestTagIndexPath, "sha256") + + entries, err := ts.driver.List(manifestTagIndexPath) + if err != nil { + return nil, err + } + + var revisions []digest.Digest + for _, entry := range entries { + revisions = append(revisions, digest.NewDigestFromHex("sha256", path.Base(entry))) + } + + return revisions, nil +} + +// delete removes the tag from repository, including the history of all +// revisions that have the specified tag. +func (ts *tagStore) delete(tag string) error { + tagPath, err := ts.pm.path(manifestTagPathSpec{ + name: ts.Name(), + tag: tag, + }) + if err != nil { + return err + } + + return ts.driver.Delete(tagPath) +}