Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the router reload suppression so that it doesn't block updates #17049

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pkg/router/template/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ package templaterouter
func NewFakeTemplateRouter() *templateRouter {
fakeCertManager, _ := newSimpleCertificateManager(newFakeCertificateManagerConfig(), &fakeCertWriter{})
return &templateRouter{
state: map[string]ServiceAliasConfig{},
serviceUnits: make(map[string]ServiceUnit),
certManager: fakeCertManager,
rateLimitedCommitFunction: nil,
rateLimitedCommitStopChannel: make(chan struct{}),
state: map[string]ServiceAliasConfig{},
serviceUnits: make(map[string]ServiceUnit),
certManager: fakeCertManager,
rateLimitedCommitFunction: nil,
}
}

Expand Down
144 changes: 144 additions & 0 deletions pkg/router/template/limiter/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package limiter

import (
"sync"
"time"

"github.com/golang/glog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

// HandlerFunc defines function signature for a CoalescingSerializingRateLimiter.
type HandlerFunc func() error

// CoalescingSerializingRateLimiter guarantees that calls will not happen to the given function
// more frequently than the given interval, and it guarantees that only one call will happen at a time.
// The calls are not queued, i.e. if you make 5 calls to RegisterChange(), it does not guarantee that the
// handler will be invoked 5 times, it merely guarantees it will be invoked once, and no more often than
// the rate.
// The calls to the handler will happen in the background and are expected to do their own locking if needed.
type CoalescingSerializingRateLimiter struct {
// handlerFunc is the function to rate limit and seriaize calls to.
handlerFunc HandlerFunc

// callInterval is the minimum time between the starts of handler calls.
callInterval time.Duration

// lastStart is the time the last run of the handler started.
lastStart time.Time

// changeReqTime is nil if no change has been registered since the last handler run completed, otherwise it is the
// time last change was registered.
changeReqTime *time.Time

// handlerRunning indicates whether the Handler is actively running.
handlerRunning bool

// lock protects the CoalescingSerializingRateLimiter structure from multiple threads manipulating it at once.
lock sync.Mutex

// callbackTimer is the timer we use to make callbacks to re-run the function to decide if we need to do work.
callbackTimer *time.Timer
}

func NewCoalescingSerializingRateLimiter(interval time.Duration, handlerFunc HandlerFunc) *CoalescingSerializingRateLimiter {
limiter := &CoalescingSerializingRateLimiter{
handlerFunc: handlerFunc,
callInterval: interval,
lastStart: time.Time{},
changeReqTime: nil,
handlerRunning: false,
}

return limiter
}

// RegisterChange() indicates that the rate limited function should be called. It may not immediately run it, but it will cause it to run within
// the ReloadInterval. It will always immediately return, the function will be run in the background. Not every call to RegisterChange() will
// result in the function getting called. If it is called repeatedly while it is still within the ReloadInterval since the last run, it will
// only run once when the time allows it.
func (csrl *CoalescingSerializingRateLimiter) RegisterChange() {
glog.V(8).Infof("RegisterChange called")

csrl.changeWorker(true)
}

func (csrl *CoalescingSerializingRateLimiter) changeWorker(userChanged bool) {
csrl.lock.Lock()
defer csrl.lock.Unlock()

glog.V(8).Infof("changeWorker called")

if userChanged && csrl.changeReqTime == nil {
// They just registered a change manually (and we aren't in the middle of a change)
now := time.Now()
csrl.changeReqTime = &now
}

if csrl.handlerRunning {
// We don't need to do anything else... there's a run in progress, and when it is done it will re-call this function at which point the work will then happen
glog.V(8).Infof("The handler was already running (%v) started at %s, returning from the worker", csrl.handlerRunning, csrl.lastStart.String())
return
}

if csrl.changeReqTime == nil {
// There's no work queued so we have nothing to do. We should only get here when
// the function is re-called after a reload
glog.V(8).Infof("No invoke requested time, so there's no queued work. Nothing to do.")
return
}

// There is no handler running, let's see if we should run yet, or schedule a callback
now := time.Now()
sinceLastRun := now.Sub(csrl.lastStart)
untilNextCallback := csrl.callInterval - sinceLastRun
glog.V(8).Infof("Checking reload; now: %v, lastStart: %v, sinceLast %v, limit %v, remaining %v", now, csrl.lastStart, sinceLastRun, csrl.callInterval, untilNextCallback)

if untilNextCallback > 0 {
// We want to reload... but can't yet because some window is not satisfied
if csrl.callbackTimer == nil {
csrl.callbackTimer = time.AfterFunc(untilNextCallback, func() { csrl.changeWorker(false) })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point we should have a for loop instead of a recursive call, just to avoid the remote possibility of constant changes causing stackoverflow.

} else {
// While we are resetting the timer, it should have fired and be stopped.
// The first time the worker is called it will know the precise duration
// until when a run would be valid and has scheduled a timer for that point
csrl.callbackTimer.Reset(untilNextCallback)
}

glog.V(8).Infof("Can't invoke the handler yet, need to delay %s, callback scheduled", untilNextCallback.String())

return
}

// Otherwise we can reload immediately... let's do it!
glog.V(8).Infof("Calling the handler function (for invoke time %v)", csrl.changeReqTime)
csrl.handlerRunning = true
csrl.changeReqTime = nil
csrl.lastStart = now

// Go run the handler so we don't block the caller
go csrl.runHandler()

return
}

func (csrl *CoalescingSerializingRateLimiter) runHandler() {
// Call the handler, but do it in its own function so we can cleanup in case the handler panics
runHandler := func() error {
defer func() {
csrl.lock.Lock()
csrl.handlerRunning = false
csrl.lock.Unlock()
}()

return csrl.handlerFunc()
}
if err := runHandler(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The func variable is the same as the function it resides in. Maybe choose a different name here. Spun me around a bit because I thought we are making a recursive call :).

utilruntime.HandleError(err)
}

// Re-call the commit in case there is work waiting that came in while we were working
// we want to call the top level commit in case the state has not changed
glog.V(8).Infof("Re-Calling the worker after a reload in case work came in")
csrl.changeWorker(false)
}
75 changes: 75 additions & 0 deletions pkg/router/template/limiter/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package limiter

