Skip to content

Commit

Permalink
QUOTA: refactor our use of quota
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Dec 5, 2017
1 parent 722867b commit 1557bb3
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 429 deletions.
13 changes: 9 additions & 4 deletions pkg/cmd/server/origin/admission/plugin_initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
imageinformer "github.com/openshift/origin/pkg/image/generated/informers/internalversion"
imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset"
projectcache "github.com/openshift/origin/pkg/project/cache"
oquota "github.com/openshift/origin/pkg/quota"
"github.com/openshift/origin/pkg/quota/controller/clusterquotamapping"
quotainformer "github.com/openshift/origin/pkg/quota/generated/informers/internalversion"
quotaclient "github.com/openshift/origin/pkg/quota/generated/internalclientset"
"github.com/openshift/origin/pkg/quota/image"
securityinformer "github.com/openshift/origin/pkg/security/generated/informers/internalversion"
"github.com/openshift/origin/pkg/service"
templateclient "github.com/openshift/origin/pkg/template/generated/internalclientset"
Expand All @@ -42,6 +42,8 @@ import (
kclientsetinternal "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/quota/generic"
"k8s.io/kubernetes/pkg/quota/install"
)

type InformerAccess interface {
Expand Down Expand Up @@ -99,12 +101,15 @@ func NewPluginInitializer(
return nil, nil, err
}

quotaRegistry := oquota.NewAllResourceQuotaRegistryForAdmission(
informers.GetExternalKubeInformers(),
// TODO make a union registry
quotaRegistry := generic.NewRegistry(install.NewQuotaConfigurationForAdmission().Evaluators())
imageEvaluators := image.NewReplenishmentEvaluatorsForAdmission(
informers.GetImageInformers().Image().InternalVersion().ImageStreams(),
imageClient.Image(),
kubeExternalClient,
)
for i := range imageEvaluators {
quotaRegistry.Add(imageEvaluators[i])
}

defaultRegistry := env("OPENSHIFT_DEFAULT_REGISTRY", "${DOCKER_REGISTRY_SERVICE_HOST}:${DOCKER_REGISTRY_SERVICE_PORT}")
svcCache := service.NewServiceResolverCache(kubeInternalClient.Core().Services(metav1.NamespaceDefault).Get)
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/server/origin/controller/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/golang/glog"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
kexternalinformers "k8s.io/client-go/informers"
kclientsetinternal "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
Expand Down Expand Up @@ -40,9 +41,13 @@ type ControllerContext struct {
QuotaInformers quotainformer.SharedInformerFactory
AuthorizationInformers authorizationinformer.SharedInformerFactory
SecurityInformers securityinformer.SharedInformerFactory
GenericInformerFunc func(schema.GroupVersionResource) (kexternalinformers.GenericInformer, error)

// Stop is the stop channel
Stop <-chan struct{}
// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe,
// for an individual controller to start the shared informers. Before it is closed, they should not.
InformersStarted chan struct{}
}

// OpenshiftControllerOptions contain the options used to run the controllers. Eventually we need to construct a way to properly
Expand Down
77 changes: 46 additions & 31 deletions pkg/cmd/server/origin/controller/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,44 @@ import (
"time"

"github.com/openshift/origin/pkg/cmd/server/bootstrappolicy"
quotacontroller "github.com/openshift/origin/pkg/quota/controller"
"github.com/openshift/origin/pkg/quota/controller/clusterquotamapping"
"github.com/openshift/origin/pkg/quota/controller/clusterquotareconciliation"
"github.com/openshift/origin/pkg/quota/image"
"k8s.io/kubernetes/pkg/controller"
kresourcequota "k8s.io/kubernetes/pkg/controller/resourcequota"

"github.com/openshift/origin/pkg/quota"
"k8s.io/kubernetes/pkg/quota/generic"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
)

func RunResourceQuotaManager(ctx ControllerContext) (bool, error) {
concurrentResourceQuotaSyncs := int(ctx.OpenshiftControllerOptions.ResourceQuotaOptions.ConcurrentSyncs)
resourceQuotaSyncPeriod := ctx.OpenshiftControllerOptions.ResourceQuotaOptions.SyncPeriod.Duration
replenishmentSyncPeriodFunc := calculateResyncPeriod(ctx.OpenshiftControllerOptions.ResourceQuotaOptions.MinResyncPeriod.Duration)
saName := "resourcequota-controller"
listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.GenericInformerFunc)
quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)

resourceQuotaRegistry := quota.NewOriginQuotaRegistry(
imageEvaluators := image.NewReplenishmentEvaluators(
listerFuncForResource,
ctx.ImageInformers.Image().InternalVersion().ImageStreams(),
ctx.ClientBuilder.OpenshiftInternalImageClientOrDie(saName).Image(),
)
ctx.ClientBuilder.OpenshiftInternalImageClientOrDie(saName).Image())
resourceQuotaRegistry := generic.NewRegistry(imageEvaluators)

resourceQuotaControllerOptions := &kresourcequota.ResourceQuotaControllerOptions{
QuotaClient: ctx.ClientBuilder.ClientOrDie(saName).Core(),
ResourceQuotaInformer: ctx.ExternalKubeInformers.Core().V1().ResourceQuotas(),
ResyncPeriod: controller.StaticResyncPeriodFunc(resourceQuotaSyncPeriod),
Registry: resourceQuotaRegistry,
GroupKindsToReplenish: quota.AllEvaluatedGroupKinds,
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(
ctx.ExternalKubeInformers,
ctx.ImageInformers.Image().InternalVersion().ImageStreams(),
),
QuotaClient: ctx.ClientBuilder.ClientOrDie(saName).Core(),
ResourceQuotaInformer: ctx.ExternalKubeInformers.Core().V1().ResourceQuotas(),
ResyncPeriod: controller.StaticResyncPeriodFunc(resourceQuotaSyncPeriod),
Registry: resourceQuotaRegistry,
ReplenishmentResyncPeriod: replenishmentSyncPeriodFunc,
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
InformersStarted: ctx.InformersStarted,
InformerFactory: ctx.ExternalKubeInformers,
}
controller, err := kresourcequota.NewResourceQuotaController(resourceQuotaControllerOptions)
if err != nil {
return true, err
}
go kresourcequota.NewResourceQuotaController(resourceQuotaControllerOptions).Run(concurrentResourceQuotaSyncs, ctx.Stop)
go controller.Run(concurrentResourceQuotaSyncs, ctx.Stop)

return true, nil
}
Expand All @@ -49,32 +54,42 @@ type ClusterQuotaReconciliationControllerConfig struct {

func (c *ClusterQuotaReconciliationControllerConfig) RunController(ctx ControllerContext) (bool, error) {
saName := bootstrappolicy.InfraClusterQuotaReconciliationControllerServiceAccountName
resourceQuotaRegistry := quota.NewAllResourceQuotaRegistry(
ctx.ExternalKubeInformers,
ctx.ImageInformers.Image().InternalVersion().ImageStreams(),
ctx.ClientBuilder.OpenshiftInternalImageClientOrDie(saName).Image(),
ctx.ClientBuilder.ClientOrDie(saName),
)
groupKindsToReplenish := quota.AllEvaluatedGroupKinds

clusterQuotaMappingController := clusterquotamapping.NewClusterQuotaMappingController(
ctx.ExternalKubeInformers.Core().V1().Namespaces(),
ctx.QuotaInformers.Quota().InternalVersion().ClusterResourceQuotas())
resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")
discoveryFunc := resourceQuotaControllerClient.Discovery().ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.GenericInformerFunc)
quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)

