Updates Swift driver to support new storagedriver.FileWriter interface

Signed-off-by: Brian Bland <brian.bland@docker.com>
This commit is contained in:
Brian Bland 2016-03-08 15:57:12 -08:00
parent 5b48c81545
commit 490a2f5a55

View File

@ -16,8 +16,8 @@
package swift package swift
import ( import (
"bufio"
"bytes" "bytes"
"crypto/md5"
"crypto/rand" "crypto/rand"
"crypto/sha1" "crypto/sha1"
"crypto/tls" "crypto/tls"
@ -49,6 +49,9 @@ const defaultChunkSize = 20 * 1024 * 1024
// minChunkSize defines the minimum size of a segment // minChunkSize defines the minimum size of a segment
const minChunkSize = 1 << 20 const minChunkSize = 1 << 20
// contentType defines the Content-Type header associated with stored segments
const contentType = "application/octet-stream"
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded // readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
var readAfterWriteTimeout = 15 * time.Second var readAfterWriteTimeout = 15 * time.Second
@ -282,16 +285,16 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
// PutContent stores the []byte content at a location designated by "path". // PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, d.getContentType()) err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, contentType)
if err == swift.ObjectNotFound { if err == swift.ObjectNotFound {
return storagedriver.PathNotFoundError{Path: path} return storagedriver.PathNotFoundError{Path: path}
} }
return err return err
} }
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a // Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset. // given byte offset.
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
headers := make(swift.Headers) headers := make(swift.Headers)
headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-" headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-"
@ -305,224 +308,46 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.
return file, err return file, err
} }
// WriteStream stores the contents of the provided io.Reader at a // Writer returns a FileWriter which will store the content written to it
// location designated by the given path. The driver will know it has // at the location designated by "path" after the call to Commit.
// received the full contents when the reader returns io.EOF. The number func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
// of successfully READ bytes will be returned, even if an error is
// returned. May be used to resume writing a stream by providing a nonzero
// offset. Offsets past the current size will write from the position
// beyond the end of the file.
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) {
var ( var (
segments []swift.Object segments []swift.Object
multi io.Reader segmentsPath string
paddingReader io.Reader err error
currentLength int64
cursor int64
segmentPath string
) )
partNumber := 1 if !append {
chunkSize := int64(d.ChunkSize) segmentsPath, err = d.swiftSegmentPath(path)
zeroBuf := make([]byte, d.ChunkSize) if err != nil {
hash := md5.New() return nil, err
getSegment := func() string {
return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
} }
} else {
max := func(a int64, b int64) int64 {
if a > b {
return a
}
return b
}
createManifest := true
info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path)) info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path))
if err == nil { if err == swift.ObjectNotFound {
return nil, storagedriver.PathNotFoundError{Path: path}
} else if err != nil {
return nil, err
}
manifest, ok := headers["X-Object-Manifest"] manifest, ok := headers["X-Object-Manifest"]
if !ok { if !ok {
if segmentPath, err = d.swiftSegmentPath(path); err != nil { segmentsPath, err = d.swiftSegmentPath(path)
return 0, err if err != nil {
return nil, err
} }
if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegment()); err != nil { if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, segmentPath(segmentsPath, len(segments))); err != nil {
return 0, err return nil, err
} }
segments = append(segments, info) segments = []swift.Object{info}
} else { } else {
_, segmentPath = parseManifest(manifest) _, segmentsPath = parseManifest(manifest)
if segments, err = d.getAllSegments(segmentPath); err != nil { if segments, err = d.getAllSegments(segmentsPath); err != nil {
return 0, err return nil, err
} }
createManifest = false
}
currentLength = info.Bytes
} else if err == swift.ObjectNotFound {
if segmentPath, err = d.swiftSegmentPath(path); err != nil {
return 0, err
}
} else {
return 0, err
}
// First, we skip the existing segments that are not modified by this call
for i := range segments {
if offset < cursor+segments[i].Bytes {
break
}
cursor += segments[i].Bytes
hash.Write([]byte(segments[i].Hash))
partNumber++
}
// We reached the end of the file but we haven't reached 'offset' yet
// Therefore we add blocks of zeros
if offset >= currentLength {
for offset-currentLength >= chunkSize {
// Insert a block a zero
headers, err := d.Conn.ObjectPut(d.Container, getSegment(), bytes.NewReader(zeroBuf), false, "", d.getContentType(), nil)
if err != nil {
if err == swift.ObjectNotFound {
return 0, storagedriver.PathNotFoundError{Path: getSegment()}
}
return 0, err
}
currentLength += chunkSize
partNumber++
hash.Write([]byte(headers["Etag"]))
}
cursor = currentLength
paddingReader = bytes.NewReader(zeroBuf)
} else if offset-cursor > 0 {
// Offset is inside the current segment : we need to read the
// data from the beginning of the segment to offset
file, _, err := d.Conn.ObjectOpen(d.Container, getSegment(), false, nil)
if err != nil {
if err == swift.ObjectNotFound {
return 0, storagedriver.PathNotFoundError{Path: getSegment()}
}
return 0, err
}
defer file.Close()
paddingReader = file
}
readers := []io.Reader{}
if paddingReader != nil {
readers = append(readers, io.LimitReader(paddingReader, offset-cursor))
}
readers = append(readers, io.LimitReader(reader, chunkSize-(offset-cursor)))
multi = io.MultiReader(readers...)
writeSegment := func(segment string) (finished bool, bytesRead int64, err error) {
currentSegment, err := d.Conn.ObjectCreate(d.Container, segment, false, "", d.getContentType(), nil)
if err != nil {
if err == swift.ObjectNotFound {
return false, bytesRead, storagedriver.PathNotFoundError{Path: segment}
}
return false, bytesRead, err
}
segmentHash := md5.New()
writer := io.MultiWriter(currentSegment, segmentHash)
n, err := io.Copy(writer, multi)
if err != nil {
return false, bytesRead, err
}
if n > 0 {
defer func() {
closeError := currentSegment.Close()
if err != nil {
err = closeError
}
hexHash := hex.EncodeToString(segmentHash.Sum(nil))
hash.Write([]byte(hexHash))
}()
bytesRead += n - max(0, offset-cursor)
}
if n < chunkSize {
// We wrote all the data
if cursor+n < currentLength {
// Copy the end of the chunk
headers := make(swift.Headers)
headers["Range"] = "bytes=" + strconv.FormatInt(cursor+n, 10) + "-" + strconv.FormatInt(cursor+chunkSize, 10)
file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
if err != nil {
if err == swift.ObjectNotFound {
return false, bytesRead, storagedriver.PathNotFoundError{Path: path}
}
return false, bytesRead, err
}
_, copyErr := io.Copy(writer, file)
if err := file.Close(); err != nil {
if err == swift.ObjectNotFound {
return false, bytesRead, storagedriver.PathNotFoundError{Path: path}
}
return false, bytesRead, err
}
if copyErr != nil {
return false, bytesRead, copyErr
} }
} }
return true, bytesRead, nil return d.newWriter(path, segmentsPath, segments), nil
}
multi = io.LimitReader(reader, chunkSize)
cursor += chunkSize
partNumber++
return false, bytesRead, nil
}
finished := false
read := int64(0)
bytesRead := int64(0)
for finished == false {
finished, read, err = writeSegment(getSegment())
bytesRead += read
if err != nil {
return bytesRead, err
}
}
for ; partNumber < len(segments); partNumber++ {
hash.Write([]byte(segments[partNumber].Hash))
}
if createManifest {
if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil {
return 0, err
}
}
expectedHash := hex.EncodeToString(hash.Sum(nil))
waitingTime := readAfterWriteWait
endTime := time.Now().Add(readAfterWriteTimeout)
for {
var infos swift.Object
if infos, _, err = d.Conn.Object(d.Container, d.swiftPath(path)); err == nil {
if strings.Trim(infos.Hash, "\"") == expectedHash {
return bytesRead, nil
}
err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", path)
}
if time.Now().Add(waitingTime).After(endTime) {
break
}
time.Sleep(waitingTime)
waitingTime *= 2
}
return bytesRead, err
} }
// Stat retrieves the FileInfo for the given path, including the current size // Stat retrieves the FileInfo for the given path, including the current size
@ -763,10 +588,6 @@ func (d *driver) swiftSegmentPath(path string) (string, error) {
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
} }
func (d *driver) getContentType() string {
return "application/octet-stream"
}
func (d *driver) getAllSegments(path string) ([]swift.Object, error) { func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path}) segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
if err == swift.ContainerNotFound { if err == swift.ContainerNotFound {
@ -778,7 +599,7 @@ func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
func (d *driver) createManifest(path string, segments string) error { func (d *driver) createManifest(path string, segments string) error {
headers := make(swift.Headers) headers := make(swift.Headers)
headers["X-Object-Manifest"] = segments headers["X-Object-Manifest"] = segments
manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", d.getContentType(), headers) manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", contentType, headers)
if err != nil { if err != nil {
if err == swift.ObjectNotFound { if err == swift.ObjectNotFound {
return storagedriver.PathNotFoundError{Path: path} return storagedriver.PathNotFoundError{Path: path}
@ -810,3 +631,152 @@ func generateSecret() (string, error) {
} }
return hex.EncodeToString(secretBytes[:]), nil return hex.EncodeToString(secretBytes[:]), nil
} }
// segmentPath builds the object name of a single segment: the segments
// prefix, a slash, and the part number zero-padded to 16 digits.
func segmentPath(segmentsPath string, partNumber int) string {
	paddedPart := fmt.Sprintf("%016d", partNumber)
	return segmentsPath + "/" + paddedPart
}
// writer buffers written data and hands it to a segmentWriter; its methods
// track whether it has been closed, committed or cancelled.
type writer struct {
	// driver is the owning driver, used to create the manifest, delete the
	// path on cancel and query the object on commit.
	driver *driver
	// path is the path the content is written to; segmentsPath is the
	// prefix under which its segments are stored.
	path, segmentsPath string
	// size is the sum of the pre-existing segment sizes plus every byte
	// accepted by Write.
	size int64
	// bw buffers writes in front of the segment writer.
	bw *bufio.Writer
	// Lifecycle flags checked by Write, Close, Cancel and Commit.
	closed, committed, cancelled bool
}
// newWriter returns a FileWriter for path whose segments live under
// segmentsPath. The initial size is the total byte count of the given
// segments, and new segments are numbered starting at len(segments)+1.
// Writes are buffered up to d.ChunkSize bytes before reaching the
// segment writer.
func (d *driver) newWriter(path, segmentsPath string, segments []swift.Object) storagedriver.FileWriter {
	var existingBytes int64
	for i := range segments {
		existingBytes += segments[i].Bytes
	}

	sink := &segmentWriter{
		conn:          d.Conn,
		container:     d.Container,
		segmentsPath:  segmentsPath,
		segmentNumber: len(segments) + 1,
		maxChunkSize:  d.ChunkSize,
	}

	fw := &writer{
		driver:       d,
		path:         path,
		segmentsPath: segmentsPath,
		size:         existingBytes,
		bw:           bufio.NewWriterSize(sink, d.ChunkSize),
	}
	return fw
}
// Write appends p to the buffered writer and adds the number of accepted
// bytes to the tracked size. It fails without writing anything once the
// writer has been closed, committed or cancelled.
func (w *writer) Write(p []byte) (int, error) {
	switch {
	case w.closed:
		return 0, fmt.Errorf("already closed")
	case w.committed:
		return 0, fmt.Errorf("already committed")
	case w.cancelled:
		return 0, fmt.Errorf("already cancelled")
	}

	accepted, err := w.bw.Write(p)
	// Count whatever was accepted, even when the write also reports an error.
	w.size += int64(accepted)
	return accepted, err
}
// Size returns the writer's tracked size: the size it was created with plus
// every byte accepted by Write so far, whether or not it has been flushed.
func (w *writer) Size() int64 {
return w.size
}
// Close flushes any buffered data and, unless the writer was already
// committed, creates the manifest for the written segments. It returns an
// error if the writer is already closed, or if flushing or manifest creation
// fails; in the failure cases the writer is left open.
//
// A cancelled writer is closed without flushing. Cancel has already deleted
// the path, and flushing would upload the bytes still held in the buffer as
// a new segment that nothing refers to.
func (w *writer) Close() error {
	if w.closed {
		return fmt.Errorf("already closed")
	}

	if !w.cancelled {
		if err := w.bw.Flush(); err != nil {
			return err
		}

		// Commit creates the manifest itself, so only do it here when the
		// writer is closed without having been committed.
		if !w.committed {
			if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
				return err
			}
		}
	}

	w.closed = true
	return nil
}
// Cancel marks the writer as cancelled and deletes the path through the
// driver, returning the result of that delete. It fails if the writer has
// already been closed or committed.
func (w *writer) Cancel() error {
	switch {
	case w.closed:
		return fmt.Errorf("already closed")
	case w.committed:
		return fmt.Errorf("already committed")
	}

	// The flag is set before the delete, so it stays set even if the delete fails.
	w.cancelled = true
	ctx := context.Background()
	return w.driver.Delete(ctx, w.path)
}
// Commit flushes buffered data, creates the manifest and then polls the
// object until its reported size equals the writer's size. The wait between
// polls starts at readAfterWriteWait and doubles each round; polling stops
// once the next wait would end after readAfterWriteTimeout has elapsed, and
// the error from the last poll is returned. It fails up front if the writer
// has been closed, committed or cancelled.
func (w *writer) Commit() error {
	switch {
	case w.closed:
		return fmt.Errorf("already closed")
	case w.committed:
		return fmt.Errorf("already committed")
	case w.cancelled:
		return fmt.Errorf("already cancelled")
	}

	if err := w.bw.Flush(); err != nil {
		return err
	}

	manifestTarget := w.driver.Container + "/" + w.segmentsPath
	if err := w.driver.createManifest(w.path, manifestTarget); err != nil {
		return err
	}

	// The writer counts as committed as soon as the manifest exists, before
	// the size check below has succeeded.
	w.committed = true

	deadline := time.Now().Add(readAfterWriteTimeout)
	for delay := readAfterWriteWait; ; delay *= 2 {
		info, _, err := w.driver.Conn.Object(w.driver.Container, w.driver.swiftPath(w.path))
		if err == nil {
			if info.Bytes == w.size {
				return nil
			}
			err = fmt.Errorf("Timeout expired while waiting for segments of %s to show up", w.path)
		}

		// Give up when sleeping again would take us past the deadline.
		if time.Now().Add(delay).After(deadline) {
			return err
		}
		time.Sleep(delay)
	}
}
// segmentWriter stores each chunk written to it as a separately numbered
// segment object under segmentsPath.
type segmentWriter struct {
	// conn is the Swift connection used for the uploads.
	conn swift.Connection
	// container is the container the segments are put into; segmentsPath
	// is the prefix of every segment object name.
	container, segmentsPath string
	// segmentNumber is the part number of the next segment to upload;
	// maxChunkSize is the largest number of bytes stored in one segment.
	segmentNumber, maxChunkSize int
}
// Write uploads p as consecutive segments of at most maxChunkSize bytes
// each, incrementing the segment number after every successful upload. It
// returns the number of bytes uploaded so far, together with the first
// upload error if one occurs.
func (sw *segmentWriter) Write(p []byte) (int, error) {
	uploaded := 0
	for uploaded < len(p) {
		// Clamp the last chunk to whatever remains of p.
		end := uploaded + sw.maxChunkSize
		if end > len(p) {
			end = len(p)
		}

		name := segmentPath(sw.segmentsPath, sw.segmentNumber)
		body := bytes.NewReader(p[uploaded:end])
		if _, err := sw.conn.ObjectPut(sw.container, name, body, false, "", contentType, nil); err != nil {
			return uploaded, err
		}

		sw.segmentNumber++
		uploaded = end
	}
	return uploaded, nil
}