From 11ed0515d057c18781c7720bc04723fff60b734e Mon Sep 17 00:00:00 2001 From: Andrey Kostov Date: Fri, 19 Dec 2014 19:20:07 +0200 Subject: [PATCH] Implements zero fill behaviour for large offset in WriteStream This requires a very intricate WriteStream test, which will be in the next commit. --- storagedriver/s3/s3.go | 258 +++++++++++++++++++++---- storagedriver/testsuites/testsuites.go | 10 +- 2 files changed, 225 insertions(+), 43 deletions(-) diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index b4811885..7a0f92cd 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -192,6 +192,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total } partNumber := 1 + bytesRead := 0 parts := []s3.Part{} var part s3.Part @@ -201,6 +202,7 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total } buf := make([]byte, chunkSize) + zeroBuf := make([]byte, chunkSize) // We never want to leave a dangling multipart upload, our only consistent state is // when there is a whole object at path. This is in order to remain consistent with @@ -211,64 +213,240 @@ func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (total // made prior to the machine crashing. defer func() { if len(parts) > 0 { - err = multi.Complete(parts) - if err != nil { - multi.Abort() + if multi == nil { + // Parts should be empty if the multi is not initialized + panic("Unreachable") + } else { + if multi.Complete(parts) != nil { + multi.Abort() + } } } }() + // Fills from 0 to total from current + fromSmallCurrent := func(total int64) error { + current, err := d.ReadStream(path, 0) + if err != nil { + return err + } + + bytesRead = 0 + for int64(bytesRead) < total { + //The loop should very rarely enter a second iteration + nn, err := io.ReadFull(current, buf[bytesRead:total]) + if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + return err + } + + bytesRead += nn + } + return nil + } + + // Fills from parameter to chunkSize from reader + fromReader := func(from int64) error { + bytesRead = 0 + for int64(bytesRead) < chunkSize { + nn, err := io.ReadFull(reader, buf[from+int64(bytesRead):]) + totalRead += int64(nn) + bytesRead += nn + + if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + return err + } + } + + part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) + if err != nil { + return err + } + + return nil + } + if offset > 0 { resp, err := d.Bucket.Head(d.s3Path(path), nil) if err != nil { - return 0, err - } - if resp.ContentLength < offset { - return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} - } - - if resp.ContentLength < chunkSize { - // If everything written so far is less than the minimum part size of 5MB, we need - // to fill out the first part up to that minimum. - current, err := d.ReadStream(path, 0) - if err != nil { + if s3Err, ok := err.(*s3.Error); !ok || s3Err.Code != "NoSuchKey" { return 0, err } + } - bytesRead, err := io.ReadFull(current, buf[0:offset]) - if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { - return 0, err - } else if int64(bytesRead) != offset { - //TODO Maybe a different error? I don't even think this case is reachable... - return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + currentLength := int64(0) + if err == nil { + currentLength = resp.ContentLength + } + + if currentLength >= offset { + if offset < chunkSize { + // chunkSize > currentLength >= offset + if err = fromSmallCurrent(offset); err != nil { + return totalRead, err + } + + if err = fromReader(offset); err != nil { + return totalRead, err + } + } else { + // currentLength >= offset >= chunkSize + _, part, err = multi.PutPartCopy(partNumber, + s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)}, + d.Bucket.Name+"/"+d.s3Path(path)) + if err != nil { + return 0, err + } } - bytesRead, err = io.ReadFull(reader, buf[offset:]) - totalRead += int64(bytesRead) + parts = append(parts, part) + partNumber++ - if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { - return totalRead, err - } - - part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+offset])) - if err != nil { - return totalRead, err + if totalRead+offset < chunkSize { + return totalRead, nil } } else { - fmt.Println("About to PutPartCopy") - // If the file that we already have is larger than 5MB, then we make it the first part - // of the new multipart upload. - _, part, err = multi.PutPartCopy(partNumber, s3.CopyOptions{}, d.Bucket.Name+"/"+d.s3Path(path)) - if err != nil { - return 0, err + // Fills between parameters with 0s but only when to - from <= chunkSize + fromZeroFillSmall := func(from, to int64) error { + bytesRead = 0 + for from+int64(bytesRead) < to { + nn, err := io.ReadFull(bytes.NewReader(zeroBuf), buf[from+int64(bytesRead):to]) + bytesRead += nn + + if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + return err + } + } + + return nil } - } - parts = append(parts, part) - partNumber++ + // Fills between parameters with 0s, making new parts + fromZeroFillLarge := func(from, to int64) error { + bytesRead64 := int64(0) + for to-(from+bytesRead64) >= chunkSize { + part, err := multi.PutPart(int(partNumber), bytes.NewReader(zeroBuf)) + if err != nil { + return err + } + bytesRead64 += chunkSize + + parts = append(parts, part) + partNumber++ + } + + bytesRead = 0 + for from+bytesRead64+int64(bytesRead) < to { + nn, err := io.ReadFull(bytes.NewReader(zeroBuf), buf[0+bytesRead:(to-from)%chunkSize]) + bytesRead64 += int64(nn) + + if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + return err + } + } + + return nil + } + + // currentLength < offset + if currentLength < chunkSize { + if offset < chunkSize { + // chunkSize > offset > currentLength + if err = fromSmallCurrent(currentLength); err != nil { + return totalRead, err + } + + if err = fromZeroFillSmall(currentLength, offset); err != nil { + return totalRead, err + } + + if err = fromReader(offset); err != nil { + return totalRead, err + } + + part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf)) + if err != nil { + return totalRead, err + } + + parts = append(parts, part) + partNumber++ + + if totalRead+offset < chunkSize { + return totalRead, nil + } + } else { + // offset >= chunkSize > currentLength + if err = fromSmallCurrent(currentLength); err != nil { + return totalRead, err + } + + if err = fromZeroFillSmall(currentLength, chunkSize); err != nil { + return totalRead, err + } + + part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf)) + if err != nil { + return totalRead, err + } + + parts = append(parts, part) + partNumber++ + + //Zero fill from chunkSize up to offset, then some reader + if err = fromZeroFillLarge(chunkSize, offset); err != nil { + return totalRead, err + } + + if err = fromReader(offset % chunkSize); err != nil { + return totalRead, err + } + + part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf)) + if err != nil { + return totalRead, err + } + + parts = append(parts, part) + partNumber++ + + if totalRead+(offset%chunkSize) < chunkSize { + return totalRead, nil + } + } + } else { + // offset > currentLength >= chunkSize + _, part, err = multi.PutPartCopy(partNumber, + s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(currentLength-1, 10)}, + d.Bucket.Name+"/"+d.s3Path(path)) + if err != nil { + return 0, err + } + + parts = append(parts, part) + partNumber++ + + //Zero fill from currentLength up to offset, then some reader + if err = fromZeroFillLarge(currentLength, offset); err != nil { + return totalRead, err + } + + if err = fromReader((offset - currentLength) % chunkSize); err != nil { + return totalRead, err + } + + part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf)) + if err != nil { + return totalRead, err + } + + parts = append(parts, part) + partNumber++ + + if totalRead+((offset-currentLength)%chunkSize) < chunkSize { + return totalRead, nil + } + } - if totalRead+offset < chunkSize { - return totalRead, nil } } diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index be8ede6c..1a70362d 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -916,9 +916,13 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { tf.Sync() tf.Seek(0, os.SEEK_SET) - nn, err := suite.StorageDriver.WriteStream(filename, 0, tf) - c.Assert(err, check.IsNil) - c.Assert(nn, check.Equals, size) + totalRead := int64(0) + for totalRead < size { + nn, err := suite.StorageDriver.WriteStream(filename, 0, tf) + c.Assert(err, check.IsNil) + totalRead += nn + } + c.Assert(totalRead, check.Equals, size) reader, err := suite.StorageDriver.ReadStream(filename, 0) c.Assert(err, check.IsNil)