From 3f95694180729cf78629f4062617ac10132bec50 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 21 Oct 2014 15:02:20 -0700 Subject: [PATCH 01/12] Adds storage driver interface, tests, and two basic implementations --- .travis.yml | 5 + main/storagedriver/filesystem/filesystem.go | 26 ++ main/storagedriver/inmemory/inmemory.go | 10 + storagedriver/filesystem/filesystem.go | 173 ++++++++++ storagedriver/filesystem/filesystem_test.go | 24 ++ storagedriver/inmemory/inmemory.go | 147 ++++++++ storagedriver/inmemory/inmemory_test.go | 20 ++ storagedriver/ipc/client.go | 285 ++++++++++++++++ storagedriver/ipc/ipc.go | 83 +++++ storagedriver/ipc/server.go | 160 +++++++++ storagedriver/storagedriver.go | 34 ++ storagedriver/testsuites/testsuites.go | 353 ++++++++++++++++++++ 12 files changed, 1320 insertions(+) create mode 100644 .travis.yml create mode 100644 main/storagedriver/filesystem/filesystem.go create mode 100644 main/storagedriver/inmemory/inmemory.go create mode 100644 storagedriver/filesystem/filesystem.go create mode 100644 storagedriver/filesystem/filesystem_test.go create mode 100644 storagedriver/inmemory/inmemory.go create mode 100644 storagedriver/inmemory/inmemory_test.go create mode 100644 storagedriver/ipc/client.go create mode 100644 storagedriver/ipc/ipc.go create mode 100644 storagedriver/ipc/server.go create mode 100644 storagedriver/storagedriver.go create mode 100644 storagedriver/testsuites/testsuites.go diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..d48424c3 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +language: go + +go: +- 1.3 +- tip diff --git a/main/storagedriver/filesystem/filesystem.go b/main/storagedriver/filesystem/filesystem.go new file mode 100644 index 00000000..8c0e2677 --- /dev/null +++ b/main/storagedriver/filesystem/filesystem.go @@ -0,0 +1,26 @@ +package main + +import ( + "encoding/json" + "os" + + "github.com/docker/docker-registry/storagedriver/filesystem" + "github.com/docker/docker-registry/storagedriver/ipc" +) + +func main() { + parametersBytes := []byte(os.Args[1]) + var parameters map[string]interface{} + err := json.Unmarshal(parametersBytes, ¶meters) + if err != nil { + panic(err) + } + rootDirectory := "/tmp/registry" + if parameters != nil { + rootDirParam, ok := parameters["RootDirectory"].(string) + if ok && rootDirParam != "" { + rootDirectory = rootDirParam + } + } + ipc.Server(filesystem.NewDriver(rootDirectory)) +} diff --git a/main/storagedriver/inmemory/inmemory.go b/main/storagedriver/inmemory/inmemory.go new file mode 100644 index 00000000..f55c8d5f --- /dev/null +++ b/main/storagedriver/inmemory/inmemory.go @@ -0,0 +1,10 @@ +package main + +import ( + "github.com/docker/docker-registry/storagedriver/inmemory" + "github.com/docker/docker-registry/storagedriver/ipc" +) + +func main() { + ipc.Server(inmemory.NewDriver()) +} diff --git a/storagedriver/filesystem/filesystem.go b/storagedriver/filesystem/filesystem.go new file mode 100644 index 00000000..79106e37 --- /dev/null +++ b/storagedriver/filesystem/filesystem.go @@ -0,0 +1,173 @@ +package filesystem + +import ( + "io" + "io/ioutil" + "os" + "path" + "strings" + + "github.com/docker/docker-registry/storagedriver" +) + +type FilesystemDriver struct { + rootDirectory string +} + +func NewDriver(rootDirectory string) *FilesystemDriver { + return &FilesystemDriver{rootDirectory} +} + +func (d *FilesystemDriver) subPath(subPath string) string { + return path.Join(d.rootDirectory, subPath) +} + +func (d *FilesystemDriver) GetContent(path string) 
([]byte, error) { + contents, err := ioutil.ReadFile(d.subPath(path)) + if err != nil { + return nil, storagedriver.PathNotFoundError{path} + } + return contents, nil +} + +func (d *FilesystemDriver) PutContent(subPath string, contents []byte) error { + fullPath := d.subPath(subPath) + parentDir := path.Dir(fullPath) + err := os.MkdirAll(parentDir, 0755) + if err != nil { + return err + } + + err = ioutil.WriteFile(fullPath, contents, 0644) + return err +} + +func (d *FilesystemDriver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { + file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644) + if err != nil { + return nil, err + } + + seekPos, err := file.Seek(int64(offset), os.SEEK_SET) + if err != nil { + file.Close() + return nil, err + } else if seekPos < int64(offset) { + file.Close() + return nil, storagedriver.InvalidOffsetError{path, offset} + } + + return file, nil +} + +func (d *FilesystemDriver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error { + defer reader.Close() + + resumableOffset, err := d.ResumeWritePosition(subPath) + if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound { + return err + } + + if offset > resumableOffset { + return storagedriver.InvalidOffsetError{subPath, offset} + } + + fullPath := d.subPath(subPath) + parentDir := path.Dir(fullPath) + err = os.MkdirAll(parentDir, 0755) + if err != nil { + return err + } + + var file *os.File + if offset == 0 { + file, err = os.Create(fullPath) + } else { + file, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_APPEND, 0) + } + + if err != nil { + return err + } + defer file.Close() + + buf := make([]byte, 32*1024) + for { + bytesRead, er := reader.Read(buf) + if bytesRead > 0 { + bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset)) + if bytesWritten > 0 { + offset += uint64(bytesWritten) + } + if ew != nil { + err = ew + break + } + if bytesRead != bytesWritten { + err = io.ErrShortWrite + break + } + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } + } + return err +} + +func (d *FilesystemDriver) ResumeWritePosition(subPath string) (uint64, error) { + fullPath := d.subPath(subPath) + + fileInfo, err := os.Stat(fullPath) + if err != nil && !os.IsNotExist(err) { + return 0, err + } else if err != nil { + return 0, storagedriver.PathNotFoundError{subPath} + } + return uint64(fileInfo.Size()), nil +} + +func (d *FilesystemDriver) List(prefix string) ([]string, error) { + prefix = strings.TrimRight(prefix, "/") + fullPath := d.subPath(prefix) + + dir, err := os.Open(fullPath) + if err != nil { + return nil, err + } + + fileNames, err := dir.Readdirnames(0) + if err != nil { + return nil, err + } + + keys := make([]string, 0, len(fileNames)) + for _, fileName := range fileNames { + keys = append(keys, path.Join(prefix, fileName)) + } + + return keys, nil +} + +func (d *FilesystemDriver) Move(sourcePath string, destPath string) error { + err := os.Rename(d.subPath(sourcePath), d.subPath(destPath)) + return err +} + +func (d *FilesystemDriver) Delete(subPath string) error { + fullPath := d.subPath(subPath) + + _, err := os.Stat(fullPath) + if err != nil && !os.IsNotExist(err) { + return err + } else if err != nil { + return storagedriver.PathNotFoundError{subPath} + } + + err = os.RemoveAll(fullPath) + return err +} diff --git a/storagedriver/filesystem/filesystem_test.go b/storagedriver/filesystem/filesystem_test.go new file mode 100644 index 00000000..c445e178 --- /dev/null +++ 
b/storagedriver/filesystem/filesystem_test.go @@ -0,0 +1,24 @@ +package filesystem + +import ( + "os" + "testing" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/testsuites" + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { TestingT(t) } + +func init() { + rootDirectory := "/tmp/driver" + os.RemoveAll(rootDirectory) + + filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) { + return NewDriver(rootDirectory), nil + } + testsuites.RegisterInProcessSuite(filesystemDriverConstructor) + testsuites.RegisterIPCSuite("filesystem", map[string]string{"RootDirectory": rootDirectory}) +} diff --git a/storagedriver/inmemory/inmemory.go b/storagedriver/inmemory/inmemory.go new file mode 100644 index 00000000..ea44bb39 --- /dev/null +++ b/storagedriver/inmemory/inmemory.go @@ -0,0 +1,147 @@ +package inmemory + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "regexp" + "strings" + "sync" + + "github.com/docker/docker-registry/storagedriver" +) + +type InMemoryDriver struct { + storage map[string][]byte + mutex sync.RWMutex +} + +func NewDriver() *InMemoryDriver { + return &InMemoryDriver{storage: make(map[string][]byte)} +} + +func (d *InMemoryDriver) GetContent(path string) ([]byte, error) { + d.mutex.RLock() + defer d.mutex.RUnlock() + contents, ok := d.storage[path] + if !ok { + return nil, storagedriver.PathNotFoundError{path} + } + return contents, nil +} + +func (d *InMemoryDriver) PutContent(path string, contents []byte) error { + d.mutex.Lock() + defer d.mutex.Unlock() + d.storage[path] = contents + return nil +} + +func (d *InMemoryDriver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { + d.mutex.RLock() + defer d.mutex.RUnlock() + contents, err := d.GetContent(path) + if err != nil { + return nil, err + } else if len(contents) < int(offset) { + return nil, storagedriver.InvalidOffsetError{path, offset} + } + + src := contents[offset:] + buf := make([]byte, len(src)) + copy(buf, src) + return ioutil.NopCloser(bytes.NewReader(buf)), nil +} + +func (d *InMemoryDriver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { + defer reader.Close() + d.mutex.RLock() + defer d.mutex.RUnlock() + + resumableOffset, err := d.ResumeWritePosition(path) + if err != nil { + return err + } + + if offset > resumableOffset { + return storagedriver.InvalidOffsetError{path, offset} + } + + contents, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + + if offset > 0 { + contents = append(d.storage[path][0:offset], contents...) 
+ } + + d.storage[path] = contents + return nil +} + +func (d *InMemoryDriver) ResumeWritePosition(path string) (uint64, error) { + d.mutex.RLock() + defer d.mutex.RUnlock() + contents, ok := d.storage[path] + if !ok { + return 0, nil + } + return uint64(len(contents)), nil +} + +func (d *InMemoryDriver) List(prefix string) ([]string, error) { + subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s/[^/]+", prefix)) + if err != nil { + return nil, err + } + + d.mutex.RLock() + defer d.mutex.RUnlock() + // we use map to collect uniq keys + keySet := make(map[string]struct{}) + for k := range d.storage { + if key := subPathMatcher.FindString(k); key != "" { + keySet[key] = struct{}{} + } + } + + keys := make([]string, 0, len(keySet)) + for k := range keySet { + keys = append(keys, k) + } + return keys, nil +} + +func (d *InMemoryDriver) Move(sourcePath string, destPath string) error { + d.mutex.Lock() + defer d.mutex.Unlock() + contents, ok := d.storage[sourcePath] + if !ok { + return storagedriver.PathNotFoundError{sourcePath} + } + d.storage[destPath] = contents + delete(d.storage, sourcePath) + return nil +} + +func (d *InMemoryDriver) Delete(path string) error { + d.mutex.Lock() + defer d.mutex.Unlock() + subPaths := make([]string, 0) + for k := range d.storage { + if strings.HasPrefix(k, path) { + subPaths = append(subPaths, k) + } + } + + if len(subPaths) == 0 { + return storagedriver.PathNotFoundError{path} + } + + for _, subPath := range subPaths { + delete(d.storage, subPath) + } + return nil +} diff --git a/storagedriver/inmemory/inmemory_test.go b/storagedriver/inmemory/inmemory_test.go new file mode 100644 index 00000000..fa62d30d --- /dev/null +++ b/storagedriver/inmemory/inmemory_test.go @@ -0,0 +1,20 @@ +package inmemory + +import ( + "testing" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/testsuites" + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. 
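The in-memory driver above is the simplest implementation of the storagedriver.StorageDriver interface. As a minimal usage sketch, assuming only the constructor and methods shown above (the example paths are arbitrary placeholders):

package main

import (
	"fmt"

	"github.com/docker/docker-registry/storagedriver/inmemory"
)

func main() {
	// NewDriver, PutContent, GetContent, Move and Delete are the methods
	// defined by the in-memory driver above.
	driver := inmemory.NewDriver()

	if err := driver.PutContent("/blobs/hello", []byte("hello world")); err != nil {
		panic(err)
	}

	contents, err := driver.GetContent("/blobs/hello")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", contents)

	if err := driver.Move("/blobs/hello", "/blobs/renamed"); err != nil {
		panic(err)
	}
	if err := driver.Delete("/blobs/renamed"); err != nil {
		panic(err)
	}
}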
+func Test(t *testing.T) { TestingT(t) } + +func init() { + inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) { + return NewDriver(), nil + } + testsuites.RegisterInProcessSuite(inmemoryDriverConstructor) + testsuites.RegisterIPCSuite("inmemory", nil) +} diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go new file mode 100644 index 00000000..c4e50a4d --- /dev/null +++ b/storagedriver/ipc/client.go @@ -0,0 +1,285 @@ +package ipc + +import ( + "encoding/json" + "io" + "net" + "os" + "os/exec" + "path" + "syscall" + + "github.com/docker/libchan" + "github.com/docker/libchan/spdy" +) + +type StorageDriverClient struct { + subprocess *exec.Cmd + socket *os.File + transport *spdy.Transport + sender libchan.Sender +} + +func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) { + paramsBytes, err := json.Marshal(parameters) + if err != nil { + return nil, err + } + + driverPath := os.ExpandEnv(path.Join("$GOPATH", "bin", name)) + if _, err := os.Stat(driverPath); os.IsNotExist(err) { + driverPath = path.Join(path.Dir(os.Args[0]), name) + } + if _, err := os.Stat(driverPath); os.IsNotExist(err) { + driverPath, err = exec.LookPath(name) + if err != nil { + return nil, err + } + } + + command := exec.Command(driverPath, string(paramsBytes)) + + return &StorageDriverClient{ + subprocess: command, + }, nil +} + +func (driver *StorageDriverClient) Start() error { + fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) + if err != nil { + return err + } + + childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket") + parentSocket := os.NewFile(uintptr(fileDescriptors[1]), "parentSocket") + + driver.subprocess.Stdout = os.Stdout + driver.subprocess.Stderr = os.Stderr + driver.subprocess.ExtraFiles = []*os.File{childSocket} + + if err = driver.subprocess.Start(); err != nil { + parentSocket.Close() + return err + } + + if err = childSocket.Close(); err != nil { + parentSocket.Close() + return err + } + + connection, err := net.FileConn(parentSocket) + if err != nil { + parentSocket.Close() + return err + } + transport, err := spdy.NewClientTransport(connection) + if err != nil { + parentSocket.Close() + return err + } + sender, err := transport.NewSendChannel() + if err != nil { + transport.Close() + parentSocket.Close() + return err + } + + driver.socket = parentSocket + driver.transport = transport + driver.sender = sender + + return nil +} + +func (driver *StorageDriverClient) Stop() error { + closeSenderErr := driver.sender.Close() + closeTransportErr := driver.transport.Close() + closeSocketErr := driver.socket.Close() + killErr := driver.subprocess.Process.Kill() + + if closeSenderErr != nil { + return closeSenderErr + } else if closeTransportErr != nil { + return closeTransportErr + } else if closeSocketErr != nil { + return closeSocketErr + } + return killErr +} + +func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path} + err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return nil, err + } + + var response GetContentResponse + err = receiver.Receive(&response) + if err != nil { + return nil, err + } + + if response.Error != nil { + return nil, response.Error + } + + return response.Content, nil +} + +func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { + 
receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path, "Contents": contents} + err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var response PutContentResponse + err = receiver.Receive(&response) + if err != nil { + panic(err) + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} + +func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path, "Offset": offset} + err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return nil, err + } + + var response ReadStreamResponse + err = receiver.Receive(&response) + if err != nil { + return nil, err + } + + if response.Error != nil { + return nil, response.Error + } + + return response.Reader, nil +} + +func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReadCloser(reader)} + err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var response WriteStreamResponse + err = receiver.Receive(&response) + if err != nil { + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} + +func (driver *StorageDriverClient) ResumeWritePosition(path string) (uint64, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path} + err := driver.sender.Send(&Request{Type: "ResumeWritePosition", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return 0, err + } + + var response ResumeWritePositionResponse + err = receiver.Receive(&response) + if err != nil { + return 0, err + } + + if response.Error != nil { + return 0, response.Error + } + + return response.Position, nil +} + +func (driver *StorageDriverClient) List(prefix string) ([]string, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Prefix": prefix} + err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return nil, err + } + + var response ListResponse + err = receiver.Receive(&response) + if err != nil { + return nil, err + } + + if response.Error != nil { + return nil, response.Error + } + + return response.Keys, nil +} + +func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath} + err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var response MoveResponse + err = receiver.Receive(&response) + if err != nil { + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} + +func (driver *StorageDriverClient) Delete(path string) error { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path} + err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var 
response DeleteResponse + err = receiver.Receive(&response) + if err != nil { + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go new file mode 100644 index 00000000..ab960b82 --- /dev/null +++ b/storagedriver/ipc/ipc.go @@ -0,0 +1,83 @@ +package ipc + +import ( + "errors" + "fmt" + "io" + "reflect" + + "github.com/docker/libchan" +) + +type Request struct { + Type string + Parameters map[string]interface{} + ResponseChannel libchan.Sender +} + +type noWriteReadWriteCloser struct { + io.ReadCloser +} + +func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) { + return 0, errors.New("Write unsupported") +} + +func WrapReadCloser(readCloser io.ReadCloser) io.ReadWriteCloser { + return noWriteReadWriteCloser{readCloser} +} + +type responseError struct { + Type string + Message string +} + +func ResponseError(err error) *responseError { + if err == nil { + return nil + } + return &responseError{ + Type: reflect.TypeOf(err).String(), + Message: err.Error(), + } +} + +func (err *responseError) Error() string { + return fmt.Sprintf("%s: %s", err.Type, err.Message) +} + +type GetContentResponse struct { + Content []byte + Error *responseError +} + +type PutContentResponse struct { + Error *responseError +} + +type ReadStreamResponse struct { + Reader io.ReadWriteCloser + Error *responseError +} + +type WriteStreamResponse struct { + Error *responseError +} + +type ResumeWritePositionResponse struct { + Position uint64 + Error *responseError +} + +type ListResponse struct { + Keys []string + Error *responseError +} + +type MoveResponse struct { + Error *responseError +} + +type DeleteResponse struct { + Error *responseError +} diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go new file mode 100644 index 00000000..2e240f42 --- /dev/null +++ b/storagedriver/ipc/server.go @@ -0,0 +1,160 @@ +package ipc + +import ( + "io" + "net" + "os" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/libchan" + "github.com/docker/libchan/spdy" +) + +func Server(driver storagedriver.StorageDriver) error { + childSocket := os.NewFile(3, "childSocket") + defer childSocket.Close() + conn, err := net.FileConn(childSocket) + if err != nil { + panic(err) + } + defer conn.Close() + if transport, err := spdy.NewServerTransport(conn); err != nil { + panic(err) + } else { + for { + receiver, err := transport.WaitReceiveChannel() + if err != nil { + panic(err) + } + go receive(driver, receiver) + } + return nil + } +} + +func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { + for { + var request Request + err := receiver.Receive(&request) + if err != nil { + panic(err) + } + go handleRequest(driver, request) + } +} + +func handleRequest(driver storagedriver.StorageDriver, request Request) { + + switch request.Type { + case "GetContent": + path, _ := request.Parameters["Path"].(string) + content, err := driver.GetContent(path) + response := GetContentResponse{ + Content: content, + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "PutContent": + path, _ := request.Parameters["Path"].(string) + contents, _ := request.Parameters["Contents"].([]byte) + err := driver.PutContent(path, contents) + response := PutContentResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "ReadStream": + var offset 
uint64 + + path, _ := request.Parameters["Path"].(string) + offset, ok := request.Parameters["Offset"].(uint64) + if !ok { + offsetSigned, _ := request.Parameters["Offset"].(int64) + offset = uint64(offsetSigned) + } + reader, err := driver.ReadStream(path, offset) + var response ReadStreamResponse + if err != nil { + response = ReadStreamResponse{Error: ResponseError(err)} + } else { + response = ReadStreamResponse{Reader: WrapReadCloser(reader)} + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "WriteStream": + var offset uint64 + + path, _ := request.Parameters["Path"].(string) + offset, ok := request.Parameters["Offset"].(uint64) + if !ok { + offsetSigned, _ := request.Parameters["Offset"].(int64) + offset = uint64(offsetSigned) + } + size, ok := request.Parameters["Size"].(uint64) + if !ok { + sizeSigned, _ := request.Parameters["Size"].(int64) + size = uint64(sizeSigned) + } + reader, _ := request.Parameters["Reader"].(io.ReadCloser) + err := driver.WriteStream(path, offset, size, reader) + response := WriteStreamResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "ResumeWritePosition": + path, _ := request.Parameters["Path"].(string) + position, err := driver.ResumeWritePosition(path) + response := ResumeWritePositionResponse{ + Position: position, + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "List": + prefix, _ := request.Parameters["Prefix"].(string) + keys, err := driver.List(prefix) + response := ListResponse{ + Keys: keys, + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "Move": + sourcePath, _ := request.Parameters["SourcePath"].(string) + destPath, _ := request.Parameters["DestPath"].(string) + err := driver.Move(sourcePath, destPath) + response := MoveResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "Delete": + path, _ := request.Parameters["Path"].(string) + err := driver.Delete(path) + response := DeleteResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + default: + panic(request) + } +} diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go new file mode 100644 index 00000000..bfbfc110 --- /dev/null +++ b/storagedriver/storagedriver.go @@ -0,0 +1,34 @@ +package storagedriver + +import ( + "fmt" + "io" +) + +type StorageDriver interface { + GetContent(path string) ([]byte, error) + PutContent(path string, content []byte) error + ReadStream(path string, offset uint64) (io.ReadCloser, error) + WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error + ResumeWritePosition(path string) (uint64, error) + List(prefix string) ([]string, error) + Move(sourcePath string, destPath string) error + Delete(path string) error +} + +type PathNotFoundError struct { + Path string +} + +func (err PathNotFoundError) Error() string { + return fmt.Sprintf("Path not found: %s", err.Path) +} + +type InvalidOffsetError struct { + Path string + Offset uint64 +} + +func (err InvalidOffsetError) Error() string { + return fmt.Sprintf("Invalid offset: %d for path: %s", err.Offset, err.Path) +} diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go new file mode 100644 index 00000000..7ca196d6 
--- /dev/null +++ b/storagedriver/testsuites/testsuites.go @@ -0,0 +1,353 @@ +package testsuites + +import ( + "bytes" + "io/ioutil" + "math/rand" + "path" + "sort" + "testing" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/ipc" + + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner +func Test(t *testing.T) { TestingT(t) } + +func RegisterInProcessSuite(driverConstructor DriverConstructor) { + Suite(&DriverSuite{ + Constructor: driverConstructor, + }) +} + +func RegisterIPCSuite(driverName string, ipcParams map[string]string) { + suite := &DriverSuite{ + Constructor: func() (storagedriver.StorageDriver, error) { + d, err := ipc.NewDriverClient(driverName, ipcParams) + if err != nil { + return nil, err + } + err = d.Start() + if err != nil { + return nil, err + } + return d, nil + }, + } + suite.Teardown = func() error { + driverClient := suite.StorageDriver.(*ipc.StorageDriverClient) + return driverClient.Stop() + } + Suite(suite) +} + +type DriverConstructor func() (storagedriver.StorageDriver, error) +type DriverTeardown func() error + +type DriverSuite struct { + Constructor DriverConstructor + Teardown DriverTeardown + storagedriver.StorageDriver +} + +type TestDriverConfig struct { + name string + params map[string]string +} + +func (suite *DriverSuite) SetUpSuite(c *C) { + d, err := suite.Constructor() + c.Assert(err, IsNil) + suite.StorageDriver = d +} + +func (suite *DriverSuite) TearDownSuite(c *C) { + if suite.Teardown != nil { + err := suite.Teardown() + c.Assert(err, IsNil) + } +} + +func (suite *DriverSuite) TestWriteRead1(c *C) { + filename := randomString(32) + contents := []byte("a") + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteRead2(c *C) { + filename := randomString(32) + contents := []byte("\xc3\x9f") + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteRead3(c *C) { + filename := randomString(32) + contents := []byte(randomString(32)) + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteRead4(c *C) { + filename := randomString(32) + contents := []byte(randomString(1024 * 1024)) + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestReadNonexistent(c *C) { + filename := randomString(32) + _, err := suite.StorageDriver.GetContent(filename) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestWriteReadStreams1(c *C) { + filename := randomString(32) + contents := []byte("a") + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteReadStreams2(c *C) { + filename := randomString(32) + contents := []byte("\xc3\x9f") + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteReadStreams3(c *C) { + filename := randomString(32) + contents := []byte(randomString(32)) + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteReadStreams4(c *C) { + filename := randomString(32) + contents := []byte(randomString(1024 * 1024)) + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestContinueStreamAppend(c *C) { + filename := randomString(32) + + chunkSize := uint64(32) + + contentsChunk1 := []byte(randomString(chunkSize)) + contentsChunk2 := []byte(randomString(chunkSize)) + contentsChunk3 := []byte(randomString(chunkSize)) + + 
fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) + + err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1))) + c.Assert(err, IsNil) + + offset, err := suite.StorageDriver.ResumeWritePosition(filename) + c.Assert(err, IsNil) + if offset > chunkSize { + c.Fatalf("Offset too large, %d > %d", offset, chunkSize) + } + err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) + c.Assert(err, IsNil) + + offset, err = suite.StorageDriver.ResumeWritePosition(filename) + c.Assert(err, IsNil) + if offset > 2*chunkSize { + c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize) + } + + err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) + c.Assert(err, IsNil) + + received, err := suite.StorageDriver.GetContent(filename) + c.Assert(err, IsNil) + c.Assert(received, DeepEquals, fullContents) + + offset, err = suite.StorageDriver.ResumeWritePosition(filename) + c.Assert(err, IsNil) + c.Assert(offset, Equals, uint64(3*chunkSize)) +} + +func (suite *DriverSuite) TestReadStreamWithOffset(c *C) { + filename := randomString(32) + + chunkSize := uint64(32) + + contentsChunk1 := []byte(randomString(chunkSize)) + contentsChunk2 := []byte(randomString(chunkSize)) + contentsChunk3 := []byte(randomString(chunkSize)) + + err := suite.StorageDriver.PutContent(filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) + c.Assert(err, IsNil) + + reader, err := suite.StorageDriver.ReadStream(filename, 0) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err := ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) + + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err = ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, append(contentsChunk2, contentsChunk3...)) + + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*2) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err = ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, contentsChunk3) + + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err = ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, []byte{}) +} + +func (suite *DriverSuite) TestReadNonexistentStream(c *C) { + filename := randomString(32) + _, err := suite.StorageDriver.ReadStream(filename, 0) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestList(c *C) { + rootDirectory := randomString(uint64(8 + rand.Intn(8))) + parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFiles := make([]string, 50) + for i := 0; i < len(childFiles); i++ { + childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFiles[i] = childFile + err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32))) + c.Assert(err, IsNil) + } + sort.Strings(childFiles) + + keys, err := suite.StorageDriver.List(rootDirectory) + c.Assert(err, IsNil) + c.Assert(keys, DeepEquals, []string{parentDirectory}) + + keys, err = suite.StorageDriver.List(parentDirectory) + c.Assert(err, IsNil) + + 
sort.Strings(keys) + c.Assert(keys, DeepEquals, childFiles) +} + +func (suite *DriverSuite) TestMove(c *C) { + contents := []byte(randomString(32)) + sourcePath := randomString(32) + destPath := randomString(32) + + err := suite.StorageDriver.PutContent(sourcePath, contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.Move(sourcePath, destPath) + c.Assert(err, IsNil) + + received, err := suite.StorageDriver.GetContent(destPath) + c.Assert(err, IsNil) + c.Assert(received, DeepEquals, contents) + + _, err = suite.StorageDriver.GetContent(sourcePath) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestMoveNonexistent(c *C) { + sourcePath := randomString(32) + destPath := randomString(32) + + err := suite.StorageDriver.Move(sourcePath, destPath) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestRemove(c *C) { + filename := randomString(32) + contents := []byte(randomString(32)) + + err := suite.StorageDriver.PutContent(filename, contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.Delete(filename) + c.Assert(err, IsNil) + + _, err = suite.StorageDriver.GetContent(filename) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestRemoveNonexistent(c *C) { + filename := randomString(32) + err := suite.StorageDriver.Delete(filename) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestRemoveFolder(c *C) { + dirname := randomString(32) + filename1 := randomString(32) + filename2 := randomString(32) + contents := []byte(randomString(32)) + + err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.PutContent(path.Join(dirname, filename2), contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.Delete(dirname) + c.Assert(err, IsNil) + + _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename1)) + c.Assert(err, NotNil) + + _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename2)) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expected []byte) { + err := suite.StorageDriver.PutContent(filename, contents) + c.Assert(err, IsNil) + + readContents, err := suite.StorageDriver.GetContent(filename) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, contents) + + err = suite.StorageDriver.Delete(filename) + c.Assert(err, IsNil) +} + +func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, contents, expected []byte) { + err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) + c.Assert(err, IsNil) + + reader, err := suite.StorageDriver.ReadStream(filename, 0) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err := ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, contents) + + err = suite.StorageDriver.Delete(filename) + c.Assert(err, IsNil) +} + +var pathChars = []byte("abcdefghijklmnopqrstuvwxyz") + +func randomString(length uint64) string { + b := make([]byte, length) + for i := range b { + b[i] = pathChars[rand.Intn(len(pathChars))] + } + return string(b) +} From 47ca8be42f47edbaa50644a717ddb2932b20ef61 Mon Sep 17 00:00:00 2001 From: Andrey Kostov Date: Fri, 24 Oct 2014 16:36:17 -0700 Subject: [PATCH 02/12] Slight additions/modifications to the test suite --- storagedriver/testsuites/testsuites.go | 37 ++++++++++++-------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/storagedriver/testsuites/testsuites.go 
b/storagedriver/testsuites/testsuites.go index 7ca196d6..dae5cc08 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -127,8 +127,9 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *C) { func (suite *DriverSuite) TestContinueStreamAppend(c *C) { filename := randomString(32) + defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(32) + chunkSize := uint64(5 * 1024 * 1024) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) @@ -159,14 +160,11 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *C) { received, err := suite.StorageDriver.GetContent(filename) c.Assert(err, IsNil) c.Assert(received, DeepEquals, fullContents) - - offset, err = suite.StorageDriver.ResumeWritePosition(filename) - c.Assert(err, IsNil) - c.Assert(offset, Equals, uint64(3*chunkSize)) } func (suite *DriverSuite) TestReadStreamWithOffset(c *C) { filename := randomString(32) + defer suite.StorageDriver.Delete(filename) chunkSize := uint64(32) @@ -203,15 +201,6 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *C) { c.Assert(err, IsNil) c.Assert(readContents, DeepEquals, contentsChunk3) - - reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3) - c.Assert(err, IsNil) - defer reader.Close() - - readContents, err = ioutil.ReadAll(reader) - c.Assert(err, IsNil) - - c.Assert(readContents, DeepEquals, []byte{}) } func (suite *DriverSuite) TestReadNonexistentStream(c *C) { @@ -222,6 +211,8 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *C) { func (suite *DriverSuite) TestList(c *C) { rootDirectory := randomString(uint64(8 + rand.Intn(8))) + defer suite.StorageDriver.Delete(rootDirectory) + parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8))) childFiles := make([]string, 50) for i := 0; i < len(childFiles); i++ { @@ -248,6 +239,9 @@ func (suite *DriverSuite) TestMove(c *C) { sourcePath := randomString(32) destPath := randomString(32) + defer suite.StorageDriver.Delete(sourcePath) + defer suite.StorageDriver.Delete(destPath) + err := suite.StorageDriver.PutContent(sourcePath, contents) c.Assert(err, IsNil) @@ -274,6 +268,8 @@ func (suite *DriverSuite) TestRemove(c *C) { filename := randomString(32) contents := []byte(randomString(32)) + defer suite.StorageDriver.Delete(filename) + err := suite.StorageDriver.PutContent(filename, contents) c.Assert(err, IsNil) @@ -296,6 +292,9 @@ func (suite *DriverSuite) TestRemoveFolder(c *C) { filename2 := randomString(32) contents := []byte(randomString(32)) + defer suite.StorageDriver.Delete(path.Join(dirname, filename1)) + defer suite.StorageDriver.Delete(path.Join(dirname, filename2)) + err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents) c.Assert(err, IsNil) @@ -313,6 +312,8 @@ func (suite *DriverSuite) TestRemoveFolder(c *C) { } func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expected []byte) { + defer suite.StorageDriver.Delete(filename) + err := suite.StorageDriver.PutContent(filename, contents) c.Assert(err, IsNil) @@ -320,12 +321,11 @@ func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expe c.Assert(err, IsNil) c.Assert(readContents, DeepEquals, contents) - - err = suite.StorageDriver.Delete(filename) - c.Assert(err, IsNil) } func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, contents, expected []byte) { + defer suite.StorageDriver.Delete(filename) + err := suite.StorageDriver.WriteStream(filename, 0, 
uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) c.Assert(err, IsNil) @@ -337,9 +337,6 @@ func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, content c.Assert(err, IsNil) c.Assert(readContents, DeepEquals, contents) - - err = suite.StorageDriver.Delete(filename) - c.Assert(err, IsNil) } var pathChars = []byte("abcdefghijklmnopqrstuvwxyz") From 134287336765f0df516415d74cf7e91bcf7e81b6 Mon Sep 17 00:00:00 2001 From: Andrey Kostov Date: Fri, 24 Oct 2014 16:37:25 -0700 Subject: [PATCH 03/12] Add s3 driver for the new Storage Layer API --- main/storagedriver/s3/s3.go | 57 ++++++++ storagedriver/s3/s3.go | 257 ++++++++++++++++++++++++++++++++++++ storagedriver/s3/s3_test.go | 29 ++++ 3 files changed, 343 insertions(+) create mode 100644 main/storagedriver/s3/s3.go create mode 100644 storagedriver/s3/s3.go create mode 100644 storagedriver/s3/s3_test.go diff --git a/main/storagedriver/s3/s3.go b/main/storagedriver/s3/s3.go new file mode 100644 index 00000000..0fbc376c --- /dev/null +++ b/main/storagedriver/s3/s3.go @@ -0,0 +1,57 @@ +package main + +import ( + "encoding/json" + "os" + "strconv" + + "github.com/crowdmob/goamz/aws" + "github.com/docker/docker-registry/storagedriver/ipc" + "github.com/docker/docker-registry/storagedriver/s3" +) + +func main() { + parametersBytes := []byte(os.Args[1]) + var parameters map[string]interface{} + err := json.Unmarshal(parametersBytes, ¶meters) + if err != nil { + panic(err) + } + + accessKey, ok := parameters["accessKey"].(string) + if !ok || accessKey == "" { + panic("No accessKey parameter") + } + + secretKey, ok := parameters["secretKey"].(string) + if !ok || secretKey == "" { + panic("No secretKey parameter") + } + + region, ok := parameters["region"].(string) + if !ok || region == "" { + panic("No region parameter") + } + + bucket, ok := parameters["bucket"].(string) + if !ok || bucket == "" { + panic("No bucket parameter") + } + + encrypt, ok := parameters["encrypt"].(string) + if !ok { + panic("No encrypt parameter") + } + + encryptBool, err := strconv.ParseBool(encrypt) + if err != nil { + panic(err) + } + + driver, err := s3.NewDriver(accessKey, secretKey, aws.GetRegion(region), encryptBool, bucket) + if err != nil { + panic(err) + } + + ipc.Server(driver) +} diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go new file mode 100644 index 00000000..26561000 --- /dev/null +++ b/storagedriver/s3/s3.go @@ -0,0 +1,257 @@ +package s3 + +import ( + "bytes" + "io" + "net/http" + "strconv" + + "github.com/crowdmob/goamz/aws" + "github.com/crowdmob/goamz/s3" + "github.com/docker/docker-registry/storagedriver" +) + +/* Chunks need to be at least 5MB to store with a multipart upload on S3 */ +const minChunkSize = uint64(5 * 1024 * 1024) + +/* The largest amount of parts you can request from S3 */ +const listPartsMax = 1000 + +type S3Driver struct { + S3 *s3.S3 + Bucket *s3.Bucket + Encrypt bool +} + +func NewDriver(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*S3Driver, error) { + auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey} + s3obj := s3.New(auth, region) + bucket := s3obj.Bucket(bucketName) + + if err := bucket.PutBucket(s3.PublicRead); err != nil { + s3Err, ok := err.(*s3.Error) + if !(ok && s3Err.Code == "BucketAlreadyOwnedByYou") { + return nil, err + } + } + + return &S3Driver{s3obj, bucket, encrypt}, nil +} + +func (d *S3Driver) GetContent(path string) ([]byte, error) { + return d.Bucket.Get(path) +} + +func (d *S3Driver) PutContent(path string, 
contents []byte) error { + return d.Bucket.Put(path, contents, d.getContentType(), d.getPermissions(), d.getOptions()) +} + +func (d *S3Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { + headers := make(http.Header) + headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-") + + resp, err := d.Bucket.GetResponseWithHeaders(path, headers) + if resp != nil { + return resp.Body, err + } + + return nil, err +} + +func (d *S3Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { + defer reader.Close() + + chunkSize := minChunkSize + for size/chunkSize >= listPartsMax { + chunkSize *= 2 + } + + partNumber := 1 + totalRead := uint64(0) + multi, parts, err := d.getAllParts(path) + if err != nil { + return err + } + + if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) { + return storagedriver.InvalidOffsetError{path, offset} + } + + if len(parts) > 0 { + partNumber = int(offset/chunkSize) + 1 + totalRead = offset + parts = parts[0 : partNumber-1] + } + + buf := make([]byte, chunkSize) + for { + bytesRead, err := io.ReadFull(reader, buf) + totalRead += uint64(bytesRead) + + if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + return err + } else if (uint64(bytesRead) < chunkSize) && totalRead != size { + break + } else { + part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead])) + if err != nil { + + return err + } + + parts = append(parts, part) + if totalRead == size { + multi.Complete(parts) + break + } + + partNumber++ + } + } + + return nil +} + +func (d *S3Driver) ResumeWritePosition(path string) (uint64, error) { + _, parts, err := d.getAllParts(path) + if err != nil { + return 0, err + } + + if len(parts) == 0 { + return 0, nil + } + + return (((uint64(len(parts)) - 1) * uint64(parts[0].Size)) + uint64(parts[len(parts)-1].Size)), nil +} + +func (d *S3Driver) List(prefix string) ([]string, error) { + listResponse, err := d.Bucket.List(prefix+"/", "/", "", listPartsMax) + if err != nil { + return nil, err + } + + files := []string{} + directories := []string{} + + for len(listResponse.Contents) > 0 || len(listResponse.CommonPrefixes) > 0 { + for _, key := range listResponse.Contents { + files = append(files, key.Key) + } + + for _, commonPrefix := range listResponse.CommonPrefixes { + directories = append(directories, commonPrefix[0:len(commonPrefix)-1]) + } + + lastFile := "" + lastDirectory := "" + lastMarker := "" + + if len(files) > 0 { + lastFile = files[len(files)-1] + } + + if len(directories) > 0 { + lastDirectory = directories[len(directories)-1] + "/" + } + + if lastDirectory > lastFile { + lastMarker = lastDirectory + } else { + lastMarker = lastFile + } + + listResponse, err = d.Bucket.List(prefix+"/", "/", lastMarker, listPartsMax) + if err != nil { + return nil, err + } + } + + return append(files, directories...), nil +} + +func (d *S3Driver) Move(sourcePath string, destPath string) error { + /* This is terrible, but aws doesn't have an actual move. 
*/ + _, err := d.Bucket.PutCopy(destPath, d.getPermissions(), s3.CopyOptions{d.getOptions(), "", d.getContentType()}, d.Bucket.Name+"/"+sourcePath) + if err != nil { + return err + } + + return d.Delete(sourcePath) +} + +func (d *S3Driver) Delete(path string) error { + listResponse, err := d.Bucket.List(path, "", "", listPartsMax) + if err != nil || len(listResponse.Contents) == 0 { + return storagedriver.PathNotFoundError{path} + } + + s3Objects := make([]s3.Object, listPartsMax) + + for len(listResponse.Contents) > 0 { + for index, key := range listResponse.Contents { + s3Objects[index].Key = key.Key + } + + err := d.Bucket.DelMulti(s3.Delete{false, s3Objects[0:len(listResponse.Contents)]}) + if err != nil { + return nil + } + + listResponse, err = d.Bucket.List(path, "", "", listPartsMax) + if err != nil { + return err + } + } + + return nil +} + +func (d *S3Driver) getHighestIdMulti(path string) (multi *s3.Multi, err error) { + multis, _, err := d.Bucket.ListMulti(path, "") + if err != nil && !hasCode(err, "NoSuchUpload") { + return nil, err + } + + uploadId := "" + + if len(multis) > 0 { + for _, m := range multis { + if m.Key == path && m.UploadId >= uploadId { + uploadId = m.UploadId + multi = m + } + } + return multi, nil + } else { + multi, err := d.Bucket.InitMulti(path, d.getContentType(), d.getPermissions(), d.getOptions()) + return multi, err + } +} + +func (d *S3Driver) getAllParts(path string) (*s3.Multi, []s3.Part, error) { + multi, err := d.getHighestIdMulti(path) + if err != nil { + return nil, nil, err + } + + parts, err := multi.ListParts() + return multi, parts, err +} + +func hasCode(err error, code string) bool { + s3err, ok := err.(*aws.Error) + return ok && s3err.Code == code +} + +func (d *S3Driver) getOptions() s3.Options { + return s3.Options{SSE: d.Encrypt} +} + +func (d *S3Driver) getPermissions() s3.ACL { + return s3.Private +} + +func (d *S3Driver) getContentType() string { + return "application/octet-stream" +} diff --git a/storagedriver/s3/s3_test.go b/storagedriver/s3/s3_test.go new file mode 100644 index 00000000..400ec7ad --- /dev/null +++ b/storagedriver/s3/s3_test.go @@ -0,0 +1,29 @@ +package s3 + +import ( + "os" + "testing" + + "github.com/crowdmob/goamz/aws" + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/testsuites" + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. 
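The same S3 driver can also be run out of process: ipc.NewDriverClient resolves the "s3" binary built from main/storagedriver/s3 and passes it the JSON-encoded parameters. A minimal sketch, assuming the parameter keys expected by that binary and mirrored in the test registration below, with credentials taken from the environment:

package main

import (
	"fmt"
	"os"

	"github.com/docker/docker-registry/storagedriver/ipc"
)

func main() {
	// These keys match the parameters unmarshalled by main/storagedriver/s3;
	// the environment variable names follow the S3 test setup.
	params := map[string]string{
		"accessKey": os.Getenv("ACCESS_KEY"),
		"secretKey": os.Getenv("SECRET_KEY"),
		"region":    os.Getenv("AWS_REGION"),
		"bucket":    os.Getenv("S3_BUCKET"),
		"encrypt":   os.Getenv("S3_ENCRYPT"),
	}

	// NewDriverClient locates the "s3" driver binary; Start launches it and
	// establishes the libchan/spdy transport over a socketpair.
	driver, err := ipc.NewDriverClient("s3", params)
	if err != nil {
		panic(err)
	}
	if err := driver.Start(); err != nil {
		panic(err)
	}
	defer driver.Stop()

	if err := driver.PutContent("/blobs/hello", []byte("hello world")); err != nil {
		panic(err)
	}
	contents, err := driver.GetContent("/blobs/hello")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", contents)
}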
+func Test(t *testing.T) { TestingT(t) } + +func init() { + accessKey := os.Getenv("ACCESS_KEY") + secretKey := os.Getenv("SECRET_KEY") + region := os.Getenv("AWS_REGION") + bucket := os.Getenv("S3_BUCKET") + encrypt := os.Getenv("S3_ENCRYPT") + + s3DriverConstructor := func() (storagedriver.StorageDriver, error) { + return NewDriver(accessKey, secretKey, aws.GetRegion(region), true, bucket) + } + + testsuites.RegisterInProcessSuite(s3DriverConstructor) + testsuites.RegisterIPCSuite("s3", map[string]string{"accessKey": accessKey, "secretKey": secretKey, "region": region, "bucket": bucket, "encrypt": encrypt}) +} From 7c892deb1c12c8587b24a2f57cad8f56f9a0817d Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Fri, 24 Oct 2014 18:33:23 -0700 Subject: [PATCH 04/12] Uses streams internally for ipc Get/Put Content This is done because libchan/spdystream does not currently support sending serialzied objects of size larger than 16MB See https://github.com/docker/libchan/issues/65 --- storagedriver/ipc/client.go | 18 ++++++++++++------ storagedriver/ipc/ipc.go | 20 +++++++++----------- storagedriver/ipc/server.go | 23 +++++++++++++++-------- storagedriver/testsuites/testsuites.go | 2 +- 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index c4e50a4d..0025d2bc 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -1,8 +1,10 @@ package ipc import ( + "bytes" "encoding/json" "io" + "io/ioutil" "net" "os" "os/exec" @@ -116,7 +118,7 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { return nil, err } - var response GetContentResponse + var response ReadStreamResponse err = receiver.Receive(&response) if err != nil { return nil, err @@ -126,22 +128,26 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { return nil, response.Error } - return response.Content, nil + defer response.Reader.Close() + contents, err := ioutil.ReadAll(response.Reader) + if err != nil { + return nil, err + } + return contents, nil } func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Contents": contents} + params := map[string]interface{}{"Path": path, "Reader": WrapReader(bytes.NewReader(contents))} err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err } - var response PutContentResponse + var response WriteStreamResponse err = receiver.Receive(&response) if err != nil { - panic(err) return err } @@ -177,7 +183,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReadCloser(reader)} + params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReader(reader)} err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go index ab960b82..89b0cf20 100644 --- a/storagedriver/ipc/ipc.go +++ b/storagedriver/ipc/ipc.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "reflect" "github.com/docker/libchan" @@ -23,8 +24,14 @@ func (r 
noWriteReadWriteCloser) Write(p []byte) (n int, err error) { return 0, errors.New("Write unsupported") } -func WrapReadCloser(readCloser io.ReadCloser) io.ReadWriteCloser { - return noWriteReadWriteCloser{readCloser} +func WrapReader(reader io.Reader) io.ReadWriteCloser { + if readWriteCloser, ok := reader.(io.ReadWriteCloser); ok { + return readWriteCloser + } else if readCloser, ok := reader.(io.ReadCloser); ok { + return noWriteReadWriteCloser{readCloser} + } else { + return noWriteReadWriteCloser{ioutil.NopCloser(reader)} + } } type responseError struct { @@ -46,15 +53,6 @@ func (err *responseError) Error() string { return fmt.Sprintf("%s: %s", err.Type, err.Message) } -type GetContentResponse struct { - Content []byte - Error *responseError -} - -type PutContentResponse struct { - Error *responseError -} - type ReadStreamResponse struct { Reader io.ReadWriteCloser Error *responseError diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 2e240f42..0d39a31b 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -1,7 +1,9 @@ package ipc import ( + "bytes" "io" + "io/ioutil" "net" "os" @@ -44,14 +46,15 @@ func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { } func handleRequest(driver storagedriver.StorageDriver, request Request) { - switch request.Type { case "GetContent": path, _ := request.Parameters["Path"].(string) content, err := driver.GetContent(path) - response := GetContentResponse{ - Content: content, - Error: ResponseError(err), + var response ReadStreamResponse + if err != nil { + response = ReadStreamResponse{Error: ResponseError(err)} + } else { + response = ReadStreamResponse{Reader: WrapReader(bytes.NewReader(content))} } err = request.ResponseChannel.Send(&response) if err != nil { @@ -59,9 +62,13 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { } case "PutContent": path, _ := request.Parameters["Path"].(string) - contents, _ := request.Parameters["Contents"].([]byte) - err := driver.PutContent(path, contents) - response := PutContentResponse{ + reader, _ := request.Parameters["Reader"].(io.ReadCloser) + contents, err := ioutil.ReadAll(reader) + defer reader.Close() + if err == nil { + err = driver.PutContent(path, contents) + } + response := WriteStreamResponse{ Error: ResponseError(err), } err = request.ResponseChannel.Send(&response) @@ -82,7 +89,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { if err != nil { response = ReadStreamResponse{Error: ResponseError(err)} } else { - response = ReadStreamResponse{Reader: WrapReadCloser(reader)} + response = ReadStreamResponse{Reader: WrapReader(reader)} } err = request.ResponseChannel.Send(&response) if err != nil { diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 7ca196d6..d9d3dead 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -128,7 +128,7 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *C) { func (suite *DriverSuite) TestContinueStreamAppend(c *C) { filename := randomString(32) - chunkSize := uint64(32) + chunkSize := uint64(5 * 1024 * 1024) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) From e3a5955cd27f011e2fd1777336d45426323f4e91 Mon Sep 17 00:00:00 2001 From: Andrey Kostov Date: Sun, 26 Oct 2014 10:00:53 -0700 Subject: [PATCH 05/12] Unify permissions settings --- storagedriver/s3/s3.go | 10 +++++----- 1 file changed, 5 
insertions(+), 5 deletions(-) diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index 26561000..a73e5e3d 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -28,7 +28,7 @@ func NewDriver(accessKey string, secretKey string, region aws.Region, encrypt bo s3obj := s3.New(auth, region) bucket := s3obj.Bucket(bucketName) - if err := bucket.PutBucket(s3.PublicRead); err != nil { + if err := bucket.PutBucket(getPermissions()); err != nil { s3Err, ok := err.(*s3.Error) if !(ok && s3Err.Code == "BucketAlreadyOwnedByYou") { return nil, err @@ -43,7 +43,7 @@ func (d *S3Driver) GetContent(path string) ([]byte, error) { } func (d *S3Driver) PutContent(path string, contents []byte) error { - return d.Bucket.Put(path, contents, d.getContentType(), d.getPermissions(), d.getOptions()) + return d.Bucket.Put(path, contents, d.getContentType(), getPermissions(), d.getOptions()) } func (d *S3Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { @@ -172,7 +172,7 @@ func (d *S3Driver) List(prefix string) ([]string, error) { func (d *S3Driver) Move(sourcePath string, destPath string) error { /* This is terrible, but aws doesn't have an actual move. */ - _, err := d.Bucket.PutCopy(destPath, d.getPermissions(), s3.CopyOptions{d.getOptions(), "", d.getContentType()}, d.Bucket.Name+"/"+sourcePath) + _, err := d.Bucket.PutCopy(destPath, getPermissions(), s3.CopyOptions{d.getOptions(), "", d.getContentType()}, d.Bucket.Name+"/"+sourcePath) if err != nil { return err } @@ -224,7 +224,7 @@ func (d *S3Driver) getHighestIdMulti(path string) (multi *s3.Multi, err error) { } return multi, nil } else { - multi, err := d.Bucket.InitMulti(path, d.getContentType(), d.getPermissions(), d.getOptions()) + multi, err := d.Bucket.InitMulti(path, d.getContentType(), getPermissions(), d.getOptions()) return multi, err } } @@ -248,7 +248,7 @@ func (d *S3Driver) getOptions() s3.Options { return s3.Options{SSE: d.Encrypt} } -func (d *S3Driver) getPermissions() s3.ACL { +func getPermissions() s3.ACL { return s3.Private } From ff81f3a71995658464a8a02407b1fbba9224ed91 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 27 Oct 2014 13:24:07 -0700 Subject: [PATCH 06/12] Adds conditional SkipCheck for storage driver tests --- storagedriver/filesystem/filesystem_test.go | 4 ++-- storagedriver/inmemory/inmemory_test.go | 4 ++-- storagedriver/s3/s3_test.go | 11 +++++++++-- storagedriver/testsuites/testsuites.go | 20 +++++++++++++++++--- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/storagedriver/filesystem/filesystem_test.go b/storagedriver/filesystem/filesystem_test.go index c445e178..15ef9663 100644 --- a/storagedriver/filesystem/filesystem_test.go +++ b/storagedriver/filesystem/filesystem_test.go @@ -19,6 +19,6 @@ func init() { filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) { return NewDriver(rootDirectory), nil } - testsuites.RegisterInProcessSuite(filesystemDriverConstructor) - testsuites.RegisterIPCSuite("filesystem", map[string]string{"RootDirectory": rootDirectory}) + testsuites.RegisterInProcessSuite(filesystemDriverConstructor, testsuites.NeverSkip) + testsuites.RegisterIPCSuite("filesystem", map[string]string{"RootDirectory": rootDirectory}, testsuites.NeverSkip) } diff --git a/storagedriver/inmemory/inmemory_test.go b/storagedriver/inmemory/inmemory_test.go index fa62d30d..accbb5f8 100644 --- a/storagedriver/inmemory/inmemory_test.go +++ b/storagedriver/inmemory/inmemory_test.go @@ -15,6 +15,6 @@ func init() { inmemoryDriverConstructor := func() 
(storagedriver.StorageDriver, error) { return NewDriver(), nil } - testsuites.RegisterInProcessSuite(inmemoryDriverConstructor) - testsuites.RegisterIPCSuite("inmemory", nil) + testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip) + testsuites.RegisterIPCSuite("inmemory", nil, testsuites.NeverSkip) } diff --git a/storagedriver/s3/s3_test.go b/storagedriver/s3/s3_test.go index 400ec7ad..b6862ab9 100644 --- a/storagedriver/s3/s3_test.go +++ b/storagedriver/s3/s3_test.go @@ -24,6 +24,13 @@ func init() { return NewDriver(accessKey, secretKey, aws.GetRegion(region), true, bucket) } - testsuites.RegisterInProcessSuite(s3DriverConstructor) - testsuites.RegisterIPCSuite("s3", map[string]string{"accessKey": accessKey, "secretKey": secretKey, "region": region, "bucket": bucket, "encrypt": encrypt}) + skipCheck := func() string { + if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" { + return "Must set ACCESS_KEY, SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests" + } + return "" + } + + testsuites.RegisterInProcessSuite(s3DriverConstructor, skipCheck) + testsuites.RegisterIPCSuite("s3", map[string]string{"accessKey": accessKey, "secretKey": secretKey, "region": region, "bucket": bucket, "encrypt": encrypt}, skipCheck) } diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index dae5cc08..ff93b038 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -17,13 +17,14 @@ import ( // Hook up gocheck into the "go test" runner func Test(t *testing.T) { TestingT(t) } -func RegisterInProcessSuite(driverConstructor DriverConstructor) { +func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) { Suite(&DriverSuite{ Constructor: driverConstructor, + SkipCheck: skipCheck, }) } -func RegisterIPCSuite(driverName string, ipcParams map[string]string) { +func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) { suite := &DriverSuite{ Constructor: func() (storagedriver.StorageDriver, error) { d, err := ipc.NewDriverClient(driverName, ipcParams) @@ -36,20 +37,30 @@ func RegisterIPCSuite(driverName string, ipcParams map[string]string) { } return d, nil }, + SkipCheck: skipCheck, } suite.Teardown = func() error { + if suite.StorageDriver == nil { + return nil + } + driverClient := suite.StorageDriver.(*ipc.StorageDriverClient) return driverClient.Stop() } Suite(suite) } +type SkipCheck func() (reason string) + +var NeverSkip = func() string { return "" } + type DriverConstructor func() (storagedriver.StorageDriver, error) type DriverTeardown func() error type DriverSuite struct { Constructor DriverConstructor Teardown DriverTeardown + SkipCheck storagedriver.StorageDriver } @@ -59,6 +70,9 @@ type TestDriverConfig struct { } func (suite *DriverSuite) SetUpSuite(c *C) { + if reason := suite.SkipCheck(); reason != "" { + c.Skip(reason) + } d, err := suite.Constructor() c.Assert(err, IsNil) suite.StorageDriver = d @@ -129,7 +143,7 @@ func (suite *DriverSuite) TestContinueStreamAppend(c *C) { filename := randomString(32) defer suite.StorageDriver.Delete(filename) - chunkSize := uint64(5 * 1024 * 1024) + chunkSize := uint64(10 * 1024 * 1024) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize)) From ca0084fad1a033a1b8da5bfbbcbf509701aa253d Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 28 Oct 2014 18:15:40 -0700 Subject: [PATCH 07/12] Adds 
StorageDriverFactory, unifying creation of StorageDrivers Custom storage drivers can register a factory to create the driver by name, similar to the database/sql package's Register and Open factory.Create returns an in-process driver if registered or an IPC driver if one can be found, erroring otherwise This standardizes parameter passing for creation of storage drivers Also adds documentation for storagedriver package and children --- main/storagedriver/filesystem/filesystem.go | 13 ++-- main/storagedriver/inmemory/inmemory.go | 4 +- main/storagedriver/s3/s3.go | 39 ++---------- storagedriver/factory/factory.go | 64 +++++++++++++++++++ storagedriver/filesystem/filesystem.go | 36 ++++++++++- storagedriver/filesystem/filesystem_test.go | 4 +- storagedriver/inmemory/inmemory.go | 19 +++++- storagedriver/inmemory/inmemory_test.go | 4 +- storagedriver/ipc/client.go | 13 ++++ storagedriver/ipc/ipc.go | 15 +++++ storagedriver/ipc/server.go | 37 ++++++----- storagedriver/s3/s3.go | 68 ++++++++++++++++++++- storagedriver/s3/s3_test.go | 22 +++++-- storagedriver/storagedriver.go | 25 ++++++++ storagedriver/testsuites/testsuites.go | 6 ++ 15 files changed, 290 insertions(+), 79 deletions(-) create mode 100644 storagedriver/factory/factory.go diff --git a/main/storagedriver/filesystem/filesystem.go b/main/storagedriver/filesystem/filesystem.go index 8c0e2677..8a5fc93c 100644 --- a/main/storagedriver/filesystem/filesystem.go +++ b/main/storagedriver/filesystem/filesystem.go @@ -8,19 +8,14 @@ import ( "github.com/docker/docker-registry/storagedriver/ipc" ) +// An out-of-process filesystem driver, intended to be run by ipc.NewDriverClient func main() { parametersBytes := []byte(os.Args[1]) - var parameters map[string]interface{} + var parameters map[string]string err := json.Unmarshal(parametersBytes, ¶meters) if err != nil { panic(err) } - rootDirectory := "/tmp/registry" - if parameters != nil { - rootDirParam, ok := parameters["RootDirectory"].(string) - if ok && rootDirParam != "" { - rootDirectory = rootDirParam - } - } - ipc.Server(filesystem.NewDriver(rootDirectory)) + + ipc.StorageDriverServer(filesystem.FromParameters(parameters)) } diff --git a/main/storagedriver/inmemory/inmemory.go b/main/storagedriver/inmemory/inmemory.go index f55c8d5f..999c05d7 100644 --- a/main/storagedriver/inmemory/inmemory.go +++ b/main/storagedriver/inmemory/inmemory.go @@ -5,6 +5,8 @@ import ( "github.com/docker/docker-registry/storagedriver/ipc" ) +// An out-of-process inmemory driver, intended to be run by ipc.NewDriverClient +// This exists primarily for example and testing purposes func main() { - ipc.Server(inmemory.NewDriver()) + ipc.StorageDriverServer(inmemory.New()) } diff --git a/main/storagedriver/s3/s3.go b/main/storagedriver/s3/s3.go index 0fbc376c..aa5a1180 100644 --- a/main/storagedriver/s3/s3.go +++ b/main/storagedriver/s3/s3.go @@ -3,55 +3,24 @@ package main import ( "encoding/json" "os" - "strconv" - "github.com/crowdmob/goamz/aws" "github.com/docker/docker-registry/storagedriver/ipc" "github.com/docker/docker-registry/storagedriver/s3" ) +// An out-of-process S3 driver, intended to be run by ipc.NewDriverClient func main() { parametersBytes := []byte(os.Args[1]) - var parameters map[string]interface{} + var parameters map[string]string err := json.Unmarshal(parametersBytes, ¶meters) if err != nil { panic(err) } - accessKey, ok := parameters["accessKey"].(string) - if !ok || accessKey == "" { - panic("No accessKey parameter") - } - - secretKey, ok := parameters["secretKey"].(string) - if !ok || 
secretKey == "" { - panic("No secretKey parameter") - } - - region, ok := parameters["region"].(string) - if !ok || region == "" { - panic("No region parameter") - } - - bucket, ok := parameters["bucket"].(string) - if !ok || bucket == "" { - panic("No bucket parameter") - } - - encrypt, ok := parameters["encrypt"].(string) - if !ok { - panic("No encrypt parameter") - } - - encryptBool, err := strconv.ParseBool(encrypt) + driver, err := s3.FromParameters(parameters) if err != nil { panic(err) } - driver, err := s3.NewDriver(accessKey, secretKey, aws.GetRegion(region), encryptBool, bucket) - if err != nil { - panic(err) - } - - ipc.Server(driver) + ipc.StorageDriverServer(driver) } diff --git a/storagedriver/factory/factory.go b/storagedriver/factory/factory.go new file mode 100644 index 00000000..c13c6c1e --- /dev/null +++ b/storagedriver/factory/factory.go @@ -0,0 +1,64 @@ +package factory + +import ( + "fmt" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/ipc" +) + +// Internal mapping between storage driver names and their respective factories +var driverFactories = make(map[string]StorageDriverFactory) + +// Factory interface for the storagedriver.StorageDriver interface +// Storage drivers should call Register() with a factory to make the driver available by name +type StorageDriverFactory interface { + // Creates and returns a new storagedriver.StorageDriver with the given parameters + // Parameters will vary by driver and may be ignored + // Each parameter key must only consist of lowercase letters and numbers + Create(parameters map[string]string) (storagedriver.StorageDriver, error) +} + +// Register makes a storage driver available by the provided name. +// If Register is called twice with the same name or if driver factory is nil, it panics. 
+func Register(name string, factory StorageDriverFactory) { + if factory == nil { + panic("Must not provide nil StorageDriverFactory") + } + _, registered := driverFactories[name] + if registered { + panic(fmt.Sprintf("StorageDriverFactory named %s already registered", name)) + } + + driverFactories[name] = factory +} + +// Create a new storagedriver.StorageDriver with the given name and parameters +// To run in-process, the StorageDriverFactory must first be registered with the given name +// If no in-process drivers are found with the given name, this attempts to create an IPC driver +// If no in-process or external drivers are found, an InvalidStorageDriverError is returned +func Create(name string, parameters map[string]string) (storagedriver.StorageDriver, error) { + driverFactory, ok := driverFactories[name] + if !ok { + // No registered StorageDriverFactory found, try ipc + driverClient, err := ipc.NewDriverClient(name, parameters) + if err != nil { + return nil, InvalidStorageDriverError{name} + } + err = driverClient.Start() + if err != nil { + return nil, err + } + return driverClient, nil + } + return driverFactory.Create(parameters) +} + +// Error returned when attempting to construct an unregistered storage driver +type InvalidStorageDriverError struct { + Name string +} + +func (err InvalidStorageDriverError) Error() string { + return fmt.Sprintf("StorageDriver not registered: %s", err.Name) +} diff --git a/storagedriver/filesystem/filesystem.go b/storagedriver/filesystem/filesystem.go index 79106e37..27ffcf7a 100644 --- a/storagedriver/filesystem/filesystem.go +++ b/storagedriver/filesystem/filesystem.go @@ -8,13 +8,45 @@ import ( "strings" "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/factory" ) +const DriverName = "filesystem" +const DefaultRootDirectory = "/tmp/registry/storage" + +func init() { + factory.Register(DriverName, &filesystemDriverFactory{}) +} + +// Implements the factory.StorageDriverFactory interface +type filesystemDriverFactory struct{} + +func (factory *filesystemDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) { + return FromParameters(parameters), nil +} + +// Storage Driver backed by a local filesystem +// All provided paths will be subpaths of the RootDirectory type FilesystemDriver struct { rootDirectory string } -func NewDriver(rootDirectory string) *FilesystemDriver { +// Constructs a new FilesystemDriver with a given parameters map +// Optional Parameters: +// - rootdirectory +func FromParameters(parameters map[string]string) *FilesystemDriver { + var rootDirectory = DefaultRootDirectory + if parameters != nil { + rootDir, ok := parameters["rootdirectory"] + if ok { + rootDirectory = rootDir + } + } + return New(rootDirectory) +} + +// Constructs a new FilesystemDriver with a given rootDirectory +func New(rootDirectory string) *FilesystemDriver { return &FilesystemDriver{rootDirectory} } @@ -22,6 +54,8 @@ func (d *FilesystemDriver) subPath(subPath string) string { return path.Join(d.rootDirectory, subPath) } +// Implement the storagedriver.StorageDriver interface + func (d *FilesystemDriver) GetContent(path string) ([]byte, error) { contents, err := ioutil.ReadFile(d.subPath(path)) if err != nil { diff --git a/storagedriver/filesystem/filesystem_test.go b/storagedriver/filesystem/filesystem_test.go index 15ef9663..7eb4024c 100644 --- a/storagedriver/filesystem/filesystem_test.go +++ b/storagedriver/filesystem/filesystem_test.go @@ -17,8 +17,8 @@ func 
init() { os.RemoveAll(rootDirectory) filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) { - return NewDriver(rootDirectory), nil + return New(rootDirectory), nil } testsuites.RegisterInProcessSuite(filesystemDriverConstructor, testsuites.NeverSkip) - testsuites.RegisterIPCSuite("filesystem", map[string]string{"RootDirectory": rootDirectory}, testsuites.NeverSkip) + testsuites.RegisterIPCSuite(DriverName, map[string]string{"rootdirectory": rootDirectory}, testsuites.NeverSkip) } diff --git a/storagedriver/inmemory/inmemory.go b/storagedriver/inmemory/inmemory.go index ea44bb39..2cf1b9f4 100644 --- a/storagedriver/inmemory/inmemory.go +++ b/storagedriver/inmemory/inmemory.go @@ -10,14 +10,31 @@ import ( "sync" "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/factory" ) +const DriverName = "inmemory" + +func init() { + factory.Register(DriverName, &inMemoryDriverFactory{}) +} + +// Implements the factory.StorageDriverFactory interface +type inMemoryDriverFactory struct{} + +func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) { + return New(), nil +} + +// InMemory Storage Driver backed by a map +// Intended solely for example and testing purposes type InMemoryDriver struct { storage map[string][]byte mutex sync.RWMutex } -func NewDriver() *InMemoryDriver { +// Constructs a new InMemoryDriver +func New() *InMemoryDriver { return &InMemoryDriver{storage: make(map[string][]byte)} } diff --git a/storagedriver/inmemory/inmemory_test.go b/storagedriver/inmemory/inmemory_test.go index accbb5f8..feea5eab 100644 --- a/storagedriver/inmemory/inmemory_test.go +++ b/storagedriver/inmemory/inmemory_test.go @@ -13,8 +13,8 @@ func Test(t *testing.T) { TestingT(t) } func init() { inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) { - return NewDriver(), nil + return New(), nil } testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip) - testsuites.RegisterIPCSuite("inmemory", nil, testsuites.NeverSkip) + testsuites.RegisterIPCSuite(DriverName, nil, testsuites.NeverSkip) } diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index 0025d2bc..6327b156 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -15,6 +15,7 @@ import ( "github.com/docker/libchan/spdy" ) +// Storage Driver implementation using a managed child process communicating over IPC type StorageDriverClient struct { subprocess *exec.Cmd socket *os.File @@ -22,6 +23,13 @@ type StorageDriverClient struct { sender libchan.Sender } +// Constructs a new out-of-process storage driver using the driver name and configuration parameters +// Must call Start() on this driver client before remote method calls can be made +// +// Looks for drivers in the following locations in order: +// - Storage drivers directory (to be determined, yet not implemented) +// - $GOPATH/bin +// - $PATH func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) { paramsBytes, err := json.Marshal(parameters) if err != nil { @@ -46,6 +54,7 @@ func NewDriverClient(name string, parameters map[string]string) (*StorageDriverC }, nil } +// Starts the designated child process storage driver and binds a socket to this process for IPC func (driver *StorageDriverClient) Start() error { fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) if err != nil { @@ -93,6 +102,8 @@ func (driver 
*StorageDriverClient) Start() error { return nil } +// Stops the child process storage driver +// storagedriver.StorageDriver methods called after Stop() will fail func (driver *StorageDriverClient) Stop() error { closeSenderErr := driver.sender.Close() closeTransportErr := driver.transport.Close() @@ -109,6 +120,8 @@ func (driver *StorageDriverClient) Stop() error { return killErr } +// Implement the storagedriver.StorageDriver interface over IPC + func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { receiver, remoteSender := libchan.Pipe() diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go index 89b0cf20..4e7e65c7 100644 --- a/storagedriver/ipc/ipc.go +++ b/storagedriver/ipc/ipc.go @@ -10,12 +10,16 @@ import ( "github.com/docker/libchan" ) +// Defines a remote method call request +// A return value struct is to be sent over the ResponseChannel type Request struct { Type string Parameters map[string]interface{} ResponseChannel libchan.Sender } +// A simple wrapper around an io.ReadCloser that implements the io.ReadWriteCloser interface +// Writes are disallowed and will return an error if ever called type noWriteReadWriteCloser struct { io.ReadCloser } @@ -24,6 +28,8 @@ func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) { return 0, errors.New("Write unsupported") } +// Wraps an io.Reader as an io.ReadWriteCloser with a nop Close and unsupported Write method +// Has no effect when an io.ReadWriteCloser is passed in func WrapReader(reader io.Reader) io.ReadWriteCloser { if readWriteCloser, ok := reader.(io.ReadWriteCloser); ok { return readWriteCloser @@ -39,6 +45,7 @@ type responseError struct { Message string } +// Wraps an error in a serializable struct containing the error's type and message func ResponseError(err error) *responseError { if err == nil { return nil @@ -53,29 +60,37 @@ func (err *responseError) Error() string { return fmt.Sprintf("%s: %s", err.Type, err.Message) } +// IPC method call response object definitions + +// Response for a ReadStream request type ReadStreamResponse struct { Reader io.ReadWriteCloser Error *responseError } +// Response for a WriteStream request type WriteStreamResponse struct { Error *responseError } +// Response for a ResumeWritePosition request type ResumeWritePositionResponse struct { Position uint64 Error *responseError } +// Response for a List request type ListResponse struct { Keys []string Error *responseError } +// Response for a Move request type MoveResponse struct { Error *responseError } +// Response for a Delete request type DeleteResponse struct { Error *responseError } diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 0d39a31b..0af41d0a 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -6,13 +6,18 @@ import ( "io/ioutil" "net" "os" + "reflect" "github.com/docker/docker-registry/storagedriver" "github.com/docker/libchan" "github.com/docker/libchan/spdy" ) -func Server(driver storagedriver.StorageDriver) error { +// Construct a new IPC server handling requests for the given storagedriver.StorageDriver +// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in client.go +// +// To create a new out-of-process driver, create a main package which calls StorageDriverServer with a storagedriver.StorageDriver +func StorageDriverServer(driver storagedriver.StorageDriver) error { childSocket := os.NewFile(3, "childSocket") defer childSocket.Close() conn, err := net.FileConn(childSocket) @@ -34,6 
+39,9 @@ func Server(driver storagedriver.StorageDriver) error { } } +// Receives new storagedriver.StorageDriver method requests and creates a new goroutine to handle each request +// +// Requests are expected to be of type ipc.Request as the parameters are unknown until the request type is deserialized func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { for { var request Request @@ -45,6 +53,8 @@ func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { } } +// Handles storagedriver.StorageDriver method requests as defined in client.go +// Responds to requests using the Request.ResponseChannel func handleRequest(driver storagedriver.StorageDriver, request Request) { switch request.Type { case "GetContent": @@ -76,14 +86,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { panic(err) } case "ReadStream": - var offset uint64 - path, _ := request.Parameters["Path"].(string) - offset, ok := request.Parameters["Offset"].(uint64) - if !ok { - offsetSigned, _ := request.Parameters["Offset"].(int64) - offset = uint64(offsetSigned) - } + // Depending on serialization method, Offset may be convereted to any int/uint type + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() reader, err := driver.ReadStream(path, offset) var response ReadStreamResponse if err != nil { @@ -96,19 +101,11 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { panic(err) } case "WriteStream": - var offset uint64 - path, _ := request.Parameters["Path"].(string) - offset, ok := request.Parameters["Offset"].(uint64) - if !ok { - offsetSigned, _ := request.Parameters["Offset"].(int64) - offset = uint64(offsetSigned) - } - size, ok := request.Parameters["Size"].(uint64) - if !ok { - sizeSigned, _ := request.Parameters["Size"].(int64) - size = uint64(sizeSigned) - } + // Depending on serialization method, Offset may be convereted to any int/uint type + offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint() + // Depending on serialization method, Size may be convereted to any int/uint type + size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(uint64(0))).Uint() reader, _ := request.Parameters["Reader"].(io.ReadCloser) err := driver.WriteStream(path, offset, size, reader) response := WriteStreamResponse{ diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index a73e5e3d..0c301126 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -2,6 +2,7 @@ package s3 import ( "bytes" + "fmt" "io" "net/http" "strconv" @@ -9,21 +10,82 @@ import ( "github.com/crowdmob/goamz/aws" "github.com/crowdmob/goamz/s3" "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/factory" ) -/* Chunks need to be at least 5MB to store with a multipart upload on S3 */ +const DriverName = "s3" + +// Chunks need to be at least 5MB to store with a multipart upload on S3 const minChunkSize = uint64(5 * 1024 * 1024) -/* The largest amount of parts you can request from S3 */ +// The largest amount of parts you can request from S3 const listPartsMax = 1000 +func init() { + factory.Register(DriverName, &s3DriverFactory{}) +} + +// Implements the factory.StorageDriverFactory interface +type s3DriverFactory struct{} + +func (factory *s3DriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) { + return FromParameters(parameters) +} + +// Storage Driver backed by 
Amazon S3 +// Objects are stored at absolute keys in the provided bucket type S3Driver struct { S3 *s3.S3 Bucket *s3.Bucket Encrypt bool } -func NewDriver(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*S3Driver, error) { +// Constructs a new S3Driver with a given parameters map +// Required parameters: +// - accesskey +// - secretkey +// - region +// - bucket +// - encrypt +func FromParameters(parameters map[string]string) (*S3Driver, error) { + accessKey, ok := parameters["accesskey"] + if !ok || accessKey == "" { + return nil, fmt.Errorf("No accesskey parameter provided") + } + + secretKey, ok := parameters["secretkey"] + if !ok || secretKey == "" { + return nil, fmt.Errorf("No secretkey parameter provided") + } + + regionName, ok := parameters["region"] + if !ok || regionName == "" { + return nil, fmt.Errorf("No region parameter provided") + } + region := aws.GetRegion(regionName) + if region.Name == "" { + return nil, fmt.Errorf("Invalid region provided: %s", region) + } + + bucket, ok := parameters["bucket"] + if !ok || bucket == "" { + return nil, fmt.Errorf("No bucket parameter provided") + } + + encrypt, ok := parameters["encrypt"] + if !ok { + return nil, fmt.Errorf("No encrypt parameter provided") + } + + encryptBool, err := strconv.ParseBool(encrypt) + if err != nil { + return nil, fmt.Errorf("Unable to parse the encrypt parameter: %v", err) + } + return New(accessKey, secretKey, region, encryptBool, bucket) +} + +// Constructs a new S3Driver with the given AWS credentials, region, encryption flag, and bucketName +func New(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*S3Driver, error) { auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey} s3obj := s3.New(auth, region) bucket := s3obj.Bucket(bucketName) diff --git a/storagedriver/s3/s3_test.go b/storagedriver/s3/s3_test.go index b6862ab9..576c3623 100644 --- a/storagedriver/s3/s3_test.go +++ b/storagedriver/s3/s3_test.go @@ -2,6 +2,7 @@ package s3 import ( "os" + "strconv" "testing" "github.com/crowdmob/goamz/aws" @@ -14,23 +15,34 @@ import ( func Test(t *testing.T) { TestingT(t) } func init() { - accessKey := os.Getenv("ACCESS_KEY") - secretKey := os.Getenv("SECRET_KEY") + accessKey := os.Getenv("AWS_ACCESS_KEY") + secretKey := os.Getenv("AWS_SECRET_KEY") region := os.Getenv("AWS_REGION") bucket := os.Getenv("S3_BUCKET") encrypt := os.Getenv("S3_ENCRYPT") s3DriverConstructor := func() (storagedriver.StorageDriver, error) { - return NewDriver(accessKey, secretKey, aws.GetRegion(region), true, bucket) + shouldEncrypt, err := strconv.ParseBool(encrypt) + if err != nil { + return nil, err + } + return New(accessKey, secretKey, aws.GetRegion(region), shouldEncrypt, bucket) } + // Skip S3 storage driver tests if environment variable parameters are not provided skipCheck := func() string { if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" { - return "Must set ACCESS_KEY, SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests" + return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests" } return "" } testsuites.RegisterInProcessSuite(s3DriverConstructor, skipCheck) - testsuites.RegisterIPCSuite("s3", map[string]string{"accessKey": accessKey, "secretKey": secretKey, "region": region, "bucket": bucket, "encrypt": encrypt}, skipCheck) + testsuites.RegisterIPCSuite(DriverName, map[string]string{ + "accesskey": accessKey, + "secretkey": secretKey, + 
"region": region, + "bucket": bucket, + "encrypt": encrypt, + }, skipCheck) } diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go index bfbfc110..55596cd6 100644 --- a/storagedriver/storagedriver.go +++ b/storagedriver/storagedriver.go @@ -5,17 +5,41 @@ import ( "io" ) +// Defines methods that a Storage Driver must implement for a filesystem-like key/value object storage type StorageDriver interface { + // Retrieve the content stored at "path" as a []byte + // Should primarily be used for small objects GetContent(path string) ([]byte, error) + + // Store the []byte content at a location designated by "path" + // Should primarily be used for small objects PutContent(path string, content []byte) error + + // Retrieve an io.ReadCloser for the content stored at "path" with a given byte offset + // May be used to resume reading a stream by providing a nonzero offset ReadStream(path string, offset uint64) (io.ReadCloser, error) + + // Store the contents of the provided io.ReadCloser at a location designated by "path" + // The driver will know it has received the full contents when it has read "size" bytes + // May be used to resume writing a stream by providing a nonzero offset + // The offset must be no larger than the number of bytes already written to this path WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error + + // Retrieve the byte offset at which it is safe to continue writing at "path" ResumeWritePosition(path string) (uint64, error) + + // Recursively lists the objects stored at a subpath of the given prefix List(prefix string) ([]string, error) + + // Moves an object stored at sourcePath to destPath, removing the original object + // Note: This may be no more efficient than a copy followed by a delete for many implementations Move(sourcePath string, destPath string) error + + // Recursively deletes all objects stored at "path" and its subpaths Delete(path string) error } +// Error returned when operating on a nonexistent path type PathNotFoundError struct { Path string } @@ -24,6 +48,7 @@ func (err PathNotFoundError) Error() string { return fmt.Sprintf("Path not found: %s", err.Path) } +// Error returned when attempting to read or write from an invalid offset type InvalidOffsetError struct { Path string Offset uint64 diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index ff93b038..45c621d3 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -17,6 +17,9 @@ import ( // Hook up gocheck into the "go test" runner func Test(t *testing.T) { TestingT(t) } +// Registers an in-process storage driver test suite with the go test runner +// +// If skipCheck returns a non-empty skip reason, the suite is skipped with the given reason func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) { Suite(&DriverSuite{ Constructor: driverConstructor, @@ -24,6 +27,9 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC }) } +// Registers a storage driver test suite which runs the named driver as a child process with the given parameters +// +// If skipCheck returns a non-empty skip reason, the suite is skipped with the given reason func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) { suite := &DriverSuite{ Constructor: func() (storagedriver.StorageDriver, error) { From 0e5d41ff9b7dd8a0eadbddd72beb7329c49b55fa Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Wed, 29 Oct 
2014 12:14:19 -0700 Subject: [PATCH 08/12] Updates documentation to follow godoc conventions --- storagedriver/factory/factory.go | 9 +++++---- storagedriver/filesystem/filesystem.go | 9 +++++---- storagedriver/inmemory/inmemory.go | 8 +++++--- storagedriver/ipc/client.go | 15 +++++++++------ storagedriver/ipc/ipc.go | 23 ++++++++++++----------- storagedriver/ipc/server.go | 18 +++++++++++------- storagedriver/s3/s3.go | 16 ++++++++++------ storagedriver/storagedriver.go | 26 +++++++++++++++----------- storagedriver/testsuites/testsuites.go | 24 ++++++++++++------------ 9 files changed, 84 insertions(+), 64 deletions(-) diff --git a/storagedriver/factory/factory.go b/storagedriver/factory/factory.go index c13c6c1e..0b85f372 100644 --- a/storagedriver/factory/factory.go +++ b/storagedriver/factory/factory.go @@ -7,13 +7,14 @@ import ( "github.com/docker/docker-registry/storagedriver/ipc" ) -// Internal mapping between storage driver names and their respective factories +// driverFactories stores an internal mapping between storage driver names and their respective +// factories var driverFactories = make(map[string]StorageDriverFactory) -// Factory interface for the storagedriver.StorageDriver interface +// StorageDriverFactory is a factory interface for creating storagedriver.StorageDriver interfaces // Storage drivers should call Register() with a factory to make the driver available by name type StorageDriverFactory interface { - // Creates and returns a new storagedriver.StorageDriver with the given parameters + // Create returns a new storagedriver.StorageDriver with the given parameters // Parameters will vary by driver and may be ignored // Each parameter key must only consist of lowercase letters and numbers Create(parameters map[string]string) (storagedriver.StorageDriver, error) @@ -54,7 +55,7 @@ func Create(name string, parameters map[string]string) (storagedriver.StorageDri return driverFactory.Create(parameters) } -// Error returned when attempting to construct an unregistered storage driver +// InvalidStorageDriverError records an attempt to construct an unregistered storage driver type InvalidStorageDriverError struct { Name string } diff --git a/storagedriver/filesystem/filesystem.go b/storagedriver/filesystem/filesystem.go index 27ffcf7a..4f100dd3 100644 --- a/storagedriver/filesystem/filesystem.go +++ b/storagedriver/filesystem/filesystem.go @@ -18,20 +18,20 @@ func init() { factory.Register(DriverName, &filesystemDriverFactory{}) } -// Implements the factory.StorageDriverFactory interface +// filesystemDriverFactory implements the factory.StorageDriverFactory interface type filesystemDriverFactory struct{} func (factory *filesystemDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) { return FromParameters(parameters), nil } -// Storage Driver backed by a local filesystem +// FilesystemDriver is a storagedriver.StorageDriver implementation backed by a local filesystem // All provided paths will be subpaths of the RootDirectory type FilesystemDriver struct { rootDirectory string } -// Constructs a new FilesystemDriver with a given parameters map +// FromParameters constructs a new FilesystemDriver with a given parameters map // Optional Parameters: // - rootdirectory func FromParameters(parameters map[string]string) *FilesystemDriver { @@ -45,11 +45,12 @@ func FromParameters(parameters map[string]string) *FilesystemDriver { return New(rootDirectory) } -// Constructs a new FilesystemDriver with a given rootDirectory +// New constructs a 
new FilesystemDriver with a given rootDirectory func New(rootDirectory string) *FilesystemDriver { return &FilesystemDriver{rootDirectory} } +// subPath returns the absolute path of a key within the FilesystemDriver's storage func (d *FilesystemDriver) subPath(subPath string) string { return path.Join(d.rootDirectory, subPath) } diff --git a/storagedriver/inmemory/inmemory.go b/storagedriver/inmemory/inmemory.go index 2cf1b9f4..d7d4ccea 100644 --- a/storagedriver/inmemory/inmemory.go +++ b/storagedriver/inmemory/inmemory.go @@ -19,25 +19,27 @@ func init() { factory.Register(DriverName, &inMemoryDriverFactory{}) } -// Implements the factory.StorageDriverFactory interface +// inMemoryDriverFacotry implements the factory.StorageDriverFactory interface type inMemoryDriverFactory struct{} func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) { return New(), nil } -// InMemory Storage Driver backed by a map +// InMemoryDriver is a storagedriver.StorageDriver implementation backed by a local map // Intended solely for example and testing purposes type InMemoryDriver struct { storage map[string][]byte mutex sync.RWMutex } -// Constructs a new InMemoryDriver +// New constructs a new InMemoryDriver func New() *InMemoryDriver { return &InMemoryDriver{storage: make(map[string][]byte)} } +// Implement the storagedriver.StorageDriver interface + func (d *InMemoryDriver) GetContent(path string) ([]byte, error) { d.mutex.RLock() defer d.mutex.RUnlock() diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index 6327b156..cdac8b11 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -15,7 +15,8 @@ import ( "github.com/docker/libchan/spdy" ) -// Storage Driver implementation using a managed child process communicating over IPC +// StorageDriverClient is a storagedriver.StorageDriver implementation using a managed child process +// communicating over IPC using libchan with a unix domain socket type StorageDriverClient struct { subprocess *exec.Cmd socket *os.File @@ -23,8 +24,9 @@ type StorageDriverClient struct { sender libchan.Sender } -// Constructs a new out-of-process storage driver using the driver name and configuration parameters -// Must call Start() on this driver client before remote method calls can be made +// NewDriverClient constructs a new out-of-process storage driver using the driver name and +// configuration parameters +// A user must call Start on this driver client before remote method calls can be made // // Looks for drivers in the following locations in order: // - Storage drivers directory (to be determined, yet not implemented) @@ -54,7 +56,8 @@ func NewDriverClient(name string, parameters map[string]string) (*StorageDriverC }, nil } -// Starts the designated child process storage driver and binds a socket to this process for IPC +// Start starts the designated child process storage driver and binds a socket to this process for +// IPC method calls func (driver *StorageDriverClient) Start() error { fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) if err != nil { @@ -102,8 +105,8 @@ func (driver *StorageDriverClient) Start() error { return nil } -// Stops the child process storage driver -// storagedriver.StorageDriver methods called after Stop() will fail +// Stop stops the child process storage driver +// storagedriver.StorageDriver methods called after Stop will fail func (driver *StorageDriverClient) Stop() error { closeSenderErr := 
driver.sender.Close() closeTransportErr := driver.transport.Close() diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go index 4e7e65c7..30f63393 100644 --- a/storagedriver/ipc/ipc.go +++ b/storagedriver/ipc/ipc.go @@ -10,7 +10,7 @@ import ( "github.com/docker/libchan" ) -// Defines a remote method call request +// Request defines a remote method call request // A return value struct is to be sent over the ResponseChannel type Request struct { Type string @@ -18,8 +18,9 @@ type Request struct { ResponseChannel libchan.Sender } -// A simple wrapper around an io.ReadCloser that implements the io.ReadWriteCloser interface -// Writes are disallowed and will return an error if ever called +// noWriteReadWriteCloser is a simple wrapper around an io.ReadCloser that implements the +// io.ReadWriteCloser interface +// Calls to Write are disallowed and will return an error type noWriteReadWriteCloser struct { io.ReadCloser } @@ -28,7 +29,7 @@ func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) { return 0, errors.New("Write unsupported") } -// Wraps an io.Reader as an io.ReadWriteCloser with a nop Close and unsupported Write method +// WrapReader wraps an io.Reader as an io.ReadWriteCloser with a nop Close and unsupported Write // Has no effect when an io.ReadWriteCloser is passed in func WrapReader(reader io.Reader) io.ReadWriteCloser { if readWriteCloser, ok := reader.(io.ReadWriteCloser); ok { @@ -45,7 +46,7 @@ type responseError struct { Message string } -// Wraps an error in a serializable struct containing the error's type and message +// ResponseError wraps an error in a serializable struct containing the error's type and message func ResponseError(err error) *responseError { if err == nil { return nil @@ -62,35 +63,35 @@ func (err *responseError) Error() string { // IPC method call response object definitions -// Response for a ReadStream request +// ReadStreamResponse is a response for a ReadStream request type ReadStreamResponse struct { Reader io.ReadWriteCloser Error *responseError } -// Response for a WriteStream request +// WriteStreamResponse is a response for a WriteStream request type WriteStreamResponse struct { Error *responseError } -// Response for a ResumeWritePosition request +// ResumeWritePositionResponse is a response for a ResumeWritePosition request type ResumeWritePositionResponse struct { Position uint64 Error *responseError } -// Response for a List request +// ListResponse is a response for a List request type ListResponse struct { Keys []string Error *responseError } -// Response for a Move request +// MoveResponse is a response for a Move request type MoveResponse struct { Error *responseError } -// Response for a Delete request +// DeleteResponse is a response for a Delete request type DeleteResponse struct { Error *responseError } diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 0af41d0a..81432cc3 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -13,10 +13,13 @@ import ( "github.com/docker/libchan/spdy" ) -// Construct a new IPC server handling requests for the given storagedriver.StorageDriver -// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in client.go +// StorageDriverServer runs a new IPC server handling requests for the given +// storagedriver.StorageDriver +// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in +// client.go // -// To create a new out-of-process driver, create a main 
package which calls StorageDriverServer with a storagedriver.StorageDriver +// To create a new out-of-process driver, create a main package which calls StorageDriverServer with +// a storagedriver.StorageDriver func StorageDriverServer(driver storagedriver.StorageDriver) error { childSocket := os.NewFile(3, "childSocket") defer childSocket.Close() @@ -39,9 +42,10 @@ func StorageDriverServer(driver storagedriver.StorageDriver) error { } } -// Receives new storagedriver.StorageDriver method requests and creates a new goroutine to handle each request -// -// Requests are expected to be of type ipc.Request as the parameters are unknown until the request type is deserialized +// receive receives new storagedriver.StorageDriver method requests and creates a new goroutine to +// handle each request +// Requests are expected to be of type ipc.Request as the parameters are unknown until the request +// type is deserialized func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { for { var request Request @@ -53,7 +57,7 @@ func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { } } -// Handles storagedriver.StorageDriver method requests as defined in client.go +// handleRequest handles storagedriver.StorageDriver method requests as defined in client.go // Responds to requests using the Request.ResponseChannel func handleRequest(driver storagedriver.StorageDriver, request Request) { switch request.Type { diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go index 0c301126..5338a276 100644 --- a/storagedriver/s3/s3.go +++ b/storagedriver/s3/s3.go @@ -15,24 +15,25 @@ import ( const DriverName = "s3" -// Chunks need to be at least 5MB to store with a multipart upload on S3 +// minChunkSize defines the minimum multipart upload chunk size +// S3 API requires multipart upload chunks to be at least 5MB const minChunkSize = uint64(5 * 1024 * 1024) -// The largest amount of parts you can request from S3 +// listPartsMax is the largest amount of parts you can request from S3 const listPartsMax = 1000 func init() { factory.Register(DriverName, &s3DriverFactory{}) } -// Implements the factory.StorageDriverFactory interface +// s3DriverFactory implements the factory.StorageDriverFactory interface type s3DriverFactory struct{} func (factory *s3DriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) { return FromParameters(parameters) } -// Storage Driver backed by Amazon S3 +// S3Driver is a storagedriver.StorageDriver implementation backed by Amazon S3 // Objects are stored at absolute keys in the provided bucket type S3Driver struct { S3 *s3.S3 @@ -40,7 +41,7 @@ type S3Driver struct { Encrypt bool } -// Constructs a new S3Driver with a given parameters map +// FromParameters constructs a new S3Driver with a given parameters map // Required parameters: // - accesskey // - secretkey @@ -84,7 +85,8 @@ func FromParameters(parameters map[string]string) (*S3Driver, error) { return New(accessKey, secretKey, region, encryptBool, bucket) } -// Constructs a new S3Driver with the given AWS credentials, region, encryption flag, and bucketName +// New constructs a new S3Driver with the given AWS credentials, region, encryption flag, and +// bucketName func New(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*S3Driver, error) { auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey} s3obj := s3.New(auth, region) @@ -100,6 +102,8 @@ func New(accessKey string, secretKey string, region aws.Region, 
encrypt bool, bu return &S3Driver{s3obj, bucket, encrypt}, nil } +// Implement the storagedriver.StorageDriver interface + func (d *S3Driver) GetContent(path string) ([]byte, error) { return d.Bucket.Get(path) } diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go index 55596cd6..d03fec0c 100644 --- a/storagedriver/storagedriver.go +++ b/storagedriver/storagedriver.go @@ -5,41 +5,45 @@ import ( "io" ) -// Defines methods that a Storage Driver must implement for a filesystem-like key/value object storage +// StorageDriver defines methods that a Storage Driver must implement for a filesystem-like +// key/value object storage type StorageDriver interface { - // Retrieve the content stored at "path" as a []byte + // GetContent retrieves the content stored at "path" as a []byte // Should primarily be used for small objects GetContent(path string) ([]byte, error) - // Store the []byte content at a location designated by "path" + // PutContent stores the []byte content at a location designated by "path" // Should primarily be used for small objects PutContent(path string, content []byte) error - // Retrieve an io.ReadCloser for the content stored at "path" with a given byte offset + // ReadStream retrieves an io.ReadCloser for the content stored at "path" with a given byte + // offset // May be used to resume reading a stream by providing a nonzero offset ReadStream(path string, offset uint64) (io.ReadCloser, error) - // Store the contents of the provided io.ReadCloser at a location designated by "path" + // WriteStream stores the contents of the provided io.ReadCloser at a location designated by + // the given path // The driver will know it has received the full contents when it has read "size" bytes // May be used to resume writing a stream by providing a nonzero offset // The offset must be no larger than the number of bytes already written to this path WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error - // Retrieve the byte offset at which it is safe to continue writing at "path" + // ResumeWritePosition retrieves the byte offset at which it is safe to continue writing at the + // given path ResumeWritePosition(path string) (uint64, error) - // Recursively lists the objects stored at a subpath of the given prefix + // List recursively lists the objects stored at a subpath of the given prefix List(prefix string) ([]string, error) - // Moves an object stored at sourcePath to destPath, removing the original object + // Move moves an object stored at sourcePath to destPath, removing the original object // Note: This may be no more efficient than a copy followed by a delete for many implementations Move(sourcePath string, destPath string) error - // Recursively deletes all objects stored at "path" and its subpaths + // Delete recursively deletes all objects stored at "path" and its subpaths Delete(path string) error } -// Error returned when operating on a nonexistent path +// PathNotFoundError is returned when operating on a nonexistent path type PathNotFoundError struct { Path string } @@ -48,7 +52,7 @@ func (err PathNotFoundError) Error() string { return fmt.Sprintf("Path not found: %s", err.Path) } -// Error returned when attempting to read or write from an invalid offset +// InvalidOffsetError is returned when attempting to read or write from an invalid offset type InvalidOffsetError struct { Path string Offset uint64 diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 45c621d3..94d85461 100644 
--- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -17,9 +17,7 @@ import ( // Hook up gocheck into the "go test" runner func Test(t *testing.T) { TestingT(t) } -// Registers an in-process storage driver test suite with the go test runner -// -// If skipCheck returns a non-empty skip reason, the suite is skipped with the given reason +// RegisterInProcessSuite registers an in-process storage driver test suite with the go test runner func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) { Suite(&DriverSuite{ Constructor: driverConstructor, @@ -27,9 +25,8 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC }) } -// Registers a storage driver test suite which runs the named driver as a child process with the given parameters -// -// If skipCheck returns a non-empty skip reason, the suite is skipped with the given reason +// RegisterIPCSuite registers a storage driver test suite which runs the named driver as a child +// process with the given parameters func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) { suite := &DriverSuite{ Constructor: func() (storagedriver.StorageDriver, error) { @@ -56,13 +53,21 @@ func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck Suite(suite) } +// SkipCheck is a function used to determine if a test suite should be skipped +// If a SkipCheck returns a non-empty skip reason, the suite is skipped with the given reason type SkipCheck func() (reason string) -var NeverSkip = func() string { return "" } +// NeverSkip is a default SkipCheck which never skips the suite +var NeverSkip SkipCheck = func() string { return "" } +// DriverConstructor is a function which returns a new storagedriver.StorageDriver type DriverConstructor func() (storagedriver.StorageDriver, error) + +// DriverTeardown is a function which cleans up a suite's storagedriver.StorageDriver type DriverTeardown func() error +// DriverSuite is a gocheck test suite designed to test a storagedriver.StorageDriver +// The intended way to create a DriverSuite is with RegisterInProcessSuite or RegisterIPCSuite type DriverSuite struct { Constructor DriverConstructor Teardown DriverTeardown @@ -70,11 +75,6 @@ type DriverSuite struct { storagedriver.StorageDriver } -type TestDriverConfig struct { - name string - params map[string]string -} - func (suite *DriverSuite) SetUpSuite(c *C) { if reason := suite.SkipCheck(); reason != "" { c.Skip(reason) From 3e4738587f31b47edb79d361ed370cb2ae6d41fd Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 30 Oct 2014 11:42:59 -0700 Subject: [PATCH 09/12] Adds README for the storagedriver package --- storagedriver/README.md | 47 ++++++++++++++++++++++++++++++++++ storagedriver/storagedriver.go | 2 +- 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 storagedriver/README.md diff --git a/storagedriver/README.md b/storagedriver/README.md new file mode 100644 index 00000000..f2795834 --- /dev/null +++ b/storagedriver/README.md @@ -0,0 +1,47 @@ +Docker-Registry Storage Driver +============================== + +This document describes the registry storage driver model and implementation, and explains how to contribute new storage drivers. + +Provided Drivers +================ + +This storage driver package comes bundled with three default drivers. + +1. filesystem: A local storage driver configured to use a directory tree in the local filesystem. +2. s3: A driver storing objects in an Amazon Simple Storage Service (S3) bucket. +3. inmemory: A temporary storage driver using a local inmemory map. This exists solely for reference and testing. + +Storage Driver API +================== + +The storage driver API is designed to model a filesystem-like key/value storage in a manner abstract enough to support a range of drivers from the local filesystem to Amazon S3 or other distributed object storage systems. + +Storage drivers are required to implement the `storagedriver.StorageDriver` interface provided in `storagedriver.go`, which includes methods for reading, writing, and deleting content, as well as listing child objects of a specified prefix key. + +Storage drivers are intended (but not required) to be written in go, providing compile-time validation of the `storagedriver.StorageDriver` interface, although an IPC driver wrapper means that it is not required for drivers to be included in the compiled registry. The `storagedriver/ipc` package provides a client/server protocol for running storage drivers provided in external executables as a managed child server process. + +Driver Selection and Configuration +================================== + +The preferred method of selecting a storage driver is using the `StorageDriverFactory` interface in the `storagedriver/factory` package. These factories provide a common interface for constructing storage drivers with a parameters map. The factory model is based on the [Register](http://golang.org/pkg/database/sql/#Register) and [Open](http://golang.org/pkg/database/sql/#Open) methods in the builtin [database/sql](http://golang.org/pkg/database/sql) package. + +Storage driver factories may be registered by name using the `factory.Register` method, and then later invoked by calling `factory.Create` with a driver name and parameters map. If no driver is registered with the given name, this factory will attempt to find an executable storage driver with the same name and return an IPC storage driver wrapper managing the driver subprocess. If no such storage driver can be found, `factory.Create` will return an `InvalidStorageDriverError`.
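As a rough illustration of the calling side, the sketch below (an editorial example, not part of the patch) constructs a registered driver through the factory and performs a small round trip; the chosen root directory, key, and value are illustrative assumptions, and the blank import is only needed when the driver is compiled in-process.

```go
package main

import (
	"fmt"

	"github.com/docker/docker-registry/storagedriver/factory"
	// Importing the driver package for its side effects registers its factory.
	_ "github.com/docker/docker-registry/storagedriver/filesystem"
)

func main() {
	// "filesystem" is looked up among the registered factories first; an
	// unregistered name would instead fall back to an IPC driver client.
	driver, err := factory.Create("filesystem", map[string]string{
		"rootdirectory": "/tmp/registry/storage",
	})
	if err != nil {
		panic(err)
	}

	if err := driver.PutContent("/greeting", []byte("hello")); err != nil {
		panic(err)
	}

	contents, err := driver.GetContent("/greeting")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(contents))
}
```

If no factory is registered under the requested name, the same `factory.Create` call attempts to start an external driver via `ipc.NewDriverClient`, as described above.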
+ +Driver Contribution +=================== + +## Writing new storage drivers +To create a valid storage driver, one must implement the `storagedriver.StorageDriver` interface and make sure to expose this driver via the factory system and as a distributable IPC server executable. + +### In-process drivers +Storage drivers should call `factory.Register` with their driver name in an `init` function, allowing callers of `factory.Create` to construct instances of this driver without requiring modification of imports throughout the codebase. + +### Out-of-process drivers +As many users will run the registry as a pre-constructed docker container, storage drivers should also be distributable as IPC server executables. Drivers written in go should model the main function provided in `main/storagedriver/filesystem/filesystem.go`. Parameters to IPC drivers will be provided as a JSON-serialized map in the first argument to the process. These parameters should be validated and then a blocking call to `ipc.StorageDriverServer` should be made with a new storage driver. + +## Testing +Storage driver test suites are provided in `storagedriver/testsuites/testsuites.go` and may be used for any storage driver written in go. Two methods are provided for registering test suites, `RegisterInProcessSuite` and `RegisterIPCSuite`, which run the same set of tests for the driver imported or managed over IPC respectively.
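To make the two registration paths concrete, the following sketch (again an editorial example, not part of the patch) wires the bundled in-memory driver into both suites from a test file; the `IPC_TESTS` environment variable and the decision to gate only the IPC suite behind it are illustrative assumptions.

```go
package inmemory_test

import (
	"os"
	"testing"

	"github.com/docker/docker-registry/storagedriver"
	"github.com/docker/docker-registry/storagedriver/inmemory"
	"github.com/docker/docker-registry/storagedriver/testsuites"
)

// Reuse the gocheck hookup exported by the testsuites package.
func Test(t *testing.T) { testsuites.Test(t) }

func init() {
	// Construct the driver directly for the in-process suite.
	constructor := func() (storagedriver.StorageDriver, error) {
		return inmemory.New(), nil
	}

	// Illustrative SkipCheck: only run the out-of-process suite when IPC_TESTS is set.
	ipcSkipCheck := func() string {
		if os.Getenv("IPC_TESTS") == "" {
			return "Set IPC_TESTS to run the IPC storage driver suite"
		}
		return ""
	}

	testsuites.RegisterInProcessSuite(constructor, testsuites.NeverSkip)
	testsuites.RegisterIPCSuite(inmemory.DriverName, nil, ipcSkipCheck)
}
```

The skip check mirrors the pattern used by the S3 tests earlier in this series, where missing credentials yield a skip reason rather than a failure.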
+ +## Drivers written in other languages +Although storage drivers are strongly recommended to be written in go for consistency, compile-time validation, and support, the IPC framework allows for a level of language-agnosticism. Non-go drivers must implement the storage driver protocol by mimicking StorageDriverServer in `storagedriver/ipc/server.go`. As the IPC framework is a layer on top of [docker/libchan](https://github.com/docker/libchan), this currently limits language support to Java via [ndeloof/jchan](https://github.com/ndeloof/jchan) and Javascript via [GraftJS/jschan](https://github.com/GraftJS/jschan), although contributions to the libchan project are welcome. diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go index d03fec0c..57e34c0d 100644 --- a/storagedriver/storagedriver.go +++ b/storagedriver/storagedriver.go @@ -25,7 +25,7 @@ type StorageDriver interface { // the given path // The driver will know it has received the full contents when it has read "size" bytes // May be used to resume writing a stream by providing a nonzero offset - // The offset must be no larger than the number of bytes already written to this path + // The offset must be no larger than the ResumeWritePosition for this path WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error // ResumeWritePosition retrieves the byte offset at which it is safe to continue writing at the From b522fbd67507b0121a45ab9337cd5adfb2b501db Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Fri, 31 Oct 2014 11:50:02 -0700 Subject: [PATCH 10/12] Removes WrapReader boilerplate for updates to libchan libchan now supports io.ReadCloser and io.WriteCloser, so we don't need io.ReadWriteCloser wrapping --- storagedriver/ipc/client.go | 4 ++-- storagedriver/ipc/ipc.go | 27 +-------------------------- storagedriver/ipc/server.go | 4 ++-- 3 files changed, 5 insertions(+), 30 deletions(-) diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index cdac8b11..54090945 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -155,7 +155,7 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Reader": WrapReader(bytes.NewReader(contents))} + params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))} err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err @@ -199,7 +199,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReader(reader)} + params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)} err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err } diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go index 30f63393..9c6b1dc0 100644 
+
+## Drivers written in other languages
+Although storage drivers are strongly recommended to be written in Go for consistency, compile-time validation, and support, the IPC framework allows for a level of language-agnosticism. Non-Go drivers must implement the storage driver protocol by mimicking `StorageDriverServer` in `storagedriver/ipc/server.go`. As the IPC framework is a layer on top of [docker/libchan](https://github.com/docker/libchan), this currently limits language support to Java via [ndeloof/jchan](https://github.com/ndeloof/jchan) and JavaScript via [GraftJS/jschan](https://github.com/GraftJS/jschan), although contributions to the libchan project are welcome.
diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go
index d03fec0c..57e34c0d 100644
--- a/storagedriver/storagedriver.go
+++ b/storagedriver/storagedriver.go
@@ -25,7 +25,7 @@ type StorageDriver interface {
 	// the given path
 	// The driver will know it has received the full contents when it has read "size" bytes
 	// May be used to resume writing a stream by providing a nonzero offset
-	// The offset must be no larger than the number of bytes already written to this path
+	// The offset must be no larger than the ResumeWritePosition for this path
 	WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error
 
 	// ResumeWritePosition retrieves the byte offset at which it is safe to continue writing at the

From b522fbd67507b0121a45ab9337cd5adfb2b501db Mon Sep 17 00:00:00 2001
From: Brian Bland
Date: Fri, 31 Oct 2014 11:50:02 -0700
Subject: [PATCH 10/12] Removes WrapReader boilerplate for updates to libchan

libchan now supports io.ReadCloser and io.WriteCloser, so we don't need io.ReadWriteCloser wrapping
---
 storagedriver/ipc/client.go |  4 ++--
 storagedriver/ipc/ipc.go    | 27 +--------------------------
 storagedriver/ipc/server.go |  4 ++--
 3 files changed, 5 insertions(+), 30 deletions(-)

diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go
index cdac8b11..54090945 100644
--- a/storagedriver/ipc/client.go
+++ b/storagedriver/ipc/client.go
@@ -155,7 +155,7 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
 
 func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
 	receiver, remoteSender := libchan.Pipe()
-	params := map[string]interface{}{"Path": path, "Reader": WrapReader(bytes.NewReader(contents))}
+	params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
 	err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
 	if err != nil {
 		return err
@@ -199,7 +199,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re
 
 func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
 	receiver, remoteSender := libchan.Pipe()
-	params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReader(reader)}
+	params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)}
 	err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
 	if err != nil {
 		return err
diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go
index 30f63393..9c6b1dc0
--- a/storagedriver/ipc/ipc.go
+++ b/storagedriver/ipc/ipc.go
@@ -1,10 +1,8 @@
 package ipc
 
 import (
-	"errors"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"reflect"
 
 	"github.com/docker/libchan"
@@ -18,29 +16,6 @@ type Request struct {
 	ResponseChannel libchan.Sender
 }
 
-// noWriteReadWriteCloser is a simple wrapper around an io.ReadCloser that implements the
-// io.ReadWriteCloser interface
-// Calls to Write are disallowed and will return an error
-type noWriteReadWriteCloser struct {
-	io.ReadCloser
-}
-
-func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) {
-	return 0, errors.New("Write unsupported")
-}
-
-// WrapReader wraps an io.Reader as an io.ReadWriteCloser with a nop Close and unsupported Write
-// Has no effect when an io.ReadWriteCloser is passed in
-func WrapReader(reader io.Reader) io.ReadWriteCloser {
-	if readWriteCloser, ok := reader.(io.ReadWriteCloser); ok {
-		return readWriteCloser
-	} else if readCloser, ok := reader.(io.ReadCloser); ok {
-		return noWriteReadWriteCloser{readCloser}
-	} else {
-		return noWriteReadWriteCloser{ioutil.NopCloser(reader)}
-	}
-}
-
 type responseError struct {
 	Type    string
 	Message string
@@ -65,7 +40,7 @@ func (err *responseError) Error() string {
 
 // ReadStreamResponse is a response for a ReadStream request
 type ReadStreamResponse struct {
-	Reader io.ReadWriteCloser
+	Reader io.ReadCloser
 	Error  *responseError
 }
 
diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go
index 81432cc3..989b44ba 100644
--- a/storagedriver/ipc/server.go
+++ b/storagedriver/ipc/server.go
@@ -68,7 +68,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
 		if err != nil {
 			response = ReadStreamResponse{Error: ResponseError(err)}
 		} else {
-			response = ReadStreamResponse{Reader: WrapReader(bytes.NewReader(content))}
+			response = ReadStreamResponse{Reader: ioutil.NopCloser(bytes.NewReader(content))}
 		}
 		err = request.ResponseChannel.Send(&response)
 		if err != nil {
@@ -98,7 +98,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
 		if err != nil {
 			response = ReadStreamResponse{Error: ResponseError(err)}
 		} else {
-			response = ReadStreamResponse{Reader: WrapReader(reader)}
+			response = ReadStreamResponse{Reader: ioutil.NopCloser(reader)}
 		}
 		err = request.ResponseChannel.Send(&response)
 		if err != nil {
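A brief aside on the helper this patch switches to (editorial note, not part of the patch): `ioutil.NopCloser` is the standard-library function that wraps any `io.Reader` as an `io.ReadCloser` whose `Close` is a no-op, which is all that is needed now that libchan accepts `io.ReadCloser` directly. A minimal, self-contained illustration:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	// Wrap a plain io.Reader as an io.ReadCloser with a no-op Close.
	var rc io.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte("hello")))

	contents, err := ioutil.ReadAll(rc)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", contents) // prints "hello"

	// Close does nothing and always returns nil.
	_ = rc.Close()
}
```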
From 43716a28508cec2888337ac70fcfb8c19c84788c Mon Sep 17 00:00:00 2001
From: Brian Bland
Date: Mon, 3 Nov 2014 16:20:38 -0800
Subject: [PATCH 11/12] Uses IsTruncated and NextMarker for S3 list internal pagination

---
 storagedriver/s3/s3.go | 34 +++++++++++-----------------------
 1 file changed, 11 insertions(+), 23 deletions(-)

diff --git a/storagedriver/s3/s3.go b/storagedriver/s3/s3.go
index 5338a276..ea13b87c 100644
--- a/storagedriver/s3/s3.go
+++ b/storagedriver/s3/s3.go
@@ -161,7 +161,6 @@ func (d *S3Driver) WriteStream(path string, offset, size uint64, reader io.ReadC
 		} else {
 			part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead]))
 			if err != nil {
-
 				return err
 			}
 
@@ -192,7 +191,10 @@ func (d *S3Driver) ResumeWritePosition(path string) (uint64, error) {
 }
 
 func (d *S3Driver) List(prefix string) ([]string, error) {
-	listResponse, err := d.Bucket.List(prefix+"/", "/", "", listPartsMax)
+	if prefix[len(prefix)-1] != '/' {
+		prefix = prefix + "/"
+	}
+	listResponse, err := d.Bucket.List(prefix, "/", "", listPartsMax)
 	if err != nil {
 		return nil, err
 	}
@@ -200,7 +202,7 @@ func (d *S3Driver) List(prefix string) ([]string, error) {
 
 	files := []string{}
 	directories := []string{}
-	for len(listResponse.Contents) > 0 || len(listResponse.CommonPrefixes) > 0 {
+	for {
 		for _, key := range listResponse.Contents {
 			files = append(files, key.Key)
 		}
@@ -209,27 +211,13 @@ func (d *S3Driver) List(prefix string) ([]string, error) {
 			directories = append(directories, commonPrefix[0:len(commonPrefix)-1])
 		}
 
-		lastFile := ""
-		lastDirectory := ""
-		lastMarker := ""
-
-		if len(files) > 0 {
-			lastFile = files[len(files)-1]
-		}
-
-		if len(directories) > 0 {
-			lastDirectory = directories[len(directories)-1] + "/"
-		}
-
-		if lastDirectory > lastFile {
-			lastMarker = lastDirectory
+		if listResponse.IsTruncated {
+			listResponse, err = d.Bucket.List(prefix, "/", listResponse.NextMarker, listPartsMax)
+			if err != nil {
+				return nil, err
+			}
 		} else {
-			lastMarker = lastFile
-		}
-
-		listResponse, err = d.Bucket.List(prefix+"/", "/", lastMarker, listPartsMax)
-		if err != nil {
-			return nil, err
+			break
 		}
 	}
 
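Reassembled from the hunks above for readability (an editorial sketch, not a verbatim copy of `s3.go`): the new logic keeps requesting pages from S3 until the response is no longer truncated, using the returned `NextMarker` as the starting point of the next request. The helper name `listPaginated` is hypothetical, and the fragment relies on `S3Driver`, `Bucket`, and `listPartsMax` as defined in `storagedriver/s3/s3.go`.

```go
// Editorial reassembly of the pagination loop introduced by this patch.
// listPaginated is a hypothetical name; the real code lives in S3Driver.List.
func (d *S3Driver) listPaginated(prefix string) ([]string, []string, error) {
	if prefix[len(prefix)-1] != '/' {
		prefix = prefix + "/"
	}

	files := []string{}
	directories := []string{}

	listResponse, err := d.Bucket.List(prefix, "/", "", listPartsMax)
	if err != nil {
		return nil, nil, err
	}

	for {
		for _, key := range listResponse.Contents {
			files = append(files, key.Key)
		}
		for _, commonPrefix := range listResponse.CommonPrefixes {
			// Common prefixes end with the delimiter; trim it to get directory names.
			directories = append(directories, commonPrefix[0:len(commonPrefix)-1])
		}

		if !listResponse.IsTruncated {
			break
		}

		// Fetch the next page, starting after the marker S3 returned.
		listResponse, err = d.Bucket.List(prefix, "/", listResponse.NextMarker, listPartsMax)
		if err != nil {
			return nil, nil, err
		}
	}

	return files, directories, nil
}
```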
From 7daa850d44dd2154e05ff5d178c8f0f3667a6119 Mon Sep 17 00:00:00 2001
From: Brian Bland
Date: Tue, 4 Nov 2014 09:52:24 -0800
Subject: [PATCH 12/12] Fixes documentation to show that StorageDriver.List is non-recursive

---
 storagedriver/filesystem/filesystem.go | 8 ++++----
 storagedriver/inmemory/inmemory.go     | 4 ++--
 storagedriver/ipc/client.go            | 4 ++--
 storagedriver/ipc/server.go            | 4 ++--
 storagedriver/storagedriver.go         | 4 ++--
 5 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/storagedriver/filesystem/filesystem.go b/storagedriver/filesystem/filesystem.go
index 4f100dd3..0bdf6017 100644
--- a/storagedriver/filesystem/filesystem.go
+++ b/storagedriver/filesystem/filesystem.go
@@ -166,9 +166,9 @@ func (d *FilesystemDriver) ResumeWritePosition(subPath string) (uint64, error) {
 	return uint64(fileInfo.Size()), nil
 }
 
-func (d *FilesystemDriver) List(prefix string) ([]string, error) {
-	prefix = strings.TrimRight(prefix, "/")
-	fullPath := d.subPath(prefix)
+func (d *FilesystemDriver) List(subPath string) ([]string, error) {
+	subPath = strings.TrimRight(subPath, "/")
+	fullPath := d.subPath(subPath)
 
 	dir, err := os.Open(fullPath)
 	if err != nil {
@@ -182,7 +182,7 @@ func (d *FilesystemDriver) List(prefix string) ([]string, error) {
 
 	keys := make([]string, 0, len(fileNames))
 	for _, fileName := range fileNames {
-		keys = append(keys, path.Join(prefix, fileName))
+		keys = append(keys, path.Join(subPath, fileName))
 	}
 
 	return keys, nil
diff --git a/storagedriver/inmemory/inmemory.go b/storagedriver/inmemory/inmemory.go
index d7d4ccea..9b9fd947 100644
--- a/storagedriver/inmemory/inmemory.go
+++ b/storagedriver/inmemory/inmemory.go
@@ -110,8 +110,8 @@ func (d *InMemoryDriver) ResumeWritePosition(path string) (uint64, error) {
 	return uint64(len(contents)), nil
 }
 
-func (d *InMemoryDriver) List(prefix string) ([]string, error) {
-	subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s/[^/]+", prefix))
+func (d *InMemoryDriver) List(path string) ([]string, error) {
+	subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s/[^/]+", path))
 	if err != nil {
 		return nil, err
 	}
diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go
index 54090945..fd5f15c3 100644
--- a/storagedriver/ipc/client.go
+++ b/storagedriver/ipc/client.go
@@ -240,10 +240,10 @@ func (driver *StorageDriverClient) ResumeWritePosition(path string) (uint64, err
 	return response.Position, nil
 }
 
-func (driver *StorageDriverClient) List(prefix string) ([]string, error) {
+func (driver *StorageDriverClient) List(path string) ([]string, error) {
 	receiver, remoteSender := libchan.Pipe()
-	params := map[string]interface{}{"Prefix": prefix}
+	params := map[string]interface{}{"Path": path}
 	err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
 	if err != nil {
 		return nil, err
diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go
index 989b44ba..d73be2f6 100644
--- a/storagedriver/ipc/server.go
+++ b/storagedriver/ipc/server.go
@@ -131,8 +131,8 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
 			panic(err)
 		}
 	case "List":
-		prefix, _ := request.Parameters["Prefix"].(string)
-		keys, err := driver.List(prefix)
+		path, _ := request.Parameters["Path"].(string)
+		keys, err := driver.List(path)
 		response := ListResponse{
 			Keys:  keys,
 			Error: ResponseError(err),
diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go
index 57e34c0d..a66dba0c 100644
--- a/storagedriver/storagedriver.go
+++ b/storagedriver/storagedriver.go
@@ -32,8 +32,8 @@ type StorageDriver interface {
 	// given path
 	ResumeWritePosition(path string) (uint64, error)
 
-	// List recursively lists the objects stored at a subpath of the given prefix
-	List(prefix string) ([]string, error)
+	// List returns a list of the objects that are direct descendants of the given path
+	List(path string) ([]string, error)
 
 	// Move moves an object stored at sourcePath to destPath, removing the original object
 	// Note: This may be no more efficient than a copy followed by a delete for many implementations