From f98eaa938de857a06f00f34f5f8bfa5ddff6ce01 Mon Sep 17 00:00:00 2001 From: David Eads Date: Mon, 28 Aug 2017 15:32:59 -0400 Subject: [PATCH] run controller by wiring up to a command --- .../kubernetes/master/controller/config.go | 47 ------------------- .../kubernetes/master/controller/core.go | 47 ------------------- pkg/cmd/server/start/start_master.go | 12 +++++ pkg/cmd/server/start/start_scheduler.go | 26 ++++++++++ 4 files changed, 38 insertions(+), 94 deletions(-) delete mode 100644 pkg/cmd/server/kubernetes/master/controller/core.go create mode 100644 pkg/cmd/server/start/start_scheduler.go diff --git a/pkg/cmd/server/kubernetes/master/controller/config.go b/pkg/cmd/server/kubernetes/master/controller/config.go index 0e330bad483b..a2096ad10d2a 100644 --- a/pkg/cmd/server/kubernetes/master/controller/config.go +++ b/pkg/cmd/server/kubernetes/master/controller/config.go @@ -1,19 +1,9 @@ package controller import ( - "fmt" - "io/ioutil" - "os" - - "k8s.io/apimachinery/pkg/runtime" - kerrors "k8s.io/apimachinery/pkg/util/errors" kubecontroller "k8s.io/kubernetes/cmd/kube-controller-manager/app" - scheduleroptions "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" - schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" - latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" configapi "github.com/openshift/origin/pkg/cmd/server/api" - cmdflags "github.com/openshift/origin/pkg/cmd/util/flags" "github.com/openshift/origin/pkg/cmd/util/variable" "k8s.io/kubernetes/pkg/volume" ) @@ -22,9 +12,6 @@ import ( // launch the set of kube (not openshift) controllers. type KubeControllerConfig struct { HorizontalPodAutoscalerControllerConfig HorizontalPodAutoscalerControllerConfig - - // TODO the scheduler should move out into its own logical component - SchedulerControllerConfig SchedulerControllerConfig } // GetControllerInitializers return the controller initializer functions for kube controllers @@ -36,48 +23,14 @@ func (c KubeControllerConfig) GetControllerInitializers() (map[string]kubecontro // in openshift-infra, and pass it a scale client that knows how to scale DCs ret["horizontalpodautoscaling"] = c.HorizontalPodAutoscalerControllerConfig.RunController - // FIXME: Move this under openshift controller intialization once we figure out - // deployment (options). - ret["openshift.io/scheduler"] = c.SchedulerControllerConfig.RunController - return ret, nil } // BuildKubeControllerConfig builds the struct to create the controller initializers. Eventually we want this to be fully // stock kube with no modification. func BuildKubeControllerConfig(options configapi.MasterConfig) (*KubeControllerConfig, error) { - var err error ret := &KubeControllerConfig{} - kubeExternal, _, err := configapi.GetExternalKubeClient(options.MasterClients.OpenShiftLoopbackKubeConfig, options.MasterClients.OpenShiftLoopbackClientConnectionOverrides) - if err != nil { - return nil, err - } - - var schedulerPolicy *schedulerapi.Policy - if _, err := os.Stat(options.KubernetesMasterConfig.SchedulerConfigFile); err == nil { - schedulerPolicy = &schedulerapi.Policy{} - configData, err := ioutil.ReadFile(options.KubernetesMasterConfig.SchedulerConfigFile) - if err != nil { - return nil, fmt.Errorf("unable to read scheduler config: %v", err) - } - if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, schedulerPolicy); err != nil { - return nil, fmt.Errorf("invalid scheduler configuration: %v", err) - } - } - // resolve extended arguments - // TODO: this should be done in config validation (along with the above) so we can provide - // proper errors - schedulerserver := scheduleroptions.NewSchedulerServer() - schedulerserver.PolicyConfigFile = options.KubernetesMasterConfig.SchedulerConfigFile - if err := cmdflags.Resolve(options.KubernetesMasterConfig.SchedulerArguments, schedulerserver.AddFlags); len(err) > 0 { - return nil, kerrors.NewAggregate(err) - } - ret.SchedulerControllerConfig = SchedulerControllerConfig{ - PrivilegedClient: kubeExternal, - SchedulerServer: schedulerserver, - } - imageTemplate := variable.NewDefaultImageTemplate() imageTemplate.Format = options.ImageConfig.Format imageTemplate.Latest = options.ImageConfig.Latest diff --git a/pkg/cmd/server/kubernetes/master/controller/core.go b/pkg/cmd/server/kubernetes/master/controller/core.go deleted file mode 100644 index 975daa1b3a8d..000000000000 --- a/pkg/cmd/server/kubernetes/master/controller/core.go +++ /dev/null @@ -1,47 +0,0 @@ -package controller - -import ( - "fmt" - - kv1core "k8s.io/client-go/kubernetes/typed/core/v1" - kclientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" - kubecontroller "k8s.io/kubernetes/cmd/kube-controller-manager/app" - kapi "k8s.io/kubernetes/pkg/api" - kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - schedulerapp "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" - scheduleroptions "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" - _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" -) - -type SchedulerControllerConfig struct { - // TODO: Move this closer to upstream, we want unprivileged client here. - PrivilegedClient kclientset.Interface - SchedulerServer *scheduleroptions.SchedulerServer -} - -func (c *SchedulerControllerConfig) RunController(ctx kubecontroller.ControllerContext) (bool, error) { - eventcast := record.NewBroadcaster() - recorder := eventcast.NewRecorder(kapi.Scheme, kclientv1.EventSource{Component: kapi.DefaultSchedulerName}) - eventcast.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: kv1core.New(c.PrivilegedClient.CoreV1().RESTClient()).Events("")}) - - s, err := schedulerapp.CreateScheduler(c.SchedulerServer, - c.PrivilegedClient, - ctx.InformerFactory.Core().V1().Nodes(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().PersistentVolumes(), - ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ctx.InformerFactory.Core().V1().ReplicationControllers(), - ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(), - ctx.InformerFactory.Apps().V1beta1().StatefulSets(), - ctx.InformerFactory.Core().V1().Services(), - recorder, - ) - if err != nil { - return false, fmt.Errorf("error creating scheduler: %v", err) - } - - go s.Run() - - return true, nil -} diff --git a/pkg/cmd/server/start/start_master.go b/pkg/cmd/server/start/start_master.go index 6674a21ae8e0..d053a2e4ff83 100644 --- a/pkg/cmd/server/start/start_master.go +++ b/pkg/cmd/server/start/start_master.go @@ -34,6 +34,7 @@ import ( kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/master" + schedulerapp "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" kutilerrors "k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/errors" assetapiserver "github.com/openshift/origin/pkg/assets/apiserver" @@ -392,6 +393,13 @@ func (m *Master) Start() error { controllersEnabled := m.controllers && m.config.Controllers != configapi.ControllersDisabled if controllersEnabled { + // start the scheduler + // TODO we need a real identity for this. Right now it's just using the loopback connection like it used to. + scheduler, err := newScheduler(m.config.MasterClients.OpenShiftLoopbackKubeConfig, m.config.KubernetesMasterConfig.SchedulerConfigFile, m.config.KubernetesMasterConfig.SchedulerArguments) + if err != nil { + return err + } + // informers are shared amongst all the various controllers we build informers, err := NewInformers(*m.config) if err != nil { @@ -472,6 +480,10 @@ func (m *Master) Start() error { go func() { controllerPlug.WaitForStart() + // this does a second leader election, but doing the second leader election will allow us to move out process in + // 3.8 if we so choose. + go schedulerapp.Run(scheduler) + controllerContext, err := getControllerContext(*m.config, kubeControllerManagerConfig, cloudProvider, informers, utilwait.NeverStop) if err != nil { glog.Fatal(err) diff --git a/pkg/cmd/server/start/start_scheduler.go b/pkg/cmd/server/start/start_scheduler.go new file mode 100644 index 000000000000..526f971ea508 --- /dev/null +++ b/pkg/cmd/server/start/start_scheduler.go @@ -0,0 +1,26 @@ +package start + +import ( + kerrors "k8s.io/apimachinery/pkg/util/errors" + scheduleroptions "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" + _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" + + cmdflags "github.com/openshift/origin/pkg/cmd/util/flags" +) + +func newScheduler(kubeconfigFile, schedulerConfigFile string, cmdLineArgs map[string][]string) (*scheduleroptions.SchedulerServer, error) { + if len(cmdLineArgs["kubeconfig"]) == 0 { + cmdLineArgs["kubeconfig"] = []string{kubeconfigFile} + } + if len(cmdLineArgs["policy-config-file"]) == 0 { + cmdLineArgs["policy-config-file"] = []string{schedulerConfigFile} + } + + // resolve arguments + schedulerserver := scheduleroptions.NewSchedulerServer() + if err := cmdflags.Resolve(cmdLineArgs, schedulerserver.AddFlags); len(err) > 0 { + return nil, kerrors.NewAggregate(err) + } + + return schedulerserver, nil +}