66107df1af
This change brings the storagedriver API in line with the Go standard library's use of int64 for offsets. The main benefit is simplicity in interfacing with the io library reducing the number of type conversions in simple code.
353 lines
9.1 KiB
Go
353 lines
9.1 KiB
Go
// Package azure provides a storagedriver.StorageDriver implementation to
|
|
// store blobs in Microsoft Azure Blob Storage Service.
|
|
package azure
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/docker/docker-registry/storagedriver"
|
|
"github.com/docker/docker-registry/storagedriver/factory"
|
|
|
|
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
|
|
)
|
|
|
|
const driverName = "azure"
|
|
|
|
const (
|
|
paramAccountName = "accountname"
|
|
paramAccountKey = "accountkey"
|
|
paramContainer = "container"
|
|
)
|
|
|
|
// Driver is a storagedriver.StorageDriver implementation backed by
|
|
// Microsoft Azure Blob Storage Service.
|
|
type Driver struct {
|
|
client *azure.BlobStorageClient
|
|
container string
|
|
}
|
|
|
|
func init() {
|
|
factory.Register(driverName, &azureDriverFactory{})
|
|
}
|
|
|
|
type azureDriverFactory struct{}
|
|
|
|
func (factory *azureDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
|
|
return FromParameters(parameters)
|
|
}
|
|
|
|
// FromParameters constructs a new Driver with a given parameters map.
|
|
func FromParameters(parameters map[string]string) (*Driver, error) {
|
|
accountName, ok := parameters[paramAccountName]
|
|
if !ok {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountName)
|
|
}
|
|
|
|
accountKey, ok := parameters[paramAccountKey]
|
|
if !ok {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountKey)
|
|
}
|
|
|
|
container, ok := parameters[paramContainer]
|
|
if !ok {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramContainer)
|
|
}
|
|
|
|
return New(accountName, accountKey, container)
|
|
}
|
|
|
|
// New constructs a new Driver with the given Azure Storage Account credentials
|
|
func New(accountName, accountKey, container string) (*Driver, error) {
|
|
api, err := azure.NewBasicClient(accountName, accountKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
blobClient := api.GetBlobService()
|
|
|
|
// Create registry container
|
|
if _, err = blobClient.CreateContainerIfNotExists(container, azure.ContainerAccessTypePrivate); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Driver{
|
|
client: blobClient,
|
|
container: container}, nil
|
|
}
|
|
|
|
// Implement the storagedriver.StorageDriver interface.
|
|
|
|
// GetContent retrieves the content stored at "path" as a []byte.
|
|
func (d *Driver) GetContent(path string) ([]byte, error) {
|
|
blob, err := d.client.GetBlob(d.container, path)
|
|
if err != nil {
|
|
if is404(err) {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return ioutil.ReadAll(blob)
|
|
}
|
|
|
|
// PutContent stores the []byte content at a location designated by "path".
|
|
func (d *Driver) PutContent(path string, contents []byte) error {
|
|
return d.client.PutBlockBlob(d.container, path, ioutil.NopCloser(bytes.NewReader(contents)))
|
|
}
|
|
|
|
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
|
// given byte offset.
|
|
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
|
|
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
|
return nil, err
|
|
} else if !ok {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
size, err := d.CurrentSize(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if offset >= int64(size) {
|
|
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
|
}
|
|
|
|
bytesRange := fmt.Sprintf("%v-", offset)
|
|
resp, err := d.client.GetBlobRange(d.container, path, bytesRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
|
// designated by the given path.
|
|
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
|
|
var (
|
|
lastBlockNum int
|
|
resumableOffset int64
|
|
blocks []azure.Block
|
|
)
|
|
|
|
if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
|
|
return err
|
|
} else if !blobExists { // new blob
|
|
lastBlockNum = 0
|
|
resumableOffset = 0
|
|
} else { // append
|
|
if parts, err := d.client.GetBlockList(d.container, path, azure.BlockListTypeCommitted); err != nil {
|
|
return err
|
|
} else if len(parts.CommittedBlocks) == 0 {
|
|
lastBlockNum = 0
|
|
resumableOffset = 0
|
|
} else {
|
|
lastBlock := parts.CommittedBlocks[len(parts.CommittedBlocks)-1]
|
|
if lastBlockNum, err = blockNum(lastBlock.Name); err != nil {
|
|
return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
|
|
}
|
|
|
|
var totalSize int64
|
|
for _, v := range parts.CommittedBlocks {
|
|
blocks = append(blocks, azure.Block{
|
|
Id: v.Name,
|
|
Status: azure.BlockStatusCommitted})
|
|
totalSize += int64(v.Size)
|
|
}
|
|
|
|
// NOTE: Azure driver currently supports only append mode (resumable
|
|
// index is exactly where the committed blocks of the blob end).
|
|
// In order to support writing to offsets other than last index,
|
|
// adjacent blocks overlapping with the [offset:offset+size] area
|
|
// must be fetched, splitted and should be overwritten accordingly.
|
|
// As the current use of this method is append only, that implementation
|
|
// is omitted.
|
|
resumableOffset = totalSize
|
|
}
|
|
}
|
|
|
|
if offset != resumableOffset {
|
|
return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
|
}
|
|
|
|
// Put content
|
|
buf := make([]byte, azure.MaxBlobBlockSize)
|
|
for {
|
|
// Read chunks of exactly size N except the last chunk to
|
|
// maximize block size and minimize block count.
|
|
n, err := io.ReadFull(reader, buf)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
|
|
data := buf[:n]
|
|
blockID := toBlockID(lastBlockNum + 1)
|
|
if err = d.client.PutBlock(d.container, path, blockID, data); err != nil {
|
|
return err
|
|
}
|
|
blocks = append(blocks, azure.Block{
|
|
Id: blockID,
|
|
Status: azure.BlockStatusLatest})
|
|
lastBlockNum++
|
|
}
|
|
|
|
// Commit block list
|
|
return d.client.PutBlockList(d.container, path, blocks)
|
|
}
|
|
|
|
// CurrentSize retrieves the curernt size in bytes of the object at the given
|
|
// path.
|
|
func (d *Driver) CurrentSize(path string) (uint64, error) {
|
|
props, err := d.client.GetBlobProperties(d.container, path)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return props.ContentLength, nil
|
|
}
|
|
|
|
// List returns a list of the objects that are direct descendants of the given
|
|
// path.
|
|
func (d *Driver) List(path string) ([]string, error) {
|
|
if path == "/" {
|
|
path = ""
|
|
}
|
|
|
|
blobs, err := d.listBlobs(d.container, path)
|
|
if err != nil {
|
|
return blobs, err
|
|
}
|
|
|
|
list := directDescendants(blobs, path)
|
|
return list, nil
|
|
}
|
|
|
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
|
// object.
|
|
func (d *Driver) Move(sourcePath string, destPath string) error {
|
|
sourceBlobURL := d.client.GetBlobUrl(d.container, sourcePath)
|
|
err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
|
|
if err != nil {
|
|
if is404(err) {
|
|
return storagedriver.PathNotFoundError{Path: sourcePath}
|
|
}
|
|
return err
|
|
}
|
|
|
|
return d.client.DeleteBlob(d.container, sourcePath)
|
|
}
|
|
|
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
|
func (d *Driver) Delete(path string) error {
|
|
ok, err := d.client.DeleteBlobIfExists(d.container, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ok {
|
|
return nil // was a blob and deleted, return
|
|
}
|
|
|
|
// Not a blob, see if path is a virtual container with blobs
|
|
blobs, err := d.listBlobs(d.container, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, b := range blobs {
|
|
if err = d.client.DeleteBlob(d.container, b); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(blobs) == 0 {
|
|
return storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// directDescendants will find direct descendants (blobs or virtual containers)
|
|
// of from list of blob paths and will return their full paths. Elements in blobs
|
|
// list must be prefixed with a "/" and
|
|
//
|
|
// Example: direct descendants of "/" in {"/foo", "/bar/1", "/bar/2"} is
|
|
// {"/foo", "/bar"} and direct descendants of "bar" is {"/bar/1", "/bar/2"}
|
|
func directDescendants(blobs []string, prefix string) []string {
|
|
if !strings.HasPrefix(prefix, "/") { // add trailing '/'
|
|
prefix = "/" + prefix
|
|
}
|
|
if !strings.HasSuffix(prefix, "/") { // containerify the path
|
|
prefix += "/"
|
|
}
|
|
|
|
out := make(map[string]bool)
|
|
for _, b := range blobs {
|
|
if strings.HasPrefix(b, prefix) {
|
|
rel := b[len(prefix):]
|
|
c := strings.Count(rel, "/")
|
|
if c == 0 {
|
|
out[b] = true
|
|
} else {
|
|
out[prefix+rel[:strings.Index(rel, "/")]] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
var keys []string
|
|
for k := range out {
|
|
keys = append(keys, k)
|
|
}
|
|
return keys
|
|
}
|
|
|
|
func (d *Driver) listBlobs(container, virtPath string) ([]string, error) {
|
|
if virtPath != "" && !strings.HasSuffix(virtPath, "/") { // containerify the path
|
|
virtPath += "/"
|
|
}
|
|
|
|
out := []string{}
|
|
marker := ""
|
|
for {
|
|
resp, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
|
|
Marker: marker,
|
|
Prefix: virtPath,
|
|
})
|
|
|
|
if err != nil {
|
|
return out, err
|
|
}
|
|
|
|
for _, b := range resp.Blobs {
|
|
out = append(out, b.Name)
|
|
}
|
|
|
|
if len(resp.Blobs) == 0 || resp.NextMarker == "" {
|
|
break
|
|
}
|
|
marker = resp.NextMarker
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func is404(err error) bool {
|
|
e, ok := err.(azure.StorageServiceError)
|
|
return ok && e.StatusCode == 404
|
|
}
|
|
|
|
func blockNum(b64Name string) (int, error) {
|
|
s, err := base64.StdEncoding.DecodeString(b64Name)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return strconv.Atoi(string(s))
|
|
}
|
|
|
|
func toBlockID(i int) string {
|
|
return base64.StdEncoding.EncodeToString([]byte(strconv.Itoa(i)))
|
|
}
|