Remove half-baked Storage Driver IPC support
This removes documentation and code related to IPC based storage driver plugins. The existence of this functionality was an original feature goal but is now not maintained and actively confusing incoming contributions. We will likely explore some driver plugin mechanism in the future but we don't need this laying around in the meantime. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
d745c5a2c9
commit
6167220cdd
@ -59,11 +59,5 @@ func init() {
|
||||
return ""
|
||||
}
|
||||
|
||||
testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck)
|
||||
// testsuites.RegisterIPCSuite(driverName, map[string]string{
|
||||
// paramAccountName: accountName,
|
||||
// paramAccountKey: accountKey,
|
||||
// paramContainer: container,
|
||||
// paramRealm: realm,
|
||||
// }, skipCheck)
|
||||
testsuites.RegisterSuite(azureDriverConstructor, skipCheck)
|
||||
}
|
||||
|
@ -33,30 +33,14 @@ func Register(name string, factory StorageDriverFactory) {
|
||||
driverFactories[name] = factory
|
||||
}
|
||||
|
||||
// Create a new storagedriver.StorageDriver with the given name and parameters
|
||||
// To run in-process, the StorageDriverFactory must first be registered with the given name
|
||||
// If no in-process drivers are found with the given name, this attempts to create an IPC driver
|
||||
// If no in-process or external drivers are found, an InvalidStorageDriverError is returned
|
||||
// Create a new storagedriver.StorageDriver with the given name and
|
||||
// parameters. To use a driver, the StorageDriverFactory must first be
|
||||
// registered with the given name. If no drivers are found, an
|
||||
// InvalidStorageDriverError is returned
|
||||
func Create(name string, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||
driverFactory, ok := driverFactories[name]
|
||||
if !ok {
|
||||
return nil, InvalidStorageDriverError{name}
|
||||
|
||||
// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
|
||||
// server and client need to be updated for the changed API calls and
|
||||
// there were some problems libchan hanging. We'll phase this
|
||||
// functionality back in over the next few weeks.
|
||||
|
||||
// No registered StorageDriverFactory found, try ipc
|
||||
// driverClient, err := ipc.NewDriverClient(name, parameters)
|
||||
// if err != nil {
|
||||
// return nil, InvalidStorageDriverError{name}
|
||||
// }
|
||||
// err = driverClient.Start()
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return driverClient, nil
|
||||
}
|
||||
return driverFactory.Create(parameters)
|
||||
}
|
||||
|
@ -20,10 +20,7 @@ func init() {
|
||||
}
|
||||
defer os.Remove(root)
|
||||
|
||||
testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) {
|
||||
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
|
||||
return New(root), nil
|
||||
}, testsuites.NeverSkip)
|
||||
|
||||
// BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later.
|
||||
// testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip)
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/testsuites"
|
||||
|
||||
"gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
@ -16,9 +15,5 @@ func init() {
|
||||
inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||
return New(), nil
|
||||
}
|
||||
testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
|
||||
|
||||
// BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot
|
||||
// the problems with libchan.
|
||||
// testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
|
||||
testsuites.RegisterSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
|
||||
}
|
||||
|
@ -1,454 +0,0 @@
|
||||
// +build ignore
|
||||
|
||||
package ipc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/libchan"
|
||||
"github.com/docker/libchan/spdy"
|
||||
)
|
||||
|
||||
// StorageDriverExecutablePrefix is the prefix which the IPC storage driver
|
||||
// loader expects driver executables to begin with. For example, the s3 driver
|
||||
// should be named "registry-storagedriver-s3".
|
||||
const StorageDriverExecutablePrefix = "registry-storagedriver-"
|
||||
|
||||
// StorageDriverClient is a storagedriver.StorageDriver implementation using a
|
||||
// managed child process communicating over IPC using libchan with a unix domain
|
||||
// socket
|
||||
type StorageDriverClient struct {
|
||||
subprocess *exec.Cmd
|
||||
exitChan chan error
|
||||
exitErr error
|
||||
stopChan chan struct{}
|
||||
socket *os.File
|
||||
transport *spdy.Transport
|
||||
sender libchan.Sender
|
||||
version storagedriver.Version
|
||||
}
|
||||
|
||||
// NewDriverClient constructs a new out-of-process storage driver using the
|
||||
// driver name and configuration parameters
|
||||
// A user must call Start on this driver client before remote method calls can
|
||||
// be made
|
||||
//
|
||||
// Looks for drivers in the following locations in order:
|
||||
// - Storage drivers directory (to be determined, yet not implemented)
|
||||
// - $GOPATH/bin
|
||||
// - $PATH
|
||||
func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) {
|
||||
paramsBytes, err := json.Marshal(parameters)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
driverExecName := StorageDriverExecutablePrefix + name
|
||||
driverPath, err := exec.LookPath(driverExecName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
command := exec.Command(driverPath, string(paramsBytes))
|
||||
|
||||
return &StorageDriverClient{
|
||||
subprocess: command,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start starts the designated child process storage driver and binds a socket
|
||||
// to this process for IPC method calls
|
||||
func (driver *StorageDriverClient) Start() error {
|
||||
driver.exitErr = nil
|
||||
driver.exitChan = make(chan error)
|
||||
driver.stopChan = make(chan struct{})
|
||||
|
||||
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket")
|
||||
driver.socket = os.NewFile(uintptr(fileDescriptors[1]), "parentSocket")
|
||||
|
||||
driver.subprocess.Stdout = os.Stdout
|
||||
driver.subprocess.Stderr = os.Stderr
|
||||
driver.subprocess.ExtraFiles = []*os.File{childSocket}
|
||||
|
||||
if err = driver.subprocess.Start(); err != nil {
|
||||
driver.Stop()
|
||||
return err
|
||||
}
|
||||
|
||||
go driver.handleSubprocessExit()
|
||||
|
||||
if err = childSocket.Close(); err != nil {
|
||||
driver.Stop()
|
||||
return err
|
||||
}
|
||||
|
||||
connection, err := net.FileConn(driver.socket)
|
||||
if err != nil {
|
||||
driver.Stop()
|
||||
return err
|
||||
}
|
||||
driver.transport, err = spdy.NewClientTransport(connection)
|
||||
if err != nil {
|
||||
driver.Stop()
|
||||
return err
|
||||
}
|
||||
driver.sender, err = driver.transport.NewSendChannel()
|
||||
if err != nil {
|
||||
driver.Stop()
|
||||
return err
|
||||
}
|
||||
|
||||
// Check the driver's version to determine compatibility
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
err = driver.sender.Send(&Request{Type: "Version", ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
driver.Stop()
|
||||
return err
|
||||
}
|
||||
|
||||
var response VersionResponse
|
||||
err = receiver.Receive(&response)
|
||||
if err != nil {
|
||||
driver.Stop()
|
||||
return err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return response.Error.Unwrap()
|
||||
}
|
||||
|
||||
driver.version = response.Version
|
||||
|
||||
if driver.version.Major() != storagedriver.CurrentVersion.Major() || driver.version.Minor() > storagedriver.CurrentVersion.Minor() {
|
||||
return IncompatibleVersionError{driver.version}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the child process storage driver
|
||||
// storagedriver.StorageDriver methods called after Stop will fail
|
||||
func (driver *StorageDriverClient) Stop() error {
|
||||
var closeSenderErr, closeTransportErr, closeSocketErr, killErr error
|
||||
|
||||
if driver.sender != nil {
|
||||
closeSenderErr = driver.sender.Close()
|
||||
}
|
||||
if driver.transport != nil {
|
||||
closeTransportErr = driver.transport.Close()
|
||||
}
|
||||
if driver.socket != nil {
|
||||
closeSocketErr = driver.socket.Close()
|
||||
}
|
||||
if driver.subprocess != nil {
|
||||
killErr = driver.subprocess.Process.Kill()
|
||||
}
|
||||
if driver.stopChan != nil {
|
||||
close(driver.stopChan)
|
||||
}
|
||||
|
||||
if closeSenderErr != nil {
|
||||
return closeSenderErr
|
||||
} else if closeTransportErr != nil {
|
||||
return closeTransportErr
|
||||
} else if closeSocketErr != nil {
|
||||
return closeSocketErr
|
||||
}
|
||||
|
||||
return killErr
|
||||
}
|
||||
|
||||
// Implement the storagedriver.StorageDriver interface over IPC
|
||||
|
||||
// GetContent retrieves the content stored at "path" as a []byte.
|
||||
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
|
||||
if err := driver.exited(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
|
||||
params := map[string]interface{}{"Path": path}
|
||||
err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := new(ReadStreamResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return nil, response.Error.Unwrap()
|
||||
}
|
||||
|
||||
defer response.Reader.Close()
|
||||
contents, err := ioutil.ReadAll(response.Reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return contents, nil
|
||||
}
|
||||
|
||||
// PutContent stores the []byte content at a location designated by "path".
|
||||
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
|
||||
if err := driver.exited(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
|
||||
params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
|
||||
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
response := new(WriteStreamResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return response.Error.Unwrap()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
||||
// given byte offset.
|
||||
func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) {
|
||||
if err := driver.exited(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
params := map[string]interface{}{"Path": path, "Offset": offset}
|
||||
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := new(ReadStreamResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return nil, response.Error.Unwrap()
|
||||
}
|
||||
|
||||
return response.Reader, nil
|
||||
}
|
||||
|
||||
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
||||
// designated by the given path.
|
||||
func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
|
||||
if err := driver.exited(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": reader}
|
||||
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
response := new(WriteStreamResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return response.Error.Unwrap()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CurrentSize retrieves the curernt size in bytes of the object at the given
|
||||
// path.
|
||||
func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
|
||||
if err := driver.exited(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
params := map[string]interface{}{"Path": path}
|
||||
err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
response := new(CurrentSizeResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return 0, response.Error.Unwrap()
|
||||
}
|
||||
|
||||
return response.Position, nil
|
||||
}
|
||||
|
||||
// List returns a list of the objects that are direct descendants of the given
|
||||
// path.
|
||||
func (driver *StorageDriverClient) List(path string) ([]string, error) {
|
||||
if err := driver.exited(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
params := map[string]interface{}{"Path": path}
|
||||
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := new(ListResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return nil, response.Error.Unwrap()
|
||||
}
|
||||
|
||||
return response.Keys, nil
|
||||
}
|
||||
|
||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||
// object.
|
||||
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
|
||||
if err := driver.exited(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
|
||||
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
response := new(MoveResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return response.Error.Unwrap()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||
func (driver *StorageDriverClient) Delete(path string) error {
|
||||
if err := driver.exited(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
receiver, remoteSender := libchan.Pipe()
|
||||
params := map[string]interface{}{"Path": path}
|
||||
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
response := new(DeleteResponse)
|
||||
err = driver.receiveResponse(receiver, response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if response.Error != nil {
|
||||
return response.Error.Unwrap()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleSubprocessExit populates the exit channel until we have explicitly
|
||||
// stopped the storage driver subprocess
|
||||
// Requests can select on driver.exitChan and response receiving and not hang if
|
||||
// the process exits
|
||||
func (driver *StorageDriverClient) handleSubprocessExit() {
|
||||
exitErr := driver.subprocess.Wait()
|
||||
if exitErr == nil {
|
||||
exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly")
|
||||
} else {
|
||||
exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr)
|
||||
}
|
||||
|
||||
driver.exitErr = exitErr
|
||||
|
||||
for {
|
||||
select {
|
||||
case driver.exitChan <- exitErr:
|
||||
case <-driver.stopChan:
|
||||
close(driver.exitChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// receiveResponse populates the response value with the next result from the
|
||||
// given receiver, or returns an error if receiving failed or the driver has
|
||||
// stopped
|
||||
func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error {
|
||||
receiveChan := make(chan error, 1)
|
||||
go func(receiver libchan.Receiver, receiveChan chan<- error) {
|
||||
receiveChan <- receiver.Receive(response)
|
||||
}(receiver, receiveChan)
|
||||
|
||||
var err error
|
||||
var ok bool
|
||||
select {
|
||||
case err = <-receiveChan:
|
||||
case err, ok = <-driver.exitChan:
|
||||
if !ok {
|
||||
err = driver.exitErr
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// exited returns an exit error if the driver has exited or nil otherwise
|
||||
func (driver *StorageDriverClient) exited() error {
|
||||
select {
|
||||
case err, ok := <-driver.exitChan:
|
||||
if !ok {
|
||||
return driver.exitErr
|
||||
}
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
@ -1,148 +0,0 @@
|
||||
// +build ignore
|
||||
|
||||
package ipc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/libchan"
|
||||
)
|
||||
|
||||
// StorageDriver is the interface which IPC storage drivers must implement. As external storage
|
||||
// drivers may be defined to use a different version of the storagedriver.StorageDriver interface,
|
||||
// we use an additional version check to determine compatiblity.
|
||||
type StorageDriver interface {
|
||||
// Version returns the storagedriver.StorageDriver interface version which this storage driver
|
||||
// implements, which is used to determine driver compatibility
|
||||
Version() (storagedriver.Version, error)
|
||||
}
|
||||
|
||||
// IncompatibleVersionError is returned when a storage driver is using an incompatible version of
|
||||
// the storagedriver.StorageDriver api
|
||||
type IncompatibleVersionError struct {
|
||||
version storagedriver.Version
|
||||
}
|
||||
|
||||
func (e IncompatibleVersionError) Error() string {
|
||||
return fmt.Sprintf("Incompatible storage driver version: %s", e.version)
|
||||
}
|
||||
|
||||
// Request defines a remote method call request
|
||||
// A return value struct is to be sent over the ResponseChannel
|
||||
type Request struct {
|
||||
Type string `codec:",omitempty"`
|
||||
Parameters map[string]interface{} `codec:",omitempty"`
|
||||
ResponseChannel libchan.Sender `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// ResponseError is a serializable error type.
|
||||
// The Type and Parameters may be used to reconstruct the same error on the
|
||||
// client side, falling back to using the Type and Message if this cannot be
|
||||
// done.
|
||||
type ResponseError struct {
|
||||
Type string `codec:",omitempty"`
|
||||
Message string `codec:",omitempty"`
|
||||
Parameters map[string]interface{} `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// WrapError wraps an error in a serializable struct containing the error's type
|
||||
// and message.
|
||||
func WrapError(err error) *ResponseError {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
v := reflect.ValueOf(err)
|
||||
re := ResponseError{
|
||||
Type: v.Type().String(),
|
||||
Message: err.Error(),
|
||||
}
|
||||
|
||||
if v.Kind() == reflect.Struct {
|
||||
re.Parameters = make(map[string]interface{})
|
||||
for i := 0; i < v.NumField(); i++ {
|
||||
field := v.Type().Field(i)
|
||||
re.Parameters[field.Name] = v.Field(i).Interface()
|
||||
}
|
||||
}
|
||||
return &re
|
||||
}
|
||||
|
||||
// Unwrap returns the underlying error if it can be reconstructed, or the
|
||||
// original ResponseError otherwise.
|
||||
func (err *ResponseError) Unwrap() error {
|
||||
var errVal reflect.Value
|
||||
var zeroVal reflect.Value
|
||||
|
||||
switch err.Type {
|
||||
case "storagedriver.PathNotFoundError":
|
||||
errVal = reflect.ValueOf(&storagedriver.PathNotFoundError{})
|
||||
case "storagedriver.InvalidOffsetError":
|
||||
errVal = reflect.ValueOf(&storagedriver.InvalidOffsetError{})
|
||||
}
|
||||
if errVal == zeroVal {
|
||||
return err
|
||||
}
|
||||
|
||||
for k, v := range err.Parameters {
|
||||
fieldVal := errVal.Elem().FieldByName(k)
|
||||
if fieldVal == zeroVal {
|
||||
return err
|
||||
}
|
||||
fieldVal.Set(reflect.ValueOf(v))
|
||||
}
|
||||
|
||||
if unwrapped, ok := errVal.Elem().Interface().(error); ok {
|
||||
return unwrapped
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func (err *ResponseError) Error() string {
|
||||
return fmt.Sprintf("%s: %s", err.Type, err.Message)
|
||||
}
|
||||
|
||||
// IPC method call response object definitions
|
||||
|
||||
// VersionResponse is a response for a Version request
|
||||
type VersionResponse struct {
|
||||
Version storagedriver.Version `codec:",omitempty"`
|
||||
Error *ResponseError `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// ReadStreamResponse is a response for a ReadStream request
|
||||
type ReadStreamResponse struct {
|
||||
Reader io.ReadCloser `codec:",omitempty"`
|
||||
Error *ResponseError `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// WriteStreamResponse is a response for a WriteStream request
|
||||
type WriteStreamResponse struct {
|
||||
Error *ResponseError `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// CurrentSizeResponse is a response for a CurrentSize request
|
||||
type CurrentSizeResponse struct {
|
||||
Position uint64 `codec:",omitempty"`
|
||||
Error *ResponseError `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// ListResponse is a response for a List request
|
||||
type ListResponse struct {
|
||||
Keys []string `codec:",omitempty"`
|
||||
Error *ResponseError `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// MoveResponse is a response for a Move request
|
||||
type MoveResponse struct {
|
||||
Error *ResponseError `codec:",omitempty"`
|
||||
}
|
||||
|
||||
// DeleteResponse is a response for a Delete request
|
||||
type DeleteResponse struct {
|
||||
Error *ResponseError `codec:",omitempty"`
|
||||
}
|
@ -1,178 +0,0 @@
|
||||
// +build ignore
|
||||
|
||||
package ipc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/libchan"
|
||||
"github.com/docker/libchan/spdy"
|
||||
)
|
||||
|
||||
// StorageDriverServer runs a new IPC server handling requests for the given
|
||||
// storagedriver.StorageDriver
|
||||
// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in
|
||||
// client.go
|
||||
//
|
||||
// To create a new out-of-process driver, create a main package which calls StorageDriverServer with
|
||||
// a storagedriver.StorageDriver
|
||||
func StorageDriverServer(driver storagedriver.StorageDriver) error {
|
||||
childSocket := os.NewFile(3, "childSocket")
|
||||
defer childSocket.Close()
|
||||
conn, err := net.FileConn(childSocket)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
if transport, err := spdy.NewServerTransport(conn); err != nil {
|
||||
panic(err)
|
||||
} else {
|
||||
for {
|
||||
receiver, err := transport.WaitReceiveChannel()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
go receive(driver, receiver)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// receive receives new storagedriver.StorageDriver method requests and creates a new goroutine to
|
||||
// handle each request
|
||||
// Requests are expected to be of type ipc.Request as the parameters are unknown until the request
|
||||
// type is deserialized
|
||||
func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
|
||||
for {
|
||||
var request Request
|
||||
err := receiver.Receive(&request)
|
||||
if err == io.EOF {
|
||||
return
|
||||
} else if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
go handleRequest(driver, request)
|
||||
}
|
||||
}
|
||||
|
||||
// handleRequest handles storagedriver.StorageDriver method requests as defined in client.go
|
||||
// Responds to requests using the Request.ResponseChannel
|
||||
func handleRequest(driver storagedriver.StorageDriver, request Request) {
|
||||
switch request.Type {
|
||||
case "Version":
|
||||
err := request.ResponseChannel.Send(&VersionResponse{Version: storagedriver.CurrentVersion})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "GetContent":
|
||||
path, _ := request.Parameters["Path"].(string)
|
||||
content, err := driver.GetContent(path)
|
||||
var response ReadStreamResponse
|
||||
if err != nil {
|
||||
response = ReadStreamResponse{Error: WrapError(err)}
|
||||
} else {
|
||||
response = ReadStreamResponse{Reader: ioutil.NopCloser(bytes.NewReader(content))}
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "PutContent":
|
||||
path, _ := request.Parameters["Path"].(string)
|
||||
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
|
||||
contents, err := ioutil.ReadAll(reader)
|
||||
defer reader.Close()
|
||||
if err == nil {
|
||||
err = driver.PutContent(path, contents)
|
||||
}
|
||||
response := WriteStreamResponse{
|
||||
Error: WrapError(err),
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "ReadStream":
|
||||
path, _ := request.Parameters["Path"].(string)
|
||||
// Depending on serialization method, Offset may be converted to any int/uint type
|
||||
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
|
||||
reader, err := driver.ReadStream(path, offset)
|
||||
var response ReadStreamResponse
|
||||
if err != nil {
|
||||
response = ReadStreamResponse{Error: WrapError(err)}
|
||||
} else {
|
||||
response = ReadStreamResponse{Reader: reader}
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "WriteStream":
|
||||
path, _ := request.Parameters["Path"].(string)
|
||||
// Depending on serialization method, Offset may be converted to any int/uint type
|
||||
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
|
||||
// Depending on serialization method, Size may be converted to any int/uint type
|
||||
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int()
|
||||
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
|
||||
err := driver.WriteStream(path, offset, size, reader)
|
||||
response := WriteStreamResponse{
|
||||
Error: WrapError(err),
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "CurrentSize":
|
||||
path, _ := request.Parameters["Path"].(string)
|
||||
position, err := driver.CurrentSize(path)
|
||||
response := CurrentSizeResponse{
|
||||
Position: position,
|
||||
Error: WrapError(err),
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "List":
|
||||
path, _ := request.Parameters["Path"].(string)
|
||||
keys, err := driver.List(path)
|
||||
response := ListResponse{
|
||||
Keys: keys,
|
||||
Error: WrapError(err),
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "Move":
|
||||
sourcePath, _ := request.Parameters["SourcePath"].(string)
|
||||
destPath, _ := request.Parameters["DestPath"].(string)
|
||||
err := driver.Move(sourcePath, destPath)
|
||||
response := MoveResponse{
|
||||
Error: WrapError(err),
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "Delete":
|
||||
path, _ := request.Parameters["Path"].(string)
|
||||
err := driver.Delete(path)
|
||||
response := DeleteResponse{
|
||||
Error: WrapError(err),
|
||||
}
|
||||
err = request.ResponseChannel.Send(&response)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
default:
|
||||
panic(request)
|
||||
}
|
||||
}
|
@ -36,5 +36,5 @@ func init() {
|
||||
return ""
|
||||
}
|
||||
|
||||
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
|
||||
testsuites.RegisterSuite(driverConstructor, skipCheck)
|
||||
}
|
||||
|
@ -17,7 +17,8 @@ import (
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { check.TestingT(t) }
|
||||
|
||||
type S3DriverConstructor func(rootDirectory string) (*Driver, error)
|
||||
var s3DriverConstructor func(rootDirectory string) (*Driver, error)
|
||||
var skipS3 func() string
|
||||
|
||||
func init() {
|
||||
accessKey := os.Getenv("AWS_ACCESS_KEY")
|
||||
@ -33,7 +34,7 @@ func init() {
|
||||
}
|
||||
defer os.Remove(root)
|
||||
|
||||
s3DriverConstructor := func(rootDirectory string) (*Driver, error) {
|
||||
s3DriverConstructor = func(rootDirectory string) (*Driver, error) {
|
||||
encryptBool := false
|
||||
if encrypt != "" {
|
||||
encryptBool, err = strconv.ParseBool(encrypt)
|
||||
@ -74,79 +75,64 @@ func init() {
|
||||
}
|
||||
|
||||
// Skip S3 storage driver tests if environment variable parameters are not provided
|
||||
skipCheck := func() string {
|
||||
skipS3 = func() string {
|
||||
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
|
||||
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
driverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
|
||||
return s3DriverConstructor(root)
|
||||
}, skipS3)
|
||||
}
|
||||
|
||||
func TestEmptyRootList(t *testing.T) {
|
||||
if skipS3() != "" {
|
||||
t.Skip(skipS3())
|
||||
}
|
||||
|
||||
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
|
||||
|
||||
// s3Constructor := func() (*Driver, error) {
|
||||
// return s3DriverConstructor(aws.GetRegion(region))
|
||||
// }
|
||||
|
||||
RegisterS3DriverSuite(s3DriverConstructor, skipCheck)
|
||||
|
||||
// testsuites.RegisterIPCSuite(driverName, map[string]string{
|
||||
// "accesskey": accessKey,
|
||||
// "secretkey": secretKey,
|
||||
// "region": region.Name,
|
||||
// "bucket": bucket,
|
||||
// "encrypt": encrypt,
|
||||
// }, skipCheck)
|
||||
// }
|
||||
}
|
||||
|
||||
func RegisterS3DriverSuite(s3DriverConstructor S3DriverConstructor, skipCheck testsuites.SkipCheck) {
|
||||
check.Suite(&S3DriverSuite{
|
||||
Constructor: s3DriverConstructor,
|
||||
SkipCheck: skipCheck,
|
||||
})
|
||||
}
|
||||
|
||||
type S3DriverSuite struct {
|
||||
Constructor S3DriverConstructor
|
||||
testsuites.SkipCheck
|
||||
}
|
||||
|
||||
func (suite *S3DriverSuite) SetUpSuite(c *check.C) {
|
||||
if reason := suite.SkipCheck(); reason != "" {
|
||||
c.Skip(reason)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *S3DriverSuite) TestEmptyRootList(c *check.C) {
|
||||
validRoot, err := ioutil.TempDir("", "driver-")
|
||||
c.Assert(err, check.IsNil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||
}
|
||||
defer os.Remove(validRoot)
|
||||
|
||||
rootedDriver, err := suite.Constructor(validRoot)
|
||||
c.Assert(err, check.IsNil)
|
||||
emptyRootDriver, err := suite.Constructor("")
|
||||
c.Assert(err, check.IsNil)
|
||||
slashRootDriver, err := suite.Constructor("/")
|
||||
c.Assert(err, check.IsNil)
|
||||
rootedDriver, err := s3DriverConstructor(validRoot)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||
}
|
||||
|
||||
emptyRootDriver, err := s3DriverConstructor("")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating empty root driver: %v", err)
|
||||
}
|
||||
|
||||
slashRootDriver, err := s3DriverConstructor("/")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating slash root driver: %v", err)
|
||||
}
|
||||
|
||||
filename := "/test"
|
||||
contents := []byte("contents")
|
||||
ctx := context.Background()
|
||||
err = rootedDriver.PutContent(ctx, filename, contents)
|
||||
c.Assert(err, check.IsNil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating content: %v", err)
|
||||
}
|
||||
defer rootedDriver.Delete(ctx, filename)
|
||||
|
||||
keys, err := emptyRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
|
||||
keys, err = slashRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,9 +22,9 @@ import (
|
||||
// Test hooks up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { check.TestingT(t) }
|
||||
|
||||
// RegisterInProcessSuite registers an in-process storage driver test suite with
|
||||
// RegisterSuite registers an in-process storage driver test suite with
|
||||
// the go test runner.
|
||||
func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
|
||||
func RegisterSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
|
||||
check.Suite(&DriverSuite{
|
||||
Constructor: driverConstructor,
|
||||
SkipCheck: skipCheck,
|
||||
@ -32,39 +32,6 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC
|
||||
})
|
||||
}
|
||||
|
||||
// RegisterIPCSuite registers a storage driver test suite which runs the named
|
||||
// driver as a child process with the given parameters.
|
||||
func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) {
|
||||
panic("ipc testing is disabled for now")
|
||||
|
||||
// NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code
|
||||
// block before and remove the panic when we phase it back in.
|
||||
|
||||
// suite := &DriverSuite{
|
||||
// Constructor: func() (storagedriver.StorageDriver, error) {
|
||||
// d, err := ipc.NewDriverClient(driverName, ipcParams)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// err = d.Start()
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// return d, nil
|
||||
// },
|
||||
// SkipCheck: skipCheck,
|
||||
// }
|
||||
// suite.Teardown = func() error {
|
||||
// if suite.StorageDriver == nil {
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
|
||||
// return driverClient.Stop()
|
||||
// }
|
||||
// check.Suite(suite)
|
||||
}
|
||||
|
||||
// SkipCheck is a function used to determine if a test suite should be skipped.
|
||||
// If a SkipCheck returns a non-empty skip reason, the suite is skipped with
|
||||
// the given reason.
|
||||
@ -82,9 +49,8 @@ type DriverConstructor func() (storagedriver.StorageDriver, error)
|
||||
type DriverTeardown func() error
|
||||
|
||||
// DriverSuite is a gocheck test suite designed to test a
|
||||
// storagedriver.StorageDriver.
|
||||
// The intended way to create a DriverSuite is with RegisterInProcessSuite or
|
||||
// RegisterIPCSuite.
|
||||
// storagedriver.StorageDriver. The intended way to create a DriverSuite is
|
||||
// with RegisterSuite.
|
||||
type DriverSuite struct {
|
||||
Constructor DriverConstructor
|
||||
Teardown DriverTeardown
|
||||
@ -841,10 +807,6 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) {
|
||||
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
|
||||
// in to WriteStream concurrently without hanging.
|
||||
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
|
||||
// if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
|
||||
// c.Skip("Need to fix out-of-process concurrency")
|
||||
// }
|
||||
|
||||
numStreams := 32
|
||||
|
||||
if testing.Short() {
|
||||
|
Loading…
Reference in New Issue
Block a user