2015-02-11 02:41:09 +01:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
|
|
|
"bytes"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
|
|
|
"os"
|
|
|
|
"time"
|
|
|
|
|
2015-02-11 03:14:23 +01:00
|
|
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
2015-02-11 02:41:09 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
|
|
|
|
// understand the latency characteristics of the underlying network to
|
|
|
|
// set this correctly, so we may want to leave it to the driver. For
|
|
|
|
// out of process drivers, we'll have to optimize this buffer size for
|
|
|
|
// local communication.
|
|
|
|
const fileReaderBufferSize = 4 << 20
|
|
|
|
|
|
|
|
// remoteFileReader provides a read seeker interface to files stored in
|
|
|
|
// storagedriver. Used to implement part of layer interface and will be used
|
|
|
|
// to implement read side of LayerUpload.
|
|
|
|
type fileReader struct {
|
|
|
|
driver storagedriver.StorageDriver
|
|
|
|
|
|
|
|
// identifying fields
|
|
|
|
path string
|
2015-04-02 01:41:33 +02:00
|
|
|
size int64 // size is the total size, must be set.
|
|
|
|
modtime time.Time // TODO(stevvooe): This is not needed anymore.
|
2015-02-11 02:41:09 +01:00
|
|
|
|
|
|
|
// mutable fields
|
|
|
|
rc io.ReadCloser // remote read closer
|
|
|
|
brd *bufio.Reader // internal buffered io
|
|
|
|
offset int64 // offset is the current read offset
|
|
|
|
err error // terminal error, if set, reader is closed
|
|
|
|
}
|
|
|
|
|
|
|
|
// newFileReader initializes a file reader for the remote file. The read takes
|
|
|
|
// on the offset and size at the time the reader is created. If the underlying
|
|
|
|
// file changes, one must create a new fileReader.
|
|
|
|
func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
|
|
|
|
rd := &fileReader{
|
|
|
|
driver: driver,
|
|
|
|
path: path,
|
|
|
|
}
|
|
|
|
|
|
|
|
// Grab the size of the layer file, ensuring existence.
|
|
|
|
if fi, err := driver.Stat(path); err != nil {
|
|
|
|
switch err := err.(type) {
|
|
|
|
case storagedriver.PathNotFoundError:
|
|
|
|
// NOTE(stevvooe): We really don't care if the file is not
|
|
|
|
// actually present for the reader. If the caller needs to know
|
|
|
|
// whether or not the file exists, they should issue a stat call
|
|
|
|
// on the path. There is still no guarantee, since the file may be
|
|
|
|
// gone by the time the reader is created. The only correct
|
|
|
|
// behavior is to return a reader that immediately returns EOF.
|
|
|
|
default:
|
|
|
|
// Any other error we want propagated up the stack.
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if fi.IsDir() {
|
|
|
|
return nil, fmt.Errorf("cannot read a directory")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Fill in file information
|
|
|
|
rd.size = fi.Size()
|
|
|
|
rd.modtime = fi.ModTime()
|
|
|
|
}
|
|
|
|
|
|
|
|
return rd, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (fr *fileReader) Read(p []byte) (n int, err error) {
|
|
|
|
if fr.err != nil {
|
|
|
|
return 0, fr.err
|
|
|
|
}
|
|
|
|
|
|
|
|
rd, err := fr.reader()
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
n, err = rd.Read(p)
|
|
|
|
fr.offset += int64(n)
|
|
|
|
|
|
|
|
// Simulate io.EOR error if we reach filesize.
|
|
|
|
if err == nil && fr.offset >= fr.size {
|
|
|
|
err = io.EOF
|
|
|
|
}
|
|
|
|
|
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
|
|
|
|
if fr.err != nil {
|
|
|
|
return 0, fr.err
|
|
|
|
}
|
|
|
|
|
|
|
|
var err error
|
|
|
|
newOffset := fr.offset
|
|
|
|
|
|
|
|
switch whence {
|
|
|
|
case os.SEEK_CUR:
|
|
|
|
newOffset += int64(offset)
|
|
|
|
case os.SEEK_END:
|
|
|
|
newOffset = fr.size + int64(offset)
|
|
|
|
case os.SEEK_SET:
|
|
|
|
newOffset = int64(offset)
|
|
|
|
}
|
|
|
|
|
|
|
|
if newOffset < 0 {
|
|
|
|
err = fmt.Errorf("cannot seek to negative position")
|
|
|
|
} else {
|
|
|
|
if fr.offset != newOffset {
|
|
|
|
fr.reset()
|
|
|
|
}
|
|
|
|
|
|
|
|
// No problems, set the offset.
|
|
|
|
fr.offset = newOffset
|
|
|
|
}
|
|
|
|
|
|
|
|
return fr.offset, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (fr *fileReader) Close() error {
|
2015-02-12 01:49:49 +01:00
|
|
|
return fr.closeWithErr(fmt.Errorf("fileReader: closed"))
|
2015-02-11 02:41:09 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// reader prepares the current reader at the lrs offset, ensuring its buffered
|
|
|
|
// and ready to go.
|
|
|
|
func (fr *fileReader) reader() (io.Reader, error) {
|
|
|
|
if fr.err != nil {
|
|
|
|
return nil, fr.err
|
|
|
|
}
|
|
|
|
|
|
|
|
if fr.rc != nil {
|
|
|
|
return fr.brd, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// If we don't have a reader, open one up.
|
|
|
|
rc, err := fr.driver.ReadStream(fr.path, fr.offset)
|
|
|
|
if err != nil {
|
|
|
|
switch err := err.(type) {
|
|
|
|
case storagedriver.PathNotFoundError:
|
|
|
|
// NOTE(stevvooe): If the path is not found, we simply return a
|
|
|
|
// reader that returns io.EOF. However, we do not set fr.rc,
|
|
|
|
// allowing future attempts at getting a reader to possibly
|
|
|
|
// succeed if the file turns up later.
|
|
|
|
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
|
|
|
|
default:
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fr.rc = rc
|
|
|
|
|
|
|
|
if fr.brd == nil {
|
|
|
|
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
|
|
|
|
// understand the latency characteristics of the underlying network to
|
|
|
|
// set this correctly, so we may want to leave it to the driver. For
|
|
|
|
// out of process drivers, we'll have to optimize this buffer size for
|
|
|
|
// local communication.
|
|
|
|
fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize)
|
|
|
|
} else {
|
|
|
|
fr.brd.Reset(fr.rc)
|
|
|
|
}
|
|
|
|
|
|
|
|
return fr.brd, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// resetReader resets the reader, forcing the read method to open up a new
|
|
|
|
// connection and rebuild the buffered reader. This should be called when the
|
|
|
|
// offset and the reader will become out of sync, such as during a seek
|
|
|
|
// operation.
|
|
|
|
func (fr *fileReader) reset() {
|
|
|
|
if fr.err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if fr.rc != nil {
|
|
|
|
fr.rc.Close()
|
|
|
|
fr.rc = nil
|
|
|
|
}
|
|
|
|
}
|
2015-02-12 01:49:49 +01:00
|
|
|
|
|
|
|
func (fr *fileReader) closeWithErr(err error) error {
|
|
|
|
if fr.err != nil {
|
|
|
|
return fr.err
|
|
|
|
}
|
|
|
|
|
|
|
|
fr.err = err
|
|
|
|
|
|
|
|
// close and release reader chain
|
|
|
|
if fr.rc != nil {
|
|
|
|
fr.rc.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
fr.rc = nil
|
|
|
|
fr.brd = nil
|
|
|
|
|
|
|
|
return fr.err
|
|
|
|
}
|