// TODO make a union registry
resourceQuotaRegistry := generic.NewRegistry(quotaConfiguration.Evaluators())
imageEvaluators := image.NewReplenishmentEvaluators(
listerFuncForResource,
ctx.ImageInformers.Image().InternalVersion().ImageStreams(),
ctx.ClientBuilder.OpenshiftInternalImageClientOrDie(saName).Image())
for i := range imageEvaluators {
resourceQuotaRegistry.Add(imageEvaluators[i])
}

options := clusterquotareconciliation.ClusterQuotaReconcilationControllerOptions{
ClusterQuotaInformer: ctx.QuotaInformers.Quota().InternalVersion().ClusterResourceQuotas(),
ClusterQuotaMapper: clusterQuotaMappingController.GetClusterQuotaMapper(),
ClusterQuotaClient: ctx.ClientBuilder.OpenshiftInternalQuotaClientOrDie(saName).Quota().ClusterResourceQuotas(),

Registry: resourceQuotaRegistry,
ResyncPeriod: c.DefaultResyncPeriod,
ControllerFactory: quotacontroller.NewAllResourceReplenishmentControllerFactory(
ctx.ExternalKubeInformers,
ctx.ImageInformers.Image().InternalVersion().ImageStreams(),
),
Registry: resourceQuotaRegistry,
ResyncPeriod: c.DefaultResyncPeriod,
ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(c.DefaultReplenishmentSyncPeriod),
GroupKindsToReplenish: groupKindsToReplenish,
DiscoveryFunc: discoveryFunc,
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
InformersStarted: ctx.InformersStarted,
InformerFactory: ctx.ExternalKubeInformers,
}
clusterQuotaReconciliationController, err := clusterquotareconciliation.NewClusterQuotaReconcilationController(options)
if err != nil {
return true, err
}
clusterQuotaReconciliationController := clusterquotareconciliation.NewClusterQuotaReconcilationController(options)
clusterQuotaMappingController.GetClusterQuotaMapper().AddListener(clusterQuotaReconciliationController)

