Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure router reload on initial sync #12199

Merged
merged 5 commits into from
Dec 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions pkg/router/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type RouterController struct {
routesListConsumed bool
endpointsListConsumed bool
filteredByNamespace bool
syncing bool

RoutesListSuccessfulAtLeastOnce func() bool
EndpointsListSuccessfulAtLeastOnce func() bool
Expand Down Expand Up @@ -80,23 +81,18 @@ func (c *RouterController) handleFirstSync() bool {

// If either of the event queues were empty after the initial
// List, the tracking listConsumed variable's default value of
// 'false' may prevent the router from reloading to indicate the
// readiness status. Set the value to 'true' to ensure that a
// reload will be performed if necessary.
// 'false' may prevent the router from committing the readiness
// status. Set the value to 'true' to ensure that state will be
// committed if necessary.
if c.RoutesListCount() == 0 {
c.routesListConsumed = true
}
if c.EndpointsListCount() == 0 {
c.endpointsListConsumed = true
}
c.updateLastSyncProcessed()
c.commit()

err := c.Plugin.SetSyncedAtLeastOnce()
if err == nil {
return true
}
utilruntime.HandleError(err)
return false
return true
}

// watchForFirstSync loops until the first sync has been handled.
Expand All @@ -116,16 +112,17 @@ func (c *RouterController) HandleNamespaces() {
c.lock.Lock()
defer c.lock.Unlock()

glog.V(4).Infof("Updating watched namespaces: %v", namespaces)
if err := c.Plugin.HandleNamespaces(namespaces); err != nil {
utilruntime.HandleError(err)
}

// Namespace filtering is assumed to be have been
// performed so long as the plugin event handler is called
// at least once.
c.filteredByNamespace = true
c.updateLastSyncProcessed()
c.commit()

glog.V(4).Infof("Updating watched namespaces: %v", namespaces)
if err := c.Plugin.HandleNamespaces(namespaces); err != nil {
utilruntime.HandleError(err)
}
return
}
utilruntime.HandleError(fmt.Errorf("unable to find namespaces for router: %v", err))
Expand Down Expand Up @@ -164,18 +161,18 @@ func (c *RouterController) HandleRoute() {
c.lock.Lock()
defer c.lock.Unlock()

// Change the local sync state within the lock to ensure that all
// event handlers have the same view of sync state.
c.routesListConsumed = c.RoutesListConsumed()
c.updateLastSyncProcessed()

glog.V(4).Infof("Processing Route: %s -> %s", route.Name, route.Spec.To.Name)
glog.V(4).Infof(" Alias: %s", route.Spec.Host)
glog.V(4).Infof(" Event: %s", eventType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move the [defer] c.lock.{Lock,UnLock} to after the Info log messages.

Copy link
Contributor Author

@marun marun Dec 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's in scope for this PR.

Assuming it was in scope, why would that be a good idea? It would likely result in logs from the different handlers being interleaved rather than separated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why hold a lock for longer than you need to? You are blocked on IO on the log messages. Seems wasteful to do so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The majority of the router's logging occurs in the chain of plugins whose methods are also called inside the lock. I don't know why sacrificing log coherency - an important aid to debugging - would be worth the relatively insignificant penalty involved in having this method also log inside the lock.

But I neither wrote the logging nor modified it in the PR. If you want this changed I think it should be done separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look at cleaning up.


if err := c.Plugin.HandleRoute(eventType, route); err != nil {
utilruntime.HandleError(err)
}

// Change the local sync state within the lock to ensure that all
// event handlers have the same view of sync state.
c.routesListConsumed = c.RoutesListConsumed()
c.commit()
}

// HandleEndpoints handles a single Endpoints event and refreshes the router backend.
Expand All @@ -189,22 +186,36 @@ func (c *RouterController) HandleEndpoints() {
c.lock.Lock()
defer c.lock.Unlock()

if err := c.Plugin.HandleEndpoints(eventType, endpoints); err != nil {
utilruntime.HandleError(err)
}

// Change the local sync state within the lock to ensure that all
// event handlers have the same view of sync state.
c.endpointsListConsumed = c.EndpointsListConsumed()
c.updateLastSyncProcessed()
c.commit()
}

if err := c.Plugin.HandleEndpoints(eventType, endpoints); err != nil {
// commit notifies the plugin that it is safe to commit state.
func (c *RouterController) commit() {
syncing := !(c.endpointsListConsumed && c.routesListConsumed &&
(c.Namespaces == nil || c.filteredByNamespace))
c.logSyncState(syncing)
if syncing {
return
}
if err := c.Plugin.Commit(); err != nil {
utilruntime.HandleError(err)
}
}

// updateLastSyncProcessed notifies the plugin if the most recent sync
// of router resources has been completed.
func (c *RouterController) updateLastSyncProcessed() {
lastSyncProcessed := c.endpointsListConsumed && c.routesListConsumed &&
(c.Namespaces == nil || c.filteredByNamespace)
if err := c.Plugin.SetLastSyncProcessed(lastSyncProcessed); err != nil {
utilruntime.HandleError(err)
func (c *RouterController) logSyncState(syncing bool) {
if c.syncing != syncing {
c.syncing = syncing
if c.syncing {
glog.V(4).Infof("Router sync in progress")
} else {
glog.V(4).Infof("Router sync complete")
}
}
}
38 changes: 18 additions & 20 deletions pkg/router/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
)

type fakeRouterPlugin struct {
lastSyncProcessed bool
syncedAtLeastOnce bool
commitRequested bool
}

func (p *fakeRouterPlugin) HandleRoute(t watch.EventType, route *routeapi.Route) error {
Expand All @@ -28,13 +27,8 @@ func (p *fakeRouterPlugin) HandleNamespaces(namespaces sets.String) error {
return nil
}

func (p *fakeRouterPlugin) SetLastSyncProcessed(processed bool) error {
p.lastSyncProcessed = processed
return nil
}

func (p *fakeRouterPlugin) SetSyncedAtLeastOnce() error {
p.syncedAtLeastOnce = true
func (p *fakeRouterPlugin) Commit() error {
p.commitRequested = true
return nil
}

Expand All @@ -45,7 +39,7 @@ func (n fakeNamespaceLister) NamespaceNames() (sets.String, error) {
return sets.NewString("foo"), nil
}

func TestRouterController_updateLastSyncProcessed(t *testing.T) {
func TestRouterController_commit(t *testing.T) {
p := fakeRouterPlugin{}
routesListConsumed := true
c := RouterController{
Expand All @@ -69,30 +63,34 @@ func TestRouterController_updateLastSyncProcessed(t *testing.T) {
NamespaceRetries: 1,
}

expectedMsg := "commit not expected to have been requested"
notExpectedMsg := "commit expected to have been requested"

// Simulate the initial sync
c.HandleNamespaces()
if p.lastSyncProcessed {
t.Fatalf("last sync not expected to have been processed")
if p.commitRequested {
t.Fatalf(notExpectedMsg)
}
c.HandleEndpoints()
if p.lastSyncProcessed {
t.Fatalf("last sync not expected to have been processed")
if p.commitRequested {
t.Fatalf(notExpectedMsg)
}
c.HandleRoute()
if !p.lastSyncProcessed {
t.Fatalf("last sync expected to have been processed")
if !p.commitRequested {
t.Fatalf(expectedMsg)
}

// Simulate a relist
p.commitRequested = false
routesListConsumed = false
c.HandleRoute()
if p.lastSyncProcessed {
t.Fatalf("last sync not expected to have been processed")
if p.commitRequested {
t.Fatalf(notExpectedMsg)
}
routesListConsumed = true
c.HandleRoute()
if !p.lastSyncProcessed {
t.Fatalf("last sync expected to have been processed")
if !p.commitRequested {
t.Fatalf(expectedMsg)
}

}
8 changes: 2 additions & 6 deletions pkg/router/controller/extended_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ func (p *ExtendedValidator) HandleNamespaces(namespaces sets.String) error {
return p.plugin.HandleNamespaces(namespaces)
}

func (p *ExtendedValidator) SetLastSyncProcessed(processed bool) error {
return p.plugin.SetLastSyncProcessed(processed)
}

func (p *ExtendedValidator) SetSyncedAtLeastOnce() error {
return p.plugin.SetSyncedAtLeastOnce()
func (p *ExtendedValidator) Commit() error {
return p.plugin.Commit()
}
8 changes: 2 additions & 6 deletions pkg/router/controller/host_admitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,8 @@ func (p *HostAdmitter) HandleNamespaces(namespaces sets.String) error {
return p.plugin.HandleNamespaces(namespaces)
}

func (p *HostAdmitter) SetLastSyncProcessed(processed bool) error {
return p.plugin.SetLastSyncProcessed(processed)
}

func (p *HostAdmitter) SetSyncedAtLeastOnce() error {
return p.plugin.SetSyncedAtLeastOnce()
func (p *HostAdmitter) Commit() error {
return p.plugin.Commit()
}

// addRoute admits routes based on subdomain ownership - returns errors if the route is not admitted.
Expand Down
8 changes: 2 additions & 6 deletions pkg/router/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,6 @@ func (a *StatusAdmitter) HandleNamespaces(namespaces sets.String) error {
return a.plugin.HandleNamespaces(namespaces)
}

func (a *StatusAdmitter) SetLastSyncProcessed(processed bool) error {
return a.plugin.SetLastSyncProcessed(processed)
}

func (a *StatusAdmitter) SetSyncedAtLeastOnce() error {
return a.plugin.SetSyncedAtLeastOnce()
func (a *StatusAdmitter) Commit() error {
return a.plugin.Commit()
}
6 changes: 1 addition & 5 deletions pkg/router/controller/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ func (p *fakePlugin) HandleEndpoints(watch.EventType, *kapi.Endpoints) error {
func (p *fakePlugin) HandleNamespaces(namespaces sets.String) error {
return fmt.Errorf("not expected")
}
func (p *fakePlugin) SetLastSyncProcessed(processed bool) error {
return fmt.Errorf("not expected")
}

func (p *fakePlugin) SetSyncedAtLeastOnce() error {
func (p *fakePlugin) Commit() error {
return fmt.Errorf("not expected")
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/router/controller/unique_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,8 @@ func (p *UniqueHost) HandleNamespaces(namespaces sets.String) error {
return p.plugin.HandleNamespaces(namespaces)
}

func (p *UniqueHost) SetLastSyncProcessed(processed bool) error {
return p.plugin.SetLastSyncProcessed(processed)
}

func (p *UniqueHost) SetSyncedAtLeastOnce() error {
return p.plugin.SetSyncedAtLeastOnce()
func (p *UniqueHost) Commit() error {
return p.plugin.Commit()
}

// routeKeys returns the internal router key to use for the given Route.
Expand Down
7 changes: 1 addition & 6 deletions pkg/router/f5/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,11 +630,6 @@ func (p *F5Plugin) HandleRoute(eventType watch.EventType,
}

// No-op since f5 configuration can be updated piecemeal
func (p *F5Plugin) SetLastSyncProcessed(processed bool) error {
return nil
}

// No-op since f5 has its own concept of what 'ready' means
func (p *F5Plugin) SetSyncedAtLeastOnce() error {
func (p *F5Plugin) Commit() error {
return nil
}
3 changes: 1 addition & 2 deletions pkg/router/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ type Plugin interface {
// If sent, filter the list of accepted routes and endpoints to this set
HandleNamespaces(namespaces sets.String) error
HandleNode(watch.EventType, *kapi.Node) error
SetLastSyncProcessed(processed bool) error
SetSyncedAtLeastOnce() error
Commit() error
}
Loading