Most places in the registry were using string types to refer to repository names. This changes them to use reference.Named, so the type system can enforce validation of the naming rules. Signed-off-by: Aaron Lehmann <>
188 lines
4.8 KiB
188 lines
4.8 KiB
package proxy
import (
// todo(richardscothern): from cache control header or config file
const blobTTL = time.Duration(24 * 7 * time.Hour)
type proxyBlobStore struct {
localStore distribution.BlobStore
remoteStore distribution.BlobService
scheduler *scheduler.TTLExpirationScheduler
var _ distribution.BlobStore = &proxyBlobStore{}
// inflight tracks currently downloading blobs
var inflight = make(map[digest.Digest]struct{})
// mu protects inflight
var mu sync.Mutex
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
w.Header().Set("Content-Type", mediaType)
w.Header().Set("Docker-Content-Digest", digest.String())
w.Header().Set("Etag", digest.String())
func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
desc, err := pbs.remoteStore.Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
if w, ok := writer.(http.ResponseWriter); ok {
setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
_, err = io.CopyN(writer, remoteReader, desc.Size)
if err != nil {
return distribution.Descriptor{}, err
return desc, nil
func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
localDesc, err := pbs.localStore.Stat(ctx, dgst)
if err != nil {
// Stat can report a zero sized file here if it's checked between creation
// and population. Return nil error, and continue
return false, nil
if err == nil {
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
return false, nil
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
defer func() {
delete(inflight, dgst)
var desc distribution.Descriptor
var err error
var bw distribution.BlobWriter
bw, err = pbs.localStore.Create(ctx)
if err != nil {
return err
desc, err = pbs.copyContent(ctx, dgst, bw)
if err != nil {
return err
_, err = bw.Commit(ctx, desc)
if err != nil {
return err
return nil
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
served, err := pbs.serveLocal(ctx, w, r, dgst)
if err != nil {
context.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
return err
if served {
return nil
_, ok := inflight[dgst]
if ok {
_, err := pbs.copyContent(ctx, dgst, w)
return err
inflight[dgst] = struct{}{}
go func(dgst digest.Digest) {
if err := pbs.storeLocal(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
pbs.scheduler.AddBlob(dgst, repositoryTTL)
_, err = pbs.copyContent(ctx, dgst, w)
if err != nil {
return err
return nil
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := pbs.localStore.Stat(ctx, dgst)
if err == nil {
return desc, err
if err != distribution.ErrBlobUnknown {
return distribution.Descriptor{}, err
return pbs.remoteStore.Stat(ctx, dgst)
// Unsupported functions
func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
return nil, distribution.ErrUnsupported
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported
func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
return nil, distribution.ErrUnsupported
func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported