diff --git a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go
index 69466bbad957..2338f90abf9c 100644
--- a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go
+++ b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go
@@ -49,6 +49,7 @@ func startEndpointController(ctx ControllerContext) (bool, error) {
 	go endpointcontroller.NewEndpointController(
 		ctx.InformerFactory.Core().V1().Pods(),
 		ctx.InformerFactory.Core().V1().Services(),
+		ctx.InformerFactory.Core().V1().Endpoints(),
 		ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
 	).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
 	return true, nil
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD
index df5f0b2db06a..e432af34fe81 100644
--- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD
+++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD
@@ -16,6 +16,7 @@ go_library(
     ],
     tags = ["automanaged"],
     deps = [
+        "//pkg/api:go_default_library",
         "//pkg/api/v1:go_default_library",
         "//pkg/api/v1/endpoints:go_default_library",
         "//pkg/api/v1/pod:go_default_library",
@@ -52,6 +53,7 @@ go_test(
         "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor:k8s.io/apimachinery/pkg/util/intstr",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor:k8s.io/client-go/rest",
         "//vendor:k8s.io/client-go/tools/cache",
         "//vendor:k8s.io/client-go/util/testing",
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go
index 64767014d303..446dd90ffb70 100644
--- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go
+++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/api/v1/endpoints"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -43,11 +44,6 @@ import (
 )
 
 const (
-	// We'll attempt to recompute EVERY service's endpoints at least this
-	// often. Higher numbers = lower CPU/network load; lower numbers =
-	// shorter amount of time before a mistaken endpoint is corrected.
-	FullServiceResyncPeriod = 30 * time.Second
-
 	// An annotation on the Service denoting if the endpoints controller should
 	// go ahead and create endpoints for unready pods. This annotation is
 	// currently only used by StatefulSets, where we need the pod to be DNS
@@ -66,26 +62,24 @@ var (
 )
 
 // NewEndpointController returns a new *EndpointController.
-func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController {
+func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
+	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController {
 	if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter())
 	}
 	e := &EndpointController{
-		client: client,
-		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
+		client:           client,
+		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
+		workerLoopPeriod: time.Second,
 	}
 
-	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
-		cache.ResourceEventHandlerFuncs{
-			AddFunc: e.enqueueService,
-			UpdateFunc: func(old, cur interface{}) {
-				e.enqueueService(cur)
-			},
-			DeleteFunc: e.enqueueService,
+	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: e.enqueueService,
+		UpdateFunc: func(old, cur interface{}) {
+			e.enqueueService(cur)
 		},
-		// TODO: Can we have much longer period here?
-		FullServiceResyncPeriod,
-	)
+		DeleteFunc: e.enqueueService,
+	})
 	e.serviceLister = serviceInformer.Lister()
 	e.servicesSynced = serviceInformer.Informer().HasSynced
 
@@ -97,6 +91,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
 	e.podLister = podInformer.Lister()
 	e.podsSynced = podInformer.Informer().HasSynced
 
+	e.endpointsLister = endpointsInformer.Lister()
+	e.endpointsSynced = endpointsInformer.Informer().HasSynced
+
 	return e
 }
 
@@ -118,12 +115,22 @@ type EndpointController struct {
 	// Added as a member to the struct to allow injection for testing.
 	podsSynced cache.InformerSynced
 
+	// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
+	// NewEndpointController.
+	endpointsLister corelisters.EndpointsLister
+	// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	endpointsSynced cache.InformerSynced
+
 	// Services that need to be updated. A channel is inappropriate here,
 	// because it allows services with lots of pods to be serviced much
 	// more often than services with few pods; it also would cause a
 	// service that's inserted multiple times to be processed more than
 	// necessary.
 	queue workqueue.RateLimitingInterface
+
+	// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
+	workerLoopPeriod time.Duration
 }
 
 // Runs e; will not return until stopCh is closed. workers determines how many
@@ -132,13 +139,13 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer e.queue.ShutDown()
 
-	if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced) {
+	if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
 		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
 		return
 	}
 
 	for i := 0; i < workers; i++ {
-		go wait.Until(e.worker, time.Second, stopCh)
+		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
 	}
 	go func() {
 		defer utilruntime.HandleCrash()
@@ -313,14 +320,19 @@ func (e *EndpointController) deletePod(obj interface{}) {
 		e.addPod(obj)
 		return
 	}
-	podKey, err := keyFunc(obj)
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
+	// If we reached here it means the pod was deleted but its final state is unrecorded.
+	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
 		return
 	}
-	glog.V(4).Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
-
-	// TODO: keep a map of pods to services to handle this condition.
+	pod, ok := tombstone.Obj.(*v1.Pod)
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
+		return
+	}
+	glog.V(4).Infof("Enqueuing services of deleted pod %s having final state unrecorded", pod.Name)
+	e.addPod(pod)
 }
 
 // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
@@ -472,7 +484,7 @@ func (e *EndpointController) syncService(key string) error {
 	subsets = endpoints.RepackSubsets(subsets)
 
 	// See if there's actually an update here.
-	currentEndpoints, err := e.client.Core().Endpoints(service.Namespace).Get(service.Name, metav1.GetOptions{})
+	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			currentEndpoints = &v1.Endpoints{
@@ -491,7 +503,11 @@ func (e *EndpointController) syncService(key string) error {
 		glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
 		return nil
 	}
-	newEndpoints := currentEndpoints
+	copy, err := api.Scheme.DeepCopy(currentEndpoints)
+	if err != nil {
+		return err
+	}
+	newEndpoints := copy.(*v1.Endpoints)
 	newEndpoints.Subsets = subsets
 	newEndpoints.Labels = service.Labels
 	if newEndpoints.Annotations == nil {
@@ -527,13 +543,12 @@ func (e *EndpointController) syncService(key string) error {
 // some stragglers could have been left behind if the endpoint controller
 // reboots).
 func (e *EndpointController) checkLeftoverEndpoints() {
-	list, err := e.client.Core().Endpoints(metav1.NamespaceAll).List(metav1.ListOptions{})
+	list, err := e.endpointsLister.List(labels.Everything())
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
 		return
 	}
-	for i := range list.Items {
-		ep := &list.Items[i]
+	for _, ep := range list {
 		key, err := keyFunc(ep)
 		if err != nil {
 			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go
index 811de6f7c43d..a4f83a596c77 100644
--- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go
+++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go
@@ -21,11 +21,13 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"testing"
+	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	utiltesting "k8s.io/client-go/util/testing"
@@ -39,6 +41,7 @@ import (
 )
 
 var alwaysReady = func() bool { return true }
+var neverReady = func() bool { return false }
 var emptyNodeName string
 
 func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
@@ -79,10 +82,10 @@ type serverResponse struct {
 	obj        interface{}
 }
 
-func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *utiltesting.FakeHandler) {
+func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
 	fakeEndpointsHandler := utiltesting.FakeHandler{
-		StatusCode:   endpointsResponse.statusCode,
-		ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), endpointsResponse.obj.(runtime.Object)),
+		StatusCode:   http.StatusOK,
+		ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{}),
 	}
 	mux := http.NewServeMux()
 	mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
@@ -96,39 +99,43 @@ type endpointController struct {
 	*EndpointController
-	podStore     cache.Store
-	serviceStore cache.Store
+	podStore       cache.Store
+	serviceStore   cache.Store
+	endpointsStore cache.Store
 }
 
 func newController(url string) *endpointController {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
 	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
-	endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), client)
+	endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
+		informerFactory.Core().V1().Endpoints(), client)
 	endpoints.podsSynced = alwaysReady
 	endpoints.servicesSynced = alwaysReady
+	endpoints.endpointsSynced = alwaysReady
 	return &endpointController{
 		endpoints,
 		informerFactory.Core().V1().Pods().Informer().GetStore(),
 		informerFactory.Core().V1().Services().Informer().GetStore(),
+		informerFactory.Core().V1().Endpoints().Informer().GetStore(),
 	}
 }
 
 func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 	ns := metav1.NamespaceDefault
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-			},
-			Subsets: []v1.EndpointSubset{{
-				Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
-				Ports:     []v1.EndpointPort{{Port: 1000}},
-			}},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{{
+			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
+			Ports:     []v1.EndpointPort{{Port: 1000}},
+		}},
+	})
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
@@ -139,29 +146,21 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 
 func TestCheckLeftoverEndpoints(t *testing.T) {
 	ns := metav1.NamespaceDefault
-	// Note that this requests *all* endpoints, therefore metav1.NamespaceAll
-	// below.
-	testServer, _ := makeTestServer(t, metav1.NamespaceAll,
-		serverResponse{http.StatusOK, &v1.EndpointsList{
-			ListMeta: metav1.ListMeta{
-				ResourceVersion: "1",
-			},
-			Items: []v1.Endpoints{{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:            "foo",
-					Namespace:       ns,
-					ResourceVersion: "1",
-				},
-				Subsets: []v1.EndpointSubset{{
-					Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
-					Ports:     []v1.EndpointPort{{Port: 1000}},
-				}},
-			}},
-		}})
+	testServer, _ := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{{
+			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
+			Ports:     []v1.EndpointPort{{Port: 1000}},
+		}},
+	})
 	endpoints.checkLeftoverEndpoints()
-
 	if e, a := 1, endpoints.queue.Len(); e != a {
 		t.Fatalf("Expected %v, got %v", e, a)
 	}
@@ -173,21 +172,20 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
 
 func TestSyncEndpointsProtocolTCP(t *testing.T) {
 	ns := "other"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-			},
-			Subsets: []v1.EndpointSubset{{
-				Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
-				Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
-			}},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
-
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{{
+			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
+			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
+		}},
+	})
 	addPods(endpoints.podStore, ns, 1, 1, 0)
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -197,7 +195,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
-	endpointsHandler.ValidateRequestCount(t, 2)
+
+	endpointsHandler.ValidateRequestCount(t, 1)
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            "foo",
@@ -214,20 +213,20 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
 
 func TestSyncEndpointsProtocolUDP(t *testing.T) {
 	ns := "other"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-			},
-			Subsets: []v1.EndpointSubset{{
-				Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
-				Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
-			}},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{{
+			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
+			Ports:     []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
+		}},
+	})
 	addPods(endpoints.podStore, ns, 1, 1, 0)
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -237,7 +236,8 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
-	endpointsHandler.ValidateRequestCount(t, 2)
+
+	endpointsHandler.ValidateRequestCount(t, 1)
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            "foo",
@@ -254,17 +254,17 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
 
 func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
 	ns := "other"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-			},
-			Subsets: []v1.EndpointSubset{},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{},
+	})
 	addPods(endpoints.podStore, ns, 1, 1, 0)
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -274,6 +274,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
+
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            "foo",
@@ -290,17 +291,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
 
 func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
 	ns := "other"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-			},
-			Subsets: []v1.EndpointSubset{},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{},
+	})
 	addPods(endpoints.podStore, ns, 0, 1, 1)
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -310,6 +311,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
+
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            "foo",
@@ -326,17 +328,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
 
 func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
 	ns := "other"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-			},
-			Subsets: []v1.EndpointSubset{},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{},
+	})
 	addPods(endpoints.podStore, ns, 1, 1, 1)
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -346,6 +348,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
+
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            "foo",
@@ -363,20 +366,20 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
 
 func TestSyncEndpointsItemsPreexisting(t *testing.T) {
 	ns := "bar"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-			},
-			Subsets: []v1.EndpointSubset{{
-				Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
-				Ports:     []v1.EndpointPort{{Port: 1000}},
-			}},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+		},
+		Subsets: []v1.EndpointSubset{{
+			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
+			Ports:     []v1.EndpointPort{{Port: 1000}},
+		}},
+	})
 	addPods(endpoints.podStore, ns, 1, 1, 0)
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -386,6 +389,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
+
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            "foo",
@@ -402,20 +406,20 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
 
 func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
 	ns := metav1.NamespaceDefault
-	testServer, endpointsHandler := makeTestServer(t, metav1.NamespaceDefault,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				ResourceVersion: "1",
-				Name:            "foo",
-				Namespace:       ns,
-			},
-			Subsets: []v1.EndpointSubset{{
-				Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
-				Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
-			}},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			ResourceVersion: "1",
+			Name:            "foo",
+			Namespace:       ns,
+		},
+		Subsets: []v1.EndpointSubset{{
+			Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
+			Ports:     []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
+		}},
+	})
 	addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0)
 	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
@@ -425,13 +429,12 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
-	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", metav1.NamespaceDefault, "foo"), "GET", nil)
+	endpointsHandler.ValidateRequestCount(t, 0)
 }
 
 func TestSyncEndpointsItems(t *testing.T) {
 	ns := "other"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
 	addPods(endpoints.podStore, ns, 3, 2, 0)
@@ -447,6 +450,7 @@ func TestSyncEndpointsItems(t *testing.T) {
 		},
 	})
 	endpoints.syncService("other/foo")
+
 	expectedSubsets := []v1.EndpointSubset{{
 		Addresses: []v1.EndpointAddress{
 			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
@@ -461,18 +465,17 @@ func TestSyncEndpointsItems(t *testing.T) {
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			ResourceVersion: "",
+			Name:            "foo",
 		},
 		Subsets: endptspkg.SortSubsets(expectedSubsets),
 	})
-	// endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
-	endpointsHandler.ValidateRequestCount(t, 2)
+	endpointsHandler.ValidateRequestCount(t, 1)
 	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
 }
 
 func TestSyncEndpointsItemsWithLabels(t *testing.T) {
 	ns := "other"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
 	addPods(endpoints.podStore, ns, 3, 2, 0)
@@ -492,6 +495,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
+
 	expectedSubsets := []v1.EndpointSubset{{
 		Addresses: []v1.EndpointAddress{
 			{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
@@ -506,34 +510,34 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			ResourceVersion: "",
+			Name:            "foo",
 			Labels:          serviceLabels,
 		},
 		Subsets: endptspkg.SortSubsets(expectedSubsets),
 	})
-	// endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
-	endpointsHandler.ValidateRequestCount(t, 2)
+	endpointsHandler.ValidateRequestCount(t, 1)
 	endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
 }
 
 func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
 	ns := "bar"
-	testServer, endpointsHandler := makeTestServer(t, ns,
-		serverResponse{http.StatusOK, &v1.Endpoints{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            "foo",
-				Namespace:       ns,
-				ResourceVersion: "1",
-				Labels: map[string]string{
-					"foo": "bar",
-				},
-			},
-			Subsets: []v1.EndpointSubset{{
-				Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
-				Ports:     []v1.EndpointPort{{Port: 1000}},
-			}},
-		}})
+	testServer, endpointsHandler := makeTestServer(t, ns)
 	defer testServer.Close()
 	endpoints := newController(testServer.URL)
+	endpoints.endpointsStore.Add(&v1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "foo",
+			Namespace:       ns,
+			ResourceVersion: "1",
+			Labels: map[string]string{
+				"foo": "bar",
+			},
+		},
+		Subsets: []v1.EndpointSubset{{
+			Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
+			Ports:     []v1.EndpointPort{{Port: 1000}},
+		}},
+	})
 	addPods(endpoints.podStore, ns, 1, 1, 0)
 	serviceLabels := map[string]string{"baz": "blah"}
 	endpoints.serviceStore.Add(&v1.Service{
@@ -548,6 +552,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
 		},
 	})
 	endpoints.syncService(ns + "/foo")
+
 	data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:            "foo",
@@ -716,3 +721,57 @@ func TestDetermineNeededServiceUpdates(t *testing.T) {
 		}
 	}
 }
+
+func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
+	var tests = []struct {
+		podsSynced            func() bool
+		servicesSynced        func() bool
+		endpointsSynced       func() bool
+		shouldUpdateEndpoints bool
+	}{
+		{neverReady, alwaysReady, alwaysReady, false},
+		{alwaysReady, neverReady, alwaysReady, false},
+		{alwaysReady, alwaysReady, neverReady, false},
+		{alwaysReady, alwaysReady, alwaysReady, true},
+	}
+
+	for _, test := range tests {
+		func() {
+			ns := "other"
+			testServer, endpointsHandler := makeTestServer(t, ns)
+			defer testServer.Close()
+			endpoints := newController(testServer.URL)
+			addPods(endpoints.podStore, ns, 1, 1, 0)
+			service := &v1.Service{
+				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
+				Spec: v1.ServiceSpec{
+					Selector: map[string]string{},
+					Ports:    []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
+				},
+			}
+			endpoints.serviceStore.Add(service)
+			endpoints.enqueueService(service)
+			endpoints.podsSynced = test.podsSynced
+			endpoints.servicesSynced = test.servicesSynced
+			endpoints.endpointsSynced = test.endpointsSynced
+			endpoints.workerLoopPeriod = 10 * time.Millisecond
+			stopCh := make(chan struct{})
+			defer close(stopCh)
+			go endpoints.Run(1, stopCh)
+
+			// cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
+			// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
+			// a single cache sync period and worker period, with some fudge room.
+			time.Sleep(150 * time.Millisecond)
+			if test.shouldUpdateEndpoints {
+				// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
+				wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
+					return endpoints.queue.Len() == 0, nil
+				})
+				endpointsHandler.ValidateRequestCount(t, 1)
+			} else {
+				endpointsHandler.ValidateRequestCount(t, 0)
+			}
+		}()
+	}
+}
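
Note on the deletePod change in endpoints_controller.go: the patch replaces a log-and-wait-for-resync fallback with the standard informer tombstone pattern. When a watch misses a delete event, the informer hands the delete handler a cache.DeletedFinalStateUnknown wrapping the last state it observed, so the controller can unwrap the pod and enqueue its services immediately rather than relying on the (now removed) 30-second full resync. Below is a minimal, self-contained sketch of that pattern; it uses current k8s.io/api and k8s.io/client-go import paths instead of the vendored pkg/api/v1 paths in this 1.6-era patch, and onPodDelete is an illustrative name, not a function from the patch.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// onPodDelete mirrors the shape of the patched deletePod handler: the object
// passed to a delete handler is either the deleted pod itself or a tombstone
// wrapping the last state the informer saw before it missed the delete event.
func onPodDelete(obj interface{}) {
	if pod, ok := obj.(*v1.Pod); ok {
		fmt.Println("pod deleted:", pod.Name)
		return
	}
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if !ok {
		fmt.Printf("couldn't get object from tombstone %#v\n", obj)
		return
	}
	pod, ok := tombstone.Obj.(*v1.Pod)
	if !ok {
		fmt.Printf("tombstone contained object that is not a Pod: %#v\n", obj)
		return
	}
	// Final state is unrecorded, but the last-known pod is enough to enqueue
	// the services it belonged to, which is exactly what e.addPod(pod) does.
	fmt.Println("pod deleted (final state unrecorded):", pod.Name)
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod0", Namespace: "default"}}
	onPodDelete(pod)
	onPodDelete(cache.DeletedFinalStateUnknown{Key: "default/pod0", Obj: pod})
}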
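
Note on the api.Scheme.DeepCopy call added to syncService: once currentEndpoints comes from e.endpointsLister instead of a live GET, it points directly into the shared informer cache, and mutating it in place would corrupt the state seen by every other consumer of that cache. The following is a dependency-free sketch of the copy-before-mutate rule under that assumption; the Endpoints struct and store map are illustrative stand-ins, not the real types.

package main

import "fmt"

// Endpoints stands in for *v1.Endpoints; in the real controller the lister
// returns pointers straight into the informer's shared cache.
type Endpoints struct {
	Name    string
	Subsets []string
}

// store mimics the shared cache: every lookup returns the same pointer.
var store = map[string]*Endpoints{
	"default/foo": {Name: "foo", Subsets: []string{"10.0.0.1:8080"}},
}

func main() {
	cached := store["default/foo"]

	// Wrong: assigning to cached.Subsets here would silently rewrite the
	// cache for every other reader, even if the later API update failed.

	// Right: deep-copy first, then mutate only the copy. This is the role
	// api.Scheme.DeepCopy(currentEndpoints) plays in the patch.
	updated := &Endpoints{
		Name:    cached.Name,
		Subsets: append([]string(nil), cached.Subsets...),
	}
	updated.Subsets = []string{"10.0.0.2:8080"}

	fmt.Println("cache still sees:", store["default/foo"].Subsets) // [10.0.0.1:8080]
	fmt.Println("update will send:", updated.Subsets)              // [10.0.0.2:8080]
}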