Skip to content

Commit

Permalink
Use RankedSet for data bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
simo5 committed Dec 8, 2017
1 parent 07ec515 commit ffaf271
Showing 1 changed file with 28 additions and 52 deletions.
80 changes: 28 additions & 52 deletions pkg/auth/oauth/registry/timeoutvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/golang/glog"
"github.com/google/btree"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,30 +15,27 @@ import (
oauthclient "github.com/openshift/origin/pkg/oauth/generated/internalclientset/typed/oauth/internalversion"
oauthclientlister "github.com/openshift/origin/pkg/oauth/generated/listers/oauth/internalversion"
"github.com/openshift/origin/pkg/user/apis/user"
"github.com/openshift/origin/pkg/util/rankedset"
)

var errTimedout = errors.New("token timed out")

// Implements rankedset.Item
type tokenData struct {
token *oauth.OAuthAccessToken
seen time.Time
}

func (a *tokenData) timeout() time.Time {
func (a tokenData) timeout() time.Time {
return a.token.CreationTimestamp.Time.Add(time.Duration(a.token.TimeoutsIn) * time.Second)
}

type tokenDataRef struct {
name string
timeout time.Time
func (a tokenData) Key() string {
return a.token.Name
}

func (a *tokenDataRef) Less(than btree.Item) bool {

if a.timeout.Equal(than.(*tokenDataRef).timeout) {
return a.name < than.(*tokenDataRef).name
}
return a.timeout.Before(than.(*tokenDataRef).timeout)
func (a tokenData) Rank() int64 {
return a.timeout().Unix()
}

func timeoutAsDuration(timeout int32) time.Duration {
Expand All @@ -50,8 +46,7 @@ type TimeoutValidator struct {
oauthClients oauthclientlister.OAuthClientLister
tokens oauthclient.OAuthAccessTokenInterface
tokenChannel chan tokenData
data map[string]tokenData
tree *btree.BTree
data *rankedset.RankedSet
minValidTimeout int32
maxFlushTimeout int32
defaultTimeout time.Duration
Expand All @@ -61,12 +56,10 @@ type TimeoutValidator struct {

func NewTimeoutValidator(tokens oauthclient.OAuthAccessTokenInterface, oauthClients oauthclientlister.OAuthClientLister, defaultTimeout int32, minValidTimeout int32, maxFlushTimeout int32) *TimeoutValidator {
a := &TimeoutValidator{
oauthClients: oauthClients,
tokens: tokens,
tokenChannel: make(chan tokenData),
data: make(map[string]tokenData),
// FIXME: what is the right degree for the btree
tree: btree.New(32),
oauthClients: oauthClients,
tokens: tokens,
tokenChannel: make(chan tokenData),
data: rankedset.New(),
minValidTimeout: minValidTimeout,
maxFlushTimeout: maxFlushTimeout,
}
Expand Down Expand Up @@ -163,21 +156,6 @@ func (a *TimeoutValidator) updateTimeouts() {
}
}

func (a *TimeoutValidator) insert(td tokenData) {
// if there is a token we have to remove it first
oldtd, exists := a.data[td.token.Name]
if exists {
a.delete(oldtd, &tokenDataRef{oldtd.token.Name, oldtd.timeout()})
}
a.data[td.token.Name] = td
a.tree.ReplaceOrInsert(&tokenDataRef{td.token.Name, td.timeout()})
}

func (a *TimeoutValidator) delete(td tokenData, tdr *tokenDataRef) {
a.tree.Delete(tdr)
delete(a.data, td.token.Name)
}

func (a *TimeoutValidator) update(td tokenData) error {
// Obtain the timeout interval for this client
delta := a.clientTimeout(td.token.ClientName)
Expand Down Expand Up @@ -207,37 +185,34 @@ func (a *TimeoutValidator) update(td tokenData) error {
}

func (a *TimeoutValidator) flush(flushHorizon time.Time) {
flushedTokens := 0
totalTokens := len(a.data)
var retrylist []tokenData

glog.V(5).Infof("Flushing tokens timing out before %s", flushHorizon)

for item := a.tree.Min(); item != nil; item = a.tree.Min() {
tdr := item.(*tokenDataRef)
if tdr.timeout.After(flushHorizon) {
// out of items within the flush Horizon
break
}
td := a.data[tdr.name]
// grab all tokens that need to be update in this flush interval
// and remove them from the stored data, they either flush now or never
tokenList := a.data.LessThan(flushHorizon.Unix(), true)

var retryList []tokenData
flushedTokens := 0

for _, item := range tokenList {
td := item.(tokenData)
err := a.update(td)
switch {
case err == nil:
flushedTokens++
case apierrors.IsConflict(err) || apierrors.IsServerTimeout(err):
glog.V(5).Infof("Token update deferred for token=%q, retriable error: %v",
td.token.Name, err)
retrylist = append(retrylist, td)
retryList = append(retryList, td)
default:
glog.V(5).Infof("Token timeout for user=%q client=%q scopes=%v was not updated: %v",
td.token.UserName, td.token.ClientName, td.token.Scopes, err)
}
// In all cases we remove the token from the data set
a.delete(td, tdr)
}

// we try once more and if it still fails we stop trying here and defer to a future regular update
for _, td := range retrylist {
// we try once more and if it still fails we stop trying here and defer
// to a future regular update if the token is used again
for _, td := range retryList {
err := a.update(td)
if err != nil {
glog.V(5).Infof("Token timeout for user=%q client=%q scopes=%v was not updated: %v",
Expand All @@ -247,7 +222,8 @@ func (a *TimeoutValidator) flush(flushHorizon time.Time) {
}
}

glog.V(5).Infof("Flushed %d tokens out of %d in bucket", flushedTokens, totalTokens)
glog.V(5).Infof("Successfully flushed %d tokens out of %d",
flushedTokens, len(tokenList))
}

func (a *TimeoutValidator) Run(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -284,7 +260,7 @@ func (a *TimeoutValidator) Run(stopCh <-chan struct{}) {
// if channel closes terminate
return
case td := <-a.tokenChannel:
a.insert(td)
a.data.Insert(td)
// if this token is going to time out before the timer, fire
// immediately (safety margin is added to avoid racing too close)
tokenTimeout := td.timeout()
Expand Down

0 comments on commit ffaf271

Please sign in to comment.