Skip to content

Commit

Permalink
Merge pull request #19706 from smarterclayton/log_route_uid
Browse files Browse the repository at this point in the history
Make router conflict detection work even during initial informer sync
  • Loading branch information
openshift-merge-robot authored May 18, 2018
2 parents 3ddd571 + aa28fa9 commit 2270d0a
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 270 deletions.
5 changes: 3 additions & 2 deletions pkg/cmd/infra/router/f5.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,11 @@ func (o *F5RouterOptions) Run() error {
if o.UpdateStatus {
lease := writerlease.New(time.Minute, 3*time.Second)
go lease.Run(wait.NeverStop)
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
informer := factory.CreateRoutesSharedInformer()
tracker := controller.NewSimpleContentionTracker(informer, o.RouterName, o.ResyncInterval/10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
routeLister := routelisters.NewRouteLister(informer.GetIndexer())
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,11 @@ func (o *TemplateRouterOptions) Run() error {
if o.UpdateStatus {
lease := writerlease.New(time.Minute, 3*time.Second)
go lease.Run(wait.NeverStop)
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
informer := factory.CreateRoutesSharedInformer()
tracker := controller.NewSimpleContentionTracker(informer, o.RouterName, o.ResyncInterval/10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
routeLister := routelisters.NewRouteLister(informer.GetIndexer())
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
Expand Down
281 changes: 281 additions & 0 deletions pkg/router/controller/contention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package controller

import (
"sync"
"time"

"github.com/golang/glog"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

routeapi "github.com/openshift/origin/pkg/route/apis/route"
)

// ContentionTracker records modifications to a particular entry to prevent endless
// loops when multiple routers are configured with conflicting info. A given router
// process tracks whether the ingress status is changed from a correct value to any
// other value (by invoking IsChangeContended when the state has diverged).
type ContentionTracker interface {
// IsChangeContended should be invoked when the state of the object in storage differs
// from the desired state. It will return true if the provided id was recently
// reset from the correct state to an incorrect state. The current ingress is the
// expected state of the object at this time and may be used by the tracker to
// determine if the most recent update was a contention. This method does not
// update the state of the tracker.
IsChangeContended(id string, now time.Time, current *routeapi.RouteIngress) bool
// Clear informs the tracker that the provided ingress state was confirmed to
// match the current state of this process. If the ingress is subsequently changed
// away from this state and IsChangeContended is called within the expiration
// window, an implementation may consider the object contended.
Clear(id string, current *routeapi.RouteIngress)
}

// elementState describes how far a tracked id has progressed towards being
// considered contended.
type elementState int

const (
// stateCandidate marks an id whose ingress has been recorded (by Changed) or
// confirmed (by Clear) but has not yet been seen to change again.
stateCandidate elementState = iota
// stateContended marks an id whose recorded ingress was seen to change to a
// different value while it was a candidate.
stateContended
)

// trackerElement is the per-id record kept by SimpleContentionTracker.
type trackerElement struct {
// at is the time the element was first recorded or last transitioned to
// contended; it is the timestamp compared against the expiration windows.
at time.Time
// state is the current contention state of the id.
state elementState
// last is the ingress most recently stored for the id by Changed or Clear.
last *routeapi.RouteIngress
}

// SimpleContentionTracker tracks whether a given identifier is changed from a correct
// state (set by Clear) to an incorrect state (recorded by Changed and reported by
// IsChangeContended).
type SimpleContentionTracker struct {
// informer is the source of route update events that Run subscribes to.
informer cache.SharedInformer
// routerName selects which ingress entry on each route is examined.
routerName string

// expires is the window during which a recorded element is still considered by
// IsChangeContended; flush runs at twice this interval.
expires time.Duration
// maxContentions is the number of contentions detected before the entire
// tracker stops recording changes and IsChangeContended reports every id as
// contended, until flush recomputes the count.
maxContentions int
// message, when non-empty, is logged as a warning by flush while contentions
// have been recorded.
message string

// lock guards message, contentions and ids.
lock sync.Mutex
// contentions counts the contentions observed since the last flush.
contentions int
// ids holds the tracked elements keyed by the id passed to Changed.
ids map[string]trackerElement
}

// NewSimpleContentionTracker builds a SimpleContentionTracker bound to the given
// route informer and router name. The interval is used as the expiration window
// for recorded elements; once Run is started, stale entries are flushed at twice
// that interval so the set of tracked ids cannot grow without bound when routes
// are created and deleted frequently. Update events from the informer that change
// the host, wildcard policy, canonical hostname or router name of the ingress
// belonging to routerName move an id from candidate to contended. The tracker
// tolerates up to five contentions per flush period before rejecting all updates.
func NewSimpleContentionTracker(informer cache.SharedInformer, routerName string, interval time.Duration) *SimpleContentionTracker {
	tracker := &SimpleContentionTracker{
		ids: make(map[string]trackerElement),
	}
	tracker.informer = informer
	tracker.routerName = routerName
	tracker.expires = interval
	tracker.maxContentions = 5
	return tracker
}

// SetConflictMessage stores the warning text that flush logs on each cleanup
// pass during which contention with another writer has been recorded. An empty
// message disables the warning.
func (t *SimpleContentionTracker) SetConflictMessage(message string) {
	t.lock.Lock()
	t.message = message
	t.lock.Unlock()
}

// Run subscribes the tracker to route updates from its informer and then blocks,
// flushing expired entries every 2*expires, until stopCh is closed.
func (t *SimpleContentionTracker) Run(stopCh <-chan struct{}) {
	// The informer delivers route changes independently of the router's own
	// processing loop, so this handler can observe changes to the ingress for
	// our router name sooner than the main controller does and can therefore
	// detect contention even while that controller is still syncing.
	onUpdate := func(oldObj, obj interface{}) {
		previous, isRoute := oldObj.(*routeapi.Route)
		if !isRoute {
			return
		}
		updated, isRoute := obj.(*routeapi.Route)
		if !isRoute {
			return
		}
		ingress := ingressChanged(previous, updated, t.routerName)
		if ingress == nil {
			return
		}
		t.Changed(string(updated.UID), ingress)
	}
	t.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{UpdateFunc: onUpdate})

	// Expired changes are cleaned up on a fixed period of twice the expiration.
	cleanup := time.NewTicker(t.expires * 2)
	defer cleanup.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-cleanup.C:
			t.flush()
		}
	}
}

// flush drops expired elements and recomputes the contention count. Contended
// elements expire after one expiration interval; all other elements are kept for
// three intervals. If contentions were recorded before this pass and a conflict
// message is set, the message is logged as a warning.
func (t *SimpleContentionTracker) flush() {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Conflicts are reset every expiration interval, while plain tracking info
	// is retained for longer.
	contentionExpiration := nowFn().Add(-t.expires)
	trackerExpiration := contentionExpiration.Add(-2 * t.expires)

	var removed, stillContended int
	for id, elem := range t.ids {
		contended := elem.state == stateContended

		cutoff := trackerExpiration
		if contended {
			cutoff = contentionExpiration
		}
		if elem.at.Before(cutoff) {
			delete(t.ids, id)
			removed++
			continue
		}
		if contended {
			stillContended++
		}
	}

	// The warning and the log line below report the count accumulated before
	// this pass; only afterwards is it replaced by the surviving contentions.
	if t.contentions > 0 && t.message != "" {
		glog.Warning(t.message)
	}
	glog.V(5).Infof("Flushed contention tracker (%s): %d out of %d removed, %d total contentions", t.expires*2, removed, removed+len(t.ids), t.contentions)
	t.contentions = stillContended
}

// Changed notes that the ingress for id was observed with a new value. It is
// invoked from the informer's event handler, which runs separately from the
// route controller plugins and may be ahead of them, so timestamps are not
// compared here; the tracker only counts edge transitions per id. The first
// observation makes the id a candidate, a differing observation of a candidate
// makes it contended, and every further differing observation of a contended id
// increments the contention count.
func (t *SimpleContentionTracker) Changed(id string, current *routeapi.RouteIngress) {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Enough conflicts have been seen that all updates are skipped for this
	// interval, so there is nothing more to learn until the next flush.
	if t.contentions > t.maxContentions {
		glog.V(4).Infof("Reached max contentions, stop tracking changes")
		return
	}

	entry, seen := t.ids[id]

	// First time this object is recorded.
	if !seen {
		t.ids[id] = trackerElement{
			at:    nowFn().Time,
			state: stateCandidate,
			last:  current,
		}
		glog.V(4).Infof("Object %s is a candidate for contention", id)
		return
	}

	// The recorded state already matches what was just observed.
	if ingressEqual(entry.last, current) {
		glog.V(4).Infof("Object %s is unchanged", id)
		return
	}

	switch entry.state {
	case stateContended:
		t.contentions++
		glog.V(4).Infof("Object %s is contended and has been modified by another writer", id)
	case stateCandidate:
		// Another party appears to be changing the state: promote to contended.
		t.ids[id] = trackerElement{
			at:    nowFn().Time,
			state: stateContended,
			last:  current,
		}
		t.contentions++
		glog.V(4).Infof("Object %s has been modified by another writer", id)
	}
}

// IsChangeContended reports whether a write for id should be withheld. It
// returns true when the tracker has exceeded its maximum number of contentions
// for the current interval, or when id is recorded as contended and its record
// has not yet expired relative to now. The current ingress is not consulted by
// this implementation, and the tracker state is not modified.
func (t *SimpleContentionTracker) IsChangeContended(id string, now time.Time, current *routeapi.RouteIngress) bool {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Too many conflicts this interval: refuse every update until the next flush.
	if t.contentions > t.maxContentions {
		glog.V(4).Infof("Reached max contentions, rejecting all update attempts until the next interval")
		return true
	}

	entry, tracked := t.ids[id]
	switch {
	case !tracked:
		// never recorded
		return false
	case entry.at.Add(t.expires).Before(now):
		// record has expired
		return false
	case entry.state != stateContended:
		return false
	}

	glog.V(4).Infof("Object %s is being contended by another writer", id)
	return true
}

// Clear records current as the confirmed ingress for id and returns the id to
// the candidate state. Ids that are not being tracked are left untouched, and
// the element's timestamp is not changed.
func (t *SimpleContentionTracker) Clear(id string, current *routeapi.RouteIngress) {
	t.lock.Lock()
	defer t.lock.Unlock()

	if entry, tracked := t.ids[id]; tracked {
		entry.last = current
		entry.state = stateCandidate
		t.ids[id] = entry
	}
}

// ingressEqual reports whether two ingress records agree on host, router
// canonical hostname, wildcard policy and router name. Conditions are not
// compared. Both arguments must be non-nil.
func ingressEqual(a, b *routeapi.RouteIngress) bool {
	switch {
	case a.Host != b.Host:
		return false
	case a.RouterCanonicalHostname != b.RouterCanonicalHostname:
		return false
	case a.WildcardPolicy != b.WildcardPolicy:
		return false
	case a.RouterName != b.RouterName:
		return false
	}
	return true
}

// ingressConditionTouched returns the most recent LastTransitionTime found among
// the ingress conditions, or nil when no condition carries one. When several
// conditions share the latest time, the first of them is returned.
func ingressConditionTouched(ingress *routeapi.RouteIngress) *metav1.Time {
	var latest *metav1.Time
	for i := range ingress.Conditions {
		touched := ingress.Conditions[i].LastTransitionTime
		if touched == nil {
			continue
		}
		if latest == nil || touched.After(latest.Time) {
			latest = touched
		}
	}
	return latest
}

// ingressChanged compares the first ingress entry named routerName on route with
// the first such entry on oldRoute. It returns a pointer to the entry on route
// when both exist and differ according to ingressEqual, and nil in every other
// case (either route lacks an entry for routerName, or the entries are equal).
func ingressChanged(oldRoute, route *routeapi.Route, routerName string) *routeapi.RouteIngress {
	// Locate this router's ingress on the new route.
	var current *routeapi.RouteIngress
	for i := range route.Status.Ingress {
		if route.Status.Ingress[i].RouterName == routerName {
			current = &route.Status.Ingress[i]
			break
		}
	}
	if current == nil {
		return nil
	}

	// Compare against this router's ingress on the old route, if present.
	for j := range oldRoute.Status.Ingress {
		previous := &oldRoute.Status.Ingress[j]
		if previous.RouterName != routerName {
			continue
		}
		if ingressEqual(current, previous) {
			return nil
		}
		return current
	}
	return nil
}
2 changes: 1 addition & 1 deletion pkg/router/controller/router_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *RouterController) Commit() {

// processRoute logs and propagates a route event to the plugin
func (c *RouterController) processRoute(eventType watch.EventType, route *routeapi.Route) {
glog.V(4).Infof("Processing Route: %s/%s -> %s", route.Namespace, route.Name, route.Spec.To.Name)
glog.V(4).Infof("Processing route: %s/%s -> %s %s", route.Namespace, route.Name, route.Spec.To.Name, route.UID)
glog.V(4).Infof(" Alias: %s", route.Spec.Host)
if len(route.Spec.Path) > 0 {
glog.V(4).Infof(" Path: %s", route.Spec.Path)
Expand Down
Loading

0 comments on commit 2270d0a

Please sign in to comment.