Adds sliding-window parallelization to Push/Pull operations

A layer can only be pushed/pulled if the layer preceding it by the
length of the push/pull window has been successfully pushed.

An error returned from pushing or pulling any layer will cause the full
operation to be aborted.
This commit is contained in:
Brian Bland 2014-11-17 17:33:03 -08:00
parent a2d232aaec
commit 28b7b82e2d
2 changed files with 178 additions and 103 deletions

View File

@ -4,9 +4,16 @@ import (
"fmt" "fmt"
"io" "io"
"github.com/docker/docker-registry"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
) )
// simultaneousLayerPullWindow is the size of the parallel layer pull window.
// A layer may not be pulled until the layer preceeding it by the length of the
// pull window has been successfully pulled.
const simultaneousLayerPullWindow = 4
// Pull implements a client pull workflow for the image defined by the given // Pull implements a client pull workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer // name and tag pair, using the given ObjectStore for local manifest and layer
// storage // storage
@ -24,59 +31,28 @@ func Pull(c Client, objectStore ObjectStore, name, tag string) error {
return fmt.Errorf("Image has no layers") return fmt.Errorf("Image has no layers")
} }
for _, fsLayer := range manifest.FSLayers { errChans := make([]chan error, len(manifest.FSLayers))
layer, err := objectStore.Layer(fsLayer.BlobSum) for i := range manifest.FSLayers {
if err != nil { errChans[i] = make(chan error)
log.WithFields(log.Fields{ }
"error": err,
"layer": fsLayer, // Iterate over each layer in the manifest, simultaneously pulling no more
}).Warn("Unable to write local layer") // than simultaneousLayerPullWindow layers at a time. If an error is
return err // received from a layer pull, we abort the push.
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ {
dependentLayer := i - simultaneousLayerPullWindow
if dependentLayer >= 0 {
err := <-errChans[dependentLayer]
if err != nil {
log.WithField("error", err).Warn("Pull aborted")
return err
}
} }
writer, err := layer.Writer() if i < len(manifest.FSLayers) {
if err == ErrLayerAlreadyExists { go func(i int) {
log.WithField("layer", fsLayer).Info("Layer already exists") errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i])
continue }(i)
}
if err == ErrLayerLocked {
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
layer.Wait()
continue
}
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to write local layer")
return err
}
defer writer.Close()
layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
defer layerReader.Close()
copied, err := io.Copy(writer, layerReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
if copied != int64(length) {
log.WithFields(log.Fields{
"expected": length,
"written": copied,
"layer": fsLayer,
}).Warn("Wrote incorrect number of bytes for layer")
} }
} }
@ -91,3 +67,66 @@ func Pull(c Client, objectStore ObjectStore, name, tag string) error {
return nil return nil
} }
func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry.FSLayer) error {
log.WithField("layer", fsLayer).Info("Pulling layer")
layer, err := objectStore.Layer(fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to write local layer")
return err
}
writer, err := layer.Writer()
if err == ErrLayerAlreadyExists {
log.WithField("layer", fsLayer).Info("Layer already exists")
return nil
}
if err == ErrLayerLocked {
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
layer.Wait()
return nil
}
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to write local layer")
return err
}
defer writer.Close()
layerReader, length, err := c.GetImageLayer(name, fsLayer.BlobSum, 0)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
defer layerReader.Close()
copied, err := io.Copy(writer, layerReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to download layer")
return err
}
if copied != int64(length) {
log.WithFields(log.Fields{
"expected": length,
"written": copied,
"layer": fsLayer,
}).Warn("Wrote incorrect number of bytes for layer")
return fmt.Errorf(
"Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d",
fsLayer, length, copied,
)
}
return nil
}

View File

@ -11,6 +11,13 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
) )
// simultaneousLayerPushWindow is the size of the parallel layer push window.
// A layer may not be pushed until the layer preceeding it by the length of the
// push window has been successfully pushed.
const simultaneousLayerPushWindow = 4
type pushFunction func(fsLayer registry.FSLayer) error
// Push implements a client push workflow for the image defined by the given // Push implements a client push workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer // name and tag pair, using the given ObjectStore for local manifest and layer
// storage // storage
@ -25,60 +32,28 @@ func Push(c Client, objectStore ObjectStore, name, tag string) error {
return err return err
} }
for _, fsLayer := range manifest.FSLayers { errChans := make([]chan error, len(manifest.FSLayers))
layer, err := objectStore.Layer(fsLayer.BlobSum) for i := range manifest.FSLayers {
if err != nil { errChans[i] = make(chan error)
log.WithFields(log.Fields{ }
"error": err,
"layer": fsLayer, // Iterate over each layer in the manifest, simultaneously pushing no more
}).Warn("Unable to read local layer") // than simultaneousLayerPushWindow layers at a time. If an error is
return err // received from a layer push, we abort the push.
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ {
dependentLayer := i - simultaneousLayerPushWindow
if dependentLayer >= 0 {
err := <-errChans[dependentLayer]
if err != nil {
log.WithField("error", err).Warn("Push aborted")
return err
}
} }
layerReader, err := layer.Reader() if i < len(manifest.FSLayers) {
if err != nil { go func(i int) {
log.WithFields(log.Fields{ errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i])
"error": err, }(i)
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum)
if _, ok := err.(*registry.LayerAlreadyExistsError); ok {
log.WithField("layer", fsLayer).Info("Layer already exists")
continue
}
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
layerBuffer := new(bytes.Buffer)
checksum := sha1.New()
teeReader := io.TeeReader(layerReader, checksum)
_, err = io.Copy(layerBuffer, teeReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(),
&registry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))},
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
} }
} }
@ -93,3 +68,64 @@ func Push(c Client, objectStore ObjectStore, name, tag string) error {
return nil return nil
} }
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer registry.FSLayer) error {
log.WithField("layer", fsLayer).Info("Pushing layer")
layer, err := objectStore.Layer(fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
layerReader, err := layer.Reader()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
location, err := c.InitiateLayerUpload(name, fsLayer.BlobSum)
if _, ok := err.(*registry.LayerAlreadyExistsError); ok {
log.WithField("layer", fsLayer).Info("Layer already exists")
return nil
}
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
layerBuffer := new(bytes.Buffer)
checksum := sha1.New()
teeReader := io.TeeReader(layerReader, checksum)
_, err = io.Copy(layerBuffer, teeReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
err = c.UploadLayer(location, ioutil.NopCloser(layerBuffer), layerBuffer.Len(),
&registry.Checksum{HashAlgorithm: "sha1", Sum: string(checksum.Sum(nil))},
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
return nil
}