diff --git a/cmd/registry-storagedriver-azure/main.go b/cmd/registry-storagedriver-azure/main.go deleted file mode 100644 index 8083d23b..00000000 --- a/cmd/registry-storagedriver-azure/main.go +++ /dev/null @@ -1,31 +0,0 @@ -// +build ignore - -package main - -import ( - "encoding/json" - "os" - - log "github.com/Sirupsen/logrus" - "github.com/docker/distribution/registry/storage/driver/azure" - "github.com/docker/distribution/registry/storage/driver/ipc" -) - -// An out-of-process Azure Storage driver, intended to be run by ipc.NewDriverClient -func main() { - parametersBytes := []byte(os.Args[1]) - var parameters map[string]interface{} - err := json.Unmarshal(parametersBytes, ¶meters) - if err != nil { - panic(err) - } - - driver, err := azure.FromParameters(parameters) - if err != nil { - panic(err) - } - - if err := ipc.StorageDriverServer(driver); err != nil { - log.Fatalln("driver error:", err) - } -} diff --git a/cmd/registry-storagedriver-filesystem/main.go b/cmd/registry-storagedriver-filesystem/main.go deleted file mode 100644 index 93756d2c..00000000 --- a/cmd/registry-storagedriver-filesystem/main.go +++ /dev/null @@ -1,27 +0,0 @@ -// +build ignore - -package main - -import ( - "encoding/json" - "os" - - "github.com/Sirupsen/logrus" - - "github.com/docker/distribution/registry/storage/driver/filesystem" - "github.com/docker/distribution/registry/storage/driver/ipc" -) - -// An out-of-process filesystem driver, intended to be run by ipc.NewDriverClient -func main() { - parametersBytes := []byte(os.Args[1]) - var parameters map[string]string - err := json.Unmarshal(parametersBytes, ¶meters) - if err != nil { - panic(err) - } - - if err := ipc.StorageDriverServer(filesystem.FromParameters(parameters)); err != nil { - logrus.Fatalln(err) - } -} diff --git a/cmd/registry-storagedriver-inmemory/main.go b/cmd/registry-storagedriver-inmemory/main.go deleted file mode 100644 index d957cc25..00000000 --- a/cmd/registry-storagedriver-inmemory/main.go +++ /dev/null @@ -1,17 +0,0 @@ -// +build ignore - -package main - -import ( - "github.com/Sirupsen/logrus" - "github.com/docker/distribution/registry/storage/driver/inmemory" - "github.com/docker/distribution/registry/storage/driver/ipc" -) - -// An out-of-process inmemory driver, intended to be run by ipc.NewDriverClient -// This exists primarily for example and testing purposes -func main() { - if err := ipc.StorageDriverServer(inmemory.New()); err != nil { - logrus.Fatalln(err) - } -} diff --git a/cmd/registry-storagedriver-s3/main.go b/cmd/registry-storagedriver-s3/main.go deleted file mode 100644 index 8604fef0..00000000 --- a/cmd/registry-storagedriver-s3/main.go +++ /dev/null @@ -1,32 +0,0 @@ -// +build ignore - -package main - -import ( - "encoding/json" - "os" - - "github.com/Sirupsen/logrus" - - "github.com/docker/distribution/registry/storage/driver/ipc" - "github.com/docker/distribution/registry/storage/driver/s3" -) - -// An out-of-process S3 driver, intended to be run by ipc.NewDriverClient -func main() { - parametersBytes := []byte(os.Args[1]) - var parameters map[string]string - err := json.Unmarshal(parametersBytes, ¶meters) - if err != nil { - panic(err) - } - - driver, err := s3.FromParameters(parameters) - if err != nil { - panic(err) - } - - if err := ipc.StorageDriverServer(driver); err != nil { - logrus.Fatalln(err) - } -} diff --git a/docs/storagedrivers.md b/docs/storagedrivers.md index b7445f42..e574665a 100644 --- a/docs/storagedrivers.md +++ b/docs/storagedrivers.md @@ -30,29 +30,30 @@ The storage driver API is designed to model a filesystem-like key/value storage Storage drivers are required to implement the `storagedriver.StorageDriver` interface provided in `storagedriver.go`, which includes methods for reading, writing, and deleting content, as well as listing child objects of a specified prefix key. -Storage drivers are intended (but not required) to be written in go, providing compile-time validation of the `storagedriver.StorageDriver` interface, although an IPC driver wrapper means that it is not required for drivers to be included in the compiled registry. The `storagedriver/ipc` package provides a client/server protocol for running storage drivers provided in external executables as a managed child server process. +Storage drivers are intended to be written in Go, providing compile-time +validation of the `storagedriver.StorageDriver` interface. ## Driver Selection and Configuration The preferred method of selecting a storage driver is using the `StorageDriverFactory` interface in the `storagedriver/factory` package. These factories provide a common interface for constructing storage drivers with a parameters map. The factory model is based off of the [Register](http://golang.org/pkg/database/sql/#Register) and [Open](http://golang.org/pkg/database/sql/#Open) methods in the builtin [database/sql](http://golang.org/pkg/database/sql) package. -Storage driver factories may be registered by name using the `factory.Register` method, and then later invoked by calling `factory.Create` with a driver name and parameters map. If no driver is registered with the given name, this factory will attempt to find an executable storage driver with the executable name "registry-storage-\" and return an IPC storage driver wrapper managing the driver subprocess. If no such storage driver can be found, `factory.Create` will return an `InvalidStorageDriverError`. +Storage driver factories may be registered by name using the +`factory.Register` method, and then later invoked by calling `factory.Create` +with a driver name and parameters map. If no such storage driver can be found, +`factory.Create` will return an `InvalidStorageDriverError`. ## Driver Contribution ### Writing new storage drivers -To create a valid storage driver, one must implement the `storagedriver.StorageDriver` interface and make sure to expose this driver via the factory system and as a distributable IPC server executable. +To create a valid storage driver, one must implement the +`storagedriver.StorageDriver` interface and make sure to expose this driver +via the factory system. -#### In-process drivers +#### Registering Storage drivers should call `factory.Register` with their driver name in an `init` method, allowing callers of `factory.New` to construct instances of this driver without requiring modification of imports throughout the codebase. -#### Out-of-process drivers -As many users will run the registry as a pre-constructed docker container, storage drivers should also be distributable as IPC server executables. Drivers written in go should model the main method provided in `registry/storage/driverfilesystem/driver.go`. Parameters to IPC drivers will be provided as a JSON-serialized map in the first argument to the process. These parameters should be validated and then a blocking call to `ipc.StorageDriverServer` should be made with a new storage driver. - -Out-of-process drivers must also implement the `ipc.IPCStorageDriver` interface, which exposes a `Version` check for the storage driver. This is used to validate storage driver api compatibility at driver load-time. - ## Testing -Storage driver test suites are provided in `storagedriver/testsuites/testsuites.go` and may be used for any storage driver written in go. Two methods are provided for registering test suites, `RegisterInProcessSuite` and `RegisterIPCSuite`, which run the same set of tests for the driver imported or managed over IPC respectively. - -## Drivers written in other languages -Although storage drivers are strongly recommended to be written in go for consistency, compile-time validation, and support, the IPC framework allows for a level of language-agnosticism. Non-go drivers must implement the storage driver protocol by mimicing StorageDriverServer in `storagedriver/ipc/server.go`. As the IPC framework is a layer on top of [docker/libchan](https://github.com/docker/libchan), this currently limits language support to Java via [ndeloof/chan](https://github.com/ndeloof/jchan) and Javascript via [GraftJS/jschan](https://github.com/GraftJS/jschan), although contributions to the libchan project are welcome. +Storage driver test suites are provided in +`storagedriver/testsuites/testsuites.go` and may be used for any storage +driver written in Go. Tests can be registered using the `RegisterSuite` +function, which run the same set of tests for any registered drivers. diff --git a/registry/storage/driver/azure/azure_test.go b/registry/storage/driver/azure/azure_test.go index 4990ba19..4a0661b3 100644 --- a/registry/storage/driver/azure/azure_test.go +++ b/registry/storage/driver/azure/azure_test.go @@ -59,11 +59,5 @@ func init() { return "" } - testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck) - // testsuites.RegisterIPCSuite(driverName, map[string]string{ - // paramAccountName: accountName, - // paramAccountKey: accountKey, - // paramContainer: container, - // paramRealm: realm, - // }, skipCheck) + testsuites.RegisterSuite(azureDriverConstructor, skipCheck) } diff --git a/registry/storage/driver/factory/factory.go b/registry/storage/driver/factory/factory.go index 66d160f3..e84f0026 100644 --- a/registry/storage/driver/factory/factory.go +++ b/registry/storage/driver/factory/factory.go @@ -33,30 +33,14 @@ func Register(name string, factory StorageDriverFactory) { driverFactories[name] = factory } -// Create a new storagedriver.StorageDriver with the given name and parameters -// To run in-process, the StorageDriverFactory must first be registered with the given name -// If no in-process drivers are found with the given name, this attempts to create an IPC driver -// If no in-process or external drivers are found, an InvalidStorageDriverError is returned +// Create a new storagedriver.StorageDriver with the given name and +// parameters. To use a driver, the StorageDriverFactory must first be +// registered with the given name. If no drivers are found, an +// InvalidStorageDriverError is returned func Create(name string, parameters map[string]interface{}) (storagedriver.StorageDriver, error) { driverFactory, ok := driverFactories[name] if !ok { return nil, InvalidStorageDriverError{name} - - // NOTE(stevvooe): We are disabling storagedriver ipc for now, as the - // server and client need to be updated for the changed API calls and - // there were some problems libchan hanging. We'll phase this - // functionality back in over the next few weeks. - - // No registered StorageDriverFactory found, try ipc - // driverClient, err := ipc.NewDriverClient(name, parameters) - // if err != nil { - // return nil, InvalidStorageDriverError{name} - // } - // err = driverClient.Start() - // if err != nil { - // return nil, err - // } - // return driverClient, nil } return driverFactory.Create(parameters) } diff --git a/registry/storage/driver/filesystem/driver_test.go b/registry/storage/driver/filesystem/driver_test.go index 8572de16..8b48b431 100644 --- a/registry/storage/driver/filesystem/driver_test.go +++ b/registry/storage/driver/filesystem/driver_test.go @@ -20,10 +20,7 @@ func init() { } defer os.Remove(root) - testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) { + testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { return New(root), nil }, testsuites.NeverSkip) - - // BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later. - // testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip) } diff --git a/registry/storage/driver/inmemory/driver_test.go b/registry/storage/driver/inmemory/driver_test.go index a02ff23e..dbc1916f 100644 --- a/registry/storage/driver/inmemory/driver_test.go +++ b/registry/storage/driver/inmemory/driver_test.go @@ -5,7 +5,6 @@ import ( storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/testsuites" - "gopkg.in/check.v1" ) @@ -16,9 +15,5 @@ func init() { inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) { return New(), nil } - testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip) - - // BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot - // the problems with libchan. - // testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip) + testsuites.RegisterSuite(inmemoryDriverConstructor, testsuites.NeverSkip) } diff --git a/registry/storage/driver/ipc/client.go b/registry/storage/driver/ipc/client.go deleted file mode 100644 index daa823d7..00000000 --- a/registry/storage/driver/ipc/client.go +++ /dev/null @@ -1,454 +0,0 @@ -// +build ignore - -package ipc - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net" - "os" - "os/exec" - "syscall" - - storagedriver "github.com/docker/distribution/registry/storage/driver" - "github.com/docker/libchan" - "github.com/docker/libchan/spdy" -) - -// StorageDriverExecutablePrefix is the prefix which the IPC storage driver -// loader expects driver executables to begin with. For example, the s3 driver -// should be named "registry-storagedriver-s3". -const StorageDriverExecutablePrefix = "registry-storagedriver-" - -// StorageDriverClient is a storagedriver.StorageDriver implementation using a -// managed child process communicating over IPC using libchan with a unix domain -// socket -type StorageDriverClient struct { - subprocess *exec.Cmd - exitChan chan error - exitErr error - stopChan chan struct{} - socket *os.File - transport *spdy.Transport - sender libchan.Sender - version storagedriver.Version -} - -// NewDriverClient constructs a new out-of-process storage driver using the -// driver name and configuration parameters -// A user must call Start on this driver client before remote method calls can -// be made -// -// Looks for drivers in the following locations in order: -// - Storage drivers directory (to be determined, yet not implemented) -// - $GOPATH/bin -// - $PATH -func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) { - paramsBytes, err := json.Marshal(parameters) - if err != nil { - return nil, err - } - - driverExecName := StorageDriverExecutablePrefix + name - driverPath, err := exec.LookPath(driverExecName) - if err != nil { - return nil, err - } - - command := exec.Command(driverPath, string(paramsBytes)) - - return &StorageDriverClient{ - subprocess: command, - }, nil -} - -// Start starts the designated child process storage driver and binds a socket -// to this process for IPC method calls -func (driver *StorageDriverClient) Start() error { - driver.exitErr = nil - driver.exitChan = make(chan error) - driver.stopChan = make(chan struct{}) - - fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) - if err != nil { - return err - } - - childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket") - driver.socket = os.NewFile(uintptr(fileDescriptors[1]), "parentSocket") - - driver.subprocess.Stdout = os.Stdout - driver.subprocess.Stderr = os.Stderr - driver.subprocess.ExtraFiles = []*os.File{childSocket} - - if err = driver.subprocess.Start(); err != nil { - driver.Stop() - return err - } - - go driver.handleSubprocessExit() - - if err = childSocket.Close(); err != nil { - driver.Stop() - return err - } - - connection, err := net.FileConn(driver.socket) - if err != nil { - driver.Stop() - return err - } - driver.transport, err = spdy.NewClientTransport(connection) - if err != nil { - driver.Stop() - return err - } - driver.sender, err = driver.transport.NewSendChannel() - if err != nil { - driver.Stop() - return err - } - - // Check the driver's version to determine compatibility - receiver, remoteSender := libchan.Pipe() - err = driver.sender.Send(&Request{Type: "Version", ResponseChannel: remoteSender}) - if err != nil { - driver.Stop() - return err - } - - var response VersionResponse - err = receiver.Receive(&response) - if err != nil { - driver.Stop() - return err - } - - if response.Error != nil { - return response.Error.Unwrap() - } - - driver.version = response.Version - - if driver.version.Major() != storagedriver.CurrentVersion.Major() || driver.version.Minor() > storagedriver.CurrentVersion.Minor() { - return IncompatibleVersionError{driver.version} - } - - return nil -} - -// Stop stops the child process storage driver -// storagedriver.StorageDriver methods called after Stop will fail -func (driver *StorageDriverClient) Stop() error { - var closeSenderErr, closeTransportErr, closeSocketErr, killErr error - - if driver.sender != nil { - closeSenderErr = driver.sender.Close() - } - if driver.transport != nil { - closeTransportErr = driver.transport.Close() - } - if driver.socket != nil { - closeSocketErr = driver.socket.Close() - } - if driver.subprocess != nil { - killErr = driver.subprocess.Process.Kill() - } - if driver.stopChan != nil { - close(driver.stopChan) - } - - if closeSenderErr != nil { - return closeSenderErr - } else if closeTransportErr != nil { - return closeTransportErr - } else if closeSocketErr != nil { - return closeSocketErr - } - - return killErr -} - -// Implement the storagedriver.StorageDriver interface over IPC - -// GetContent retrieves the content stored at "path" as a []byte. -func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { - if err := driver.exited(); err != nil { - return nil, err - } - - receiver, remoteSender := libchan.Pipe() - - params := map[string]interface{}{"Path": path} - err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return nil, err - } - - response := new(ReadStreamResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return nil, err - } - - if response.Error != nil { - return nil, response.Error.Unwrap() - } - - defer response.Reader.Close() - contents, err := ioutil.ReadAll(response.Reader) - if err != nil { - return nil, err - } - return contents, nil -} - -// PutContent stores the []byte content at a location designated by "path". -func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { - if err := driver.exited(); err != nil { - return err - } - - receiver, remoteSender := libchan.Pipe() - - params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))} - err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return err - } - - response := new(WriteStreamResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return err - } - - if response.Error != nil { - return response.Error.Unwrap() - } - - return nil -} - -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a -// given byte offset. -func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) { - if err := driver.exited(); err != nil { - return nil, err - } - - receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Offset": offset} - err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return nil, err - } - - response := new(ReadStreamResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return nil, err - } - - if response.Error != nil { - return nil, response.Error.Unwrap() - } - - return response.Reader, nil -} - -// WriteStream stores the contents of the provided io.ReadCloser at a location -// designated by the given path. -func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { - if err := driver.exited(); err != nil { - return err - } - - receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": reader} - err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return err - } - - response := new(WriteStreamResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return err - } - - if response.Error != nil { - return response.Error.Unwrap() - } - - return nil -} - -// CurrentSize retrieves the curernt size in bytes of the object at the given -// path. -func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) { - if err := driver.exited(); err != nil { - return 0, err - } - - receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path} - err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return 0, err - } - - response := new(CurrentSizeResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return 0, err - } - - if response.Error != nil { - return 0, response.Error.Unwrap() - } - - return response.Position, nil -} - -// List returns a list of the objects that are direct descendants of the given -// path. -func (driver *StorageDriverClient) List(path string) ([]string, error) { - if err := driver.exited(); err != nil { - return nil, err - } - - receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path} - err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return nil, err - } - - response := new(ListResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return nil, err - } - - if response.Error != nil { - return nil, response.Error.Unwrap() - } - - return response.Keys, nil -} - -// Move moves an object stored at sourcePath to destPath, removing the original -// object. -func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error { - if err := driver.exited(); err != nil { - return err - } - - receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath} - err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return err - } - - response := new(MoveResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return err - } - - if response.Error != nil { - return response.Error.Unwrap() - } - - return nil -} - -// Delete recursively deletes all objects stored at "path" and its subpaths. -func (driver *StorageDriverClient) Delete(path string) error { - if err := driver.exited(); err != nil { - return err - } - - receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path} - err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender}) - if err != nil { - return err - } - - response := new(DeleteResponse) - err = driver.receiveResponse(receiver, response) - if err != nil { - return err - } - - if response.Error != nil { - return response.Error.Unwrap() - } - - return nil -} - -// handleSubprocessExit populates the exit channel until we have explicitly -// stopped the storage driver subprocess -// Requests can select on driver.exitChan and response receiving and not hang if -// the process exits -func (driver *StorageDriverClient) handleSubprocessExit() { - exitErr := driver.subprocess.Wait() - if exitErr == nil { - exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly") - } else { - exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr) - } - - driver.exitErr = exitErr - - for { - select { - case driver.exitChan <- exitErr: - case <-driver.stopChan: - close(driver.exitChan) - return - } - } -} - -// receiveResponse populates the response value with the next result from the -// given receiver, or returns an error if receiving failed or the driver has -// stopped -func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error { - receiveChan := make(chan error, 1) - go func(receiver libchan.Receiver, receiveChan chan<- error) { - receiveChan <- receiver.Receive(response) - }(receiver, receiveChan) - - var err error - var ok bool - select { - case err = <-receiveChan: - case err, ok = <-driver.exitChan: - if !ok { - err = driver.exitErr - } - } - - return err -} - -// exited returns an exit error if the driver has exited or nil otherwise -func (driver *StorageDriverClient) exited() error { - select { - case err, ok := <-driver.exitChan: - if !ok { - return driver.exitErr - } - return err - default: - return nil - } -} diff --git a/registry/storage/driver/ipc/ipc.go b/registry/storage/driver/ipc/ipc.go deleted file mode 100644 index dabb834d..00000000 --- a/registry/storage/driver/ipc/ipc.go +++ /dev/null @@ -1,148 +0,0 @@ -// +build ignore - -package ipc - -import ( - "fmt" - "io" - "reflect" - - storagedriver "github.com/docker/distribution/registry/storage/driver" - "github.com/docker/libchan" -) - -// StorageDriver is the interface which IPC storage drivers must implement. As external storage -// drivers may be defined to use a different version of the storagedriver.StorageDriver interface, -// we use an additional version check to determine compatiblity. -type StorageDriver interface { - // Version returns the storagedriver.StorageDriver interface version which this storage driver - // implements, which is used to determine driver compatibility - Version() (storagedriver.Version, error) -} - -// IncompatibleVersionError is returned when a storage driver is using an incompatible version of -// the storagedriver.StorageDriver api -type IncompatibleVersionError struct { - version storagedriver.Version -} - -func (e IncompatibleVersionError) Error() string { - return fmt.Sprintf("Incompatible storage driver version: %s", e.version) -} - -// Request defines a remote method call request -// A return value struct is to be sent over the ResponseChannel -type Request struct { - Type string `codec:",omitempty"` - Parameters map[string]interface{} `codec:",omitempty"` - ResponseChannel libchan.Sender `codec:",omitempty"` -} - -// ResponseError is a serializable error type. -// The Type and Parameters may be used to reconstruct the same error on the -// client side, falling back to using the Type and Message if this cannot be -// done. -type ResponseError struct { - Type string `codec:",omitempty"` - Message string `codec:",omitempty"` - Parameters map[string]interface{} `codec:",omitempty"` -} - -// WrapError wraps an error in a serializable struct containing the error's type -// and message. -func WrapError(err error) *ResponseError { - if err == nil { - return nil - } - v := reflect.ValueOf(err) - re := ResponseError{ - Type: v.Type().String(), - Message: err.Error(), - } - - if v.Kind() == reflect.Struct { - re.Parameters = make(map[string]interface{}) - for i := 0; i < v.NumField(); i++ { - field := v.Type().Field(i) - re.Parameters[field.Name] = v.Field(i).Interface() - } - } - return &re -} - -// Unwrap returns the underlying error if it can be reconstructed, or the -// original ResponseError otherwise. -func (err *ResponseError) Unwrap() error { - var errVal reflect.Value - var zeroVal reflect.Value - - switch err.Type { - case "storagedriver.PathNotFoundError": - errVal = reflect.ValueOf(&storagedriver.PathNotFoundError{}) - case "storagedriver.InvalidOffsetError": - errVal = reflect.ValueOf(&storagedriver.InvalidOffsetError{}) - } - if errVal == zeroVal { - return err - } - - for k, v := range err.Parameters { - fieldVal := errVal.Elem().FieldByName(k) - if fieldVal == zeroVal { - return err - } - fieldVal.Set(reflect.ValueOf(v)) - } - - if unwrapped, ok := errVal.Elem().Interface().(error); ok { - return unwrapped - } - - return err - -} - -func (err *ResponseError) Error() string { - return fmt.Sprintf("%s: %s", err.Type, err.Message) -} - -// IPC method call response object definitions - -// VersionResponse is a response for a Version request -type VersionResponse struct { - Version storagedriver.Version `codec:",omitempty"` - Error *ResponseError `codec:",omitempty"` -} - -// ReadStreamResponse is a response for a ReadStream request -type ReadStreamResponse struct { - Reader io.ReadCloser `codec:",omitempty"` - Error *ResponseError `codec:",omitempty"` -} - -// WriteStreamResponse is a response for a WriteStream request -type WriteStreamResponse struct { - Error *ResponseError `codec:",omitempty"` -} - -// CurrentSizeResponse is a response for a CurrentSize request -type CurrentSizeResponse struct { - Position uint64 `codec:",omitempty"` - Error *ResponseError `codec:",omitempty"` -} - -// ListResponse is a response for a List request -type ListResponse struct { - Keys []string `codec:",omitempty"` - Error *ResponseError `codec:",omitempty"` -} - -// MoveResponse is a response for a Move request -type MoveResponse struct { - Error *ResponseError `codec:",omitempty"` -} - -// DeleteResponse is a response for a Delete request -type DeleteResponse struct { - Error *ResponseError `codec:",omitempty"` -} diff --git a/registry/storage/driver/ipc/server.go b/registry/storage/driver/ipc/server.go deleted file mode 100644 index 1752f12b..00000000 --- a/registry/storage/driver/ipc/server.go +++ /dev/null @@ -1,178 +0,0 @@ -// +build ignore - -package ipc - -import ( - "bytes" - "io" - "io/ioutil" - "net" - "os" - "reflect" - - storagedriver "github.com/docker/distribution/registry/storage/driver" - "github.com/docker/libchan" - "github.com/docker/libchan/spdy" -) - -// StorageDriverServer runs a new IPC server handling requests for the given -// storagedriver.StorageDriver -// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in -// client.go -// -// To create a new out-of-process driver, create a main package which calls StorageDriverServer with -// a storagedriver.StorageDriver -func StorageDriverServer(driver storagedriver.StorageDriver) error { - childSocket := os.NewFile(3, "childSocket") - defer childSocket.Close() - conn, err := net.FileConn(childSocket) - if err != nil { - panic(err) - } - defer conn.Close() - if transport, err := spdy.NewServerTransport(conn); err != nil { - panic(err) - } else { - for { - receiver, err := transport.WaitReceiveChannel() - if err == io.EOF { - return nil - } else if err != nil { - panic(err) - } - go receive(driver, receiver) - } - } -} - -// receive receives new storagedriver.StorageDriver method requests and creates a new goroutine to -// handle each request -// Requests are expected to be of type ipc.Request as the parameters are unknown until the request -// type is deserialized -func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { - for { - var request Request - err := receiver.Receive(&request) - if err == io.EOF { - return - } else if err != nil { - panic(err) - } - go handleRequest(driver, request) - } -} - -// handleRequest handles storagedriver.StorageDriver method requests as defined in client.go -// Responds to requests using the Request.ResponseChannel -func handleRequest(driver storagedriver.StorageDriver, request Request) { - switch request.Type { - case "Version": - err := request.ResponseChannel.Send(&VersionResponse{Version: storagedriver.CurrentVersion}) - if err != nil { - panic(err) - } - case "GetContent": - path, _ := request.Parameters["Path"].(string) - content, err := driver.GetContent(path) - var response ReadStreamResponse - if err != nil { - response = ReadStreamResponse{Error: WrapError(err)} - } else { - response = ReadStreamResponse{Reader: ioutil.NopCloser(bytes.NewReader(content))} - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - case "PutContent": - path, _ := request.Parameters["Path"].(string) - reader, _ := request.Parameters["Reader"].(io.ReadCloser) - contents, err := ioutil.ReadAll(reader) - defer reader.Close() - if err == nil { - err = driver.PutContent(path, contents) - } - response := WriteStreamResponse{ - Error: WrapError(err), - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - case "ReadStream": - path, _ := request.Parameters["Path"].(string) - // Depending on serialization method, Offset may be converted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() - reader, err := driver.ReadStream(path, offset) - var response ReadStreamResponse - if err != nil { - response = ReadStreamResponse{Error: WrapError(err)} - } else { - response = ReadStreamResponse{Reader: reader} - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - case "WriteStream": - path, _ := request.Parameters["Path"].(string) - // Depending on serialization method, Offset may be converted to any int/uint type - offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int() - // Depending on serialization method, Size may be converted to any int/uint type - size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int() - reader, _ := request.Parameters["Reader"].(io.ReadCloser) - err := driver.WriteStream(path, offset, size, reader) - response := WriteStreamResponse{ - Error: WrapError(err), - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - case "CurrentSize": - path, _ := request.Parameters["Path"].(string) - position, err := driver.CurrentSize(path) - response := CurrentSizeResponse{ - Position: position, - Error: WrapError(err), - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - case "List": - path, _ := request.Parameters["Path"].(string) - keys, err := driver.List(path) - response := ListResponse{ - Keys: keys, - Error: WrapError(err), - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - case "Move": - sourcePath, _ := request.Parameters["SourcePath"].(string) - destPath, _ := request.Parameters["DestPath"].(string) - err := driver.Move(sourcePath, destPath) - response := MoveResponse{ - Error: WrapError(err), - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - case "Delete": - path, _ := request.Parameters["Path"].(string) - err := driver.Delete(path) - response := DeleteResponse{ - Error: WrapError(err), - } - err = request.ResponseChannel.Send(&response) - if err != nil { - panic(err) - } - default: - panic(request) - } -} diff --git a/registry/storage/driver/rados/rados_test.go b/registry/storage/driver/rados/rados_test.go index d408519b..ce367fb5 100644 --- a/registry/storage/driver/rados/rados_test.go +++ b/registry/storage/driver/rados/rados_test.go @@ -36,5 +36,5 @@ func init() { return "" } - testsuites.RegisterInProcessSuite(driverConstructor, skipCheck) + testsuites.RegisterSuite(driverConstructor, skipCheck) } diff --git a/registry/storage/driver/s3/s3_test.go b/registry/storage/driver/s3/s3_test.go index c608e454..70172a6d 100644 --- a/registry/storage/driver/s3/s3_test.go +++ b/registry/storage/driver/s3/s3_test.go @@ -17,7 +17,8 @@ import ( // Hook up gocheck into the "go test" runner. func Test(t *testing.T) { check.TestingT(t) } -type S3DriverConstructor func(rootDirectory string) (*Driver, error) +var s3DriverConstructor func(rootDirectory string) (*Driver, error) +var skipS3 func() string func init() { accessKey := os.Getenv("AWS_ACCESS_KEY") @@ -33,7 +34,7 @@ func init() { } defer os.Remove(root) - s3DriverConstructor := func(rootDirectory string) (*Driver, error) { + s3DriverConstructor = func(rootDirectory string) (*Driver, error) { encryptBool := false if encrypt != "" { encryptBool, err = strconv.ParseBool(encrypt) @@ -74,79 +75,64 @@ func init() { } // Skip S3 storage driver tests if environment variable parameters are not provided - skipCheck := func() string { + skipS3 = func() string { if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" { return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests" } return "" } - driverConstructor := func() (storagedriver.StorageDriver, error) { + testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { return s3DriverConstructor(root) + }, skipS3) +} + +func TestEmptyRootList(t *testing.T) { + if skipS3() != "" { + t.Skip(skipS3()) } - testsuites.RegisterInProcessSuite(driverConstructor, skipCheck) - - // s3Constructor := func() (*Driver, error) { - // return s3DriverConstructor(aws.GetRegion(region)) - // } - - RegisterS3DriverSuite(s3DriverConstructor, skipCheck) - - // testsuites.RegisterIPCSuite(driverName, map[string]string{ - // "accesskey": accessKey, - // "secretkey": secretKey, - // "region": region.Name, - // "bucket": bucket, - // "encrypt": encrypt, - // }, skipCheck) - // } -} - -func RegisterS3DriverSuite(s3DriverConstructor S3DriverConstructor, skipCheck testsuites.SkipCheck) { - check.Suite(&S3DriverSuite{ - Constructor: s3DriverConstructor, - SkipCheck: skipCheck, - }) -} - -type S3DriverSuite struct { - Constructor S3DriverConstructor - testsuites.SkipCheck -} - -func (suite *S3DriverSuite) SetUpSuite(c *check.C) { - if reason := suite.SkipCheck(); reason != "" { - c.Skip(reason) - } -} - -func (suite *S3DriverSuite) TestEmptyRootList(c *check.C) { validRoot, err := ioutil.TempDir("", "driver-") - c.Assert(err, check.IsNil) + if err != nil { + t.Fatalf("unexpected error creating temporary directory: %v", err) + } defer os.Remove(validRoot) - rootedDriver, err := suite.Constructor(validRoot) - c.Assert(err, check.IsNil) - emptyRootDriver, err := suite.Constructor("") - c.Assert(err, check.IsNil) - slashRootDriver, err := suite.Constructor("/") - c.Assert(err, check.IsNil) + rootedDriver, err := s3DriverConstructor(validRoot) + if err != nil { + t.Fatalf("unexpected error creating rooted driver: %v", err) + } + + emptyRootDriver, err := s3DriverConstructor("") + if err != nil { + t.Fatalf("unexpected error creating empty root driver: %v", err) + } + + slashRootDriver, err := s3DriverConstructor("/") + if err != nil { + t.Fatalf("unexpected error creating slash root driver: %v", err) + } filename := "/test" contents := []byte("contents") ctx := context.Background() err = rootedDriver.PutContent(ctx, filename, contents) - c.Assert(err, check.IsNil) + if err != nil { + t.Fatalf("unexpected error creating content: %v", err) + } defer rootedDriver.Delete(ctx, filename) keys, err := emptyRootDriver.List(ctx, "/") for _, path := range keys { - c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) + if !storagedriver.PathRegexp.MatchString(path) { + t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) + } } keys, err = slashRootDriver.List(ctx, "/") for _, path := range keys { - c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) + if !storagedriver.PathRegexp.MatchString(path) { + t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) + } } } diff --git a/registry/storage/driver/testsuites/testsuites.go b/registry/storage/driver/testsuites/testsuites.go index 9185ebbc..96231480 100644 --- a/registry/storage/driver/testsuites/testsuites.go +++ b/registry/storage/driver/testsuites/testsuites.go @@ -22,9 +22,9 @@ import ( // Test hooks up gocheck into the "go test" runner. func Test(t *testing.T) { check.TestingT(t) } -// RegisterInProcessSuite registers an in-process storage driver test suite with +// RegisterSuite registers an in-process storage driver test suite with // the go test runner. -func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) { +func RegisterSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) { check.Suite(&DriverSuite{ Constructor: driverConstructor, SkipCheck: skipCheck, @@ -32,39 +32,6 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC }) } -// RegisterIPCSuite registers a storage driver test suite which runs the named -// driver as a child process with the given parameters. -func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) { - panic("ipc testing is disabled for now") - - // NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code - // block before and remove the panic when we phase it back in. - - // suite := &DriverSuite{ - // Constructor: func() (storagedriver.StorageDriver, error) { - // d, err := ipc.NewDriverClient(driverName, ipcParams) - // if err != nil { - // return nil, err - // } - // err = d.Start() - // if err != nil { - // return nil, err - // } - // return d, nil - // }, - // SkipCheck: skipCheck, - // } - // suite.Teardown = func() error { - // if suite.StorageDriver == nil { - // return nil - // } - - // driverClient := suite.StorageDriver.(*ipc.StorageDriverClient) - // return driverClient.Stop() - // } - // check.Suite(suite) -} - // SkipCheck is a function used to determine if a test suite should be skipped. // If a SkipCheck returns a non-empty skip reason, the suite is skipped with // the given reason. @@ -82,9 +49,8 @@ type DriverConstructor func() (storagedriver.StorageDriver, error) type DriverTeardown func() error // DriverSuite is a gocheck test suite designed to test a -// storagedriver.StorageDriver. -// The intended way to create a DriverSuite is with RegisterInProcessSuite or -// RegisterIPCSuite. +// storagedriver.StorageDriver. The intended way to create a DriverSuite is +// with RegisterSuite. type DriverSuite struct { Constructor DriverConstructor Teardown DriverTeardown @@ -841,10 +807,6 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) { // TestConcurrentFileStreams checks that multiple *os.File objects can be passed // in to WriteStream concurrently without hanging. func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { - // if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC { - // c.Skip("Need to fix out-of-process concurrency") - // } - numStreams := 32 if testing.Short() {