Skip to content

Commit

Permalink
Use backoff when writing route status to avoid conflicts
Browse files Browse the repository at this point in the history
Each router process now uses a rough "write leasing" scheme to avoid
sending conflicting writes. When a router starts, it tries to write
status for its ingress. If it succeeds in writing status, it considers
itself to hold the lease, and if it fails it considers itself a follower
and goes into exponential backoff. A single leader quickly emerges, and
all other routers observe the writes and consider the leader to be
extending its lease. In this fashion a large number of routers can use
the route status itself as a coordination mechanism and avoid generating
large numbers of meaningless writes.
  • Loading branch information
smarterclayton committed Mar 6, 2018
1 parent 9b71396 commit 60614d4
Show file tree
Hide file tree
Showing 6 changed files with 518 additions and 37 deletions.
6 changes: 5 additions & 1 deletion pkg/cmd/infra/router/f5.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/golang/glog"
"github.com/spf13/cobra"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/router/controller"
f5plugin "github.com/openshift/origin/pkg/router/f5"
"github.com/openshift/origin/pkg/util/writerlease"
"github.com/openshift/origin/pkg/version"
)

Expand Down Expand Up @@ -232,10 +234,12 @@ func (o *F5RouterOptions) Run() error {
var plugin router.Plugin = f5Plugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
lease := writerlease.New(time.Minute, 3*time.Second)
go lease.Run(wait.NeverStop)
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, tracker)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/openshift/origin/pkg/router/metrics/haproxy"
templateplugin "github.com/openshift/origin/pkg/router/template"
"github.com/openshift/origin/pkg/util/proc"
"github.com/openshift/origin/pkg/util/writerlease"
"github.com/openshift/origin/pkg/version"
)

Expand Down Expand Up @@ -414,10 +415,12 @@ func (o *TemplateRouterOptions) Run() error {
var plugin router.Plugin = templatePlugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
lease := writerlease.New(time.Minute, 3*time.Second)
go lease.Run(wait.NeverStop)
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, tracker)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
Expand Down
69 changes: 50 additions & 19 deletions pkg/router/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
routeapi "github.com/openshift/origin/pkg/route/apis/route"
client "github.com/openshift/origin/pkg/route/generated/internalclientset/typed/route/internalversion"
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/util/writerlease"
)

// RejectionRecorder is an object capable of recording why a route was rejected
Expand All @@ -39,21 +40,24 @@ type StatusAdmitter struct {
client client.RoutesGetter
routerName string
routerCanonicalHostname string
tracker ContentionTracker

lease writerlease.Lease
tracker ContentionTracker
}

// NewStatusAdmitter creates a plugin wrapper that ensures every accepted
// route has a status field set that matches this router. The admitter manages
// an LRU of recently seen conflicting updates to handle when two router processes
// with differing configurations are writing updates at the same time.
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, name, hostName string, tracker ContentionTracker) *StatusAdmitter {
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, name, hostName string, lease writerlease.Lease, tracker ContentionTracker) *StatusAdmitter {
return &StatusAdmitter{
plugin: plugin,
client: client,
routerName: name,
routerCanonicalHostname: hostName,

tracker: tracker,
lease: lease,
}
}

