Update to track refactor updates

Added use of cache blob statter

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
Derek McGowan 2015-05-15 16:25:00 -07:00
parent fdf7c8ff15
commit 98836d6267
5 changed files with 201 additions and 173 deletions

View File

@ -1,159 +0,0 @@
package client
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
)
type httpBlob struct {
*repository
desc distribution.Descriptor
rc io.ReadCloser // remote read closer
brd *bufio.Reader // internal buffered io
offset int64
err error
}
func (hb *httpBlob) Read(p []byte) (n int, err error) {
if hb.err != nil {
return 0, hb.err
}
rd, err := hb.reader()
if err != nil {
return 0, err
}
n, err = rd.Read(p)
hb.offset += int64(n)
// Simulate io.EOF error if we reach filesize.
if err == nil && hb.offset >= hb.desc.Length {
err = io.EOF
}
return n, err
}
func (hb *httpBlob) Seek(offset int64, whence int) (int64, error) {
if hb.err != nil {
return 0, hb.err
}
var err error
newOffset := hb.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = hb.desc.Length + int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else {
if hb.offset != newOffset {
hb.reset()
}
// No problems, set the offset.
hb.offset = newOffset
}
return hb.offset, err
}
func (hb *httpBlob) Close() error {
if hb.err != nil {
return hb.err
}
// close and release reader chain
if hb.rc != nil {
hb.rc.Close()
}
hb.rc = nil
hb.brd = nil
hb.err = fmt.Errorf("httpBlob: closed")
return nil
}
func (hb *httpBlob) reset() {
if hb.err != nil {
return
}
if hb.rc != nil {
hb.rc.Close()
hb.rc = nil
}
}
func (hb *httpBlob) reader() (io.Reader, error) {
if hb.err != nil {
return nil, hb.err
}
if hb.rc != nil {
return hb.brd, nil
}
// If the offset is great than or equal to size, return a empty, noop reader.
if hb.offset >= hb.desc.Length {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
}
blobURL, err := hb.ub.BuildBlobURL(hb.name, hb.desc.Digest)
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", blobURL, nil)
if err != nil {
return nil, err
}
if hb.offset > 0 {
// TODO(stevvooe): Get this working correctly.
// If we are at different offset, issue a range request from there.
req.Header.Add("Range", fmt.Sprintf("1-"))
context.GetLogger(hb.context).Infof("Range: %s", req.Header.Get("Range"))
}
resp, err := hb.client.Do(req)
if err != nil {
return nil, err
}
switch {
case resp.StatusCode == 200:
hb.rc = resp.Body
default:
defer resp.Body.Close()
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
}
if hb.brd == nil {
hb.brd = bufio.NewReader(hb.rc)
} else {
hb.brd.Reset(hb.rc)
}
return hb.brd, nil
}

View File

@ -151,7 +151,7 @@ func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descrip
return hbu.repo.Blobs(ctx).Stat(ctx, desc.Digest) return hbu.repo.Blobs(ctx).Stat(ctx, desc.Digest)
} }
func (hbu *httpBlobUpload) Rollback(ctx context.Context) error { func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
panic("not implemented") panic("not implemented")
} }

View File

@ -0,0 +1,164 @@
package client
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"github.com/docker/distribution"
)
func NewHTTPReadSeeker(client *http.Client, url string, size int64) distribution.ReadSeekCloser {
return &httpReadSeeker{
client: client,
url: url,
size: size,
}
}
type httpReadSeeker struct {
client *http.Client
url string
size int64
rc io.ReadCloser // remote read closer
brd *bufio.Reader // internal buffered io
offset int64
err error
}
func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
if hrs.err != nil {
return 0, hrs.err
}
rd, err := hrs.reader()
if err != nil {
return 0, err
}
n, err = rd.Read(p)
hrs.offset += int64(n)
// Simulate io.EOF error if we reach filesize.
if err == nil && hrs.offset >= hrs.size {
err = io.EOF
}
return n, err
}
func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
if hrs.err != nil {
return 0, hrs.err
}
var err error
newOffset := hrs.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = hrs.size + int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
if newOffset < 0 {
err = errors.New("cannot seek to negative position")
} else {
if hrs.offset != newOffset {
hrs.reset()
}
// No problems, set the offset.
hrs.offset = newOffset
}
return hrs.offset, err
}
func (hrs *httpReadSeeker) Close() error {
if hrs.err != nil {
return hrs.err
}
// close and release reader chain
if hrs.rc != nil {
hrs.rc.Close()
}
hrs.rc = nil
hrs.brd = nil
hrs.err = errors.New("httpLayer: closed")
return nil
}
func (hrs *httpReadSeeker) reset() {
if hrs.err != nil {
return
}
if hrs.rc != nil {
hrs.rc.Close()
hrs.rc = nil
}
}
func (hrs *httpReadSeeker) reader() (io.Reader, error) {
if hrs.err != nil {
return nil, hrs.err
}
if hrs.rc != nil {
return hrs.brd, nil
}
// If the offset is great than or equal to size, return a empty, noop reader.
if hrs.offset >= hrs.size {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
}
req, err := http.NewRequest("GET", hrs.url, nil)
if err != nil {
return nil, err
}
if hrs.offset > 0 {
// TODO(stevvooe): Get this working correctly.
// If we are at different offset, issue a range request from there.
req.Header.Add("Range", "1-")
// TODO: get context in here
// context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range"))
}
resp, err := hrs.client.Do(req)
if err != nil {
return nil, err
}
switch {
case resp.StatusCode == 200:
hrs.rc = resp.Body
default:
defer resp.Body.Close()
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
}
if hrs.brd == nil {
hrs.brd = bufio.NewReader(hrs.rc)
} else {
hrs.brd.Reset(hrs.rc)
}
return hrs.brd, nil
}

