Merge pull request #3479 from CollinShoop/updated-s3-delete-to-handle-edge-cases
Updated s3 delete to no longer noop under a rare edge case
This commit is contained in:
commit
0256de4688
@ -806,7 +806,21 @@ func min(a, b int) int {
|
|||||||
// We must be careful since S3 does not guarantee read after delete consistency
|
// We must be careful since S3 does not guarantee read after delete consistency
|
||||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
|
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
|
||||||
s3Path := d.s3Path(path)
|
|
||||||
|
// manually add the given path if it's a file
|
||||||
|
stat, err := d.Stat(ctx, path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if stat != nil && !stat.IsDir() {
|
||||||
|
path := d.s3Path(path)
|
||||||
|
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
||||||
|
Key: &path,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// list objects under the given path as a subpath (suffix with slash "/")
|
||||||
|
s3Path := d.s3Path(path) + "/"
|
||||||
listObjectsInput := &s3.ListObjectsInput{
|
listObjectsInput := &s3.ListObjectsInput{
|
||||||
Bucket: aws.String(d.Bucket),
|
Bucket: aws.String(d.Bucket),
|
||||||
Prefix: aws.String(s3Path),
|
Prefix: aws.String(s3Path),
|
||||||
@ -820,14 +834,10 @@ ListLoop:
|
|||||||
// if there were no more results to return after the first call, resp.IsTruncated would have been false
|
// if there were no more results to return after the first call, resp.IsTruncated would have been false
|
||||||
// and the loop would be exited without recalling ListObjects
|
// and the loop would be exited without recalling ListObjects
|
||||||
if err != nil || len(resp.Contents) == 0 {
|
if err != nil || len(resp.Contents) == 0 {
|
||||||
return storagedriver.PathNotFoundError{Path: path}
|
break ListLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, key := range resp.Contents {
|
for _, key := range resp.Contents {
|
||||||
// Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
|
|
||||||
if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' {
|
|
||||||
break ListLoop
|
|
||||||
}
|
|
||||||
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
|
||||||
Key: key.Key,
|
Key: key.Key,
|
||||||
})
|
})
|
||||||
@ -843,8 +853,12 @@ ListLoop:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// need to chunk objects into groups of 1000 per s3 restrictions
|
|
||||||
total := len(s3Objects)
|
total := len(s3Objects)
|
||||||
|
if total == 0 {
|
||||||
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
|
// need to chunk objects into groups of 1000 per s3 restrictions
|
||||||
for i := 0; i < total; i += 1000 {
|
for i := 0; i < total; i += 1000 {
|
||||||
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
|
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
|
||||||
Bucket: aws.String(d.Bucket),
|
Bucket: aws.String(d.Bucket),
|
||||||
|
@ -2,11 +2,13 @@ package s3
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"gopkg.in/check.v1"
|
"gopkg.in/check.v1"
|
||||||
@ -271,6 +273,229 @@ func TestStorageClass(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDelete(t *testing.T) {
|
||||||
|
if skipS3() != "" {
|
||||||
|
t.Skip(skipS3())
|
||||||
|
}
|
||||||
|
|
||||||
|
rootDir, err := ioutil.TempDir("", "driver-")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
defer os.Remove(rootDir)
|
||||||
|
|
||||||
|
driver, err := s3DriverConstructor(rootDir, s3.StorageClassStandard)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating driver with standard storage: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type errFn func(error) bool
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
delete string
|
||||||
|
expected []string
|
||||||
|
// error validation function
|
||||||
|
err errFn
|
||||||
|
}
|
||||||
|
|
||||||
|
errPathNotFound := func(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
switch err.(type) {
|
||||||
|
case storagedriver.PathNotFoundError:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
errInvalidPath := func(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
switch err.(type) {
|
||||||
|
case storagedriver.InvalidPathError:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var objs = []string{
|
||||||
|
"/file1",
|
||||||
|
"/file1-2",
|
||||||
|
"/file1/2",
|
||||||
|
"/folder1/file1",
|
||||||
|
"/folder2/file1",
|
||||||
|
"/folder3/file1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
"/folder4/file1",
|
||||||
|
"/folder1-v2/file1",
|
||||||
|
"/folder1-v2/subfolder1/file1",
|
||||||
|
}
|
||||||
|
|
||||||
|
tcs := []testCase{
|
||||||
|
{
|
||||||
|
// special case where a given path is a file and has subpaths
|
||||||
|
name: "delete file1",
|
||||||
|
delete: "/file1",
|
||||||
|
expected: []string{
|
||||||
|
"/file1",
|
||||||
|
"/file1/2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete folder1",
|
||||||
|
delete: "/folder1",
|
||||||
|
expected: []string{
|
||||||
|
"/folder1/file1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete folder2",
|
||||||
|
delete: "/folder2",
|
||||||
|
expected: []string{
|
||||||
|
"/folder2/file1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete folder3",
|
||||||
|
delete: "/folder3",
|
||||||
|
expected: []string{
|
||||||
|
"/folder3/file1",
|
||||||
|
"/folder3/subfolder1/subfolder1/file1",
|
||||||
|
"/folder3/subfolder2/subfolder1/file1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete path that doesn't exist",
|
||||||
|
delete: "/path/does/not/exist",
|
||||||
|
expected: []string{},
|
||||||
|
err: errPathNotFound,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete path invalid: trailing slash",
|
||||||
|
delete: "/path/is/invalid/",
|
||||||
|
expected: []string{},
|
||||||
|
err: errInvalidPath,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delete path invalid: trailing special character",
|
||||||
|
delete: "/path/is/invalid*",
|
||||||
|
expected: []string{},
|
||||||
|
err: errInvalidPath,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// objects to skip auto-created test case
|
||||||
|
var skipCase = map[string]bool{
|
||||||
|
// special case where deleting "/file1" also deletes "/file1/2" is tested explicitly
|
||||||
|
"/file1": true,
|
||||||
|
}
|
||||||
|
// create a test case for each file
|
||||||
|
for _, path := range objs {
|
||||||
|
if skipCase[path] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tcs = append(tcs, testCase{
|
||||||
|
name: fmt.Sprintf("delete path:'%s'", path),
|
||||||
|
delete: path,
|
||||||
|
expected: []string{path},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
init := func() []string {
|
||||||
|
// init file structure matching objs
|
||||||
|
var created []string
|
||||||
|
for _, path := range objs {
|
||||||
|
err := driver.PutContent(context.Background(), path, []byte("content "+path))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("unable to init file %s: %s\n", path, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
created = append(created, path)
|
||||||
|
}
|
||||||
|
return created
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup := func(objs []string) {
|
||||||
|
var lastErr error
|
||||||
|
for _, path := range objs {
|
||||||
|
err := driver.Delete(context.Background(), path)
|
||||||
|
if err != nil {
|
||||||
|
switch err.(type) {
|
||||||
|
case storagedriver.PathNotFoundError:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastErr != nil {
|
||||||
|
t.Fatalf("cleanup failed: %s", lastErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer cleanup(objs)
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
objs := init()
|
||||||
|
|
||||||
|
err := driver.Delete(context.Background(), tc.delete)
|
||||||
|
|
||||||
|
if tc.err != nil {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error")
|
||||||
|
}
|
||||||
|
if !tc.err(err) {
|
||||||
|
t.Fatalf("error does not match expected: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if tc.err == nil && err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var issues []string
|
||||||
|
|
||||||
|
// validate all files expected to be deleted are deleted
|
||||||
|
// and all files not marked for deletion still remain
|
||||||
|
expected := tc.expected
|
||||||
|
isExpected := func(path string) bool {
|
||||||
|
for _, epath := range expected {
|
||||||
|
if epath == path {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, path := range objs {
|
||||||
|
stat, err := driver.Stat(context.Background(), path)
|
||||||
|
if err != nil {
|
||||||
|
switch err.(type) {
|
||||||
|
case storagedriver.PathNotFoundError:
|
||||||
|
if !isExpected(path) {
|
||||||
|
issues = append(issues, fmt.Sprintf("unexpected path was deleted: %s", path))
|
||||||
|
}
|
||||||
|
// path was deleted & was supposed to be
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.Fatalf("stat: %s", err)
|
||||||
|
}
|
||||||
|
if stat.IsDir() {
|
||||||
|
// for special cases where an object path has subpaths (eg /file1)
|
||||||
|
// once /file1 is deleted it's now a directory according to stat
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if isExpected(path) {
|
||||||
|
issues = append(issues, fmt.Sprintf("expected path was not deleted: %s", path))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(issues) > 0 {
|
||||||
|
t.Fatalf(strings.Join(issues, "; \n\t"))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestOverThousandBlobs(t *testing.T) {
|
func TestOverThousandBlobs(t *testing.T) {
|
||||||
if skipS3() != "" {
|
if skipS3() != "" {
|
||||||
t.Skip(skipS3())
|
t.Skip(skipS3())
|
||||||
|
Loading…
Reference in New Issue
Block a user