Skip to content

Commit

Permalink
Set a route expectation to delay processing until routes are created
Browse files Browse the repository at this point in the history
When a controller creates multiple routes from a single ingress it may
observe arbitrary delays between the time of creation and when it
observes that route in its cache. Like other controllers that use random
name generation (secret controller, replica set controller) we therefore
need to wait until our expectation is met. To simplify the code, we
bypass server side name generation and use our own name generation to
avoid racing against the server (create route, server sends event, set
expectation = expectation is never cleared).
  • Loading branch information
smarterclayton committed Mar 30, 2018
1 parent eeca469 commit 1a87746
Show file tree
Hide file tree
Showing 2 changed files with 446 additions and 14 deletions.
147 changes: 146 additions & 1 deletion pkg/route/controller/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingress

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"
Expand All @@ -15,7 +16,9 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/json"
utilrand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
Expand Down Expand Up @@ -50,6 +53,13 @@ import (
// 4. A route owned by an ingress that is not described by any of the three invariants
// above should be deleted.
//
// The controller also relies on the use of expectations to remind itself whether there
// are route creations it has not yet observed, which prevents the controller from
// creating more objects than it needs. The expectations are reset when the ingress
// object is modified. It is possible that expectations could leak if an ingress is
// deleted and its deletion is not observed by the cache, but such leaks are only expected
// if there is a bug in the informer cache which must be fixed anyway.
//
// Unsupported attributes:
//
// * the ingress class attribute
Expand All @@ -73,6 +83,73 @@ type Controller struct {

// queue is the list of namespace keys that must be synced.
queue workqueue.RateLimitingInterface

// expectations track upcoming route creations that we have not yet observed
expectations *expectations
// expectationDelay controls how long the controller waits to observe its
// own creates. Exposed only for testing.
expectationDelay time.Duration
}

// expectations track an upcoming change to a named resource related
// to an ingress. This is a thread safe object but callers assume
// responsibility for ensuring expectations do not leak.
type expectations struct {
lock sync.Mutex
expect map[queueKey]sets.String
}

// newExpectations returns a tracking object for upcoming events
// that the controller may expect to happen.
func newExpectations() *expectations {
return &expectations{
expect: make(map[queueKey]sets.String),
}
}

// Expect that an event will happen in the future for the given ingress
// and a named resource related to that ingress.
func (e *expectations) Expect(namespace, ingressName, name string) {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
set, ok := e.expect[key]
if !ok {
set = sets.NewString()
e.expect[key] = set
}
set.Insert(name)
}

// Satisfied clears the expectation for the given resource name on an
// ingress.
func (e *expectations) Satisfied(namespace, ingressName, name string) {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
set := e.expect[key]
set.Delete(name)
if set.Len() == 0 {
delete(e.expect, key)
}
}

// Expecting returns true if the provided ingress is still waiting to
// see changes.
func (e *expectations) Expecting(namespace, ingressName string) bool {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
return e.expect[key].Len() > 0
}

// Clear indicates that all expectations for the given ingress should
// be cleared.
func (e *expectations) Clear(namespace, ingressName string) {
e.lock.Lock()
defer e.lock.Unlock()
key := queueKey{namespace: namespace, name: ingressName}
delete(e.expect, key)
}

type queueKey struct {
Expand All @@ -92,6 +169,9 @@ func NewController(eventsClient kv1core.EventsGetter, client routeclient.RoutesG
eventRecorder: recorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingress-to-route"),

expectations: newExpectations(),
expectationDelay: 2 * time.Second,

client: client,

ingressLister: ingresses.Lister(),
Expand Down Expand Up @@ -186,6 +266,7 @@ func (c *Controller) processRoute(obj interface{}) {
if !ok {
return
}
c.expectations.Satisfied(t.Namespace, ingressName, t.Name)
c.queue.Add(queueKey{namespace: t.Namespace, name: ingressName})
default:
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj))
Expand All @@ -195,6 +276,10 @@ func (c *Controller) processRoute(obj interface{}) {
func (c *Controller) processIngress(obj interface{}) {
switch t := obj.(type) {
case *extensionsv1beta1.Ingress:
// when we see a change to an ingress, reset our expectations
// this also allows periodic purging of the expectation list in the event
// we miss one or more events.
c.expectations.Clear(t.Namespace, t.Name)
c.queue.Add(queueKey{namespace: t.Namespace, name: t.Name})
default:
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj))
Expand Down Expand Up @@ -234,8 +319,10 @@ func (c *Controller) processNext() bool {
}
defer c.queue.Done(key)

glog.V(5).Infof("processing %v begin", key)
err := c.sync(key.(queueKey))
c.handleNamespaceErr(err, key)
glog.V(5).Infof("processing %v end", key)

return true
}
Expand All @@ -262,6 +349,12 @@ func (c *Controller) sync(key queueKey) error {
}
return nil
}
// if we are waiting to observe the result of route creations, simply delay
if c.expectations.Expecting(key.namespace, key.name) {
c.queue.AddAfter(key, c.expectationDelay)
glog.V(5).Infof("Ingress %s/%s has unsatisfied expectations", key.namespace, key.name)
return nil
}

ingress, err := c.ingressLister.Ingresses(key.namespace).Get(key.name)
if errors.IsNotFound(err) {
Expand Down Expand Up @@ -328,7 +421,7 @@ func (c *Controller) sync(key queueKey) error {

// add the new routes
for _, route := range creates {
if _, err := c.client.Routes(route.Namespace).Create(route); err != nil {
if err := createRouteWithName(c.client, ingress, route, c.expectations); err != nil {
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -546,3 +639,55 @@ func referencesSecret(ingress *extensionsv1beta1.Ingress, host string) (string,
}
return "", false
}

// createRouteWithName performs client side name generation so we can set a predictable expectation.
// If we fail multiple times in a row we will return an error.
// TODO: future optimization, check the local cache for the name first
func createRouteWithName(client routeclient.RoutesGetter, ingress *extensionsv1beta1.Ingress, route *routev1.Route, expect *expectations) error {
base := route.GenerateName
var lastErr error
// only retry a limited number of times
for i := 0; i < 3; i++ {
if len(base) > 0 {
route.GenerateName = ""
route.Name = generateRouteName(base)
}

// Set the expectation before we talk to the server in order to
// prevent racing with the route cache.
expect.Expect(ingress.Namespace, ingress.Name, route.Name)

_, err := client.Routes(route.Namespace).Create(route)
if err == nil {
return nil
}

// We either collided with another randomly generated name, or another
// error between us and the server prevented observing the success
// of the result. In either case we are not expecting a new route. This
// is safe because expectations are an optimization to avoid churn rather
// than to prevent true duplicate creation.
expect.Satisfied(ingress.Namespace, ingress.Name, route.Name)

// if we aren't generating names (or if we got any other type of error)
// return right away
if len(base) == 0 || !errors.IsAlreadyExists(err) {
return err
}
lastErr = err
}
return lastErr
}

const (
maxNameLength = 63
randomLength = 5
maxGeneratedNameLength = maxNameLength - randomLength
)

func generateRouteName(base string) string {
if len(base) > maxGeneratedNameLength {
base = base[:maxGeneratedNameLength]
}
return fmt.Sprintf("%s%s", base, utilrand.String(randomLength))
}
Loading

0 comments on commit 1a87746

Please sign in to comment.