Skip to content

Commit

Permalink
Ensure that two functions added for the same work key get run
Browse files Browse the repository at this point in the history
Only delete the key if the function we executed is the last function.
  • Loading branch information
smarterclayton committed Mar 19, 2018
1 parent d3a7139 commit 5237978
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions pkg/util/writerlease/writerlease.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ const (

var nowFn = time.Now

type work struct {
id int
fn WorkFunc
}

type WriterLease struct {
name string
backoff wait.Backoff
Expand All @@ -74,7 +79,8 @@ type WriterLease struct {
once chan struct{}

lock sync.Mutex
queued map[string]WorkFunc
id int
queued map[string]*work
queue workqueue.DelayingInterface
state State
expires time.Time
Expand All @@ -95,7 +101,7 @@ func New(leaseDuration, retryInterval time.Duration) *WriterLease {
maxBackoff: leaseDuration,
retryInterval: retryInterval,

queued: make(map[string]WorkFunc),
queued: make(map[string]*work),
queue: workqueue.NewDelayingQueue(),
once: make(chan struct{}),
}
Expand All @@ -110,7 +116,7 @@ func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, bac
maxBackoff: leaseDuration,
retryInterval: retryInterval,

queued: make(map[string]WorkFunc),
queued: make(map[string]*work),
queue: workqueue.NewNamedDelayingQueue(name),
once: make(chan struct{}),
}
Expand Down Expand Up @@ -155,7 +161,8 @@ func (l *WriterLease) WaitUntil(t time.Duration) (bool, bool) {
func (l *WriterLease) Try(key string, fn WorkFunc) {
l.lock.Lock()
defer l.lock.Unlock()
l.queued[key] = fn
l.id++
l.queued[key] = &work{fn: fn, id: l.id}
if l.state == Follower {
delay := l.expires.Sub(nowFn())
// no matter what, always wait at least some amount of time as a follower to give the nominal
Expand Down Expand Up @@ -196,7 +203,7 @@ func (l *WriterLease) Remove(key string) {
delete(l.queued, key)
}

func (l *WriterLease) get(key string) WorkFunc {
func (l *WriterLease) get(key string) *work {
l.lock.Lock()
defer l.lock.Unlock()
return l.queued[key]
Expand All @@ -215,8 +222,8 @@ func (l *WriterLease) work() bool {
}
key := item.(string)

fn := l.get(key)
if fn == nil {
work := l.get(key)
if work == nil {
glog.V(4).Infof("[%s] Work item %s was cleared, done", l.name, key)
l.queue.Done(key)
return true
Expand All @@ -236,7 +243,7 @@ func (l *WriterLease) work() bool {
glog.V(4).Infof("[%s] Lease owner or electing, running %s", l.name, key)
}

isLeader, retry := fn()
isLeader, retry := work.fn()
if retry {
// come back in a bit
glog.V(4).Infof("[%s] Retrying %s", l.name, key)
Expand All @@ -245,11 +252,11 @@ func (l *WriterLease) work() bool {
return true
}

l.finishKey(key, isLeader)
l.finishKey(key, isLeader, work.id)
return true
}

func (l *WriterLease) finishKey(key string, isLeader bool) {
func (l *WriterLease) finishKey(key string, isLeader bool, id int) {
l.lock.Lock()
defer l.lock.Unlock()

Expand All @@ -271,7 +278,9 @@ func (l *WriterLease) finishKey(key string, isLeader bool) {
}
l.expires = nowFn().Add(l.nextBackoff())
}
delete(l.queued, key)
if work, ok := l.queued[key]; ok && work.id == id {
delete(l.queued, key)
}
// close the channel before we remove the key from the queue to prevent races in Wait
if resolvedElection {
close(l.once)
Expand Down

0 comments on commit 5237978

Please sign in to comment.