Skip to content

Commit

Permalink
Fix timeout validator unit test flakes
Browse files Browse the repository at this point in the history
Signed-off-by: Monis Khan <[email protected]>
Signed-off-by: Simo Sorce <[email protected]>
  • Loading branch information
enj authored and simo5 committed Jan 3, 2018
1 parent 0ed56a9 commit 83811da
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 43 deletions.
165 changes: 130 additions & 35 deletions pkg/auth/oauth/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
clienttesting "k8s.io/client-go/testing"
Expand Down Expand Up @@ -486,15 +487,34 @@ func (f fakeOAuthClientLister) Get(name string) (*oapi.OAuthClient, error) {
}

// List is not used by the tests in this file (the panic below says as much);
// the selector argument is never consulted.
func (f fakeOAuthClientLister) List(selector labels.Selector) ([]*oapi.OAuthClient, error) {
var list []*oapi.OAuthClient
ret, _ := f.clients.List(metav1.ListOptions{})
for i := range ret.Items {
list = append(list, &ret.Items[i])
}
return list, nil
panic("not used")
}

// fakeTicker is a test double whose notion of time comes entirely from a
// clock.FakeClock, so the test decides when time advances.
type fakeTicker struct {
// clock is the fake time source behind Now, NewTicker and Sleep.
clock *clock.FakeClock
// ch is the tick channel; it stays nil until NewTicker is called.
ch <-chan time.Time
}

// Now returns the fake clock's current time.
func (t *fakeTicker) Now() time.Time {
return t.clock.Now()
}

// C returns the tick channel stored by the last NewTicker call.
func (t *fakeTicker) C() <-chan time.Time {
return t.ch
}

// Stop is a no-op.
func (t *fakeTicker) Stop() {}

// NewTicker stores the channel returned by the fake clock's Tick(d) so that
// C can hand it out.
func (t *fakeTicker) NewTicker(d time.Duration) {
t.ch = t.clock.Tick(d)
}

