Merge pull request #430 from stevvooe/address-s3-timeout-error
Attempt to address intermittent s3 RequestTimeout error
This commit is contained in:
commit
9b6d49fdb0
@ -28,6 +28,7 @@ import (
|
|||||||
|
|
||||||
"github.com/AdRoll/goamz/aws"
|
"github.com/AdRoll/goamz/aws"
|
||||||
"github.com/AdRoll/goamz/s3"
|
"github.com/AdRoll/goamz/s3"
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
"github.com/docker/distribution/registry/storage/driver/base"
|
"github.com/docker/distribution/registry/storage/driver/base"
|
||||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||||
@ -398,18 +399,64 @@ func (d *driver) WriteStream(path string, offset int64, reader io.Reader) (total
|
|||||||
go func(bytesRead int, from int64, buf []byte) {
|
go func(bytesRead int, from int64, buf []byte) {
|
||||||
defer d.putbuf(buf) // this buffer gets dropped after this call
|
defer d.putbuf(buf) // this buffer gets dropped after this call
|
||||||
|
|
||||||
// parts and partNumber are safe, because this function is the only one modifying them and we
|
// DRAGONS(stevvooe): There are few things one might want to know
|
||||||
// force it to be executed serially.
|
// about this section. First, the putErrChan is expecting an error
|
||||||
if bytesRead > 0 {
|
// and a nil or just a nil to come through the channel. This is
|
||||||
part, putErr := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
|
// covered by the silly defer below. The other aspect is the s3
|
||||||
if putErr != nil {
|
// retry backoff to deal with RequestTimeout errors. Even though
|
||||||
putErrChan <- putErr
|
// the underlying s3 library should handle it, it doesn't seem to
|
||||||
|
// be part of the shouldRetry function (see AdRoll/goamz/s3).
|
||||||
|
defer func() {
|
||||||
|
putErrChan <- nil // for some reason, we do this no matter what.
|
||||||
|
}()
|
||||||
|
|
||||||
|
if bytesRead <= 0 {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
var part s3.Part
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for retries := 0; retries < 5; retries++ {
|
||||||
|
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
|
||||||
|
if err == nil {
|
||||||
|
break // success!
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE(stevvooe): This retry code tries to only retry under
|
||||||
|
// conditions where the s3 package does not. We may add s3
|
||||||
|
// error codes to the below if we see others bubble up in the
|
||||||
|
// application. Right now, the most troubling is
|
||||||
|
// RequestTimeout, which seems to only triggered when a tcp
|
||||||
|
// connection to s3 slows to a crawl. If the RequestTimeout
|
||||||
|
// ends up getting added to the s3 library and we don't see
|
||||||
|
// other errors, this retry loop can be removed.
|
||||||
|
switch err := err.(type) {
|
||||||
|
case *s3.Error:
|
||||||
|
switch err.Code {
|
||||||
|
case "RequestTimeout":
|
||||||
|
// allow retries on only this error.
|
||||||
|
default:
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
backoff := 100 * time.Millisecond * time.Duration(retries+1)
|
||||||
|
logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
|
||||||
|
time.Sleep(backoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("error putting part, aborting: %v", err)
|
||||||
|
putErrChan <- err
|
||||||
|
}
|
||||||
|
|
||||||
|
// parts and partNumber are safe, because this function is the
|
||||||
|
// only one modifying them and we force it to be executed
|
||||||
|
// serially.
|
||||||
parts = append(parts, part)
|
parts = append(parts, part)
|
||||||
partNumber++
|
partNumber++
|
||||||
}
|
|
||||||
putErrChan <- nil
|
|
||||||
}(bytesRead, from, buf)
|
}(bytesRead, from, buf)
|
||||||
|
|
||||||
buf = d.getbuf() // use a new buffer for the next call
|
buf = d.getbuf() // use a new buffer for the next call
|
||||||
|
Loading…
Reference in New Issue
Block a user