Merge pull request #1807 from docker/jchorl-master

fixed s3 Delete bug due to read-after-delete inconsistency
This commit is contained in:
Richard Scothern 2016-06-28 15:09:35 -07:00 committed by GitHub
commit 1f1d042f55
2 changed files with 66 additions and 16 deletions

View File

@ -561,45 +561,62 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
return d.Delete(ctx, sourcePath)
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
// We must be careful since S3 does not guarantee read after delete consistency
func (d *driver) Delete(ctx context.Context, path string) error {
resp, err := d.S3.ListObjects(&s3.ListObjectsInput{
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
listObjectsInput := &s3.ListObjectsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
})
}
for {
// list all the objects
resp, err := d.S3.ListObjects(listObjectsInput)
// resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
// and the loop would be exited without recalling ListObjects
if err != nil || len(resp.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path}
}
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
for len(resp.Contents) > 0 {
for _, key := range resp.Contents {
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
Key: key.Key,
})
}
// resp.Contents must have at least one element or we would have returned not found
listObjectsInput.Marker = resp.Contents[len(resp.Contents)-1].Key
// from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned"
// if everything has been returned, break
if resp.IsTruncated == nil || !*resp.IsTruncated {
break
}
}
// need to chunk objects into groups of 1000 per s3 restrictions
total := len(s3Objects)
for i := 0; i < total; i += 1000 {
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects,
Objects: s3Objects[i:min(i+1000, total)],
Quiet: aws.Bool(false),
},
})
if err != nil {
return nil
}
resp, err = d.S3.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
})
if err != nil {
return err
}
}
return nil
}

View File

@ -203,3 +203,36 @@ func TestStorageClass(t *testing.T) {
}
}
func TestOverThousandBlobs(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
}
rootDir, err := ioutil.TempDir("", "driver-")
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
}
defer os.Remove(rootDir)
standardDriver, err := s3DriverConstructor(rootDir, s3.StorageClassStandard)
if err != nil {
t.Fatalf("unexpected error creating driver with standard storage: %v", err)
}
ctx := context.Background()
for i := 0; i < 1005; i++ {
filename := "/thousandfiletest/file" + strconv.Itoa(i)
contents := []byte("contents")
err = standardDriver.PutContent(ctx, filename, contents)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
}
}
// cant actually verify deletion because read-after-delete is inconsistent, but can ensure no errors
err = standardDriver.Delete(ctx, "/thousandfiletest")
if err != nil {
t.Fatalf("unexpected error deleting thousand files: %v", err)
}
}