Merge pull request #756 from aibaars/storage-gcs
Storage driver for Google Cloud Storage
This commit is contained in:
commit
a31e93e1c4
3
docs/storage/driver/gcs/doc.go
Normal file
3
docs/storage/driver/gcs/doc.go
Normal file
@ -0,0 +1,3 @@
|
||||
// Package gcs implements the Google Cloud Storage driver backend. Support can be
|
||||
// enabled by including the "include_gcs" build tag.
|
||||
package gcs
|
623
docs/storage/driver/gcs/gcs.go
Normal file
623
docs/storage/driver/gcs/gcs.go
Normal file
@ -0,0 +1,623 @@
|
||||
// Package gcs provides a storagedriver.StorageDriver implementation to
|
||||
// store blobs in Google cloud storage.
|
||||
//
|
||||
// This package leverages the google.golang.org/cloud/storage client library
|
||||
//for interfacing with gcs.
|
||||
//
|
||||
// Because gcs is a key, value store the Stat call does not support last modification
|
||||
// time for directories (directories are an abstraction for key, value stores)
|
||||
//
|
||||
// Keep in mind that gcs guarantees only eventual consistency, so do not assume
|
||||
// that a successful write will mean immediate access to the data written (although
|
||||
// in most regions a new object put has guaranteed read after write). The only true
|
||||
// guarantee is that once you call Stat and receive a certain file size, that much of
|
||||
// the file is already accessible.
|
||||
//
|
||||
// +build include_gcs
|
||||
|
||||
package gcs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/google"
|
||||
|
||||
"google.golang.org/api/googleapi"
|
||||
storageapi "google.golang.org/api/storage/v1"
|
||||
"google.golang.org/cloud"
|
||||
"google.golang.org/cloud/storage"
|
||||
|
||||
ctx "github.com/docker/distribution/context"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/base"
|
||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||
)
|
||||
|
||||
const driverName = "gcs"
|
||||
const dummyProjectID = "<unknown>"
|
||||
|
||||
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
|
||||
type driverParameters struct {
|
||||
bucket string
|
||||
keyfile string
|
||||
rootDirectory string
|
||||
}
|
||||
|
||||
func init() {
|
||||
factory.Register(driverName, &gcsDriverFactory{})
|
||||
}
|
||||
|
||||
// gcsDriverFactory implements the factory.StorageDriverFactory interface
|
||||
type gcsDriverFactory struct{}
|
||||
|
||||
// Create StorageDriver from parameters
|
||||
func (factory *gcsDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||
return FromParameters(parameters)
|
||||
}
|
||||
|
||||
// driver is a storagedriver.StorageDriver implementation backed by GCS
|
||||
// Objects are stored at absolute keys in the provided bucket.
|
||||
type driver struct {
|
||||
client *http.Client
|
||||
bucket string
|
||||
email string
|
||||
privateKey []byte
|
||||
rootDirectory string
|
||||
}
|
||||
|
||||
// FromParameters constructs a new Driver with a given parameters map
|
||||
// Required parameters:
|
||||
// - bucket
|
||||
func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||
|
||||
bucket, ok := parameters["bucket"]
|
||||
if !ok || fmt.Sprint(bucket) == "" {
|
||||
return nil, fmt.Errorf("No bucket parameter provided")
|
||||
}
|
||||
|
||||
keyfile, ok := parameters["keyfile"]
|
||||
if !ok {
|
||||
keyfile = ""
|
||||
}
|
||||
|
||||
rootDirectory, ok := parameters["rootdirectory"]
|
||||
if !ok {
|
||||
rootDirectory = ""
|
||||
}
|
||||
params := driverParameters{
|
||||
fmt.Sprint(bucket),
|
||||
fmt.Sprint(keyfile),
|
||||
fmt.Sprint(rootDirectory),
|
||||
}
|
||||
|
||||
return New(params)
|
||||
}
|
||||
|
||||
// New constructs a new driver
|
||||
func New(params driverParameters) (storagedriver.StorageDriver, error) {
|
||||
var ts oauth2.TokenSource
|
||||
var err error
|
||||
rootDirectory := strings.Trim(params.rootDirectory, "/")
|
||||
if rootDirectory != "" {
|
||||
rootDirectory += "/"
|
||||
}
|
||||
d := &driver{
|
||||
bucket: params.bucket,
|
||||
rootDirectory: rootDirectory,
|
||||
}
|
||||
if params.keyfile == "" {
|
||||
ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
jsonKey, err := ioutil.ReadFile(params.keyfile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conf, err := google.JWTConfigFromJSON(
|
||||
jsonKey,
|
||||
storage.ScopeFullControl,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ts = conf.TokenSource(context.Background())
|
||||
d.email = conf.Email
|
||||
d.privateKey = conf.PrivateKey
|
||||
}
|
||||
client := oauth2.NewClient(context.Background(), ts)
|
||||
d.client = client
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &base.Base{
|
||||
StorageDriver: d,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Implement the storagedriver.StorageDriver interface
|
||||
|
||||
func (d *driver) Name() string {
|
||||
return driverName
|
||||
}
|
||||
|
||||
// GetContent retrieves the content stored at "path" as a []byte.
|
||||
// This should primarily be used for small objects.
|
||||
func (d *driver) GetContent(context ctx.Context, path string) ([]byte, error) {
|
||||
rc, err := d.ReadStream(context, path, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
p, err := ioutil.ReadAll(rc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// PutContent stores the []byte content at a location designated by "path".
|
||||
// This should primarily be used for small objects.
|
||||
func (d *driver) PutContent(context ctx.Context, path string, contents []byte) error {
|
||||
wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
|
||||
wc.ContentType = "application/octet-stream"
|
||||
defer wc.Close()
|
||||
_, err := wc.Write(contents)
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path"
|
||||
// with a given byte offset.
|
||||
// May be used to resume reading a stream by providing a nonzero offset.
|
||||
func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
name := d.pathToKey(path)
|
||||
|
||||
// copied from google.golang.org/cloud/storage#NewReader :
|
||||
// to set the additional "Range" header
|
||||
u := &url.URL{
|
||||
Scheme: "https",
|
||||
Host: "storage.googleapis.com",
|
||||
Path: fmt.Sprintf("/%s/%s", d.bucket, name),
|
||||
}
|
||||
req, err := http.NewRequest("GET", u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if offset > 0 {
|
||||
req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
|
||||
}
|
||||
res, err := d.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
res.Body.Close()
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
|
||||
res.Body.Close()
|
||||
obj, err := storage.StatObject(d.context(context), d.bucket, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if offset == int64(obj.Size) {
|
||||
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
|
||||
}
|
||||
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
||||
}
|
||||
if res.StatusCode < 200 || res.StatusCode > 299 {
|
||||
res.Body.Close()
|
||||
return nil, fmt.Errorf("storage: can't read object %v/%v, status code: %v", d.bucket, name, res.Status)
|
||||
}
|
||||
return res.Body, nil
|
||||
}
|
||||
|
||||
// WriteStream stores the contents of the provided io.ReadCloser at a
|
||||
// location designated by the given path.
|
||||
// May be used to resume writing a stream by providing a nonzero offset.
|
||||
// The offset must be no larger than the CurrentSize for this path.
|
||||
func (d *driver) WriteStream(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
||||
if offset < 0 {
|
||||
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
||||
}
|
||||
|
||||
if offset == 0 {
|
||||
return d.writeCompletely(context, path, 0, reader)
|
||||
}
|
||||
|
||||
service, err := storageapi.New(d.client)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
objService := storageapi.NewObjectsService(service)
|
||||
var obj *storageapi.Object
|
||||
err = retry(5, func() error {
|
||||
o, err := objService.Get(d.bucket, d.pathToKey(path)).Do()
|
||||
obj = o
|
||||
return err
|
||||
})
|
||||
// obj, err := retry(5, objService.Get(d.bucket, d.pathToKey(path)).Do)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// cannot append more chunks, so redo from scratch
|
||||
if obj.ComponentCount >= 1023 {
|
||||
return d.writeCompletely(context, path, offset, reader)
|
||||
}
|
||||
|
||||
// skip from reader
|
||||
objSize := int64(obj.Size)
|
||||
nn, err := skip(reader, objSize-offset)
|
||||
if err != nil {
|
||||
return nn, err
|
||||
}
|
||||
|
||||
// Size <= offset
|
||||
partName := fmt.Sprintf("%v#part-%d#", d.pathToKey(path), obj.ComponentCount)
|
||||
gcsContext := d.context(context)
|
||||
wc := storage.NewWriter(gcsContext, d.bucket, partName)
|
||||
wc.ContentType = "application/octet-stream"
|
||||
|
||||
if objSize < offset {
|
||||
err = writeZeros(wc, offset-objSize)
|
||||
if err != nil {
|
||||
wc.CloseWithError(err)
|
||||
return nn, err
|
||||
}
|
||||
}
|
||||
n, err := io.Copy(wc, reader)
|
||||
if err != nil {
|
||||
wc.CloseWithError(err)
|
||||
return nn, err
|
||||
}
|
||||
err = wc.Close()
|
||||
if err != nil {
|
||||
return nn, err
|
||||
}
|
||||
// wc was closed succesfully, so the temporary part exists, schedule it for deletion at the end
|
||||
// of the function
|
||||
defer storage.DeleteObject(gcsContext, d.bucket, partName)
|
||||
|
||||
req := &storageapi.ComposeRequest{
|
||||
Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType},
|
||||
SourceObjects: []*storageapi.ComposeRequestSourceObjects{
|
||||
{
|
||||
Name: obj.Name,
|
||||
Generation: obj.Generation,
|
||||
}, {
|
||||
Name: partName,
|
||||
Generation: wc.Object().Generation,
|
||||
}},
|
||||
}
|
||||
|
||||
err = retry(5, func() error { _, err := objService.Compose(d.bucket, obj.Name, req).Do(); return err })
|
||||
if err == nil {
|
||||
nn = nn + n
|
||||
}
|
||||
|
||||
return nn, err
|
||||
}
|
||||
|
||||
type request func() error
|
||||
|
||||
func retry(maxTries int, req request) error {
|
||||
backoff := time.Second
|
||||
var err error
|
||||
for i := 0; i < maxTries; i++ {
|
||||
err := req()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
status := err.(*googleapi.Error)
|
||||
if status == nil || (status.Code != 429 && status.Code < http.StatusInternalServerError) {
|
||||
return err
|
||||
}
|
||||
|
||||
time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond))
|
||||
if i <= 4 {
|
||||
backoff = backoff * 2
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *driver) writeCompletely(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
||||
wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
|
||||
wc.ContentType = "application/octet-stream"
|
||||
defer wc.Close()
|
||||
|
||||
// Copy the first offset bytes of the existing contents
|
||||
// (padded with zeros if needed) into the writer
|
||||
if offset > 0 {
|
||||
existing, err := d.ReadStream(context, path, 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer existing.Close()
|
||||
n, err := io.CopyN(wc, existing, offset)
|
||||
if err == io.EOF {
|
||||
err = writeZeros(wc, offset-n)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return io.Copy(wc, reader)
|
||||
}
|
||||
|
||||
func skip(reader io.Reader, count int64) (int64, error) {
|
||||
if count <= 0 {
|
||||
return 0, nil
|
||||
}
|
||||
return io.CopyN(ioutil.Discard, reader, count)
|
||||
}
|
||||
|
||||
func writeZeros(wc io.Writer, count int64) error {
|
||||
buf := make([]byte, 32*1024)
|
||||
for count > 0 {
|
||||
size := cap(buf)
|
||||
if int64(size) > count {
|
||||
size = int(count)
|
||||
}
|
||||
n, err := wc.Write(buf[0:size])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count = count - int64(n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stat retrieves the FileInfo for the given path, including the current
|
||||
// size in bytes and the creation time.
|
||||
func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, error) {
|
||||
var fi storagedriver.FileInfoFields
|
||||
//try to get as file
|
||||
gcsContext := d.context(context)
|
||||
obj, err := storage.StatObject(gcsContext, d.bucket, d.pathToKey(path))
|
||||
if err == nil {
|
||||
fi = storagedriver.FileInfoFields{
|
||||
Path: path,
|
||||
Size: obj.Size,
|
||||
ModTime: obj.Updated,
|
||||
IsDir: false,
|
||||
}
|
||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||
}
|
||||
//try to get as folder
|
||||
dirpath := d.pathToDirKey(path)
|
||||
|
||||
var query *storage.Query
|
||||
query = &storage.Query{}
|
||||
query.Prefix = dirpath
|
||||
query.MaxResults = 1
|
||||
|
||||
objects, err := storage.ListObjects(gcsContext, d.bucket, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(objects.Results) < 1 {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
fi = storagedriver.FileInfoFields{
|
||||
Path: path,
|
||||
IsDir: true,
|
||||
}
|
||||
obj = objects.Results[0]
|
||||
if obj.Name == dirpath {
|
||||
fi.Size = obj.Size
|
||||
fi.ModTime = obj.Updated
|
||||
}
|
||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||
}
|
||||
|
||||
// List returns a list of the objects that are direct descendants of the
|
||||
//given path.
|
||||
func (d *driver) List(context ctx.Context, path string) ([]string, error) {
|
||||
var query *storage.Query
|
||||
query = &storage.Query{}
|
||||
query.Delimiter = "/"
|
||||
query.Prefix = d.pathToDirKey(path)
|
||||
list := make([]string, 0, 64)
|
||||
for {
|
||||
objects, err := storage.ListObjects(d.context(context), d.bucket, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, object := range objects.Results {
|
||||
// GCS does not guarantee strong consistency between
|
||||
// DELETE and LIST operationsCheck that the object is not deleted,
|
||||
// so filter out any objects with a non-zero time-deleted
|
||||
if object.Deleted.IsZero() {
|
||||
name := object.Name
|
||||
// Ignore objects with names that end with '#' (these are uploaded parts)
|
||||
if name[len(name)-1] != '#' {
|
||||
name = d.keyToPath(name)
|
||||
list = append(list, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, subpath := range objects.Prefixes {
|
||||
subpath = d.keyToPath(subpath)
|
||||
list = append(list, subpath)
|
||||
}
|
||||
query = objects.Next
|
||||
if query == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return list, nil
|
||||
}
|
||||
|
||||
// Move moves an object stored at sourcePath to destPath, removing the
|
||||
// original object.
|
||||
func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) error {
|
||||
prefix := d.pathToDirKey(sourcePath)
|
||||
gcsContext := d.context(context)
|
||||
keys, err := d.listAll(gcsContext, prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(keys) > 0 {
|
||||
destPrefix := d.pathToDirKey(destPath)
|
||||
copies := make([]string, 0, len(keys))
|
||||
sort.Strings(keys)
|
||||
var err error
|
||||
for _, key := range keys {
|
||||
dest := destPrefix + key[len(prefix):]
|
||||
_, err = storage.CopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil)
|
||||
if err == nil {
|
||||
copies = append(copies, dest)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
// if an error occurred, attempt to cleanup the copies made
|
||||
if err != nil {
|
||||
for i := len(copies) - 1; i >= 0; i-- {
|
||||
_ = storage.DeleteObject(gcsContext, d.bucket, copies[i])
|
||||
}
|
||||
return err
|
||||
}
|
||||
// delete originals
|
||||
for i := len(keys) - 1; i >= 0; i-- {
|
||||
err2 := storage.DeleteObject(gcsContext, d.bucket, keys[i])
|
||||
if err2 != nil {
|
||||
err = err2
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
_, err = storage.CopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
|
||||
if err != nil {
|
||||
if status := err.(*googleapi.Error); status != nil {
|
||||
if status.Code == http.StatusNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: sourcePath}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
return storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
|
||||
}
|
||||
|
||||
// listAll recursively lists all names of objects stored at "prefix" and its subpaths.
|
||||
func (d *driver) listAll(context context.Context, prefix string) ([]string, error) {
|
||||
list := make([]string, 0, 64)
|
||||
query := &storage.Query{}
|
||||
query.Prefix = prefix
|
||||
query.Versions = false
|
||||
for {
|
||||
objects, err := storage.ListObjects(d.context(context), d.bucket, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, obj := range objects.Results {
|
||||
// GCS does not guarantee strong consistency between
|
||||
// DELETE and LIST operationsCheck that the object is not deleted,
|
||||
// so filter out any objects with a non-zero time-deleted
|
||||
if obj.Deleted.IsZero() {
|
||||
list = append(list, obj.Name)
|
||||
}
|
||||
}
|
||||
query = objects.Next
|
||||
if query == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return list, nil
|
||||
}
|
||||
|
||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||
func (d *driver) Delete(context ctx.Context, path string) error {
|
||||
prefix := d.pathToDirKey(path)
|
||||
gcsContext := d.context(context)
|
||||
keys, err := d.listAll(gcsContext, prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(keys) > 0 {
|
||||
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
|
||||
for _, key := range keys {
|
||||
if err := storage.DeleteObject(gcsContext, d.bucket, key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
err = storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(path))
|
||||
if err != nil {
|
||||
if status := err.(*googleapi.Error); status != nil {
|
||||
if status.Code == http.StatusNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// URLFor returns a URL which may be used to retrieve the content stored at
|
||||
// the given path, possibly using the given options.
|
||||
// Returns ErrUnsupportedMethod if this driver has no privateKey
|
||||
func (d *driver) URLFor(context ctx.Context, path string, options map[string]interface{}) (string, error) {
|
||||
if d.privateKey == nil {
|
||||
return "", storagedriver.ErrUnsupportedMethod
|
||||
}
|
||||
|
||||
name := d.pathToKey(path)
|
||||
methodString := "GET"
|
||||
method, ok := options["method"]
|
||||
if ok {
|
||||
methodString, ok = method.(string)
|
||||
if !ok || (methodString != "GET" && methodString != "HEAD") {
|
||||
return "", storagedriver.ErrUnsupportedMethod
|
||||
}
|
||||
}
|
||||
|
||||
expiresTime := time.Now().Add(20 * time.Minute)
|
||||
expires, ok := options["expiry"]
|
||||
if ok {
|
||||
et, ok := expires.(time.Time)
|
||||
if ok {
|
||||
expiresTime = et
|
||||
}
|
||||
}
|
||||
|
||||
opts := &storage.SignedURLOptions{
|
||||
GoogleAccessID: d.email,
|
||||
PrivateKey: d.privateKey,
|
||||
Method: methodString,
|
||||
Expires: expiresTime,
|
||||
}
|
||||
return storage.SignedURL(d.bucket, name, opts)
|
||||
}
|
||||
|
||||
func (d *driver) context(context ctx.Context) context.Context {
|
||||
return cloud.WithContext(context, dummyProjectID, d.client)
|
||||
}
|
||||
|
||||
func (d *driver) pathToKey(path string) string {
|
||||
return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")
|
||||
}
|
||||
|
||||
func (d *driver) pathToDirKey(path string) string {
|
||||
return d.pathToKey(path) + "/"
|
||||
}
|
||||
|
||||
func (d *driver) keyToPath(key string) string {
|
||||
return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/")
|
||||
}
|
106
docs/storage/driver/gcs/gcs_test.go
Normal file
106
docs/storage/driver/gcs/gcs_test.go
Normal file
@ -0,0 +1,106 @@
|
||||
// +build include_gcs
|
||||
|
||||
package gcs
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
ctx "github.com/docker/distribution/context"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/testsuites"
|
||||
|
||||
"gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { check.TestingT(t) }
|
||||
|
||||
var gcsDriverConstructor func(rootDirectory string) (storagedriver.StorageDriver, error)
|
||||
var skipGCS func() string
|
||||
|
||||
func init() {
|
||||
bucket := os.Getenv("REGISTRY_STORAGE_GCS_BUCKET")
|
||||
keyfile := os.Getenv("REGISTRY_STORAGE_GCS_KEYFILE")
|
||||
credentials := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
|
||||
|
||||
root, err := ioutil.TempDir("", "driver-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer os.Remove(root)
|
||||
|
||||
gcsDriverConstructor = func(rootDirectory string) (storagedriver.StorageDriver, error) {
|
||||
|
||||
parameters := driverParameters{
|
||||
bucket,
|
||||
keyfile,
|
||||
rootDirectory,
|
||||
}
|
||||
|
||||
return New(parameters)
|
||||
}
|
||||
|
||||
// Skip GCS storage driver tests if environment variable parameters are not provided
|
||||
skipGCS = func() string {
|
||||
if bucket == "" || (credentials == "" && keyfile == "") {
|
||||
return "Must set REGISTRY_STORAGE_GCS_BUCKET and (GOOGLE_APPLICATION_CREDENTIALS or REGISTRY_STORAGE_GCS_KEYFILE) to run GCS tests"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
|
||||
return gcsDriverConstructor(root)
|
||||
}, skipGCS)
|
||||
}
|
||||
|
||||
func TestEmptyRootList(t *testing.T) {
|
||||
if skipGCS() != "" {
|
||||
t.Skip(skipGCS())
|
||||
}
|
||||
|
||||
validRoot, err := ioutil.TempDir("", "driver-")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||
}
|
||||
defer os.Remove(validRoot)
|
||||
|
||||
rootedDriver, err := gcsDriverConstructor(validRoot)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||
}
|
||||
|
||||
emptyRootDriver, err := gcsDriverConstructor("")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating empty root driver: %v", err)
|
||||
}
|
||||
|
||||
slashRootDriver, err := gcsDriverConstructor("/")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating slash root driver: %v", err)
|
||||
}
|
||||
|
||||
filename := "/test"
|
||||
contents := []byte("contents")
|
||||
ctx := ctx.Background()
|
||||
err = rootedDriver.PutContent(ctx, filename, contents)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating content: %v", err)
|
||||
}
|
||||
defer rootedDriver.Delete(ctx, filename)
|
||||
|
||||
keys, err := emptyRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
|
||||
keys, err = slashRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user