go clusterQuotaMappingController.Run(5, ctx.Stop)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cmd/server/start/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func newControllerContext(
kubeExternal kclientsetexternal.Interface,
informers *informers,
stopCh <-chan struct{},
informersStarted chan struct{},
) origincontrollers.ControllerContext {

// divide up the QPS since it re-used separately for every client
Expand All @@ -51,15 +52,17 @@ func newControllerContext(
},
},
InternalKubeInformers: informers.internalKubeInformers,
ExternalKubeInformers: informers.externalKubeInformers,
ExternalKubeInformers: newGenericInformers(informers),
AppInformers: informers.appInformers,
AuthorizationInformers: informers.authorizationInformers,
BuildInformers: informers.buildInformers,
ImageInformers: informers.imageInformers,
QuotaInformers: informers.quotaInformers,
SecurityInformers: informers.securityInformers,
TemplateInformers: informers.templateInformers,
GenericInformerFunc: newGenericInformers(informers).ForResource,
Stop: stopCh,
InformersStarted: informersStarted,
}

return openshiftControllerContext
Expand Down
6 changes: 5 additions & 1 deletion pkg/cmd/server/start/start_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func (m *Master) Start() error {
if err != nil {
return err
}

_, config, err := configapi.GetExternalKubeClient(m.config.MasterClients.OpenShiftLoopbackKubeConfig, m.config.MasterClients.OpenShiftLoopbackClientConnectionOverrides)
if err != nil {
return err
Expand Down Expand Up @@ -489,12 +490,15 @@ func (m *Master) Start() error {
if err != nil {
glog.Fatal(err)
}
controllerContext := newControllerContext(openshiftControllerOptions, privilegedLoopbackConfig, kubeExternal, informers, utilwait.NeverStop)

controllerContext := newControllerContext(openshiftControllerOptions, privilegedLoopbackConfig, kubeExternal, informers, utilwait.NeverStop, make(chan struct{}))
if err := startControllers(*m.config, allocationController, controllerContext); err != nil {
glog.Fatal(err)
}

informers.Start(utilwait.NeverStop)
close(controllerContext.InformersStarted)

}()
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/quota/admission/clusterresourcequota/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota"
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"

Expand Down Expand Up @@ -88,7 +89,7 @@ func (q *clusterQuotaAdmission) Admit(a admission.Attributes) (err error) {

q.init.Do(func() {
clusterQuotaAccessor := newQuotaAccessor(q.clusterQuotaLister, q.namespaceLister, q.clusterQuotaClient, q.clusterQuotaMapper)
q.evaluator = resourcequota.NewQuotaEvaluator(clusterQuotaAccessor, q.registry, q.lockAquisition, &resourcequotaapi.Configuration{}, numEvaluatorThreads, utilwait.NeverStop)
q.evaluator = resourcequota.NewQuotaEvaluator(clusterQuotaAccessor, install.DefaultIgnoredResources(), q.registry, q.lockAquisition, &resourcequotaapi.Configuration{}, numEvaluatorThreads, utilwait.NeverStop)
})

return q.evaluator.Evaluate(a)
Expand Down
Loading

0 comments on commit 1557bb3

Please sign in to comment.