Expand All @@ -73,7 +77,7 @@ func (a *StatusAdmitter) HandleRoute(eventType watch.EventType, route *routeapi.
} else {
switch eventType {
case watch.Added, watch.Modified:
performIngressConditionUpdate("admit", a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
performIngressConditionUpdate("admit", a.lease, a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
Type: routeapi.RouteAdmitted,
Status: kapi.ConditionTrue,
})
Expand Down Expand Up @@ -104,7 +108,7 @@ func (a *StatusAdmitter) RecordRouteRejection(route *routeapi.Route, reason, mes
// Can't record status for ingress resources
return
}
performIngressConditionUpdate("reject", a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
performIngressConditionUpdate("reject", a.lease, a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
Type: routeapi.RouteAdmitted,
Status: kapi.ConditionFalse,
Reason: reason,
Expand All @@ -113,27 +117,36 @@ func (a *StatusAdmitter) RecordRouteRejection(route *routeapi.Route, reason, mes
}

// performIngressConditionUpdate updates the route to the appropriate status for the provided condition.
func performIngressConditionUpdate(action string, tracker ContentionTracker, oc client.RoutesGetter, route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) bool {
func performIngressConditionUpdate(action string, lease writerlease.Lease, tracker ContentionTracker, oc client.RoutesGetter, route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) {
key := string(route.UID)
newestIngressName := findMostRecentIngress(route)
changed, created, now, latest := recordIngressCondition(route, name, hostName, condition)
if !changed {
glog.V(4).Infof("%s: no changes to route needed: %s/%s", action, route.Namespace, route.Name)
tracker.Clear(string(route.UID), latest)
return true
tracker.Clear(key, latest)
// if the most recent change was to our ingress status, consider the current lease extended
if newestIngressName == name {
lease.Extend(key)
}
return
}
// If the tracker determines that another process is attempting to update the ingress to an inconsistent
// value, skip updating altogether and rely on the next resync to resolve conflicts. This prevents routers
// with different configurations from endlessly updating the route status.
if !created && tracker.IsContended(string(route.UID), now, latest) {
glog.V(4).Infof("%s: skipped update due to another process altering the route with a different ingress status value: %s", action, route.UID)
return true
}
updated := updateStatus(oc, route)
if updated {
tracker.Clear(string(route.UID), latest)
} else {
glog.V(4).Infof("%s: did not update route status, skipping route until next resync", action)
if !created && tracker.IsContended(key, now, latest) {
glog.V(4).Infof("%s: skipped update due to another process altering the route with a different ingress status value: %s", action, key)
return
}
return updated

lease.Try(key, func() (bool, bool) {
updated := updateStatus(oc, route)
if updated {
tracker.Clear(key, latest)
} else {
glog.V(4).Infof("%s: did not update route status, skipping route until next resync", action)
}
return updated, false
})
}

// recordIngressCondition updates the matching ingress on the route (or adds a new one) with the specified
Expand Down Expand Up @@ -195,8 +208,23 @@ func recordIngressCondition(route *routeapi.Route, name, hostName string, condit
return true, true, now.Time, ingress
}

// setIngressCondition records the condition on the ingress, returning true if the ingress was changed and
// false if no modification was made (or the only modification would have been to update a time).
// findMostRecentIngress walks every ingress status recorded on the route and reports the router name of the
// one whose Admitted condition carries the latest transition time. Ingress entries that have no Admitted
// condition, or whose Admitted condition has no transition time set, are ignored. The empty string is
// returned when no ingress qualifies.
func findMostRecentIngress(route *routeapi.Route) string {
	var (
		latestName string
		latestAt   time.Time
	)
	for _, current := range route.Status.Ingress {
		admitted := findCondition(&current, routeapi.RouteAdmitted)
		if admitted == nil || admitted.LastTransitionTime == nil {
			continue
		}
		// The comparison is strict, so when two ingresses share a timestamp the one listed first is kept.
		if at := admitted.LastTransitionTime.Time; at.After(latestAt) {
			latestAt = at
			latestName = current.RouterName
		}
	}
	return latestName
}

// findCondition locates the first condition that corresponds to the requested type.
func findCondition(ingress *routeapi.RouteIngress, t routeapi.RouteIngressConditionType) (_ *routeapi.RouteIngressCondition) {
for i, existing := range ingress.Conditions {
if existing.Type != t {
Expand All @@ -212,6 +240,9 @@ func updateStatus(oc client.RoutesGetter, route *routeapi.Route) bool {
switch _, err := oc.Routes(route.Namespace).UpdateStatus(route); {
case err == nil:
return true
case errors.IsNotFound(err):
// route was deleted
return false
case errors.IsForbidden(err):
// if the router can't write status updates, allow the route to go through
utilruntime.HandleError(fmt.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err))
Expand Down
54 changes: 38 additions & 16 deletions pkg/router/controller/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,30 @@ import (

routeapi "github.com/openshift/origin/pkg/route/apis/route"
"github.com/openshift/origin/pkg/route/generated/internalclientset/fake"
"github.com/openshift/origin/pkg/util/writerlease"
)

// noopLease is a test double passed to NewStatusAdmitter in place of a real writerlease.Lease. It performs
// no coordination: work given to Try runs immediately and Extend is ignored. The methods the tests in this
// file are not expected to reach panic so that an unexpected call is noticed.
type noopLease struct{}

// Wait always panics.
func (noopLease) Wait() bool {
	panic("not implemented")
}

// WaitUntil always panics.
func (noopLease) WaitUntil(t time.Duration) (leader, ok bool) {
	panic("not implemented")
}

// Try invokes fn right away on the calling goroutine and discards whatever it returns.
func (noopLease) Try(key string, fn writerlease.WorkFunc) {
	fn()
}

// Extend does nothing.
func (noopLease) Extend(key string) {}

// Remove always panics.
func (noopLease) Remove(key string) {
	panic("not implemented")
}

type fakePlugin struct {
t watch.EventType
route *routeapi.Route
Expand Down Expand Up @@ -81,7 +103,7 @@ func TestStatusNoOp(t *testing.T) {
p := &fakePlugin{}
c := fake.NewSimpleClientset()
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "a.b.c.d", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "a.b.c.d", noopLease{}, tracker)
err := admitter.HandleRoute(watch.Added, &routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -149,7 +171,7 @@ func TestStatusResetsHost(t *testing.T) {
p := &fakePlugin{}
c := fake.NewSimpleClientset(&routeapi.Route{ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")}})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
err := admitter.HandleRoute(watch.Added, &routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -186,7 +208,7 @@ func TestStatusAdmitsRouteOnForbidden(t *testing.T) {
return true, nil, errors.NewForbidden(kapi.Resource("Route"), "route1", nil)
})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
err := admitter.HandleRoute(watch.Added, &routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -222,7 +244,7 @@ func TestStatusBackoffOnConflict(t *testing.T) {
return true, nil, errors.NewConflict(kapi.Resource("Route"), "route1", nil)
})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
err := admitter.HandleRoute(watch.Added, &routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -251,7 +273,7 @@ func TestStatusRecordRejection(t *testing.T) {
p := &fakePlugin{}
c := fake.NewSimpleClientset(&routeapi.Route{ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")}})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
admitter.RecordRouteRejection(&routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -281,7 +303,7 @@ func TestStatusRecordRejectionNoChange(t *testing.T) {
p := &fakePlugin{}
c := fake.NewSimpleClientset(&routeapi.Route{ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")}})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
admitter.RecordRouteRejection(&routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -316,7 +338,7 @@ func TestStatusRecordRejectionWithStatus(t *testing.T) {
p := &fakePlugin{}
c := fake.NewSimpleClientset(&routeapi.Route{ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")}})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
admitter.RecordRouteRejection(&routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -361,7 +383,7 @@ func TestStatusRecordRejectionOnHostUpdateOnly(t *testing.T) {
p := &fakePlugin{}
c := fake.NewSimpleClientset(&routeapi.Route{ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")}})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
admitter.RecordRouteRejection(&routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -417,7 +439,7 @@ func TestStatusRecordRejectionConflict(t *testing.T) {
return true, nil, errors.NewConflict(kapi.Resource("Route"), "route1", nil)
})
tracker := &fakeTracker{}
admitter := NewStatusAdmitter(p, c.Route(), "test", "", tracker)
admitter := NewStatusAdmitter(p, c.Route(), "test", "", noopLease{}, tracker)
admitter.RecordRouteRejection(&routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand Down Expand Up @@ -465,7 +487,7 @@ func TestStatusFightBetweenReplicas(t *testing.T) {
nowFn = func() metav1.Time { return now1 }
c1 := fake.NewSimpleClientset(&routeapi.Route{ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")}})
tracker1 := &fakeTracker{}
admitter1 := NewStatusAdmitter(p, c1.Route(), "test", "", tracker1)
admitter1 := NewStatusAdmitter(p, c1.Route(), "test", "", noopLease{}, tracker1)
err := admitter1.HandleRoute(watch.Added, &routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route1.test.local"},
Expand All @@ -482,7 +504,7 @@ func TestStatusFightBetweenReplicas(t *testing.T) {
nowFn = func() metav1.Time { return now2 }
c2 := fake.NewSimpleClientset(&routeapi.Route{ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")}})
tracker2 := &fakeTracker{}
admitter2 := NewStatusAdmitter(p, c2.Route(), "test", "", tracker2)
admitter2 := NewStatusAdmitter(p, c2.Route(), "test", "", noopLease{}, tracker2)
outObj1.Spec.Host = "route1.test-new.local"
err = admitter2.HandleRoute(watch.Added, outObj1)

Expand Down Expand Up @@ -527,7 +549,7 @@ func TestStatusFightBetweenRouters(t *testing.T) {
return false, nil, nil
})
tracker := &fakeTracker{}
admitter1 := NewStatusAdmitter(p, c1.Route(), "test2", "", tracker)
admitter1 := NewStatusAdmitter(p, c1.Route(), "test2", "", noopLease{}, tracker)
err := admitter1.HandleRoute(watch.Added, &routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: "route1", Namespace: "default", UID: types.UID("uid1")},
Spec: routeapi.RouteSpec{Host: "route2.test-new.local"},
Expand Down Expand Up @@ -674,10 +696,10 @@ func TestProtractedStatusFightBetweenRouters(t *testing.T) {
// NB: contention period is 1 minute
t1, t2, t3, t4 := NewSimpleContentionTracker(time.Minute), NewSimpleContentionTracker(time.Minute), NewSimpleContentionTracker(time.Minute), NewSimpleContentionTracker(time.Minute)

oldAdmitter1 := NewStatusAdmitter(p, nil, "test", "", t1)
oldAdmitter2 := NewStatusAdmitter(p, nil, "test", "", t2)
newAdmitter1 := NewStatusAdmitter(p, nil, "test", "", t3)
newAdmitter2 := NewStatusAdmitter(p, nil, "test", "", t4)
oldAdmitter1 := NewStatusAdmitter(p, nil, "test", "", noopLease{}, t1)
oldAdmitter2 := NewStatusAdmitter(p, nil, "test", "", noopLease{}, t2)
newAdmitter1 := NewStatusAdmitter(p, nil, "test", "", noopLease{}, t3)
newAdmitter2 := NewStatusAdmitter(p, nil, "test", "", noopLease{}, t4)

t.Logf("Setup up the two 'old' routers")
currObj := makePass(t, oldHost, oldAdmitter1, initObj, true, false)
Expand Down
Loading

0 comments on commit 60614d4

Please sign in to comment.