diff --git a/pkg/cmd/server/bootstrappolicy/controller_policy.go b/pkg/cmd/server/bootstrappolicy/controller_policy.go index 16809db23f3c..11229c98317d 100644 --- a/pkg/cmd/server/bootstrappolicy/controller_policy.go +++ b/pkg/cmd/server/bootstrappolicy/controller_policy.go @@ -297,7 +297,7 @@ func init() { }, }) - // ingress-secretref-controller + // ingress-to-route-controller addControllerRole(rbac.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + InfraIngressToRouteControllerServiceAccountName}, Rules: []rbac.PolicyRule{ diff --git a/pkg/route/controller/ingress/ingress.go b/pkg/route/controller/ingress/ingress.go index 693dac16709b..13e045a9835d 100644 --- a/pkg/route/controller/ingress/ingress.go +++ b/pkg/route/controller/ingress/ingress.go @@ -2,6 +2,7 @@ package ingress import ( "fmt" + "sync" "time" "github.com/golang/glog" @@ -15,7 +16,9 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/json" + utilrand "k8s.io/apimachinery/pkg/util/rand" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1" @@ -46,9 +49,16 @@ import ( // 2. For every TLS hostname that has a corresponding path rule and points to a secret // that exists, a route should exist with a valid TLS config from that secret. // 3. For every service referenced by the ingress path rule, the route should have -// an update to date target port based on the service. -// 4. A route owned by an ingress that no longer satisfies the first three invariants -// should be deleted. +// a target port based on the service. +// 4. A route owned by an ingress that is not described by any of the three invariants +// above should be deleted. +// +// The controller also relies on the use of expectations to remind itself whether there +// are route creations it has not yet observed, which prevents the controller from +// creating more objects than it needs. The expectations are reset when the ingress +// object is modified. It is possible that expectations could leak if an ingress is +// deleted and its deletion is not observed by the cache, but such leaks are only expected +// if there is a bug in the informer cache which must be fixed anyway. // // Unsupported attributes: // @@ -73,6 +83,73 @@ type Controller struct { // queue is the list of namespace keys that must be synced. queue workqueue.RateLimitingInterface + + // expectations track upcoming route creations that we have not yet observed + expectations *expectations + // expectationDelay controls how long the controller waits to observe its + // own creates. Exposed only for testing. + expectationDelay time.Duration +} + +// expectations track an upcoming change to a named resource related +// to an ingress. This is a thread safe object but callers assume +// responsibility for ensuring expectations do not leak. +type expectations struct { + lock sync.Mutex + expect map[queueKey]sets.String +} + +// newExpectations returns a tracking object for upcoming events +// that the controller may expect to happen. +func newExpectations() *expectations { + return &expectations{ + expect: make(map[queueKey]sets.String), + } +} + +// Expect that an event will happen in the future for the given ingress +// and a named resource related to that ingress. +func (e *expectations) Expect(namespace, ingressName, name string) { + e.lock.Lock() + defer e.lock.Unlock() + key := queueKey{namespace: namespace, name: ingressName} + set, ok := e.expect[key] + if !ok { + set = sets.NewString() + e.expect[key] = set + } + set.Insert(name) +} + +// Satisfied clears the expectation for the given resource name on an +// ingress. +func (e *expectations) Satisfied(namespace, ingressName, name string) { + e.lock.Lock() + defer e.lock.Unlock() + key := queueKey{namespace: namespace, name: ingressName} + set := e.expect[key] + set.Delete(name) + if set.Len() == 0 { + delete(e.expect, key) + } +} + +// Expecting returns true if the provided ingress is still waiting to +// see changes. +func (e *expectations) Expecting(namespace, ingressName string) bool { + e.lock.Lock() + defer e.lock.Unlock() + key := queueKey{namespace: namespace, name: ingressName} + return e.expect[key].Len() > 0 +} + +// Clear indicates that all expectations for the given ingress should +// be cleared. +func (e *expectations) Clear(namespace, ingressName string) { + e.lock.Lock() + defer e.lock.Unlock() + key := queueKey{namespace: namespace, name: ingressName} + delete(e.expect, key) } type queueKey struct { @@ -92,6 +169,9 @@ func NewController(eventsClient kv1core.EventsGetter, client routeclient.RoutesG eventRecorder: recorder, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingress-to-route"), + expectations: newExpectations(), + expectationDelay: 2 * time.Second, + client: client, ingressLister: ingresses.Lister(), @@ -186,6 +266,7 @@ func (c *Controller) processRoute(obj interface{}) { if !ok { return } + c.expectations.Satisfied(t.Namespace, ingressName, t.Name) c.queue.Add(queueKey{namespace: t.Namespace, name: ingressName}) default: utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj)) @@ -195,6 +276,10 @@ func (c *Controller) processRoute(obj interface{}) { func (c *Controller) processIngress(obj interface{}) { switch t := obj.(type) { case *extensionsv1beta1.Ingress: + // when we see a change to an ingress, reset our expectations + // this also allows periodic purging of the expectation list in the event + // we miss one or more events. + c.expectations.Clear(t.Namespace, t.Name) c.queue.Add(queueKey{namespace: t.Namespace, name: t.Name}) default: utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj)) @@ -234,8 +319,10 @@ func (c *Controller) processNext() bool { } defer c.queue.Done(key) + glog.V(5).Infof("processing %v begin", key) err := c.sync(key.(queueKey)) c.handleNamespaceErr(err, key) + glog.V(5).Infof("processing %v end", key) return true } @@ -262,6 +349,12 @@ func (c *Controller) sync(key queueKey) error { } return nil } + // if we are waiting to observe the result of route creations, simply delay + if c.expectations.Expecting(key.namespace, key.name) { + c.queue.AddAfter(key, c.expectationDelay) + glog.V(5).Infof("Ingress %s/%s has unsatisfied expectations", key.namespace, key.name) + return nil + } ingress, err := c.ingressLister.Ingresses(key.namespace).Get(key.name) if errors.IsNotFound(err) { @@ -328,7 +421,7 @@ func (c *Controller) sync(key queueKey) error { // add the new routes for _, route := range creates { - if _, err := c.client.Routes(route.Namespace).Create(route); err != nil { + if err := createRouteWithName(c.client, ingress, route, c.expectations); err != nil { errs = append(errs, err) } } @@ -546,3 +639,55 @@ func referencesSecret(ingress *extensionsv1beta1.Ingress, host string) (string, } return "", false } + +// createRouteWithName performs client side name generation so we can set a predictable expectation. +// If we fail multiple times in a row we will return an error. +// TODO: future optimization, check the local cache for the name first +func createRouteWithName(client routeclient.RoutesGetter, ingress *extensionsv1beta1.Ingress, route *routev1.Route, expect *expectations) error { + base := route.GenerateName + var lastErr error + // only retry a limited number of times + for i := 0; i < 3; i++ { + if len(base) > 0 { + route.GenerateName = "" + route.Name = generateRouteName(base) + } + + // Set the expectation before we talk to the server in order to + // prevent racing with the route cache. + expect.Expect(ingress.Namespace, ingress.Name, route.Name) + + _, err := client.Routes(route.Namespace).Create(route) + if err == nil { + return nil + } + + // We either collided with another randomly generated name, or another + // error between us and the server prevented observing the success + // of the result. In either case we are not expecting a new route. This + // is safe because expectations are an optimization to avoid churn rather + // than to prevent true duplicate creation. + expect.Satisfied(ingress.Namespace, ingress.Name, route.Name) + + // if we aren't generating names (or if we got any other type of error) + // return right away + if len(base) == 0 || !errors.IsAlreadyExists(err) { + return err + } + lastErr = err + } + return lastErr +} + +const ( + maxNameLength = 63 + randomLength = 5 + maxGeneratedNameLength = maxNameLength - randomLength +) + +func generateRouteName(base string) string { + if len(base) > maxGeneratedNameLength { + base = base[:maxGeneratedNameLength] + } + return fmt.Sprintf("%s%s", base, utilrand.String(randomLength)) +} diff --git a/pkg/route/controller/ingress/ingress_test.go b/pkg/route/controller/ingress/ingress_test.go index 8ec7cbf19383..50686350dc43 100644 --- a/pkg/route/controller/ingress/ingress_test.go +++ b/pkg/route/controller/ingress/ingress_test.go @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/scheme" corelisters "k8s.io/client-go/listers/core/v1" extensionslisters "k8s.io/client-go/listers/extensions/v1beta1" clientgotesting "k8s.io/client-go/testing" @@ -143,6 +144,206 @@ func (r *nsSecretLister) Get(name string) (*v1.Secret, error) { return nil, errors.NewNotFound(schema.GroupResource{}, name) } +const complexIngress = ` +apiVersion: extensions/v1beta1 +kind: Ingress +metadata: + name: test-1 + namespace: test +spec: + rules: + - host: 1.ingress-test.com + http: + paths: + - path: /test + backend: + serviceName: ingress-endpoint-1 + servicePort: 80 + - path: /other + backend: + serviceName: ingress-endpoint-2 + servicePort: 80 + - host: 2.ingress-test.com + http: + paths: + - path: / + backend: + serviceName: ingress-endpoint-1 + servicePort: 80 + - host: 3.ingress-test.com + http: + paths: + - path: / + backend: + serviceName: ingress-endpoint-1 + servicePort: 80 +` + +func TestController_stabilizeAfterCreate(t *testing.T) { + obj, _, err := scheme.Codecs.UniversalDeserializer().Decode([]byte(complexIngress), nil, nil) + if err != nil { + t.Fatal(err) + } + ingress := obj.(*extensionsv1beta1.Ingress) + + i := &ingressLister{ + Items: []*extensionsv1beta1.Ingress{ + ingress, + }, + } + r := &routeLister{} + s := &secretLister{} + svc := &serviceLister{Items: []*v1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ingress-endpoint-1", + Namespace: "test", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ingress-endpoint-2", + Namespace: "test", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "80-tcp", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + }, + }} + + var names []string + kc := &fake.Clientset{} + kc.AddReactor("*", "routes", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + switch a := action.(type) { + case clientgotesting.CreateAction: + obj := a.GetObject().DeepCopyObject() + m := obj.(metav1.Object) + if len(m.GetName()) == 0 { + m.SetName(m.GetGenerateName()) + } + names = append(names, m.GetName()) + return true, obj, nil + } + return true, nil, nil + }) + + c := &Controller{ + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingress-to-route-test"), + client: kc.Route(), + ingressLister: i, + routeLister: r, + secretLister: s, + serviceLister: svc, + expectations: newExpectations(), + } + defer c.queue.ShutDown() + + // load the ingresses for the namespace + if err := c.sync(queueKey{namespace: "test"}); err != nil { + t.Errorf("Controller.sync() error = %v", err) + } + if c.queue.Len() != 1 { + t.Fatalf("Controller.sync() unexpected queue: %#v", c.queue.Len()) + } + actions := kc.Actions() + if len(actions) != 0 { + t.Fatalf("Controller.sync() unexpected actions: %#v", actions) + } + + // process the ingress + key, _ := c.queue.Get() + expectKey := queueKey{namespace: ingress.Namespace, name: ingress.Name} + if key.(queueKey) != expectKey { + t.Fatalf("incorrect key: %v", key) + } + if err := c.sync(key.(queueKey)); err != nil { + t.Fatalf("Controller.sync() error = %v", err) + } + c.queue.Done(key) + if c.queue.Len() != 0 { + t.Fatalf("Controller.sync() unexpected queue: %#v", c.queue.Len()) + } + actions = kc.Actions() + if len(actions) == 0 { + t.Fatalf("Controller.sync() unexpected actions: %#v", actions) + } + if !c.expectations.Expecting("test", "test-1") { + t.Fatalf("Controller.sync() should be holding an expectation: %#v", c.expectations.expect) + } + + for _, action := range actions { + switch action.GetVerb() { + case "create": + switch o := action.(clientgotesting.CreateAction).GetObject().(type) { + case *routev1.Route: + r.Items = append(r.Items, o) + c.processRoute(o) + default: + t.Fatalf("Unexpected create: %T", o) + } + default: + t.Fatalf("Unexpected action: %#v", action) + } + } + if c.queue.Len() != 1 { + t.Fatalf("Controller.sync() unexpected queue: %#v", c.queue.Len()) + } + if c.expectations.Expecting("test", "test-1") { + t.Fatalf("Controller.sync() should have cleared all expectations: %#v", c.expectations.expect) + } + c.expectations.Expect("test", "test-1", names[0]) + + // waiting for a single expected route, will do nothing + key, _ = c.queue.Get() + if err := c.sync(key.(queueKey)); err != nil { + t.Errorf("Controller.sync() error = %v", err) + } + c.queue.Done(key) + actions = kc.Actions() + if len(actions) == 0 { + t.Fatalf("Controller.sync() unexpected actions: %#v", actions) + } + if c.queue.Len() != 1 { + t.Fatalf("Controller.sync() unexpected queue: %#v", c.queue.Len()) + } + c.expectations.Satisfied("test", "test-1", names[0]) + + // steady state, nothing has changed + key, _ = c.queue.Get() + if err := c.sync(key.(queueKey)); err != nil { + t.Errorf("Controller.sync() error = %v", err) + } + c.queue.Done(key) + actions = kc.Actions() + if len(actions) == 0 { + t.Fatalf("Controller.sync() unexpected actions: %#v", actions) + } + if c.queue.Len() != 0 { + t.Fatalf("Controller.sync() unexpected queue: %#v", c.queue.Len()) + } +} + +func newTestExpectations(fn func(*expectations)) *expectations { + e := newExpectations() + fn(e) + return e +} + func TestController_sync(t *testing.T) { services := &serviceLister{Items: []*v1.Service{ { @@ -240,14 +441,17 @@ func TestController_sync(t *testing.T) { svc corelisters.ServiceLister } tests := []struct { - name string - fields fields - args queueKey - wantErr bool - wantCreates []*routev1.Route - wantPatches []clientgotesting.PatchActionImpl - wantDeletes []clientgotesting.DeleteActionImpl - wantQueue []queueKey + name string + fields fields + args queueKey + expects *expectations + wantErr bool + wantCreates []*routev1.Route + wantPatches []clientgotesting.PatchActionImpl + wantDeletes []clientgotesting.DeleteActionImpl + wantQueue []queueKey + wantExpectation *expectations + wantExpects []queueKey }{ { name: "no changes", @@ -415,11 +619,12 @@ func TestController_sync(t *testing.T) { }}, r: &routeLister{}, }, - args: queueKey{namespace: "test", name: "1"}, + args: queueKey{namespace: "test", name: "1"}, + wantExpects: []queueKey{{namespace: "test", name: "1"}}, wantCreates: []*routev1.Route{ { ObjectMeta: metav1.ObjectMeta{ - GenerateName: "1-", + Name: "", Namespace: "test", OwnerReferences: []metav1.OwnerReference{{APIVersion: "extensions/v1beta1", Kind: "Ingress", Name: "1", Controller: &boolTrue}}, }, @@ -436,7 +641,7 @@ func TestController_sync(t *testing.T) { }, { ObjectMeta: metav1.ObjectMeta{ - GenerateName: "1-", + Name: "", Namespace: "test", OwnerReferences: []metav1.OwnerReference{{APIVersion: "extensions/v1beta1", Kind: "Ingress", Name: "1", Controller: &boolTrue}}, }, @@ -453,6 +658,54 @@ func TestController_sync(t *testing.T) { }, }, }, + { + name: "create route - blocked by expectation", + fields: fields{ + i: &ingressLister{Items: []*extensionsv1beta1.Ingress{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "1", + Namespace: "test", + }, + Spec: extensionsv1beta1.IngressSpec{ + Rules: []extensionsv1beta1.IngressRule{ + { + Host: "test.com", + IngressRuleValue: extensionsv1beta1.IngressRuleValue{ + HTTP: &extensionsv1beta1.HTTPIngressRuleValue{ + Paths: []extensionsv1beta1.HTTPIngressPath{ + { + Path: "/deep", Backend: extensionsv1beta1.IngressBackend{ + ServiceName: "service-1", + ServicePort: intstr.FromString("http"), + }, + }, + { + Path: "/", Backend: extensionsv1beta1.IngressBackend{ + ServiceName: "service-1", + ServicePort: intstr.FromString("http"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }}, + r: &routeLister{}, + }, + expects: newTestExpectations(func(e *expectations) { + e.Expect("test", "1", "route-test-1") + }), + args: queueKey{namespace: "test", name: "1"}, + wantQueue: []queueKey{{namespace: "test", name: "1"}}, + // preserves the expectations unchanged + wantExpectation: newTestExpectations(func(e *expectations) { + e.Expect("test", "1", "route-test-1") + }), + }, { name: "update route", fields: fields{ @@ -1244,8 +1497,19 @@ func TestController_sync(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + var names []string kc := &fake.Clientset{} kc.AddReactor("*", "routes", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + switch a := action.(type) { + case clientgotesting.CreateAction: + obj := a.GetObject().DeepCopyObject() + m := obj.(metav1.Object) + if len(m.GetName()) == 0 { + m.SetName(m.GetGenerateName()) + } + names = append(names, m.GetName()) + return true, obj, nil + } return true, nil, nil }) @@ -1256,8 +1520,12 @@ func TestController_sync(t *testing.T) { routeLister: tt.fields.r, secretLister: tt.fields.s, serviceLister: tt.fields.svc, + expectations: tt.expects, } // default these + if c.expectations == nil { + c.expectations = newExpectations() + } if c.secretLister == nil { c.secretLister = secrets } @@ -1282,6 +1550,20 @@ func TestController_sync(t *testing.T) { t.Errorf("unexpected queue: %s", diff.ObjectReflectDiff(tt.wantQueue, hasQueue)) } + wants := tt.wantExpectation + if wants == nil { + wants = newTestExpectations(func(e *expectations) { + for _, key := range tt.wantExpects { + for _, routeName := range names { + e.Expect(key.namespace, key.name, routeName) + } + } + }) + } + if !reflect.DeepEqual(wants, c.expectations) { + t.Errorf("unexpected expectations: %s", diff.ObjectReflectDiff(wants.expect, c.expectations.expect)) + } + actions := kc.Actions() for i := range tt.wantCreates { @@ -1295,8 +1577,13 @@ func TestController_sync(t *testing.T) { if action.GetNamespace() != tt.args.namespace { t.Errorf("unexpected action[%d]: %#v", i, action) } - if !reflect.DeepEqual(tt.wantCreates[i], action.GetObject()) { - t.Errorf("unexpected create: %s", diff.ObjectReflectDiff(tt.wantCreates[i], action.GetObject())) + obj := action.GetObject() + if tt.wantCreates[i].Name == "" { + tt.wantCreates[i].Name = names[0] + names = names[1:] + } + if !reflect.DeepEqual(tt.wantCreates[i], obj) { + t.Errorf("unexpected create: %s", diff.ObjectReflectDiff(tt.wantCreates[i], obj)) } } actions = actions[len(tt.wantCreates):] diff --git a/pkg/router/controller/extended_validator.go b/pkg/router/controller/extended_validator.go index 511008ed82c4..4b11262d4a97 100644 --- a/pkg/router/controller/extended_validator.go +++ b/pkg/router/controller/extended_validator.go @@ -2,7 +2,6 @@ package controller import ( "fmt" - "reflect" "github.com/golang/glog" "k8s.io/apimachinery/pkg/util/sets" @@ -22,9 +21,6 @@ type ExtendedValidator struct { // recorder is an interface for indicating route rejections. recorder RejectionRecorder - - // invalidRoutes is a map of invalid routes previously encountered. - invalidRoutes map[string]routeapi.Route } // NewExtendedValidator creates a plugin wrapper that ensures only routes that @@ -32,9 +28,8 @@ type ExtendedValidator struct { // Recorder is an interface for indicating why a route was rejected. func NewExtendedValidator(plugin router.Plugin, recorder RejectionRecorder) *ExtendedValidator { return &ExtendedValidator{ - plugin: plugin, - recorder: recorder, - invalidRoutes: make(map[string]routeapi.Route), + plugin: plugin, + recorder: recorder, } } @@ -52,17 +47,6 @@ func (p *ExtendedValidator) HandleEndpoints(eventType watch.EventType, endpoints func (p *ExtendedValidator) HandleRoute(eventType watch.EventType, route *routeapi.Route) error { // Check if previously seen route and its Spec is unchanged. routeName := routeNameKey(route) - old, ok := p.invalidRoutes[routeName] - if ok && reflect.DeepEqual(old.Spec, route.Spec) { - // Route spec was unchanged and it is already marked in - // error, we don't need to do anything more. - p.plugin.HandleRoute(watch.Deleted, route) - if eventType == watch.Deleted { - delete(p.invalidRoutes, routeName) - } - return fmt.Errorf("invalid route configuration") - } - if errs := validation.ExtendedValidateRoute(route); len(errs) > 0 { errmsg := "" for i := 0; i < len(errs); i++ { diff --git a/test/extended/testdata/ingress.yaml b/test/extended/testdata/ingress.yaml index 78887547cb08..9e1a67cc3499 100644 --- a/test/extended/testdata/ingress.yaml +++ b/test/extended/testdata/ingress.yaml @@ -1,7 +1,7 @@ kind: List apiVersion: v1 items: -# an ingress that should be captured as three routes +# an ingress that should be captured as individual routes - apiVersion: extensions/v1beta1 kind: Ingress metadata: