distribution/registry/storage/blob_test.go
Sebastiaan van Stijn 1d33874951
go.mod: change imports to github.com/distribution/distribution/v3
Go 1.13 and up enforce import paths to be versioned if a project
contains a go.mod and has released v2 or up.

The current v2.x branches (and releases) do not yet have a go.mod,
and therefore are still allowed to be imported with a non-versioned
import path (go modules add a `+incompatible` annotation in that case).

However, now that this project has a `go.mod` file, incompatible
import paths will not be accepted by go modules, and attempting
to use code from this repository will fail.

This patch uses `v3` for the import-paths (not `v2`), because changing
import paths itself is a breaking change, which means that  the
next release should increment the "major" version to comply with
SemVer (as go modules dictate).

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2021-02-08 18:30:46 +01:00

614 lines
17 KiB
Go

package storage
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"path"
"reflect"
"testing"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/reference"
"github.com/distribution/distribution/v3/registry/storage/cache/memory"
"github.com/distribution/distribution/v3/registry/storage/driver/testdriver"
"github.com/distribution/distribution/v3/testutil"
"github.com/opencontainers/go-digest"
)
// TestWriteSeek tests that the current file size can be
// obtained using Seek
func TestWriteSeek(t *testing.T) {
ctx := context.Background()
imageName, _ := reference.WithName("foo/bar")
driver := testdriver.New()
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
bs := repository.Blobs(ctx)
blobUpload, err := bs.Create(ctx)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
contents := []byte{1, 2, 3}
blobUpload.Write(contents)
blobUpload.Close()
offset := blobUpload.Size()
if offset != int64(len(contents)) {
t.Fatalf("unexpected value for blobUpload offset: %v != %v", offset, len(contents))
}
}
// TestSimpleBlobUpload covers the blob upload process, exercising common
// error paths that might be seen during an upload.
func TestSimpleBlobUpload(t *testing.T) {
randomDataReader, dgst, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random reader: %v", err)
}
ctx := context.Background()
imageName, _ := reference.WithName("foo/bar")
driver := testdriver.New()
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
bs := repository.Blobs(ctx)
h := sha256.New()
rd := io.TeeReader(randomDataReader, h)
blobUpload, err := bs.Create(ctx)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Cancel the upload then restart it
if err := blobUpload.Cancel(ctx); err != nil {
t.Fatalf("unexpected error during upload cancellation: %v", err)
}
// get the enclosing directory
uploadPath := path.Dir(blobUpload.(*blobWriter).path)
// ensure state was cleaned up
_, err = driver.List(ctx, uploadPath)
if err == nil {
t.Fatal("files in upload path after cleanup")
}
// Do a resume, get unknown upload
_, err = bs.Resume(ctx, blobUpload.ID())
if err != distribution.ErrBlobUploadUnknown {
t.Fatalf("unexpected error resuming upload, should be unknown: %v", err)
}
// Restart!
blobUpload, err = bs.Create(ctx)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Get the size of our random tarfile
randomDataSize, err := seekerSize(randomDataReader)
if err != nil {
t.Fatalf("error getting seeker size of random data: %v", err)
}
nn, err := io.Copy(blobUpload, rd)
if err != nil {
t.Fatalf("unexpected error uploading layer data: %v", err)
}
if nn != randomDataSize {
t.Fatalf("layer data write incomplete")
}
blobUpload.Close()
offset := blobUpload.Size()
if offset != nn {
t.Fatalf("blobUpload not updated with correct offset: %v != %v", offset, nn)
}
// Do a resume, for good fun
blobUpload, err = bs.Resume(ctx, blobUpload.ID())
if err != nil {
t.Fatalf("unexpected error resuming upload: %v", err)
}
sha256Digest := digest.NewDigest("sha256", h)
desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst})
if err != nil {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// ensure state was cleaned up
uploadPath = path.Dir(blobUpload.(*blobWriter).path)
_, err = driver.List(ctx, uploadPath)
if err == nil {
t.Fatal("files in upload path after commit")
}
// After finishing an upload, it should no longer exist.
if _, err := bs.Resume(ctx, blobUpload.ID()); err != distribution.ErrBlobUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err)
}
// Test for existence.
statDesc, err := bs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs)
}
if !reflect.DeepEqual(statDesc, desc) {
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
}
rc, err := bs.Open(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error opening blob for read: %v", err)
}
defer rc.Close()
h.Reset()
nn, err = io.Copy(h, rc)
if err != nil {
t.Fatalf("error reading layer: %v", err)
}
if nn != randomDataSize {
t.Fatalf("incorrect read length")
}
if digest.NewDigest("sha256", h) != sha256Digest {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest)
}
// Delete a blob
err = bs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err := bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
_, err = bs.Open(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected success opening deleted blob for read")
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type getting deleted manifest: %#v", err)
}
// Re-upload the blob
randomBlob, err := ioutil.ReadAll(randomDataReader)
if err != nil {
t.Fatalf("Error reading all of blob %s", err.Error())
}
expectedDigest := digest.FromBytes(randomBlob)
simpleUpload(t, bs, randomBlob, expectedDigest)
d, err = bs.Stat(ctx, expectedDigest)
if err != nil {
t.Errorf("unexpected error stat-ing blob")
}
if d.Digest != expectedDigest {
t.Errorf("Mismatching digest with restored blob")
}
_, err = bs.Open(ctx, expectedDigest)
if err != nil {
t.Errorf("Unexpected error opening blob")
}
// Reuse state to test delete with a delete-disabled registry
registry, err = NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err = registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
bs = repository.Blobs(ctx)
err = bs.Delete(ctx, desc.Digest)
if err == nil {
t.Errorf("Unexpected success deleting while disabled")
}
}
// TestSimpleBlobRead just creates a simple blob file and ensures that basic
// open, read, seek, read works. More specific edge cases should be covered in
// other tests.
func TestSimpleBlobRead(t *testing.T) {
ctx := context.Background()
imageName, _ := reference.WithName("foo/bar")
driver := testdriver.New()
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
bs := repository.Blobs(ctx)
randomLayerReader, dgst, err := testutil.CreateRandomTarFile() // TODO(stevvooe): Consider using just a random string.
if err != nil {
t.Fatalf("error creating random data: %v", err)
}
// Test for existence.
desc, err := bs.Stat(ctx, dgst)
if err != distribution.ErrBlobUnknown {
t.Fatalf("expected not found error when testing for existence: %v", err)
}
_, err = bs.Open(ctx, dgst)
if err != distribution.ErrBlobUnknown {
t.Fatalf("expected not found error when opening non-existent blob: %v", err)
}
randomLayerSize, err := seekerSize(randomLayerReader)
if err != nil {
t.Fatalf("error getting seeker size for random layer: %v", err)
}
descBefore := distribution.Descriptor{Digest: dgst, MediaType: "application/octet-stream", Size: randomLayerSize}
t.Logf("desc: %v", descBefore)
desc, err = addBlob(ctx, bs, descBefore, randomLayerReader)
if err != nil {
t.Fatalf("error adding blob to blobservice: %v", err)
}
if desc.Size != randomLayerSize {
t.Fatalf("committed blob has incorrect length: %v != %v", desc.Size, randomLayerSize)
}
rc, err := bs.Open(ctx, desc.Digest) // note that we are opening with original digest.
if err != nil {
t.Fatalf("error opening blob with %v: %v", dgst, err)
}
defer rc.Close()
// Now check the sha digest and ensure its the same
h := sha256.New()
nn, err := io.Copy(h, rc)
if err != nil {
t.Fatalf("unexpected error copying to hash: %v", err)
}
if nn != randomLayerSize {
t.Fatalf("stored incorrect number of bytes in blob: %d != %d", nn, randomLayerSize)
}
sha256Digest := digest.NewDigest("sha256", h)
if sha256Digest != desc.Digest {
t.Fatalf("fetched digest does not match: %q != %q", sha256Digest, desc.Digest)
}
// Now seek back the blob, read the whole thing and check against randomLayerData
offset, err := rc.Seek(0, io.SeekStart)
if err != nil {
t.Fatalf("error seeking blob: %v", err)
}
if offset != 0 {
t.Fatalf("seek failed: expected 0 offset, got %d", offset)
}
p, err := ioutil.ReadAll(rc)
if err != nil {
t.Fatalf("error reading all of blob: %v", err)
}
if len(p) != int(randomLayerSize) {
t.Fatalf("blob data read has different length: %v != %v", len(p), randomLayerSize)
}
// Reset the randomLayerReader and read back the buffer
_, err = randomLayerReader.Seek(0, io.SeekStart)
if err != nil {
t.Fatalf("error resetting layer reader: %v", err)
}
randomLayerData, err := ioutil.ReadAll(randomLayerReader)
if err != nil {
t.Fatalf("random layer read failed: %v", err)
}
if !bytes.Equal(p, randomLayerData) {
t.Fatalf("layer data not equal")
}
}
// TestBlobMount covers the blob mount process, exercising common
// error paths that might be seen during a mount.
func TestBlobMount(t *testing.T) {
randomDataReader, dgst, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random reader: %v", err)
}
ctx := context.Background()
imageName, _ := reference.WithName("foo/bar")
sourceImageName, _ := reference.WithName("foo/source")
driver := testdriver.New()
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
sourceRepository, err := registry.Repository(ctx, sourceImageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
sbs := sourceRepository.Blobs(ctx)
blobUpload, err := sbs.Create(ctx)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Get the size of our random tarfile
randomDataSize, err := seekerSize(randomDataReader)
if err != nil {
t.Fatalf("error getting seeker size of random data: %v", err)
}
_, err = io.Copy(blobUpload, randomDataReader)
if err != nil {
t.Fatalf("unexpected error uploading layer data: %v", err)
}
desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst})
if err != nil {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// Test for existence.
statDesc, err := sbs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v, %#v", err, sbs)
}
if !reflect.DeepEqual(statDesc, desc) {
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
}
bs := repository.Blobs(ctx)
// Test destination for existence.
_, err = bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating unmounted blob: %v", desc)
}
canonicalRef, err := reference.WithDigest(sourceRepository.Named(), desc.Digest)
if err != nil {
t.Fatal(err)
}
bw, err := bs.Create(ctx, WithMountFrom(canonicalRef))
if bw != nil {
t.Fatal("unexpected blobwriter returned from Create call, should mount instead")
}
ebm, ok := err.(distribution.ErrBlobMounted)
if !ok {
t.Fatalf("unexpected error mounting layer: %v", err)
}
if !reflect.DeepEqual(ebm.Descriptor, desc) {
t.Fatalf("descriptors not equal: %v != %v", ebm.Descriptor, desc)
}
// Test for existence.
statDesc, err = bs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs)
}
if !reflect.DeepEqual(statDesc, desc) {
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
}
rc, err := bs.Open(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error opening blob for read: %v", err)
}
defer rc.Close()
h := sha256.New()
nn, err := io.Copy(h, rc)
if err != nil {
t.Fatalf("error reading layer: %v", err)
}
if nn != randomDataSize {
t.Fatalf("incorrect read length")
}
if digest.NewDigest("sha256", h) != dgst {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), dgst)
}
// Delete the blob from the source repo
err = sbs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
_, err = bs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error stating blob deleted from source repository: %v", err)
}
d, err := sbs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
// Delete the blob from the dest repo
err = bs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err = bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
}
// TestLayerUploadZeroLength uploads zero-length
func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background()
imageName, _ := reference.WithName("foo/bar")
driver := testdriver.New()
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
bs := repository.Blobs(ctx)
simpleUpload(t, bs, []byte{}, digestSha256Empty)
}
func simpleUpload(t *testing.T, bs distribution.BlobIngester, blob []byte, expectedDigest digest.Digest) {
ctx := context.Background()
wr, err := bs.Create(ctx)
if err != nil {
t.Fatalf("unexpected error starting upload: %v", err)
}
nn, err := io.Copy(wr, bytes.NewReader(blob))
if err != nil {
t.Fatalf("error copying into blob writer: %v", err)
}
if nn != 0 {
t.Fatalf("unexpected number of bytes copied: %v > 0", nn)
}
dgst, err := digest.FromReader(bytes.NewReader(blob))
if err != nil {
t.Fatalf("error getting digest: %v", err)
}
if dgst != expectedDigest {
// sanity check on zero digest
t.Fatalf("digest not as expected: %v != %v", dgst, expectedDigest)
}
desc, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst})
if err != nil {
t.Fatalf("unexpected error committing write: %v", err)
}
if desc.Digest != dgst {
t.Fatalf("unexpected digest: %v != %v", desc.Digest, dgst)
}
}
// seekerSize seeks to the end of seeker, checks the size and returns it to
// the original state, returning the size. The state of the seeker should be
// treated as unknown if an error is returned.
func seekerSize(seeker io.ReadSeeker) (int64, error) {
current, err := seeker.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
end, err := seeker.Seek(0, io.SeekEnd)
if err != nil {
return 0, err
}
resumed, err := seeker.Seek(current, io.SeekStart)
if err != nil {
return 0, err
}
if resumed != current {
return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location")
}
return end, nil
}
// addBlob simply consumes the reader and inserts into the blob service,
// returning a descriptor on success.
func addBlob(ctx context.Context, bs distribution.BlobIngester, desc distribution.Descriptor, rd io.Reader) (distribution.Descriptor, error) {
wr, err := bs.Create(ctx)
if err != nil {
return distribution.Descriptor{}, err
}
defer wr.Cancel(ctx)
if nn, err := io.Copy(wr, rd); err != nil {
return distribution.Descriptor{}, err
} else if nn != desc.Size {
return distribution.Descriptor{}, fmt.Errorf("incorrect number of bytes copied: %v != %v", nn, desc.Size)
}
return wr.Commit(ctx, desc)
}