9c88801a12
Back in the before time, the best practices surrounding usage of Context weren't quite worked out. We defined our own type to make usage easier. As this package was used elsewhere, it made it more and more challenging to integrate with the forked `Context` type. Now that it is available in the standard library, we can just use that one directly. To make usage more consistent, we now use `dcontext` when referring to the distribution context package. Signed-off-by: Stephen J Day <stephen.day@docker.com>
146 lines
3.4 KiB
Go
146 lines
3.4 KiB
Go
// +build !noresumabledigest
|
|
|
|
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path"
|
|
"strconv"
|
|
|
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/stevvooe/resumable"
|
|
|
|
// register resumable hashes with import
|
|
_ "github.com/stevvooe/resumable/sha256"
|
|
_ "github.com/stevvooe/resumable/sha512"
|
|
)
|
|
|
|
// resumeDigest attempts to restore the state of the internal hash function
|
|
// by loading the most recent saved hash state equal to the current size of the blob.
|
|
func (bw *blobWriter) resumeDigest(ctx context.Context) error {
|
|
if !bw.resumableDigestEnabled {
|
|
return errResumableDigestNotAvailable
|
|
}
|
|
|
|
h, ok := bw.digester.Hash().(resumable.Hash)
|
|
if !ok {
|
|
return errResumableDigestNotAvailable
|
|
}
|
|
offset := bw.fileWriter.Size()
|
|
if offset == int64(h.Len()) {
|
|
// State of digester is already at the requested offset.
|
|
return nil
|
|
}
|
|
|
|
// List hash states from storage backend.
|
|
var hashStateMatch hashStateEntry
|
|
hashStates, err := bw.getStoredHashStates(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
|
|
}
|
|
|
|
// Find the highest stored hashState with offset equal to
|
|
// the requested offset.
|
|
for _, hashState := range hashStates {
|
|
if hashState.offset == offset {
|
|
hashStateMatch = hashState
|
|
break // Found an exact offset match.
|
|
}
|
|
}
|
|
|
|
if hashStateMatch.offset == 0 {
|
|
// No need to load any state, just reset the hasher.
|
|
h.Reset()
|
|
} else {
|
|
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = h.Restore(storedState); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Mind the gap.
|
|
if gapLen := offset - int64(h.Len()); gapLen > 0 {
|
|
return errResumableDigestNotAvailable
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// hashStateEntry describes one saved hash state of an upload: the offset the
// state was parsed as belonging to, and the storage path it was listed under.
type hashStateEntry struct {
	// offset is the value parsed from the final element of path.
	offset int64
	// path is the storage driver path of the saved hash state.
	path string
}
|
|
|
|
// getStoredHashStates returns a slice of hashStateEntries for this upload.
|
|
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
|
|
uploadHashStatePathPrefix, err := pathFor(uploadHashStatePathSpec{
|
|
name: bw.blobStore.repository.Named().String(),
|
|
id: bw.id,
|
|
alg: bw.digester.Digest().Algorithm(),
|
|
list: true,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
|
|
if err != nil {
|
|
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
|
|
return nil, err
|
|
}
|
|
// Treat PathNotFoundError as no entries.
|
|
paths = nil
|
|
}
|
|
|
|
hashStateEntries := make([]hashStateEntry, 0, len(paths))
|
|
|
|
for _, p := range paths {
|
|
pathSuffix := path.Base(p)
|
|
// The suffix should be the offset.
|
|
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
|
|
if err != nil {
|
|
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
|
|
}
|
|
|
|
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
|
|
}
|
|
|
|
return hashStateEntries, nil
|
|
}
|
|
|
|
func (bw *blobWriter) storeHashState(ctx context.Context) error {
|
|
if !bw.resumableDigestEnabled {
|
|
return errResumableDigestNotAvailable
|
|
}
|
|
|
|
h, ok := bw.digester.Hash().(resumable.Hash)
|
|
if !ok {
|
|
return errResumableDigestNotAvailable
|
|
}
|
|
|
|
uploadHashStatePath, err := pathFor(uploadHashStatePathSpec{
|
|
name: bw.blobStore.repository.Named().String(),
|
|
id: bw.id,
|
|
alg: bw.digester.Digest().Algorithm(),
|
|
offset: int64(h.Len()),
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hashState, err := h.State()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
|
|
}
|