Skip to content

Commit

Permalink
Merge pull request #18686 from smarterclayton/router_push
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 18686, 18998).

Perform real backoff when contending for writes from the router

The current route backoff mechanism works by tracking the last touch
time from other routes, but this is prone to failure and will not scale
to very large sets of routers competing for updating status.

Instead, treat the ability to write status as a lease renewal, and have
failure to write status as a cue to backoff. Each new write further
increases the lease confidence up to an interval. Treat observed writes
from other processes as a signal that the lease holder is maintaining
their lease.

This should allow route status updates to be scale free

Needs an e2e test still

@openshift/sig-networking @ramr
  • Loading branch information
openshift-merge-robot authored Mar 17, 2018
2 parents d4dc5b7 + 73905e4 commit c6d8a92
Show file tree
Hide file tree
Showing 14 changed files with 1,573 additions and 351 deletions.
12 changes: 10 additions & 2 deletions pkg/cmd/infra/router/f5.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/golang/glog"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"

Expand All @@ -21,6 +23,7 @@ import (
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/router/controller"
f5plugin "github.com/openshift/origin/pkg/router/f5"
"github.com/openshift/origin/pkg/util/writerlease"
"github.com/openshift/origin/pkg/version"
)

Expand Down Expand Up @@ -228,10 +231,15 @@ func (o *F5RouterOptions) Run() error {
return err
}

var recorder controller.RejectionRecorder = controller.LogRejections
var plugin router.Plugin = f5Plugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname)
lease := writerlease.New(time.Minute, 3*time.Second)
go lease.Run(wait.NeverStop)
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/cmd/infra/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ func (o *RouterSelection) AdmissionCheck(route *routeapi.Route) error {
glog.V(4).Infof("host %s rejected - not in the list of allowed domains", route.Spec.Host)
return fmt.Errorf("host not in the allowed list of domains")
}

glog.V(4).Infof("host %s admitted", route.Spec.Host)
return nil
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/openshift/origin/pkg/router/metrics/haproxy"
templateplugin "github.com/openshift/origin/pkg/router/template"
"github.com/openshift/origin/pkg/util/proc"
"github.com/openshift/origin/pkg/util/writerlease"
"github.com/openshift/origin/pkg/version"
)

Expand Down Expand Up @@ -404,16 +406,21 @@ func (o *TemplateRouterOptions) Run() error {
return err
}

svcFetcher := templateplugin.NewListWatchServiceLookup(kc.Core(), 10*time.Minute)
svcFetcher := templateplugin.NewListWatchServiceLookup(kc.Core(), o.ResyncInterval)
templatePlugin, err := templateplugin.NewTemplatePlugin(pluginCfg, svcFetcher)
if err != nil {
return err
}

var recorder controller.RejectionRecorder = controller.LogRejections
var plugin router.Plugin = templatePlugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname)
lease := writerlease.New(time.Minute, 3*time.Second)
go lease.Run(wait.NeverStop)
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/router/controller/router_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,10 @@ func (c *RouterController) Commit() {
func (c *RouterController) processRoute(eventType watch.EventType, route *routeapi.Route) {
glog.V(4).Infof("Processing Route: %s/%s -> %s", route.Namespace, route.Name, route.Spec.To.Name)
glog.V(4).Infof(" Alias: %s", route.Spec.Host)
glog.V(4).Infof(" Path: %s", route.Spec.Path)
glog.V(4).Infof(" Event: %s", eventType)
if len(route.Spec.Path) > 0 {
glog.V(4).Infof(" Path: %s", route.Spec.Path)
}
glog.V(4).Infof(" Event: %s rv=%s", eventType, route.ResourceVersion)

c.RecordNamespaceRoutes(eventType, route)
if err := c.Plugin.HandleRoute(eventType, route); err != nil {
Expand Down
Loading

0 comments on commit c6d8a92

Please sign in to comment.