65b0d73cb7
This change is slightly more complex than previous package moves in that the package name changed. To address this, we simply always reference the package driver as storagedriver to avoid compatibility issues with existing code. While unfortunate, this can be cleaned up over time. Signed-off-by: Stephen J Day <stephen.day@docker.com>
348 lines
9.2 KiB
Go
348 lines
9.2 KiB
Go
// Package azure provides a storagedriver.StorageDriver implementation to
|
|
// store blobs in Microsoft Azure Blob Storage Service.
|
|
package azure
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
"github.com/docker/distribution/registry/storage/driver/base"
|
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
|
|
|
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
|
|
)
|
|
|
|
const (
	// driverName is the name under which this driver is registered with the
	// storage driver factory.
	driverName = "azure"

	// Keys read from the parameters map handed to FromParameters.
	paramAccountName = "accountname"
	paramAccountKey  = "accountkey"
	paramContainer   = "container"
)
|
|
|
|
type driver struct {
|
|
client azure.BlobStorageClient
|
|
container string
|
|
}
|
|
|
|
// baseEmbed is an unexported wrapper around base.Base; Driver embeds it to
// pick up the methods that base.Base provides.
type baseEmbed struct {
	base.Base
}
|
|
|
|
// Driver is a storagedriver.StorageDriver implementation backed by
|
|
// Microsoft Azure Blob Storage Service.
|
|
type Driver struct{ baseEmbed }
|
|
|
|
func init() {
|
|
factory.Register(driverName, &azureDriverFactory{})
|
|
}
|
|
|
|
type azureDriverFactory struct{}
|
|
|
|
func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
|
return FromParameters(parameters)
|
|
}
|
|
|
|
// FromParameters constructs a new Driver with a given parameters map.
|
|
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
|
accountName, ok := parameters[paramAccountName]
|
|
if !ok || fmt.Sprint(accountName) == "" {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountName)
|
|
}
|
|
|
|
accountKey, ok := parameters[paramAccountKey]
|
|
if !ok || fmt.Sprint(accountKey) == "" {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountKey)
|
|
}
|
|
|
|
container, ok := parameters[paramContainer]
|
|
if !ok || fmt.Sprint(container) == "" {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramContainer)
|
|
}
|
|
|
|
return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container))
|
|
}
|
|
|
|
// New constructs a new Driver with the given Azure Storage Account credentials
|
|
func New(accountName, accountKey, container string) (*Driver, error) {
|
|
api, err := azure.NewBasicClient(accountName, accountKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
blobClient := api.GetBlobService()
|
|
|
|
// Create registry container
|
|
if _, err = blobClient.CreateContainerIfNotExists(container, azure.ContainerAccessTypePrivate); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
d := &driver{
|
|
client: *blobClient,
|
|
container: container}
|
|
return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
|
|
}
|
|
|
|
// Implement the storagedriver.StorageDriver interface.
|
|
|
|
// GetContent retrieves the content stored at "path" as a []byte.
|
|
func (d *driver) GetContent(path string) ([]byte, error) {
|
|
blob, err := d.client.GetBlob(d.container, path)
|
|
if err != nil {
|
|
if is404(err) {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return ioutil.ReadAll(blob)
|
|
}
|
|
|
|
// PutContent stores the []byte content at a location designated by "path".
|
|
func (d *driver) PutContent(path string, contents []byte) error {
|
|
return d.client.PutBlockBlob(d.container, path, ioutil.NopCloser(bytes.NewReader(contents)))
|
|
}
|
|
|
|
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
|
// given byte offset.
|
|
func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
|
|
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
|
return nil, err
|
|
} else if !ok {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
info, err := d.client.GetBlobProperties(d.container, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
size := int64(info.ContentLength)
|
|
if offset >= size {
|
|
return ioutil.NopCloser(bytes.NewReader(nil)), nil
|
|
}
|
|
|
|
bytesRange := fmt.Sprintf("%v-", offset)
|
|
resp, err := d.client.GetBlobRange(d.container, path, bytesRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
|
// designated by the given path.
|
|
func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) {
|
|
if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
|
|
return 0, err
|
|
} else if !blobExists {
|
|
err := d.client.CreateBlockBlob(d.container, path)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
if offset < 0 {
|
|
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
|
}
|
|
|
|
bs := newAzureBlockStorage(d.client)
|
|
bw := newRandomBlobWriter(&bs, azure.MaxBlobBlockSize)
|
|
zw := newZeroFillWriter(&bw)
|
|
return zw.Write(d.container, path, offset, reader)
|
|
}
|
|
|
|
// Stat retrieves the FileInfo for the given path, including the current size
|
|
// in bytes and the creation time.
|
|
func (d *driver) Stat(path string) (storagedriver.FileInfo, error) {
|
|
// Check if the path is a blob
|
|
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
|
return nil, err
|
|
} else if ok {
|
|
blob, err := d.client.GetBlobProperties(d.container, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mtim, err := time.Parse(http.TimeFormat, blob.LastModified)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
|
Path: path,
|
|
Size: int64(blob.ContentLength),
|
|
ModTime: mtim,
|
|
IsDir: false,
|
|
}}, nil
|
|
}
|
|
|
|
// Check if path is a virtual container
|
|
virtContainerPath := path
|
|
if !strings.HasSuffix(virtContainerPath, "/") {
|
|
virtContainerPath += "/"
|
|
}
|
|
blobs, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
|
|
Prefix: virtContainerPath,
|
|
MaxResults: 1,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(blobs.Blobs) > 0 {
|
|
// path is a virtual container
|
|
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
|
Path: path,
|
|
IsDir: true,
|
|
}}, nil
|
|
}
|
|
|
|
// path is not a blob or virtual container
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
// List returns a list of the objects that are direct descendants of the given
|
|
// path.
|
|
func (d *driver) List(path string) ([]string, error) {
|
|
if path == "/" {
|
|
path = ""
|
|
}
|
|
|
|
blobs, err := d.listBlobs(d.container, path)
|
|
if err != nil {
|
|
return blobs, err
|
|
}
|
|
|
|
list := directDescendants(blobs, path)
|
|
return list, nil
|
|
}
|
|
|
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
|
// object.
|
|
func (d *driver) Move(sourcePath string, destPath string) error {
|
|
sourceBlobURL := d.client.GetBlobUrl(d.container, sourcePath)
|
|
err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
|
|
if err != nil {
|
|
if is404(err) {
|
|
return storagedriver.PathNotFoundError{Path: sourcePath}
|
|
}
|
|
return err
|
|
}
|
|
|
|
return d.client.DeleteBlob(d.container, sourcePath)
|
|
}
|
|
|
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
|
func (d *driver) Delete(path string) error {
|
|
ok, err := d.client.DeleteBlobIfExists(d.container, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ok {
|
|
return nil // was a blob and deleted, return
|
|
}
|
|
|
|
// Not a blob, see if path is a virtual container with blobs
|
|
blobs, err := d.listBlobs(d.container, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, b := range blobs {
|
|
if err = d.client.DeleteBlob(d.container, b); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(blobs) == 0 {
|
|
return storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// URLFor returns a publicly accessible URL for the blob stored at given path
|
|
// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
|
|
// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
|
|
func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) {
|
|
expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration
|
|
expires, ok := options["expiry"]
|
|
if ok {
|
|
t, ok := expires.(time.Time)
|
|
if ok {
|
|
expiresTime = t
|
|
}
|
|
}
|
|
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
|
|
}
|
|
|
|
// directDescendants finds the direct descendants (blobs or virtual
// containers) of prefix within a list of blob paths and returns their full
// paths. Elements of blobs must start with a "/". prefix may be given with or
// without leading and trailing slashes.
//
// Each descendant appears once, in the order it is first encountered in
// blobs, so the result is deterministic for a given input. A nil slice is
// returned when nothing matches.
//
// Example: direct descendants of "/" in {"/foo", "/bar/1", "/bar/2"} is
// {"/foo", "/bar"} and direct descendants of "bar" is {"/bar/1", "/bar/2"}
func directDescendants(blobs []string, prefix string) []string {
	if !strings.HasPrefix(prefix, "/") { // add leading '/'
		prefix = "/" + prefix
	}
	if !strings.HasSuffix(prefix, "/") { // containerify the path
		prefix += "/"
	}

	seen := make(map[string]struct{})
	var out []string
	for _, b := range blobs {
		if !strings.HasPrefix(b, prefix) {
			continue
		}

		// A blob directly under prefix is its own entry; anything deeper is
		// collapsed to the first path segment below prefix.
		child := b
		rel := b[len(prefix):]
		if i := strings.Index(rel, "/"); i >= 0 {
			child = prefix + rel[:i]
		}

		if _, dup := seen[child]; dup {
			continue
		}
		seen[child] = struct{}{}
		out = append(out, child)
	}
	return out
}
|
|
|
|
func (d *driver) listBlobs(container, virtPath string) ([]string, error) {
|
|
if virtPath != "" && !strings.HasSuffix(virtPath, "/") { // containerify the path
|
|
virtPath += "/"
|
|
}
|
|
|
|
out := []string{}
|
|
marker := ""
|
|
for {
|
|
resp, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
|
|
Marker: marker,
|
|
Prefix: virtPath,
|
|
})
|
|
|
|
if err != nil {
|
|
return out, err
|
|
}
|
|
|
|
for _, b := range resp.Blobs {
|
|
out = append(out, b.Name)
|
|
}
|
|
|
|
if len(resp.Blobs) == 0 || resp.NextMarker == "" {
|
|
break
|
|
}
|
|
marker = resp.NextMarker
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func is404(err error) bool {
|
|
e, ok := err.(azure.StorageServiceError)
|
|
return ok && e.StatusCode == 404
|
|
}
|