Skip to content

Commit

Permalink
Merge pull request #15796 from dmage/app
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 15725, 16244, 15796, 16328, 16334)

Add dockerregistry/server.App struct for application-level data
  • Loading branch information
openshift-merge-robot authored Sep 15, 2017
2 parents f35ae09 + dfbbed9 commit 6af6cb2
Show file tree
Hide file tree
Showing 24 changed files with 599 additions and 570 deletions.
101 changes: 13 additions & 88 deletions pkg/cmd/dockerregistry/dockerregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/docker/distribution/configuration"
"github.com/docker/distribution/context"
"github.com/docker/distribution/health"
"github.com/docker/distribution/registry/auth"
"github.com/docker/distribution/registry/handlers"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/docker/distribution/uuid"
Expand All @@ -45,7 +43,6 @@ import (
"github.com/openshift/origin/pkg/cmd/server/crypto"
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
"github.com/openshift/origin/pkg/dockerregistry/server"
"github.com/openshift/origin/pkg/dockerregistry/server/api"
"github.com/openshift/origin/pkg/dockerregistry/server/audit"
"github.com/openshift/origin/pkg/dockerregistry/server/client"
registryconfig "github.com/openshift/origin/pkg/dockerregistry/server/configuration"
Expand Down Expand Up @@ -148,94 +145,22 @@ func Execute(configFile io.Reader) {
setDefaultLogParameters(dockerConfig)

ctx := context.Background()
ctx = server.WithConfiguration(ctx, extraConfig)
ctx, err = configureLogging(ctx, dockerConfig)
if err != nil {
log.Fatalf("error configuring logger: %v", err)
}

registryClient := client.NewRegistryClient(clientcmd.NewConfig().BindToFile())
ctx = server.WithRegistryClient(ctx, registryClient)

readLimiter := newLimiter(extraConfig.Requests.Read)
writeLimiter := newLimiter(extraConfig.Requests.Write)
ctx = server.WithWriteLimiter(ctx, writeLimiter)

log.WithFields(versionFields()).Info("start registry")
// inject a logger into the uuid library. warns us if there is a problem
// with uuid generation under low entropy.
uuid.Loggerf = context.GetLogger(ctx).Warnf

// add parameters for the auth middleware
if dockerConfig.Auth.Type() == server.OpenShiftAuth {
if dockerConfig.Auth[server.OpenShiftAuth] == nil {
dockerConfig.Auth[server.OpenShiftAuth] = make(configuration.Parameters)
}
dockerConfig.Auth[server.OpenShiftAuth][server.AccessControllerOptionParams] = server.AccessControllerParams{
Logger: context.GetLogger(ctx),
RegistryClient: registryClient,
}
}

app := handlers.NewApp(ctx, dockerConfig)

// Add a token handling endpoint
if options, usingOpenShiftAuth := dockerConfig.Auth[server.OpenShiftAuth]; usingOpenShiftAuth {
tokenRealm, err := server.TokenRealm(options)
if err != nil {
context.GetLogger(app).Fatalf("error setting up token auth: %s", err)
}
err = app.NewRoute().Methods("GET").PathPrefix(tokenRealm.Path).Handler(server.NewTokenHandler(ctx, registryClient)).GetError()
if err != nil {
context.GetLogger(app).Fatalf("error setting up token endpoint at %q: %v", tokenRealm.Path, err)
}
context.GetLogger(app).Debugf("configured token endpoint at %q", tokenRealm.String())
}

// TODO add https scheme
adminRouter := app.NewRoute().PathPrefix(api.AdminPrefix).Subrouter()
pruneAccessRecords := func(*http.Request) []auth.Access {
return []auth.Access{
{
Resource: auth.Resource{
Type: "admin",
},
Action: "prune",
},
}
}

app.RegisterRoute(
// DELETE /admin/blobs/<digest>
adminRouter.Path(api.AdminPath).Methods("DELETE"),
// handler
server.BlobDispatcher,
// repo name not required in url
handlers.NameNotRequired,
// custom access records
pruneAccessRecords,
)

// Registry extensions endpoint provides extra functionality to handle the image
// signatures.
server.RegisterSignatureHandler(app)

// Registry extensions endpoint provides prometheus metrics.
if extraConfig.Metrics.Enabled {
if len(extraConfig.Metrics.Secret) == 0 {
context.GetLogger(app).Fatalf("openshift.metrics.secret field cannot be empty when metrics are enabled")
}
server.RegisterMetricHandler(app)
}
registryClient := client.NewRegistryClient(clientcmd.NewConfig().BindToFile())

// Advertise features supported by OpenShift
if app.Config.HTTP.Headers == nil {
app.Config.HTTP.Headers = http.Header{}
}
app.Config.HTTP.Headers.Set("X-Registry-Supports-Signatures", "1")
readLimiter := newLimiter(extraConfig.Requests.Read)
writeLimiter := newLimiter(extraConfig.Requests.Write)

app.RegisterHealthChecks()
handler := http.Handler(app)
handler := server.NewApp(ctx, registryClient, dockerConfig, extraConfig, writeLimiter)
handler = limit(readLimiter, writeLimiter, handler)
handler = alive("/", handler)
// TODO: temporarily keep for backwards compatibility; remove in the future
Expand All @@ -245,9 +170,9 @@ func Execute(configFile io.Reader) {
handler = gorillahandlers.CombinedLoggingHandler(os.Stdout, handler)

if dockerConfig.HTTP.TLS.Certificate == "" {
context.GetLogger(app).Infof("listening on %v", dockerConfig.HTTP.Addr)
context.GetLogger(ctx).Infof("listening on %v", dockerConfig.HTTP.Addr)
if err := http.ListenAndServe(dockerConfig.HTTP.Addr, handler); err != nil {
context.GetLogger(app).Fatalln(err)
context.GetLogger(ctx).Fatalln(err)
}
} else {
var (
Expand All @@ -257,14 +182,14 @@ func Execute(configFile io.Reader) {
if s := os.Getenv("REGISTRY_HTTP_TLS_MINVERSION"); len(s) > 0 {
minVersion, err = crypto.TLSVersion(s)
if err != nil {
context.GetLogger(app).Fatalln(fmt.Errorf("invalid TLS version %q specified in REGISTRY_HTTP_TLS_MINVERSION: %v (valid values are %q)", s, err, crypto.ValidTLSVersions()))
context.GetLogger(ctx).Fatalln(fmt.Errorf("invalid TLS version %q specified in REGISTRY_HTTP_TLS_MINVERSION: %v (valid values are %q)", s, err, crypto.ValidTLSVersions()))
}
}
if s := os.Getenv("REGISTRY_HTTP_TLS_CIPHERSUITES"); len(s) > 0 {
for _, cipher := range strings.Split(s, ",") {
cipherSuite, err := crypto.CipherSuite(cipher)
if err != nil {
context.GetLogger(app).Fatalln(fmt.Errorf("invalid cipher suite %q specified in REGISTRY_HTTP_TLS_CIPHERSUITES: %v (valid suites are %q)", s, err, crypto.ValidCipherSuites()))
context.GetLogger(ctx).Fatalln(fmt.Errorf("invalid cipher suite %q specified in REGISTRY_HTTP_TLS_CIPHERSUITES: %v (valid suites are %q)", s, err, crypto.ValidCipherSuites()))
}
cipherSuites = append(cipherSuites, cipherSuite)
}
Expand All @@ -282,31 +207,31 @@ func Execute(configFile io.Reader) {
for _, ca := range dockerConfig.HTTP.TLS.ClientCAs {
caPem, err := ioutil.ReadFile(ca)
if err != nil {
context.GetLogger(app).Fatalln(err)
context.GetLogger(ctx).Fatalln(err)
}

if ok := pool.AppendCertsFromPEM(caPem); !ok {
context.GetLogger(app).Fatalln(fmt.Errorf("Could not add CA to pool"))
context.GetLogger(ctx).Fatalln(fmt.Errorf("Could not add CA to pool"))
}
}

for _, subj := range pool.Subjects() {
context.GetLogger(app).Debugf("CA Subject: %s", string(subj))
context.GetLogger(ctx).Debugf("CA Subject: %s", string(subj))
}

tlsConf.ClientAuth = tls.RequireAndVerifyClientCert
tlsConf.ClientCAs = pool
}

context.GetLogger(app).Infof("listening on %v, tls", dockerConfig.HTTP.Addr)
context.GetLogger(ctx).Infof("listening on %v, tls", dockerConfig.HTTP.Addr)
server := &http.Server{
Addr: dockerConfig.HTTP.Addr,
Handler: handler,
TLSConfig: tlsConf,
}

if err := server.ListenAndServeTLS(dockerConfig.HTTP.TLS.Certificate, dockerConfig.HTTP.TLS.Key); err != nil {
context.GetLogger(app).Fatalln(err)
context.GetLogger(ctx).Fatalln(err)
}
}
}
Expand Down
36 changes: 33 additions & 3 deletions pkg/dockerregistry/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,49 @@ import (
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/auth"
"github.com/docker/distribution/registry/handlers"
"github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
gorillahandlers "github.com/gorilla/handlers"

"github.com/openshift/origin/pkg/dockerregistry/server/api"
)

// BlobDispatcher takes the request context and builds the appropriate handler
func (app *App) registerBlobHandler(dockerApp *handlers.App) {
adminRouter := dockerApp.NewRoute().PathPrefix(api.AdminPrefix).Subrouter()
pruneAccessRecords := func(*http.Request) []auth.Access {
return []auth.Access{
{
Resource: auth.Resource{
Type: "admin",
},
Action: "prune",
},
}
}

dockerApp.RegisterRoute(
// DELETE /admin/blobs/<digest>
adminRouter.Path(api.AdminPath).Methods("DELETE"),
// handler
app.blobDispatcher,
// repo name not required in url
handlers.NameNotRequired,
// custom access records
pruneAccessRecords,
)
}

// blobDispatcher takes the request context and builds the appropriate handler
// for handling blob requests.
func BlobDispatcher(ctx *handlers.Context, r *http.Request) http.Handler {
func (app *App) blobDispatcher(ctx *handlers.Context, r *http.Request) http.Handler {
reference := context.GetStringValue(ctx, "vars.digest")
dgst, _ := digest.ParseDigest(reference)

blobHandler := &blobHandler{
Context: ctx,
driver: app.driver,
Digest: dgst,
}

Expand All @@ -35,6 +64,7 @@ func BlobDispatcher(ctx *handlers.Context, r *http.Request) http.Handler {
type blobHandler struct {
*handlers.Context

driver storagedriver.StorageDriver
Digest digest.Digest
}

Expand All @@ -47,7 +77,7 @@ func (bh *blobHandler) Delete(w http.ResponseWriter, req *http.Request) {
return
}

vacuum := storage.NewVacuum(bh.Context, dockerStorageDriver)
vacuum := storage.NewVacuum(bh.Context, bh.driver)

err := vacuum.RemoveBlob(bh.Digest.String())
if err != nil {
Expand Down
137 changes: 137 additions & 0 deletions pkg/dockerregistry/server/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package server

import (
"net/http"
"os"

"github.com/docker/distribution"
"github.com/docker/distribution/configuration"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/handlers"
storagedriver "github.com/docker/distribution/registry/storage/driver"

"github.com/openshift/origin/pkg/dockerregistry/server/client"
registryconfig "github.com/openshift/origin/pkg/dockerregistry/server/configuration"
"github.com/openshift/origin/pkg/dockerregistry/server/maxconnections"
)

// App is a global registry application object. Shared resources can be placed
// on this object that will be accessible from all requests.
type App struct {
// ctx is the parent context.
ctx context.Context

registryClient client.RegistryClient
extraConfig *registryconfig.Configuration
repositoryConfig repositoryConfig
writeLimiter maxconnections.Limiter

// driver gives access to the blob store.
// This variable holds the object created by docker/distribution. We
// import it into our namespace because there are no other ways to access
// it. In other cases it is hidden from us.
driver storagedriver.StorageDriver

// registry represents a collection of repositories, addressable by name.
// This variable holds the object created by docker/distribution. We
// import it into our namespace because there are no other ways to access
// it. In other cases it is hidden from us.
registry distribution.Namespace

// cachedLayers is a shared cache of blob digests to repositories that have previously been identified as
// containing that blob. Thread safe and reused by all middleware layers. It contains two kinds of
// associations:
// 1. <blobdigest> <-> <registry>/<namespace>/<name>
// 2. <blobdigest> <-> <namespace>/<name>
// The first associates a blob with a remote repository. Such an entry is set and used by pullthrough
// middleware. The second associates a blob with a local repository. Such a blob is expected to reside on
// local storage. It's set and used by blobDescriptorService middleware.
cachedLayers digestToRepositoryCache

// quotaEnforcing contains shared caches of quota objects keyed by project
// name. Will be initialized only if the quota is enforced.
// See EnforceQuotaEnvVar.
quotaEnforcing *quotaEnforcingConfig
}

// NewApp configures the registry application and returns http.Handler for it.
// The program will be terminated if an error happens.
func NewApp(ctx context.Context, registryClient client.RegistryClient, dockerConfig *configuration.Configuration, extraConfig *registryconfig.Configuration, writeLimiter maxconnections.Limiter) http.Handler {
app := &App{
ctx: ctx,
registryClient: registryClient,
extraConfig: extraConfig,
writeLimiter: writeLimiter,
}

cache, err := newDigestToRepositoryCache(defaultDigestToRepositoryCacheSize)
if err != nil {
panic(err)
}
app.cachedLayers = cache

weaveAppIntoConfig(app, dockerConfig)

repositoryEnabled := false
for _, middleware := range dockerConfig.Middleware["repository"] {
if middleware.Name == middlewareOpenShift {
rc, err := newRepositoryConfig(ctx, middleware.Options)
if err != nil {
context.GetLogger(ctx).Fatalf("error configuring the repository middleware: %s", err)
}
app.repositoryConfig = rc
app.quotaEnforcing = newQuotaEnforcingConfig(ctx, os.Getenv(EnforceQuotaEnvVar), os.Getenv(ProjectCacheTTLEnvVar), middleware.Options)
repositoryEnabled = true
break
}
}

dockerApp := handlers.NewApp(ctx, dockerConfig)

if repositoryEnabled {
if app.driver == nil {
context.GetLogger(ctx).Fatalf("configuration error: the storage driver middleware %q is not activated", middlewareOpenShift)
}

if app.registry == nil {
context.GetLogger(ctx).Fatalf("configuration error: the registry middleware %q is not activated", middlewareOpenShift)
}
}

// Add a token handling endpoint
if dockerConfig.Auth.Type() == middlewareOpenShift {
tokenRealm, err := TokenRealm(dockerConfig.Auth[middlewareOpenShift])
if err != nil {
context.GetLogger(dockerApp).Fatalf("error setting up token auth: %s", err)
}
err = dockerApp.NewRoute().Methods("GET").PathPrefix(tokenRealm.Path).Handler(NewTokenHandler(ctx, registryClient)).GetError()
if err != nil {
context.GetLogger(dockerApp).Fatalf("error setting up token endpoint at %q: %v", tokenRealm.Path, err)
}
context.GetLogger(dockerApp).Debugf("configured token endpoint at %q", tokenRealm.String())
}

app.registerBlobHandler(dockerApp)

// Registry extensions endpoint provides extra functionality to handle the image
// signatures.
RegisterSignatureHandler(dockerApp)

// Registry extensions endpoint provides prometheus metrics.
if extraConfig.Metrics.Enabled {
if len(extraConfig.Metrics.Secret) == 0 {
context.GetLogger(dockerApp).Fatalf("openshift.metrics.secret field cannot be empty when metrics are enabled")
}
RegisterMetricHandler(dockerApp)
}

// Advertise features supported by OpenShift
if dockerApp.Config.HTTP.Headers == nil {
dockerApp.Config.HTTP.Headers = http.Header{}
}
dockerApp.Config.HTTP.Headers.Set("X-Registry-Supports-Signatures", "1")

dockerApp.RegisterHealthChecks()

return dockerApp
}
Loading

0 comments on commit 6af6cb2

Please sign in to comment.