Skip to content

Commit

Permalink
Changed the router reload timing controls
Browse files Browse the repository at this point in the history
Mostly so that a router reload doesn't hold the router object's lock for the duration of the reload.

But added other controls too... need to elaborate!

Commit()
    // When this is called:
    // - Acquire lock
    // - Set $needCommit state to Now
    // - Release lock
    // - Call bottom-half callback

commitWorker()
    // Bottom half callback:
    // - Acquire lock
    // - Check $commitRunning state
    //   - If running:
    //       - We are done... we will catch the new work when the job finishes
    //       - Release lock
    //       - Return
    //   - If not running, check times:
    //     - If one of:
    //         * time.Now() - $lastReloadStart < $minReloadFreq     -- Overall start to start reload time
    //         * time.Now() - $lastReloadEnd   < $minReloadGap      -- Gap between last reload end and next
    //         * time.Now() - $needCommit      < $reloadCoalesceDur -- Gather a few events before a reload
    //       Then:
    //       - Set $needCommit and trigger callback for remaining time above
    //       - Release lock
    //       - Return
    //     - Else, we can reload, so:
    //       - Set $commitRunning
    //       - Set $lastReloadStart
    //       - Clear $needCommit
    //       - Release lock
    //       - Call the reload function and wait
    //       - Acquire lock
    //       - Clear $commitRunning
    //       - Set $lastReloadEnd
    //       - Release lock
    //       - Call the callback function again to catch trailing work
    //       - Return
  • Loading branch information
knobunc committed Sep 27, 2017
1 parent b3475a3 commit cc3edd2
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 36 deletions.
14 changes: 8 additions & 6 deletions pkg/router/template/fake.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
package templaterouter

import "time"

// NewFakeTemplateRouter provides an empty template router with a simple certificate manager
// backed by a fake cert writer for testing.
//
// Only state, serviceUnits and certManager are populated; every other field
// keeps its zero value. In particular no commit function is installed, so a
// caller that intends to trigger commits must call SetCommitFunc first.
func NewFakeTemplateRouter() *templateRouter {
	// The error from the certificate manager constructor is discarded here,
	// exactly as before; this constructor has no error return to surface it.
	fakeCertManager, _ := newSimpleCertificateManager(newFakeCertificateManagerConfig(), &fakeCertWriter{})

	// Each field is keyed exactly once: a composite literal with repeated
	// field names does not compile.
	return &templateRouter{
		state:        map[string]ServiceAliasConfig{},
		serviceUnits: make(map[string]ServiceUnit),
		certManager:  fakeCertManager,
	}
}

// FakeReloadHandler implements the minimal changes needed to make the locking behavior work.
// It takes r.lock, clears stateChanged, and stamps lastReloadStart and
// lastReloadEnd with the current time, without writing any config or
// reloading anything.
//
// This MUST match the behavior with the object updates of commitAndReload() in router.go
func (r *templateRouter) FakeReloadHandler() {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Start is stamped before end so that lastReloadStart never follows lastReloadEnd.
	r.stateChanged = false
	r.lastReloadStart = time.Now()
	r.lastReloadEnd = time.Now()
}
Expand Down
209 changes: 180 additions & 29 deletions pkg/router/template/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"

cmdutil "github.com/openshift/origin/pkg/cmd/util"
routeapi "github.com/openshift/origin/pkg/route/apis/route"
"github.com/openshift/origin/pkg/router/controller"
"github.com/openshift/origin/pkg/util/ratelimiter"
)

