78d0660319
See #2077 for background. The PR #1438 which was not reviewed by azure folks basically introduced a race condition around uploads to the same blob by multiple clients concurrently as it used the "writer" type for PutContent(), introduced in #1438. This does chunked upload of blobs using "AppendBlob" type, which was not atomic. Usage of "writer" type and thus AppendBlobs on metadata files is currently not concurrency-safe and generally, they are not the right type of blob for the job. This patch fixes PutContent() to use the atomic upload operation that works for uploads smaller than 64 MB and creates blobs with "BlockBlob" type. To be backwards compatible, we query the type of the blob first and if it is not a "BlockBlob" we delete the blob first before doing an atomic PUT. This creates a small inconsistency/race window "only once". Once the blob is made "BlockBlob", it is overwritten with a single PUT atomicallly next time. Therefore, going forward, PutContent() will be producing BlockBlobs and it will silently migrate the AppendBlobs introduced in #1438 to BlockBlobs with this patch. Tested with existing code side by side, both registries with and without this patch work fine without breaking each other. So this should be good from a backwards/forward compatiblity perspective, with a cost of doing an extra HEAD checking the blob type. Fixes #2077. Signed-off-by: Ahmet Alp Balkan <ahmetalpbalkan@gmail.com>
506 lines
14 KiB
Go
506 lines
14 KiB
Go
// Package azure provides a storagedriver.StorageDriver implementation to
|
|
// store blobs in Microsoft Azure Blob Storage Service.
|
|
package azure
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/docker/distribution/context"
|
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
"github.com/docker/distribution/registry/storage/driver/base"
|
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
|
|
|
azure "github.com/Azure/azure-sdk-for-go/storage"
|
|
)
|
|
|
|
const driverName = "azure"
|
|
|
|
const (
|
|
paramAccountName = "accountname"
|
|
paramAccountKey = "accountkey"
|
|
paramContainer = "container"
|
|
paramRealm = "realm"
|
|
maxChunkSize = 4 * 1024 * 1024
|
|
)
|
|
|
|
type driver struct {
|
|
client azure.BlobStorageClient
|
|
container string
|
|
}
|
|
|
|
type baseEmbed struct{ base.Base }
|
|
|
|
// Driver is a storagedriver.StorageDriver implementation backed by
|
|
// Microsoft Azure Blob Storage Service.
|
|
type Driver struct{ baseEmbed }
|
|
|
|
func init() {
|
|
factory.Register(driverName, &azureDriverFactory{})
|
|
}
|
|
|
|
type azureDriverFactory struct{}
|
|
|
|
func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
|
return FromParameters(parameters)
|
|
}
|
|
|
|
// FromParameters constructs a new Driver with a given parameters map.
|
|
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
|
accountName, ok := parameters[paramAccountName]
|
|
if !ok || fmt.Sprint(accountName) == "" {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountName)
|
|
}
|
|
|
|
accountKey, ok := parameters[paramAccountKey]
|
|
if !ok || fmt.Sprint(accountKey) == "" {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountKey)
|
|
}
|
|
|
|
container, ok := parameters[paramContainer]
|
|
if !ok || fmt.Sprint(container) == "" {
|
|
return nil, fmt.Errorf("No %s parameter provided", paramContainer)
|
|
}
|
|
|
|
realm, ok := parameters[paramRealm]
|
|
if !ok || fmt.Sprint(realm) == "" {
|
|
realm = azure.DefaultBaseURL
|
|
}
|
|
|
|
return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container), fmt.Sprint(realm))
|
|
}
|
|
|
|
// New constructs a new Driver with the given Azure Storage Account credentials
|
|
func New(accountName, accountKey, container, realm string) (*Driver, error) {
|
|
api, err := azure.NewClient(accountName, accountKey, realm, azure.DefaultAPIVersion, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
blobClient := api.GetBlobService()
|
|
|
|
// Create registry container
|
|
if _, err = blobClient.CreateContainerIfNotExists(container, azure.ContainerAccessTypePrivate); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
d := &driver{
|
|
client: blobClient,
|
|
container: container}
|
|
return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
|
|
}
|
|
|
|
// Implement the storagedriver.StorageDriver interface.
|
|
func (d *driver) Name() string {
|
|
return driverName
|
|
}
|
|
|
|
// GetContent retrieves the content stored at "path" as a []byte.
|
|
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
|
blob, err := d.client.GetBlob(d.container, path)
|
|
if err != nil {
|
|
if is404(err) {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
defer blob.Close()
|
|
return ioutil.ReadAll(blob)
|
|
}
|
|
|
|
// PutContent stores the []byte content at a location designated by "path".
|
|
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
|
if limit := 64 * 1024 * 1024; len(contents) > limit { // max size for block blobs uploaded via single "Put Blob"
|
|
return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
|
|
}
|
|
|
|
// Historically, blobs uploaded via PutContent used to be of type AppendBlob
|
|
// (https://github.com/docker/distribution/pull/1438). We can't replace
|
|
// these blobs atomically via a single "Put Blob" operation without
|
|
// deleting them first. Once we detect they are BlockBlob type, we can
|
|
// overwrite them with an atomically "Put Blob" operation.
|
|
//
|
|
// While we delete the blob and create a new one, there will be a small
|
|
// window of inconsistency and if the Put Blob fails, we may end up with
|
|
// losing the existing data while migrating it to BlockBlob type. However,
|
|
// expectation is the clients pushing will be retrying when they get an error
|
|
// response.
|
|
props, err := d.client.GetBlobProperties(d.container, path)
|
|
if err != nil && !is404(err) {
|
|
return fmt.Errorf("failed to get blob properties: %v", err)
|
|
}
|
|
if err == nil && props.BlobType != azure.BlobTypeBlock {
|
|
if err := d.client.DeleteBlob(d.container, path, nil); err != nil {
|
|
return fmt.Errorf("failed to delete legacy blob (%s): %v", props.BlobType, err)
|
|
}
|
|
}
|
|
|
|
r := bytes.NewReader(contents)
|
|
return d.client.CreateBlockBlobFromReader(d.container, path, uint64(len(contents)), r, nil)
|
|
}
|
|
|
|
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
|
// given byte offset.
|
|
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
|
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
|
return nil, err
|
|
} else if !ok {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
info, err := d.client.GetBlobProperties(d.container, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
size := int64(info.ContentLength)
|
|
if offset >= size {
|
|
return ioutil.NopCloser(bytes.NewReader(nil)), nil
|
|
}
|
|
|
|
bytesRange := fmt.Sprintf("%v-", offset)
|
|
resp, err := d.client.GetBlobRange(d.container, path, bytesRange, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// Writer returns a FileWriter which will store the content written to it
|
|
// at the location designated by "path" after the call to Commit.
|
|
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
|
blobExists, err := d.client.BlobExists(d.container, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var size int64
|
|
if blobExists {
|
|
if append {
|
|
blobProperties, err := d.client.GetBlobProperties(d.container, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
size = blobProperties.ContentLength
|
|
} else {
|
|
err := d.client.DeleteBlob(d.container, path, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
} else {
|
|
if append {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
err := d.client.PutAppendBlob(d.container, path, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return d.newWriter(path, size), nil
|
|
}
|
|
|
|
// Stat retrieves the FileInfo for the given path, including the current size
|
|
// in bytes and the creation time.
|
|
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
|
// Check if the path is a blob
|
|
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
|
return nil, err
|
|
} else if ok {
|
|
blob, err := d.client.GetBlobProperties(d.container, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mtim, err := time.Parse(http.TimeFormat, blob.LastModified)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
|
Path: path,
|
|
Size: int64(blob.ContentLength),
|
|
ModTime: mtim,
|
|
IsDir: false,
|
|
}}, nil
|
|
}
|
|
|
|
// Check if path is a virtual container
|
|
virtContainerPath := path
|
|
if !strings.HasSuffix(virtContainerPath, "/") {
|
|
virtContainerPath += "/"
|
|
}
|
|
blobs, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
|
|
Prefix: virtContainerPath,
|
|
MaxResults: 1,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(blobs.Blobs) > 0 {
|
|
// path is a virtual container
|
|
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
|
Path: path,
|
|
IsDir: true,
|
|
}}, nil
|
|
}
|
|
|
|
// path is not a blob or virtual container
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
|
|
// List returns a list of the objects that are direct descendants of the given
|
|
// path.
|
|
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
|
if path == "/" {
|
|
path = ""
|
|
}
|
|
|
|
blobs, err := d.listBlobs(d.container, path)
|
|
if err != nil {
|
|
return blobs, err
|
|
}
|
|
|
|
list := directDescendants(blobs, path)
|
|
if path != "" && len(list) == 0 {
|
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return list, nil
|
|
}
|
|
|
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
|
// object.
|
|
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
|
sourceBlobURL := d.client.GetBlobURL(d.container, sourcePath)
|
|
err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
|
|
if err != nil {
|
|
if is404(err) {
|
|
return storagedriver.PathNotFoundError{Path: sourcePath}
|
|
}
|
|
return err
|
|
}
|
|
|
|
return d.client.DeleteBlob(d.container, sourcePath, nil)
|
|
}
|
|
|
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
|
func (d *driver) Delete(ctx context.Context, path string) error {
|
|
ok, err := d.client.DeleteBlobIfExists(d.container, path, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ok {
|
|
return nil // was a blob and deleted, return
|
|
}
|
|
|
|
// Not a blob, see if path is a virtual container with blobs
|
|
blobs, err := d.listBlobs(d.container, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, b := range blobs {
|
|
if err = d.client.DeleteBlob(d.container, b, nil); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(blobs) == 0 {
|
|
return storagedriver.PathNotFoundError{Path: path}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// URLFor returns a publicly accessible URL for the blob stored at given path
|
|
// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
|
|
// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
|
|
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
|
|
expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration
|
|
expires, ok := options["expiry"]
|
|
if ok {
|
|
t, ok := expires.(time.Time)
|
|
if ok {
|
|
expiresTime = t
|
|
}
|
|
}
|
|
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
|
|
}
|
|
|
|
// directDescendants will find direct descendants (blobs or virtual containers)
|
|
// of from list of blob paths and will return their full paths. Elements in blobs
|
|
// list must be prefixed with a "/" and
|
|
//
|
|
// Example: direct descendants of "/" in {"/foo", "/bar/1", "/bar/2"} is
|
|
// {"/foo", "/bar"} and direct descendants of "bar" is {"/bar/1", "/bar/2"}
|
|
func directDescendants(blobs []string, prefix string) []string {
|
|
if !strings.HasPrefix(prefix, "/") { // add trailing '/'
|
|
prefix = "/" + prefix
|
|
}
|
|
if !strings.HasSuffix(prefix, "/") { // containerify the path
|
|
prefix += "/"
|
|
}
|
|
|
|
out := make(map[string]bool)
|
|
for _, b := range blobs {
|
|
if strings.HasPrefix(b, prefix) {
|
|
rel := b[len(prefix):]
|
|
c := strings.Count(rel, "/")
|
|
if c == 0 {
|
|
out[b] = true
|
|
} else {
|
|
out[prefix+rel[:strings.Index(rel, "/")]] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
var keys []string
|
|
for k := range out {
|
|
keys = append(keys, k)
|
|
}
|
|
return keys
|
|
}
|
|
|
|
func (d *driver) listBlobs(container, virtPath string) ([]string, error) {
|
|
if virtPath != "" && !strings.HasSuffix(virtPath, "/") { // containerify the path
|
|
virtPath += "/"
|
|
}
|
|
|
|
out := []string{}
|
|
marker := ""
|
|
for {
|
|
resp, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
|
|
Marker: marker,
|
|
Prefix: virtPath,
|
|
})
|
|
|
|
if err != nil {
|
|
return out, err
|
|
}
|
|
|
|
for _, b := range resp.Blobs {
|
|
out = append(out, b.Name)
|
|
}
|
|
|
|
if len(resp.Blobs) == 0 || resp.NextMarker == "" {
|
|
break
|
|
}
|
|
marker = resp.NextMarker
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func is404(err error) bool {
|
|
// handle the case when the request was a HEAD and service error could not
|
|
// be parsed, such as "storage: service returned without a response body
|
|
// (404 The specified blob does not exist.)"
|
|
if strings.Contains(fmt.Sprintf("%v", err), "404 The specified blob does not exist") {
|
|
return true
|
|
}
|
|
|
|
// common case
|
|
statusCodeErr, ok := err.(azure.AzureStorageServiceError)
|
|
return ok && statusCodeErr.StatusCode == http.StatusNotFound
|
|
}
|
|
|
|
type writer struct {
|
|
driver *driver
|
|
path string
|
|
size int64
|
|
bw *bufio.Writer
|
|
closed bool
|
|
committed bool
|
|
cancelled bool
|
|
}
|
|
|
|
func (d *driver) newWriter(path string, size int64) storagedriver.FileWriter {
|
|
return &writer{
|
|
driver: d,
|
|
path: path,
|
|
size: size,
|
|
bw: bufio.NewWriterSize(&blockWriter{
|
|
client: d.client,
|
|
container: d.container,
|
|
path: path,
|
|
}, maxChunkSize),
|
|
}
|
|
}
|
|
|
|
func (w *writer) Write(p []byte) (int, error) {
|
|
if w.closed {
|
|
return 0, fmt.Errorf("already closed")
|
|
} else if w.committed {
|
|
return 0, fmt.Errorf("already committed")
|
|
} else if w.cancelled {
|
|
return 0, fmt.Errorf("already cancelled")
|
|
}
|
|
|
|
n, err := w.bw.Write(p)
|
|
w.size += int64(n)
|
|
return n, err
|
|
}
|
|
|
|
func (w *writer) Size() int64 {
|
|
return w.size
|
|
}
|
|
|
|
func (w *writer) Close() error {
|
|
if w.closed {
|
|
return fmt.Errorf("already closed")
|
|
}
|
|
w.closed = true
|
|
return w.bw.Flush()
|
|
}
|
|
|
|
func (w *writer) Cancel() error {
|
|
if w.closed {
|
|
return fmt.Errorf("already closed")
|
|
} else if w.committed {
|
|
return fmt.Errorf("already committed")
|
|
}
|
|
w.cancelled = true
|
|
return w.driver.client.DeleteBlob(w.driver.container, w.path, nil)
|
|
}
|
|
|
|
func (w *writer) Commit() error {
|
|
if w.closed {
|
|
return fmt.Errorf("already closed")
|
|
} else if w.committed {
|
|
return fmt.Errorf("already committed")
|
|
} else if w.cancelled {
|
|
return fmt.Errorf("already cancelled")
|
|
}
|
|
w.committed = true
|
|
return w.bw.Flush()
|
|
}
|
|
|
|
type blockWriter struct {
|
|
client azure.BlobStorageClient
|
|
container string
|
|
path string
|
|
}
|
|
|
|
func (bw *blockWriter) Write(p []byte) (int, error) {
|
|
n := 0
|
|
for offset := 0; offset < len(p); offset += maxChunkSize {
|
|
chunkSize := maxChunkSize
|
|
if offset+chunkSize > len(p) {
|
|
chunkSize = len(p) - offset
|
|
}
|
|
err := bw.client.AppendBlock(bw.container, bw.path, p[offset:offset+chunkSize], nil)
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
|
|
n += chunkSize
|
|
}
|
|
|
|
return n, nil
|
|
}
|