import (
"fmt"
"sync"
"testing"
"time"
)

type handler struct {
count int
sync.Mutex
}

func (h *handler) handle() error {
h.Lock()
defer h.Unlock()
h.count += 1
return nil
}

func (h *handler) counter() int {
h.Lock()
defer h.Unlock()
return h.count
}

func TestCoalescingSerializingRateLimiter(t *testing.T) {

fmt.Println("start")

tests := []struct {
Name string
Interval time.Duration
Times int
}{
{
Name: "3PO",
Interval: 3 * time.Second,
Times: 10,
},
{
Name: "five-fer",
Interval: 5 * time.Second,
Times: 20,
},
{
Name: "longjob",
Interval: 2 * time.Second,
Times: 20,
},
}

for _, tc := range tests {
h := &handler{}
rlf := NewCoalescingSerializingRateLimiter(tc.Interval, h.handle)

for i := 0; i < tc.Times; i++ {
fmt.Println("start")
rlf.RegisterChange()
fmt.Println("end")
}

select {
case <-time.After(tc.Interval + 2*time.Second):
fmt.Println("after")

counter := h.counter()
if tc.Interval > 0 && counter >= tc.Times/2 {
t.Errorf("For coalesced calls, expected number of invocations to be at least half. Expected: < %v Got: %v",
tc.Times/2, counter)
}
}
}
}
25 changes: 8 additions & 17 deletions pkg/router/template/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
cmdutil "github.com/openshift/origin/pkg/cmd/util"
routeapi "github.com/openshift/origin/pkg/route/apis/route"
"github.com/openshift/origin/pkg/router/controller"
"github.com/openshift/origin/pkg/util/ratelimiter"
"github.com/openshift/origin/pkg/router/template/limiter"
)

