Skip to content

Commit

Permalink
Avoid some status updates from being delayed to next sync
Browse files Browse the repository at this point in the history
The writerlease is a work queue, but we were exiting immediately on
conflicts. This is not our normal pattern, which is to build a work
queue and then resync from the latest cache state. Change how status.go
queues up work so that we perform our retry inside the lease function.
Should ensure that the correct output is eventually written.
  • Loading branch information
smarterclayton committed Apr 11, 2018
1 parent e2eeb99 commit d44d476
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 164 deletions.
7 changes: 5 additions & 2 deletions pkg/cmd/infra/router/f5.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
projectinternalclientset "github.com/openshift/origin/pkg/project/generated/internalclientset"
routeapi "github.com/openshift/origin/pkg/route/apis/route"
routeinternalclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/router/controller"
f5plugin "github.com/openshift/origin/pkg/router/f5"
Expand Down Expand Up @@ -231,6 +232,8 @@ func (o *F5RouterOptions) Run() error {
return err
}

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)

var plugin router.Plugin = f5Plugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
Expand All @@ -239,7 +242,8 @@ func (o *F5RouterOptions) Run() error {
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
Expand All @@ -249,7 +253,6 @@ func (o *F5RouterOptions) Run() error {
plugin = controller.NewUniqueHost(plugin, o.RouteSelectionFunc(), o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
plugin = controller.NewHostAdmitter(plugin, o.F5RouteAdmitterFunc(), o.AllowWildcardRoutes, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
watchNodes := (len(o.InternalAddress) != 0 && len(o.VxlanGateway) != 0)
controller := factory.Create(plugin, watchNodes)
controller.Run()
Expand Down
7 changes: 5 additions & 2 deletions pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
cmdversion "github.com/openshift/origin/pkg/cmd/version"
projectinternalclientset "github.com/openshift/origin/pkg/project/generated/internalclientset"
routeinternalclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/router/controller"
"github.com/openshift/origin/pkg/router/metrics"
Expand Down Expand Up @@ -412,6 +413,8 @@ func (o *TemplateRouterOptions) Run() error {
return err
}

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)

var plugin router.Plugin = templatePlugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
Expand All @@ -420,7 +423,8 @@ func (o *TemplateRouterOptions) Run() error {
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
Expand All @@ -430,7 +434,6 @@ func (o *TemplateRouterOptions) Run() error {
plugin = controller.NewUniqueHost(plugin, o.RouteSelectionFunc(), o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
plugin = controller.NewHostAdmitter(plugin, o.RouteAdmissionFunc(), o.AllowWildcardRoutes, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
controller := factory.Create(plugin, false)
controller.Run()

Expand Down
25 changes: 15 additions & 10 deletions pkg/router/controller/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (f *RouterControllerFactory) Create(plugin router.Plugin, watchNodes bool)

func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
f.createNamespacesSharedInformer(rc)
f.createNamespacesSharedInformer()
}
f.createEndpointsSharedInformer(rc)
f.createRoutesSharedInformer(rc)
f.createEndpointsSharedInformer()
f.CreateRoutesSharedInformer()

if rc.WatchNodes {
f.createNodesSharedInformer(rc)
f.createNodesSharedInformer()
}

// Start informers
Expand Down Expand Up @@ -187,7 +187,7 @@ func (f *RouterControllerFactory) setSelectors(options *v1.ListOptions) {
options.FieldSelector = f.FieldSelector
}

func (f *RouterControllerFactory) createEndpointsSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) createEndpointsSharedInformer() {
// we do not scope endpoints by labels or fields because the route labels != endpoints labels
lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
Expand All @@ -204,7 +204,13 @@ func (f *RouterControllerFactory) createEndpointsSharedInformer(rc *routercontro
f.informers[objType] = informer
}

func (f *RouterControllerFactory) createRoutesSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) CreateRoutesSharedInformer() kcache.SharedIndexInformer {
rt := &routeapi.Route{}
objType := reflect.TypeOf(rt)
if informer, ok := f.informers[objType]; ok {
return informer
}

lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
f.setSelectors(&options)
Expand All @@ -221,22 +227,21 @@ func (f *RouterControllerFactory) createRoutesSharedInformer(rc *routercontrolle
return f.RClient.Route().Routes(f.Namespace).Watch(options)
},
}
rt := &routeapi.Route{}
objType := reflect.TypeOf(rt)
indexers := kcache.Indexers{kcache.NamespaceIndex: kcache.MetaNamespaceIndexFunc}
informer := kcache.NewSharedIndexInformer(lw, rt, f.ResyncInterval, indexers)
f.informers[objType] = informer
return informer
}

func (f *RouterControllerFactory) createNodesSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) createNodesSharedInformer() {
// Use stock node informer as we don't need namespace/labels/fields filtering on nodes
ifactory := informerfactory.NewSharedInformerFactory(f.KClient, f.ResyncInterval)
informer := ifactory.Core().InternalVersion().Nodes().Informer()
objType := reflect.TypeOf(&kapi.Node{})
f.informers[objType] = informer
}

func (f *RouterControllerFactory) createNamespacesSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) createNamespacesSharedInformer() {
lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = f.NamespaceLabels.String()
Expand Down
124 changes: 66 additions & 58 deletions pkg/router/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

routeapi "github.com/openshift/origin/pkg/route/apis/route"
client "github.com/openshift/origin/pkg/route/generated/internalclientset/typed/route/internalversion"
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/util/writerlease"
)
Expand All @@ -36,8 +37,10 @@ func (logRecorder) RecordRouteRejection(route *routeapi.Route, reason, message s

// StatusAdmitter ensures routes added to the plugin have status set.
type StatusAdmitter struct {
plugin router.Plugin
client client.RoutesGetter
plugin router.Plugin
client client.RoutesGetter
lister routelisters.RouteLister

routerName string
routerCanonicalHostname string

Expand All @@ -49,10 +52,12 @@ type StatusAdmitter struct {
// route has a status field set that matches this router. The admitter manages
// an LRU of recently seen conflicting updates to handle when two router processes
// with differing configurations are writing updates at the same time.
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, name, hostName string, lease writerlease.Lease, tracker ContentionTracker) *StatusAdmitter {
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, lister routelisters.RouteLister, name, hostName string, lease writerlease.Lease, tracker ContentionTracker) *StatusAdmitter {
return &StatusAdmitter{
plugin: plugin,
client: client,
plugin: plugin,
client: client,
lister: lister,

routerName: name,
routerCanonicalHostname: hostName,

Expand All @@ -74,7 +79,7 @@ var nowFn = getRfc3339Timestamp
func (a *StatusAdmitter) HandleRoute(eventType watch.EventType, route *routeapi.Route) error {
switch eventType {
case watch.Added, watch.Modified:
performIngressConditionUpdate("admit", a.lease, a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
performIngressConditionUpdate("admit", a.lease, a.tracker, a.client, a.lister, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
Type: routeapi.RouteAdmitted,
Status: kapi.ConditionTrue,
})
Expand All @@ -100,51 +105,79 @@ func (a *StatusAdmitter) Commit() error {

// RecordRouteRejection attempts to update the route status with a reason for a route being rejected.
func (a *StatusAdmitter) RecordRouteRejection(route *routeapi.Route, reason, message string) {
performIngressConditionUpdate("reject", a.lease, a.tracker, a.client, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
performIngressConditionUpdate("reject", a.lease, a.tracker, a.client, a.lister, route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
Type: routeapi.RouteAdmitted,
Status: kapi.ConditionFalse,
Reason: reason,
Message: message,
})
}

// performIngressConditionUpdate updates the route to the the appropriate status for the provided condition.
func performIngressConditionUpdate(action string, lease writerlease.Lease, tracker ContentionTracker, oc client.RoutesGetter, route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) {
// performIngressConditionUpdate updates the route to the appropriate status for the provided condition.
func performIngressConditionUpdate(action string, lease writerlease.Lease, tracker ContentionTracker, oc client.RoutesGetter, lister routelisters.RouteLister, route *routeapi.Route, routerName, hostName string, condition routeapi.RouteIngressCondition) {
attempts := 3
key := string(route.UID)
newestIngressName := findMostRecentIngress(route)
changed, created, now, latest := recordIngressCondition(route, name, hostName, condition)
if !changed {
glog.V(4).Infof("%s: no changes to route needed: %s/%s", action, route.Namespace, route.Name)
tracker.Clear(key, latest)
// if the most recent change was to our ingress status, consider the current lease extended
if newestIngressName == name {
lease.Extend(key)
routeNamespace, routeName := route.Namespace, route.Name

lease.Try(key, func() (writerlease.WorkResult, bool) {
route, err := lister.Routes(routeNamespace).Get(routeName)
if err != nil {
return writerlease.None, false
}
if string(route.UID) != key {
glog.V(4).Infof("%s: skipped update due to route UID changing (likely delete and recreate): %s/%s", action, route.Namespace, route.Name)
return writerlease.None, false
}
return
}
// If the tracker determines that another process is attempting to update the ingress to an inconsistent
// value, skip updating altogether and rely on the next resync to resolve conflicts. This prevents routers
// with different configurations from endlessly updating the route status.
if !created && tracker.IsContended(key, now, latest) {
glog.V(4).Infof("%s: skipped update due to another process altering the route with a different ingress status value: %s", action, key)
return
}

lease.Try(key, func() (bool, bool) {
updated := updateStatus(oc, route)
if updated {
route = route.DeepCopy()
changed, created, now, latest := recordIngressCondition(route, routerName, hostName, condition)
if !changed {
glog.V(4).Infof("%s: no changes to route needed: %s/%s", action, route.Namespace, route.Name)
tracker.Clear(key, latest)
} else {
glog.V(4).Infof("%s: did not update route status, skipping route until next resync", action)
// if the most recent change was to our ingress status, consider the current lease extended
if findMostRecentIngress(route) == routerName {
lease.Extend(key)
}
return writerlease.None, false
}

// If the tracker determines that another process is attempting to update the ingress to an inconsistent
// value, skip updating altogether and rely on the next resync to resolve conflicts. This prevents routers
// with different configurations from endlessly updating the route status.
if !created && tracker.IsContended(key, now, latest) {
glog.V(4).Infof("%s: skipped update due to another process altering the route with a different ingress status value: %s", action, key)
return writerlease.None, false
}

switch _, err := oc.Routes(route.Namespace).UpdateStatus(route); {
case err == nil:
tracker.Clear(key, latest)
return writerlease.Extend, false
case errors.IsForbidden(err):
// if the router can't write status updates, allow the route to go through
utilruntime.HandleError(fmt.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err))
tracker.Clear(key, latest)
return writerlease.Extend, false
case errors.IsNotFound(err):
// route was deleted
return writerlease.Release, false
case errors.IsConflict(err):
// just follow the normal process, and retry when we receive the update notification due to
// the other entity updating the route.
glog.V(4).Infof("%s: updating status of %s/%s failed due to write conflict", action, route.Namespace, route.Name)
return writerlease.Release, true
default:
utilruntime.HandleError(fmt.Errorf("Unable to write router status for %s/%s: %v", route.Namespace, route.Name, err))
attempts--
return writerlease.Release, attempts > 0
}
return updated, false
})
}

// recordIngressCondition updates the matching ingress on the route (or adds a new one) with the specified
// condition, returning whether the route was updated or created, the time assigned to the condition, and
// a pointer to the current ingress record.
func recordIngressCondition(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) (changed, created bool, _ time.Time, latest *routeapi.RouteIngress) {
func recordIngressCondition(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) (changed, created bool, at time.Time, latest *routeapi.RouteIngress) {
for i := range route.Status.Ingress {
existing := &route.Status.Ingress[i]
if existing.RouterName != name {
Expand Down Expand Up @@ -227,31 +260,6 @@ func findCondition(ingress *routeapi.RouteIngress, t routeapi.RouteIngressCondit
return nil
}

func updateStatus(oc client.RoutesGetter, route *routeapi.Route) bool {
for i := 0; i < 3; i++ {
switch _, err := oc.Routes(route.Namespace).UpdateStatus(route); {
case err == nil:
return true
case errors.IsNotFound(err):
// route was deleted
return false
case errors.IsForbidden(err):
// if the router can't write status updates, allow the route to go through
utilruntime.HandleError(fmt.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err))
return true
case errors.IsConflict(err):
// just follow the normal process, and retry when we receive the update notification due to
// the other entity updating the route.
glog.V(4).Infof("updating status of %s/%s failed due to write conflict", route.Namespace, route.Name)
return false
default:
utilruntime.HandleError(fmt.Errorf("Unable to write router status for %s/%s: %v", route.Namespace, route.Name, err))
continue
}
}
return false
}

// ContentionTracker records modifications to a particular entry to prevent endless
// loops when multiple routers are configured with conflicting info. A given router
// process tracks whether the ingress status is change from a correct value to any
Expand Down
Loading

0 comments on commit d44d476

Please sign in to comment.