Merge pull request #339 from stevvooe/signature-fetch-prevent-false-sharing
registry/storage: prevent false sharing in signature fetch
This commit is contained in:
commit
64a1aa89bd
@ -36,8 +36,13 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
signatures := make([][]byte, len(signaturePaths)) // make space for everything
|
type result struct {
|
||||||
errCh := make(chan error, 1) // buffered chan so one proceeds
|
index int
|
||||||
|
signature []byte
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
ch := make(chan result)
|
||||||
|
|
||||||
for i, sigPath := range signaturePaths {
|
for i, sigPath := range signaturePaths {
|
||||||
// Append the link portion
|
// Append the link portion
|
||||||
sigPath = path.Join(sigPath, "link")
|
sigPath = path.Join(sigPath, "link")
|
||||||
@ -47,33 +52,42 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
context.GetLogger(s.ctx).
|
context.GetLogger(s.ctx).
|
||||||
Debugf("fetching signature from %q", sigPath)
|
Debugf("fetching signature from %q", sigPath)
|
||||||
p, err := s.blobStore.linked(sigPath)
|
|
||||||
if err != nil {
|
r := result{index: idx}
|
||||||
|
if p, err := s.blobStore.linked(sigPath); err != nil {
|
||||||
context.GetLogger(s.ctx).
|
context.GetLogger(s.ctx).
|
||||||
Errorf("error fetching signature from %q: %v", sigPath, err)
|
Errorf("error fetching signature from %q: %v", sigPath, err)
|
||||||
|
r.err = err
|
||||||
// try to send an error, if it hasn't already been sent.
|
} else {
|
||||||
select {
|
r.signature = p
|
||||||
case errCh <- err:
|
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
ch <- r
|
||||||
}
|
|
||||||
signatures[idx] = p
|
|
||||||
}(i, sigPath)
|
}(i, sigPath)
|
||||||
}
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// aggregrate the results
|
||||||
|
signatures := make([][]byte, len(signaturePaths))
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-errCh:
|
case result := <-ch:
|
||||||
// just return the first error, similar to single threaded code.
|
signatures[result.index] = result.signature
|
||||||
return nil, err
|
if result.err != nil && err == nil {
|
||||||
default:
|
// only set the first one.
|
||||||
// pass
|
err = result.err
|
||||||
|
}
|
||||||
|
case <-done:
|
||||||
|
break loop
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return signatures, nil
|
return signatures, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signatureStore) Put(dgst digest.Digest, signatures ...[]byte) error {
|
func (s *signatureStore) Put(dgst digest.Digest, signatures ...[]byte) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user