Merge pull request #1141 from lebauce/swift-no-missing-segment

Ensure read after write for segments
This commit is contained in:
Richard Scothern 2015-11-10 10:39:43 -08:00
commit a33095af80

View File

@ -17,6 +17,7 @@ package swift
import ( import (
"bytes" "bytes"
"crypto/md5"
"crypto/rand" "crypto/rand"
"crypto/sha1" "crypto/sha1"
"crypto/tls" "crypto/tls"
@ -48,6 +49,12 @@ const defaultChunkSize = 20 * 1024 * 1024
// minChunkSize defines the minimum size of a segment // minChunkSize defines the minimum size of a segment
const minChunkSize = 1 << 20 const minChunkSize = 1 << 20
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
var readAfterWriteTimeout = 15 * time.Second
// readAfterWriteWait defines the time to sleep between two retries
var readAfterWriteWait = 200 * time.Millisecond
// Parameters A struct that encapsulates all of the driver parameters after all values have been set // Parameters A struct that encapsulates all of the driver parameters after all values have been set
type Parameters struct { type Parameters struct {
Username string Username string
@ -318,6 +325,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
partNumber := 1 partNumber := 1
chunkSize := int64(d.ChunkSize) chunkSize := int64(d.ChunkSize)
zeroBuf := make([]byte, d.ChunkSize) zeroBuf := make([]byte, d.ChunkSize)
hash := md5.New()
getSegment := func() string { getSegment := func() string {
return fmt.Sprintf("%s/%016d", segmentPath, partNumber) return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
@ -358,18 +366,13 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
return 0, err return 0, err
} }
if createManifest {
if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil {
return 0, err
}
}
// First, we skip the existing segments that are not modified by this call // First, we skip the existing segments that are not modified by this call
for i := range segments { for i := range segments {
if offset < cursor+segments[i].Bytes { if offset < cursor+segments[i].Bytes {
break break
} }
cursor += segments[i].Bytes cursor += segments[i].Bytes
hash.Write([]byte(segments[i].Hash))
partNumber++ partNumber++
} }
@ -378,7 +381,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
if offset >= currentLength { if offset >= currentLength {
for offset-currentLength >= chunkSize { for offset-currentLength >= chunkSize {
// Insert a block a zero // Insert a block a zero
_, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil) headers, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil)
if err != nil { if err != nil {
if err == swift.ObjectNotFound { if err == swift.ObjectNotFound {
return 0, storagedriver.PathNotFoundError{Path: getSegment()} return 0, storagedriver.PathNotFoundError{Path: getSegment()}
@ -387,6 +390,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
} }
currentLength += chunkSize currentLength += chunkSize
partNumber++ partNumber++
hash.Write([]byte(headers["Etag"]))
} }
cursor = currentLength cursor = currentLength
@ -421,13 +425,23 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
return false, bytesRead, err return false, bytesRead, err
} }
n, err := io.Copy(currentSegment, multi) segmentHash := md5.New()
writer := io.MultiWriter(currentSegment, segmentHash)
n, err := io.Copy(writer, multi)
if err != nil { if err != nil {
return false, bytesRead, err return false, bytesRead, err
} }
if n > 0 { if n > 0 {
defer currentSegment.Close() defer func() {
closeError := currentSegment.Close()
if err != nil {
err = closeError
}
hexHash := hex.EncodeToString(segmentHash.Sum(nil))
hash.Write([]byte(hexHash))
}()
bytesRead += n - max(0, offset-cursor) bytesRead += n - max(0, offset-cursor)
} }
@ -445,7 +459,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
return false, bytesRead, err return false, bytesRead, err
} }
_, copyErr := io.Copy(currentSegment, file) _, copyErr := io.Copy(writer, file)
if err := file.Close(); err != nil { if err := file.Close(); err != nil {
if err == swift.ObjectNotFound { if err == swift.ObjectNotFound {
@ -480,7 +494,35 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
} }
} }
return bytesRead, nil for ; partNumber < len(segments); partNumber++ {
hash.Write([]byte(segments[partNumber].Hash))
}
if createManifest {
if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil {
return 0, err
}
}
expectedHash := hex.EncodeToString(hash.Sum(nil))
waitingTime := readAfterWriteWait
endTime := time.Now().Add(readAfterWriteTimeout)
for {
var infos swift.Object
if infos, _, err = d.Conn.Object(d.Container, d.swiftPath(path)); err == nil {
if strings.Trim(infos.Hash, "\"") == expectedHash {
return bytesRead, nil
}
err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", path)
}
if time.Now().Add(waitingTime).After(endTime) {
break
}
time.Sleep(waitingTime)
waitingTime *= 2
}
return bytesRead, err
} }
// Stat retrieves the FileInfo for the given path, including the current size // Stat retrieves the FileInfo for the given path, including the current size