From 902bd6fe740a705a29ca68c1d28207fdf0fca5fc Mon Sep 17 00:00:00 2001 From: Benjamin Bennett Date: Tue, 26 Sep 2017 13:27:27 -0400 Subject: [PATCH] Change the router reload suppression, and add some more controls The main purpose is to change the locking so that a reload doesn't hold a lock on the router object for the duration of the reload. The router controls are: - $RELOAD_INTERVAL - Max Reload Frequency (start-to-start of a reload), default 5s - $RELOAD_GAP - Gaps between reloads (end-to-start of a reload), default 100ms - $RELOAD_EVENT_WAIT - Bunch events by waiting a little before reload (wait N ms between the time the event comes in and when the reload starts), default 10ms The commit routine is now changed to have a top and bottom half. The top half checks to see if a reload is scheduled, and if it is, it returns immediately. If there's no reload scheduled then it calls the bottom half and returns. The bottom half is in charge of determining if it can immediately reload or if it has to wait. If it must wait, then it works out the earliest time it can reload and schedules a callback to itself for that time. If it determines it can reload, then it runs the reload code immediately. When the reload is complete, it calls itself again to make sure there was no other pending reload that had come in while the reload was running. --- pkg/cmd/infra/router/template.go | 48 ++++- pkg/router/template/fake.go | 14 +- pkg/router/template/plugin.go | 4 + pkg/router/template/router.go | 192 ++++++++++++++---- .../router_without_haproxy_test.go | 2 +- 5 files changed, 217 insertions(+), 43 deletions(-) diff --git a/pkg/cmd/infra/router/template.go b/pkg/cmd/infra/router/template.go index f602745a1d69..475c41bb873f 100644 --- a/pkg/cmd/infra/router/template.go +++ b/pkg/cmd/infra/router/template.go @@ -37,6 +37,12 @@ import ( // defaultReloadInterval is how often to do reloads in seconds. 
const defaultReloadInterval = 5 +// defaultReloadGap is the minimum gap between a reload end and start in milliseconds. +const defaultReloadGap = 100 + +// defaultReloadEventWait is how long to wait for more events before triggering a reload. In milliseconds +const defaultReloadEventWait = 10 + var routerLong = templates.LongDesc(` Start a router @@ -68,6 +74,8 @@ type TemplateRouter struct { TemplateFile string ReloadScript string ReloadInterval time.Duration + ReloadGap time.Duration + ReloadEventWait time.Duration DefaultCertificate string DefaultCertificatePath string DefaultCertificateDir string @@ -87,12 +95,36 @@ func reloadInterval() time.Duration { interval := util.Env("RELOAD_INTERVAL", fmt.Sprintf("%vs", defaultReloadInterval)) value, err := time.ParseDuration(interval) if err != nil { - glog.Warningf("Invalid RELOAD_INTERVAL %q, using default value %v ...", interval, defaultReloadInterval) + glog.Warningf("Invalid RELOAD_INTERVAL %q, using default value %vs ...", interval, defaultReloadInterval) value = time.Duration(defaultReloadInterval * time.Second) } return value } +// reloadGap returns how often to wait between a reload end and the start of the next reload. +// The gap value is based on an environment variable or the default. +func reloadGap() time.Duration { + gap := util.Env("RELOAD_GAP", fmt.Sprintf("%vms", defaultReloadGap)) + value, err := time.ParseDuration(gap) + if err != nil { + glog.Warningf("Invalid RELOAD_GAP %q, using default value %vms ...", gap, defaultReloadGap) + value = time.Duration(defaultReloadGap * time.Millisecond) + } + return value +} + +// reloadEventWait returns how often to wait after getting the first event after a reload before triggering the next reload. +// The event wait value is based on an environment variable or the default. 
+func reloadEventWait() time.Duration { + eventWait := util.Env("RELOAD_EVENT_WAIT", fmt.Sprintf("%vms", defaultReloadEventWait)) + value, err := time.ParseDuration(eventWait) + if err != nil { + glog.Warningf("Invalid RELOAD_EVENT_WAIT %q, using default value %vms ...", eventWait, defaultReloadEventWait) + value = time.Duration(defaultReloadEventWait * time.Millisecond) + } + return value +} + func (o *TemplateRouter) Bind(flag *pflag.FlagSet) { flag.StringVar(&o.RouterName, "name", util.Env("ROUTER_SERVICE_NAME", "public"), "The name the router will identify itself with in the route status") flag.StringVar(&o.RouterCanonicalHostname, "router-canonical-hostname", util.Env("ROUTER_CANONICAL_HOSTNAME", ""), "CanonicalHostname is the external host name for the router that can be used as a CNAME for the host requested for this route. This value is optional and may not be set in all cases.") @@ -103,7 +135,9 @@ func (o *TemplateRouter) Bind(flag *pflag.FlagSet) { flag.StringVar(&o.DefaultDestinationCAPath, "default-destination-ca-path", util.Env("DEFAULT_DESTINATION_CA_PATH", "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"), "A path to a PEM file containing the default CA bundle to use with re-encrypt routes. This CA should sign for certificates in the Kubernetes DNS space (service.namespace.svc).") flag.StringVar(&o.TemplateFile, "template", util.Env("TEMPLATE_FILE", ""), "The path to the template file to use") flag.StringVar(&o.ReloadScript, "reload", util.Env("RELOAD_SCRIPT", ""), "The path to the reload script to use") - flag.DurationVar(&o.ReloadInterval, "interval", reloadInterval(), "Controls how often router reloads are invoked. Mutiple router reload requests are coalesced for the duration of this interval since the last reload time.") + flag.DurationVar(&o.ReloadInterval, "interval", reloadInterval(), "Controls how often router reloads are invoked. 
Mutiple router reload requests are coalesced for the duration of this interval since the time the last reload started.") + flag.DurationVar(&o.ReloadGap, "reload-gap", reloadGap(), "Controls how often router reloads are invoked. Mutiple router reload requests are coalesced for the duration of this interval since the time the last reload ended.") + flag.DurationVar(&o.ReloadEventWait, "reload-event-wait", reloadEventWait(), "Controls how often router reloads are invoked. Mutiple router reload events are gathered for the given duration before a reload is triggered.") flag.BoolVar(&o.ExtendedValidation, "extended-validation", util.Env("EXTENDED_VALIDATION", "true") == "true", "If set, then an additional extended validation step is performed on all routes admitted in by this router. Defaults to true and enables the extended validation checks.") flag.BoolVar(&o.BindPortsAfterSync, "bind-ports-after-sync", util.Env("ROUTER_BIND_PORTS_AFTER_SYNC", "") == "true", "Bind ports only after route state has been synchronized") flag.StringVar(&o.MaxConnections, "max-connections", util.Env("ROUTER_MAX_CONNECTIONS", ""), "Specifies the maximum number of concurrent connections.") @@ -205,6 +239,14 @@ func (o *TemplateRouterOptions) Complete() error { return fmt.Errorf("invalid reload interval: %v - must be a positive duration", nsecs) } + if o.ReloadGap < 0 { + return fmt.Errorf("invalid reload gap: %s - must not be negative", o.ReloadGap.String()) + } + + if o.ReloadEventWait < 0 { + return fmt.Errorf("invalid reload event wait: %s - must not be negative", o.ReloadEventWait.String()) + } + if len(routerCanonicalHostname) > 0 { if errs := validation.IsDNS1123Subdomain(routerCanonicalHostname); len(errs) != 0 { return fmt.Errorf("invalid canonical hostname: %s", routerCanonicalHostname) @@ -324,6 +366,8 @@ func (o *TemplateRouterOptions) Run() error { TemplatePath: o.TemplateFile, ReloadScriptPath: o.ReloadScript, ReloadInterval: o.ReloadInterval, + ReloadGap: o.ReloadGap, + 
ReloadEventWait: o.ReloadEventWait, DefaultCertificate: o.DefaultCertificate, DefaultCertificatePath: o.DefaultCertificatePath, DefaultCertificateDir: o.DefaultCertificateDir, diff --git a/pkg/router/template/fake.go b/pkg/router/template/fake.go index a34f5f019b29..ca34948fc8ae 100644 --- a/pkg/router/template/fake.go +++ b/pkg/router/template/fake.go @@ -1,25 +1,27 @@ package templaterouter +import "time" + // NewFakeTemplateRouter provides an empty template router with a simple certificate manager // backed by a fake cert writer for testing func NewFakeTemplateRouter() *templateRouter { fakeCertManager, _ := newSimpleCertificateManager(newFakeCertificateManagerConfig(), &fakeCertWriter{}) return &templateRouter{ - state: map[string]ServiceAliasConfig{}, - serviceUnits: make(map[string]ServiceUnit), - certManager: fakeCertManager, - rateLimitedCommitFunction: nil, - rateLimitedCommitStopChannel: make(chan struct{}), + state: map[string]ServiceAliasConfig{}, + serviceUnits: make(map[string]ServiceUnit), + certManager: fakeCertManager, } } // FakeReloadHandler implements the minimal changes needed to make the locking behavior work -// This MUST match the behavior with the stateChanged of commitAndReload +// This MUST match the behavior with the object updates of commitAndReload() in router.go func (r *templateRouter) FakeReloadHandler() { r.lock.Lock() defer r.lock.Unlock() r.stateChanged = false + r.lastReloadStart = time.Now() + r.lastReloadEnd = time.Now() return } diff --git a/pkg/router/template/plugin.go b/pkg/router/template/plugin.go index aa27fbdab8ee..f8f0fb0863cc 100644 --- a/pkg/router/template/plugin.go +++ b/pkg/router/template/plugin.go @@ -46,6 +46,8 @@ type TemplatePluginConfig struct { TemplatePath string ReloadScriptPath string ReloadInterval time.Duration + ReloadGap time.Duration + ReloadEventWait time.Duration ReloadCallbacks []func() DefaultCertificate string DefaultCertificatePath string @@ -123,6 +125,8 @@ func NewTemplatePlugin(cfg 
TemplatePluginConfig, lookupSvc ServiceLookup) (*Temp templates: templates, reloadScriptPath: cfg.ReloadScriptPath, reloadInterval: cfg.ReloadInterval, + reloadGap: cfg.ReloadGap, + reloadEventWait: cfg.ReloadEventWait, reloadCallbacks: cfg.ReloadCallbacks, defaultCertificate: cfg.DefaultCertificate, defaultCertificatePath: cfg.DefaultCertificatePath, diff --git a/pkg/router/template/router.go b/pkg/router/template/router.go index 64026c2aa1ef..fe7391ed7831 100644 --- a/pkg/router/template/router.go +++ b/pkg/router/template/router.go @@ -17,12 +17,12 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" cmdutil "github.com/openshift/origin/pkg/cmd/util" routeapi "github.com/openshift/origin/pkg/route/apis/route" "github.com/openshift/origin/pkg/router/controller" - "github.com/openshift/origin/pkg/util/ratelimiter" ) const ( @@ -52,7 +52,6 @@ type templateRouter struct { dir string templates map[string]*template.Template reloadScriptPath string - reloadInterval time.Duration reloadCallbacks []func() state map[string]ServiceAliasConfig serviceUnits map[string]ServiceUnit @@ -86,13 +85,23 @@ type templateRouter struct { statsPort int // if the router should allow wildcard routes. allowWildcardRoutes bool - // rateLimitedCommitFunction is a rate limited commit (persist state + refresh the backend) - // function that coalesces and controls how often the router is reloaded. - rateLimitedCommitFunction *ratelimiter.RateLimitedFunction - // rateLimitedCommitStopChannel is the stop/terminate channel. - rateLimitedCommitStopChannel chan struct{} - // lock is a mutex used to prevent concurrent router reloads. 
+ // lock is a mutex used to prevent concurrent router state updates lock sync.Mutex + // Track the start and end of router reloads + lastReloadStart time.Time + lastReloadEnd time.Time + // commitReqTime is nil if no commit is needed, otherwise it is the time the commit was requested + commitReqTime *time.Time + // commitRunning indicates whether the commitFunc is actively running + commitRunning bool + // commitTimer is the timer we use to make callbacks to the delayed commits + commitTimer *time.Timer + // The minimum time between the starts of reloads + reloadInterval time.Duration + // The minimum gap between the end of a reload and the start of the next + reloadGap time.Duration + // How long to wait after an event before triggering a reload in case other events come from the same change (should be short... milliseconds) + reloadEventWait time.Duration // If true, haproxy should only bind ports when it has route and endpoint state bindPortsAfterSync bool // whether the router state has been read from the api at least once @@ -103,6 +112,8 @@ type templateRouter struct { metricReload prometheus.Summary // metricWriteConfig tracks writing config metricWriteConfig prometheus.Summary + // commitFunc is the commit (persist state + refresh the backend) function. 
This is only to be used for our test hooks + commitFunc CommitFunc } // templateRouterCfg holds all configuration items required to initialize the template router @@ -111,6 +122,8 @@ type templateRouterCfg struct { templates map[string]*template.Template reloadScriptPath string reloadInterval time.Duration + reloadGap time.Duration + reloadEventWait time.Duration reloadCallbacks []func() defaultCertificate string defaultCertificatePath string @@ -186,7 +199,6 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { dir: dir, templates: cfg.templates, reloadScriptPath: cfg.reloadScriptPath, - reloadInterval: cfg.reloadInterval, reloadCallbacks: cfg.reloadCallbacks, state: make(map[string]ServiceAliasConfig), serviceUnits: make(map[string]ServiceUnit), @@ -203,15 +215,22 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { peerEndpoints: []Endpoint{}, bindPortsAfterSync: cfg.bindPortsAfterSync, + commitReqTime: nil, + commitRunning: false, + + reloadInterval: cfg.reloadInterval, + reloadGap: cfg.reloadGap, + reloadEventWait: cfg.reloadEventWait, + metricReload: metricsReload, metricWriteConfig: metricWriteConfig, - - rateLimitedCommitFunction: nil, - rateLimitedCommitStopChannel: make(chan struct{}), } - numSeconds := int(cfg.reloadInterval.Seconds()) - router.EnableRateLimiter(numSeconds, router.commitAndReload) + router.SetCommitFunc(func() error { + return router.commitAndReload() + }) + + glog.V(2).Infof("Template router will coalesce reloads within %s seconds of the last restart start time, within %s seconds of the last restart end time, and wait %s seconds after the first event", router.reloadInterval.String(), router.reloadGap.String(), router.reloadEventWait.String()) if err := router.writeDefaultCert(); err != nil { return nil, err @@ -221,20 +240,8 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { return nil, err } glog.V(4).Infof("Committing state") - // Bypass the rate limiter to ensure 
the first sync will be - // committed without delay. - router.commitAndReload() - return router, nil -} -func (r *templateRouter) EnableRateLimiter(interval int, handlerFunc ratelimiter.HandlerFunc) { - keyFunc := func(_ interface{}) (string, error) { - return "templaterouter", nil - } - - r.rateLimitedCommitFunction = ratelimiter.NewRateLimitedFunction(keyFunc, interval, handlerFunc) - r.rateLimitedCommitFunction.RunUntil(r.rateLimitedCommitStopChannel) - glog.V(2).Infof("Template router will coalesce reloads within %v seconds of each other", interval) + return router, nil } // secretToPem composes a PEM file at the output directory from an input private key and crt file. @@ -311,6 +318,14 @@ func (r *templateRouter) readState() error { return json.Unmarshal(data, &r.state) } +// Define a type for the pluggable commit function +type CommitFunc func() error + +// This should ONLY be used for external testing hooks +func (r *templateRouter) SetCommitFunc(commitFunc CommitFunc) { + r.commitFunc = commitFunc +} + // Commit applies the changes made to the router configuration - persists // the state and refresh the backend. This is all done in the background // so that we can rate limit + coalesce multiple changes. @@ -324,18 +339,122 @@ func (r *templateRouter) Commit() { r.stateChanged = true } - needsCommit := r.stateChanged + if r.stateChanged { + // If the state changed, then we need to commit + glog.V(8).Infof("Commit called and the state has changed, could reload") + + if r.commitReqTime == nil { + // There is no scheduled commit worker, so set the time we started and + // invoke the worker code. It will decide if it can run now, or if it + // needs to schedule a callback. + // + // We need to track the earliest commit time so we can do burst suppression. 
+ now := time.Now() + r.commitReqTime = &now + r.lock.Unlock() + + glog.V(8).Infof("No scheduled reload, calling the worker (curtime: %v)", now) + + r.commitWorker() + return + } + + glog.V(8).Infof("There is already a scheduled reload (for %v), skipping the worker", r.commitReqTime) + } + r.lock.Unlock() +} - if needsCommit { - r.rateLimitedCommitFunction.Invoke(r.rateLimitedCommitFunction) +// timeUntilNextAction() works out when we can next reload based on the current time, the last action time, and the minimum allowed gap between the two. +// In order to be allowed to reload immediately, the last action + minimum gap must be <= the current time. +// If we can reload, then it returns a zero duration. +// If we can't reload, then it returns a duration to wait for before a reload would be allowed. +func timeUntilNextAction(now time.Time, lastActionTime time.Time, minimumActionGap time.Duration) (nextReload time.Duration) { + if sinceLastAction := now.Sub(lastActionTime); sinceLastAction < minimumActionGap { + return minimumActionGap - sinceLastAction + } + + return 0 * time.Second +} + +func (r *templateRouter) commitWorker() { + r.lock.Lock() + + glog.V(8).Infof("CommitWorker called") + + if r.commitRunning { + // We don't need to do anything else... 
there's a commit in progress, and when it is done it will re-call this function at which point the work will then happen + glog.V(8).Infof("There was already a commit running (%v) or a scheduled reload (at %v), returning from the worker", r.commitRunning, r.commitReqTime) + r.lock.Unlock() + return + } + + // There is no commit running, let's see if we should run yet, or schedule a callback + var untilNextCallback time.Duration + now := time.Now() + + getNextReload := func(potentialDuration time.Duration) time.Duration { + // Use the largest of the potential durations so we only come back when the checks will allow it to run + if potentialDuration > untilNextCallback { + return potentialDuration + } + + return untilNextCallback + } + + untilNextCallback = getNextReload(timeUntilNextAction(now, r.lastReloadStart, r.reloadInterval)) + untilNextCallback = getNextReload(timeUntilNextAction(now, r.lastReloadEnd, r.reloadGap)) + if r.commitReqTime != nil { + untilNextCallback = getNextReload(timeUntilNextAction(now, *r.commitReqTime, r.reloadEventWait)) + } + + if untilNextCallback > 0*time.Second { + // We want to reload... but can't yet because some window is not satisfied + if r.commitTimer == nil { + r.commitTimer = time.AfterFunc(untilNextCallback, r.commitWorker) + } else { + r.commitTimer.Reset(untilNextCallback) + } + + glog.V(8).Infof("Can't reload the router yet, need to delay %s", untilNextCallback.String()) + + r.lock.Unlock() + return + } + + // Otherwise we can reload immediately... let's do it! 
+ r.commitRunning = true + r.commitReqTime = nil + r.lock.Unlock() + + if err := func() error { + defer func() { + r.lock.Lock() + r.commitRunning = false + r.lock.Unlock() + }() + + glog.V(8).Infof("Calling the router commit function") + return r.commitFunc() + }(); err != nil { + utilruntime.HandleError(err) + } + + // Re-call the commit in case there is work waiting that came in while we were working + // we want to call the top level commit in case the state has not changed + r.Commit() } // commitAndReload refreshes the backend and persists the router state. +// This is the default implementation of the commit function, it can be replaced by the FakeReloadHandler() for testing purposes. +// If you change any state handling here make sure you keep that in sync. +// +// Note: Only one commitAndReload can be in progress at a time, but the Commit function takes care of +// ensuring that only one commit function is called. func (r *templateRouter) commitAndReload() error { - // only state changes must be done under the lock + if err := func() error { + // Only state reads and changes must be done under the lock, the reload itself must not be done under the lock r.lock.Lock() defer r.lock.Unlock() @@ -345,11 +464,11 @@ func (r *templateRouter) commitAndReload() error { } r.stateChanged = false + r.lastReloadStart = time.Now() glog.V(4).Infof("Writing the router config") - reloadStart := time.Now() err := r.writeConfig() - r.metricWriteConfig.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second)) + r.metricWriteConfig.Observe(float64(time.Now().Sub(r.lastReloadStart)) / float64(time.Second)) return err }(); err != nil { return err @@ -363,11 +482,16 @@ func (r *templateRouter) commitAndReload() error { glog.V(4).Infof("Reloading the router") reloadStart := time.Now() err := r.reloadRouter() - r.metricReload.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second)) + reloadEnd := time.Now() + 
r.metricReload.Observe(float64(reloadEnd.Sub(reloadStart)) / float64(time.Second)) if err != nil { return err } + r.lock.Lock() + r.lastReloadEnd = reloadEnd + r.lock.Unlock() + return nil } diff --git a/test/integration/router_without_haproxy_test.go b/test/integration/router_without_haproxy_test.go index f111690399aa..fa0c887246b9 100644 --- a/test/integration/router_without_haproxy_test.go +++ b/test/integration/router_without_haproxy_test.go @@ -390,7 +390,7 @@ func launchRateLimitedRouter(t *testing.T, routeclient routeinternalclientset.In func initializeRouterPlugins(routeclient routeinternalclientset.Interface, projectclient projectinternalclientset.Interface, name string, reloadInterval int, rateLimitingFunc ratelimiter.HandlerFunc) (*templateplugin.TemplatePlugin, router.Plugin) { r := templateplugin.NewFakeTemplateRouter() - r.EnableRateLimiter(reloadInterval, func() error { + r.SetCommitFunc(func() error { r.FakeReloadHandler() return rateLimitingFunc() })