65b0d73cb7
This change is slightly more complex than previous package moves in that the package name changed. To address this, we simply always reference the package driver as storagedriver to avoid compatibility issues with existing code. While unfortunate, this can be cleaned up over time. Signed-off-by: Stephen J Day <stephen.day@docker.com>
209 lines
6.8 KiB
Go
209 lines
6.8 KiB
Go
package azure
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
|
|
azure "github.com/MSOpenTech/azure-sdk-for-go/clients/storage"
|
|
)
|
|
|
|
// blockStorage is the interface required from a block storage service
|
|
// client implementation
|
|
type blockStorage interface {
|
|
CreateBlockBlob(container, blob string) error
|
|
GetBlob(container, blob string) (io.ReadCloser, error)
|
|
GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error)
|
|
PutBlock(container, blob, blockID string, chunk []byte) error
|
|
GetBlockList(container, blob string, blockType azure.BlockListType) (azure.BlockListResponse, error)
|
|
PutBlockList(container, blob string, blocks []azure.Block) error
|
|
}
|
|
|
|
// randomBlobWriter enables random access semantics on Azure block blobs
|
|
// by enabling writing arbitrary length of chunks to arbitrary write offsets
|
|
// within the blob. Normally, Azure Blob Storage does not support random
|
|
// access semantics on block blobs; however, this writer can download, split and
|
|
// reupload the overlapping blocks and discards those being overwritten entirely.
|
|
type randomBlobWriter struct {
|
|
bs blockStorage
|
|
blockSize int
|
|
}
|
|
|
|
func newRandomBlobWriter(bs blockStorage, blockSize int) randomBlobWriter {
|
|
return randomBlobWriter{bs: bs, blockSize: blockSize}
|
|
}
|
|
|
|
// WriteBlobAt writes the given chunk to the specified position of an existing blob.
|
|
// The offset must be equals to size of the blob or smaller than it.
|
|
func (r *randomBlobWriter) WriteBlobAt(container, blob string, offset int64, chunk io.Reader) (int64, error) {
|
|
rand := newBlockIDGenerator()
|
|
|
|
blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
rand.Feed(blocks) // load existing block IDs
|
|
|
|
// Check for write offset for existing blob
|
|
size := getBlobSize(blocks)
|
|
if offset < 0 || offset > size {
|
|
return 0, fmt.Errorf("wrong offset for Write: %v", offset)
|
|
}
|
|
|
|
// Upload the new chunk as blocks
|
|
blockList, nn, err := r.writeChunkToBlocks(container, blob, chunk, rand)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// For non-append operations, existing blocks may need to be splitted
|
|
if offset != size {
|
|
// Split the block on the left end (if any)
|
|
leftBlocks, err := r.blocksLeftSide(container, blob, offset, rand)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
blockList = append(leftBlocks, blockList...)
|
|
|
|
// Split the block on the right end (if any)
|
|
rightBlocks, err := r.blocksRightSide(container, blob, offset, nn, rand)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
blockList = append(blockList, rightBlocks...)
|
|
} else {
|
|
// Use existing block list
|
|
var existingBlocks []azure.Block
|
|
for _, v := range blocks.CommittedBlocks {
|
|
existingBlocks = append(existingBlocks, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
|
|
}
|
|
blockList = append(existingBlocks, blockList...)
|
|
}
|
|
// Put block list
|
|
return nn, r.bs.PutBlockList(container, blob, blockList)
|
|
}
|
|
|
|
func (r *randomBlobWriter) GetSize(container, blob string) (int64, error) {
|
|
blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return getBlobSize(blocks), nil
|
|
}
|
|
|
|
// writeChunkToBlocks writes given chunk to one or multiple blocks within specified
|
|
// blob and returns their block representations. Those blocks are not committed, yet
|
|
func (r *randomBlobWriter) writeChunkToBlocks(container, blob string, chunk io.Reader, rand *blockIDGenerator) ([]azure.Block, int64, error) {
|
|
var newBlocks []azure.Block
|
|
var nn int64
|
|
|
|
// Read chunks of at most size N except the last chunk to
|
|
// maximize block size and minimize block count.
|
|
buf := make([]byte, r.blockSize)
|
|
for {
|
|
n, err := io.ReadFull(chunk, buf)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
nn += int64(n)
|
|
data := buf[:n]
|
|
blockID := rand.Generate()
|
|
if err := r.bs.PutBlock(container, blob, blockID, data); err != nil {
|
|
return newBlocks, nn, err
|
|
}
|
|
newBlocks = append(newBlocks, azure.Block{Id: blockID, Status: azure.BlockStatusUncommitted})
|
|
}
|
|
return newBlocks, nn, nil
|
|
}
|
|
|
|
// blocksLeftSide returns the blocks that are going to be at the left side of
|
|
// the writeOffset: [0, writeOffset) by identifying blocks that will remain
|
|
// the same and splitting blocks and reuploading them as needed.
|
|
func (r *randomBlobWriter) blocksLeftSide(container, blob string, writeOffset int64, rand *blockIDGenerator) ([]azure.Block, error) {
|
|
var left []azure.Block
|
|
bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll)
|
|
if err != nil {
|
|
return left, err
|
|
}
|
|
|
|
o := writeOffset
|
|
elapsed := int64(0)
|
|
for _, v := range bx.CommittedBlocks {
|
|
blkSize := int64(v.Size)
|
|
if o >= blkSize { // use existing block
|
|
left = append(left, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
|
|
o -= blkSize
|
|
elapsed += blkSize
|
|
} else if o > 0 { // current block needs to be splitted
|
|
start := elapsed
|
|
size := o
|
|
part, err := r.bs.GetSectionReader(container, blob, start, size)
|
|
if err != nil {
|
|
return left, err
|
|
}
|
|
newBlockID := rand.Generate()
|
|
|
|
data, err := ioutil.ReadAll(part)
|
|
if err != nil {
|
|
return left, err
|
|
}
|
|
if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil {
|
|
return left, err
|
|
}
|
|
left = append(left, azure.Block{Id: newBlockID, Status: azure.BlockStatusUncommitted})
|
|
break
|
|
}
|
|
}
|
|
return left, nil
|
|
}
|
|
|
|
// blocksRightSide returns the blocks that are going to be at the right side of
|
|
// the written chunk: [writeOffset+size, +inf) by identifying blocks that will remain
|
|
// the same and splitting blocks and reuploading them as needed.
|
|
func (r *randomBlobWriter) blocksRightSide(container, blob string, writeOffset int64, chunkSize int64, rand *blockIDGenerator) ([]azure.Block, error) {
|
|
var right []azure.Block
|
|
|
|
bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
re := writeOffset + chunkSize - 1 // right end of written chunk
|
|
var elapsed int64
|
|
for _, v := range bx.CommittedBlocks {
|
|
var (
|
|
bs = elapsed // left end of current block
|
|
be = elapsed + int64(v.Size) - 1 // right end of current block
|
|
)
|
|
|
|
if bs > re { // take the block as is
|
|
right = append(right, azure.Block{Id: v.Name, Status: azure.BlockStatusCommitted})
|
|
} else if be > re { // current block needs to be splitted
|
|
part, err := r.bs.GetSectionReader(container, blob, re+1, be-(re+1)+1)
|
|
if err != nil {
|
|
return right, err
|
|
}
|
|
newBlockID := rand.Generate()
|
|
|
|
data, err := ioutil.ReadAll(part)
|
|
if err != nil {
|
|
return right, err
|
|
}
|
|
if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil {
|
|
return right, err
|
|
}
|
|
right = append(right, azure.Block{Id: newBlockID, Status: azure.BlockStatusUncommitted})
|
|
}
|
|
elapsed += int64(v.Size)
|
|
}
|
|
return right, nil
|
|
}
|
|
|
|
func getBlobSize(blocks azure.BlockListResponse) int64 {
|
|
var n int64
|
|
for _, v := range blocks.CommittedBlocks {
|
|
n += int64(v.Size)
|
|
}
|
|
return n
|
|
}
|