Merge pull request #1276 from denverdino/oss-test
Support large layer for OSS driver
This commit is contained in:
commit
e5e0e2cc2a
@ -39,6 +39,7 @@ const driverName = "oss"
|
|||||||
const minChunkSize = 5 << 20
|
const minChunkSize = 5 << 20
|
||||||
|
|
||||||
const defaultChunkSize = 2 * minChunkSize
|
const defaultChunkSize = 2 * minChunkSize
|
||||||
|
const defaultTimeout = 2 * time.Minute // 2 minute timeout per chunk
|
||||||
|
|
||||||
// listMax is the largest amount of objects you can request from OSS in a list call
|
// listMax is the largest amount of objects you can request from OSS in a list call
|
||||||
const listMax = 1000
|
const listMax = 1000
|
||||||
@ -195,13 +196,14 @@ func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
|||||||
return New(params)
|
return New(params)
|
||||||
}
|
}
|
||||||
|
|
||||||
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
|
// New constructs a new Driver with the given Aliyun credentials, region, encryption flag, and
|
||||||
// bucketName
|
// bucketName
|
||||||
func New(params DriverParameters) (*Driver, error) {
|
func New(params DriverParameters) (*Driver, error) {
|
||||||
|
|
||||||
client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure)
|
client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure)
|
||||||
client.SetEndpoint(params.Endpoint)
|
client.SetEndpoint(params.Endpoint)
|
||||||
bucket := client.Bucket(params.Bucket)
|
bucket := client.Bucket(params.Bucket)
|
||||||
|
client.SetDebug(false)
|
||||||
|
|
||||||
// Validate that the given credentials have at least read permissions in the
|
// Validate that the given credentials have at least read permissions in the
|
||||||
// given bucket scope.
|
// given bucket scope.
|
||||||
@ -403,35 +405,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
|||||||
var err error
|
var err error
|
||||||
var part oss.Part
|
var part oss.Part
|
||||||
|
|
||||||
loop:
|
part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]), defaultTimeout)
|
||||||
for retries := 0; retries < 5; retries++ {
|
|
||||||
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
|
|
||||||
if err == nil {
|
|
||||||
break // success!
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE(stevvooe): This retry code tries to only retry under
|
|
||||||
// conditions where the OSS package does not. We may add oss
|
|
||||||
// error codes to the below if we see others bubble up in the
|
|
||||||
// application. Right now, the most troubling is
|
|
||||||
// RequestTimeout, which seems to only triggered when a tcp
|
|
||||||
// connection to OSS slows to a crawl. If the RequestTimeout
|
|
||||||
// ends up getting added to the OSS library and we don't see
|
|
||||||
// other errors, this retry loop can be removed.
|
|
||||||
switch err := err.(type) {
|
|
||||||
case *oss.Error:
|
|
||||||
switch err.Code {
|
|
||||||
case "RequestTimeout":
|
|
||||||
// allow retries on only this error.
|
|
||||||
default:
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
backoff := 100 * time.Millisecond * time.Duration(retries+1)
|
|
||||||
logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
|
|
||||||
time.Sleep(backoff)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("error putting part, aborting: %v", err)
|
logrus.Errorf("error putting part, aborting: %v", err)
|
||||||
@ -456,7 +430,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
|||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
resp, err := d.Bucket.Head(d.ossPath(path), nil)
|
resp, err := d.Bucket.Head(d.ossPath(path), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ossErr, ok := err.(*oss.Error); !ok || ossErr.Code != "NoSuchKey" {
|
if ossErr, ok := err.(*oss.Error); !ok || ossErr.StatusCode != http.StatusNotFound {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -511,7 +485,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
|||||||
fromZeroFillLarge := func(from, to int64) error {
|
fromZeroFillLarge := func(from, to int64) error {
|
||||||
bytesRead64 := int64(0)
|
bytesRead64 := int64(0)
|
||||||
for to-(from+bytesRead64) >= d.ChunkSize {
|
for to-(from+bytesRead64) >= d.ChunkSize {
|
||||||
part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
|
part, err := multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(d.zeros), defaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -553,7 +527,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
|||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
|
|
||||||
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
|
part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf), defaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return totalRead, err
|
return totalRead, err
|
||||||
}
|
}
|
||||||
@ -706,15 +680,14 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
|
|||||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||||
// object.
|
// object.
|
||||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||||
logrus.Infof("Move from %s to %s", d.Bucket.Path("/"+d.ossPath(sourcePath)), d.ossPath(destPath))
|
logrus.Infof("Move from %s to %s", d.ossPath(sourcePath), d.ossPath(destPath))
|
||||||
/* This is terrible, but aws doesn't have an actual move. */
|
|
||||||
_, err := d.Bucket.PutCopy(d.ossPath(destPath), getPermissions(),
|
err := d.Bucket.CopyLargeFile(d.ossPath(sourcePath), d.ossPath(destPath),
|
||||||
oss.CopyOptions{
|
d.getContentType(),
|
||||||
//Options: d.getOptions(),
|
getPermissions(),
|
||||||
//ContentType: d.getContentType()
|
oss.Options{})
|
||||||
},
|
|
||||||
d.Bucket.Path(d.ossPath(sourcePath)))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logrus.Errorf("Failed for move from %s to %s: %v", d.ossPath(sourcePath), d.ossPath(destPath), err)
|
||||||
return parseError(sourcePath, err)
|
return parseError(sourcePath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -756,13 +729,12 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
|||||||
method, ok := options["method"]
|
method, ok := options["method"]
|
||||||
if ok {
|
if ok {
|
||||||
methodString, ok = method.(string)
|
methodString, ok = method.(string)
|
||||||
if !ok || (methodString != "GET" && methodString != "HEAD") {
|
if !ok || (methodString != "GET") {
|
||||||
return "", storagedriver.ErrUnsupportedMethod{}
|
return "", storagedriver.ErrUnsupportedMethod{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
expiresTime := time.Now().Add(20 * time.Minute)
|
expiresTime := time.Now().Add(20 * time.Minute)
|
||||||
logrus.Infof("expiresTime: %d", expiresTime)
|
|
||||||
|
|
||||||
expires, ok := options["expiry"]
|
expires, ok := options["expiry"]
|
||||||
if ok {
|
if ok {
|
||||||
@ -771,7 +743,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
|
|||||||
expiresTime = et
|
expiresTime = et
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logrus.Infof("expiresTime: %d", expiresTime)
|
logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime)
|
||||||
testURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
|
testURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
|
||||||
logrus.Infof("testURL: %s", testURL)
|
logrus.Infof("testURL: %s", testURL)
|
||||||
return testURL, nil
|
return testURL, nil
|
||||||
@ -781,13 +753,8 @@ func (d *driver) ossPath(path string) string {
|
|||||||
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3BucketKey returns the OSS bucket key for the given storage driver path.
|
|
||||||
func (d *Driver) S3BucketKey(path string) string {
|
|
||||||
return d.StorageDriver.(*driver).ossPath(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseError(path string, err error) error {
|
func parseError(path string, err error) error {
|
||||||
if ossErr, ok := err.(*oss.Error); ok && ossErr.Code == "NoSuchKey" {
|
if ossErr, ok := err.(*oss.Error); ok && ossErr.StatusCode == http.StatusNotFound && (ossErr.Code == "NoSuchKey" || ossErr.Code == "") {
|
||||||
return storagedriver.PathNotFoundError{Path: path}
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user