94935f39bc
with a new `proxy` section in the configuration file. Create a new registry type which delegates storage to a proxyBlobStore and proxyManifestStore. These stores will pull through data if not present locally. proxyBlobStore takes care not to write duplicate data to disk. Add a scheduler to clean up expired content. The scheduler runs as a background goroutine. When a blob or manifest is pulled through from the remote registry, an entry is added to the scheduler with a TTL. When the TTL expires, the scheduler calls a pre-specified function to remove the fetched resource. Add token authentication to the registry middleware. Get a token at startup and preload the credential store with the username and password supplied in the config file. Allow resumable digest functionality to be disabled at runtime and disable it when the registry is a pull-through cache. Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
177 lines
3.9 KiB
Go
177 lines
3.9 KiB
Go
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/distribution/context"
|
|
)
|
|
|
|
type httpBlobUpload struct {
|
|
statter distribution.BlobStatter
|
|
client *http.Client
|
|
|
|
uuid string
|
|
startedAt time.Time
|
|
|
|
location string // always the last value of the location header.
|
|
offset int64
|
|
closed bool
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
|
|
panic("Not implemented")
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
|
|
if resp.StatusCode == http.StatusNotFound {
|
|
return distribution.ErrBlobUploadUnknown
|
|
}
|
|
return handleErrorResponse(resp)
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
|
|
req, err := http.NewRequest("PATCH", hbu.location, ioutil.NopCloser(r))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer req.Body.Close()
|
|
|
|
resp, err := hbu.client.Do(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if !SuccessStatus(resp.StatusCode) {
|
|
return 0, hbu.handleErrorResponse(resp)
|
|
}
|
|
|
|
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
|
|
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
rng := resp.Header.Get("Range")
|
|
var start, end int64
|
|
if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
|
|
return 0, err
|
|
} else if n != 2 || end < start {
|
|
return 0, fmt.Errorf("bad range format: %s", rng)
|
|
}
|
|
|
|
return (end - start + 1), nil
|
|
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
|
|
req, err := http.NewRequest("PATCH", hbu.location, bytes.NewReader(p))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
|
|
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
|
|
req.Header.Set("Content-Type", "application/octet-stream")
|
|
|
|
resp, err := hbu.client.Do(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if !SuccessStatus(resp.StatusCode) {
|
|
return 0, hbu.handleErrorResponse(resp)
|
|
}
|
|
|
|
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
|
|
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
rng := resp.Header.Get("Range")
|
|
var start, end int
|
|
if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
|
|
return 0, err
|
|
} else if n != 2 || end < start {
|
|
return 0, fmt.Errorf("bad range format: %s", rng)
|
|
}
|
|
|
|
return (end - start + 1), nil
|
|
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) Seek(offset int64, whence int) (int64, error) {
|
|
newOffset := hbu.offset
|
|
|
|
switch whence {
|
|
case os.SEEK_CUR:
|
|
newOffset += int64(offset)
|
|
case os.SEEK_END:
|
|
newOffset += int64(offset)
|
|
case os.SEEK_SET:
|
|
newOffset = int64(offset)
|
|
}
|
|
|
|
hbu.offset = newOffset
|
|
|
|
return hbu.offset, nil
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) ID() string {
|
|
return hbu.uuid
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) StartedAt() time.Time {
|
|
return hbu.startedAt
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
|
|
// TODO(dmcgowan): Check if already finished, if so just fetch
|
|
req, err := http.NewRequest("PUT", hbu.location, nil)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
values := req.URL.Query()
|
|
values.Set("digest", desc.Digest.String())
|
|
req.URL.RawQuery = values.Encode()
|
|
|
|
resp, err := hbu.client.Do(req)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if !SuccessStatus(resp.StatusCode) {
|
|
return distribution.Descriptor{}, hbu.handleErrorResponse(resp)
|
|
}
|
|
|
|
return hbu.statter.Stat(ctx, desc.Digest)
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
|
|
req, err := http.NewRequest("DELETE", hbu.location, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp, err := hbu.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode == http.StatusNotFound || SuccessStatus(resp.StatusCode) {
|
|
return nil
|
|
}
|
|
return hbu.handleErrorResponse(resp)
|
|
}
|
|
|
|
func (hbu *httpBlobUpload) Close() error {
|
|
hbu.closed = true
|
|
return nil
|
|
}
|