detect outdated container listings during Stat() and getAllSegments()

Signed-off-by: Stefan Majewsky <stefan.majewsky@sap.com>
This commit is contained in:
Stefan Majewsky 2016-04-05 16:46:39 +02:00
parent 20fa47886d
commit 67321cb622

View File

@ -335,7 +335,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, segmentPath(segmentsPath, len(segments))); err != nil { if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegmentPath(segmentsPath, len(segments))); err != nil {
return nil, err return nil, err
} }
segments = []swift.Object{info} segments = []swift.Object{info}
@ -376,23 +376,26 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
fi.IsDir = true fi.IsDir = true
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
} else if obj.Name == swiftPath { } else if obj.Name == swiftPath {
// On Swift 1.12, the 'bytes' field is always 0 // The file exists. But on Swift 1.12, the 'bytes' field is always 0 so
// so we need to do a second HEAD request // we need to do a separate HEAD request.
info, _, err := d.Conn.Object(d.Container, swiftPath) break
if err != nil {
if err == swift.ObjectNotFound {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return nil, err
}
fi.IsDir = false
fi.Size = info.Bytes
fi.ModTime = info.LastModified
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
} }
} }
return nil, storagedriver.PathNotFoundError{Path: path} //Don't trust an empty `objects` slice. A container listing can be
//outdated. For files, we can make a HEAD request on the object which
//reports existence (at least) much more reliably.
info, _, err := d.Conn.Object(d.Container, swiftPath)
if err != nil {
if err == swift.ObjectNotFound {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return nil, err
}
fi.IsDir = false
fi.Size = info.Bytes
fi.ModTime = info.LastModified
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
} }
// List returns a list of the objects that are direct descendants of the given path. // List returns a list of the objects that are direct descendants of the given path.
@ -589,11 +592,52 @@ func (d *driver) swiftSegmentPath(path string) (string, error) {
} }
func (d *driver) getAllSegments(path string) ([]swift.Object, error) { func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
//a simple container listing works 99.9% of the time
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path}) segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
if err == swift.ContainerNotFound { if err != nil {
return nil, storagedriver.PathNotFoundError{Path: path} if err == swift.ContainerNotFound {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return nil, err
}
//build a lookup table by object name
hasObjectName := make(map[string]struct{})
for _, segment := range segments {
hasObjectName[segment.Name] = struct{}{}
}
//The container listing might be outdated (i.e. not contain all existing
//segment objects yet) because of temporary inconsistency (Swift is only
//eventually consistent!). Check its completeness.
segmentNumber := 0
for {
segmentNumber++
segmentPath := getSegmentPath(path, segmentNumber)
if _, seen := hasObjectName[segmentPath]; seen {
continue
}
//This segment is missing in the container listing. Use a more reliable
//request to check its existence. (HEAD requests on segments are
//guaranteed to return the correct metadata, except for the pathological
//case of an outage of large parts of the Swift cluster or its network,
//since every segment is only written once.)
segment, _, err := d.Conn.Object(d.Container, segmentPath)
switch err {
case nil:
//found new segment -> keep going, more might be missing
segments = append(segments, segment)
continue
case swift.ObjectNotFound:
//This segment is missing. Since we upload segments sequentially,
//there won't be any more segments after it.
return segments, nil
default:
return nil, err //unexpected error
}
} }
return segments, err
} }
func (d *driver) createManifest(path string, segments string) error { func (d *driver) createManifest(path string, segments string) error {
@ -632,7 +676,7 @@ func generateSecret() (string, error) {
return hex.EncodeToString(secretBytes[:]), nil return hex.EncodeToString(secretBytes[:]), nil
} }
func segmentPath(segmentsPath string, partNumber int) string { func getSegmentPath(segmentsPath string, partNumber int) string {
return fmt.Sprintf("%s/%016d", segmentsPath, partNumber) return fmt.Sprintf("%s/%016d", segmentsPath, partNumber)
} }
@ -769,7 +813,7 @@ func (sw *segmentWriter) Write(p []byte) (int, error) {
if offset+chunkSize > len(p) { if offset+chunkSize > len(p) {
chunkSize = len(p) - offset chunkSize = len(p) - offset
} }
_, err := sw.conn.ObjectPut(sw.container, segmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil) _, err := sw.conn.ObjectPut(sw.container, getSegmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil)
if err != nil { if err != nil {
return n, err return n, err
} }