distribution/storagedriver/s3/s3.go
Stephen J Day 66107df1af Use int64 for ReadStream and WriteStream offsets
This change brings the storagedriver API in line with the Go standard library's
use of int64 for offsets. The main benefit is simplicity in interfacing with
the io library reducing the number of type conversions in simple code.
2014-12-02 19:01:00 -08:00

329 lines
8.4 KiB
Go

package s3
import (
"bytes"
"fmt"
"io"
"net/http"
"strconv"
"github.com/crowdmob/goamz/aws"
"github.com/crowdmob/goamz/s3"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/factory"
)
const (
	// driverName is the name this driver is registered under with the
	// storage driver factory.
	driverName = "s3"

	// minChunkSize is the smallest chunk size used for multipart uploads.
	// The S3 API requires multipart upload chunks to be at least 5MB.
	minChunkSize = 5 << 20

	// listPartsMax is the largest number of parts that can be requested
	// from S3 at once.
	listPartsMax = 1000
)
// init registers the S3 driver factory under driverName so the driver can be
// constructed by name through the factory package.
func init() {
	factory.Register(driverName, new(s3DriverFactory))
}
// s3DriverFactory implements the factory.StorageDriverFactory interface
type s3DriverFactory struct{}

// Create builds an S3-backed storage driver from the given parameters map by
// delegating to FromParameters.
//
// On failure it returns an untyped nil driver together with the error, so the
// returned interface value never wraps a nil *Driver.
func (f *s3DriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
	// The receiver is deliberately not named "factory": that would shadow the
	// imported factory package inside this method.
	driver, err := FromParameters(parameters)
	if err != nil {
		return nil, err
	}
	return driver, nil
}
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
// Objects are stored at absolute keys in the provided bucket
//
// NOTE: New builds this struct with a positional composite literal, so the
// order of the fields below must not change without updating New.
type Driver struct {
// S3 is the goamz S3 client the bucket handle was obtained from.
S3 *s3.S3
// Bucket is the bucket handle used for all object and multipart operations.
Bucket *s3.Bucket
// Encrypt is passed to S3 as the SSE option on writes (see getOptions).
Encrypt bool
}
// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - accesskey
// - secretkey
// - region
// - bucket
// - encrypt
//
// accesskey, secretkey, region and bucket must be present and non-empty.
// region must be a name known to aws.GetRegion, and encrypt must be a value
// accepted by strconv.ParseBool. An error describing the first missing or
// invalid parameter is returned otherwise.
func FromParameters(parameters map[string]string) (*Driver, error) {
	accessKey, ok := parameters["accesskey"]
	if !ok || accessKey == "" {
		return nil, fmt.Errorf("No accesskey parameter provided")
	}

	secretKey, ok := parameters["secretkey"]
	if !ok || secretKey == "" {
		return nil, fmt.Errorf("No secretkey parameter provided")
	}

	regionName, ok := parameters["region"]
	if !ok || regionName == "" {
		return nil, fmt.Errorf("No region parameter provided")
	}
	region := aws.GetRegion(regionName)
	if region.Name == "" {
		// Report the name the caller supplied: the looked-up region is the
		// zero value here and carries no useful information.
		return nil, fmt.Errorf("Invalid region provided: %v", regionName)
	}

	bucket, ok := parameters["bucket"]
	if !ok || bucket == "" {
		return nil, fmt.Errorf("No bucket parameter provided")
	}

	encrypt, ok := parameters["encrypt"]
	if !ok {
		return nil, fmt.Errorf("No encrypt parameter provided")
	}
	encryptBool, err := strconv.ParseBool(encrypt)
	if err != nil {
		return nil, fmt.Errorf("Unable to parse the encrypt parameter: %v", err)
	}

	return New(accessKey, secretKey, region, encryptBool, bucket)
}
// New builds a Driver from AWS credentials, a region, the encryption flag and
// a bucket name.
//
// It issues a PutBucket request for the bucket using the driver's default
// permissions. A "BucketAlreadyOwnedByYou" S3 error from that request is
// tolerated; any other error is returned to the caller.
func New(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*Driver, error) {
	client := s3.New(aws.Auth{AccessKey: accessKey, SecretKey: secretKey}, region)
	bkt := client.Bucket(bucketName)

	err := bkt.PutBucket(getPermissions())
	if err != nil {
		// Only an S3 error saying we already own the bucket is acceptable.
		s3Err, isS3Err := err.(*s3.Error)
		if !isS3Err || s3Err.Code != "BucketAlreadyOwnedByYou" {
			return nil, err
		}
	}

	return &Driver{S3: client, Bucket: bkt, Encrypt: encrypt}, nil
}
// Implement the storagedriver.StorageDriver interface
// GetContent fetches the object stored at "path" and returns its bytes.
//
// Any error from the bucket is reported as a storagedriver.PathNotFoundError
// for the requested path.
func (d *Driver) GetContent(path string) ([]byte, error) {
	data, getErr := d.Bucket.Get(path)
	if getErr == nil {
		return data, nil
	}
	return nil, storagedriver.PathNotFoundError{Path: path}
}
// PutContent writes "contents" to the object at "path" in a single Put
// request, using the driver's content type, permissions and options.
func (d *Driver) PutContent(path string, contents []byte) error {
	contentType := d.getContentType()
	acl := getPermissions()
	opts := d.getOptions()
	return d.Bucket.Put(path, contents, contentType, acl, opts)
}
// ReadStream opens the object at "path" for reading, starting at the given
// byte offset, and returns the response body as an io.ReadCloser.
//
// The offset is sent as an open-ended HTTP Range header ("bytes=<offset>-").
// Any error from the request is reported as a
// storagedriver.PathNotFoundError for the requested path.
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
	rangeHeader := http.Header{}
	rangeHeader.Set("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")

	resp, reqErr := d.Bucket.GetResponseWithHeaders(path, rangeHeader)
	if reqErr == nil {
		return resp.Body, nil
	}
	return nil, storagedriver.PathNotFoundError{Path: path}
}
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path, using an S3 multipart upload.
//
// size is the total expected size of the object and offset is the position at
// which this call resumes writing. The chunk size starts at minChunkSize and
// is doubled until size fits in fewer than listPartsMax chunks. offset must be
// non-negative, must not lie beyond the parts already uploaded, and (unless it
// is at or past size) must fall on a chunk boundary; otherwise a
// storagedriver.InvalidOffsetError is returned.
//
// Data is uploaded one full chunk at a time. If the reader ends on a short
// chunk before size bytes have been seen, that partial chunk is not uploaded
// and nil is returned, leaving the multipart upload open. Once size bytes have
// been consumed the upload is completed, and any error from completing it is
// returned. The reader is always closed before returning.
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
	defer reader.Close()

	// Grow the chunk size until the whole object fits within the part limit.
	chunkSize := int64(minChunkSize)
	for size/chunkSize >= listPartsMax {
		chunkSize *= 2
	}

	multi, parts, err := d.getAllParts(path)
	if err != nil {
		return err
	}

	// A negative offset would otherwise produce a negative slice bound below.
	if offset < 0 || offset > int64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) {
		return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
	}

	partNumber := 1
	var totalRead int64
	if len(parts) > 0 {
		// Resume after the last whole chunk before offset; parts at or beyond
		// that position are dropped and will be re-uploaded.
		partNumber = int(offset/chunkSize) + 1
		totalRead = offset
		parts = parts[0 : partNumber-1]
	}

	buf := make([]byte, chunkSize)
	for {
		bytesRead, err := io.ReadFull(reader, buf)
		totalRead += int64(bytesRead)

		// EOF and ErrUnexpectedEOF only signal a short (possibly final) chunk.
		if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
			return err
		}

		// A short chunk that does not finish the object cannot be uploaded as
		// a part, so stop here and leave the upload open.
		if int64(bytesRead) < chunkSize && totalRead != size {
			return nil
		}

		part, err := multi.PutPart(partNumber, bytes.NewReader(buf[0:bytesRead]))
		if err != nil {
			return err
		}
		parts = append(parts, part)

		if totalRead == size {
			// Surface completion failures instead of reporting success.
			return multi.Complete(parts)
		}
		partNumber++
	}
}
// CurrentSize retrieves the current size in bytes of the object at the given
// path, derived from the parts of its multipart upload.
//
// The size is computed as (number of parts - 1) times the size of the first
// part, plus the size of the last part. It returns 0 when no parts exist.
func (d *Driver) CurrentSize(path string) (uint64, error) {
	_, parts, err := d.getAllParts(path)
	switch {
	case err != nil:
		return 0, err
	case len(parts) == 0:
		return 0, nil
	}

	leadingParts := uint64(len(parts)) - 1
	firstSize := uint64(parts[0].Size)
	lastSize := uint64(parts[len(parts)-1].Size)
	return leadingParts*firstSize + lastSize, nil
}
// List returns a list of the objects that are direct descendants of the given
// path.
//
// A trailing "/" is appended to a non-empty path that lacks one, and the
// result is used as the listing prefix with "/" as the delimiter. An empty
// path is used as an empty prefix rather than being indexed, which would
// panic. The returned slice holds the object keys first, followed by the
// common prefixes with their trailing "/" removed. Truncated listings are
// followed via the response's NextMarker until exhausted.
func (d *Driver) List(path string) ([]string, error) {
	if path != "" && path[len(path)-1] != '/' {
		path = path + "/"
	}

	listResponse, err := d.Bucket.List(path, "/", "", listPartsMax)
	if err != nil {
		return nil, err
	}

	files := []string{}
	directories := []string{}

	for {
		for _, key := range listResponse.Contents {
			files = append(files, key.Key)
		}

		for _, commonPrefix := range listResponse.CommonPrefixes {
			// Drop the trailing delimiter from each common prefix.
			directories = append(directories, commonPrefix[0:len(commonPrefix)-1])
		}

		if !listResponse.IsTruncated {
			break
		}

		listResponse, err = d.Bucket.List(path, "/", listResponse.NextMarker, listPartsMax)
		if err != nil {
			return nil, err
		}
	}

	return append(files, directories...), nil
}
// Move relocates the object at sourcePath to destPath.
//
// S3 has no native move, so the object is copied to destPath with PutCopy and
// sourcePath is then removed through Delete. Any error from the copy is
// reported as a storagedriver.PathNotFoundError for sourcePath.
func (d *Driver) Move(sourcePath string, destPath string) error {
	copyOpts := s3.CopyOptions{
		Options:           d.getOptions(),
		MetadataDirective: "",
		ContentType:       d.getContentType(),
	}
	// The copy source is addressed as "<bucket name>/<key>".
	copySource := d.Bucket.Name + "/" + sourcePath

	if _, copyErr := d.Bucket.PutCopy(destPath, getPermissions(), copyOpts, copySource); copyErr != nil {
		return storagedriver.PathNotFoundError{Path: sourcePath}
	}
	return d.Delete(sourcePath)
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
//
// Objects are found by listing the bucket with "path" as the prefix and are
// removed in batches of up to listPartsMax keys, re-listing after each batch
// until nothing is left. A storagedriver.PathNotFoundError is returned when
// the initial listing fails or is empty. Errors from a batch delete or from a
// later listing are returned as-is.
func (d *Driver) Delete(path string) error {
	listResponse, err := d.Bucket.List(path, "", "", listPartsMax)
	if err != nil || len(listResponse.Contents) == 0 {
		return storagedriver.PathNotFoundError{Path: path}
	}

	// Reused across batches; only the first len(Contents) entries are sent.
	s3Objects := make([]s3.Object, listPartsMax)

	for len(listResponse.Contents) > 0 {
		for index, key := range listResponse.Contents {
			s3Objects[index].Key = key.Key
		}

		err := d.Bucket.DelMulti(s3.Delete{Quiet: false, Objects: s3Objects[0:len(listResponse.Contents)]})
		if err != nil {
			// Propagate the failure; returning nil here would report a
			// failed delete as success.
			return err
		}

		listResponse, err = d.Bucket.List(path, "", "", listPartsMax)
		if err != nil {
			return err
		}
	}

	return nil
}
// getHighestIDMulti returns the multipart upload to use for the given path.
//
// It lists multipart uploads using path as the prefix and, among those whose
// key equals path exactly, picks the one with the greatest UploadId (compared
// as strings). If no upload matches, including when the listing only returns
// uploads for other keys that share the prefix, a new multipart upload is
// initiated. A "NoSuchUpload" listing error is treated as "no uploads exist".
func (d *Driver) getHighestIDMulti(path string) (multi *s3.Multi, err error) {
	multis, _, err := d.Bucket.ListMulti(path, "")
	if err != nil && !hasCode(err, "NoSuchUpload") {
		return nil, err
	}

	uploadID := ""
	for _, m := range multis {
		if m.Key == path && m.UploadId >= uploadID {
			uploadID = m.UploadId
			multi = m
		}
	}
	if multi != nil {
		return multi, nil
	}

	// No upload exists for exactly this key, so start a fresh one instead of
	// handing a nil *s3.Multi back to the caller.
	return d.Bucket.InitMulti(path, d.getContentType(), getPermissions(), d.getOptions())
}
// getAllParts looks up the multipart upload for path via getHighestIDMulti and
// returns it together with the parts reported by its ListParts call.
//
// If the upload lookup fails, both the upload and the parts are nil. If
// listing the parts fails, the upload is still returned alongside the error.
func (d *Driver) getAllParts(path string) (*s3.Multi, []s3.Part, error) {
	upload, lookupErr := d.getHighestIDMulti(path)
	if lookupErr != nil {
		return nil, nil, lookupErr
	}

	uploadedParts, listErr := upload.ListParts()
	return upload, uploadedParts, listErr
}
// hasCode reports whether err is an *aws.Error whose Code equals code.
// It returns false for nil errors and for errors of any other type.
//
// NOTE(review): New in this file asserts *s3.Error for a bucket error, while
// this helper asserts *aws.Error. Confirm which type the S3 calls checked
// through this helper actually return.
func hasCode(err error, code string) bool {
	if awsErr, isAWSErr := err.(*aws.Error); isAWSErr {
		return awsErr.Code == code
	}
	return false
}
// getOptions returns the S3 options used for writes, with SSE set from the
// driver's Encrypt flag.
func (d *Driver) getOptions() s3.Options {
	var opts s3.Options
	opts.SSE = d.Encrypt
	return opts
}
// getPermissions returns the ACL applied to everything the driver creates,
// which is always s3.Private.
func getPermissions() s3.ACL {
	var acl s3.ACL = s3.Private
	return acl
}
// getContentType returns the content type used for every object the driver
// writes. It does not depend on the driver's state.
func (d *Driver) getContentType() string {
	const contentType = "application/octet-stream"
	return contentType
}