diff --git a/storage/layer.go b/storage/layer.go index ec5f0f9d..24736c70 100644 --- a/storage/layer.go +++ b/storage/layer.go @@ -24,8 +24,7 @@ type Layer interface { // layers. Digest() digest.Digest - // CreatedAt returns the time this layer was created. Until we implement - // Stat call on storagedriver, this just returns the zero time. + // CreatedAt returns the time this layer was created. CreatedAt() time.Time } @@ -33,26 +32,22 @@ type Layer interface { // Instances can be obtained from the LayerService.Upload and // LayerService.Resume. type LayerUpload interface { - io.WriteCloser - - // UUID returns the identifier for this upload. - UUID() string + io.WriteSeeker + io.Closer // Name of the repository under which the layer will be linked. Name() string - // Offset returns the position of the last byte written to this layer. - Offset() int64 + // UUID returns the identifier for this upload. + UUID() string - // TODO(stevvooe): Consider completely removing the size check from this - // interface. The digest check may be adequate and we are making it - // optional in the HTTP API. + // StartedAt returns the time this layer upload was started. + StartedAt() time.Time // Finish marks the upload as completed, returning a valid handle to the - // uploaded layer. The final size and digest are validated against the - // contents of the uploaded layer. If the size is negative, only the - // digest will be checked. - Finish(size int64, digest digest.Digest) (Layer, error) + // uploaded layer. The digest is validated against the contents of the + // uploaded layer. + Finish(digest digest.Digest) (Layer, error) // Cancel the layer upload process. Cancel() error @@ -84,11 +79,11 @@ func (err ErrUnknownLayer) Error() string { // ErrLayerInvalidDigest returned when tarsum check fails. type ErrLayerInvalidDigest struct { - FSLayer manifest.FSLayer + Digest digest.Digest } func (err ErrLayerInvalidDigest) Error() string { - return fmt.Sprintf("invalid digest for referenced layer: %v", err.FSLayer.BlobSum) + return fmt.Sprintf("invalid digest for referenced layer: %v", err.Digest) } // ErrLayerInvalidSize returned when length check fails. diff --git a/storage/layer_test.go b/storage/layer_test.go index ec5b7406..d6f4718a 100644 --- a/storage/layer_test.go +++ b/storage/layer_test.go @@ -26,21 +26,18 @@ func TestSimpleLayerUpload(t *testing.T) { dgst := digest.Digest(tarSumStr) - uploadStore, err := newTemporaryLocalFSLayerUploadStore() if err != nil { t.Fatalf("error allocating upload store: %v", err) } imageName := "foo/bar" - driver := inmemory.New() ls := &layerStore{ - driver: driver, + driver: inmemory.New(), pathMapper: &pathMapper{ root: "/storage/testing", version: storagePathVersion, }, - uploadStore: uploadStore, } h := sha256.New() @@ -58,7 +55,7 @@ func TestSimpleLayerUpload(t *testing.T) { } // Do a resume, get unknown upload - layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) + layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) if err != ErrLayerUploadUnknown { t.Fatalf("unexpected error resuming upload, should be unkown: %v", err) } @@ -84,26 +81,31 @@ func TestSimpleLayerUpload(t *testing.T) { t.Fatalf("layer data write incomplete") } - if layerUpload.Offset() != nn { - t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn) + offset, err := layerUpload.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("unexpected error seeking layer upload: %v", err) + } + + if offset != nn { + t.Fatalf("layerUpload not updated with correct offset: %v != %v", offset, nn) } layerUpload.Close() // Do a resume, for good fun - layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}) + layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) if err != nil { t.Fatalf("unexpected error resuming upload: %v", err) } sha256Digest := digest.NewDigest("sha256", h) - layer, err := layerUpload.Finish(randomDataSize, dgst) + layer, err := layerUpload.Finish(dgst) if err != nil { t.Fatalf("unexpected error finishing layer upload: %v", err) } // After finishing an upload, it should no longer exist. - if _, err := ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}); err != ErrLayerUploadUnknown { + if _, err := ls.Resume(layerUpload.Name(), layerUpload.UUID()); err != ErrLayerUploadUnknown { t.Fatalf("expected layer upload to be unknown, got %v", err) } diff --git a/storage/layerstore.go b/storage/layerstore.go index f73bef6d..41227cc5 100644 --- a/storage/layerstore.go +++ b/storage/layerstore.go @@ -1,15 +1,17 @@ package storage import ( + "time" + + "code.google.com/p/go-uuid/uuid" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/distribution/storagedriver" ) type layerStore struct { - driver storagedriver.StorageDriver - pathMapper *pathMapper - uploadStore layerUploadStore + driver storagedriver.StorageDriver + pathMapper *pathMapper } func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) { @@ -66,31 +68,86 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) { // the same two layers. Should it be disallowed? For now, we allow both // parties to proceed and the the first one uploads the layer. - lus, err := ls.uploadStore.New(name) + uuid := uuid.New() + startedAt := time.Now().UTC() + + path, err := ls.pathMapper.path(uploadDataPathSpec{ + name: name, + uuid: uuid, + }) + if err != nil { return nil, err } - return ls.newLayerUpload(lus), nil + startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ + name: name, + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + // Write a startedat file for this upload + if err := ls.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { + return nil, err + } + + return ls.newLayerUpload(name, uuid, path, startedAt) } // Resume continues an in progress layer upload, returning the current // state of the upload. -func (ls *layerStore) Resume(lus LayerUploadState) (LayerUpload, error) { - _, err := ls.uploadStore.GetState(lus.UUID) +func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) { + startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ + name: name, + uuid: uuid, + }) if err != nil { return nil, err } - return ls.newLayerUpload(lus), nil + startedAtBytes, err := ls.driver.GetContent(startedAtPath) + if err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + return nil, ErrLayerUploadUnknown + default: + return nil, err + } + } + + startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes)) + if err != nil { + return nil, err + } + + path, err := ls.pathMapper.path(uploadDataPathSpec{ + name: name, + uuid: uuid, + }) + + if err != nil { + return nil, err + } + + return ls.newLayerUpload(name, uuid, path, startedAt) } // newLayerUpload allocates a new upload controller with the given state. -func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload { - return &layerUploadController{ - LayerUploadState: lus, - layerStore: ls, - uploadStore: ls.uploadStore, +func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) { + fw, err := newFileWriter(ls.driver, path) + if err != nil { + return nil, err } + + return &layerUploadController{ + layerStore: ls, + name: name, + uuid: uuid, + startedAt: startedAt, + fileWriter: *fw, + }, nil } diff --git a/storage/layerupload.go b/storage/layerupload.go index 3175a09e..b9953b23 100644 --- a/storage/layerupload.go +++ b/storage/layerupload.go @@ -1,229 +1,84 @@ package storage import ( - "fmt" "io" - "io/ioutil" - "os" - "path/filepath" - - "code.google.com/p/go-uuid/uuid" + "path" + "time" + "github.com/Sirupsen/logrus" "github.com/docker/distribution/digest" - "github.com/docker/distribution/manifest" "github.com/docker/distribution/storagedriver" "github.com/docker/docker/pkg/tarsum" ) -// LayerUploadState captures the state serializable state of the layer upload. -type LayerUploadState struct { - // name is the primary repository under which the layer will be linked. - Name string - - // UUID identifies the upload. - UUID string - - // offset contains the current progress of the upload. - Offset int64 -} - // layerUploadController is used to control the various aspects of resumable // layer upload. It implements the LayerUpload interface. type layerUploadController struct { - LayerUploadState + layerStore *layerStore - layerStore *layerStore - uploadStore layerUploadStore - fp layerFile - err error // terminal error, if set, controller is closed -} + name string + uuid string + startedAt time.Time -// layerFile documents the interface used while writing layer files, similar -// to *os.File. This is separate from layerReader, for now, because we want to -// store uploads on the local file system until we have write-through hashing -// support. They should be combined once this is worked out. -type layerFile interface { - io.WriteSeeker - io.Reader - io.Closer - - // Sync commits the contents of the writer to storage. - Sync() (err error) -} - -// layerUploadStore provides storage for temporary files and upload state of -// layers. This is be used by the LayerService to manage the state of ongoing -// uploads. This interface will definitely change and will most likely end up -// being exported to the app layer. Move the layer.go when it's ready to go. -type layerUploadStore interface { - New(name string) (LayerUploadState, error) - Open(uuid string) (layerFile, error) - GetState(uuid string) (LayerUploadState, error) - // TODO: factor this method back in - // SaveState(lus LayerUploadState) error - DeleteState(uuid string) error + fileWriter } var _ LayerUpload = &layerUploadController{} // Name of the repository under which the layer will be linked. func (luc *layerUploadController) Name() string { - return luc.LayerUploadState.Name + return luc.name } // UUID returns the identifier for this upload. func (luc *layerUploadController) UUID() string { - return luc.LayerUploadState.UUID + return luc.uuid } -// Offset returns the position of the last byte written to this layer. -func (luc *layerUploadController) Offset() int64 { - return luc.LayerUploadState.Offset +func (luc *layerUploadController) StartedAt() time.Time { + return luc.startedAt } // Finish marks the upload as completed, returning a valid handle to the // uploaded layer. The final size and checksum are validated against the // contents of the uploaded layer. The checksum should be provided in the // format :. -func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Layer, error) { - - // This section is going to be pretty ugly now. We will have to read the - // file twice. First, to get the tarsum and checksum. When those are - // available, and validated, we will upload it to the blob store and link - // it into the repository. In the future, we need to use resumable hash - // calculations for tarsum and checksum that can be calculated during the - // upload. This will allow us to cut the data directly into a temporary - // directory in the storage backend. - - fp, err := luc.file() - - if err != nil { - // Cleanup? - return nil, err - } - - digest, err = luc.validateLayer(fp, size, digest) +func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) { + canonical, err := luc.validateLayer(digest) if err != nil { return nil, err } - if nn, err := luc.writeLayer(fp, digest); err != nil { - // Cleanup? - return nil, err - } else if size >= 0 && nn != size { - // TODO(stevvooe): Short write. Will have to delete the location and - // report an error. This error needs to be reported to the client. - return nil, fmt.Errorf("short write writing layer") - } - - // Yes! We have written some layer data. Let's make it visible. Link the - // layer blob into the repository. - if err := luc.linkLayer(digest); err != nil { + if err := luc.moveLayer(canonical); err != nil { + // TODO(stevvooe): Cleanup? return nil, err } - // Ok, the upload has completed and finished. Delete the state. - if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil { - // Can we ignore this error? + // Link the layer blob into the repository. + if err := luc.linkLayer(canonical); err != nil { return nil, err } - return luc.layerStore.Fetch(luc.Name(), digest) + if err := luc.removeResources(); err != nil { + return nil, err + } + + return luc.layerStore.Fetch(luc.Name(), canonical) } // Cancel the layer upload process. func (luc *layerUploadController) Cancel() error { - if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil { + if err := luc.removeResources(); err != nil { return err } - return luc.Close() + luc.Close() + return nil } -func (luc *layerUploadController) Write(p []byte) (int, error) { - wr, err := luc.file() - if err != nil { - return 0, err - } - - n, err := wr.Write(p) - - // Because we expect the reported offset to be consistent with the storage - // state, unfortunately, we need to Sync on every call to write. - if err := wr.Sync(); err != nil { - // Effectively, ignore the write state if the Sync fails. Report that - // no bytes were written and seek back to the starting offset. - offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET) - if seekErr != nil { - // What do we do here? Quite disasterous. - luc.reset() - - return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr) - } - - if offset != luc.Offset() { - return 0, fmt.Errorf("unexpected offset after seek") - } - - return 0, err - } - - luc.LayerUploadState.Offset += int64(n) - - return n, err -} - -func (luc *layerUploadController) Close() error { - if luc.err != nil { - return luc.err - } - - if luc.fp != nil { - luc.err = luc.fp.Close() - } - - return luc.err -} - -func (luc *layerUploadController) file() (layerFile, error) { - if luc.fp != nil { - return luc.fp, nil - } - - fp, err := luc.uploadStore.Open(luc.UUID()) - - if err != nil { - return nil, err - } - - // TODO(stevvooe): We may need a more aggressive check here to ensure that - // the file length is equal to the current offset. We may want to sync the - // offset before return the layer upload to the client so it can be - // validated before proceeding with any writes. - - // Seek to the current layer offset for good measure. - if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil { - return nil, err - } - - luc.fp = fp - - return luc.fp, nil -} - -// reset closes and drops the current writer. -func (luc *layerUploadController) reset() { - if luc.fp != nil { - luc.fp.Close() - luc.fp = nil - } -} - -// validateLayer runs several checks on the layer file to ensure its validity. -// This is currently very expensive and relies on fast io and fast seek on the -// local host. If successful, the latest digest is returned, which should be -// used over the passed in value. -func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst digest.Digest) (digest.Digest, error) { +// validateLayer checks the layer data against the digest, returning an error +// if it does not match. The canonical digest is returned. +func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) { // First, check the incoming tarsum version of the digest. version, err := tarsum.GetVersionFromTarsum(dgst.String()) if err != nil { @@ -239,87 +94,65 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d } digestVerifier := digest.NewDigestVerifier(dgst) - lengthVerifier := digest.NewLengthVerifier(size) - // First, seek to the end of the file, checking the size is as expected. - end, err := fp.Seek(0, os.SEEK_END) + // TODO(stevvooe): Store resumable hash calculations in upload directory + // in driver. Something like a file at path /resumablehash/ + // with the hash state up to that point would be perfect. The hasher would + // then only have to fetch the difference. + + // Read the file from the backend driver and validate it. + fr, err := newFileReader(luc.fileWriter.driver, luc.path) if err != nil { return "", err } - // Only check size if it is greater than - if size >= 0 && end != size { - // Fast path length check. - return "", ErrLayerInvalidSize{Size: size} - } - - // Now seek back to start and take care of the digest. - if _, err := fp.Seek(0, os.SEEK_SET); err != nil { - return "", err - } - - tr := io.TeeReader(fp, digestVerifier) - - // Only verify the size if a positive size argument has been passed. - if size >= 0 { - tr = io.TeeReader(tr, lengthVerifier) - } + tr := io.TeeReader(fr, digestVerifier) // TODO(stevvooe): This is one of the places we need a Digester write - // sink. Instead, its read driven. This migth be okay. + // sink. Instead, its read driven. This might be okay. // Calculate an updated digest with the latest version. - dgst, err = digest.FromReader(tr) + canonical, err := digest.FromReader(tr) if err != nil { return "", err } - if size >= 0 && !lengthVerifier.Verified() { - return "", ErrLayerInvalidSize{Size: size} - } - if !digestVerifier.Verified() { - return "", ErrLayerInvalidDigest{manifest.FSLayer{BlobSum: dgst}} + return "", ErrLayerInvalidDigest{Digest: dgst} } - return dgst, nil + return canonical, nil } -// writeLayer actually writes the the layer file into its final destination, +// moveLayer moves the data into its final, hash-qualified destination, // identified by dgst. The layer should be validated before commencing the -// write. -func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) { +// move. +func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{ digest: dgst, }) if err != nil { - return 0, err + return err } // Check for existence if _, err := luc.layerStore.driver.Stat(blobPath); err != nil { - // TODO(stevvooe): This check is kind of problematic and very racy. switch err := err.(type) { case storagedriver.PathNotFoundError: break // ensure that it doesn't exist. default: - // TODO(stevvooe): This isn't actually an error: the blob store is - // content addressable and we should just use this to ensure we - // have it written. Although, we do need to verify that the - // content that is there is the correct length. - return 0, err + return err } + } else { + // If the path exists, we can assume that the content has already + // been uploaded, since the blob storage is content-addressable. + // While it may be corrupted, detection of such corruption belongs + // elsewhere. + return nil } - // Seek our local layer file back now. - if _, err := fp.Seek(0, os.SEEK_SET); err != nil { - // Cleanup? - return 0, err - } - - // Okay: we can write the file to the blob store. - return luc.layerStore.driver.WriteStream(blobPath, 0, fp) + return luc.driver.Move(luc.path, blobPath) } // linkLayer links a valid, written layer blob into the registry under the @@ -337,85 +170,35 @@ func (luc *layerUploadController) linkLayer(digest digest.Digest) error { return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)) } -// localFSLayerUploadStore implements a local layerUploadStore. There are some -// complexities around hashsums that make round tripping to the storage -// backend problematic, so we'll store and read locally for now. By GO-beta, -// this should be fully implemented on top of the backend storagedriver. -// -// For now, the directory layout is as follows: -// -// //registry-layer-upload/ -// / -// -> state.json -// -> data -// -// Each upload, identified by uuid, has its own directory with a state file -// and a data file. The state file has a json representation of the current -// state. The data file is the in-progress upload data. -type localFSLayerUploadStore struct { - root string -} - -func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) { - path, err := ioutil.TempDir("", "registry-layer-upload") +// removeResources should clean up all resources associated with the upload +// instance. An error will be returned if the clean up cannot proceed. If the +// resources are already not present, no error will be returned. +func (luc *layerUploadController) removeResources() error { + dataPath, err := luc.layerStore.pathMapper.path(uploadDataPathSpec{ + name: luc.name, + uuid: luc.uuid, + }) if err != nil { - return nil, err - } - - return &localFSLayerUploadStore{ - root: path, - }, nil -} - -func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error) { - lus := LayerUploadState{ - Name: name, - UUID: uuid.New(), - } - - if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil { - return lus, err - } - - return lus, nil -} - -func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) { - fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644) - - if err != nil { - return nil, err - } - - return fp, nil -} - -func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) { - var lus LayerUploadState - - if _, err := os.Stat(llufs.path(uuid, "")); err != nil { - if os.IsNotExist(err) { - return lus, ErrLayerUploadUnknown - } - - return lus, err - } - return lus, nil -} - -func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error { - if err := os.RemoveAll(llufs.path(uuid, "")); err != nil { - if os.IsNotExist(err) { - return ErrLayerUploadUnknown - } - return err } + // Resolve and delete the containing directory, which should include any + // upload related files. + dirPath := path.Dir(dataPath) + + if err := luc.driver.Delete(dirPath); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + break // already gone! + default: + // This should be uncommon enough such that returning an error + // should be okay. At this point, the upload should be mostly + // complete, but perhaps the backend became unaccessible. + logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err) + return err + } + } + return nil } - -func (llufs *localFSLayerUploadStore) path(uuid, file string) string { - return filepath.Join(llufs.root, uuid, file) -} diff --git a/storage/manifeststore_test.go b/storage/manifeststore_test.go index 991028e5..a6cca962 100644 --- a/storage/manifeststore_test.go +++ b/storage/manifeststore_test.go @@ -153,6 +153,6 @@ func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) { panic("not implemented") } -func (mockedExistenceLayerService) Resume(lus LayerUploadState) (LayerUpload, error) { +func (mockedExistenceLayerService) Resume(name, uuid string) (LayerUpload, error) { panic("not implemented") } diff --git a/storage/services.go b/storage/services.go index 5507faeb..97edca3f 100644 --- a/storage/services.go +++ b/storage/services.go @@ -9,28 +9,18 @@ import ( // Services provides various services with application-level operations for // use across backend storage drivers. type Services struct { - driver storagedriver.StorageDriver - pathMapper *pathMapper - layerUploadStore layerUploadStore + driver storagedriver.StorageDriver + pathMapper *pathMapper } // NewServices creates a new Services object to access docker objects stored // in the underlying driver. func NewServices(driver storagedriver.StorageDriver) *Services { - layerUploadStore, err := newTemporaryLocalFSLayerUploadStore() - - if err != nil { - // TODO(stevvooe): This failure needs to be understood in the context - // of the lifecycle of the services object, which is uncertain at this - // point. - panic("unable to allocate layerUploadStore: " + err.Error()) - } return &Services{ driver: driver, // TODO(sday): This should be configurable. - pathMapper: defaultPathMapper, - layerUploadStore: layerUploadStore, + pathMapper: defaultPathMapper, } } @@ -38,7 +28,7 @@ func NewServices(driver storagedriver.StorageDriver) *Services { // may be context sensitive in the future. The instance should be used similar // to a request local. func (ss *Services) Layers() LayerService { - return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore} + return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper} } // Manifests returns an instance of ManifestService. Instantiation is cheap and @@ -78,7 +68,8 @@ type LayerService interface { // returning a handle. Upload(name string) (LayerUpload, error) - // Resume continues an in progress layer upload, returning the current - // state of the upload. - Resume(layerUploadState LayerUploadState) (LayerUpload, error) + // Resume continues an in progress layer upload, returning a handle to the + // upload. The caller should seek to the latest desired upload location + // before proceeding. + Resume(name, uuid string) (LayerUpload, error) }