diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/BUILD b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/BUILD index ec50e3eb8a75..8c2c78da06c5 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/BUILD +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/BUILD @@ -22,6 +22,7 @@ go_test( deps = [ "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index 9c7839e57ad4..9530ea1a7ce4 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -341,7 +341,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.Lock() defer c.Unlock() forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) - watcher := newCacheWatcher(c.copier, watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget) + watcher := newCacheWatcher(c.copier, watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ @@ -757,24 +757,26 @@ func (c *errWatcher) Stop() { // cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex - copier runtime.ObjectCopier - input chan *watchCacheEvent - result chan watch.Event - done chan struct{} - filter watchFilterFunc - stopped bool - forget func(bool) + copier runtime.ObjectCopier + input chan *watchCacheEvent + result chan watch.Event + done chan struct{} + filter watchFilterFunc + stopped bool + forget func(bool) + versioner Versioner } -func newCacheWatcher(copier runtime.ObjectCopier, resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(copier runtime.ObjectCopier, resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher { watcher := &cacheWatcher{ - copier: copier, - input: make(chan *watchCacheEvent, chanSize), - result: make(chan watch.Event, chanSize), - done: make(chan struct{}), - filter: filter, - stopped: false, - forget: forget, + copier: copier, + input: make(chan *watchCacheEvent, chanSize), + result: make(chan watch.Event, chanSize), + done: make(chan struct{}), + filter: filter, + stopped: false, + forget: forget, + versioner: versioner, } go watcher.process(initEvents, resourceVersion) return watcher @@ -873,11 +875,15 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } watchEvent = watch.Event{Type: watch.Modified, Object: object} case !curObjPasses && oldObjPasses: + // return a delete event with the previous object content, but with the event's resource version object, err := c.copier.Copy(event.PrevObject) if err != nil { utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err)) return } + if err := c.versioner.UpdateObject(object, event.ResourceVersion); err != nil { + utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, object, err)) + } watchEvent = watch.Event{Type: watch.Deleted, Object: object} } diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go index b8973d0b6858..b7021f2f665e 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go @@ -17,14 +17,18 @@ limitations under the License. package storage import ( + "fmt" "reflect" + "strconv" "sync" "testing" "time" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -49,7 +53,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w := newCacheWatcher(scheme.Scheme, 0, 0, initEvents, filter, forget) + w := newCacheWatcher(scheme.Scheme, 0, 0, initEvents, filter, forget, testVersioner{}) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() @@ -74,28 +78,31 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) { { events: []*watchCacheEvent{ { - Type: watch.Added, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 1, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 2, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 3, }, }, expected: []watch.Event{ {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, - {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, + {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, }, }, @@ -103,50 +110,56 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) { { events: []*watchCacheEvent{ { - Type: watch.Added, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 1, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 2, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 3, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 4, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 5, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 6, }, }, expected: []watch.Event{ {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, {Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, - {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, + {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}}, }, }, } @@ -158,7 +171,7 @@ TestCase: for j := range testCase.events { testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(scheme.Scheme, 0, 0, testCase.events, filter, forget) + w := newCacheWatcher(scheme.Scheme, 0, 0, testCase.events, filter, forget, testVersioner{}) ch := w.ResultChan() for j, event := range testCase.expected { e := <-ch @@ -176,3 +189,18 @@ TestCase: w.Stop() } } + +type testVersioner struct{} + +func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { + return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10)) +} +func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error { + return fmt.Errorf("unimplemented") +} +func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error { + return fmt.Errorf("unimplemented") +} +func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { + return 0, fmt.Errorf("unimplemented") +}