diff --git a/api_test.go b/api_test.go new file mode 100644 index 00000000..c850f141 --- /dev/null +++ b/api_test.go @@ -0,0 +1,236 @@ +package registry + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/http/httputil" + "net/url" + "os" + "testing" + + "github.com/Sirupsen/logrus" + _ "github.com/docker/docker-registry/storagedriver/inmemory" + + "github.com/gorilla/handlers" + + "github.com/docker/docker-registry/common/testutil" + "github.com/docker/docker-registry/configuration" + "github.com/docker/docker-registry/digest" +) + +// TestLayerAPI conducts a full of the of the layer api. +func TestLayerAPI(t *testing.T) { + // TODO(stevvooe): This test code is complete junk but it should cover the + // complete flow. This must be broken down and checked against the + // specification *before* we submit the final to docker core. + + config := configuration.Configuration{ + Storage: configuration.Storage{ + "inmemory": configuration.Parameters{}, + }, + } + + app := NewApp(config) + server := httptest.NewServer(handlers.CombinedLoggingHandler(os.Stderr, app)) + router := v2APIRouter() + + u, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + + imageName := "foo/bar" + // "build" our layer file + layerFile, tarSumStr, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating random layer file: %v", err) + } + + layerDigest := digest.Digest(tarSumStr) + + // ----------------------------------- + // Test fetch for non-existent content + r, err := router.GetRoute(routeNameBlob).Host(u.Host). + URL("name", imageName, + "digest", tarSumStr) + + resp, err := http.Get(r.String()) + if err != nil { + t.Fatalf("unexpected error fetching non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusNotFound: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status fetching non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + // ------------------------------------------ + // Test head request for non-existent content + resp, err = http.Head(r.String()) + if err != nil { + t.Fatalf("unexpected error checking head on non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusNotFound: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status checking head on non-existent layer: %v, %v", resp.StatusCode, resp.Status) + } + + // ------------------------------------------ + // Upload a layer + r, err = router.GetRoute(routeNameBlobUpload).Host(u.Host). + URL("name", imageName) + if err != nil { + t.Fatalf("error starting layer upload: %v", err) + } + + resp, err = http.Post(r.String(), "", nil) + if err != nil { + t.Fatalf("error starting layer upload: %v", err) + } + + if resp.StatusCode != http.StatusAccepted { + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status starting layer upload: %v, %v", resp.StatusCode, resp.Status) + } + + if resp.Header.Get("Location") == "" { // TODO(stevvooe): Need better check here. + t.Fatalf("unexpected Location: %q != %q", resp.Header.Get("Location"), "foo") + } + + if resp.Header.Get("Content-Length") != "0" { + t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") + } + + layerLength, _ := layerFile.Seek(0, os.SEEK_END) + layerFile.Seek(0, os.SEEK_SET) + + uploadURLStr := resp.Header.Get("Location") + + // TODO(sday): Cancel the layer upload here and restart. + + query := url.Values{ + "digest": []string{layerDigest.String()}, + "length": []string{fmt.Sprint(layerLength)}, + } + + uploadURL, err := url.Parse(uploadURLStr) + if err != nil { + t.Fatalf("unexpected error parsing url: %v", err) + } + + uploadURL.RawQuery = query.Encode() + + // Just do a monolithic upload + req, err := http.NewRequest("PUT", uploadURL.String(), layerFile) + if err != nil { + t.Fatalf("unexpected error creating new request: %v", err) + } + + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("unexpected error doing put: %v", err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusCreated: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status putting chunk: %v, %v", resp.StatusCode, resp.Status) + } + + if resp.Header.Get("Location") == "" { + t.Fatalf("unexpected Location: %q", resp.Header.Get("Location")) + } + + if resp.Header.Get("Content-Length") != "0" { + t.Fatalf("unexpected content-length: %q != %q", resp.Header.Get("Content-Length"), "0") + } + + layerURL := resp.Header.Get("Location") + + // ------------------------ + // Use a head request to see if the layer exists. + resp, err = http.Head(layerURL) + if err != nil { + t.Fatalf("unexpected error checking head on non-existent layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusOK: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status checking head on layer: %v, %v", resp.StatusCode, resp.Status) + } + + logrus.Infof("fetch the layer") + // ---------------- + // Fetch the layer! + resp, err = http.Get(layerURL) + if err != nil { + t.Fatalf("unexpected error fetching layer: %v", err) + } + + switch resp.StatusCode { + case http.StatusOK: + break // expected + default: + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("unexpected status fetching layer: %v, %v", resp.StatusCode, resp.Status) + } + + // Verify the body + verifier := digest.NewDigestVerifier(layerDigest) + io.Copy(verifier, resp.Body) + + if !verifier.Verified() { + d, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("unexpected status checking head on layer ayo!: %v, %v", resp.StatusCode, resp.Status) + } + + t.Logf("response:\n%s", string(d)) + t.Fatalf("response body did not pass verification") + } +} diff --git a/app.go b/app.go index bc7df554..25bf572d 100644 --- a/app.go +++ b/app.go @@ -3,7 +3,11 @@ package registry import ( "net/http" + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/factory" + "github.com/docker/docker-registry/configuration" + "github.com/docker/docker-registry/storage" log "github.com/Sirupsen/logrus" "github.com/gorilla/mux" @@ -16,6 +20,12 @@ type App struct { Config configuration.Configuration router *mux.Router + + // driver maintains the app global storage driver instance. + driver storagedriver.StorageDriver + + // services contains the main services instance for the application. + services *storage.Services } // NewApp takes a configuration and returns a configured app, ready to serve @@ -29,11 +39,23 @@ func NewApp(configuration configuration.Configuration) *App { // Register the handler dispatchers. app.register(routeNameImageManifest, imageManifestDispatcher) - app.register(routeNameBlob, layerDispatcher) app.register(routeNameTags, tagsDispatcher) + app.register(routeNameBlob, layerDispatcher) app.register(routeNameBlobUpload, layerUploadDispatcher) app.register(routeNameBlobUploadResume, layerUploadDispatcher) + driver, err := factory.Create(configuration.Storage.Type(), configuration.Storage.Parameters()) + + if err != nil { + // TODO(stevvooe): Move the creation of a service into a protected + // method, where this is created lazily. Its status can be queried via + // a health check. + panic(err) + } + + app.driver = driver + app.services = storage.NewServices(app.driver) + return app } @@ -64,6 +86,22 @@ type dispatchFunc func(ctx *Context, r *http.Request) http.Handler // TODO(stevvooe): dispatchers should probably have some validation error // chain with proper error reporting. +// singleStatusResponseWriter only allows the first status to be written to be +// the valid request status. The current use case of this class should be +// factored out. +type singleStatusResponseWriter struct { + http.ResponseWriter + status int +} + +func (ssrw *singleStatusResponseWriter) WriteHeader(status int) { + if ssrw.status != 0 { + return + } + ssrw.status = status + ssrw.ResponseWriter.WriteHeader(status) +} + // dispatcher returns a handler that constructs a request specific context and // handler, using the dispatch factory function. func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { @@ -80,14 +118,17 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { context.log = log.WithField("name", context.Name) handler := dispatch(context, r) + ssrw := &singleStatusResponseWriter{ResponseWriter: w} context.log.Infoln("handler", resolveHandlerName(r.Method, handler)) - handler.ServeHTTP(w, r) + handler.ServeHTTP(ssrw, r) // Automated error response handling here. Handlers may return their // own errors if they need different behavior (such as range errors // for layer upload). - if len(context.Errors.Errors) > 0 { - w.WriteHeader(http.StatusBadRequest) + if context.Errors.Len() > 0 { + if ssrw.status == 0 { + w.WriteHeader(http.StatusBadRequest) + } serveJSON(w, context.Errors) } }) diff --git a/context.go b/context.go index a5706b4e..c246d6ac 100644 --- a/context.go +++ b/context.go @@ -1,8 +1,6 @@ package registry -import ( - "github.com/Sirupsen/logrus" -) +import "github.com/Sirupsen/logrus" // Context should contain the request specific context for use in across // handlers. Resources that don't need to be shared across handlers should not @@ -20,11 +18,6 @@ type Context struct { // handler *must not* start the response via http.ResponseWriter. Errors Errors - // TODO(stevvooe): Context would be a good place to create a - // representation of the "authorized resource". Perhaps, rather than - // having fields like "name", the context should be a set of parameters - // then we do routing from there. - // vars contains the extracted gorilla/mux variables that can be used for // assignment. vars map[string]string diff --git a/errors.go b/errors.go index 9a28e5b6..e2f16ba0 100644 --- a/errors.go +++ b/errors.go @@ -17,20 +17,14 @@ const ( // The following errors can happen during a layer upload. - // ErrorCodeInvalidChecksum is returned when uploading a layer if the - // provided checksum does not match the layer contents. - ErrorCodeInvalidChecksum + // ErrorCodeInvalidDigest is returned when uploading a layer if the + // provided digest does not match the layer contents. + ErrorCodeInvalidDigest // ErrorCodeInvalidLength is returned when uploading a layer if the provided // length does not match the content length. ErrorCodeInvalidLength - // ErrorCodeInvalidTarsum is returned when the provided tarsum does not - // match the computed tarsum of the contents. - ErrorCodeInvalidTarsum - - // The following errors can happen during manifest upload. - // ErrorCodeInvalidName is returned when the name in the manifest does not // match the provided name. ErrorCodeInvalidName @@ -47,6 +41,9 @@ const ( // nonexistent layer. ErrorCodeUnknownLayer + // ErrorCodeUnknownLayerUpload is returned when an upload is accessed. + ErrorCodeUnknownLayerUpload + // ErrorCodeUntrustedSignature is returned when the manifest is signed by an // untrusted source. ErrorCodeUntrustedSignature @@ -54,25 +51,25 @@ const ( var errorCodeStrings = map[ErrorCode]string{ ErrorCodeUnknown: "UNKNOWN", - ErrorCodeInvalidChecksum: "INVALID_CHECKSUM", + ErrorCodeInvalidDigest: "INVALID_DIGEST", ErrorCodeInvalidLength: "INVALID_LENGTH", - ErrorCodeInvalidTarsum: "INVALID_TARSUM", ErrorCodeInvalidName: "INVALID_NAME", ErrorCodeInvalidTag: "INVALID_TAG", ErrorCodeUnverifiedManifest: "UNVERIFIED_MANIFEST", ErrorCodeUnknownLayer: "UNKNOWN_LAYER", + ErrorCodeUnknownLayerUpload: "UNKNOWN_LAYER_UPLOAD", ErrorCodeUntrustedSignature: "UNTRUSTED_SIGNATURE", } var errorCodesMessages = map[ErrorCode]string{ ErrorCodeUnknown: "unknown error", - ErrorCodeInvalidChecksum: "provided checksum did not match uploaded content", + ErrorCodeInvalidDigest: "provided digest did not match uploaded content", ErrorCodeInvalidLength: "provided length did not match content length", - ErrorCodeInvalidTarsum: "provided tarsum did not match binary content", ErrorCodeInvalidName: "Manifest name did not match URI", ErrorCodeInvalidTag: "Manifest tag did not match URI", ErrorCodeUnverifiedManifest: "Manifest failed signature validation", ErrorCodeUnknownLayer: "Referenced layer not available", + ErrorCodeUnknownLayerUpload: "cannot resume unknown layer upload", ErrorCodeUntrustedSignature: "Manifest signed by untrusted source", } @@ -136,7 +133,7 @@ func (ec *ErrorCode) UnmarshalText(text []byte) error { // Error provides a wrapper around ErrorCode with extra Details provided. type Error struct { - Code ErrorCode `json:"code,omitempty"` + Code ErrorCode `json:"code"` Message string `json:"message,omitempty"` Detail interface{} `json:"detail,omitempty"` } @@ -144,7 +141,7 @@ type Error struct { // Error returns a human readable representation of the error. func (e Error) Error() string { return fmt.Sprintf("%s: %s", - strings.Title(strings.Replace(e.Code.String(), "_", " ", -1)), + strings.ToLower(strings.Replace(e.Code.String(), "_", " ", -1)), e.Message) } @@ -167,6 +164,10 @@ func (errs *Errors) Push(code ErrorCode, details ...interface{}) { detail = details[0] } + if err, ok := detail.(error); ok { + detail = err.Error() + } + errs.PushErr(Error{ Code: code, Message: code.Message(), @@ -180,7 +181,7 @@ func (errs *Errors) PushErr(err error) { } func (errs *Errors) Error() string { - switch len(errs.Errors) { + switch errs.Len() { case 0: return "" case 1: @@ -194,6 +195,16 @@ func (errs *Errors) Error() string { } } +// Clear clears the errors. +func (errs *Errors) Clear() { + errs.Errors = errs.Errors[:0] +} + +// Len returns the current number of errors. +func (errs *Errors) Len() int { + return len(errs.Errors) +} + // DetailUnknownLayer provides detail for unknown layer errors, returned by // image manifest push for layers that are not yet transferred. This intended // to only be used on the backend to return detail for this specific error. diff --git a/errors_test.go b/errors_test.go index e6ec72f9..709b6ced 100644 --- a/errors_test.go +++ b/errors_test.go @@ -56,7 +56,7 @@ func TestErrorCodes(t *testing.T) { func TestErrorsManagement(t *testing.T) { var errs Errors - errs.Push(ErrorCodeInvalidChecksum) + errs.Push(ErrorCodeInvalidDigest) var detail DetailUnknownLayer detail.Unknown.BlobSum = "sometestblobsumdoesntmatter" @@ -69,7 +69,20 @@ func TestErrorsManagement(t *testing.T) { t.Fatalf("error marashaling errors: %v", err) } - expectedJSON := "{\"errors\":[{\"code\":\"INVALID_CHECKSUM\",\"message\":\"provided checksum did not match uploaded content\"},{\"code\":\"UNKNOWN_LAYER\",\"message\":\"Referenced layer not available\",\"detail\":{\"unknown\":{\"blobSum\":\"sometestblobsumdoesntmatter\"}}}]}" + expectedJSON := "{\"errors\":[{\"code\":\"INVALID_DIGEST\",\"message\":\"provided digest did not match uploaded content\"},{\"code\":\"UNKNOWN_LAYER\",\"message\":\"Referenced layer not available\",\"detail\":{\"unknown\":{\"blobSum\":\"sometestblobsumdoesntmatter\"}}}]}" + + if string(p) != expectedJSON { + t.Fatalf("unexpected json: %q != %q", string(p), expectedJSON) + } + + errs.Clear() + errs.Push(ErrorCodeUnknown) + expectedJSON = "{\"errors\":[{\"code\":\"UNKNOWN\",\"message\":\"unknown error\"}]}" + p, err = json.Marshal(errs) + + if err != nil { + t.Fatalf("error marashaling errors: %v", err) + } if string(p) != expectedJSON { t.Fatalf("unexpected json: %q != %q", string(p), expectedJSON) diff --git a/helpers.go b/helpers.go index b3b9d744..7714d029 100644 --- a/helpers.go +++ b/helpers.go @@ -2,7 +2,10 @@ package registry import ( "encoding/json" + "io" "net/http" + + "github.com/gorilla/mux" ) // serveJSON marshals v and sets the content-type header to @@ -18,3 +21,21 @@ func serveJSON(w http.ResponseWriter, v interface{}) error { return nil } + +// closeResources closes all the provided resources after running the target +// handler. +func closeResources(handler http.Handler, closers ...io.Closer) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for _, closer := range closers { + defer closer.Close() + } + handler.ServeHTTP(w, r) + }) +} + +// clondedRoute returns a clone of the named route from the router. +func clonedRoute(router *mux.Router, name string) *mux.Route { + route := new(mux.Route) + *route = *router.GetRoute(name) // clone the route + return route +} diff --git a/layer.go b/layer.go index 82a1e6d9..38fdfe39 100644 --- a/layer.go +++ b/layer.go @@ -3,17 +3,28 @@ package registry import ( "net/http" + "github.com/docker/docker-registry/digest" + "github.com/docker/docker-registry/storage" "github.com/gorilla/handlers" + "github.com/gorilla/mux" ) // layerDispatcher uses the request context to build a layerHandler. func layerDispatcher(ctx *Context, r *http.Request) http.Handler { - layerHandler := &layerHandler{ - Context: ctx, - TarSum: ctx.vars["tarsum"], + dgst, err := digest.ParseDigest(ctx.vars["digest"]) + + if err != nil { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx.Errors.Push(ErrorCodeInvalidDigest, err) + }) } - layerHandler.log = layerHandler.log.WithField("tarsum", layerHandler.TarSum) + layerHandler := &layerHandler{ + Context: ctx, + Digest: dgst, + } + + layerHandler.log = layerHandler.log.WithField("digest", dgst) return handlers.MethodHandler{ "GET": http.HandlerFunc(layerHandler.GetLayer), @@ -25,11 +36,44 @@ func layerDispatcher(ctx *Context, r *http.Request) http.Handler { type layerHandler struct { *Context - TarSum string + Digest digest.Digest } // GetLayer fetches the binary data from backend storage returns it in the // response. func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) { + layers := lh.services.Layers() + layer, err := layers.Fetch(lh.Name, lh.Digest) + + if err != nil { + switch err { + case storage.ErrLayerUnknown: + w.WriteHeader(http.StatusNotFound) + lh.Errors.Push(ErrorCodeUnknownLayer, + map[string]interface{}{ + "unknown": FSLayer{BlobSum: lh.Digest}, + }) + return + default: + lh.Errors.Push(ErrorCodeUnknown, err) + return + } + } + defer layer.Close() + + http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer) +} + +func buildLayerURL(router *mux.Router, r *http.Request, layer storage.Layer) (string, error) { + route := clonedRoute(router, routeNameBlob) + + layerURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). + URL("name", layer.Name(), + "digest", layer.Digest().String()) + if err != nil { + return "", err + } + + return layerURL.String(), nil } diff --git a/layerupload.go b/layerupload.go index 8916b552..d1ec4206 100644 --- a/layerupload.go +++ b/layerupload.go @@ -1,64 +1,225 @@ package registry import ( + "fmt" + "io" "net/http" + "strconv" + "github.com/Sirupsen/logrus" + "github.com/docker/docker-registry/digest" + "github.com/docker/docker-registry/storage" "github.com/gorilla/handlers" + "github.com/gorilla/mux" ) // layerUploadDispatcher constructs and returns the layer upload handler for // the given request context. func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler { - layerUploadHandler := &layerUploadHandler{ + luh := &layerUploadHandler{ Context: ctx, - TarSum: ctx.vars["tarsum"], UUID: ctx.vars["uuid"], } - layerUploadHandler.log = layerUploadHandler.log.WithField("tarsum", layerUploadHandler.TarSum) + handler := http.Handler(handlers.MethodHandler{ + "POST": http.HandlerFunc(luh.StartLayerUpload), + "GET": http.HandlerFunc(luh.GetUploadStatus), + "HEAD": http.HandlerFunc(luh.GetUploadStatus), + "PUT": http.HandlerFunc(luh.PutLayerChunk), + "DELETE": http.HandlerFunc(luh.CancelLayerUpload), + }) - if layerUploadHandler.UUID != "" { - layerUploadHandler.log = layerUploadHandler.log.WithField("uuid", layerUploadHandler.UUID) + if luh.UUID != "" { + luh.log = luh.log.WithField("uuid", luh.UUID) + + layers := ctx.services.Layers() + upload, err := layers.Resume(luh.UUID) + + if err != nil && err != storage.ErrLayerUploadUnknown { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logrus.Infof("error resolving upload: %v", err) + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(ErrorCodeUnknown, err) + }) + } + + luh.Upload = upload + handler = closeResources(handler, luh.Upload) } - return handlers.MethodHandler{ - "POST": http.HandlerFunc(layerUploadHandler.StartLayerUpload), - "GET": http.HandlerFunc(layerUploadHandler.GetUploadStatus), - "HEAD": http.HandlerFunc(layerUploadHandler.GetUploadStatus), - "PUT": http.HandlerFunc(layerUploadHandler.PutLayerChunk), - "DELETE": http.HandlerFunc(layerUploadHandler.CancelLayerUpload), - } + return handler } // layerUploadHandler handles the http layer upload process. type layerUploadHandler struct { *Context - // TarSum is the unique identifier of the layer being uploaded. - TarSum string - // UUID identifies the upload instance for the current request. UUID string + + Upload storage.LayerUpload } // StartLayerUpload begins the layer upload process and allocates a server- // side upload session. func (luh *layerUploadHandler) StartLayerUpload(w http.ResponseWriter, r *http.Request) { + layers := luh.services.Layers() + upload, err := layers.Upload(luh.Name) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + luh.Upload = upload + defer luh.Upload.Close() + + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + w.WriteHeader(http.StatusAccepted) } // GetUploadStatus returns the status of a given upload, identified by uuid. func (luh *layerUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + + w.WriteHeader(http.StatusNoContent) } // PutLayerChunk receives a layer chunk during the layer upload process, // possible completing the upload with a checksum and length. func (luh *layerUploadHandler) PutLayerChunk(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } + var finished bool + + // TODO(stevvooe): This is woefully incomplete. Missing stuff: + // + // 1. Extract information from range header, if present. + // 2. Check offset of current layer. + // 3. Emit correct error responses. + + // Read in the chunk + io.Copy(luh.Upload, r.Body) + + if err := luh.maybeCompleteUpload(w, r); err != nil { + if err != errNotReadyToComplete { + w.WriteHeader(http.StatusInternalServerError) + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + } + + if err := luh.layerUploadResponse(w, r); err != nil { + w.WriteHeader(http.StatusInternalServerError) // Error conditions here? + luh.Errors.Push(ErrorCodeUnknown, err) + return + } + + if finished { + w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusAccepted) + } } // CancelLayerUpload cancels an in-progress upload of a layer. func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.Request) { + if luh.Upload == nil { + w.WriteHeader(http.StatusNotFound) + luh.Errors.Push(ErrorCodeUnknownLayerUpload) + } } + +// layerUploadResponse provides a standard request for uploading layers and +// chunk responses. This sets the correct headers but the response status is +// left to the caller. +func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error { + uploadURL, err := buildLayerUploadURL(luh.router, r, luh.Upload) + if err != nil { + logrus.Infof("error building upload url: %s", err) + return err + } + + w.Header().Set("Location", uploadURL) + w.Header().Set("Content-Length", "0") + w.Header().Set("Range", fmt.Sprintf("0-%d", luh.Upload.Offset())) + + return nil +} + +var errNotReadyToComplete = fmt.Errorf("not ready to complete upload") + +// maybeCompleteUpload tries to complete the upload if the correct parameters +// are available. Returns errNotReadyToComplete if not ready to complete. +func (luh *layerUploadHandler) maybeCompleteUpload(w http.ResponseWriter, r *http.Request) error { + // If we get a digest and length, we can finish the upload. + dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters! + sizeStr := r.FormValue("length") + + if dgstStr == "" || sizeStr == "" { + return errNotReadyToComplete + } + + dgst, err := digest.ParseDigest(dgstStr) + if err != nil { + return err + } + + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + return err + } + + luh.completeUpload(w, r, size, dgst) + return nil +} + +// completeUpload finishes out the upload with the correct response. +func (luh *layerUploadHandler) completeUpload(w http.ResponseWriter, r *http.Request, size int64, dgst digest.Digest) { + layer, err := luh.Upload.Finish(size, dgst) + if err != nil { + luh.Errors.Push(ErrorCodeUnknown, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + layerURL, err := buildLayerURL(luh.router, r, layer) + if err != nil { + luh.Errors.Push(ErrorCodeUnknown, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Location", layerURL) + w.Header().Set("Content-Length", "0") + w.WriteHeader(http.StatusCreated) +} + +func buildLayerUploadURL(router *mux.Router, r *http.Request, upload storage.LayerUpload) (string, error) { + route := clonedRoute(router, routeNameBlobUploadResume) + + uploadURL, err := route.Schemes(r.URL.Scheme).Host(r.Host). + URL("name", upload.Name(), "uuid", upload.UUID()) + if err != nil { + return "", err + } + + return uploadURL.String(), nil +} diff --git a/storage/filereader.go b/storage/filereader.go new file mode 100644 index 00000000..8f1f5205 --- /dev/null +++ b/storage/filereader.go @@ -0,0 +1,163 @@ +package storage + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/docker/docker-registry/storagedriver" +) + +// remoteFileReader provides a read seeker interface to files stored in +// storagedriver. Used to implement part of layer interface and will be used +// to implement read side of LayerUpload. +type fileReader struct { + driver storagedriver.StorageDriver + + // identifying fields + path string + size int64 // size is the total layer size, must be set. + + // mutable fields + rc io.ReadCloser // remote read closer + brd *bufio.Reader // internal buffered io + offset int64 // offset is the current read offset + err error // terminal error, if set, reader is closed +} + +func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { + // Grab the size of the layer file, ensuring existence. + size, err := driver.CurrentSize(path) + + if err != nil { + return nil, err + } + + return &fileReader{ + driver: driver, + path: path, + size: int64(size), + }, nil +} + +func (fr *fileReader) Read(p []byte) (n int, err error) { + if fr.err != nil { + return 0, fr.err + } + + rd, err := fr.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + fr.offset += int64(n) + + // Simulate io.EOR error if we reach filesize. + if err == nil && fr.offset >= fr.size { + err = io.EOF + } + + return n, err +} + +func (fr *fileReader) Seek(offset int64, whence int) (int64, error) { + if fr.err != nil { + return 0, fr.err + } + + var err error + newOffset := fr.offset + + switch whence { + case os.SEEK_CUR: + newOffset += int64(offset) + case os.SEEK_END: + newOffset = fr.size + int64(offset) + case os.SEEK_SET: + newOffset = int64(offset) + } + + if newOffset < 0 { + err = fmt.Errorf("cannot seek to negative position") + } else if newOffset > fr.size { + err = fmt.Errorf("cannot seek passed end of file") + } else { + if fr.offset != newOffset { + fr.reset() + } + + // No problems, set the offset. + fr.offset = newOffset + } + + return fr.offset, err +} + +// Close the layer. Should be called when the resource is no longer needed. +func (fr *fileReader) Close() error { + if fr.err != nil { + return fr.err + } + + fr.err = ErrLayerClosed + + // close and release reader chain + if fr.rc != nil { + fr.rc.Close() + } + + fr.rc = nil + fr.brd = nil + + return fr.err +} + +// reader prepares the current reader at the lrs offset, ensuring its buffered +// and ready to go. +func (fr *fileReader) reader() (io.Reader, error) { + if fr.err != nil { + return nil, fr.err + } + + if fr.rc != nil { + return fr.brd, nil + } + + // If we don't have a reader, open one up. + rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset)) + + if err != nil { + return nil, err + } + + fr.rc = rc + + if fr.brd == nil { + // TODO(stevvooe): Set an optimal buffer size here. We'll have to + // understand the latency characteristics of the underlying network to + // set this correctly, so we may want to leave it to the driver. For + // out of process drivers, we'll have to optimize this buffer size for + // local communication. + fr.brd = bufio.NewReader(fr.rc) + } else { + fr.brd.Reset(fr.rc) + } + + return fr.brd, nil +} + +// resetReader resets the reader, forcing the read method to open up a new +// connection and rebuild the buffered reader. This should be called when the +// offset and the reader will become out of sync, such as during a seek +// operation. +func (fr *fileReader) reset() { + if fr.err != nil { + return + } + if fr.rc != nil { + fr.rc.Close() + fr.rc = nil + } +} diff --git a/storage/filereader_test.go b/storage/filereader_test.go new file mode 100644 index 00000000..cfc9d215 --- /dev/null +++ b/storage/filereader_test.go @@ -0,0 +1,158 @@ +package storage + +import ( + "bytes" + "crypto/rand" + "io" + mrand "math/rand" + "os" + "testing" + + "github.com/docker/docker-registry/digest" + + "github.com/docker/docker-registry/storagedriver/inmemory" +) + +func TestSimpleRead(t *testing.T) { + content := make([]byte, 1<<20) + n, err := rand.Read(content) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + + if n != len(content) { + t.Fatalf("random read did't fill buffer") + } + + dgst, err := digest.FromReader(bytes.NewReader(content)) + if err != nil { + t.Fatalf("unexpected error digesting random content: %v", err) + } + + driver := inmemory.New() + path := "/random" + + if err := driver.PutContent(path, content); err != nil { + t.Fatalf("error putting patterned content: %v", err) + } + + fr, err := newFileReader(driver, path) + if err != nil { + t.Fatalf("error allocating file reader: %v", err) + } + + verifier := digest.NewDigestVerifier(dgst) + io.Copy(verifier, fr) + + if !verifier.Verified() { + t.Fatalf("unable to verify read data") + } +} + +func TestFileReaderSeek(t *testing.T) { + driver := inmemory.New() + pattern := "01234567890ab" // prime length block + repititions := 1024 + path := "/patterned" + content := bytes.Repeat([]byte(pattern), repititions) + + if err := driver.PutContent(path, content); err != nil { + t.Fatalf("error putting patterned content: %v", err) + } + + fr, err := newFileReader(driver, path) + + if err != nil { + t.Fatalf("unexpected error creating file reader: %v", err) + } + + // Seek all over the place, in blocks of pattern size and make sure we get + // the right data. + for _, repitition := range mrand.Perm(repititions - 1) { + targetOffset := int64(len(pattern) * repitition) + // Seek to a multiple of pattern size and read pattern size bytes + offset, err := fr.Seek(targetOffset, os.SEEK_SET) + if err != nil { + t.Fatalf("unexpected error seeking: %v", err) + } + + if offset != targetOffset { + t.Fatalf("did not seek to correct offset: %d != %d", offset, targetOffset) + } + + p := make([]byte, len(pattern)) + + n, err := fr.Read(p) + if err != nil { + t.Fatalf("error reading pattern: %v", err) + } + + if n != len(pattern) { + t.Fatalf("incorrect read length: %d != %d", n, len(pattern)) + } + + if string(p) != pattern { + t.Fatalf("incorrect read content: %q != %q", p, pattern) + } + + // Check offset + current, err := fr.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("error checking current offset: %v", err) + } + + if current != targetOffset+int64(len(pattern)) { + t.Fatalf("unexpected offset after read: %v", err) + } + } + + start, err := fr.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatalf("error seeking to start: %v", err) + } + + if start != 0 { + t.Fatalf("expected to seek to start: %v != 0", start) + } + + end, err := fr.Seek(0, os.SEEK_END) + if err != nil { + t.Fatalf("error checking current offset: %v", err) + } + + if end != int64(len(content)) { + t.Fatalf("expected to seek to end: %v != %v", end, len(content)) + } + + // 4. Seek past end and before start, ensure error. + + // seek before start + before, err := fr.Seek(-1, os.SEEK_SET) + if err == nil { + t.Fatalf("error expected, returned offset=%v", before) + } + + after, err := fr.Seek(int64(len(content)+1), os.SEEK_END) + if err == nil { + t.Fatalf("error expected, returned offset=%v", after) + } +} + +// TestLayerReadErrors covers the various error return type for different +// conditions that can arise when reading a layer. +func TestFileReaderErrors(t *testing.T) { + // TODO(stevvooe): We need to cover error return types, driven by the + // errors returned via the HTTP API. For now, here is a incomplete list: + // + // 1. Layer Not Found: returned when layer is not found or access is + // denied. + // 2. Layer Unavailable: returned when link references are unresolved, + // but layer is known to the registry. + // 3. Layer Invalid: This may more split into more errors, but should be + // returned when name or tarsum does not reference a valid error. We + // may also need something to communication layer verification errors + // for the inline tarsum check. + // 4. Timeout: timeouts to backend. Need to better understand these + // failure cases and how the storage driver propagates these errors + // up the stack. +} diff --git a/storage/layer.go b/storage/layer.go index 6c45f401..d2ddfb07 100644 --- a/storage/layer.go +++ b/storage/layer.go @@ -87,4 +87,8 @@ var ( // ErrLayerInvalidLength returned when length check fails. ErrLayerInvalidLength = fmt.Errorf("invalid layer length") + + // ErrLayerClosed returned when an operation is attempted on a closed + // Layer or LayerUpload. + ErrLayerClosed = fmt.Errorf("layer closed") ) diff --git a/storage/layer_test.go b/storage/layer_test.go index 335793d2..03cba9b9 100644 --- a/storage/layer_test.go +++ b/storage/layer_test.go @@ -241,31 +241,6 @@ func TestSimpleLayerRead(t *testing.T) { } } -func TestLayerReaderSeek(t *testing.T) { - // TODO(stevvooe): Ensure that all relative seeks work as advertised. - // Readers must close and re-open on command. This is important to support - // resumable and concurrent downloads via HTTP range requests. -} - -// TestLayerReadErrors covers the various error return type for different -// conditions that can arise when reading a layer. -func TestLayerReadErrors(t *testing.T) { - // TODO(stevvooe): We need to cover error return types, driven by the - // errors returned via the HTTP API. For now, here is a incomplete list: - // - // 1. Layer Not Found: returned when layer is not found or access is - // denied. - // 2. Layer Unavailable: returned when link references are unresolved, - // but layer is known to the registry. - // 3. Layer Invalid: This may more split into more errors, but should be - // returned when name or tarsum does not reference a valid error. We - // may also need something to communication layer verification errors - // for the inline tarsum check. - // 4. Timeout: timeouts to backend. Need to better understand these - // failure cases and how the storage driver propagates these errors - // up the stack. -} - // writeRandomLayer creates a random layer under name and tarSum using driver // and pathMapper. An io.ReadSeeker with the data is returned, along with the // sha256 hex digest. diff --git a/storage/layerreader.go b/storage/layerreader.go index 396940d0..2cc184fd 100644 --- a/storage/layerreader.go +++ b/storage/layerreader.go @@ -1,10 +1,6 @@ package storage import ( - "bufio" - "fmt" - "io" - "os" "time" "github.com/docker/docker-registry/digest" @@ -13,22 +9,11 @@ import ( // layerReadSeeker implements Layer and provides facilities for reading and // seeking. type layerReader struct { - layerStore *layerStore - rc io.ReadCloser - brd *bufio.Reader + fileReader name string // repo name of this layer digest digest.Digest - path string createdAt time.Time - - // offset is the current read offset - offset int64 - - // size is the total layer size, if available. - size int64 - - closedErr error // terminal error, if set, reader is closed } var _ Layer = &layerReader{} @@ -44,131 +29,3 @@ func (lrs *layerReader) Digest() digest.Digest { func (lrs *layerReader) CreatedAt() time.Time { return lrs.createdAt } - -func (lrs *layerReader) Read(p []byte) (n int, err error) { - if err := lrs.closed(); err != nil { - return 0, err - } - - rd, err := lrs.reader() - if err != nil { - return 0, err - } - - n, err = rd.Read(p) - lrs.offset += int64(n) - - // Simulate io.EOR error if we reach filesize. - if err == nil && lrs.offset >= lrs.size { - err = io.EOF - } - - // TODO(stevvooe): More error checking is required here. If the reader - // times out for some reason, we should reset the reader so we re-open the - // connection. - - return n, err -} - -func (lrs *layerReader) Seek(offset int64, whence int) (int64, error) { - if err := lrs.closed(); err != nil { - return 0, err - } - - var err error - newOffset := lrs.offset - - switch whence { - case os.SEEK_CUR: - newOffset += int64(whence) - case os.SEEK_END: - newOffset = lrs.size + int64(whence) - case os.SEEK_SET: - newOffset = int64(whence) - } - - if newOffset < 0 { - err = fmt.Errorf("cannot seek to negative position") - } else if newOffset >= lrs.size { - err = fmt.Errorf("cannot seek passed end of layer") - } else { - if lrs.offset != newOffset { - lrs.resetReader() - } - - // No problems, set the offset. - lrs.offset = newOffset - } - - return lrs.offset, err -} - -// Close the layer. Should be called when the resource is no longer needed. -func (lrs *layerReader) Close() error { - if lrs.closedErr != nil { - return lrs.closedErr - } - // TODO(sday): Must export this error. - lrs.closedErr = fmt.Errorf("layer closed") - - // close and release reader chain - if lrs.rc != nil { - lrs.rc.Close() - lrs.rc = nil - } - lrs.brd = nil - - return lrs.closedErr -} - -// reader prepares the current reader at the lrs offset, ensuring its buffered -// and ready to go. -func (lrs *layerReader) reader() (io.Reader, error) { - if err := lrs.closed(); err != nil { - return nil, err - } - - if lrs.rc != nil { - return lrs.brd, nil - } - - // If we don't have a reader, open one up. - rc, err := lrs.layerStore.driver.ReadStream(lrs.path, uint64(lrs.offset)) - - if err != nil { - return nil, err - } - - lrs.rc = rc - - if lrs.brd == nil { - // TODO(stevvooe): Set an optimal buffer size here. We'll have to - // understand the latency characteristics of the underlying network to - // set this correctly, so we may want to leave it to the driver. For - // out of process drivers, we'll have to optimize this buffer size for - // local communication. - lrs.brd = bufio.NewReader(lrs.rc) - } else { - lrs.brd.Reset(lrs.rc) - } - - return lrs.brd, nil -} - -// resetReader resets the reader, forcing the read method to open up a new -// connection and rebuild the buffered reader. This should be called when the -// offset and the reader will become out of sync, such as during a seek -// operation. -func (lrs *layerReader) resetReader() { - if err := lrs.closed(); err != nil { - return - } - if lrs.rc != nil { - lrs.rc.Close() - lrs.rc = nil - } -} - -func (lrs *layerReader) closed() error { - return lrs.closedErr -} diff --git a/storage/layerstore.go b/storage/layerstore.go index c9662ffd..6abd50e3 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -57,33 +57,26 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) { return nil, err } - // Grab the size of the layer file, ensuring that it exists, among other - // things. - size, err := ls.driver.CurrentSize(p) - + fr, err := newFileReader(ls.driver, p) if err != nil { - // TODO(stevvooe): Handle blob/path does not exist here. - // TODO(stevvooe): Get a better understanding of the error cases here - // that don't stem from unknown path. - return nil, err + switch err := err.(type) { + case storagedriver.PathNotFoundError, *storagedriver.PathNotFoundError: + return nil, ErrLayerUnknown + default: + return nil, err + } } - // Build the layer reader and return to the client. - layer := &layerReader{ - layerStore: ls, - path: p, + return &layerReader{ + fileReader: *fr, name: name, digest: digest, // TODO(stevvooe): Storage backend does not support modification time - // queries yet. Layers "never" change, so just return the zero value. - createdAt: time.Time{}, - - offset: 0, - size: int64(size), - } - - return layer, nil + // queries yet. Layers "never" change, so just return the zero value + // plus a nano-second. + createdAt: (time.Time{}).Add(time.Nanosecond), + }, nil } // Upload begins a layer upload, returning a handle. If the layer upload diff --git a/storage/layerupload.go b/storage/layerupload.go index c07927f1..f134aa19 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -429,6 +429,10 @@ func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) return lus, err } + if err := llufs.SaveState(lus); err != nil { + return lus, err + } + return lus, nil } diff --git a/storage/services.go b/storage/services.go index dbe5dc75..afb26d94 100644 --- a/storage/services.go +++ b/storage/services.go @@ -15,7 +15,6 @@ type Services struct { // NewServices creates a new Services object to access docker objects stored // in the underlying driver. func NewServices(driver storagedriver.StorageDriver) *Services { - layerUploadStore, err := newTemporaryLocalFSLayerUploadStore() if err != nil { @@ -40,5 +39,5 @@ func NewServices(driver storagedriver.StorageDriver) *Services { // may be context sensitive in the future. The instance should be used similar // to a request local. func (ss *Services) Layers() LayerService { - return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper} + return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore} }