From d4f01b812c8216b615990ee4ad947465312fac00 Mon Sep 17 00:00:00 2001 From: Misty Stanley-Jones Date: Wed, 28 Sep 2016 14:25:04 -0700 Subject: [PATCH] Initial import of https://github.com/docker/dhe-engine --- docs/.gitignore | 2 + docs/client/client.go | 383 ++++++++++++++++++++ docs/client/doc.go | 8 + docs/container/confd.toml | 10 + docs/container/confs/garant.toml | 21 ++ docs/container/confs/signing_key.toml | 21 ++ docs/container/confs/storage.toml | 21 ++ docs/container/confs/token_roots.toml | 21 ++ docs/container/start.sh | 21 ++ docs/container/templates/garant.tmpl | 1 + docs/container/templates/signing_key.tmpl | 1 + docs/container/templates/storage.tmpl | 1 + docs/container/templates/token_roots.tmpl | 1 + docs/middleware/README.md | 57 +++ docs/middleware/doc.go | 7 + docs/middleware/errors/errors.go | 7 + docs/middleware/manifestlist.go | 42 +++ docs/middleware/manifeststore.go | 130 +++++++ docs/middleware/manifestv1.go | 107 ++++++ docs/middleware/manifestv2.go | 59 +++ docs/middleware/middleware.go | 78 ++++ docs/middleware/migration/README.md | 38 ++ docs/middleware/migration/enumerator.go | 82 +++++ docs/middleware/migration/migration.go | 156 ++++++++ docs/middleware/migration/migration_test.go | 275 ++++++++++++++ docs/middleware/mocks/ManifestStore.go | 36 ++ docs/middleware/mocks/Store.go | 27 ++ docs/middleware/mocks/TagStore.go | 55 +++ docs/middleware/store.go | 74 ++++ docs/middleware/tagstore.go | 72 ++++ docs/registry/registry.go | 186 ++++++++++ 31 files changed, 2000 insertions(+) create mode 100644 docs/.gitignore create mode 100644 docs/client/client.go create mode 100644 docs/client/doc.go create mode 100644 docs/container/confd.toml create mode 100644 docs/container/confs/garant.toml create mode 100644 docs/container/confs/signing_key.toml create mode 100644 docs/container/confs/storage.toml create mode 100644 docs/container/confs/token_roots.toml create mode 100755 docs/container/start.sh create mode 100644 docs/container/templates/garant.tmpl create mode 100644 docs/container/templates/signing_key.tmpl create mode 100644 docs/container/templates/storage.tmpl create mode 100644 docs/container/templates/token_roots.tmpl create mode 100644 docs/middleware/README.md create mode 100644 docs/middleware/doc.go create mode 100644 docs/middleware/errors/errors.go create mode 100644 docs/middleware/manifestlist.go create mode 100644 docs/middleware/manifeststore.go create mode 100644 docs/middleware/manifestv1.go create mode 100644 docs/middleware/manifestv2.go create mode 100644 docs/middleware/middleware.go create mode 100644 docs/middleware/migration/README.md create mode 100644 docs/middleware/migration/enumerator.go create mode 100644 docs/middleware/migration/migration.go create mode 100644 docs/middleware/migration/migration_test.go create mode 100644 docs/middleware/mocks/ManifestStore.go create mode 100644 docs/middleware/mocks/Store.go create mode 100644 docs/middleware/mocks/TagStore.go create mode 100644 docs/middleware/store.go create mode 100644 docs/middleware/tagstore.go create mode 100644 docs/registry/registry.go diff --git a/docs/.gitignore b/docs/.gitignore new file mode 100644 index 00000000..4ac790c8 --- /dev/null +++ b/docs/.gitignore @@ -0,0 +1,2 @@ +/container/registry +/container/registry-manager diff --git a/docs/client/client.go b/docs/client/client.go new file mode 100644 index 00000000..0fdc4412 --- /dev/null +++ b/docs/client/client.go @@ -0,0 +1,383 @@ +package client + +import ( + "crypto" + "crypto/rand" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "strings" + "time" + + "github.com/docker/dhe-deploy/garant/authn" + "github.com/docker/dhe-deploy/garant/authz" + "github.com/docker/dhe-deploy/hubconfig" + "github.com/docker/dhe-deploy/manager/schema" + "github.com/docker/dhe-deploy/registry/middleware" + middlewareErrors "github.com/docker/dhe-deploy/registry/middleware/errors" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/factory" + // all storage drivers + _ "github.com/docker/distribution/registry/storage/driver/azure" + _ "github.com/docker/distribution/registry/storage/driver/filesystem" + _ "github.com/docker/distribution/registry/storage/driver/gcs" + _ "github.com/docker/distribution/registry/storage/driver/inmemory" + _ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" + _ "github.com/docker/distribution/registry/storage/driver/oss" + _ "github.com/docker/distribution/registry/storage/driver/s3-aws" + _ "github.com/docker/distribution/registry/storage/driver/swift" + + "github.com/docker/garant/auth" + "github.com/palantir/stacktrace" +) + +// RegistryClient defines all methods for DTR<>Registry API support +type RegistryClient interface { + // DeleteRepository deletes an entire repository + DeleteRepository(named string, r *schema.Repository) error + + // DeleteTag removes a tag from a named repository + DeleteTag(named, tag string) error + + // DeleteManifest removes a manifest from a named repository + DeleteManifest(named, digest string) error + + // CreateJWT creates a jwt representing valid authn and authz for registry actions + // on behalf of a user + CreateJWT(user *authn.User, repo, accessLevel string) (string, error) +} + +// Client is a concrete implementation of RegistryClient +type client struct { + // settings allows us to load DTR and registry settings from the store + settings hubconfig.SettingsReader + // driver is a concrete StorageDriver for registry blobstore ops + driver driver.StorageDriver + // store is a middleware.Store implementation, saving tag info in A DB + store middleware.Store + // repoManager is used when deleting repos + repoManager *schema.RepositoryManager + // ctx represents a context used in initialization + ctx context.Context +} + +// Opts is an exported struct representing options for instantiating a new +// client +type Opts struct { + Settings hubconfig.SettingsReader + Store middleware.Store + RepoManager *schema.RepositoryManager +} + +// Returns a new `client` type with the given configuration. A storage driver +// will also be instantiated from the configuration supplied. +func NewClient(ctx context.Context, opts Opts) (RegistryClient, error) { + config, err := opts.Settings.RegistryConfig() + if err != nil { + return nil, stacktrace.Propagate(err, "error fetching registry config") + } + + // FUCK THIS SHITTY HACK THIS SHOULD NEVER HAVE BEEN ALLOWED TO EXIST + // whoever made this deserves a little seeing to. this is a copypasta + if config.Storage.Type() == "filesystem" { + params := config.Storage["filesystem"] + params["rootdirectory"] = "/storage" + config.Storage["filesystem"] = params + } + + driver, err := factory.Create(config.Storage.Type(), config.Storage.Parameters()) + if err != nil { + return nil, stacktrace.Propagate(err, "error creating distribution storage driver") + } + + return &client{ + ctx: ctx, + settings: opts.Settings, + store: opts.Store, + repoManager: opts.RepoManager, + driver: driver, + }, nil +} + +// DeleteRepository removes an entire repository and all artifacts from DTR. +// To do this we need to remove all repository blobs, all tags from the +// metadata store and the repository from the DTR DB. +// +// In order to keep as consistent as possible with the blobstore the current +// strategy is: +// +// 1. Nuke the entire repo/name directory within blobstore +// 2. Wait for this to happen +// 3. Delete all tags from the database +// +// Note that this does not use the registry client directly; there is no way +// of deleting repositories within the API, plus repositories are created +// within the DTR DB directly. +// +// NOTE: the arguments for this are ridiculous because in order to delete +// a repository we need to: +// 1. Query for the repository namespace to load it's UUID +// 2. Use the namespace UUID to generate the repo's PK (it's part of the +// hash) +// 3. Query for the repository by the generated PK for the repo's UUID +// 4. Use THAT UUID to finally delete the repository. +// TO simplify this we're using arguments from the adminserver's filters. +// +// XXX: (tonyhb) After this has finished schedule a new job for consistency +// checking this repository. TODO: Define how the consistency checker +// guarantees consistency. +// +// XXX: Two-phase commit for deletes would be nice. In this case we'd need to +// delete from the blobstore, then delete from the database. If the database +// delete failed add a job to remove from the database to keep consistency. +// We currently have no notion of failed DB writes to retry later; this needs +// to be added for proper two phase commit. +func (c client) DeleteRepository(named string, r *schema.Repository) (err error) { + // Do this first as it's non-destructive. + repo, err := c.getRepo(named) + if err != nil { + return stacktrace.Propagate(err, "error instantiating distribution.Repository") + } + + // Then look up all tags; this is a prerequisite and should be done before + // destructive actions. + tags, err := c.store.AllTags(c.ctx, repo) + if err != nil { + return stacktrace.Propagate(err, "error fetching tags for repository") + } + + vacuum := storage.NewVacuum(context.Background(), c.driver) + if err = vacuum.RemoveRepository(named); err != nil { + // If this is an ErrPathNotFound error from distribution we can ignore; + // the path is only made when a tag is pushed, and this repository + // may have no tags. + if _, ok := err.(driver.PathNotFoundError); !ok { + return stacktrace.Propagate(err, "error removing repository from blobstore") + } + } + + // If one tag fails we should carry on deleting the remaining tags, returning + // errors at the end of enumeration. This may produce more errors but should + // have closer consistency to the blobstore. + var errors = map[string]error{} + for _, tag := range tags { + if err := c.store.DeleteTag(c.ctx, repo, tag); err != nil { + errors[tag] = err + } + } + if len(errors) > 0 { + return stacktrace.NewError("errors deleting tags from metadata store: %s", errors) + } + + // Delete the repo from rethinkdb. See function notes above for info. + if err := c.repoManager.DeleteRepositoryByPK(r.PK); err != nil { + return stacktrace.Propagate(err, "unable to delete repo from database") + } + + return nil +} + +// DeleteTag attempts to delete a tag from the blobstore and metadata store. +// +// This is done by first deleting from the database using middleware.Store, +// then the blobstore using the storage.Repository +// +// If this is the last tag to reference a manifest the manifest will be left valid +// and in an undeleted state (ie. dangling). The GC should collect and delete +// dangling manifests. +func (c client) DeleteTag(named, tag string) error { + repo, err := c.getRepo(named) + if err != nil { + return stacktrace.Propagate(err, "") + } + + // Delete from the tagstore first; this is our primary source of truth and + // should always be in a consistent state. + if err := c.store.DeleteTag(c.ctx, repo, tag); err != nil && err != middlewareErrors.ErrNotFound { + return stacktrace.Propagate(err, "error deleting tag from metadata store") + } + + // getRepo returns a repository constructed from storage; calling Untag + // on this TagService will remove the tag from the blobstore. + if err := repo.Tags(c.ctx).Untag(c.ctx, tag); err != nil { + // If this is an ErrPathNotFound error from distribution we can ignore; + // the path is only made when a tag is pushed, and this repository + // may have no tags. + if _, ok := err.(driver.PathNotFoundError); !ok { + return stacktrace.Propagate(err, "error deleting tag from blobstore") + } + } + + return nil +} + +// DeleteManifest attempts to delete a manifest from the blobstore and metadata +// store. +// +// This is done by first deleting from the database using middleware.Store, +// then the blobstore using the storage.Repository +// +// This does not delete any tags pointing to this manifest. Instead, when the +// metadata store loads tags it checks to ensure the manifest it refers to is +// valid. +func (c client) DeleteManifest(named, dgst string) error { + repo, err := c.getRepo(named) + if err != nil { + return stacktrace.Propagate(err, "") + } + + mfstSrvc, err := repo.Manifests(c.ctx) + if err != nil { + return stacktrace.Propagate(err, "") + } + + // Delete from the tagstore first; this is our primary source of truth and + // should always be in a consistent state. + err = c.store.DeleteManifest(c.ctx, named+"@"+dgst) + if err != nil && err != middlewareErrors.ErrNotFound { + return stacktrace.Propagate(err, "error deleting manifest from metadata store") + } + + if err = mfstSrvc.Delete(c.ctx, digest.Digest(dgst)); err != nil { + if _, ok := err.(driver.PathNotFoundError); !ok { + return stacktrace.Propagate(err, "error deleting manifest from blobstore") + } + } + + return nil +} + +// getRepo is a utility function which returns a distribution.Repository for a +// given repository name string +func (c client) getRepo(named string) (distribution.Repository, error) { + // Note that this has no options enabled such as disabling v1 signatures or + // middleware. It will ONLY perform operations using the blobstore storage + // driver. + reg, err := storage.NewRegistry(c.ctx, c.driver, storage.EnableDelete) + if err != nil { + return nil, stacktrace.Propagate(err, "error instantiating registry instance for deleting tags") + } + + repoName, err := reference.WithName(named) + if err != nil { + return nil, stacktrace.Propagate(err, "error parsing repository name") + } + + repo, err := reg.Repository(c.ctx, repoName) + if err != nil { + return nil, stacktrace.Propagate(err, "error constructing repository") + } + + return repo, nil +} + +// CreateJWT creates a jwt representing valid authn and authz for registry actions +// on behalf of a user +func (c client) CreateJWT(user *authn.User, repo, accessLevel string) (string, error) { + // We need the DTR config and garant token signing key to generate a valid "iss" and + // "aud" claim and sign the JWT correctly. + uhc, err := c.settings.UserHubConfig() + if err != nil { + return "", stacktrace.Propagate(err, "error getting dtr config") + } + key, err := c.settings.GarantSigningKey() + if err != nil { + return "", stacktrace.Propagate(err, "error getting token signing key") + } + + // service is our domain name which represents the "iss" and "aud" claims + service := uhc.DTRHost + + var actions []string + accessScopeSet := authz.AccessLevelScopeSets[accessLevel] + for action := range accessScopeSet { + actions = append(actions, action) + } + accessEntries := []accessEntry{ + { + Resource: auth.Resource{ + Type: "repository", + Name: repo, + }, + Actions: actions, + }, + } + + // Create a random string for a JTI claim. Garant doesn't yet record JTIs + // to prevent replay attacks in DTR; we should. + // TODO(tonyhb): record JTI claims from garant and prevent replay attacks + byt := make([]byte, 15) + io.ReadFull(rand.Reader, byt) + jti := base64.URLEncoding.EncodeToString(byt) + + now := time.Now() + + joseHeader := map[string]interface{}{ + "typ": "JWT", + "alg": "ES256", + } + + if x5c := key.GetExtendedField("x5c"); x5c != nil { + joseHeader["x5c"] = x5c + } else { + joseHeader["jwk"] = key.PublicKey() + } + + var subject string + if user != nil { + subject = user.Account.Name + } + + claimSet := map[string]interface{}{ + "iss": service, + "sub": subject, + "aud": service, + "exp": now.Add(5 * time.Minute).Unix(), + "nbf": now.Unix(), + "iat": now.Unix(), + "jti": jti, + "access": accessEntries, + } + + var ( + joseHeaderBytes, claimSetBytes []byte + ) + + if joseHeaderBytes, err = json.Marshal(joseHeader); err != nil { + return "", stacktrace.Propagate(err, "error encoding jose header") + } + if claimSetBytes, err = json.Marshal(claimSet); err != nil { + return "", stacktrace.Propagate(err, "error encoding jwt claimset") + } + + encodedJoseHeader := joseBase64Encode(joseHeaderBytes) + encodedClaimSet := joseBase64Encode(claimSetBytes) + encodingToSign := fmt.Sprintf("%s.%s", encodedJoseHeader, encodedClaimSet) + + var signatureBytes []byte + if signatureBytes, _, err = key.Sign(strings.NewReader(encodingToSign), crypto.SHA256); err != nil { + return "", stacktrace.Propagate(err, "error encoding jwt payload") + } + + signature := joseBase64Encode(signatureBytes) + + return fmt.Sprintf("%s.%s", encodingToSign, signature), nil +} + +// joseBase64Encode base64 encodes a byte slice then removes any padding +func joseBase64Encode(data []byte) string { + return strings.TrimRight(base64.URLEncoding.EncodeToString(data), "=") +} + +// accessEntry represents an access entry in a JWT. +type accessEntry struct { + auth.Resource + Actions []string `json:"actions"` +} diff --git a/docs/client/doc.go b/docs/client/doc.go new file mode 100644 index 00000000..c31f61db --- /dev/null +++ b/docs/client/doc.go @@ -0,0 +1,8 @@ +// package client is a helper package for the DTR<>Registry API endpoints. For +// example, deleting a repository within DTR is complex compared to registry as we +// need to delete all tags from blob and metadata store, then delete the repo from +// the DTR DB. +// +// This is compared to plain registry when nuking the entire repository directory +// would suffice. +package client diff --git a/docs/container/confd.toml b/docs/container/confd.toml new file mode 100644 index 00000000..32943734 --- /dev/null +++ b/docs/container/confd.toml @@ -0,0 +1,10 @@ +backend = "etcd" +client_cakeys = "/ca/etcd/cert.pem" +client_cert = "/ca/etcd/cert.pem" +client_key = "/ca/etcd/key.pem" +confdir = "/etc/confd" +log-level = "info" +interval = 600 +noop = false +scheme = "http" +watch = true diff --git a/docs/container/confs/garant.toml b/docs/container/confs/garant.toml new file mode 100644 index 00000000..49753d52 --- /dev/null +++ b/docs/container/confs/garant.toml @@ -0,0 +1,21 @@ +[template] + +# The name of the template that will be used to render the application's configuration file +# Confd will look in `/etc/conf.d/templates` for these files by default +src = "garant.tmpl" + +# The location to place the rendered configuration file +dest = "/config/garant.yml" + +# The etcd keys or directory to watch. This is where the information to fill in +# the template will come from. +keys = [ "/dtr/configs/garant.yml" ] + +# File ownership and mode information +owner = "root" +mode = "0644" + +# These are the commands that will be used to check whether the rendered config is +# valid and to reload the actual service once the new config is in place +# TODO: can registry configs be reloaded without restarting thee container? +reload_cmd = "killall -USR2 registry" diff --git a/docs/container/confs/signing_key.toml b/docs/container/confs/signing_key.toml new file mode 100644 index 00000000..ef51fea0 --- /dev/null +++ b/docs/container/confs/signing_key.toml @@ -0,0 +1,21 @@ +[template] + +# The name of the template that will be used to render the application's configuration file +# Confd will look in `/etc/conf.d/templates` for these files by default +src = "signing_key.tmpl" + +# The location to place the rendered configuration file +dest = "/config/signing_key.json" + +# The etcd keys or directory to watch. This is where the information to fill in +# the template will come from. +keys = [ "/dtr/configs/generatedConfigs/signing_key.json" ] + +# File ownership and mode information +owner = "root" +mode = "0644" + +# These are the commands that will be used to check whether the rendered config is +# valid and to reload the actual service once the new config is in place +# TODO: can registry configs be reloaded without restarting thee container? +reload_cmd = "killall -USR2 registry" diff --git a/docs/container/confs/storage.toml b/docs/container/confs/storage.toml new file mode 100644 index 00000000..d27287eb --- /dev/null +++ b/docs/container/confs/storage.toml @@ -0,0 +1,21 @@ +[template] + +# The name of the template that will be used to render the application's configuration file +# Confd will look in `/etc/conf.d/templates` for these files by default +src = "storage.tmpl" + +# The location to place the rendered configuration file +dest = "/config/storage.yml" + +# The etcd keys or directory to watch. This is where the information to fill in +# the template will come from. +keys = [ "/dtr/configs/storage.yml" ] + +# File ownership and mode information +owner = "root" +mode = "0644" + +# These are the commands that will be used to check whether the rendered config is +# valid and to reload the actual service once the new config is in place +# TODO: can registry configs be reloaded without restarting thee container? +reload_cmd = "killall -USR2 registry" diff --git a/docs/container/confs/token_roots.toml b/docs/container/confs/token_roots.toml new file mode 100644 index 00000000..f0ea702e --- /dev/null +++ b/docs/container/confs/token_roots.toml @@ -0,0 +1,21 @@ +[template] + +# The name of the template that will be used to render the application's configuration file +# Confd will look in `/etc/conf.d/templates` for these files by default +src = "token_roots.tmpl" + +# The location to place the rendered configuration file +dest = "/config/token_roots.pem" + +# The etcd keys or directory to watch. This is where the information to fill in +# the template will come from. +keys = [ "/dtr/configs/generatedConfigs/token_roots.pem" ] + +# File ownership and mode information +owner = "root" +mode = "0644" + +# These are the commands that will be used to check whether the rendered config is +# valid and to reload the actual service once the new config is in place +# TODO: can registry configs be reloaded without restarting thee container? +reload_cmd = "killall -USR2 registry" diff --git a/docs/container/start.sh b/docs/container/start.sh new file mode 100755 index 00000000..c2e2c88e --- /dev/null +++ b/docs/container/start.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +echo "[starter] starting..." + +# Fail hard and fast +set -eo pipefail + +# If this fails, docker will restart the container. Yay, docker. +confd -node https://dtr-etcd-${DTR_REPLICA_ID}.dtr-br:2379 -node https://dtr-etcd-${DTR_REPLICA_ID}.dtr-br:4001 -onetime -config-file /etc/confd/confd.toml + +# Run confd watcher in the background to watch the upstream servers +confd -node https://dtr-etcd-${DTR_REPLICA_ID}.dtr-br:2379 -node https://dtr-etcd-${DTR_REPLICA_ID}.dtr-br:4001 -config-file /etc/confd/confd.toml & +echo "[starter] confd is listening for changes on etcd..." + +# Start registry +echo "[starter] starting registry service..." +while true +do + /bin/registry || true + sleep 1 +done diff --git a/docs/container/templates/garant.tmpl b/docs/container/templates/garant.tmpl new file mode 100644 index 00000000..707daeb5 --- /dev/null +++ b/docs/container/templates/garant.tmpl @@ -0,0 +1 @@ +{{getv "/dtr/configs/garant.yml"}} diff --git a/docs/container/templates/signing_key.tmpl b/docs/container/templates/signing_key.tmpl new file mode 100644 index 00000000..51b3e1b1 --- /dev/null +++ b/docs/container/templates/signing_key.tmpl @@ -0,0 +1 @@ +{{getv "/dtr/configs/generatedConfigs/signing_key.json"}} diff --git a/docs/container/templates/storage.tmpl b/docs/container/templates/storage.tmpl new file mode 100644 index 00000000..c61d9fe9 --- /dev/null +++ b/docs/container/templates/storage.tmpl @@ -0,0 +1 @@ +{{getv "/dtr/configs/storage.yml"}} diff --git a/docs/container/templates/token_roots.tmpl b/docs/container/templates/token_roots.tmpl new file mode 100644 index 00000000..6eb0d81a --- /dev/null +++ b/docs/container/templates/token_roots.tmpl @@ -0,0 +1 @@ +{{getv "/dtr/configs/generatedConfigs/token_roots.pem"}} diff --git a/docs/middleware/README.md b/docs/middleware/README.md new file mode 100644 index 00000000..5ae4c8f0 --- /dev/null +++ b/docs/middleware/README.md @@ -0,0 +1,57 @@ +Metadata Store +============== + +The metadata store middleware saves tag and manifest information to RethinkDB. +This gives us many benefits over distribution's standard method of saving +metadata on the filesystem: + +- Our APIs can be more verbose, showing architecture, OS, author, push time etc. + for each tag and manifest +- Our APIs for listing tags are much faster, as it doens't depend on reads over + a remote distributed filesystem +- GC's mark phase is much quicker; we list layers from the manifest table +- We can delete V2 manifests by tags (CAS dictates that if two tags refer to the + same image they'll use the same manifest. Therefore manifests should only be + deleted if there's one tag pointing to it) + +**NOTE**: The filesystem is still used for all read operations. This guarantees +that pulls work during the migration from 2.x to 2.1 — during this time the +metadata store is empty therefore reading tags/manifests will fail. + +## Spec + +https://docs.google.com/document/d/1hv6bCqIlTb-lyeP5bL1Gy5xK-UgUJuPbD2y-GY21dMQ + + +### Tag deletion + +Requirements for deleting tags: + +- Deleting a tag must delete the tag's manifest *if no other tags refer to the + manifest*. +- Deleting a tag must retain the manifest if other tags refer to the manifest + +Tag deletion is implemented using a tombstone column within rethinkdb (soft +deletion). + +Delete flow: + + 1. Update the tag's deleted column in rethinkDB to `true` + i. if this fails return an error; deletion did not work + 2. Attempt to delete the blob from the blobstore + i. if this fails, attempt to delete from the blobstore during GC + +This means that *the blobstore may be inconsistent with our database*. To +resolve this, all registry operations for reading tags during pulls should +attempt to read from RethinkDB first; if an error is returned *then* we should +attempt to read from the blobstore. + +Affected: + +- Fetching single tags: needs to check deleted column +- Fetching all repo's tags: needs to filter deleted column; only show undeleted +- Deleting tags: if the tag is the last reference to a manifest (last undeleted + tag) we should mark the manifest as deleted +- Creating a tag: we need to upsert on tags. If the tag exists, set `deleted` to + false in an update. Otherwise create a new row. + diff --git a/docs/middleware/doc.go b/docs/middleware/doc.go new file mode 100644 index 00000000..ce81fe01 --- /dev/null +++ b/docs/middleware/doc.go @@ -0,0 +1,7 @@ +// Package middleware provides a Repository middleware for Docker's +// distribution project which allows custom ManifestService and TagService +// implementations to be returned from distribution.Repository. +// +// This is useful for having registry store layer blobs while delegating +// responsibility for metadata to a separate system (ie. a database) +package middleware diff --git a/docs/middleware/errors/errors.go b/docs/middleware/errors/errors.go new file mode 100644 index 00000000..3f2c21eb --- /dev/null +++ b/docs/middleware/errors/errors.go @@ -0,0 +1,7 @@ +package errors + +import ( + "fmt" +) + +var ErrNotFound = fmt.Errorf("key not found") diff --git a/docs/middleware/manifestlist.go b/docs/middleware/manifestlist.go new file mode 100644 index 00000000..9a689959 --- /dev/null +++ b/docs/middleware/manifestlist.go @@ -0,0 +1,42 @@ +package middleware + +import ( + "encoding/json" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/manifestlist" +) + +func (ms *manifestStore) VerifyList(ctx context.Context, mnfst *manifestlist.DeserializedManifestList) error { + var errs distribution.ErrManifestVerification + + for _, manifestDescriptor := range mnfst.References() { + exists, err := ms.Exists(ctx, manifestDescriptor.Digest) + if err != nil && err != distribution.ErrBlobUnknown { + errs = append(errs, err) + } + if err != nil || !exists { + // On error here, we always append unknown blob errors. + errs = append(errs, distribution.ErrManifestBlobUnknown{Digest: manifestDescriptor.Digest}) + } + } + + if len(errs) != 0 { + return errs + } + + return nil +} + +func (ms *manifestStore) UnmarshalList(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error) { + context.GetLogger(ms.ctx).Debug("(*manifestListHandler).Unmarshal") + + var m manifestlist.DeserializedManifestList + if err := json.Unmarshal(content, &m); err != nil { + return nil, err + } + + return &m, nil +} diff --git a/docs/middleware/manifeststore.go b/docs/middleware/manifeststore.go new file mode 100644 index 00000000..fe1ad143 --- /dev/null +++ b/docs/middleware/manifeststore.go @@ -0,0 +1,130 @@ +package middleware + +import ( + "fmt" + + middlewareErrors "github.com/docker/dhe-deploy/registry/middleware/errors" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/manifestlist" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/registry/handlers" + "github.com/docker/libtrust" +) + +// manifestStore provides an alternative backing mechanism for manifests. +// It must implement the ManifestService to store manifests and +// ManifestEnumerator for garbage collection and listing +type manifestStore struct { + // useFilesystemStore is a flag which determines whether to use the default + // filesystem service for all read actions. We need to fall back to the + // filesystem for checking whether manifests exist if the metadata store + // is still syncing. + // + // TODO (tonyhb) Determine whether the metadata store is faster; if it's + // not we can remove this flag and always use distribution's filesystem + // store for read operations + useFilesystemStore bool + + app *handlers.App + ctx context.Context + store Store + signingKey libtrust.PrivateKey + + repo distribution.Repository + blobService distribution.ManifestService +} + +func (m *manifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error) { + return m.blobService.Exists(ctx, dgst) +} + +// Get retrieves the manifest specified by the given digest for a repo. +// +// Note that the middleware itself verifies that the manifest is valid; +// the storage backend should only marshal and unmarshal into the correct type. +func (m *manifestStore) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { + return m.blobService.Get(ctx, dgst, options...) +} + +// Put creates or updates the given manifest returning the manifest digest +func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (d digest.Digest, err error) { + // First, ensure we write the manifest to the filesystem as per standard + // distribution code. + if d, err = m.blobService.Put(ctx, manifest, options...); err != nil { + context.GetLoggerWithField(ctx, "err", err).Error("error savng manifest to blobstore") + return d, err + } + + // NOTE: we're not allowing skipDependencyVerification here. + // + // skipDependencyVerification is ONLY used when registry is set up as a + // pull-through cache (proxy). In these circumstances this middleware + // should not be used, therefore this verification implementation always + // verifies blobs. + // + // This is the only difference in implementation with storage's + // manifestStore{} + switch manifest.(type) { + case *schema1.SignedManifest: + err = m.VerifyV1(ctx, manifest.(*schema1.SignedManifest)) + case *schema2.DeserializedManifest: + ctx, err = m.VerifyV2(ctx, manifest.(*schema2.DeserializedManifest)) + case *manifestlist.DeserializedManifestList: + err = m.VerifyList(ctx, manifest.(*manifestlist.DeserializedManifestList)) + default: + err = fmt.Errorf("Unknown manifest type: %T", manifest) + } + + if err != nil { + return + } + + // Our storage service needs the digest of the manifest in order to + // store the manifest under the correct key. + _, data, err := manifest.Payload() + if err != nil { + return + } + + // NOTE that for v1 manifests .Payload() returns the entire manifest including + // the randomly generated signature. Digests must always be calculated on the + // canonical manifest without signatures. + if man, ok := manifest.(*schema1.SignedManifest); ok { + data = man.Canonical + } + + dgst := digest.FromBytes(data) + err = m.store.PutManifest(ctx, m.repo.Named().String(), string(dgst), manifest) + return dgst, err +} + +// Delete removes the manifest specified by the given digest. +func (m *manifestStore) Delete(ctx context.Context, dgst digest.Digest) error { + key := m.key(dgst) + + // First delete from the manifest store in rethinkDB. We can silently ignore + // ErrNotFound issues - when deleting a tag from DTR's API the manifest + // will already be removed from the tagstore if no tags reference it. + // Unfortunately, this API call cannot delete manifests from the blobstore + // so this will be called directly. + _, err := m.store.GetManifest(ctx, key) + if err != nil && err != middlewareErrors.ErrNotFound { + context.GetLoggerWithField(ctx, "err", err).Error("error getting manifest from metadata store") + return err + } + if err := m.store.DeleteManifest(ctx, key); err != nil { + context.GetLoggerWithField(ctx, "err", err).Error("error deleting manifest from metadata store") + return err + } + + // Delete this within the blobService + return m.blobService.Delete(ctx, dgst) +} + +func (m *manifestStore) key(dgst digest.Digest) string { + return m.repo.Named().String() + "@" + string(dgst) +} diff --git a/docs/middleware/manifestv1.go b/docs/middleware/manifestv1.go new file mode 100644 index 00000000..e35909c1 --- /dev/null +++ b/docs/middleware/manifestv1.go @@ -0,0 +1,107 @@ +package middleware + +import ( + "encoding/json" + "fmt" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/reference" + "github.com/docker/libtrust" +) + +// VerifyV1 ensures that the v1 signed manifest content is valid from the +// perspective of the registry. It ensures that the signature is valid for the +// enclosed payload. As a policy, the registry only tries to store valid +// content, leaving trust policies of that content up to consumers. +func (ms *manifestStore) VerifyV1(ctx context.Context, mnfst *schema1.SignedManifest) error { + var errs distribution.ErrManifestVerification + + if len(mnfst.Name) > reference.NameTotalLengthMax { + errs = append(errs, + distribution.ErrManifestNameInvalid{ + Name: mnfst.Name, + Reason: fmt.Errorf("manifest name must not be more than %v characters", reference.NameTotalLengthMax), + }) + } + + if !reference.NameRegexp.MatchString(mnfst.Name) { + errs = append(errs, + distribution.ErrManifestNameInvalid{ + Name: mnfst.Name, + Reason: fmt.Errorf("invalid manifest name format"), + }) + } + + if len(mnfst.History) != len(mnfst.FSLayers) { + errs = append(errs, fmt.Errorf("mismatched history and fslayer cardinality %d != %d", + len(mnfst.History), len(mnfst.FSLayers))) + } + + if _, err := schema1.Verify(mnfst); err != nil { + switch err { + case libtrust.ErrMissingSignatureKey, libtrust.ErrInvalidJSONContent, libtrust.ErrMissingSignatureKey: + errs = append(errs, distribution.ErrManifestUnverified{}) + default: + if err.Error() == "invalid signature" { + errs = append(errs, distribution.ErrManifestUnverified{}) + } else { + errs = append(errs, err) + } + } + } + + // No skipDependencyVerification; always verify + for _, fsLayer := range mnfst.References() { + _, err := ms.repo.Blobs(ctx).Stat(ctx, fsLayer.Digest) + if err != nil { + if err != distribution.ErrBlobUnknown { + errs = append(errs, err) + } + + // On error here, we always append unknown blob errors. + errs = append(errs, distribution.ErrManifestBlobUnknown{Digest: fsLayer.Digest}) + } + } + + if len(errs) != 0 { + return errs + } + + return nil +} + +func (ms *manifestStore) UnmarshalV1(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error) { + + var ( + err error + sm = &schema1.SignedManifest{} + ) + + if ms.app.Config.Compatibility.Schema1.DisableSignatureStore { + // Attempt to create a new signature + jsig, err := libtrust.NewJSONSignature(content) + if err != nil { + return nil, err + } + if err := jsig.Sign(ms.signingKey); err != nil { + return nil, err + } + + // Extract the pretty JWS + raw, err := jsig.PrettySignature("signatures") + if err != nil { + return nil, err + } + + if err := json.Unmarshal(raw, sm); err != nil { + return nil, err + } + return sm, nil + } + + err = sm.UnmarshalJSON(content) + return sm, err +} diff --git a/docs/middleware/manifestv2.go b/docs/middleware/manifestv2.go new file mode 100644 index 00000000..008394da --- /dev/null +++ b/docs/middleware/manifestv2.go @@ -0,0 +1,59 @@ +package middleware + +import ( + "encoding/json" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/schema2" +) + +func (m *manifestStore) VerifyV2(ctx context.Context, mnfst *schema2.DeserializedManifest) (context.Context, error) { + var errs distribution.ErrManifestVerification + + // The target refers to the manifest config. We need this in order to store + // metadata such as the OS and architecture of this manifest, so instead of + // calling Stat we'll retrieve this blob and store it in the context for the + // Store to process + target := mnfst.Target() + content, err := m.repo.Blobs(ctx).Get(ctx, target.Digest) + ctx = context.WithValue(ctx, "target", content) + if err != nil { + if err != distribution.ErrBlobUnknown { + errs = append(errs, err) + } + + // On error here, we always append unknown blob errors. + errs = append(errs, distribution.ErrManifestBlobUnknown{Digest: target.Digest}) + } + + for _, fsLayer := range mnfst.References() { + _, err := m.repo.Blobs(ctx).Stat(ctx, fsLayer.Digest) + if err != nil { + if err != distribution.ErrBlobUnknown { + errs = append(errs, err) + } + + // On error here, we always append unknown blob errors. + errs = append(errs, distribution.ErrManifestBlobUnknown{Digest: fsLayer.Digest}) + } + } + + if len(errs) != 0 { + return ctx, errs + } + + return ctx, nil +} + +func (m *manifestStore) UnmarshalV2(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error) { + context.GetLogger(m.ctx).Debug("(*schema2ManifestHandler).Unmarshal") + + var man schema2.DeserializedManifest + if err := json.Unmarshal(content, &man); err != nil { + return nil, err + } + + return &man, nil +} diff --git a/docs/middleware/middleware.go b/docs/middleware/middleware.go new file mode 100644 index 00000000..07a5b67a --- /dev/null +++ b/docs/middleware/middleware.go @@ -0,0 +1,78 @@ +package middleware + +import ( + "fmt" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/handlers" + "github.com/docker/libtrust" +) + +// registeredStore is the storage implementation used for saving manifests +// and tags. This is set by calling RegisterStore() before constructing +// the middleware. +var registeredStore Store + +func InitMiddleware(ctx context.Context, repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error) { + if registeredStore == nil { + return nil, fmt.Errorf("no store has been registered for metadata middleware") + } + + trustKey, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + return nil, fmt.Errorf("unable to generate ephemeral signing key: %s", err) + } + + // Return a new struct which embeds the repository anonymously. This allows + // us to overwrite specific repository functions for loading manifest and + // tag services. + return &WrappedRepository{ + Repository: repository, + + app: ctx.(*handlers.App), + store: registeredStore, + signingKey: trustKey, + }, nil + +} + +// WrappedRepository implements distribution.Repository, providing new calls +// when creating the TagService and MetadataService +type WrappedRepository struct { + distribution.Repository + + app *handlers.App + store Store + signingKey libtrust.PrivateKey +} + +func (repo *WrappedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { + // Get the default manifest service which uses blobStore to save manifests. + blobService, err := repo.Repository.Manifests(ctx, options...) + + return &manifestStore{ + app: repo.app, + ctx: ctx, + store: repo.store, + signingKey: repo.signingKey, + + repo: repo, + blobService: blobService, + }, err +} + +func (repo *WrappedRepository) Tags(ctx context.Context) distribution.TagService { + blobMfstService, err := repo.Repository.Manifests(ctx) + if err != nil { + context.GetLoggerWithField(ctx, "err", err).Error("error creating ManifestService within metadata TagService") + } + return &tagStore{ + ctx: ctx, + repo: repo, + store: repo.store, + + blobService: repo.Repository.Tags(ctx), + blobMfstService: blobMfstService, + } +} diff --git a/docs/middleware/migration/README.md b/docs/middleware/migration/README.md new file mode 100644 index 00000000..d8de731d --- /dev/null +++ b/docs/middleware/migration/README.md @@ -0,0 +1,38 @@ +Migration +========= + +Migrate all tag and manifest metadata into the new tag/metadata store using +rethinkdb defined within `manager/`. + +## How? + +Similar to mark and sweep: + +1. Iterate through all repositories +2. For each repository, iterate through each tag +3. For each tag load the manifest and: + 1. store the manifest plus config blob metadata + 2. store the tag data + +Once the migration completes update the `isRepoMetadataMigrated` flag (to be +renamed) to true. + +## Notes + +The tagstore middleware will ensure that any new pushes since migration starts +are properly inserted in the database. This means that we do not need to worry +about stale data from uploads started after the migration. + +## Problems + +**Resumes** + +This needs to be interruptable; if the task fails we should start from where we +left off (or near); we shouldn't start from scratch. + +In order to do this we store the name of the repository we're currently +migrating; we can iterate through all repositories until we reach the current +repository and then restart migration of all tags. + +This is an easy and low-cost solution to resumes vs always saving the name of +the tags we're migrating. diff --git a/docs/middleware/migration/enumerator.go b/docs/middleware/migration/enumerator.go new file mode 100644 index 00000000..474f3922 --- /dev/null +++ b/docs/middleware/migration/enumerator.go @@ -0,0 +1,82 @@ +package migration + +import ( + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/reference" + "github.com/palantir/stacktrace" + + log "github.com/Sirupsen/logrus" +) + +type Enumerator interface { + EnumerateRepo(ctx context.Context, reg distribution.Namespace, repoName string) error +} + +// NewEnumerator returns an enumerator which provides functions to iterate over +// a repository's tags, calling the given tagEnumerator function for each tag. +func NewEnumerator(onGetTag tagEnumerator) Enumerator { + return &enumerator{onGetTag} +} + +// tagEnumerator is a function signature for handling a specific repository's tag +// on each tieration +type tagEnumerator func(ctx context.Context, repo distribution.Repository, tagName string, tag distribution.Descriptor) error + +// enumerator handles iterating over a repository's tags, calling `onGetTag` on +// each tag +type enumerator struct { + onGetTag tagEnumerator +} + +// EnumerateRepo iterates over a given repository's tags, calling `EnumerateTag` +// on each tag. The repository is specified as a string via the `repoName` +// argument. +// A context and registry (distribution.Namespace) must be supplied with valid, +// instantiated drivers. +func (e *enumerator) EnumerateRepo(ctx context.Context, reg distribution.Namespace, repoName string) error { + named, err := reference.ParseNamed(repoName) + if err != nil { + log.WithField("error", err).Errorf("failed to parse repo name %s", repoName) + return nil + } + + repo, err := reg.Repository(ctx, named) + if err != nil { + log.WithField("error", err).Errorf("failed to construct repository %s", repoName) + return nil + } + + // enumerate all repository tags + tags, err := repo.Tags(ctx).All(ctx) + if err != nil { + log.WithField("error", err).Errorf("failed to return all tags for repository %s", repoName) + return nil + } + + for _, t := range tags { + if err = e.EnumerateTags(ctx, repo, t); err != nil { + log.WithField("error", err).Errorf("error processing tag during enumeration %s", t) + } + } + + return nil +} + +// EnumerateTags is called with a tag name as a string, loads the tag's +// descriptor and delegates to `enumerator.onGetTag` with the tag name +// and descriptor for further processing. +// +// This allows us to pass custom functions for migration and consistency +// checking whilst leveraging the same enumeration code. +func (e *enumerator) EnumerateTags(ctx context.Context, repo distribution.Repository, tagName string) error { + // TagService.All returns a slice of strings instead of a concrete + // distribution.Descriptor. Here we transform the tag name into a + // descriptor and call the supplied onGetTag function. + desc, err := repo.Tags(ctx).Get(ctx, tagName) + if err != nil { + return stacktrace.NewError("failed retrieving tag descriptor for tag %s: %s", tagName, err) + } + + return e.onGetTag(ctx, repo, tagName, desc) +} diff --git a/docs/middleware/migration/migration.go b/docs/middleware/migration/migration.go new file mode 100644 index 00000000..ddfb0194 --- /dev/null +++ b/docs/middleware/migration/migration.go @@ -0,0 +1,156 @@ +package migration + +import ( + "github.com/docker/dhe-deploy/manager/schema" + "github.com/docker/dhe-deploy/registry/middleware" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/manifest/schema2" + "github.com/palantir/stacktrace" +) + +func NewMigration(reg distribution.Namespace, store middleware.Store) *migration { + m := &migration{ + isFromResume: false, + reg: reg, + store: store, + } + m.enumerator = NewEnumerator(m.AddTagAndManifest) + return m +} + +func NewMigrationWithEnumerator(reg distribution.Namespace, e Enumerator) *migration { + return &migration{ + isFromResume: false, + enumerator: e, + reg: reg, + } +} + +// migration handles the migration process for moving tag and manifest +// information for repositories (stored as files in distribution) into our +// tagstore. +type migration struct { + // reg is a distribution.Namespace instance instantiated with storage + // drivers + reg distribution.Namespace + // isFromResume indicates whether this migration has been started because + // of a previously failed attempt + isFromResume bool + // currentRepo stores the repository we're currently migrating (or have + // just resumed from) + currentRepo string + // enumerator handles iterating through each repository's tags + enumerator Enumerator + // store + store middleware.Store +} + +func (m *migration) Resume(from string) { + m.isFromResume = true + m.currentRepo = from +} + +// Migrate begins migration from either the start of all repositories or +// `currentRepo` if `isFromResume` is true. +// +// If the migration fails the name of the current repository and the error is +// returned. +func (m *migration) Migrate(ctx context.Context) (repo string, err error) { + repositoryEnumerator, ok := m.reg.(distribution.RepositoryEnumerator) + if !ok { + return "", stacktrace.NewError("unable to convert Namespace to RepositoryEnumerator") + } + + hasResumed := false + err = repositoryEnumerator.Enumerate(ctx, func(repoName string) error { + repo = repoName + + if m.isFromResume && !hasResumed { + // if the repository we're iterating through is before `currentRepo`, + // therefore we can skip this as we've already migrated this repo + // in a previous migration attempt + if repoName != m.currentRepo { + return nil + } + // this is the same repo as the last attempt, so we can continue + // the migration. + hasResumed = true + } + + context.GetLoggerWithFields(ctx, map[interface{}]interface{}{ + "repo": repoName, + }).Infof("enumerating repository") + + err := m.enumerator.EnumerateRepo(ctx, m.reg, repoName) + if err != nil { + context.GetLoggerWithFields(ctx, map[interface{}]interface{}{ + "repo": repoName, + "error": err, + }).Errorf("error enumerating repository") + } + return err + }) + + return repo, err +} + +// tag represents a singla tag which is being migrated into the tagstore. +type tag struct { + dbTag *schema.Tag + dbManifest *schema.Manifest + + // store is an implementation of the middleware store interface which + // saves tags and manifests to the DB + store middleware.Store +} + +// resolveTagAndManifest constructs a concrete schema.Tag and schema.Manifest +// from the blobs stored within the registry. +func (m *migration) AddTagAndManifest(ctx context.Context, repo distribution.Repository, tagName string, tag distribution.Descriptor) error { + repoName := repo.Named().Name() + + // Load the manifest as referred to by the tag + mfstService, err := repo.Manifests(ctx) + if err != nil { + return stacktrace.NewError("unable to construct manifest service for '%s:%s': %v", repoName, tagName, err) + } + manifest, err := mfstService.Get(ctx, tag.Digest) + if err != nil { + return stacktrace.NewError("unable to retrieve manifest service for '%s:%s': %v", repoName, tagName, err) + } + + // Note that the store expects the context to have a key named "target" + // with the config blob; this is due to how registry works when statting + // and verifying uploads. + // + // In order to re-use code for loading manifest information from a blob + // into the DB we should load the config blob if necessary and store it + // in the context. + + // Tackle manifest metadata such as layers, arch and OS + if v2m, ok := manifest.(*schema2.DeserializedManifest); ok { + // The target refers to the manifest config. We need this in order to store + // metadata such as the OS and architecture of this manifest, so instead of + // calling Stat we'll retrieve this blob and store it in the context for the + // Store to process + target := v2m.Target() + content, err := repo.Blobs(ctx).Get(ctx, target.Digest) + if err != nil { + return stacktrace.NewError("unable to retrieve manifest config for '%s:%s' (digest %s): %v", repoName, tagName, target.Digest, err) + } + ctx = context.WithValue(ctx, "target", content) + } + + // Manifest's PKs are formatted as `namespace/repo@sha256:...` + named := repo.Named().String() + if err = m.store.PutManifest(ctx, named, tag.Digest.String(), manifest); err != nil { + return stacktrace.NewError("unable to save manifest in store for '%s:%s': %v", repoName, tagName, err) + } + if err = m.store.PutTag(ctx, repo, tagName, tag); err != nil { + return stacktrace.NewError("unable to save tag in store for '%s:%s': %v", repoName, tagName, err) + } + + return nil +} diff --git a/docs/middleware/migration/migration_test.go b/docs/middleware/migration/migration_test.go new file mode 100644 index 00000000..972c59c4 --- /dev/null +++ b/docs/middleware/migration/migration_test.go @@ -0,0 +1,275 @@ +package migration + +import ( + "fmt" + "reflect" + "testing" + + "github.com/docker/dhe-deploy/registry/middleware/mocks" + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/inmemory" + + "github.com/stretchr/testify/mock" +) + +const root = "/docker/registry/v2/" + +type env struct { + registry distribution.Namespace + driver driver.StorageDriver + ctx context.Context +} + +func setupRegistry(t *testing.T) *env { + d := inmemory.New() + ctx := context.Background() + registry, err := storage.NewRegistry( + ctx, + d, + storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), + storage.EnableRedirect, + ) + if err != nil { + t.Fatalf("error iunstantiating registry: %v", err) + } + + // Add data to registry + var prefix = root + "repositories/admin/" + data := map[string]interface{}{ + "content": map[string]string{ + // REPOSITORIES + //a + prefix + "a-repo/_layers/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + prefix + "a-repo/_layers/sha256/6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d/link": "sha256:6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d", + prefix + "a-repo/_manifests/revisions/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + prefix + "a-repo/_manifests/tags/a-tag/current/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + prefix + "a-repo/_manifests/tags/a-tag/index/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + //b + prefix + "b-repo/_layers/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + prefix + "b-repo/_layers/sha256/6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d/link": "sha256:6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d", + prefix + "b-repo/_manifests/revisions/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + prefix + "b-repo/_manifests/tags/b-tag/current/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + prefix + "b-repo/_manifests/tags/b-tag/index/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link": "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566", + // MANIFESTS + root + "blobs/sha256/1f/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/data": V2_MANIFEST_1, + root + "blobs/sha256/6b/6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d/data": V2_MANIFEST_CONFIG_1, + }, + } + for path, blob := range data["content"].(map[string]string) { + d.PutContent(ctx, path, []byte(blob)) + } + + return &env{ + registry, + d, + ctx, + } +} + +func TestMigrateResumes(t *testing.T) { + env := setupRegistry(t) + + tests := []struct { + migration *migration + expectedRepos []string + }{ + { + migration: &migration{ + reg: env.registry, + isFromResume: false, + }, + expectedRepos: []string{"admin/a-repo", "admin/b-repo"}, + }, + { + migration: &migration{ + reg: env.registry, + isFromResume: true, + currentRepo: "admin/b-repo", + }, + expectedRepos: []string{"admin/b-repo"}, + }, + } + + for _, test := range tests { + // Iterate through the repositories, storing each repository name within + // iteratedRepos. We can then compare which repos were passed to onTagFunc + // to check resumes + iteratedRepos := []string{} + onTagFunc := func(ctx context.Context, repo distribution.Repository, tagName string, tag distribution.Descriptor) error { + iteratedRepos = append(iteratedRepos, repo.Named().Name()) + return nil + } + test.migration.enumerator = NewEnumerator(onTagFunc) + if _, err := test.migration.Migrate(env.ctx); err != nil { + t.Fatalf("error migrating: %s", err) + } + + if !reflect.DeepEqual(iteratedRepos, test.expectedRepos) { + t.Fatalf("resume failed, expected vs actual repo iteration: %s vs %s", test.expectedRepos, iteratedRepos) + } + } + +} + +// This is a basic test asserting that there are no obvious errors with +// the migration logic. +func TestAddTagAndManifest(t *testing.T) { + env := setupRegistry(t) + store := mocks.NewStore() + migration := NewMigration(env.registry, store) + + store.TagStore.On( + "PutTag", + mock.AnythingOfType("*context.valueCtx"), + mock.AnythingOfTypeArgument("*storage.repository"), + mock.AnythingOfType("string"), + mock.AnythingOfType("distribution.Descriptor"), + ).Return(nil).Run(func(a mock.Arguments) { + fmt.Printf("%#v", a) + }) + + store.ManifestStore.On( + "PutManifest", + mock.AnythingOfType("*context.valueCtx"), + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + mock.AnythingOfType("*schema2.DeserializedManifest"), + ).Return(nil).Run(func(a mock.Arguments) { + fmt.Printf("%#v", a) + }) + + _, err := migration.Migrate(env.ctx) + if err != nil { + t.Fatalf("unexpected error during migration: %s", err) + } +} + +// Assert that failing during a migration returns no error +// and instead only logs the error +func TestAddTagAndManifestReturnsNil(t *testing.T) { + env := setupRegistry(t) + store := mocks.NewStore() + migration := NewMigration(env.registry, store) + + // When we get admin/a-repo we can fail fast. + store.TagStore.On( + "PutTag", + mock.AnythingOfType("*context.valueCtx"), + mock.AnythingOfTypeArgument("*storage.repository"), + mock.AnythingOfType("string"), + mock.AnythingOfType("distribution.Descriptor"), + ).Return(nil) + + store.ManifestStore.On( + "PutManifest", + mock.AnythingOfType("*context.valueCtx"), + mock.AnythingOfType("string"), + mock.AnythingOfType("string"), + mock.AnythingOfType("*schema2.DeserializedManifest"), + ).Return(nil) + + _, err := migration.Migrate(env.ctx) + if err != nil { + t.Fatalf("unexpected error during migration: %v", err) + } +} + +const V2_MANIFEST_1 = ` +{ + "schemaVersion": 2, + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "config": { + "mediaType": "application/vnd.docker.container.image.v1+json", + "size": 1473, + "digest": "sha256:6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d" + }, + "layers": [ + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 146, + "digest": "sha256:c170e8502f05562c30101cd65993e514cf63d242d6f14af6ca49896168c59ffd" + } + ] +} +` + +const V2_MANIFEST_CONFIG_1 = ` +{ + "architecture": "amd64", + "config": { + "Hostname": "9aec87ce8e45", + "Domainname": "", + "User": "", + "AttachStdin": false, + "AttachStdout": false, + "AttachStderr": false, + "Tty": false, + "OpenStdin": false, + "StdinOnce": false, + "Env": [ + "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" + ], + "Cmd": [ + "/true" + ], + "Image": "sha256:bbadf13f1e9e0d1629c07ad1e7eedcc5a6383300b7701c131a6f0beac49866ad", + "Volumes": null, + "WorkingDir": "", + "Entrypoint": null, + "OnBuild": null, + "Labels": { + } + }, + "container": "dab58e1226ef3b699c25b7befc7cec562707a959135d130f667a039e18e63f72", + "container_config": { + "Hostname": "9aec87ce8e45", + "Domainname": "", + "User": "", + "AttachStdin": false, + "AttachStdout": false, + "AttachStderr": false, + "Tty": false, + "OpenStdin": false, + "StdinOnce": false, + "Env": [ + "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" + ], + "Cmd": [ + "/bin/sh", + "-c", + "#(nop) CMD [\"/true\"]" + ], + "Image": "sha256:bbadf13f1e9e0d1629c07ad1e7eedcc5a6383300b7701c131a6f0beac49866ad", + "Volumes": null, + "WorkingDir": "", + "Entrypoint": null, + "OnBuild": null, + "Labels": { + } + }, + "created": "2016-05-19T20:38:48.345518736Z", + "docker_version": "1.11.1", + "history": [ + { + "created": "2016-05-19T20:38:48.277232795Z", + "created_by": "/bin/sh -c #(nop) ADD file:513005a00bb6ce26c9eb571d6f16e0c12378ba40f8e3100bcb484db53008e3b2 in /true" + }, + { + "created": "2016-05-19T20:38:48.345518736Z", + "created_by": "/bin/sh -c #(nop) CMD [\"/true\"]", + "empty_layer": true + } + ], + "os": "linux", + "rootfs": { + "type": "layers", + "diff_ids": [ + "sha256:af593d271f82964b57d51cc5e647c6076fb160bf8620f605848130110f0ed647" + ] + } +} +` diff --git a/docs/middleware/mocks/ManifestStore.go b/docs/middleware/mocks/ManifestStore.go new file mode 100644 index 00000000..f54d7257 --- /dev/null +++ b/docs/middleware/mocks/ManifestStore.go @@ -0,0 +1,36 @@ +package mocks + +import "github.com/stretchr/testify/mock" + +import "github.com/docker/distribution" +import "github.com/docker/distribution/context" + +type ManifestStore struct { + mock.Mock +} + +func (m *ManifestStore) GetManifest(ctx context.Context, key string) ([]byte, error) { + ret := m.Called(ctx, key) + + var r0 []byte + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + r1 := ret.Error(1) + + return r0, r1 +} +func (m *ManifestStore) PutManifest(ctx context.Context, repo, digest string, val distribution.Manifest) error { + ret := m.Called(ctx, repo, digest, val) + + r0 := ret.Error(0) + + return r0 +} +func (m *ManifestStore) DeleteManifest(ctx context.Context, key string) error { + ret := m.Called(ctx, key) + + r0 := ret.Error(0) + + return r0 +} diff --git a/docs/middleware/mocks/Store.go b/docs/middleware/mocks/Store.go new file mode 100644 index 00000000..67fb8ad5 --- /dev/null +++ b/docs/middleware/mocks/Store.go @@ -0,0 +1,27 @@ +package mocks + +import ( + "time" + + "github.com/docker/dhe-deploy/manager/schema" +) + +type Store struct { + *ManifestStore + *TagStore +} + +func NewStore() *Store { + return &Store{ + &ManifestStore{}, + &TagStore{}, + } +} + +func (Store) CreateEvent(event *schema.Event) error { return nil } +func (Store) GetEvents(requestedPageEncoded string, perPage uint, publishedBefore, publishedAfter *time.Time, queryingUserId, actorId, eventType string, isAdmin bool) (events []schema.Event, nextPageEncoded string, err error) { + return []schema.Event{}, "", nil +} +func (Store) Subscribe(schema.EventReactor) chan bool { + return nil +} diff --git a/docs/middleware/mocks/TagStore.go b/docs/middleware/mocks/TagStore.go new file mode 100644 index 00000000..aec28a62 --- /dev/null +++ b/docs/middleware/mocks/TagStore.go @@ -0,0 +1,55 @@ +package mocks + +import "github.com/stretchr/testify/mock" + +import "github.com/docker/distribution" +import "github.com/docker/distribution/context" + +type TagStore struct { + mock.Mock +} + +func (m *TagStore) GetTag(ctx context.Context, repo distribution.Repository, key string) (distribution.Descriptor, error) { + ret := m.Called(ctx, repo, key) + + r0 := ret.Get(0).(distribution.Descriptor) + r1 := ret.Error(1) + + return r0, r1 +} +func (m *TagStore) PutTag(ctx context.Context, repo distribution.Repository, key string, val distribution.Descriptor) error { + ret := m.Called(ctx, repo, key, val) + + r0 := ret.Error(0) + + return r0 +} +func (m *TagStore) DeleteTag(ctx context.Context, repo distribution.Repository, key string) error { + ret := m.Called(ctx, repo, key) + + r0 := ret.Error(0) + + return r0 +} +func (m *TagStore) AllTags(ctx context.Context, repo distribution.Repository) ([]string, error) { + ret := m.Called(ctx, repo) + + var r0 []string + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + r1 := ret.Error(1) + + return r0, r1 +} +func (m *TagStore) LookupTags(ctx context.Context, repo distribution.Repository, digest distribution.Descriptor) ([]string, error) { + ret := m.Called(ctx, repo, digest) + + var r0 []string + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + r1 := ret.Error(1) + + return r0, r1 +} diff --git a/docs/middleware/store.go b/docs/middleware/store.go new file mode 100644 index 00000000..8e859714 --- /dev/null +++ b/docs/middleware/store.go @@ -0,0 +1,74 @@ +package middleware + +import ( + "fmt" + + "github.com/docker/dhe-deploy/manager/schema" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" +) + +// RegisterStore should be called before instantiating the metadata middleware +// to register your storage implementation with this package. +// +// This uses some minor global state to save the registered store. +func RegisterStore(store Store) error { + if registeredStore != nil { + return fmt.Errorf("a store has already been registered for the metadata middleware") + } + registeredStore = store + return nil +} + +// Store represents an abstract datastore for use with the metadata middleware. +// +// Each function is also passed the registry context, which contains useful +// information such as the currently authed user. +type Store interface { + ManifestStore + TagStore + schema.EventManager +} + +type ManifestStore interface { + // Get returns a manifest given its digest as a raw byte slice. + // + // If the key is not found this must return ErrNotFound from this + // package. + GetManifest(ctx context.Context, key string) ([]byte, error) + + // Put stores a manifest in the datastore given the manifest hash. + PutManifest(ctx context.Context, repo, digest string, val distribution.Manifest) error + + // Delete removes a manifest by the hash. + // + // If the key is not found this must return ErrNotFound from this + // package. + DeleteManifest(ctx context.Context, key string) error +} + +type TagStore interface { + // Get returns a tag's Descriptor given its name. + // + // If the key is not found this must return ErrNotFound from this + // package. + GetTag(ctx context.Context, repo distribution.Repository, key string) (distribution.Descriptor, error) + + // Put stores a tag's Descriptor in the datastore given the tag name. + PutTag(ctx context.Context, repo distribution.Repository, key string, val distribution.Descriptor) error + + // Delete removes a tag by the name. + // + // If the key is not found this must return ErrNotFound from this + // package. + DeleteTag(ctx context.Context, repo distribution.Repository, key string) error + + // AllTags returns all tag names as a slice of strings for the repository + // in which a TagStore was created + AllTags(ctx context.Context, repo distribution.Repository) ([]string, error) + + // Lookup returns all tags which point to a given digest as a slice of + // tag names + LookupTags(ctx context.Context, repo distribution.Repository, digest distribution.Descriptor) ([]string, error) +} diff --git a/docs/middleware/tagstore.go b/docs/middleware/tagstore.go new file mode 100644 index 00000000..b166a16b --- /dev/null +++ b/docs/middleware/tagstore.go @@ -0,0 +1,72 @@ +package middleware + +import ( + "github.com/docker/dhe-deploy/events" + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/auth" + + log "github.com/Sirupsen/logrus" + "github.com/palantir/stacktrace" +) + +type tagStore struct { + ctx context.Context + repo distribution.Repository + store Store + + blobService distribution.TagService + // When deleting tags we need the ManifestService backed by the blobstore + blobMfstService distribution.ManifestService +} + +// Get returns a tag from the blobstore. +// Note that we don't use the metadata store for this - if we did pulls would +// fail as the the metadata exists only on the filesystem. +func (t *tagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) { + return t.blobService.Get(ctx, tag) +} + +// Tag associates the tag with the provided descriptor, updating the +// current association, if needed. +func (t *tagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error { + if err := t.blobService.Tag(ctx, tag, desc); err != nil { + return err + } + err := t.store.PutTag(ctx, t.repo, tag, desc) + if err != nil { + return err + } + author, _ := ctx.Value(auth.UserNameKey).(string) + // need to create event manager where the middleware gets initted + err = events.TagImageEvent(t.store, author, t.repo.Named().Name(), tag) + if err != nil { + log.Errorf("TagImageEvent creation failed: %+v", err) + } + return nil +} + +// Untag removes the given tag association from both the blobstore and our +// metadata store directly. +func (t *tagStore) Untag(ctx context.Context, tag string) error { + // If the metadata store deletes a manifest we should also remove the + // manifest from the filesystem + if err := t.store.DeleteTag(ctx, t.repo, tag); err != nil { + return stacktrace.Propagate(err, "error deleting tag from metadata store") + } + if err := t.blobService.Untag(ctx, tag); err != nil { + return stacktrace.Propagate(err, "error untagging from blobstore") + } + return nil +} + +// All returns the set of tags for the parent repository, as +// defined in tagStore.repo +func (t *tagStore) All(ctx context.Context) ([]string, error) { + return t.blobService.All(ctx) +} + +// Lookup returns the set of tags referencing the given digest. +func (t *tagStore) Lookup(ctx context.Context, digest distribution.Descriptor) ([]string, error) { + return t.blobService.Lookup(ctx, digest) +} diff --git a/docs/registry/registry.go b/docs/registry/registry.go new file mode 100644 index 00000000..cafc4083 --- /dev/null +++ b/docs/registry/registry.go @@ -0,0 +1,186 @@ +package main + +import ( + "io/ioutil" + "os" + "os/signal" + "path" + "syscall" + "time" + + "gopkg.in/yaml.v2" + + log "github.com/Sirupsen/logrus" + + // Register the DTR authorizer. + "github.com/docker/dhe-deploy" + _ "github.com/docker/dhe-deploy/garant/authz" + "github.com/docker/dhe-deploy/hubconfig" + "github.com/docker/dhe-deploy/hubconfig/etcd" + "github.com/docker/dhe-deploy/hubconfig/util" + "github.com/docker/dhe-deploy/manager/schema" + "github.com/docker/dhe-deploy/registry/middleware" + "github.com/docker/dhe-deploy/shared/containers" + "github.com/docker/dhe-deploy/shared/dtrutil" + + // register all storage and auth drivers + _ "github.com/docker/distribution/registry/auth/htpasswd" + _ "github.com/docker/distribution/registry/auth/silly" + _ "github.com/docker/distribution/registry/auth/token" + _ "github.com/docker/distribution/registry/proxy" + _ "github.com/docker/distribution/registry/storage/driver/azure" + _ "github.com/docker/distribution/registry/storage/driver/filesystem" + _ "github.com/docker/distribution/registry/storage/driver/gcs" + _ "github.com/docker/distribution/registry/storage/driver/inmemory" + _ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" + _ "github.com/docker/distribution/registry/storage/driver/oss" + _ "github.com/docker/distribution/registry/storage/driver/s3-aws" + _ "github.com/docker/distribution/registry/storage/driver/swift" + + "github.com/docker/distribution/configuration" + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry" + "github.com/docker/distribution/version" + "github.com/docker/garant" + + // Metadata store + repomiddleware "github.com/docker/distribution/registry/middleware/repository" +) + +const configFilePath = "/config/storage.yml" + +func main() { + log.SetFormatter(new(log.JSONFormatter)) + releaseRestartLock() + notifyReadOnly() + setupMiddleware() + go waitForReload() + go runGarant() + runRegistry() +} + +func runGarant() { + log.Info("garant starting") + + app, err := garant.NewApp("/config/garant.yml") + if err != nil { + log.Fatalf("unable to initialize token server app: %s", err) + } + + log.Fatal(app.ListenAndServe()) +} + +func waitForReload() { + log.Info("listening for sigusr2") + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGUSR2) + _ = <-c + log.Info("got sigusr2! Attempting to shut down safely") + + dtrKVStore := makeKVStore() + + log.Info("getting restart lock") + // This will block until no other registry is restarting + err := dtrKVStore.Lock(deploy.RegistryRestartLockPath, []byte(os.Getenv(deploy.ReplicaIDEnvVar)), time.Minute) + if err != nil { + log.Fatalf("Failed to get restart lock: %s", err) + } + + log.Fatal("restarting now") +} + +func releaseRestartLock() { + kvStore := makeKVStore() + + value, err := kvStore.Get(deploy.RegistryRestartLockPath) + if err != nil { + log.Infof("No lock found to release: %s", err) + return + } + if string(value) == os.Getenv(deploy.ReplicaIDEnvVar) { + // Unlock the key so others can restart too + // TODO: check for intermittent failures and do some retries + err := kvStore.Delete(deploy.RegistryRestartLockPath) + log.Infof("removing restart lock: %s", err) + } else { + log.Info("someone else is holding the lock, not releasing") + } +} + +func notifyReadOnly() { + storageFile, err := ioutil.ReadFile(configFilePath) + if err != nil { + log.Fatalf("error reading storage.yml: %s", err) + } + var storageYML configuration.Configuration + err = yaml.Unmarshal(storageFile, &storageYML) + if err != nil { + log.Fatalf("error unmarshaling storage.yml: %s", err) + } + roMode := util.GetReadonlyMode(&storageYML.Storage) + kvStore := makeKVStore() + roModePath := path.Join(deploy.RegistryROStatePath, os.Getenv(deploy.ReplicaIDEnvVar)) + if roMode { + log.Infof("registering self as being in read-only mode at key: %s", roModePath) + err := kvStore.Put(roModePath, []byte{}) + if err != nil { + log.Errorf("Failed to register self as read-only: %s", err) + time.Sleep(1) + log.Fatalf("Failed to register self as read-only: %s", err) + } + } else { + // TODO: check the type of error and retry if it's an intermittent failure instead of a double delete + err = kvStore.Delete(roModePath) + log.Infof("no longer in read-only mode: %s", err) + } +} + +func runRegistry() { + log.Info("registry starting") + + fp, err := os.Open(configFilePath) + if err != nil { + log.Fatalf("unable to open registry config: %s", err) + } + + defer fp.Close() + + config, err := configuration.Parse(fp) + if err != nil { + log.Fatalf("error parsing registry config: %s", err) + } + if config.Storage.Type() == "filesystem" { + params := config.Storage["filesystem"] + params["rootdirectory"] = "/storage" + config.Storage["filesystem"] = params + } + + registry, err := registry.NewRegistry(context.WithVersion(context.Background(), version.Version), config) + if err != nil { + log.Fatalf("unable to initialize registry: %s", err) + } + log.Fatal(registry.ListenAndServe()) +} + +// TODO: make don't call this function so many times +func makeKVStore() hubconfig.KeyValueStore { + dtrKVStore, err := etcd.NewKeyValueStore(containers.EtcdUrls(), deploy.EtcdPath) + if err != nil { + log.Fatalf("something went wrong when trying to initialize the Lock: %s", err) + } + return dtrKVStore +} + +func setupMiddleware() { + replicaID := os.Getenv(deploy.ReplicaIDEnvVar) + db, err := dtrutil.GetRethinkSession(replicaID) + if err != nil { + log.WithField("error", err).Fatal("failed to connect to rethink") + } + store := schema.NewMetadataManager(db) + middleware.RegisterStore(store) + if err := repomiddleware.Register("metadata", middleware.InitMiddleware); err != nil { + log.WithField("err", err).Fatal("unable to register metadata middleware") + } + log.Info("connected to middleware") +}