From 83811da2944532539b33badc2d659ece5fa4aa37 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Sat, 16 Dec 2017 00:11:16 -0500 Subject: [PATCH] Fix timeout validator unit test flakes Signed-off-by: Monis Khan Signed-off-by: Simo Sorce --- pkg/auth/oauth/registry/registry_test.go | 165 +++++++++++++++----- pkg/auth/oauth/registry/timeoutvalidator.go | 55 ++++++- 2 files changed, 177 insertions(+), 43 deletions(-) diff --git a/pkg/auth/oauth/registry/registry_test.go b/pkg/auth/oauth/registry/registry_test.go index c95fdee546bc..ab3ce52ca389 100644 --- a/pkg/auth/oauth/registry/registry_test.go +++ b/pkg/auth/oauth/registry/registry_test.go @@ -14,6 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/user" clienttesting "k8s.io/client-go/testing" @@ -486,15 +487,34 @@ func (f fakeOAuthClientLister) Get(name string) (*oapi.OAuthClient, error) { } func (f fakeOAuthClientLister) List(selector labels.Selector) ([]*oapi.OAuthClient, error) { - var list []*oapi.OAuthClient - ret, _ := f.clients.List(metav1.ListOptions{}) - for i := range ret.Items { - list = append(list, &ret.Items[i]) - } - return list, nil + panic("not used") +} + +type fakeTicker struct { + clock *clock.FakeClock + ch <-chan time.Time +} + +func (t *fakeTicker) Now() time.Time { + return t.clock.Now() +} + +func (t *fakeTicker) C() <-chan time.Time { + return t.ch +} + +func (t *fakeTicker) Stop() {} + +func (t *fakeTicker) NewTicker(d time.Duration) { + t.ch = t.clock.Tick(d) } -func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oauthclient.OAuthAccessTokenInterface, present bool) { +func (t *fakeTicker) Sleep(d time.Duration) { + t.clock.Sleep(d) +} + +func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oauthclient.OAuthAccessTokenInterface, current *fakeTicker, present bool) { + t.Helper() userInfo, found, err := authf.AuthenticateToken(name) if present { if !found { @@ -508,9 +528,12 @@ func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oau } } else { if found { - token, _ := tokens.Get(name, metav1.GetOptions{}) + token, tokenErr := tokens.Get(name, metav1.GetOptions{}) + if tokenErr != nil { + t.Fatal(tokenErr) + } t.Errorf("Found token (created=%s, timeout=%di, now=%s), but it should be gone!", - token.CreationTimestamp, token.InactivityTimeoutSeconds, time.Now()) + token.CreationTimestamp, token.InactivityTimeoutSeconds, current.Now()) } if err != errTimedout { t.Errorf("Unexpected error checking absence of token %s: %v", name, err) @@ -521,13 +544,24 @@ func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oau } } +func wait(t *testing.T, c chan struct{}) { + t.Helper() + select { + case <-c: + case <-time.After(30 * time.Second): + t.Fatal("failed to see channel event") + } +} + func TestAuthenticateTokenTimeout(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) + testClock := &fakeTicker{clock: clock.NewFakeClock(time.Time{})} + defaultTimeout := int32(30) // 30 seconds - clientTimeout := int32(15) // 15 seconds so flush -> 5 seconds - minTimeout := int32(10) // 10 seconds + clientTimeout := int32(15) // 15 seconds + minTimeout := int32(10) // 10 seconds -> 10/3 = a tick per 3 seconds testClient := oapi.OAuthClient{ ObjectMeta: metav1.ObjectMeta{Name: "testClient"}, @@ -541,7 +575,7 @@ func TestAuthenticateTokenTimeout(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "slowClient"}, } testToken := oapi.OAuthAccessToken{ - ObjectMeta: metav1.ObjectMeta{Name: "testToken", CreationTimestamp: metav1.Time{Time: time.Now()}}, + ObjectMeta: metav1.ObjectMeta{Name: "testToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}}, ClientName: "testClient", ExpiresIn: 600, // 10 minutes UserName: "foo", @@ -549,7 +583,7 @@ func TestAuthenticateTokenTimeout(t *testing.T) { InactivityTimeoutSeconds: clientTimeout, } quickToken := oapi.OAuthAccessToken{ - ObjectMeta: metav1.ObjectMeta{Name: "quickToken", CreationTimestamp: metav1.Time{Time: time.Now()}}, + ObjectMeta: metav1.ObjectMeta{Name: "quickToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}}, ClientName: "quickClient", ExpiresIn: 600, // 10 minutes UserName: "foo", @@ -557,7 +591,7 @@ func TestAuthenticateTokenTimeout(t *testing.T) { InactivityTimeoutSeconds: minTimeout, } slowToken := oapi.OAuthAccessToken{ - ObjectMeta: metav1.ObjectMeta{Name: "slowToken", CreationTimestamp: metav1.Time{Time: time.Now()}}, + ObjectMeta: metav1.ObjectMeta{Name: "slowToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}}, ClientName: "slowClient", ExpiresIn: 600, // 10 minutes UserName: "foo", @@ -565,7 +599,7 @@ func TestAuthenticateTokenTimeout(t *testing.T) { InactivityTimeoutSeconds: defaultTimeout, } emergToken := oapi.OAuthAccessToken{ - ObjectMeta: metav1.ObjectMeta{Name: "emergToken", CreationTimestamp: metav1.Time{Time: time.Now()}}, + ObjectMeta: metav1.ObjectMeta{Name: "emergToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}}, ClientName: "quickClient", ExpiresIn: 600, // 10 minutes UserName: "foo", @@ -580,34 +614,77 @@ func TestAuthenticateTokenTimeout(t *testing.T) { lister := &fakeOAuthClientLister{ clients: oauthClients, } + timeouts := NewTimeoutValidator(accessTokenGetter, lister, defaultTimeout, minTimeout) + + // inject fake clock, which has some interesting properties + // 1. A sleep will cause at most one ticker event, regardless of how long the sleep was + // 2. The clock will hold one tick event and will drop the next one if something does not consume it first + timeouts.ticker = testClock + + // decorate flush + // The fake clock 1. and 2. require that we issue a wait(t, timeoutsSync) after each testClock.Sleep that causes a tick + originalFlush := timeouts.flushHandler + timeoutsSync := make(chan struct{}) + timeouts.flushHandler = func(flushHorizon time.Time) { + originalFlush(flushHorizon) + timeoutsSync <- struct{}{} // signal that flush is complete so we never race against it + } + + // decorate putToken + // We must issue a wait(t, putTokenSync) after each call to checkToken that should be successful + originalPutToken := timeouts.putTokenHandler + putTokenSync := make(chan struct{}) + timeouts.putTokenHandler = func(td *tokenData) { + originalPutToken(td) + putTokenSync <- struct{}{} // signal that putToken is complete so we never race against it + } + + // add some padding to all sleep invocations to make sure we are not failing on any boundary values + buffer := time.Nanosecond + tokenAuthenticator := NewTokenAuthenticator(accessTokenGetter, userRegistry, identitymapper.NoopGroupMapper{}, timeouts) go timeouts.Run(stopCh) - // wait to see that the other thread has updated the timeouts - time.Sleep(1 * time.Second) - - // TIME: 1 seconds have passed here + // TIME: 0 seconds have passed here // first time should succeed for all - checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true) - checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, true) - checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, true) + checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + wait(t, timeoutsSync) // from emergency flush because quickToken has a short enough timeout + + checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + // this should cause an emergency flush, if not the next auth will fail, // as the token will be timed out - checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, true) + checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + wait(t, timeoutsSync) // from emergency flush because emergToken has a super short timeout - // wait 5 seconds - time.Sleep(5 * time.Second) + // wait 6 seconds + testClock.Sleep(5*time.Second + buffer) + + // a tick happens every 3 seconds + wait(t, timeoutsSync) // TIME: 6th second // See if emergency flush happened - checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, true) + checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + wait(t, timeoutsSync) // from emergency flush because emergToken has a super short timeout // wait for timeout (minTimeout + 1 - the previously waited 6 seconds) - time.Sleep(time.Duration(minTimeout-5) * time.Second) + testClock.Sleep(time.Duration(minTimeout-5)*time.Second + buffer) + wait(t, timeoutsSync) // TIME: 11th second @@ -625,21 +702,34 @@ func TestAuthenticateTokenTimeout(t *testing.T) { } } - // this should fail - checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, false) + // this should fail, thus no call to wait(t, putTokenSync) + checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, testClock, false) + // while this should get updated - checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true) + checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + wait(t, timeoutsSync) // wait for timeout - time.Sleep(time.Duration(clientTimeout+1) * time.Second) + testClock.Sleep(time.Duration(clientTimeout+1)*time.Second + buffer) + + // 16 seconds equals 5 more flushes, but the fake clock will only tick once during this time + wait(t, timeoutsSync) // TIME: 27th second // this should get updated - checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, true) + checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + wait(t, timeoutsSync) // while this should not fail - checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true) + checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + wait(t, timeoutsSync) // and should be updated to last at least till the 31st second token, err := accessTokenGetter.Get("testToken", metav1.GetOptions{}) if err != nil { @@ -663,10 +753,15 @@ func TestAuthenticateTokenTimeout(t *testing.T) { } // and wait until test token should time out, and has been flushed for sure - time.Sleep(time.Duration(minTimeout) * time.Second) + testClock.Sleep(time.Duration(minTimeout)*time.Second + buffer) + wait(t, timeoutsSync) // while this should not fail - checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true) + checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true) + wait(t, putTokenSync) + + wait(t, timeoutsSync) + // and should be updated to have a ZERO timeout token, err = accessTokenGetter.Get("testToken", metav1.GetOptions{}) if err != nil { diff --git a/pkg/auth/oauth/registry/timeoutvalidator.go b/pkg/auth/oauth/registry/timeoutvalidator.go index 57b4b2a78b96..e75588337822 100644 --- a/pkg/auth/oauth/registry/timeoutvalidator.go +++ b/pkg/auth/oauth/registry/timeoutvalidator.go @@ -43,6 +43,36 @@ func timeoutAsDuration(timeout int32) time.Duration { return time.Duration(timeout) * time.Second } +// This interface is used to allow mocking time from unit tests +// NOTE: Never use time.Now() directly in the code, use this interface +// Now() function instead. +type internalTickerInterface interface { + Now() time.Time + NewTicker(d time.Duration) + C() <-chan time.Time + Stop() +} + +type internalTicker struct { + ticker *time.Ticker +} + +func (t *internalTicker) Now() time.Time { + return time.Now() +} + +func (t *internalTicker) C() <-chan time.Time { + return t.ticker.C +} + +func (t *internalTicker) Stop() { + t.ticker.Stop() +} + +func (t *internalTicker) NewTicker(d time.Duration) { + t.ticker = time.NewTicker(d) +} + type TimeoutValidator struct { oauthClients oauthclientlister.OAuthClientLister tokens oauthclient.OAuthAccessTokenInterface @@ -50,6 +80,11 @@ type TimeoutValidator struct { data *rankedset.RankedSet defaultTimeout time.Duration tickerInterval time.Duration + + // fields that are used to have a deterministic order of events in unit tests + flushHandler func(flushHorizon time.Time) // allows us to decorate this func during unit tests + putTokenHandler func(td *tokenData) // allows us to decorate this func during unit tests + ticker internalTickerInterface // allows us to control time during unit tests } func NewTimeoutValidator(tokens oauthclient.OAuthAccessTokenInterface, oauthClients oauthclientlister.OAuthClientLister, defaultTimeout int32, minValidTimeout int32) *TimeoutValidator { @@ -60,7 +95,10 @@ func NewTimeoutValidator(tokens oauthclient.OAuthAccessTokenInterface, oauthClie data: rankedset.New(), defaultTimeout: timeoutAsDuration(defaultTimeout), tickerInterval: timeoutAsDuration(minValidTimeout / 3), // we tick at least 3 times within each timeout period + ticker: &internalTicker{}, } + a.flushHandler = a.flush + a.putTokenHandler = a.putToken glog.V(5).Infof("Token Timeout Validator primed with defaultTimeout=%s tickerInterval=%s", a.defaultTimeout, a.tickerInterval) return a } @@ -75,7 +113,7 @@ func (a *TimeoutValidator) Validate(token *oauth.OAuthAccessToken, _ *user.User) td := &tokenData{ token: token, - seen: time.Now(), + seen: a.ticker.Now(), } if td.timeout().Before(td.seen) { return errTimedout @@ -88,7 +126,7 @@ func (a *TimeoutValidator) Validate(token *oauth.OAuthAccessToken, _ *user.User) // After a positive timeout check we need to update the timeout and // schedule an update so that we can either set or update the Timeout // we do that launching a micro goroutine to avoid blocking - go a.putToken(td) + go a.putTokenHandler(td) return nil } @@ -186,16 +224,16 @@ func (a *TimeoutValidator) flush(flushHorizon time.Time) { func (a *TimeoutValidator) nextTick() time.Time { // Add a small safety Margin so flushes tend to // overlap a little rather than have gaps - return time.Now().Add(a.tickerInterval + 10*time.Second) + return a.ticker.Now().Add(a.tickerInterval + 10*time.Second) } func (a *TimeoutValidator) Run(stopCh <-chan struct{}) { defer runtime.HandleCrash() glog.V(5).Infof("Started Token Timeout Flush Handling thread!") - ticker := time.NewTicker(a.tickerInterval) + a.ticker.NewTicker(a.tickerInterval) // make sure to kill the ticker when we exit - defer ticker.Stop() + defer a.ticker.Stop() nextTick := a.nextTick() @@ -204,6 +242,7 @@ func (a *TimeoutValidator) Run(stopCh <-chan struct{}) { case <-stopCh: // if channel closes terminate return + case td := <-a.tokenChannel: a.data.Insert(td) // if this token is going to time out before the timer, flush now @@ -211,12 +250,12 @@ func (a *TimeoutValidator) Run(stopCh <-chan struct{}) { if tokenTimeout.Before(nextTick) { glog.V(5).Infof("Timeout for user=%q client=%q scopes=%v falls before next ticker (%s < %s), forcing flush!", td.token.UserName, td.token.ClientName, td.token.Scopes, tokenTimeout, nextTick) - a.flush(nextTick) + a.flushHandler(nextTick) } - case <-ticker.C: + case <-a.ticker.C(): nextTick = a.nextTick() - a.flush(nextTick) + a.flushHandler(nextTick) } } }