DO NOT MERGE: add instrumenting to chase very short watches #2172

Open · wants to merge 2 commits into master

The commits sprinkle numbered fmt.Printf markers across the watch cache (1a–1y in Cacher.Watch, 2a–2e in startCaching, 3a–3d in forgetWatcher, 4a–4g in cacheWatcher.stopLocked) so the lifecycle of a single watcher can be traced end to end while chasing watches that terminate almost immediately.
@@ -124,24 +124,31 @@ func (c *cacheWatcher) Stop() {

// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopLocked() {
fmt.Printf("#### 4a groupResource=%v \n", c.groupResource)
if !c.stopped {
fmt.Printf("#### 4b groupResource=%v \n", c.groupResource)
c.stopped = true
// stop without draining the input channel was requested.
if !c.drainInputBuffer {
fmt.Printf("#### 4c groupResource=%v \n", c.groupResource)
close(c.done)
}
fmt.Printf("#### 4d groupResource=%v \n", c.groupResource)
close(c.input)
}

fmt.Printf("#### 4e groupResource=%v \n", c.groupResource)
// Even if the watcher was already stopped, if it previously was
// using draining mode and it's not using it now we need to
// close the done channel now. Otherwise we could leak the
// processing goroutine if it will be trying to put more objects
// into result channel, the channel will be full and there will
// already be noone on the processing the events on the receiving end.
if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
fmt.Printf("#### 4f groupResource=%v \n", c.groupResource)
close(c.done)
}
fmt.Printf("#### 4g groupResource=%v \n", c.groupResource)
}

func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
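As context for the 4e/4f markers above: the comment in stopLocked describes a goroutine leak through a full result channel. A minimal, self-contained sketch of that failure mode (not the cacher's actual types) is below; the producer can only exit via the done channel once the receiver is gone, which is why stopLocked must close done even for an already-stopped watcher.

package main

import (
	"fmt"
	"time"
)

// producer keeps pushing into result until done is closed. Once nobody
// reads result and its buffer is full, the done channel is its only exit.
func producer(result chan<- int, done <-chan struct{}) {
	for i := 0; ; i++ {
		select {
		case result <- i: // blocks forever once the buffer is full
		case <-done: // the only way out once the receiver is gone
			fmt.Println("producer: done closed, exiting")
			return
		}
	}
}

func main() {
	result := make(chan int, 1) // tiny buffer, fills immediately
	done := make(chan struct{})
	go producer(result, done)
	time.Sleep(10 * time.Millisecond)
	close(done) // without this, the producer goroutine leaks
	time.Sleep(10 * time.Millisecond)
}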
21 changes: 21 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -450,6 +450,8 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
fmt.Printf("#### 2a groupResource=%v startCaching\n", c.groupResource)

// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
// It is safe to use the cache after a successful list until a disconnection.
// We start with usable (write) locked. The below OnReplace function will
@@ -465,6 +467,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	})
	defer func() {
		if successfulList {
			fmt.Printf("#### 2b groupResource=%v setting to false\n", c.groupResource)
			c.ready.set(false)
		}
	}()
@@ -477,6 +480,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
	}
	fmt.Printf("#### 2e groupResource=%v exiting\n", c.groupResource)
}

// Versioner implements storage.Interface.
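The 2a/2b/2e markers bracket one ListAndWatch cycle. A minimal sketch of the pattern they instrument, with hypothetical names: the cache is flipped unready on exit only if this cycle was the one that made it ready, so every disconnect after a successful list bounces the cache through an unready window.

package main

import "fmt"

// startCachingOnce runs one list+watch cycle. If the list succeeded, the
// deferred cleanup marks the cache unready again when the cycle ends.
func startCachingOnce(setReady func(bool), listAndWatch func(onReplace func()) error) {
	successfulList := false
	onReplace := func() {
		successfulList = true
		setReady(true)
	}
	defer func() {
		if successfulList {
			setReady(false) // corresponds to the 2b marker
		}
	}()
	if err := listAndWatch(onReplace); err != nil {
		fmt.Println("ListAndWatch error, caller restarts the cycle:", err)
	}
}

func main() {
	ready := false
	startCachingOnce(
		func(ok bool) { ready = ok },
		func(onReplace func()) error {
			onReplace() // the list succeeded...
			return nil  // ...then the watch disconnected
		},
	)
	fmt.Println("ready after cycle:", ready) // false: the cache bounced
}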
@@ -536,17 +540,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions

	var readyGeneration int
	if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
		fmt.Printf("#### 1a groupResource=%v\n", c.groupResource)
		var ok bool
		readyGeneration, ok = c.ready.checkAndReadGeneration()
		if !ok {
			return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
		}
	} else {
		fmt.Printf("#### 1b groupResource=%v\n", c.groupResource)
		readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
		if err != nil {
			return nil, errors.NewServiceUnavailable(err.Error())
		}
	}
	fmt.Printf("#### 1c groupResource=%v, originalReadyGeneration=%d\n", c.groupResource, readyGeneration)

	// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
	scope := namespacedName{}
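The 1a/1b markers distinguish the two readiness strategies behind the feature gate. A minimal sketch, not the apiserver's actual implementation: with the gate enabled the request fails fast (surfaced as HTTP 429 so the client retries), without it the request blocks until the cache is ready or the request context ends.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitReady mimics the gated branch: fail fast, or wait for readiness.
func waitReady(ctx context.Context, isReady func() bool, failFast bool) error {
	if failFast {
		if isReady() {
			return nil
		}
		return errors.New("storage is (re)initializing") // akin to NewTooManyRequests
	}
	for !isReady() {
		select {
		case <-ctx.Done():
			return ctx.Err() // akin to NewServiceUnavailable
		case <-time.After(10 * time.Millisecond): // simplified: the real code waits on a condition
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(waitReady(ctx, func() bool { return false }, true))  // immediate error
	fmt.Println(waitReady(ctx, func() bool { return false }, false)) // blocks, then context deadline
}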
@@ -654,39 +661,48 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
	}

	addedWatcher := false
	fmt.Printf("#### 1n groupResource=%v \n", c.groupResource)
	func() {
		c.Lock()
		defer c.Unlock()

		if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
			fmt.Printf("#### 1o groupResource=%v currentReadyGeneration=%d, originalReadyGeneration=%d, ok=%v\n", c.groupResource, generation, readyGeneration, ok)
			// We went unready or are already on a different generation.
			// Avoid registering and starting the watch as it will have to be
			// terminated immediately anyway.
			return
		}

		fmt.Printf("#### 1p groupResource=%v \n", c.groupResource)
		// Update watcher.forget function once we can compute it.
		watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
		// Update the bookMarkAfterResourceVersion
		watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn())
		c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
		addedWatcher = true

		fmt.Printf("#### 1q groupResource=%v \n", c.groupResource)
		// Add it to the queue only when the client supports watch bookmarks.
		if watcher.allowWatchBookmarks {
			c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
		}
		c.watcherIdx++

		fmt.Printf("#### 1r groupResource=%v \n", c.groupResource)
	}()
	fmt.Printf("#### 1s groupResource=%v \n", c.groupResource)

	if !addedWatcher {
		fmt.Printf("#### 1x groupResource=%v returning the immediate closer thing\n", c.groupResource)
		// Watcher isn't really started at this point, so it's safe to just drop it.
		//
		// We're simulating the immediate watch termination, which boils down to simply
		// closing the watcher.
		return newImmediateCloseWatcher(), nil
	}

	fmt.Printf("#### 1y groupResource=%v \n", c.groupResource)
	go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)
	return watcher, nil
}
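The 1o/1x markers instrument the check that produces immediately-closed watches. A minimal sketch with simplified types, assuming the generation is bumped on every reinitialization: remember the ready generation before the comparatively slow watcher setup, re-check it under the cacher lock, and drop the watcher instead of registering it if the cache bounced in between.

package main

import (
	"fmt"
	"sync"
)

type readyGen struct {
	mu         sync.Mutex
	ok         bool
	generation int
}

func (r *readyGen) checkAndReadGeneration() (int, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.generation, r.ok
}

func (r *readyGen) set(ok bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ok && !r.ok {
		r.generation++ // each unready->ready transition is a new generation
	}
	r.ok = ok
}

func main() {
	r := &readyGen{}
	r.set(true)
	originalGen, _ := r.checkAndReadGeneration()

	r.set(false) // the cache bounced while the watcher was being prepared
	r.set(true)

	if gen, ok := r.checkAndReadGeneration(); !ok || gen != originalGen {
		fmt.Println("generation moved on; drop the watcher instead of registering it")
	}
}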
@@ -1333,13 +1349,18 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
		c.Lock()
		defer c.Unlock()

		fmt.Printf("#### 3a groupResource=%v \n", c.groupResource)

		w.setDrainInputBufferLocked(drainWatcher)
		fmt.Printf("#### 3b groupResource=%v \n", c.groupResource)

		// It's possible that the watcher is already not in the structure (e.g. in case of
		// simultaneous Stop() and terminateAllWatchers()), but it is safe to call stopLocked()
		// on a watcher multiple times.
		c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
		fmt.Printf("#### 3c groupResource=%v \n", c.groupResource)
		c.stopWatcherLocked(w)
		fmt.Printf("#### 3d groupResource=%v \n", c.groupResource)
	}
}

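The comment in forgetWatcher notes that stopLocked may run against an already-stopped watcher when Stop() races with terminateAllWatchers(). A minimal sketch with simplified types of why that is safe: the stopped flag short-circuits the second call, and closing an already-closed channel would otherwise panic.

package main

import "fmt"

type miniWatcher struct {
	stopped bool
	input   chan struct{}
	done    chan struct{}
}

// stopLocked is idempotent; the caller is assumed to hold the cacher lock.
func (w *miniWatcher) stopLocked() {
	if !w.stopped {
		w.stopped = true
		close(w.done)
		close(w.input)
	}
}

func main() {
	w := &miniWatcher{input: make(chan struct{}), done: make(chan struct{})}
	w.stopLocked()
	w.stopLocked() // safe: the flag prevents a double close
	fmt.Println("stopped:", w.stopped)
}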
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
@@ -117,6 +117,7 @@ func (r *ready) checkAndReadGeneration() (int, bool) {
func (r *ready) set(ok bool) {
	r.lock.Lock()
	defer r.lock.Unlock()

	if r.state == Stopped {
		return
	}
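The guard shown above keeps late set() calls from touching a stopped ready object. A minimal sketch, assuming a three-state model (Pending/Ok/Stopped is an assumption; only Stopped appears in the diff): once the object is Stopped, a straggling set() from an exiting startCaching goroutine must be a no-op rather than resurrect the state.

package main

import "fmt"

type state int

const (
	Pending state = iota // assumed name for the not-yet-ready state
	Ok                   // assumed name for the ready state
	Stopped              // shown in the diff above
)

type readyState struct{ state state }

func (r *readyState) set(ok bool) {
	if r.state == Stopped {
		return // shutting down; ignore late transitions
	}
	if ok {
		r.state = Ok
	} else {
		r.state = Pending
	}
}

func main() {
	r := &readyState{state: Ok}
	r.state = Stopped
	r.set(true)                     // ignored by the guard
	fmt.Println(r.state == Stopped) // true
}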