Skip to content

Commit

Permalink
Merge pull request #19018 from smarterclayton/avoid_loss
Browse files Browse the repository at this point in the history
  • Loading branch information
openshift-merge-robot authored Mar 20, 2018
2 parents 08506eb + 2acf88d commit b5f97cd
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 44 deletions.
93 changes: 51 additions & 42 deletions pkg/util/writerlease/writerlease.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ const (

var nowFn = time.Now

// work pairs a queued WorkFunc with the sequence number it was given when it
// was submitted, so that a later submission under the same key can be told
// apart from the one that is currently executing.
type work struct {
// id is the value of the lease's counter at the time the item was queued by
// Try; finishKey compares it against the entry currently stored for the key
// and only removes the entry when the two match.
id int
// fn is the function invoked by the worker when the key is processed.
fn WorkFunc
}

type WriterLease struct {
name string
backoff wait.Backoff
Expand All @@ -74,7 +79,8 @@ type WriterLease struct {
once chan struct{}

lock sync.Mutex
queued map[string]WorkFunc
id int
queued map[string]*work
queue workqueue.DelayingInterface
state State
expires time.Time
Expand All @@ -95,7 +101,7 @@ func New(leaseDuration, retryInterval time.Duration) *WriterLease {
maxBackoff: leaseDuration,
retryInterval: retryInterval,

queued: make(map[string]WorkFunc),
queued: make(map[string]*work),
queue: workqueue.NewDelayingQueue(),
once: make(chan struct{}),
}
Expand All @@ -110,7 +116,7 @@ func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, bac
maxBackoff: leaseDuration,
retryInterval: retryInterval,

queued: make(map[string]WorkFunc),
queued: make(map[string]*work),
queue: workqueue.NewNamedDelayingQueue(name),
once: make(chan struct{}),
}
Expand All @@ -122,7 +128,8 @@ func (l *WriterLease) Run(stopCh <-chan struct{}) {

go func() {
defer utilruntime.HandleCrash()
l.work()
for l.work() {
}
glog.V(4).Infof("[%s] Worker stopped", l.name)
}()

Expand Down Expand Up @@ -154,7 +161,8 @@ func (l *WriterLease) WaitUntil(t time.Duration) (bool, bool) {
func (l *WriterLease) Try(key string, fn WorkFunc) {
l.lock.Lock()
defer l.lock.Unlock()
l.queued[key] = fn
l.id++
l.queued[key] = &work{fn: fn, id: l.id}
if l.state == Follower {
delay := l.expires.Sub(nowFn())
// no matter what, always wait at least some amount of time as a follower to give the nominal
Expand Down Expand Up @@ -195,7 +203,7 @@ func (l *WriterLease) Remove(key string) {
delete(l.queued, key)
}

func (l *WriterLease) get(key string) WorkFunc {
func (l *WriterLease) get(key string) *work {
l.lock.Lock()
defer l.lock.Unlock()
return l.queued[key]
Expand All @@ -207,49 +215,48 @@ func (l *WriterLease) leaseState() (State, time.Time, int) {
return l.state, l.expires, l.tick
}

func (l *WriterLease) work() {
for {
item, shutdown := l.queue.Get()
if shutdown {
return
}
key := item.(string)

fn := l.get(key)
if fn == nil {
glog.V(4).Infof("[%s] Work item %s was cleared, done", l.name, key)
l.queue.Done(key)
continue
}
func (l *WriterLease) work() bool {
item, shutdown := l.queue.Get()
if shutdown {
return false
}
key := item.(string)

leaseState, leaseExpires, _ := l.leaseState()
if leaseState == Follower {
// if we are following, continue to defer work until the lease expires
if remaining := leaseExpires.Sub(nowFn()); remaining > 0 {
glog.V(4).Infof("[%s] Follower, %s remaining in lease", l.name, remaining)
l.queue.AddAfter(key, remaining)
l.queue.Done(key)
continue
}
glog.V(4).Infof("[%s] Lease expired, running %s", l.name, key)
} else {
glog.V(4).Infof("[%s] Lease owner or electing, running %s", l.name, key)
}
work := l.get(key)
if work == nil {
glog.V(4).Infof("[%s] Work item %s was cleared, done", l.name, key)
l.queue.Done(key)
return true
}

isLeader, retry := fn()
if retry {
// come back in a bit
glog.V(4).Infof("[%s] Retrying %s", l.name, key)
l.queue.AddAfter(key, l.retryInterval)
leaseState, leaseExpires, _ := l.leaseState()
if leaseState == Follower {
// if we are following, continue to defer work until the lease expires
if remaining := leaseExpires.Sub(nowFn()); remaining > 0 {
glog.V(4).Infof("[%s] Follower, %s remaining in lease", l.name, remaining)
l.queue.AddAfter(key, remaining)
l.queue.Done(key)
continue
return true
}
glog.V(4).Infof("[%s] Lease expired, running %s", l.name, key)
} else {
glog.V(4).Infof("[%s] Lease owner or electing, running %s", l.name, key)
}

l.finishKey(key, isLeader)
isLeader, retry := work.fn()
if retry {
// come back in a bit
glog.V(4).Infof("[%s] Retrying %s", l.name, key)
l.queue.AddAfter(key, l.retryInterval)
l.queue.Done(key)
return true
}

l.finishKey(key, isLeader, work.id)
return true
}

func (l *WriterLease) finishKey(key string, isLeader bool) {
func (l *WriterLease) finishKey(key string, isLeader bool, id int) {
l.lock.Lock()
defer l.lock.Unlock()

Expand All @@ -271,7 +278,9 @@ func (l *WriterLease) finishKey(key string, isLeader bool) {
}
l.expires = nowFn().Add(l.nextBackoff())
}
delete(l.queued, key)
if work, ok := l.queued[key]; ok && work.id == id {
delete(l.queued, key)
}
// close the channel before we remove the key from the queue to prevent races in Wait
if resolvedElection {
close(l.once)
Expand Down
40 changes: 40 additions & 0 deletions pkg/util/writerlease/writerlease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,46 @@ func TestBecomeFollowerAfterRetry(t *testing.T) {
}
}

// TestRunOverlappingWork submits a second work item under the same key while
// the first item for that key is still executing, and checks that the second
// item is still run by the next worker pass and that nothing is left queued
// once both passes have completed.
func TestRunOverlappingWork(t *testing.T) {
	lease := New(0, 0)
	lease.backoff.Steps = 0
	lease.backoff.Duration = 0

	finished := make(chan struct{})
	defer func() {
		// Wait for the worker goroutine before inspecting the pending set.
		<-finished
		if pending := len(lease.queued); pending != 0 {
			t.Fatalf("queue was not empty on shutdown: %#v", lease.queued)
		}
	}()

	// The worker performs exactly two passes: one per submitted item.
	go func() {
		t.Logf("processing first")
		lease.work()
		t.Logf("processing second")
		lease.work()
		t.Logf("processing done")
		close(finished)
	}()

	// gate is unbuffered so the first item blocks inside the worker until the
	// test has had a chance to queue the overlapping item.
	gate := make(chan struct{})
	blockingItem := func() (bool, bool) {
		gate <- struct{}{}
		t.Logf("waiting for second item to be added")
		gate <- struct{}{}
		return true, false
	}
	lease.Try("test", blockingItem)

	// First hand-off: the first item is now running.
	<-gate

	// ran is buffered so the second item never blocks the worker.
	ran := make(chan struct{}, 1)
	overlappingItem := func() (bool, bool) {
		ran <- struct{}{}
		return true, false
	}
	lease.Try("test", overlappingItem)
	t.Logf("second item added")

	// Second hand-off releases the first item; then confirm the second item
	// executed and the worker finished both passes.
	<-gate
	<-ran
	<-finished
}

func TestExtend(t *testing.T) {
nowFn = func() time.Time { return time.Unix(0, 0) }
defer func() { nowFn = time.Now }()
Expand Down
3 changes: 1 addition & 2 deletions test/extended/router/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,11 @@ var _ = g.Describe("[Conformance][Area:Networking][Feature:Router]", func() {
case _, ok := <-ch:
writes++
o.Expect(ok).To(o.BeTrue())
o.Expect(i).To(o.BeNumerically("<", 3))
case <-timer.C:
break Wait
}
}
e2e.Logf("Recorded %d writes total", writes)
o.Expect(writes).To(o.BeNumerically("<", 5))
}()
})
})
Expand Down

0 comments on commit b5f97cd

Please sign in to comment.