Skip to content

Commit

Permalink
Perform real backoff when contending for writes from the router
Browse files Browse the repository at this point in the history
The current route backoff mechanism works by tracking the last touch
time from other routers, but this is prone to failure and will not scale
to very large sets of routers competing to update status.

Instead, treat the ability to write status as a lease renewal, and treat
failure to write status as a cue to back off. Each new write further
increases the lease confidence up to an interval. Treat observed writes
from other processes as a signal that the lease holder is maintaining
its lease.
  • Loading branch information
smarterclayton committed Feb 22, 2018
1 parent bfdca25 commit b1f1f97
Show file tree
Hide file tree
Showing 8 changed files with 688 additions and 215 deletions.
7 changes: 6 additions & 1 deletion pkg/cmd/infra/router/f5.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/golang/glog"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"

Expand All @@ -20,6 +22,7 @@ import (
routeinternalclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
"github.com/openshift/origin/pkg/router/controller"
f5plugin "github.com/openshift/origin/pkg/router/f5"
"github.com/openshift/origin/pkg/util/writerlease"
"github.com/openshift/origin/pkg/version"
)

Expand Down Expand Up @@ -230,7 +233,9 @@ func (o *F5RouterOptions) Run() error {
return err
}

statusPlugin := controller.NewStatusAdmitter(f5Plugin, routeclient.Route(), o.RouterName, "")
lease := writerlease.New(time.Minute, time.Second)
go lease.Run(wait.NeverStop)
statusPlugin := controller.NewStatusAdmitter(f5Plugin, routeclient.Route(), o.RouterName, "", lease)
uniqueHostPlugin := controller.NewUniqueHost(statusPlugin, o.RouteSelectionFunc(), o.RouterSelection.DisableNamespaceOwnershipCheck, statusPlugin)
plugin := controller.NewHostAdmitter(uniqueHostPlugin, o.F5RouteAdmitterFunc(), false, o.RouterSelection.DisableNamespaceOwnershipCheck, statusPlugin)

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/infra/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (o *RouterSelection) AdmissionCheck(route *routeapi.Route) error {
return fmt.Errorf("host not in the allowed list of domains")
}

glog.V(4).Infof("host %s admitted", route.Spec.Host)
glog.V(4).Infof("host %s passed admission", route.Spec.Host)
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/openshift/origin/pkg/router/metrics/haproxy"
templateplugin "github.com/openshift/origin/pkg/router/template"
"github.com/openshift/origin/pkg/util/proc"
"github.com/openshift/origin/pkg/util/writerlease"
"github.com/openshift/origin/pkg/version"
)

Expand Down Expand Up @@ -421,7 +423,9 @@ func (o *TemplateRouterOptions) Run() error {
return err
}

statusPlugin := controller.NewStatusAdmitter(templatePlugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname)
lease := writerlease.New(time.Minute, time.Second)
go lease.Run(wait.NeverStop)
statusPlugin := controller.NewStatusAdmitter(templatePlugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease)
var nextPlugin router.Plugin = statusPlugin
if o.ExtendedValidation {
nextPlugin = controller.NewExtendedValidator(nextPlugin, controller.RejectionRecorder(statusPlugin))
Expand Down
214 changes: 74 additions & 140 deletions pkg/router/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/golang/glog"
lru "github.com/hashicorp/golang-lru"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -17,6 +16,7 @@ import (
routeapi "github.com/openshift/origin/pkg/route/apis/route"
client "github.com/openshift/origin/pkg/route/generated/internalclientset/typed/route/internalversion"
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/util/writerlease"
)

// RejectionRecorder is an object capable of recording why a route was rejected
Expand All @@ -30,25 +30,20 @@ type StatusAdmitter struct {
client client.RoutesGetter
routerName string
routerCanonicalHostname string

contentionInterval time.Duration
expected *lru.Cache
lease writerlease.Lease
}

// NewStatusAdmitter creates a plugin wrapper that ensures every accepted
// route has a status field set that matches this router. The admitter uses
// the provided writer lease to back off when two router processes with
// differing configurations are writing updates at the same time.
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, name, hostName string) *StatusAdmitter {
expected, _ := lru.New(1024)
func NewStatusAdmitter(plugin router.Plugin, client client.RoutesGetter, name, hostName string, lease writerlease.Lease) *StatusAdmitter {
return &StatusAdmitter{
plugin: plugin,
client: client,
routerName: name,
routerCanonicalHostname: hostName,

contentionInterval: 1 * time.Minute,
expected: expected,
lease: lease,
}
}

Expand Down Expand Up @@ -148,7 +143,7 @@ func ingressConditionTouched(ingress *routeapi.RouteIngress) *metav1.Time {

// recordIngressConditionFailure updates the matching ingress on the route (or adds a new one) with the specified
// condition, returning true if the object was modified.
func recordIngressConditionFailure(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) (*routeapi.RouteIngress, bool, *metav1.Time) {
func recordIngressConditionFailure(route *routeapi.Route, name, hostName string, condition routeapi.RouteIngressCondition) bool {
for i := range route.Status.Ingress {
existing := &route.Status.Ingress[i]
if existing.RouterName != name {
Expand All @@ -164,111 +159,44 @@ func recordIngressConditionFailure(route *routeapi.Route, name, hostName string,
// ...or replaced the entire condition list
// (NB: order matters in this OR -- short circuiting)
changed = setIngressCondition(existing, condition) || changed

lastTouch := ingressConditionTouched(existing)
return existing, changed, lastTouch
return changed
}
route.Status.Ingress = append(route.Status.Ingress, routeapi.RouteIngress{RouterName: name, RouterCanonicalHostname: hostName, Host: route.Spec.Host})
ingress := &route.Status.Ingress[len(route.Status.Ingress)-1]
setIngressCondition(ingress, condition)
return ingress, true, nil
}

// hasIngressBeenTouched reports whether the last-touch time cached for the
// route differs from lastTouch. It returns false when lastTouch is zero, when
// nothing is cached for the route's UID, or when the cached entry is older than
// the contention interval (such an entry is evicted as a side effect).
func (a *StatusAdmitter) hasIngressBeenTouched(route *routeapi.Route, lastTouch *metav1.Time) bool {
	glog.V(4).Infof("has last touch %v for %s/%s", lastTouch, route.Namespace, route.Name)
	if lastTouch.IsZero() {
		return false
	}

	cached, found := a.expected.Get(route.UID)
	if found {
		// Entries from before the contention interval may no longer be valid
		// (e.g. the previous updater no longer exists due to scale down), so
		// drop them and behave as if nothing had been cached.
		if cached.(time.Time).Before(nowFn().Add(-a.contentionInterval)) {
			glog.V(4).Infof("expired cached last touch of %s", cached.(time.Time))
			a.expected.Remove(route.UID)
			found = false
		}
	}

	// Only a live cache entry that disagrees with lastTouch counts as a touch.
	if found && !cached.(time.Time).Equal(lastTouch.Time) {
		glog.V(4).Infof("different cached last touch of %s", cached.(time.Time))
		return true
	}
	glog.V(4).Infof("missing or equal cached last touch")
	return false
}

// recordIngressTouch tracks whether the ingress record updated succeeded and returns true if the admitter can
// continue. Conflict errors are treated as no error, but indicate the touch was not successful and the caller
// should retry.
func (a *StatusAdmitter) recordIngressTouch(route *routeapi.Route, touch *metav1.Time, oldTouch *metav1.Time, err error) (bool, error) {
switch {
case err == nil:
if touch != nil {
a.expected.Add(route.UID, touch.Time)
}
return true, nil
// if the router can't write status updates, allow the route to go through
case errors.IsForbidden(err):
glog.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err)
if oldTouch != nil {
// record oldTouch so that if the problem gets rectified in the future,
// we can proceed as normal
a.expected.Add(route.UID, oldTouch.Time)
// HandleRoute attempts to admit the provided route on watch add / modifications.
func (a *StatusAdmitter) HandleRoute(eventType watch.EventType, route *routeapi.Route) error {
if IsGeneratedRouteName(route.Name) {
// Can't record status for ingress resources
} else {
switch eventType {
case watch.Added, watch.Modified:
if ok := setRouteAdmitted(a.lease, a.client, route, a.routerName, a.routerCanonicalHostname); !ok {
glog.V(4).Infof("skipping route: %s", route.Name)
return nil
}
}
return true, nil
case errors.IsConflict(err):
// just follow the normal process, and retry when we receive the update notification due to
// the other entity updating the route.
return false, nil
}
return false, err
return a.plugin.HandleRoute(eventType, route)
}

// admitRoute returns true if the route has already been accepted to this router, or
// updates the route to contain an accepted condition. Returns an error if the route could
// not be admitted due to a failure, or false if the route can't be admitted at this time.
func (a *StatusAdmitter) admitRoute(oc client.RoutesGetter, route *routeapi.Route, name, hostName string) (bool, error) {
ingress, updated := findOrCreateIngress(route, name, hostName)

// keep lastTouch around
lastTouch := ingressConditionTouched(ingress)

if !updated {
for i := range ingress.Conditions {
cond := &ingress.Conditions[i]
if cond.Type == routeapi.RouteAdmitted && cond.Status == kapi.ConditionTrue {
// reduce extra round trips during the contention period by remembering this
// time, so we don't react later
if _, ok := a.expected.Get(route.UID); !ok {
a.expected.Add(route.UID, lastTouch.Time)
}
glog.V(4).Infof("admit: route already admitted")
return true, nil
}
}
}
// HandleNode forwards the node event unchanged to the wrapped plugin and
// returns whatever error that plugin reports.
func (a *StatusAdmitter) HandleNode(eventType watch.EventType, node *kapi.Node) error {
return a.plugin.HandleNode(eventType, node)
}

// this works by keeping a cache of what time we last touched the route.
// If the recorded last-touch time matches ours, then we were the ones to do the
// last update, and can continue forth. Additionally, if we have no entry in our
// cache, we continue forward anyways. Since replicas from a new deployment will
// have no entry, they will update the last-touch time, and therefore take "ownership"
// of updating the route. In the case of a new route being created during a rolling update,
// there will be a race to determine whether the old or new deployment gets to determine,
// but this will be corrected on the next event after contentionInterval time.
// HandleEndpoints forwards the endpoints event unchanged to the wrapped plugin
// and returns whatever error that plugin reports. Note that the parameter named
// route is an Endpoints object, not a Route.
func (a *StatusAdmitter) HandleEndpoints(eventType watch.EventType, route *kapi.Endpoints) error {
return a.plugin.HandleEndpoints(eventType, route)
}

if a.hasIngressBeenTouched(route, lastTouch) {
glog.V(4).Infof("admit: observed a route update from someone else: route %s/%s has been updated to an inconsistent value, doing nothing", route.Namespace, route.Name)
return true, nil
}
// HandleNamespaces forwards the namespace set unchanged to the wrapped plugin
// and returns whatever error that plugin reports.
func (a *StatusAdmitter) HandleNamespaces(namespaces sets.String) error {
return a.plugin.HandleNamespaces(namespaces)
}

setIngressCondition(ingress, routeapi.RouteIngressCondition{
Type: routeapi.RouteAdmitted,
Status: kapi.ConditionTrue,
})
glog.V(4).Infof("admit: admitting route by updating status: %s (%t): %s", route.Name, updated, route.Spec.Host)
_, err := oc.Routes(route.Namespace).UpdateStatus(route)
return a.recordIngressTouch(route, ingress.Conditions[0].LastTransitionTime, lastTouch, err)
// Commit delegates to the wrapped plugin's Commit and returns its error.
func (a *StatusAdmitter) Commit() error {
return a.plugin.Commit()
}

// RecordRouteRejection attempts to update the route status with a reason for a route being rejected.
Expand All @@ -278,7 +206,7 @@ func (a *StatusAdmitter) RecordRouteRejection(route *routeapi.Route, reason, mes
return
}

ingress, changed, lastTouch := recordIngressConditionFailure(route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
changed := recordIngressConditionFailure(route, a.routerName, a.routerCanonicalHostname, routeapi.RouteIngressCondition{
Type: routeapi.RouteAdmitted,
Status: kapi.ConditionFalse,
Reason: reason,
Expand All @@ -289,50 +217,56 @@ func (a *StatusAdmitter) RecordRouteRejection(route *routeapi.Route, reason, mes
return
}

if a.hasIngressBeenTouched(route, lastTouch) {
glog.V(4).Infof("reject: observed a route update from someone else: route %s/%s has been updated to an inconsistent value, doing nothing", route.Namespace, route.Name)
return
}

_, err := a.client.Routes(route.Namespace).UpdateStatus(route)
_, err = a.recordIngressTouch(route, ingress.Conditions[0].LastTransitionTime, lastTouch, err)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to write route rejection to the status: %v", err))
}
a.lease.Try(leaseWork(a.client, route))
}

// HandleRoute attempts to admit the provided route on watch add / modifications.
func (a *StatusAdmitter) HandleRoute(eventType watch.EventType, route *routeapi.Route) error {
if IsGeneratedRouteName(route.Name) {
// Can't record status for ingress resources
} else {
switch eventType {
case watch.Added, watch.Modified:
ok, err := a.admitRoute(a.client, route, a.routerName, a.routerCanonicalHostname)
if err != nil {
return err
}
if !ok {
glog.V(4).Infof("skipping route: %s", route.Name)
return nil
// setRouteAdmitted updates the route status with a condition indicating the current router
// has admitted the route.
func setRouteAdmitted(lease writerlease.Lease, oc client.RoutesGetter, route *routeapi.Route, name, hostName string) bool {
ingress, updated := findOrCreateIngress(route, name, hostName)

// check to see if we already have admitted the route
if !updated {
for i := range ingress.Conditions {
cond := &ingress.Conditions[i]
if cond.Type == routeapi.RouteAdmitted && cond.Status == kapi.ConditionTrue {
lease.Extend(fmt.Sprintf("%s/%s", route.Namespace, route.Name))
glog.V(4).Infof("admit: route status already admitted")
return true
}
}
}
return a.plugin.HandleRoute(eventType, route)
}

func (a *StatusAdmitter) HandleNode(eventType watch.EventType, node *kapi.Node) error {
return a.plugin.HandleNode(eventType, node)
}

func (a *StatusAdmitter) HandleEndpoints(eventType watch.EventType, route *kapi.Endpoints) error {
return a.plugin.HandleEndpoints(eventType, route)
}
setIngressCondition(ingress, routeapi.RouteIngressCondition{
Type: routeapi.RouteAdmitted,
Status: kapi.ConditionTrue,
})

func (a *StatusAdmitter) HandleNamespaces(namespaces sets.String) error {
return a.plugin.HandleNamespaces(namespaces)
lease.Try(leaseWork(oc, route))
leader, ok := lease.WaitUntil(10 * time.Second)
if !ok {
glog.V(4).Infof("admit: did not update route status within interval, rejecting route until next resync")
}
return leader
}

func (a *StatusAdmitter) Commit() error {
return a.plugin.Commit()
// leaseWork returns the lease key for the route ("<namespace>/<name>") together
// with a work function, wrapped in writerlease.LimitRetries(3, ...), that writes
// the route's status through oc.
//
// The work function returns (true, false) when the update succeeds or is
// forbidden, (false, false) on a conflict, and (false, true) on any other error.
// NOTE(review): the two booleans presumably mean "lease held/extended" and
// "retry" - confirm against the writerlease.WorkFunc definition.
func leaseWork(oc client.RoutesGetter, route *routeapi.Route) (string, writerlease.WorkFunc) {
	updateStatus := func() (bool, bool) {
		glog.V(4).Infof("admit: updating status on route: %s: %s", route.Name, route.Spec.Host)
		_, err := oc.Routes(route.Namespace).UpdateStatus(route)
		if err == nil {
			return true, false
		}
		if errors.IsForbidden(err) {
			// if the router can't write status updates, allow the route to go through
			utilruntime.HandleError(fmt.Errorf("Unable to write router status - please ensure you reconcile your system policy or grant this router access to update route status: %v", err))
			return true, false
		}
		if errors.IsConflict(err) {
			// Another writer updated the route first; do nothing here and pick
			// the route up again when that writer's update notification arrives.
			return false, false
		}
		utilruntime.HandleError(fmt.Errorf("Unable to write router status: %v", err))
		return false, true
	}

	leaseKey := fmt.Sprintf("%s/%s", route.Namespace, route.Name)
	return leaseKey, writerlease.LimitRetries(3, updateStatus)
}
Loading

0 comments on commit b1f1f97

Please sign in to comment.