Refactor layerReader into fileReader
This change separates out the remote file reader functionality from layer reprsentation data. More importantly, issues with seeking have been fixed and thoroughly tested.
This commit is contained in:
parent
74279723c0
commit
3f479b62b4
163
storage/filereader.go
Normal file
163
storage/filereader.go
Normal file
@ -0,0 +1,163 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/docker/docker-registry/storagedriver"
|
||||
)
|
||||
|
||||
// remoteFileReader provides a read seeker interface to files stored in
|
||||
// storagedriver. Used to implement part of layer interface and will be used
|
||||
// to implement read side of LayerUpload.
|
||||
type fileReader struct {
|
||||
driver storagedriver.StorageDriver
|
||||
|
||||
// identifying fields
|
||||
path string
|
||||
size int64 // size is the total layer size, must be set.
|
||||
|
||||
// mutable fields
|
||||
rc io.ReadCloser // remote read closer
|
||||
brd *bufio.Reader // internal buffered io
|
||||
offset int64 // offset is the current read offset
|
||||
err error // terminal error, if set, reader is closed
|
||||
}
|
||||
|
||||
func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
|
||||
// Grab the size of the layer file, ensuring existence.
|
||||
size, err := driver.CurrentSize(path)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &fileReader{
|
||||
driver: driver,
|
||||
path: path,
|
||||
size: int64(size),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fr *fileReader) Read(p []byte) (n int, err error) {
|
||||
if fr.err != nil {
|
||||
return 0, fr.err
|
||||
}
|
||||
|
||||
rd, err := fr.reader()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err = rd.Read(p)
|
||||
fr.offset += int64(n)
|
||||
|
||||
// Simulate io.EOR error if we reach filesize.
|
||||
if err == nil && fr.offset >= fr.size {
|
||||
err = io.EOF
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
|
||||
if fr.err != nil {
|
||||
return 0, fr.err
|
||||
}
|
||||
|
||||
var err error
|
||||
newOffset := fr.offset
|
||||
|
||||
switch whence {
|
||||
case os.SEEK_CUR:
|
||||
newOffset += int64(offset)
|
||||
case os.SEEK_END:
|
||||
newOffset = fr.size + int64(offset)
|
||||
case os.SEEK_SET:
|
||||
newOffset = int64(offset)
|
||||
}
|
||||
|
||||
if newOffset < 0 {
|
||||
err = fmt.Errorf("cannot seek to negative position")
|
||||
} else if newOffset > fr.size {
|
||||
err = fmt.Errorf("cannot seek passed end of file")
|
||||
} else {
|
||||
if fr.offset != newOffset {
|
||||
fr.reset()
|
||||
}
|
||||
|
||||
// No problems, set the offset.
|
||||
fr.offset = newOffset
|
||||
}
|
||||
|
||||
return fr.offset, err
|
||||
}
|
||||
|
||||
// Close the layer. Should be called when the resource is no longer needed.
|
||||
func (fr *fileReader) Close() error {
|
||||
if fr.err != nil {
|
||||
return fr.err
|
||||
}
|
||||
|
||||
fr.err = ErrLayerClosed
|
||||
|
||||
// close and release reader chain
|
||||
if fr.rc != nil {
|
||||
fr.rc.Close()
|
||||
}
|
||||
|
||||
fr.rc = nil
|
||||
fr.brd = nil
|
||||
|
||||
return fr.err
|
||||
}
|
||||
|
||||
// reader prepares the current reader at the lrs offset, ensuring its buffered
|
||||
// and ready to go.
|
||||
func (fr *fileReader) reader() (io.Reader, error) {
|
||||
if fr.err != nil {
|
||||
return nil, fr.err
|
||||
}
|
||||
|
||||
if fr.rc != nil {
|
||||
return fr.brd, nil
|
||||
}
|
||||
|
||||
// If we don't have a reader, open one up.
|
||||
rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fr.rc = rc
|
||||
|
||||
if fr.brd == nil {
|
||||
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
|
||||
// understand the latency characteristics of the underlying network to
|
||||
// set this correctly, so we may want to leave it to the driver. For
|
||||
// out of process drivers, we'll have to optimize this buffer size for
|
||||
// local communication.
|
||||
fr.brd = bufio.NewReader(fr.rc)
|
||||
} else {
|
||||
fr.brd.Reset(fr.rc)
|
||||
}
|
||||
|
||||
return fr.brd, nil
|
||||
}
|
||||
|
||||
// resetReader resets the reader, forcing the read method to open up a new
|
||||
// connection and rebuild the buffered reader. This should be called when the
|
||||
// offset and the reader will become out of sync, such as during a seek
|
||||
// operation.
|
||||
func (fr *fileReader) reset() {
|
||||
if fr.err != nil {
|
||||
return
|
||||
}
|
||||
if fr.rc != nil {
|
||||
fr.rc.Close()
|
||||
fr.rc = nil
|
||||
}
|
||||
}
|
158
storage/filereader_test.go
Normal file
158
storage/filereader_test.go
Normal file
@ -0,0 +1,158 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"io"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/docker-registry/digest"
|
||||
|
||||
"github.com/docker/docker-registry/storagedriver/inmemory"
|
||||
)
|
||||
|
||||
func TestSimpleRead(t *testing.T) {
|
||||
content := make([]byte, 1<<20)
|
||||
n, err := rand.Read(content)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error building random data: %v", err)
|
||||
}
|
||||
|
||||
if n != len(content) {
|
||||
t.Fatalf("random read did't fill buffer")
|
||||
}
|
||||
|
||||
dgst, err := digest.FromReader(bytes.NewReader(content))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error digesting random content: %v", err)
|
||||
}
|
||||
|
||||
driver := inmemory.New()
|
||||
path := "/random"
|
||||
|
||||
if err := driver.PutContent(path, content); err != nil {
|
||||
t.Fatalf("error putting patterned content: %v", err)
|
||||
}
|
||||
|
||||
fr, err := newFileReader(driver, path)
|
||||
if err != nil {
|
||||
t.Fatalf("error allocating file reader: %v", err)
|
||||
}
|
||||
|
||||
verifier := digest.NewDigestVerifier(dgst)
|
||||
io.Copy(verifier, fr)
|
||||
|
||||
if !verifier.Verified() {
|
||||
t.Fatalf("unable to verify read data")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileReaderSeek(t *testing.T) {
|
||||
driver := inmemory.New()
|
||||
pattern := "01234567890ab" // prime length block
|
||||
repititions := 1024
|
||||
path := "/patterned"
|
||||
content := bytes.Repeat([]byte(pattern), repititions)
|
||||
|
||||
if err := driver.PutContent(path, content); err != nil {
|
||||
t.Fatalf("error putting patterned content: %v", err)
|
||||
}
|
||||
|
||||
fr, err := newFileReader(driver, path)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating file reader: %v", err)
|
||||
}
|
||||
|
||||
// Seek all over the place, in blocks of pattern size and make sure we get
|
||||
// the right data.
|
||||
for _, repitition := range mrand.Perm(repititions - 1) {
|
||||
targetOffset := int64(len(pattern) * repitition)
|
||||
// Seek to a multiple of pattern size and read pattern size bytes
|
||||
offset, err := fr.Seek(targetOffset, os.SEEK_SET)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error seeking: %v", err)
|
||||
}
|
||||
|
||||
if offset != targetOffset {
|
||||
t.Fatalf("did not seek to correct offset: %d != %d", offset, targetOffset)
|
||||
}
|
||||
|
||||
p := make([]byte, len(pattern))
|
||||
|
||||
n, err := fr.Read(p)
|
||||
if err != nil {
|
||||
t.Fatalf("error reading pattern: %v", err)
|
||||
}
|
||||
|
||||
if n != len(pattern) {
|
||||
t.Fatalf("incorrect read length: %d != %d", n, len(pattern))
|
||||
}
|
||||
|
||||
if string(p) != pattern {
|
||||
t.Fatalf("incorrect read content: %q != %q", p, pattern)
|
||||
}
|
||||
|
||||
// Check offset
|
||||
current, err := fr.Seek(0, os.SEEK_CUR)
|
||||
if err != nil {
|
||||
t.Fatalf("error checking current offset: %v", err)
|
||||
}
|
||||
|
||||
if current != targetOffset+int64(len(pattern)) {
|
||||
t.Fatalf("unexpected offset after read: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
start, err := fr.Seek(0, os.SEEK_SET)
|
||||
if err != nil {
|
||||
t.Fatalf("error seeking to start: %v", err)
|
||||
}
|
||||
|
||||
if start != 0 {
|
||||
t.Fatalf("expected to seek to start: %v != 0", start)
|
||||
}
|
||||
|
||||
end, err := fr.Seek(0, os.SEEK_END)
|
||||
if err != nil {
|
||||
t.Fatalf("error checking current offset: %v", err)
|
||||
}
|
||||
|
||||
if end != int64(len(content)) {
|
||||
t.Fatalf("expected to seek to end: %v != %v", end, len(content))
|
||||
}
|
||||
|
||||
// 4. Seek past end and before start, ensure error.
|
||||
|
||||
// seek before start
|
||||
before, err := fr.Seek(-1, os.SEEK_SET)
|
||||
if err == nil {
|
||||
t.Fatalf("error expected, returned offset=%v", before)
|
||||
}
|
||||
|
||||
after, err := fr.Seek(int64(len(content)+1), os.SEEK_END)
|
||||
if err == nil {
|
||||
t.Fatalf("error expected, returned offset=%v", after)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLayerReadErrors covers the various error return type for different
|
||||
// conditions that can arise when reading a layer.
|
||||
func TestFileReaderErrors(t *testing.T) {
|
||||
// TODO(stevvooe): We need to cover error return types, driven by the
|
||||
// errors returned via the HTTP API. For now, here is a incomplete list:
|
||||
//
|
||||
// 1. Layer Not Found: returned when layer is not found or access is
|
||||
// denied.
|
||||
// 2. Layer Unavailable: returned when link references are unresolved,
|
||||
// but layer is known to the registry.
|
||||
// 3. Layer Invalid: This may more split into more errors, but should be
|
||||
// returned when name or tarsum does not reference a valid error. We
|
||||
// may also need something to communication layer verification errors
|
||||
// for the inline tarsum check.
|
||||
// 4. Timeout: timeouts to backend. Need to better understand these
|
||||
// failure cases and how the storage driver propagates these errors
|
||||
// up the stack.
|
||||
}
|
@ -87,4 +87,8 @@ var (
|
||||
|
||||
// ErrLayerInvalidLength returned when length check fails.
|
||||
ErrLayerInvalidLength = fmt.Errorf("invalid layer length")
|
||||
|
||||
// ErrLayerClosed returned when an operation is attempted on a closed
|
||||
// Layer or LayerUpload.
|
||||
ErrLayerClosed = fmt.Errorf("layer closed")
|
||||
)
|
||||
|
@ -241,31 +241,6 @@ func TestSimpleLayerRead(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLayerReaderSeek(t *testing.T) {
|
||||
// TODO(stevvooe): Ensure that all relative seeks work as advertised.
|
||||
// Readers must close and re-open on command. This is important to support
|
||||
// resumable and concurrent downloads via HTTP range requests.
|
||||
}
|
||||
|
||||
// TestLayerReadErrors covers the various error return type for different
|
||||
// conditions that can arise when reading a layer.
|
||||
func TestLayerReadErrors(t *testing.T) {
|
||||
// TODO(stevvooe): We need to cover error return types, driven by the
|
||||
// errors returned via the HTTP API. For now, here is a incomplete list:
|
||||
//
|
||||
// 1. Layer Not Found: returned when layer is not found or access is
|
||||
// denied.
|
||||
// 2. Layer Unavailable: returned when link references are unresolved,
|
||||
// but layer is known to the registry.
|
||||
// 3. Layer Invalid: This may more split into more errors, but should be
|
||||
// returned when name or tarsum does not reference a valid error. We
|
||||
// may also need something to communication layer verification errors
|
||||
// for the inline tarsum check.
|
||||
// 4. Timeout: timeouts to backend. Need to better understand these
|
||||
// failure cases and how the storage driver propagates these errors
|
||||
// up the stack.
|
||||
}
|
||||
|
||||
// writeRandomLayer creates a random layer under name and tarSum using driver
|
||||
// and pathMapper. An io.ReadSeeker with the data is returned, along with the
|
||||
// sha256 hex digest.
|
||||
|
@ -1,10 +1,6 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker-registry/digest"
|
||||
@ -13,22 +9,11 @@ import (
|
||||
// layerReadSeeker implements Layer and provides facilities for reading and
|
||||
// seeking.
|
||||
type layerReader struct {
|
||||
layerStore *layerStore
|
||||
rc io.ReadCloser
|
||||
brd *bufio.Reader
|
||||
fileReader
|
||||
|
||||
name string // repo name of this layer
|
||||
digest digest.Digest
|
||||
path string
|
||||
createdAt time.Time
|
||||
|
||||
// offset is the current read offset
|
||||
offset int64
|
||||
|
||||
// size is the total layer size, if available.
|
||||
size int64
|
||||
|
||||
closedErr error // terminal error, if set, reader is closed
|
||||
}
|
||||
|
||||
var _ Layer = &layerReader{}
|
||||
@ -44,131 +29,3 @@ func (lrs *layerReader) Digest() digest.Digest {
|
||||
func (lrs *layerReader) CreatedAt() time.Time {
|
||||
return lrs.createdAt
|
||||
}
|
||||
|
||||
func (lrs *layerReader) Read(p []byte) (n int, err error) {
|
||||
if err := lrs.closed(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
rd, err := lrs.reader()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err = rd.Read(p)
|
||||
lrs.offset += int64(n)
|
||||
|
||||
// Simulate io.EOR error if we reach filesize.
|
||||
if err == nil && lrs.offset >= lrs.size {
|
||||
err = io.EOF
|
||||
}
|
||||
|
||||
// TODO(stevvooe): More error checking is required here. If the reader
|
||||
// times out for some reason, we should reset the reader so we re-open the
|
||||
// connection.
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (lrs *layerReader) Seek(offset int64, whence int) (int64, error) {
|
||||
if err := lrs.closed(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var err error
|
||||
newOffset := lrs.offset
|
||||
|
||||
switch whence {
|
||||
case os.SEEK_CUR:
|
||||
newOffset += int64(whence)
|
||||
case os.SEEK_END:
|
||||
newOffset = lrs.size + int64(whence)
|
||||
case os.SEEK_SET:
|
||||
newOffset = int64(whence)
|
||||
}
|
||||
|
||||
if newOffset < 0 {
|
||||
err = fmt.Errorf("cannot seek to negative position")
|
||||
} else if newOffset >= lrs.size {
|
||||
err = fmt.Errorf("cannot seek passed end of layer")
|
||||
} else {
|
||||
if lrs.offset != newOffset {
|
||||
lrs.resetReader()
|
||||
}
|
||||
|
||||
// No problems, set the offset.
|
||||
lrs.offset = newOffset
|
||||
}
|
||||
|
||||
return lrs.offset, err
|
||||
}
|
||||
|
||||
// Close the layer. Should be called when the resource is no longer needed.
|
||||
func (lrs *layerReader) Close() error {
|
||||
if lrs.closedErr != nil {
|
||||
return lrs.closedErr
|
||||
}
|
||||
// TODO(sday): Must export this error.
|
||||
lrs.closedErr = fmt.Errorf("layer closed")
|
||||
|
||||
// close and release reader chain
|
||||
if lrs.rc != nil {
|
||||
lrs.rc.Close()
|
||||
lrs.rc = nil
|
||||
}
|
||||
lrs.brd = nil
|
||||
|
||||
return lrs.closedErr
|
||||
}
|
||||
|
||||
// reader prepares the current reader at the lrs offset, ensuring its buffered
|
||||
// and ready to go.
|
||||
func (lrs *layerReader) reader() (io.Reader, error) {
|
||||
if err := lrs.closed(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if lrs.rc != nil {
|
||||
return lrs.brd, nil
|
||||
}
|
||||
|
||||
// If we don't have a reader, open one up.
|
||||
rc, err := lrs.layerStore.driver.ReadStream(lrs.path, uint64(lrs.offset))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lrs.rc = rc
|
||||
|
||||
if lrs.brd == nil {
|
||||
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
|
||||
// understand the latency characteristics of the underlying network to
|
||||
// set this correctly, so we may want to leave it to the driver. For
|
||||
// out of process drivers, we'll have to optimize this buffer size for
|
||||
// local communication.
|
||||
lrs.brd = bufio.NewReader(lrs.rc)
|
||||
} else {
|
||||
lrs.brd.Reset(lrs.rc)
|
||||
}
|
||||
|
||||
return lrs.brd, nil
|
||||
}
|
||||
|
||||
// resetReader resets the reader, forcing the read method to open up a new
|
||||
// connection and rebuild the buffered reader. This should be called when the
|
||||
// offset and the reader will become out of sync, such as during a seek
|
||||
// operation.
|
||||
func (lrs *layerReader) resetReader() {
|
||||
if err := lrs.closed(); err != nil {
|
||||
return
|
||||
}
|
||||
if lrs.rc != nil {
|
||||
lrs.rc.Close()
|
||||
lrs.rc = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (lrs *layerReader) closed() error {
|
||||
return lrs.closedErr
|
||||
}
|
||||
|
@ -57,33 +57,26 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Grab the size of the layer file, ensuring that it exists, among other
|
||||
// things.
|
||||
size, err := ls.driver.CurrentSize(p)
|
||||
|
||||
fr, err := newFileReader(ls.driver, p)
|
||||
if err != nil {
|
||||
// TODO(stevvooe): Handle blob/path does not exist here.
|
||||
// TODO(stevvooe): Get a better understanding of the error cases here
|
||||
// that don't stem from unknown path.
|
||||
return nil, err
|
||||
switch err := err.(type) {
|
||||
case storagedriver.PathNotFoundError, *storagedriver.PathNotFoundError:
|
||||
return nil, ErrLayerUnknown
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Build the layer reader and return to the client.
|
||||
layer := &layerReader{
|
||||
layerStore: ls,
|
||||
path: p,
|
||||
return &layerReader{
|
||||
fileReader: *fr,
|
||||
name: name,
|
||||
digest: digest,
|
||||
|
||||
// TODO(stevvooe): Storage backend does not support modification time
|
||||
// queries yet. Layers "never" change, so just return the zero value.
|
||||
createdAt: time.Time{},
|
||||
|
||||
offset: 0,
|
||||
size: int64(size),
|
||||
}
|
||||
|
||||
return layer, nil
|
||||
// queries yet. Layers "never" change, so just return the zero value
|
||||
// plus a nano-second.
|
||||
createdAt: (time.Time{}).Add(time.Nanosecond),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Upload begins a layer upload, returning a handle. If the layer upload
|
||||
|
@ -429,6 +429,10 @@ func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error)
|
||||
return lus, err
|
||||
}
|
||||
|
||||
if err := llufs.SaveState(lus); err != nil {
|
||||
return lus, err
|
||||
}
|
||||
|
||||
return lus, nil
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@ type Services struct {
|
||||
// NewServices creates a new Services object to access docker objects stored
|
||||
// in the underlying driver.
|
||||
func NewServices(driver storagedriver.StorageDriver) *Services {
|
||||
|
||||
layerUploadStore, err := newTemporaryLocalFSLayerUploadStore()
|
||||
|
||||
if err != nil {
|
||||
@ -40,5 +39,5 @@ func NewServices(driver storagedriver.StorageDriver) *Services {
|
||||
// may be context sensitive in the future. The instance should be used similar
|
||||
// to a request local.
|
||||
func (ss *Services) Layers() LayerService {
|
||||
return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper}
|
||||
return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user