Merge pull request #330 from stevvooe/parallelize-signature-fetch

registry/storage: parallelize signature fetch in signature store
This commit is contained in:
Olivier Gambier 2015-04-06 22:30:35 -07:00
commit 7721fe5a65

View File

@ -2,8 +2,10 @@ package storage
import ( import (
"path" "path"
"sync"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
) )
@ -33,18 +35,42 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
return nil, err return nil, err
} }
var signatures [][]byte var wg sync.WaitGroup
for _, sigPath := range signaturePaths { signatures := make([][]byte, len(signaturePaths)) // make space for everything
errCh := make(chan error, 1) // buffered chan so one proceeds
for i, sigPath := range signaturePaths {
// Append the link portion // Append the link portion
sigPath = path.Join(sigPath, "link") sigPath = path.Join(sigPath, "link")
// TODO(stevvooe): These fetches should be parallelized for performance. wg.Add(1)
p, err := s.blobStore.linked(sigPath) go func(idx int, sigPath string) {
if err != nil { defer wg.Done()
return nil, err context.GetLogger(s.ctx).
} Debugf("fetching signature from %q", sigPath)
p, err := s.blobStore.linked(sigPath)
if err != nil {
context.GetLogger(s.ctx).
Errorf("error fetching signature from %q: %v", sigPath, err)
signatures = append(signatures, p) // try to send an error, if it hasn't already been sent.
select {
case errCh <- err:
default:
}
return
}
signatures[idx] = p
}(i, sigPath)
}
wg.Wait()
select {
case err := <-errCh:
// just return the first error, similar to single threaded code.
return nil, err
default:
// pass
} }
return signatures, nil return signatures, nil