const (
Expand Down Expand Up @@ -88,9 +88,7 @@ type templateRouter struct {
allowWildcardRoutes bool
// rateLimitedCommitFunction is a rate limited commit (persist state + refresh the backend)
// function that coalesces and controls how often the router is reloaded.
rateLimitedCommitFunction *ratelimiter.RateLimitedFunction
// rateLimitedCommitStopChannel is the stop/terminate channel.
rateLimitedCommitStopChannel chan struct{}
rateLimitedCommitFunction *limiter.CoalescingSerializingRateLimiter
// lock is a mutex used to prevent concurrent router reloads.
lock sync.Mutex
// If true, haproxy should only bind ports when it has route and endpoint state
Expand Down Expand Up @@ -206,12 +204,10 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
metricReload: metricsReload,
metricWriteConfig: metricWriteConfig,

rateLimitedCommitFunction: nil,
rateLimitedCommitStopChannel: make(chan struct{}),
rateLimitedCommitFunction: nil,
}

numSeconds := int(cfg.reloadInterval.Seconds())
router.EnableRateLimiter(numSeconds, router.commitAndReload)
router.EnableRateLimiter(cfg.reloadInterval, router.commitAndReload)

if err := router.writeDefaultCert(); err != nil {
return nil, err
Expand All @@ -227,14 +223,9 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
return router, nil
}

func (r *templateRouter) EnableRateLimiter(interval int, handlerFunc ratelimiter.HandlerFunc) {
keyFunc := func(_ interface{}) (string, error) {
return "templaterouter", nil
}

r.rateLimitedCommitFunction = ratelimiter.NewRateLimitedFunction(keyFunc, interval, handlerFunc)
r.rateLimitedCommitFunction.RunUntil(r.rateLimitedCommitStopChannel)
glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", interval)
func (r *templateRouter) EnableRateLimiter(interval time.Duration, handlerFunc limiter.HandlerFunc) {
r.rateLimitedCommitFunction = limiter.NewCoalescingSerializingRateLimiter(interval, handlerFunc)
glog.V(2).Infof("Template router will coalesce reloads within %s of each other", interval.String())
}

// secretToPem composes a PEM file at the output directory from an input private key and crt file.
Expand Down Expand Up @@ -327,7 +318,7 @@ func (r *templateRouter) Commit() {
r.lock.Unlock()

if needsCommit {
r.rateLimitedCommitFunction.Invoke(r.rateLimitedCommitFunction)
r.rateLimitedCommitFunction.RegisterChange()
}
}

Expand Down
11 changes: 9 additions & 2 deletions test/end-to-end/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,10 +1348,15 @@ func createAndStartRouterContainerExtended(dockerCli *dockerClient.Client, maste
hostVols = append(hostVols, fmt.Sprintf("%[1]s:/usr/bin/openshift", binary))
}

logLevel := os.Getenv("OPENSHIFT_LOG_LEVEL")
if len(logLevel) == 0 {
logLevel = "4"
}

containerOpts := dockerClient.CreateContainerOptions{
Config: &dockerClient.Config{
Image: getRouterImage(),
Cmd: []string{"--master=" + masterIp, "--loglevel=4"},
Cmd: []string{"--master=" + masterIp, "--loglevel=" + logLevel},
Env: env,
ExposedPorts: exposedPorts,
VolumesFrom: vols,
Expand Down Expand Up @@ -1414,8 +1419,10 @@ func validateServer(server *tr.TestHttpService, t *testing.T) {

// cleanUp stops and removes the deployed router
func cleanUp(t *testing.T, dockerCli *dockerClient.Client, routerId string) {
getAllLogs, _ := strconv.ParseBool(os.Getenv("OPENSHIFT_GET_ALL_DOCKER_LOGS"))

dockerCli.StopContainer(routerId, 5)
if t.Failed() {
if t.Failed() || getAllLogs {
dockerCli.Logs(dockerClient.LogsOptions{
Container: routerId,
OutputStream: os.Stdout,
Expand Down
6 changes: 3 additions & 3 deletions test/integration/router_without_haproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func stressRouter(t *testing.T, namespaceCount, routesPerNamespace, routerCount,
plugins := []*templateplugin.TemplatePlugin{}

// Don't coalesce reloads to validate reload suppression during sync.
reloadInterval := 0
reloadInterval := 0 * time.Second

// Track reload counts indexed by router name.
reloadedMap := make(map[string]int)
Expand Down Expand Up @@ -364,7 +364,7 @@ func (p *DelayPlugin) Commit() error {

// launchRateLimitedRouter launches a rate-limited template router
// that communicates with the api via the provided clients.
func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval int, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, kc kclientset.Interface, name string, maxDelay int32, reloadInterval time.Duration, reloadedMap map[string]int) *templateplugin.TemplatePlugin {
reloadedMap[name] = 0
rateLimitingFunc := func() error {
reloadedMap[name] += 1
Expand All @@ -385,7 +385,7 @@ func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.In
return templatePlugin
}

func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval int, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval time.Duration, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
r := templateplugin.NewFakeTemplateRouter()

r.EnableRateLimiter(reloadInterval, func() error {
Expand Down