50d64ac63a
Adds a sort of contrived test for resumable pulls
152 lines
4.0 KiB
Go
152 lines
4.0 KiB
Go
package client
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/docker/docker-registry"
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
)
|
|
|
|
// simultaneousLayerPullWindow is the size of the parallel layer pull window.
|
|
// A layer may not be pulled until the layer preceeding it by the length of the
|
|
// pull window has been successfully pulled.
|
|
const simultaneousLayerPullWindow = 4
|
|
|
|
// Pull implements a client pull workflow for the image defined by the given
|
|
// name and tag pair, using the given ObjectStore for local manifest and layer
|
|
// storage
|
|
func Pull(c Client, objectStore ObjectStore, name, tag string) error {
|
|
manifest, err := c.GetImageManifest(name, tag)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.WithField("manifest", manifest).Info("Pulled manifest")
|
|
|
|
if len(manifest.FSLayers) != len(manifest.History) {
|
|
return fmt.Errorf("Length of history not equal to number of layers")
|
|
}
|
|
if len(manifest.FSLayers) == 0 {
|
|
return fmt.Errorf("Image has no layers")
|
|
}
|
|
|
|
errChans := make([]chan error, len(manifest.FSLayers))
|
|
for i := range manifest.FSLayers {
|
|
errChans[i] = make(chan error)
|
|
}
|
|
|
|
// To avoid leak of goroutines we must notify
|
|
// pullLayer goroutines about a cancelation,
|
|
// otherwise they will lock forever.
|
|
cancelCh := make(chan struct{})
|
|
|
|
// Iterate over each layer in the manifest, simultaneously pulling no more
|
|
// than simultaneousLayerPullWindow layers at a time. If an error is
|
|
// received from a layer pull, we abort the push.
|
|
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPullWindow; i++ {
|
|
dependentLayer := i - simultaneousLayerPullWindow
|
|
if dependentLayer >= 0 {
|
|
err := <-errChans[dependentLayer]
|
|
if err != nil {
|
|
log.WithField("error", err).Warn("Pull aborted")
|
|
close(cancelCh)
|
|
return err
|
|
}
|
|
}
|
|
|
|
if i < len(manifest.FSLayers) {
|
|
go func(i int) {
|
|
select {
|
|
case errChans[i] <- pullLayer(c, objectStore, name, manifest.FSLayers[i]):
|
|
case <-cancelCh: // no chance to recv until cancelCh's closed
|
|
}
|
|
}(i)
|
|
}
|
|
}
|
|
|
|
err = objectStore.WriteManifest(name, tag, manifest)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"manifest": manifest,
|
|
}).Warn("Unable to write image manifest")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func pullLayer(c Client, objectStore ObjectStore, name string, fsLayer registry.FSLayer) error {
|
|
log.WithField("layer", fsLayer).Info("Pulling layer")
|
|
|
|
layer, err := objectStore.Layer(fsLayer.BlobSum)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"layer": fsLayer,
|
|
}).Warn("Unable to write local layer")
|
|
return err
|
|
}
|
|
|
|
layerWriter, err := layer.Writer()
|
|
if err == ErrLayerAlreadyExists {
|
|
log.WithField("layer", fsLayer).Info("Layer already exists")
|
|
return nil
|
|
}
|
|
if err == ErrLayerLocked {
|
|
log.WithField("layer", fsLayer).Info("Layer download in progress, waiting")
|
|
layer.Wait()
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"layer": fsLayer,
|
|
}).Warn("Unable to write local layer")
|
|
return err
|
|
}
|
|
defer layerWriter.Close()
|
|
|
|
if layerWriter.CurrentSize() > 0 {
|
|
log.WithFields(log.Fields{
|
|
"layer": fsLayer,
|
|
"currentSize": layerWriter.CurrentSize(),
|
|
"size": layerWriter.Size(),
|
|
}).Info("Layer partially downloaded, resuming")
|
|
}
|
|
|
|
layerReader, length, err := c.GetBlob(name, fsLayer.BlobSum, layerWriter.CurrentSize())
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"layer": fsLayer,
|
|
}).Warn("Unable to download layer")
|
|
return err
|
|
}
|
|
defer layerReader.Close()
|
|
|
|
layerWriter.SetSize(layerWriter.CurrentSize() + length)
|
|
|
|
_, err = io.Copy(layerWriter, layerReader)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"layer": fsLayer,
|
|
}).Warn("Unable to download layer")
|
|
return err
|
|
}
|
|
if layerWriter.CurrentSize() != layerWriter.Size() {
|
|
log.WithFields(log.Fields{
|
|
"size": layerWriter.Size(),
|
|
"currentSize": layerWriter.CurrentSize(),
|
|
"layer": fsLayer,
|
|
}).Warn("Layer invalid size")
|
|
return fmt.Errorf(
|
|
"Wrote incorrect number of bytes for layer %v. Expected %d, Wrote %d",
|
|
fsLayer, layerWriter.Size(), layerWriter.CurrentSize(),
|
|
)
|
|
}
|
|
return nil
|
|
}
|