Skip to content

Commit

Permalink
Perform conflict detection when other routers edit the route
Browse files Browse the repository at this point in the history
When there are more than 2 routers, we can detect changes made by other
clients and treat it as a conflict. That won't prevent 2 routers from
competing over their own initial sync in all cases.
  • Loading branch information
smarterclayton committed May 15, 2018
1 parent a4e322b commit 5dc4777
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
48 changes: 36 additions & 12 deletions pkg/router/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func performIngressConditionUpdate(action string, lease writerlease.Lease, track
}

route = route.DeepCopy()
changed, created, now, latest := recordIngressCondition(route, routerName, hostName, condition)
changed, created, now, latest, original := recordIngressCondition(route, routerName, hostName, condition)
if !changed {
glog.V(4).Infof("%s: no changes to route needed: %s/%s", action, route.Namespace, route.Name)
tracker.Clear(key, latest)
Expand All @@ -144,7 +144,7 @@ func performIngressConditionUpdate(action string, lease writerlease.Lease, track
// If the tracker determines that another process is attempting to update the ingress to an inconsistent
// value, skip updating altogether and rely on the next resync to resolve conflicts. This prevents routers
// with different configurations from endlessly updating the route status.
if !created && tracker.IsContended(key, now, latest) {
if !created && tracker.IsContended(key, now, original) {
glog.V(4).Infof("%s: skipped update due to another process altering the route with a different ingress status value: %s", action, key)
return writerlease.Release, false
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func performIngressConditionUpdate(action string, lease writerlease.Lease, track
// recordIngressCondition updates the matching ingress on the route (or adds a new one) with the specified
// condition, returning whether the route was updated or created, the time assigned to the condition, and
// a pointer to the current ingress record.
func recordIngressCondition(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) (changed, created bool, at time.Time, latest *routeapi.RouteIngress) {
func recordIngressCondition(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) (changed, created bool, at time.Time, latest, original *routeapi.RouteIngress) {
for i := range route.Status.Ingress {
existing := &route.Status.Ingress[i]
if existing.RouterName != name {
Expand All @@ -199,9 +199,13 @@ func recordIngressCondition(route *routeapi.Route, name, hostName string, condit
}
}
if !changed {
return false, false, time.Time{}, existing
return false, false, time.Time{}, existing, existing
}

// preserve a copy of the original ingress without conditions
original := *existing
original.Conditions = nil

// generate the correct ingress
existing.Host = route.Spec.Host
existing.WildcardPolicy = route.Spec.WildcardPolicy
Expand All @@ -215,7 +219,7 @@ func recordIngressCondition(route *routeapi.Route, name, hostName string, condit
now := nowFn()
existingCondition.LastTransitionTime = &now

return true, false, now.Time, existing
return true, false, now.Time, existing, &original
}

// add a new ingress
Expand All @@ -232,7 +236,7 @@ func recordIngressCondition(route *routeapi.Route, name, hostName string, condit
now := nowFn()
ingress.Conditions[0].LastTransitionTime = &now

return true, true, now.Time, ingress
return true, true, now.Time, ingress, nil
}

// findMostRecentIngress returns the name of the ingress status with the most recent Admitted condition transition time,
Expand Down Expand Up @@ -269,15 +273,15 @@ func findCondition(ingress *routeapi.RouteIngress, t routeapi.RouteIngressCondit
type ContentionTracker interface {
// IsContended should be invoked when the state of the object in storage differs
// from the desired state. It will return true if the provided id was recently
// reset from the correct state to an incorrect state. The input ingress is the
// reset from the correct state to an incorrect state. The current ingress is the
// expected state of the object at this time and may be used by the tracker to
// determine if the most recent update was a contention. now is the current time
// that should be used to record the change.
IsContended(id string, now time.Time, ingress *routeapi.RouteIngress) bool
IsContended(id string, now time.Time, current *routeapi.RouteIngress) bool
// Clear informs the tracker that the provided ingress state was confirmed to
// match the current state of this process. If a subsequent call to IsContended
// is made within the expiration window, the object will be considered as contended.
Clear(id string, ingress *routeapi.RouteIngress)
Clear(id string, current *routeapi.RouteIngress)
}

type elementState int
Expand All @@ -291,6 +295,7 @@ const (
type trackerElement struct {
at time.Time
state elementState
last *routeapi.RouteIngress
}

// SimpleContentionTracker tracks whether a given identifier is changed from a correct
Expand Down Expand Up @@ -375,7 +380,7 @@ func (t *SimpleContentionTracker) flush() {
t.contentions = contentions
}

func (t *SimpleContentionTracker) IsContended(id string, now time.Time, ingress *routeapi.RouteIngress) bool {
func (t *SimpleContentionTracker) IsContended(id string, now time.Time, current *routeapi.RouteIngress) bool {
t.lock.Lock()
defer t.lock.Unlock()

Expand All @@ -391,6 +396,7 @@ func (t *SimpleContentionTracker) IsContended(id string, now time.Time, ingress
t.ids[id] = trackerElement{
at: now,
state: stateIncorrect,
last: current,
}
return false
}
Expand All @@ -406,26 +412,44 @@ func (t *SimpleContentionTracker) IsContended(id string, now time.Time, ingress
t.ids[id] = trackerElement{
at: now,
state: stateContended,
last: current,
}
t.contentions++
glog.V(4).Infof("Object %s was previously correct and is now incorrect", id)
return true
}

// if it appears that the state is being changed by another party, mark it as contended
if last.last != nil && !ingressEqual(last.last, current) {
t.ids[id] = trackerElement{
at: now,
state: stateContended,
last: current,
}
t.contentions++
glog.V(4).Infof("Object %s was incorrect and is being modified by another writer", id)
return true
}

return false
}

func (t *SimpleContentionTracker) Clear(id string, ingress *routeapi.RouteIngress) {
func (t *SimpleContentionTracker) Clear(id string, current *routeapi.RouteIngress) {
t.lock.Lock()
defer t.lock.Unlock()
if at := ingressConditionTouched(ingress); at != nil {
if at := ingressConditionTouched(current); at != nil {
t.ids[id] = trackerElement{
at: at.Time,
state: stateCorrect,
last: current,
}
}
}

func ingressEqual(a, b *routeapi.RouteIngress) bool {
return a.Host == b.Host && a.RouterCanonicalHostname == b.RouterCanonicalHostname && a.WildcardPolicy == b.WildcardPolicy && a.RouterName == b.RouterName
}

func ingressConditionTouched(ingress *routeapi.RouteIngress) *metav1.Time {
var lastTouch *metav1.Time
for _, condition := range ingress.Conditions {
Expand Down
4 changes: 2 additions & 2 deletions pkg/router/controller/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,10 @@ func TestProtractedStatusFightBetweenRouters(t *testing.T) {
makePass(t, oldHost, oldAdmitter2, currObj, false, false)
makePass(t, newHost, newAdmitter2, currObj, false, false)

t.Logf("If we now send out a route update, 'old' router #1 should update the status...")
t.Logf("If we now send out a route update, 'old' router #1 ignores it because it sees a conflict...")
now = metav1.Time{Time: now.Add(4 * time.Second)}
nowFn = func() metav1.Time { return now }
currObj = makePass(t, oldHost2, oldAdmitter1, currObj, true, false)
makePass(t, oldHost2, oldAdmitter1, currObj, false, false)

t.Logf("...and the other routers should ignore the update...")
makePass(t, newHost2, newAdmitter1, currObj, false, false)
Expand Down

0 comments on commit 5dc4777

Please sign in to comment.