func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oauthclient.OAuthAccessTokenInterface, present bool) {
// Sleep forwards d to the fake clock's Sleep method.
// NOTE(review): presumably this advances fake time rather than blocking in
// real time — confirm against the clock.FakeClock documentation.
func (t *fakeTicker) Sleep(d time.Duration) {
t.clock.Sleep(d)
}

func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oauthclient.OAuthAccessTokenInterface, current *fakeTicker, present bool) {
t.Helper()
userInfo, found, err := authf.AuthenticateToken(name)
if present {
if !found {
Expand All @@ -508,9 +528,12 @@ func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oau
}
} else {
if found {
token, _ := tokens.Get(name, metav1.GetOptions{})
token, tokenErr := tokens.Get(name, metav1.GetOptions{})
if tokenErr != nil {
t.Fatal(tokenErr)
}
t.Errorf("Found token (created=%s, timeout=%di, now=%s), but it should be gone!",
token.CreationTimestamp, token.InactivityTimeoutSeconds, time.Now())
token.CreationTimestamp, token.InactivityTimeoutSeconds, current.Now())
}
if err != errTimedout {
t.Errorf("Unexpected error checking absence of token %s: %v", name, err)
Expand All @@ -521,13 +544,24 @@ func checkToken(t *testing.T, name string, authf authenticator.Token, tokens oau
}
}

// wait blocks until an event is received on c (or c is closed) and fails the
// test if nothing arrives within 30 seconds of real time.
//
// An explicit timer is used instead of time.After so that the timer is
// released as soon as the event is seen, rather than staying pending for the
// full 30 seconds after every successful call.
func wait(t *testing.T, c chan struct{}) {
	t.Helper()

	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()

	select {
	case <-c:
	case <-timeout.C:
		t.Fatal("failed to see channel event")
	}
}

func TestAuthenticateTokenTimeout(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

testClock := &fakeTicker{clock: clock.NewFakeClock(time.Time{})}

defaultTimeout := int32(30) // 30 seconds
clientTimeout := int32(15) // 15 seconds so flush -> 5 seconds
minTimeout := int32(10) // 10 seconds
clientTimeout := int32(15) // 15 seconds
minTimeout := int32(10) // 10 seconds -> 10/3 = a tick per 3 seconds

testClient := oapi.OAuthClient{
ObjectMeta: metav1.ObjectMeta{Name: "testClient"},
Expand All @@ -541,31 +575,31 @@ func TestAuthenticateTokenTimeout(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "slowClient"},
}
testToken := oapi.OAuthAccessToken{
ObjectMeta: metav1.ObjectMeta{Name: "testToken", CreationTimestamp: metav1.Time{Time: time.Now()}},
ObjectMeta: metav1.ObjectMeta{Name: "testToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}},
ClientName: "testClient",
ExpiresIn: 600, // 10 minutes
UserName: "foo",
UserUID: string("bar"),
InactivityTimeoutSeconds: clientTimeout,
}
quickToken := oapi.OAuthAccessToken{
ObjectMeta: metav1.ObjectMeta{Name: "quickToken", CreationTimestamp: metav1.Time{Time: time.Now()}},
ObjectMeta: metav1.ObjectMeta{Name: "quickToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}},
ClientName: "quickClient",
ExpiresIn: 600, // 10 minutes
UserName: "foo",
UserUID: string("bar"),
InactivityTimeoutSeconds: minTimeout,
}
slowToken := oapi.OAuthAccessToken{
ObjectMeta: metav1.ObjectMeta{Name: "slowToken", CreationTimestamp: metav1.Time{Time: time.Now()}},
ObjectMeta: metav1.ObjectMeta{Name: "slowToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}},
ClientName: "slowClient",
ExpiresIn: 600, // 10 minutes
UserName: "foo",
UserUID: string("bar"),
InactivityTimeoutSeconds: defaultTimeout,
}
emergToken := oapi.OAuthAccessToken{
ObjectMeta: metav1.ObjectMeta{Name: "emergToken", CreationTimestamp: metav1.Time{Time: time.Now()}},
ObjectMeta: metav1.ObjectMeta{Name: "emergToken", CreationTimestamp: metav1.Time{Time: testClock.Now()}},
ClientName: "quickClient",
ExpiresIn: 600, // 10 minutes
UserName: "foo",
Expand All @@ -580,34 +614,77 @@ func TestAuthenticateTokenTimeout(t *testing.T) {
lister := &fakeOAuthClientLister{
clients: oauthClients,
}

timeouts := NewTimeoutValidator(accessTokenGetter, lister, defaultTimeout, minTimeout)

// inject fake clock, which has some interesting properties
// 1. A sleep will cause at most one ticker event, regardless of how long the sleep was
// 2. The clock will hold one tick event and will drop the next one if something does not consume it first
timeouts.ticker = testClock

// decorate flush
// The fake clock 1. and 2. require that we issue a wait(t, timeoutsSync) after each testClock.Sleep that causes a tick
originalFlush := timeouts.flushHandler
timeoutsSync := make(chan struct{})
timeouts.flushHandler = func(flushHorizon time.Time) {
originalFlush(flushHorizon)
timeoutsSync <- struct{}{} // signal that flush is complete so we never race against it
}

// decorate putToken
// We must issue a wait(t, putTokenSync) after each call to checkToken that should be successful
originalPutToken := timeouts.putTokenHandler
putTokenSync := make(chan struct{})
timeouts.putTokenHandler = func(td *tokenData) {
originalPutToken(td)
putTokenSync <- struct{}{} // signal that putToken is complete so we never race against it
}

// add some padding to all sleep invocations to make sure we are not failing on any boundary values
buffer := time.Nanosecond

tokenAuthenticator := NewTokenAuthenticator(accessTokenGetter, userRegistry, identitymapper.NoopGroupMapper{}, timeouts)

go timeouts.Run(stopCh)

// wait to see that the other thread has updated the timeouts
time.Sleep(1 * time.Second)

// TIME: 1 seconds have passed here
// TIME: 0 seconds have passed here

// first time should succeed for all
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

wait(t, timeoutsSync) // from emergency flush because quickToken has a short enough timeout

checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

// this should cause an emergency flush, if not the next auth will fail,
// as the token will be timed out
checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

wait(t, timeoutsSync) // from emergency flush because emergToken has a super short timeout

// wait 5 seconds
time.Sleep(5 * time.Second)
// wait 6 seconds
testClock.Sleep(5*time.Second + buffer)

// a tick happens every 3 seconds
wait(t, timeoutsSync)

// TIME: 6th second

// See if emergency flush happened
checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "emergToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

wait(t, timeoutsSync) // from emergency flush because emergToken has a super short timeout

// wait for timeout (minTimeout + 1 - the previously waited 6 seconds)
time.Sleep(time.Duration(minTimeout-5) * time.Second)
testClock.Sleep(time.Duration(minTimeout-5)*time.Second + buffer)
wait(t, timeoutsSync)

// TIME: 11th second

Expand All @@ -625,21 +702,34 @@ func TestAuthenticateTokenTimeout(t *testing.T) {
}
}

// this should fail
checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, false)
// this should fail, thus no call to wait(t, putTokenSync)
checkToken(t, "quickToken", tokenAuthenticator, accessTokenGetter, testClock, false)

// while this should get updated
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

wait(t, timeoutsSync)

// wait for timeout
time.Sleep(time.Duration(clientTimeout+1) * time.Second)
testClock.Sleep(time.Duration(clientTimeout+1)*time.Second + buffer)

// 16 seconds equals 5 more flushes, but the fake clock will only tick once during this time
wait(t, timeoutsSync)

// TIME: 27th second

// this should get updated
checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "slowToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

wait(t, timeoutsSync)

// while this should not fail
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

wait(t, timeoutsSync)
// and should be updated to last at least till the 31st second
token, err := accessTokenGetter.Get("testToken", metav1.GetOptions{})
if err != nil {
Expand All @@ -663,10 +753,15 @@ func TestAuthenticateTokenTimeout(t *testing.T) {
}

// and wait until test token should time out, and has been flushed for sure
time.Sleep(time.Duration(minTimeout) * time.Second)
testClock.Sleep(time.Duration(minTimeout)*time.Second + buffer)
wait(t, timeoutsSync)

// while this should not fail
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, true)
checkToken(t, "testToken", tokenAuthenticator, accessTokenGetter, testClock, true)
wait(t, putTokenSync)

wait(t, timeoutsSync)

// and should be updated to have a ZERO timeout
token, err = accessTokenGetter.Get("testToken", metav1.GetOptions{})
if err != nil {
Expand Down
55 changes: 47 additions & 8 deletions pkg/auth/oauth/registry/timeoutvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,48 @@ func timeoutAsDuration(timeout int32) time.Duration {
return time.Duration(timeout) * time.Second
}

// internalTickerInterface abstracts the current time and a periodic ticker
// so that unit tests can substitute a fake clock.
// NOTE: never call time.Now() directly in this code; call this interface's
// Now() method instead.
type internalTickerInterface interface {
// Now returns the current time as seen by this ticker.
Now() time.Time
// NewTicker sets up the ticker with period d; it must be called before C.
NewTicker(d time.Duration)
// C returns the channel on which ticks are delivered.
C() <-chan time.Time
// Stop releases the ticker set up by NewTicker.
Stop()
}

// internalTicker is the real-time implementation of internalTickerInterface,
// backed by the standard library's time package.
//
// The zero value is usable: Stop is a no-op and C returns a nil channel
// until NewTicker has been called.
type internalTicker struct {
	// ticker is nil until NewTicker has been called.
	ticker *time.Ticker
}

// Now returns the current wall-clock time.
func (t *internalTicker) Now() time.Time {
	return time.Now()
}

// C returns the channel on which ticks are delivered. Before NewTicker has
// been called it returns nil; receiving from a nil channel blocks forever,
// so a select on it simply never fires instead of panicking.
func (t *internalTicker) C() <-chan time.Time {
	if t.ticker == nil {
		return nil
	}
	return t.ticker.C
}

// Stop stops the underlying ticker, if one has been created. It is safe to
// call on a zero-value internalTicker.
func (t *internalTicker) Stop() {
	if t.ticker != nil {
		t.ticker.Stop()
	}
}

// NewTicker (re)creates the underlying ticker with period d. Any ticker left
// over from an earlier call is stopped first so it does not keep running.
func (t *internalTicker) NewTicker(d time.Duration) {
	t.Stop()
	t.ticker = time.NewTicker(d)
}

// TimeoutValidator bundles an OAuth client lister, an access-token client, a
// channel of token data, a ranked set, the default timeout and the ticker
// interval, plus three hooks (flushHandler, putTokenHandler, ticker) that
// unit tests can replace to control ordering and time.
type TimeoutValidator struct {
oauthClients oauthclientlister.OAuthClientLister
tokens oauthclient.OAuthAccessTokenInterface
tokenChannel chan *tokenData
data *rankedset.RankedSet
defaultTimeout time.Duration
tickerInterval time.Duration

// fields that are used to have a deterministic order of events in unit tests
flushHandler func(flushHorizon time.Time) // allows us to decorate this func during unit tests
putTokenHandler func(td *tokenData) // allows us to decorate this func during unit tests
ticker internalTickerInterface // allows us to control time during unit tests
}

func NewTimeoutValidator(tokens oauthclient.OAuthAccessTokenInterface, oauthClients oauthclientlister.OAuthClientLister, defaultTimeout int32, minValidTimeout int32) *TimeoutValidator {
Expand All @@ -60,7 +95,10 @@ func NewTimeoutValidator(tokens oauthclient.OAuthAccessTokenInterface, oauthClie
data: rankedset.New(),
defaultTimeout: timeoutAsDuration(defaultTimeout),
tickerInterval: timeoutAsDuration(minValidTimeout / 3), // we tick at least 3 times within each timeout period
ticker: &internalTicker{},
}
a.flushHandler = a.flush
a.putTokenHandler = a.putToken
glog.V(5).Infof("Token Timeout Validator primed with defaultTimeout=%s tickerInterval=%s", a.defaultTimeout, a.tickerInterval)
return a
}
Expand All @@ -75,7 +113,7 @@ func (a *TimeoutValidator) Validate(token *oauth.OAuthAccessToken, _ *user.User)

td := &tokenData{
token: token,
seen: time.Now(),
seen: a.ticker.Now(),
}
if td.timeout().Before(td.seen) {
return errTimedout
Expand All @@ -88,7 +126,7 @@ func (a *TimeoutValidator) Validate(token *oauth.OAuthAccessToken, _ *user.User)
// After a positive timeout check we need to update the timeout and
// schedule an update so that we can either set or update the Timeout
// we do that launching a micro goroutine to avoid blocking
go a.putToken(td)
go a.putTokenHandler(td)

return nil
}
Expand Down Expand Up @@ -186,16 +224,16 @@ func (a *TimeoutValidator) flush(flushHorizon time.Time) {
// nextTick returns the current time advanced by tickerInterval plus a fixed
// 10 second margin.
func (a *TimeoutValidator) nextTick() time.Time {
// Add a small safety Margin so flushes tend to
// overlap a little rather than have gaps
return time.Now().Add(a.tickerInterval + 10*time.Second)
return a.ticker.Now().Add(a.tickerInterval + 10*time.Second)
}

func (a *TimeoutValidator) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
glog.V(5).Infof("Started Token Timeout Flush Handling thread!")

ticker := time.NewTicker(a.tickerInterval)
a.ticker.NewTicker(a.tickerInterval)
// make sure to kill the ticker when we exit
defer ticker.Stop()
defer a.ticker.Stop()

nextTick := a.nextTick()

Expand All @@ -204,19 +242,20 @@ func (a *TimeoutValidator) Run(stopCh <-chan struct{}) {
case <-stopCh:
// if channel closes terminate
return

case td := <-a.tokenChannel:
a.data.Insert(td)
// if this token is going to time out before the timer, flush now
tokenTimeout := td.timeout()
if tokenTimeout.Before(nextTick) {
glog.V(5).Infof("Timeout for user=%q client=%q scopes=%v falls before next ticker (%s < %s), forcing flush!",
td.token.UserName, td.token.ClientName, td.token.Scopes, tokenTimeout, nextTick)
a.flush(nextTick)
a.flushHandler(nextTick)
}

case <-ticker.C:
case <-a.ticker.C():
nextTick = a.nextTick()
a.flush(nextTick)
a.flushHandler(nextTick)
}
}
}

0 comments on commit 83811da

Please sign in to comment.