From 4cb506346f10db92eaca2bd0c4988111bcac2712 Mon Sep 17 00:00:00 2001
From: Joel Smith
Date: Fri, 1 Sep 2017 12:05:20 -0600
Subject: [PATCH] UPSTREAM: 49142: Slow-start batch pod creation of rs, rc, ds, jobs

---
 .../pkg/controller/controller_utils.go        | 41 ++++++++++--
 .../kubernetes/pkg/controller/daemon/BUILD    |  1 +
 .../pkg/controller/daemon/daemoncontroller.go | 44 +++++++++---
 .../daemon/daemoncontroller_test.go           | 22 ++++++
 .../kubernetes/pkg/controller/job/BUILD       |  1 +
 .../pkg/controller/job/jobcontroller.go       | 55 ++++++++++-----
 .../pkg/controller/job/jobcontroller_test.go  | 54 +++++++++------
 .../pkg/controller/replicaset/BUILD           |  1 +
 .../pkg/controller/replicaset/replica_set.go  | 65 ++++++++++++------
 .../controller/replicaset/replica_set_test.go | 27 ++++++++
 .../pkg/controller/replication/BUILD          |  1 +
 .../replication/replication_controller.go     | 67 +++++++++++++------
 .../replication_controller_test.go            | 24 +++++++
 13 files changed, 312 insertions(+), 91 deletions(-)

diff --git a/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go b/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go
index 9f1a17767f42..5673632667af 100644
--- a/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go
+++ b/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go
@@ -65,6 +65,21 @@ const (
 	// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
 	// latency/pod at the scale of 3000 pods over 100 nodes.
 	ExpectationsTimeout = 5 * time.Minute
+	// When batching pod creates, SlowStartInitialBatchSize is the size of the
+	// initial batch. The size of each successive batch is twice the size of
+	// the previous batch. For example, for a value of 1, batch sizes would be
+	// 1, 2, 4, 8, ... and for a value of 10, batch sizes would be
+	// 10, 20, 40, 80, ... Setting the value higher means that quota denials
+	// will result in more doomed API calls and associated event spam. Setting
+	// the value lower will result in more API call round trip periods for
+	// large batches.
+	//
+	// Given a number of pods to start "N":
+	// The number of doomed calls per sync once quota is exceeded is given by:
+	//      min(N,SlowStartInitialBatchSize)
+	// The number of batches is given by:
+	//      1+floor(log_2(ceil(N/SlowStartInitialBatchSize)))
+	SlowStartInitialBatchSize = 1
 )

 var UpdateTaintBackoff = wait.Backoff{
@@ -610,11 +625,13 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime

 type FakePodControl struct {
 	sync.Mutex
-	Templates      []v1.PodTemplateSpec
-	ControllerRefs []metav1.OwnerReference
-	DeletePodName  []string
-	Patches        [][]byte
-	Err            error
+	Templates       []v1.PodTemplateSpec
+	ControllerRefs  []metav1.OwnerReference
+	DeletePodName   []string
+	Patches         [][]byte
+	Err             error
+	CreateLimit     int
+	CreateCallCount int
 }

 var _ PodControlInterface = &FakePodControl{}
@@ -632,6 +649,10 @@ func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
 func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
 	f.Lock()
 	defer f.Unlock()
+	f.CreateCallCount++
+	if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
+		return fmt.Errorf("Not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
+	}
 	f.Templates = append(f.Templates, *spec)
 	if f.Err != nil {
 		return f.Err
@@ -642,6 +663,10 @@
 func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
 	f.Lock()
 	defer f.Unlock()
+	f.CreateCallCount++
+	if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
+		return fmt.Errorf("Not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
+	}
 	f.Templates = append(f.Templates, *spec)
 	f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
 	if f.Err != nil {
@@ -653,6 +678,10 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.
 func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
 	f.Lock()
 	defer f.Unlock()
+	f.CreateCallCount++
+	if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
+		return fmt.Errorf("Not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
+	}
 	f.Templates = append(f.Templates, *template)
 	f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
 	if f.Err != nil {
@@ -678,6 +707,8 @@ func (f *FakePodControl) Clear() {
 	f.Templates = []v1.PodTemplateSpec{}
 	f.ControllerRefs = []metav1.OwnerReference{}
 	f.Patches = [][]byte{}
+	f.CreateLimit = 0
+	f.CreateCallCount = 0
 }

 // ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.
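For reference, and not as part of the patch itself, a minimal standalone Go sketch of the batch-size progression that the SlowStartInitialBatchSize comment above describes; the batchSizes helper is a hypothetical name, and the example assumes initialBatchSize >= 1 and that every create call succeeds:

package main

import "fmt"

// batchSizes lists the successive batch sizes used to attempt n pod creates
// when starting at initialBatchSize and doubling after each batch, mirroring
// the "1, 2, 4, 8, ..." progression described in the comment above.
func batchSizes(n, initialBatchSize int) []int {
	var sizes []int
	batch := initialBatchSize
	remaining := n
	for remaining > 0 {
		if batch > remaining {
			batch = remaining
		}
		sizes = append(sizes, batch)
		remaining -= batch
		batch *= 2
	}
	return sizes
}

func main() {
	// With N=100 and SlowStartInitialBatchSize=1 this prints [1 2 4 8 16 32 37]:
	// 1+floor(log_2(ceil(100/1))) = 7 batches, and at most min(100, 1) = 1
	// doomed create call per sync once quota is exceeded.
	fmt.Println(batchSizes(100, 1))
}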
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/daemon/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/daemon/BUILD index 3a185a253f33..c56d5365985a 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/daemon/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/daemon/BUILD @@ -55,6 +55,7 @@ go_library( "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/integer:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller.go b/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller.go index 5f0e9e601d71..e971eb57dcda 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller.go @@ -34,6 +34,7 @@ import ( clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/integer" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -812,20 +813,43 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff) createWait := sync.WaitGroup{} - createWait.Add(createDiff) template := util.CreatePodTemplate(ds.Spec.Template, ds.Spec.TemplateGeneration, hash) - for i := 0; i < createDiff; i++ { - go func(ix int) { - defer createWait.Done() - if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil { - glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize + // and double with each successful iteration in a kind of "slow start". + // This handles attempts to start large numbers of pods that would + // likely all fail with the same error. For example a project with a + // low quota that attempts to create a large number of pods will be + // prevented from spamming the API service with the pod create requests + // after one of its pods fails. Conveniently, this also prevents the + // event spam that those failures would generate. + batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize) + for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize { + errorCount := len(errCh) + createWait.Add(batchSize) + for i := pos; i < pos+batchSize; i++ { + go func(ix int) { + defer createWait.Done() + if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil { + glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + dsc.expectations.CreationObserved(dsKey) + errCh <- err + utilruntime.HandleError(err) + } + }(i) + } + createWait.Wait() + // any skipped pods that we never attempted to start shouldn't be expected. + skippedPods := createDiff - batchSize + if errorCount < len(errCh) && skippedPods > 0 { + glog.V(2).Infof("Slow-start failure. 
Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name) + for i := 0; i < skippedPods; i++ { dsc.expectations.CreationObserved(dsKey) - errCh <- err - utilruntime.HandleError(err) } - }(i) + // The skipped pods will be retried later. The next controller resync will + // retry the slow start process. + break + } } - createWait.Wait() glog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff) deleteWait := sync.WaitGroup{} diff --git a/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller_test.go index 486b2ebbe1c2..bb0069686141 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemoncontroller_test.go @@ -406,6 +406,28 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { } } +// Simulate a cluster with 100 nodes, but simulate a limit (like a quota limit) +// of 10 pods, and verify that the ds doesn't make 100 create calls per sync pass +func TestSimpleDaemonSetPodCreateErrors(t *testing.T) { + for _, strategy := range updateStrategies() { + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = *strategy + manager, podControl, _ := newTestController(ds) + podControl.FakePodControl.CreateLimit = 10 + addNodes(manager.nodeStore, 0, podControl.FakePodControl.CreateLimit*10, nil) + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0) + expectedLimit := 0 + for pass := uint8(0); expectedLimit <= podControl.FakePodControl.CreateLimit; pass++ { + expectedLimit += controller.SlowStartInitialBatchSize << pass + } + if podControl.FakePodControl.CreateCallCount > expectedLimit { + t.Errorf("Unexpected number of create calls. 
Expected <= %d, saw %d\n", podControl.FakePodControl.CreateLimit*2, podControl.FakePodControl.CreateCallCount) + } + + } +} + func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) { for _, strategy := range updateStrategies() { ds := newDaemonSet("foo") diff --git a/vendor/k8s.io/kubernetes/pkg/controller/job/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/job/BUILD index e37de5221bac..0e18312e223c 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/job/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/job/BUILD @@ -37,6 +37,7 @@ go_library( "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/integer:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller.go b/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller.go index 6c0bd30fcfaf..97678b82f054 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller.go @@ -32,6 +32,7 @@ import ( clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/integer" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -590,7 +591,6 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b }(i) } wait.Wait() - } else if active < parallelism { wantActive := int32(0) if job.Spec.Completions == nil { @@ -621,23 +621,48 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b active += diff wait := sync.WaitGroup{} - wait.Add(int(diff)) - for i := int32(0); i < diff; i++ { - go func() { - defer wait.Done() - if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil { - defer utilruntime.HandleError(err) - // Decrement the expected number of creates because the informer won't observe this pod - glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name) + // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize + // and double with each successful iteration in a kind of "slow start". + // This handles attempts to start large numbers of pods that would + // likely all fail with the same error. For example a project with a + // low quota that attempts to create a large number of pods will be + // prevented from spamming the API service with the pod create requests + // after one of its pods fails. Conveniently, this also prevents the + // event spam that those failures would generate. 
+ for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) { + errorCount := len(errCh) + wait.Add(int(batchSize)) + for i := int32(0); i < batchSize; i++ { + go func() { + defer wait.Done() + if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil { + defer utilruntime.HandleError(err) + // Decrement the expected number of creates because the informer won't observe this pod + glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name) + jm.expectations.CreationObserved(jobKey) + activeLock.Lock() + active-- + activeLock.Unlock() + errCh <- err + } + }() + } + wait.Wait() + // any skipped pods that we never attempted to start shouldn't be expected. + skippedPods := diff - batchSize + if errorCount < len(errCh) && skippedPods > 0 { + glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name) + active -= skippedPods + for i := int32(0); i < skippedPods; i++ { jm.expectations.CreationObserved(jobKey) - activeLock.Lock() - active-- - activeLock.Unlock() - errCh <- err + // Decrement the expected number of creates because the informer won't observe this pod } - }() + // The skipped pods will be retried later. The next controller resync will + // retry the slow start process. + break + } + diff -= batchSize } - wait.Wait() } select { diff --git a/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller_test.go index 206d3816b147..f8bad0996917 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/job/jobcontroller_test.go @@ -123,6 +123,7 @@ func TestControllerSyncJob(t *testing.T) { parallelism int32 completions int32 deleting bool + podLimit int // pod setup podControllerError error @@ -140,102 +141,107 @@ func TestControllerSyncJob(t *testing.T) { expectedComplete bool }{ "job start": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 0, 0, 0, 2, 0, 2, 0, 0, false, }, "WQ job start": { - 2, -1, false, + 2, -1, false, 0, nil, 0, 0, 0, 0, 2, 0, 2, 0, 0, false, }, "pending pods": { - 2, 5, false, + 2, 5, false, 0, nil, 2, 0, 0, 0, 0, 0, 2, 0, 0, false, }, "correct # of pods": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 2, 0, 0, 0, 0, 2, 0, 0, false, }, "WQ job: correct # of pods": { - 2, -1, false, + 2, -1, false, 0, nil, 0, 2, 0, 0, 0, 0, 2, 0, 0, false, }, "too few active pods": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 1, 1, 0, 1, 0, 2, 1, 0, false, }, "too few active pods with a dynamic job": { - 2, -1, false, + 2, -1, false, 0, nil, 0, 1, 0, 0, 1, 0, 2, 0, 0, false, }, "too few active pods, with controller error": { - 2, 5, false, + 2, 5, false, 0, fmt.Errorf("Fake error"), 0, 1, 1, 0, 1, 0, 1, 1, 0, false, }, "too many active pods": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 3, 0, 0, 0, 1, 2, 0, 0, false, }, "too many active pods, with controller error": { - 2, 5, false, + 2, 5, false, 0, fmt.Errorf("Fake error"), 0, 3, 0, 0, 0, 1, 3, 0, 0, false, }, "failed pod": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 1, 1, 1, 1, 0, 2, 1, 1, false, }, "job finish": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 0, 5, 0, 0, 0, 0, 5, 0, true, }, "WQ job finishing": { - 2, -1, false, + 2, -1, false, 0, nil, 0, 1, 1, 0, 0, 0, 1, 1, 0, false, }, "WQ job all finished": { - 2, -1, false, + 2, -1, 
false, 0, nil, 0, 0, 2, 0, 0, 0, 0, 2, 0, true, }, "WQ job all finished despite one failure": { - 2, -1, false, + 2, -1, false, 0, nil, 0, 0, 1, 1, 0, 0, 0, 1, 1, true, }, "more active pods than completions": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 10, 0, 0, 0, 8, 2, 0, 0, false, }, "status change": { - 2, 5, false, + 2, 5, false, 0, nil, 0, 2, 2, 0, 0, 0, 2, 2, 0, false, }, "deleting job": { - 2, 5, true, + 2, 5, true, 0, nil, 1, 1, 1, 0, 0, 0, 2, 1, 0, false, }, + "limited pods": { + 100, 200, false, 10, + nil, 0, 0, 0, 0, + 10, 0, 10, 0, 0, false, + }, } for name, tc := range testCases { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) - fakePodControl := controller.FakePodControl{Err: tc.podControllerError} + fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady @@ -275,7 +281,7 @@ func TestControllerSyncJob(t *testing.T) { t.Errorf("%s: Syncing jobs would return error when podController exception", name) } } else { - if err != nil { + if err != nil && (tc.podLimit == 0 || fakePodControl.CreateCallCount < tc.podLimit) { t.Errorf("%s: unexpected error when syncing jobs %v", name, err) } } @@ -326,6 +332,14 @@ func TestControllerSyncJob(t *testing.T) { if tc.expectedComplete && !getCondition(actual, batch.JobComplete) { t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions) } + // validate slow start + expectedLimit := 0 + for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ { + expectedLimit += controller.SlowStartInitialBatchSize << pass + } + if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit { + t.Errorf("%s: Unexpected number of create calls. Expected <= %d, saw %d\n", name, expectedLimit, fakePodControl.CreateCallCount) + } } } diff --git a/vendor/k8s.io/kubernetes/pkg/controller/replicaset/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/replicaset/BUILD index 8e901e87a1ae..3e78babc6704 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/replicaset/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/replicaset/BUILD @@ -39,6 +39,7 @@ go_library( "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/integer:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go b/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go index 068f9acd4e95..f38ac42752b3 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go @@ -35,6 +35,7 @@ import ( clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/integer" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -451,31 +452,55 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte // beforehand and store it via ExpectCreations. 
rsc.expectations.ExpectCreations(rsKey, diff) var wg sync.WaitGroup - wg.Add(diff) glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff) - for i := 0; i < diff; i++ { - go func() { - defer wg.Done() - var err error - boolPtr := func(b bool) *bool { return &b } - controllerRef := &metav1.OwnerReference{ - APIVersion: controllerKind.GroupVersion().String(), - Kind: controllerKind.Kind, - Name: rs.Name, - UID: rs.UID, - BlockOwnerDeletion: boolPtr(true), - Controller: boolPtr(true), - } - err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef) - if err != nil { + // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize + // and double with each successful iteration in a kind of "slow start". + // This handles attempts to start large numbers of pods that would + // likely all fail with the same error. For example a project with a + // low quota that attempts to create a large number of pods will be + // prevented from spamming the API service with the pod create requests + // after one of its pods fails. Conveniently, this also prevents the + // event spam that those failures would generate. + for batchSize := integer.IntMin(diff, controller.SlowStartInitialBatchSize); diff > 0; batchSize = integer.IntMin(2*batchSize, diff) { + errorCount := len(errCh) + wg.Add(batchSize) + for i := 0; i < batchSize; i++ { + go func() { + defer wg.Done() + var err error + boolPtr := func(b bool) *bool { return &b } + controllerRef := &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: rs.Name, + UID: rs.UID, + BlockOwnerDeletion: boolPtr(true), + Controller: boolPtr(true), + } + err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef) + if err != nil { + // Decrement the expected number of creates because the informer won't observe this pod + glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) + rsc.expectations.CreationObserved(rsKey) + errCh <- err + } + }() + } + wg.Wait() + // any skipped pods that we never attempted to start shouldn't be expected. + skippedPods := diff - batchSize + if errorCount < len(errCh) && skippedPods > 0 { + glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for replica set %q/%q", skippedPods, rs.Namespace, rs.Name) + for i := 0; i < skippedPods; i++ { // Decrement the expected number of creates because the informer won't observe this pod - glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) rsc.expectations.CreationObserved(rsKey) - errCh <- err } - }() + // The skipped pods will be retried later. The next controller resync will + // retry the slow start process. 
+ break + } + diff -= batchSize } - wg.Wait() } else if diff > 0 { if diff > rsc.burstReplicas { diff = rsc.burstReplicas diff --git a/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set_test.go b/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set_test.go index ec074b4e5319..c066df5c8e5c 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/replicaset/replica_set_test.go @@ -324,6 +324,33 @@ func TestSyncReplicaSetCreates(t *testing.T) { validateSyncReplicaSet(t, &fakePodControl, 2, 0, 0) } +// Tell the rs to create 100 replicas, but simulate a limit (like a quota limit) +// of 10, and verify that the rs doesn't make 100 create calls per sync pass +func TestSyncReplicaSetCreateFailures(t *testing.T) { + fakePodControl := controller.FakePodControl{} + fakePodControl.CreateLimit = 10 + + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(fakePodControl.CreateLimit*10, labelMap) + client := fake.NewSimpleClientset(rs) + stopCh := make(chan struct{}) + defer close(stopCh) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) + + informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) + + manager.podControl = &fakePodControl + manager.syncReplicaSet(getKey(rs, t)) + validateSyncReplicaSet(t, &fakePodControl, fakePodControl.CreateLimit, 0, 0) + expectedLimit := 0 + for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ { + expectedLimit += controller.SlowStartInitialBatchSize << pass + } + if fakePodControl.CreateCallCount > expectedLimit { + t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) + } +} + func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state fakeHandler := utiltesting.FakeHandler{ diff --git a/vendor/k8s.io/kubernetes/pkg/controller/replication/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/replication/BUILD index a595021d8c4b..a4661805b614 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/replication/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/replication/BUILD @@ -37,6 +37,7 @@ go_library( "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/integer:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller.go index 266fd66b295b..8c76d5129f44 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller.go @@ -36,6 +36,7 @@ import ( clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/integer" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -447,32 +448,56 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl errCh := make(chan error, diff) rm.expectations.ExpectCreations(rcKey, diff) var wg sync.WaitGroup - wg.Add(diff) glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", 
rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff) - for i := 0; i < diff; i++ { - go func() { - defer wg.Done() - var err error - boolPtr := func(b bool) *bool { return &b } - controllerRef := &metav1.OwnerReference{ - APIVersion: controllerKind.GroupVersion().String(), - Kind: controllerKind.Kind, - Name: rc.Name, - UID: rc.UID, - BlockOwnerDeletion: boolPtr(true), - Controller: boolPtr(true), - } - err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef) - if err != nil { + // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize + // and double with each successful iteration in a kind of "slow start". + // This handles attempts to start large numbers of pods that would + // likely all fail with the same error. For example a project with a + // low quota that attempts to create a large number of pods will be + // prevented from spamming the API service with the pod create requests + // after one of its pods fails. Conveniently, this also prevents the + // event spam that those failures would generate. + for batchSize := integer.IntMin(diff, controller.SlowStartInitialBatchSize); diff > 0; batchSize = integer.IntMin(2*batchSize, diff) { + errorCount := len(errCh) + wg.Add(batchSize) + for i := 0; i < batchSize; i++ { + go func() { + defer wg.Done() + var err error + boolPtr := func(b bool) *bool { return &b } + controllerRef := &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: rc.Name, + UID: rc.UID, + BlockOwnerDeletion: boolPtr(true), + Controller: boolPtr(true), + } + err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef) + if err != nil { + // Decrement the expected number of creates because the informer won't observe this pod + glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) + rm.expectations.CreationObserved(rcKey) + errCh <- err + utilruntime.HandleError(err) + } + }() + } + wg.Wait() + // any skipped pods that we never attempted to start shouldn't be expected. + skippedPods := diff - batchSize + if errorCount < len(errCh) && skippedPods > 0 { + glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for controller %q/%q", skippedPods, rc.Namespace, rc.Name) + for i := 0; i < skippedPods; i++ { // Decrement the expected number of creates because the informer won't observe this pod - glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) rm.expectations.CreationObserved(rcKey) - errCh <- err - utilruntime.HandleError(err) } - }() + // The skipped pods will be retried later. The next controller resync will + // retry the slow start process. 
+ break + } + diff -= batchSize } - wg.Wait() select { case err := <-errCh: diff --git a/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller_test.go index b1f2699396d3..da412978b198 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/replication/replication_controller_test.go @@ -284,6 +284,30 @@ func TestSyncReplicationControllerCreates(t *testing.T) { validateSyncReplication(t, &fakePodControl, 2, 0, 0) } +// Tell the controller to create 100 replicas, but simulate a limit (like a quota limit) +// of 10, and verify that the controller doesn't make 100 create calls per sync pass +func TestSyncReplicationControllerCreateFailures(t *testing.T) { + fakePodControl := controller.FakePodControl{} + fakePodControl.CreateLimit = 10 + + rc := newReplicationController(fakePodControl.CreateLimit * 10) + c := fake.NewSimpleClientset(rc) + manager, _ /*podInformer*/, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) + + rcInformer.Informer().GetIndexer().Add(rc) + + manager.podControl = &fakePodControl + manager.syncReplicationController(getKey(rc, t)) + validateSyncReplication(t, &fakePodControl, fakePodControl.CreateLimit, 0, 0) + expectedLimit := 0 + for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ { + expectedLimit += controller.SlowStartInitialBatchSize << pass + } + if fakePodControl.CreateCallCount > expectedLimit { + t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) + } +} + func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // Setup a fake server to listen for requests, and run the rc manager in steady state fakeHandler := utiltesting.FakeHandler{
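As a closing reference, and not as code taken from the patch, a self-contained Go sketch of the slow-start create pattern that the daemon set, job, replica set, and replication controllers above share; slowStartBatch, createFn, and the simulated quota are hypothetical names used only for illustration, and the real controllers additionally decrement their creation expectations for the pods they skip:

package main

import (
	"fmt"
	"sync"
)

// slowStartBatch calls createFn count times, in batches that start at
// initialBatchSize and double after every fully successful batch. Once any
// batch reports an error, the remaining calls are skipped, which is what
// keeps a quota-limited workload from issuing one doomed API call per pod.
func slowStartBatch(count, initialBatchSize int, createFn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := intMin(remaining, initialBatchSize); batchSize > 0; batchSize = intMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := createFn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batchSize - len(errCh)
		if len(errCh) > 0 {
			// Stop doubling; the skipped creates are left for the next sync.
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

func intMin(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	// Simulate a quota that admits only 10 pods: with count=100 and an
	// initial batch size of 1, far fewer than 100 create calls are made.
	var mu sync.Mutex
	created := 0
	createFn := func() error {
		mu.Lock()
		defer mu.Unlock()
		if created >= 10 {
			return fmt.Errorf("exceeded quota")
		}
		created++
		return nil
	}
	n, err := slowStartBatch(100, 1, createFn)
	fmt.Println(n, err)
}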