const (
Expand Down Expand Up @@ -86,13 +86,23 @@ type templateRouter struct {
statsPort int
// if the router should allow wildcard routes.
allowWildcardRoutes bool
// rateLimitedCommitFunction is a rate limited commit (persist state + refresh the backend)
// function that coalesces and controls how often the router is reloaded.
rateLimitedCommitFunction *ratelimiter.RateLimitedFunction
// rateLimitedCommitStopChannel is the stop/terminate channel.
rateLimitedCommitStopChannel chan struct{}
// lock is a mutex used to prevent concurrent router reloads.
// lock is a mutex used to prevent concurrent router state updates
lock sync.Mutex
// Track the start and end of router reloads
lastReloadStart time.Time
lastReloadEnd time.Time
// commitReqTime is nil if no commit is needed, otherwise it is the time the commit was requested
commitReqTime *time.Time
// commitRunning indicates whether there is the commitFunc is actively running
commitRunning bool
// commitTimer is the timer we use to make callbacks to the delayed commits
commitTimer *time.Timer
// The minimum time between the starts of reloads
minReloadFreq time.Duration
// The minimum gap between the end of a reload and the start of the next
minReloadGap time.Duration
// How long to wait after an event before triggering a reload in case other events come from the same change (should be short... milliseconds)
reloadCoalesceDur time.Duration
// If true, haproxy should only bind ports when it has route and endpoint state
bindPortsAfterSync bool
// whether the router state has been read from the api at least once
Expand All @@ -103,6 +113,8 @@ type templateRouter struct {
metricReload prometheus.Summary
// metricWriteConfig tracks writing config
metricWriteConfig prometheus.Summary
// commitFunc is the commit (persist state + refresh the backend) function
commitFunc CommitFunc
}

// templateRouterCfg holds all configuration items required to initialize the template router
Expand Down Expand Up @@ -203,15 +215,24 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
peerEndpoints: []Endpoint{},
bindPortsAfterSync: cfg.bindPortsAfterSync,

commitReqTime: nil,
commitRunning: false,

// BBXXX Need to fix these to expose other values in the cfg rather than hard coding them
minReloadFreq: cfg.reloadInterval,
minReloadGap: 500 * time.Millisecond,
reloadCoalesceDur: 10 * time.Millisecond,

metricReload: metricsReload,
metricWriteConfig: metricWriteConfig,

rateLimitedCommitFunction: nil,
rateLimitedCommitStopChannel: make(chan struct{}),
}

numSeconds := int(cfg.reloadInterval.Seconds())
router.EnableRateLimiter(numSeconds, router.commitAndReload)
router.SetCommitFunc(func() error {
return router.commitAndReload()
})

// BBXXX Add the other reload params
glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", router.minReloadFreq)

if err := router.writeDefaultCert(); err != nil {
return nil, err
Expand All @@ -227,16 +248,6 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
return router, nil
}

func (r *templateRouter) EnableRateLimiter(interval int, handlerFunc ratelimiter.HandlerFunc) {
keyFunc := func(_ interface{}) (string, error) {
return "templaterouter", nil
}

r.rateLimitedCommitFunction = ratelimiter.NewRateLimitedFunction(keyFunc, interval, handlerFunc)
r.rateLimitedCommitFunction.RunUntil(r.rateLimitedCommitStopChannel)
glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", interval)
}

// secretToPem composes a PEM file at the output directory from an input private key and crt file.
func secretToPem(secPath, outName string) error {
// The secret, when present, is mounted on /etc/pki/tls/private
Expand Down Expand Up @@ -311,6 +322,13 @@ func (r *templateRouter) readState() error {
return json.Unmarshal(data, &r.state)
}

// CommitFunc is the type of the pluggable commit function. It takes no
// arguments and returns an error describing why the commit failed, or nil.
type CommitFunc func() error

// SetCommitFunc installs commitFunc as the router's commit function,
// replacing any previously installed one.
//
// The assignment is made without taking r.lock.
// NOTE(review): presumably this is meant to be called during setup, before
// any commit can run — confirm against callers.
func (r *templateRouter) SetCommitFunc(commitFunc CommitFunc) {
r.commitFunc = commitFunc
}

// Commit applies the changes made to the router configuration - persists
// the state and refresh the backend. This is all done in the background
// so that we can rate limit + coalesce multiple changes.
Expand All @@ -324,18 +342,146 @@ func (r *templateRouter) Commit() {
r.stateChanged = true
}

needsCommit := r.stateChanged
// BBXXX

// Configuration Variables:
// - Max Reload Frequency (no more than every 10 seconds) -- exists today $minReloadFreq
// - Gaps between reloads (wait N seconds after a reload finishes) -- $minReloadGap
// - Bunch events by waiting a little before reload (wait N ms) -- $reloadCoalesce

if r.stateChanged {
// If the state changed, then we need to commit

if r.commitReqTime == nil {
// There is no scheduled commit worker, so set the time we started and
// invoke the worker code. It will decide if it can run now, or if it
// needs to schedule a callback.
//
// We need to track the earliest commit time so we can do burst supression.
curTime := time.Now()
r.commitReqTime = &curTime
r.lock.Unlock()
r.commitWorker()
return
}
}

r.lock.Unlock()
}

if needsCommit {
r.rateLimitedCommitFunction.Invoke(r.rateLimitedCommitFunction)
// timeUntilNextAction reports whether an action that last happened at
// lastActionTime may happen again at now, given that consecutive actions must
// be at least minimumActionGap apart.
//
// If the gap has fully elapsed it returns (true, nil). Otherwise it returns
// false together with a pointer to the time that still has to pass before the
// action becomes allowed; that duration is always greater than zero.
func timeUntilNextAction(now time.Time, lastActionTime time.Time, minimumActionGap time.Duration) (allowedToReload bool, nextReload *time.Duration) {
	sinceLastAction := now.Sub(lastActionTime)
	if sinceLastAction >= minimumActionGap {
		return true, nil
	}

	untilNextAction := minimumActionGap - sinceLastAction
	return false, &untilNextAction
}

// commitWorker is the bottom half of Commit and the callback of the commit
// timer. It decides whether a requested commit can run right now, has to be
// postponed, or is not needed at all.
//
// It acquires r.lock itself and must be called without it held. On every path
// the lock has been released by the time it returns.
//
// A reload is postponed while any of the following has not yet elapsed:
//   - minReloadFreq since the start of the last reload
//   - minReloadGap since the end of the last reload
//   - reloadCoalesceDur since the commit was requested
//
// When postponed, a single timer is armed for the longest remaining wait, so
// that the next invocation finds every limit satisfied.
func (r *templateRouter) commitWorker() {
	r.lock.Lock()

	if r.commitRunning || r.commitReqTime == nil {
		// Nothing to do: either a commit is in progress (it re-invokes this
		// function when it finishes, which picks up any new request), or no
		// commit has been requested.
		r.lock.Unlock()
		return
	}

	now := time.Now()
	limits := []struct {
		last time.Time
		gap  time.Duration
	}{
		{r.lastReloadStart, r.minReloadFreq},
		{r.lastReloadEnd, r.minReloadGap},
		{*r.commitReqTime, r.reloadCoalesceDur},
	}

	// Only the remaining-duration result is consulted: it is non-nil exactly
	// when the corresponding limit still requires waiting.
	var untilNextCallback time.Duration
	for _, limit := range limits {
		if _, remaining := timeUntilNextAction(now, limit.last, limit.gap); remaining != nil && *remaining > untilNextCallback {
			untilNextCallback = *remaining
		}
	}

	if untilNextCallback > 0 {
		// At least one limit is still active; come back once the longest has expired.
		r.scheduleReload(untilNextCallback)
		r.lock.Unlock()
		return
	}

	// doReload is entered with r.lock held and releases it before running the
	// commit function, so no unlock is needed here.
	r.doReload()

	// Re-call the worker in case a new commit was requested while the reload was running.
	r.commitWorker()
}

// scheduleReload arranges for commitWorker to be invoked once untilNextCallback
// has passed. The first call creates the timer; later calls re-arm the same
// timer with the new duration.
// Must be called with r.lock held
func (r *templateRouter) scheduleReload(untilNextCallback time.Duration) {
	if r.commitTimer != nil {
		r.commitTimer.Reset(untilNextCallback)
		return
	}

	r.commitTimer = time.AfterFunc(untilNextCallback, r.commitWorker)
}

// doReload marks a commit as running, invokes the registered commit function,
// and hands any error it returns to utilruntime.HandleError.
//
// Must be called with r.lock held. The lock is released before the commit
// function runs and is NOT re-held on return: it is only re-acquired briefly
// to clear commitRunning.
func (r *templateRouter) doReload() {
	// Bookkeeping is updated under the caller's lock: the pending request is
	// consumed and the running flag raised before anyone else can look.
	r.commitReqTime = nil
	r.commitRunning = true
	r.lock.Unlock()

	runCommit := func() error {
		// Deferred so that commitRunning is cleared when the commit function
		// returns, and also if it panics.
		defer func() {
			r.lock.Lock()
			r.commitRunning = false
			r.lock.Unlock()
		}()

		return r.commitFunc()
	}

	err := runCommit()
	if err != nil {
		utilruntime.HandleError(err)
	}
}

// commitAndReload refreshes the backend and persists the router state.
// This is the default implementation of the commit function, it can be replaced, but if it is
// the following state fields must be updated:
// - r.stateChanged
// - r.lastReloadStart
// - r.lastReloadEnd
//
// Note: Only one commitAndReload can be in progress at a time, but the Commit function takes care of
// ensuring that only one commit function is called.
func (r *templateRouter) commitAndReload() error {
// only state changes must be done under the lock

if err := func() error {
// Only state reads and changes must be done under the lock, the reload itself must not be done under the lock
r.lock.Lock()
defer r.lock.Unlock()

Expand All @@ -345,11 +491,11 @@ func (r *templateRouter) commitAndReload() error {
}

r.stateChanged = false
r.lastReloadStart = time.Now() // XXX Should I move this out of here and do it in the caller?

glog.V(4).Infof("Writing the router config")
reloadStart := time.Now()
err := r.writeConfig()
r.metricWriteConfig.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second))
r.metricWriteConfig.Observe(float64(time.Now().Sub(r.lastReloadStart)) / float64(time.Second))
return err
}(); err != nil {
return err
Expand All @@ -363,11 +509,16 @@ func (r *templateRouter) commitAndReload() error {
glog.V(4).Infof("Reloading the router")
reloadStart := time.Now()
err := r.reloadRouter()
r.metricReload.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second))
reloadEnd := time.Now()
r.metricReload.Observe(float64(reloadEnd.Sub(reloadStart)) / float64(time.Second))
if err != nil {
return err
}

r.lock.Lock()
r.lastReloadEnd = reloadEnd
r.lock.Unlock()

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/router_without_haproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.In
func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval int, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) {
r := templateplugin.NewFakeTemplateRouter()

r.EnableRateLimiter(reloadInterval, func() error {
r.SetCommitFunc(func() error {
r.FakeReloadHandler()
return rateLimitingFunc()
})
Expand Down

0 comments on commit cc3edd2

Please sign in to comment.