View File

@ -18,6 +18,7 @@ import (
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/docker/distribution/context" "github.com/docker/distribution/context"
"github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/storage/cache"
) )
// NewRepository creates a new Repository for the given repository name and endpoint // NewRepository creates a new Repository for the given repository name and endpoint
@ -56,9 +57,13 @@ func (r *repository) Name() string {
return r.name return r.name
} }
func (r *repository) Blobs(ctx context.Context) distribution.BlobService { func (r *repository) Blobs(ctx context.Context) distribution.BlobStore {
statter := &blobStatter{
repository: r,
}
return &blobs{ return &blobs{
repository: r, repository: r,
statter: cache.NewCachedBlobStatter(cache.NewInMemoryBlobDescriptorCacheProvider(), statter),
} }
} }
@ -232,6 +237,8 @@ func (ms *manifests) Delete(dgst digest.Digest) error {
type blobs struct { type blobs struct {
*repository *repository
statter distribution.BlobStatter
} }
func sanitizeLocation(location, source string) (string, error) { func sanitizeLocation(location, source string) (string, error) {
@ -255,12 +262,17 @@ func sanitizeLocation(location, source string) (string, error) {
return location, nil return location, nil
} }
func (ls *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
return ls.statter.Stat(ctx, dgst)
}
func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
desc, err := ls.Stat(ctx, dgst) desc, err := ls.Stat(ctx, dgst)
if err != nil { if err != nil {
return nil, err return nil, err
} }
reader, err := ls.Open(ctx, desc) reader, err := ls.Open(ctx, desc.Digest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -269,19 +281,26 @@ func (ls *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
return ioutil.ReadAll(reader) return ioutil.ReadAll(reader)
} }
func (ls *blobs) Open(ctx context.Context, desc distribution.Descriptor) (distribution.ReadSeekCloser, error) { func (ls *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
return &httpBlob{ stat, err := ls.statter.Stat(ctx, dgst)
repository: ls.repository, if err != nil {
desc: desc, return nil, err
}, nil
} }
func (ls *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, desc distribution.Descriptor) error { blobURL, err := ls.ub.BuildBlobURL(ls.Name(), stat.Digest)
if err != nil {
return nil, err
}
return NewHTTPReadSeeker(ls.repository.client, blobURL, stat.Length), nil
}
func (ls *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
return nil return nil
} }
func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
writer, err := ls.Writer(ctx) writer, err := ls.Create(ctx)
if err != nil { if err != nil {
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
@ -303,7 +322,7 @@ func (ls *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
return writer.Commit(ctx, desc) return writer.Commit(ctx, desc)
} }
func (ls *blobs) Writer(ctx context.Context) (distribution.BlobWriter, error) { func (ls *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
u, err := ls.ub.BuildBlobUploadURL(ls.name) u, err := ls.ub.BuildBlobUploadURL(ls.name)
resp, err := ls.client.Post(u, "", nil) resp, err := ls.client.Post(u, "", nil)
@ -337,7 +356,11 @@ func (ls *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
panic("not implemented") panic("not implemented")
} }
func (ls *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { type blobStatter struct {
*repository
}
func (ls *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
u, err := ls.ub.BuildBlobURL(ls.name, dgst) u, err := ls.ub.BuildBlobURL(ls.name, dgst)
if err != nil { if err != nil {
return distribution.Descriptor{}, err return distribution.Descriptor{}, err

View File

@ -237,7 +237,7 @@ func TestBlobUploadChunked(t *testing.T) {
} }
l := r.Blobs(ctx) l := r.Blobs(ctx)
upload, err := l.Writer(ctx) upload, err := l.Create(ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -348,7 +348,7 @@ func TestBlobUploadMonolithic(t *testing.T) {
} }
l := r.Blobs(ctx) l := r.Blobs(ctx)
upload, err := l.Writer(ctx) upload, err := l.Create(ctx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }