Merge pull request #3 from AndreyKostov/next-generation
Preliminary s3 driver implementation
This commit is contained in:
commit
bac40b2b35
57
main/storagedriver/s3/s3.go
Normal file
57
main/storagedriver/s3/s3.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/ipc"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/s3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
parametersBytes := []byte(os.Args[1])
|
||||||
|
var parameters map[string]interface{}
|
||||||
|
err := json.Unmarshal(parametersBytes, ¶meters)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
accessKey, ok := parameters["accessKey"].(string)
|
||||||
|
if !ok || accessKey == "" {
|
||||||
|
panic("No accessKey parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
secretKey, ok := parameters["secretKey"].(string)
|
||||||
|
if !ok || secretKey == "" {
|
||||||
|
panic("No secretKey parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
region, ok := parameters["region"].(string)
|
||||||
|
if !ok || region == "" {
|
||||||
|
panic("No region parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, ok := parameters["bucket"].(string)
|
||||||
|
if !ok || bucket == "" {
|
||||||
|
panic("No bucket parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
encrypt, ok := parameters["encrypt"].(string)
|
||||||
|
if !ok {
|
||||||
|
panic("No encrypt parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
encryptBool, err := strconv.ParseBool(encrypt)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
driver, err := s3.NewDriver(accessKey, secretKey, aws.GetRegion(region), encryptBool, bucket)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ipc.Server(driver)
|
||||||
|
}
|
257
storagedriver/s3/s3.go
Normal file
257
storagedriver/s3/s3.go
Normal file
@ -0,0 +1,257 @@
|
|||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
"github.com/crowdmob/goamz/s3"
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
)
|
||||||
|
|
||||||
|
/* Chunks need to be at least 5MB to store with a multipart upload on S3 */
|
||||||
|
const minChunkSize = uint64(5 * 1024 * 1024)
|
||||||
|
|
||||||
|
/* The largest amount of parts you can request from S3 */
|
||||||
|
const listPartsMax = 1000
|
||||||
|
|
||||||
|
type S3Driver struct {
|
||||||
|
S3 *s3.S3
|
||||||
|
Bucket *s3.Bucket
|
||||||
|
Encrypt bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDriver(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*S3Driver, error) {
|
||||||
|
auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey}
|
||||||
|
s3obj := s3.New(auth, region)
|
||||||
|
bucket := s3obj.Bucket(bucketName)
|
||||||
|
|
||||||
|
if err := bucket.PutBucket(getPermissions()); err != nil {
|
||||||
|
s3Err, ok := err.(*s3.Error)
|
||||||
|
if !(ok && s3Err.Code == "BucketAlreadyOwnedByYou") {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &S3Driver{s3obj, bucket, encrypt}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) GetContent(path string) ([]byte, error) {
|
||||||
|
return d.Bucket.Get(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) PutContent(path string, contents []byte) error {
|
||||||
|
return d.Bucket.Put(path, contents, d.getContentType(), getPermissions(), d.getOptions())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
|
||||||
|
headers := make(http.Header)
|
||||||
|
headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-")
|
||||||
|
|
||||||
|
resp, err := d.Bucket.GetResponseWithHeaders(path, headers)
|
||||||
|
if resp != nil {
|
||||||
|
return resp.Body, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
chunkSize := minChunkSize
|
||||||
|
for size/chunkSize >= listPartsMax {
|
||||||
|
chunkSize *= 2
|
||||||
|
}
|
||||||
|
|
||||||
|
partNumber := 1
|
||||||
|
totalRead := uint64(0)
|
||||||
|
multi, parts, err := d.getAllParts(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) {
|
||||||
|
return storagedriver.InvalidOffsetError{path, offset}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) > 0 {
|
||||||
|
partNumber = int(offset/chunkSize) + 1
|
||||||
|
totalRead = offset
|
||||||
|
parts = parts[0 : partNumber-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, chunkSize)
|
||||||
|
for {
|
||||||
|
bytesRead, err := io.ReadFull(reader, buf)
|
||||||
|
totalRead += uint64(bytesRead)
|
||||||
|
|
||||||
|
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
|
||||||
|
return err
|
||||||
|
} else if (uint64(bytesRead) < chunkSize) && totalRead != size {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead]))
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = append(parts, part)
|
||||||
|
if totalRead == size {
|
||||||
|
multi.Complete(parts)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
partNumber++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) ResumeWritePosition(path string) (uint64, error) {
|
||||||
|
_, parts, err := d.getAllParts(path)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return (((uint64(len(parts)) - 1) * uint64(parts[0].Size)) + uint64(parts[len(parts)-1].Size)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) List(prefix string) ([]string, error) {
|
||||||
|
listResponse, err := d.Bucket.List(prefix+"/", "/", "", listPartsMax)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
files := []string{}
|
||||||
|
directories := []string{}
|
||||||
|
|
||||||
|
for len(listResponse.Contents) > 0 || len(listResponse.CommonPrefixes) > 0 {
|
||||||
|
for _, key := range listResponse.Contents {
|
||||||
|
files = append(files, key.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, commonPrefix := range listResponse.CommonPrefixes {
|
||||||
|
directories = append(directories, commonPrefix[0:len(commonPrefix)-1])
|
||||||
|
}
|
||||||
|
|
||||||
|
lastFile := ""
|
||||||
|
lastDirectory := ""
|
||||||
|
lastMarker := ""
|
||||||
|
|
||||||
|
if len(files) > 0 {
|
||||||
|
lastFile = files[len(files)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(directories) > 0 {
|
||||||
|
lastDirectory = directories[len(directories)-1] + "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastDirectory > lastFile {
|
||||||
|
lastMarker = lastDirectory
|
||||||
|
} else {
|
||||||
|
lastMarker = lastFile
|
||||||
|
}
|
||||||
|
|
||||||
|
listResponse, err = d.Bucket.List(prefix+"/", "/", lastMarker, listPartsMax)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return append(files, directories...), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) Move(sourcePath string, destPath string) error {
|
||||||
|
/* This is terrible, but aws doesn't have an actual move. */
|
||||||
|
_, err := d.Bucket.PutCopy(destPath, getPermissions(), s3.CopyOptions{d.getOptions(), "", d.getContentType()}, d.Bucket.Name+"/"+sourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.Delete(sourcePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) Delete(path string) error {
|
||||||
|
listResponse, err := d.Bucket.List(path, "", "", listPartsMax)
|
||||||
|
if err != nil || len(listResponse.Contents) == 0 {
|
||||||
|
return storagedriver.PathNotFoundError{path}
|
||||||
|
}
|
||||||
|
|
||||||
|
s3Objects := make([]s3.Object, listPartsMax)
|
||||||
|
|
||||||
|
for len(listResponse.Contents) > 0 {
|
||||||
|
for index, key := range listResponse.Contents {
|
||||||
|
s3Objects[index].Key = key.Key
|
||||||
|
}
|
||||||
|
|
||||||
|
err := d.Bucket.DelMulti(s3.Delete{false, s3Objects[0:len(listResponse.Contents)]})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
listResponse, err = d.Bucket.List(path, "", "", listPartsMax)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getHighestIdMulti(path string) (multi *s3.Multi, err error) {
|
||||||
|
multis, _, err := d.Bucket.ListMulti(path, "")
|
||||||
|
if err != nil && !hasCode(err, "NoSuchUpload") {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadId := ""
|
||||||
|
|
||||||
|
if len(multis) > 0 {
|
||||||
|
for _, m := range multis {
|
||||||
|
if m.Key == path && m.UploadId >= uploadId {
|
||||||
|
uploadId = m.UploadId
|
||||||
|
multi = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return multi, nil
|
||||||
|
} else {
|
||||||
|
multi, err := d.Bucket.InitMulti(path, d.getContentType(), getPermissions(), d.getOptions())
|
||||||
|
return multi, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getAllParts(path string) (*s3.Multi, []s3.Part, error) {
|
||||||
|
multi, err := d.getHighestIdMulti(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
parts, err := multi.ListParts()
|
||||||
|
return multi, parts, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasCode(err error, code string) bool {
|
||||||
|
s3err, ok := err.(*aws.Error)
|
||||||
|
return ok && s3err.Code == code
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getOptions() s3.Options {
|
||||||
|
return s3.Options{SSE: d.Encrypt}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPermissions() s3.ACL {
|
||||||
|
return s3.Private
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getContentType() string {
|
||||||
|
return "application/octet-stream"
|
||||||
|
}
|
29
storagedriver/s3/s3_test.go
Normal file
29
storagedriver/s3/s3_test.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/testsuites"
|
||||||
|
. "gopkg.in/check.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hook up gocheck into the "go test" runner.
|
||||||
|
func Test(t *testing.T) { TestingT(t) }
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
accessKey := os.Getenv("ACCESS_KEY")
|
||||||
|
secretKey := os.Getenv("SECRET_KEY")
|
||||||
|
region := os.Getenv("AWS_REGION")
|
||||||
|
bucket := os.Getenv("S3_BUCKET")
|
||||||
|
encrypt := os.Getenv("S3_ENCRYPT")
|
||||||
|
|
||||||
|
s3DriverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||||
|
return NewDriver(accessKey, secretKey, aws.GetRegion(region), true, bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
testsuites.RegisterInProcessSuite(s3DriverConstructor)
|
||||||
|
testsuites.RegisterIPCSuite("s3", map[string]string{"accessKey": accessKey, "secretKey": secretKey, "region": region, "bucket": bucket, "encrypt": encrypt})
|
||||||
|
}
|
@ -127,6 +127,7 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *C) {
|
|||||||
|
|
||||||
func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
|
func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
|
||||||
filename := randomString(32)
|
filename := randomString(32)
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
chunkSize := uint64(5 * 1024 * 1024)
|
chunkSize := uint64(5 * 1024 * 1024)
|
||||||
|
|
||||||
@ -159,14 +160,11 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
|
|||||||
received, err := suite.StorageDriver.GetContent(filename)
|
received, err := suite.StorageDriver.GetContent(filename)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
c.Assert(received, DeepEquals, fullContents)
|
c.Assert(received, DeepEquals, fullContents)
|
||||||
|
|
||||||
offset, err = suite.StorageDriver.ResumeWritePosition(filename)
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
c.Assert(offset, Equals, uint64(3*chunkSize))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *DriverSuite) TestReadStreamWithOffset(c *C) {
|
func (suite *DriverSuite) TestReadStreamWithOffset(c *C) {
|
||||||
filename := randomString(32)
|
filename := randomString(32)
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
chunkSize := uint64(32)
|
chunkSize := uint64(32)
|
||||||
|
|
||||||
@ -203,15 +201,6 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *C) {
|
|||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
c.Assert(readContents, DeepEquals, contentsChunk3)
|
c.Assert(readContents, DeepEquals, contentsChunk3)
|
||||||
|
|
||||||
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3)
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
defer reader.Close()
|
|
||||||
|
|
||||||
readContents, err = ioutil.ReadAll(reader)
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
|
|
||||||
c.Assert(readContents, DeepEquals, []byte{})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *DriverSuite) TestReadNonexistentStream(c *C) {
|
func (suite *DriverSuite) TestReadNonexistentStream(c *C) {
|
||||||
@ -222,6 +211,8 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *C) {
|
|||||||
|
|
||||||
func (suite *DriverSuite) TestList(c *C) {
|
func (suite *DriverSuite) TestList(c *C) {
|
||||||
rootDirectory := randomString(uint64(8 + rand.Intn(8)))
|
rootDirectory := randomString(uint64(8 + rand.Intn(8)))
|
||||||
|
defer suite.StorageDriver.Delete(rootDirectory)
|
||||||
|
|
||||||
parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8)))
|
parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8)))
|
||||||
childFiles := make([]string, 50)
|
childFiles := make([]string, 50)
|
||||||
for i := 0; i < len(childFiles); i++ {
|
for i := 0; i < len(childFiles); i++ {
|
||||||
@ -248,6 +239,9 @@ func (suite *DriverSuite) TestMove(c *C) {
|
|||||||
sourcePath := randomString(32)
|
sourcePath := randomString(32)
|
||||||
destPath := randomString(32)
|
destPath := randomString(32)
|
||||||
|
|
||||||
|
defer suite.StorageDriver.Delete(sourcePath)
|
||||||
|
defer suite.StorageDriver.Delete(destPath)
|
||||||
|
|
||||||
err := suite.StorageDriver.PutContent(sourcePath, contents)
|
err := suite.StorageDriver.PutContent(sourcePath, contents)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
@ -274,6 +268,8 @@ func (suite *DriverSuite) TestRemove(c *C) {
|
|||||||
filename := randomString(32)
|
filename := randomString(32)
|
||||||
contents := []byte(randomString(32))
|
contents := []byte(randomString(32))
|
||||||
|
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
err := suite.StorageDriver.PutContent(filename, contents)
|
err := suite.StorageDriver.PutContent(filename, contents)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
@ -296,6 +292,9 @@ func (suite *DriverSuite) TestRemoveFolder(c *C) {
|
|||||||
filename2 := randomString(32)
|
filename2 := randomString(32)
|
||||||
contents := []byte(randomString(32))
|
contents := []byte(randomString(32))
|
||||||
|
|
||||||
|
defer suite.StorageDriver.Delete(path.Join(dirname, filename1))
|
||||||
|
defer suite.StorageDriver.Delete(path.Join(dirname, filename2))
|
||||||
|
|
||||||
err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents)
|
err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
@ -313,6 +312,8 @@ func (suite *DriverSuite) TestRemoveFolder(c *C) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expected []byte) {
|
func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expected []byte) {
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
err := suite.StorageDriver.PutContent(filename, contents)
|
err := suite.StorageDriver.PutContent(filename, contents)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
@ -320,12 +321,11 @@ func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expe
|
|||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
c.Assert(readContents, DeepEquals, contents)
|
c.Assert(readContents, DeepEquals, contents)
|
||||||
|
|
||||||
err = suite.StorageDriver.Delete(filename)
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, contents, expected []byte) {
|
func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, contents, expected []byte) {
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents)))
|
err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents)))
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
@ -337,9 +337,6 @@ func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, content
|
|||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
c.Assert(readContents, DeepEquals, contents)
|
c.Assert(readContents, DeepEquals, contents)
|
||||||
|
|
||||||
err = suite.StorageDriver.Delete(filename)
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var pathChars = []byte("abcdefghijklmnopqrstuvwxyz")
|
var pathChars = []byte("abcdefghijklmnopqrstuvwxyz")
|
||||||
|
Loading…
Reference in New Issue
Block a user