Skip to content

Commit

Permalink
Set a route expectation to delay processing until routes are created
Browse files Browse the repository at this point in the history
When a controller creates multiple routes from a single ingress it may
observe arbitrary delays between the time of creation and when it
observes that route in its cache. Like other controllers that use random
name generation (secret controller, replica set controller) we therefore
need to wait until our expectation is met. To simplify the code, we
bypass server side name generation and use our own name generation to
avoid racing against the server (create route, server sends event, set
expectation = expectation is never cleared).
  • Loading branch information
smarterclayton committed Mar 30, 2018
1 parent e224347 commit 7ca5f80
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/server/bootstrappolicy/controller_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func init() {
},
})

// ingress-secretref-controller
// ingress-to-route-controller
addControllerRole(rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + InfraIngressToRouteControllerServiceAccountName},
Rules: []rbac.PolicyRule{
Expand Down
153 changes: 149 additions & 4 deletions pkg/route/controller/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingress

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"
Expand All @@ -15,7 +16,9 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/json"
utilrand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
Expand Down Expand Up @@ -46,9 +49,16 @@ import (
// 2. For every TLS hostname that has a corresponding path rule and points to a secret
// that exists, a route should exist with a valid TLS config from that secret.
// 3. For every service referenced by the ingress path rule, the route should have
// an update to date target port based on the service.
// 4. A route owned by an ingress that no longer satisfies the first three invariants
// should be deleted.
// a target port based on the service.
// 4. A route owned by an ingress that is not described by any of the three invariants
// above should be deleted.
//
// The controller also relies on the use of expectations to remind itself whether there
// are route creations it has not yet observed, which prevents the controller from
// creating more objects than it needs. The expectations are reset when the ingress
// object is modified. It is possible that expectations could leak if an ingress is
// deleted and its deletion is not observed by the cache, but such leaks are only expected
// if there is a bug in the informer cache which must be fixed anyway.
//
// Unsupported attributes:
//
Expand All @@ -73,6 +83,73 @@ type Controller struct {

// queue is the list of namespace keys that must be synced.
queue workqueue.RateLimitingInterface

// expectations track upcoming route creations that we have not yet observed
expectations *expectations
// expectationDelay controls how long the controller waits to observe its
// own creates. Exposed only for testing.
expectationDelay time.Duration
}

// expectations track an upcoming change to a named resource related
// to an ingress. This is a thread safe object but callers assume
// responsibility for ensuring expectations do not leak.
type expectations struct {
lock sync.Mutex
expect map[queueKey]sets.String
}

// newExpectations returns a tracking object for upcoming events
// that the controller may expect to happen.
func newExpectations() *expectations {
return &expectations{
expect: make(map[queueKey]sets.String),
}
}

// Expect that an event will happen in the future for the given ingress
// and a named resource related to that ingress.
func (e *expectations) Expect(namespace, ingressName, name string) {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
set, ok := e.expect[key]
if !ok {
set = sets.NewString()
e.expect[key] = set
}
set.Insert(name)
}

// Satisfied clears the expectation for the given resource name on an
// ingress.
func (e *expectations) Satisfied(namespace, ingressName, name string) {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
set := e.expect[key]
set.Delete(name)
if set.Len() == 0 {
delete(e.expect, key)
}
}

// Expecting returns true if the provided ingress is still waiting to
// see changes.
func (e *expectations) Expecting(namespace, ingressName string) bool {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
return e.expect[key].Len() > 0
}

// Clear indicates that all expectations for the given ingress should
// be cleared.
func (e *expectations) Clear(namespace, ingressName string) {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
delete(e.expect, key)
}

type queueKey struct {
Expand All @@ -92,6 +169,9 @@ func NewController(eventsClient kv1core.EventsGetter, client routeclient.RoutesG
eventRecorder: recorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingress-to-route"),

expectations: newExpectations(),
expectationDelay: 2 * time.Second,

client: client,

ingressLister: ingresses.Lister(),
Expand Down Expand Up @@ -186,6 +266,7 @@ func (c *Controller) processRoute(obj interface{}) {
if !ok {
return
}
c.expectations.Satisfied(t.Namespace, ingressName, t.Name)
c.queue.Add(queueKey{namespace: t.Namespace, name: ingressName})
default:
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj))
Expand All @@ -195,6 +276,10 @@ func (c *Controller) processRoute(obj interface{}) {
func (c *Controller) processIngress(obj interface{}) {
switch t := obj.(type) {
case *extensionsv1beta1.Ingress:
// when we see a change to an ingress, reset our expectations
// this also allows periodic purging of the expectation list in the event
// we miss one or more events.
c.expectations.Clear(t.Namespace, t.Name)
c.queue.Add(queueKey{namespace: t.Namespace, name: t.Name})
default:
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj))
Expand Down Expand Up @@ -234,8 +319,10 @@ func (c *Controller) processNext() bool {
}
defer c.queue.Done(key)

glog.V(5).Infof("processing %v begin", key)
err := c.sync(key.(queueKey))
c.handleNamespaceErr(err, key)
glog.V(5).Infof("processing %v end", key)

return true
}
Expand All @@ -262,6 +349,12 @@ func (c *Controller) sync(key queueKey) error {
}
return nil
}
// if we are waiting to observe the result of route creations, simply delay
if c.expectations.Expecting(key.namespace, key.name) {
c.queue.AddAfter(key, c.expectationDelay)
glog.V(5).Infof("Ingress %s/%s has unsatisfied expectations", key.namespace, key.name)
return nil
}

ingress, err := c.ingressLister.Ingresses(key.namespace).Get(key.name)
if errors.IsNotFound(err) {
Expand Down Expand Up @@ -328,7 +421,7 @@ func (c *Controller) sync(key queueKey) error {

// add the new routes
for _, route := range creates {
if _, err := c.client.Routes(route.Namespace).Create(route); err != nil {
if err := createRouteWithName(c.client, ingress, route, c.expectations); err != nil {
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -546,3 +639,55 @@ func referencesSecret(ingress *extensionsv1beta1.Ingress, host string) (string,
}
return "", false
}

// createRouteWithName performs client side name generation so we can set a predictable expectation.
// If we fail multiple times in a row we will return an error.
// TODO: future optimization, check the local cache for the name first
func createRouteWithName(client routeclient.RoutesGetter, ingress *extensionsv1beta1.Ingress, route *routev1.Route, expect *expectations) error {
base := route.GenerateName
var lastErr error
// only retry a limited number of times
for i := 0; i < 3; i++ {
if len(base) > 0 {
route.GenerateName = ""
route.Name = generateRouteName(base)
}

// Set the expectation before we talk to the server in order to
// prevent racing with the route cache.
expect.Expect(ingress.Namespace, ingress.Name, route.Name)

_, err := client.Routes(route.Namespace).Create(route)
if err == nil {
return nil
}

// We either collided with another randomly generated name, or another
// error between us and the server prevented observing the success
// of the result. In either case we are not expecting a new route. This
// is safe because expectations are an optimization to avoid churn rather
// than to prevent true duplicate creation.
expect.Satisfied(ingress.Namespace, ingress.Name, route.Name)

// if we aren't generating names (or if we got any other type of error)
// return right away
if len(base) == 0 || !errors.IsAlreadyExists(err) {
return err
}
lastErr = err
}
return lastErr
}

const (
maxNameLength = 63
randomLength = 5
maxGeneratedNameLength = maxNameLength - randomLength
)

func generateRouteName(base string) string {
if len(base) > maxGeneratedNameLength {
base = base[:maxGeneratedNameLength]
}
return fmt.Sprintf("%s%s", base, utilrand.String(randomLength))
}
Loading

0 comments on commit 7ca5f80

Please sign in to comment.