Skip to content

Commit

Permalink
make project watch work for namespace deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Jun 7, 2016
1 parent f6fd6e9 commit 4bf081b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 90 deletions.
38 changes: 17 additions & 21 deletions pkg/project/auth/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func (s *neverSkipSynchronizer) SkipSynchronize(prevState string, versionedObjec

// AuthorizationCache maintains a cache on the set of namespaces a user or group can access.
type AuthorizationCache struct {
// allKnownNamespaces we track all the known namespaces, so we can detect deletes.
// TODO remove this in favor of a list/watch mechanism for projects
allKnownNamespaces sets.String
namespaceStore cache.Store
namespaceInterface kclient.NamespaceInterface
lastSyncResourceVersioner LastSyncResourceVersioner
Expand Down Expand Up @@ -132,6 +135,7 @@ type AuthorizationCache struct {
// NewAuthorizationCache creates a new AuthorizationCache
func NewAuthorizationCache(reviewer Reviewer, namespaceInterface kclient.NamespaceInterface, policyClient policyclient.ReadOnlyPolicyClient) *AuthorizationCache {
result := &AuthorizationCache{
allKnownNamespaces: sets.String{},
namespaceStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
namespaceInterface: namespaceInterface,
lastSyncResourceVersioner: &unchangingLastSyncResourceVersioner{},
Expand Down Expand Up @@ -203,7 +207,7 @@ func (ac *AuthorizationCache) RemoveWatcher(watcher CacheWatcher) {
}

// synchronizeNamespaces synchronizes access over each namespace and returns a set of namespace names that were looked at in last sync
func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) *sets.String {
func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) sets.String {
namespaceSet := sets.NewString()
items := ac.namespaceStore.List()
for i := range items {
Expand All @@ -217,7 +221,7 @@ func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache
utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
}
}
return &namespaceSet
return namespaceSet
}

// synchronizePolicies synchronizes access over each policy
Expand Down Expand Up @@ -257,16 +261,20 @@ func (ac *AuthorizationCache) synchronizePolicyBindings(userSubjectRecordStore c
}

// purgeDeletedNamespaces will remove all namespaces enumerated in a reviewRecordStore that are not in the namespace set
func purgeDeletedNamespaces(namespaceSet *sets.String, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
func (ac *AuthorizationCache) purgeDeletedNamespaces(oldNamespaces, newNamespaces sets.String, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
reviewRecordItems := reviewRecordStore.List()
for i := range reviewRecordItems {
reviewRecord := reviewRecordItems[i].(*reviewRecord)
if !namespaceSet.Has(reviewRecord.namespace) {
if !newNamespaces.Has(reviewRecord.namespace) {
deleteNamespaceFromSubjects(userSubjectRecordStore, reviewRecord.users, reviewRecord.namespace)
deleteNamespaceFromSubjects(groupSubjectRecordStore, reviewRecord.groups, reviewRecord.namespace)
reviewRecordStore.Delete(reviewRecord)
}
}

for namespace := range oldNamespaces.Difference(newNamespaces) {
ac.notifyWatchers(namespace, nil, sets.String{}, sets.String{})
}
}

// invalidateCache returns true if there was a change in the cluster namespace that holds cluster policy and policy bindings
Expand Down Expand Up @@ -327,17 +335,18 @@ func (ac *AuthorizationCache) synchronize() {
}

// iterate over caches and synchronize our three caches
namespaceSet := ac.synchronizeNamespaces(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
newKnownNamespaces := ac.synchronizeNamespaces(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
ac.synchronizePolicies(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
ac.synchronizePolicyBindings(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
purgeDeletedNamespaces(namespaceSet, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
ac.purgeDeletedNamespaces(ac.allKnownNamespaces, newKnownNamespaces, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)

// if we did a full rebuild, now we swap the fully rebuilt cache
if invalidateCache {
ac.userSubjectRecordStore = userSubjectRecordStore
ac.groupSubjectRecordStore = groupSubjectRecordStore
ac.reviewRecordStore = reviewRecordStore
}
ac.allKnownNamespaces = newKnownNamespaces

// we were able to update our cache since this last observation period
ac.lastState = currentState
Expand Down Expand Up @@ -486,24 +495,11 @@ func addSubjectsToNamespace(subjectRecordStore cache.Store, subjects []string, n
}
}

func (ac *AuthorizationCache) notifyWatchers(namespace string, exists *reviewRecord, latestUsers, latestGroups sets.String) {
existingGroups := sets.String{}
existingUsers := sets.String{}
if exists != nil {
existingGroups = sets.NewString(exists.groups...)
existingUsers = sets.NewString(exists.users...)
}

// calculate once to avoid fanning out.
removedUsers := existingUsers.Difference(latestUsers)
removedGroups := existingGroups.Difference(latestGroups)
addedUsers := latestUsers.Difference(existingUsers)
addedGroups := latestGroups.Difference(existingGroups)

func (ac *AuthorizationCache) notifyWatchers(namespace string, exists *reviewRecord, users, groups sets.String) {
ac.watcherLock.Lock()
defer ac.watcherLock.Unlock()
for _, watcher := range ac.watchers {
watcher.GroupMembershipChanged(namespace, latestUsers, latestGroups, removedUsers, removedGroups, addedUsers, addedGroups)
watcher.GroupMembershipChanged(namespace, users, groups)
}
}

Expand Down
15 changes: 7 additions & 8 deletions pkg/project/auth/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package auth

import (
"errors"
"fmt"
"sync"

"github.com/golang/glog"
Expand All @@ -21,7 +22,7 @@ import (
type CacheWatcher interface {
// GroupMembershipChanged is called serially for all changes for all watchers. This method MUST NOT BLOCK.
// The serial nature makes reasoning about the code easy, but if you block in this method you will doom all watchers.
GroupMembershipChanged(namespaceName string, latestUsers, latestGroups, removedUsers, removedGroups, addedUsers, addedGroups sets.String)
GroupMembershipChanged(namespaceName string, users, groups sets.String)
}

type WatchableCache interface {
Expand Down Expand Up @@ -106,15 +107,13 @@ func NewUserProjectWatcher(username string, groups []string, projectCache *proje
return w
}

func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, latestUsers, lastestGroups, removedUsers, removedGroups, addedUsers, addedGroups sets.String) {
hasAccess := latestUsers.Has(w.username) || lastestGroups.HasAny(w.groups...)
removed := !hasAccess && (removedUsers.Has(w.username) || removedGroups.HasAny(w.groups...))
func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, users, groups sets.String) {
hasAccess := users.Has(w.username) || groups.HasAny(w.groups...)
_, known := w.knownProjects[namespaceName]

switch {
case removed:
if _, known := w.knownProjects[namespaceName]; !known {
return
}
// this means that we were removed from the project
case !hasAccess && known:
delete(w.knownProjects, namespaceName)

select {
Expand Down
14 changes: 7 additions & 7 deletions pkg/project/auth/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestFullIncoming(t *testing.T) {
watcher.cacheIncoming <- watch.Event{Type: watch.Added}

// this call should not block and we should see a failure
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{}, sets.NewString("bob"), sets.String{})
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{})
if len(fakeAuthCache.removed) != 1 {
t.Errorf("should have removed self")
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestAddModifyDeleteEventsByUser(t *testing.T) {
watcher, _ := newTestWatcher("bob", nil, newNamespaces("ns-01")...)
go watcher.Watch()

watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{}, sets.NewString("bob"), sets.String{})
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{})
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Added {
Expand All @@ -117,14 +117,14 @@ func TestAddModifyDeleteEventsByUser(t *testing.T) {
}

// the object didn't change, we shouldn't observe it
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{}, sets.String{}, sets.String{})
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{})
select {
case event := <-watcher.ResultChan():
t.Fatalf("unexpected event %v", event)
case <-time.After(3 * time.Second):
}

watcher.GroupMembershipChanged("ns-01", sets.NewString("alice"), sets.String{}, sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{})
watcher.GroupMembershipChanged("ns-01", sets.NewString("alice"), sets.String{})
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Deleted {
Expand All @@ -142,7 +142,7 @@ func TestAddModifyDeleteEventsByGroup(t *testing.T) {
watcher, _ := newTestWatcher("bob", []string{"group-one"}, newNamespaces("ns-01")...)
go watcher.Watch()

watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{}, sets.String{}, sets.NewString("group-one"))
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"))
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Added {
Expand All @@ -156,14 +156,14 @@ func TestAddModifyDeleteEventsByGroup(t *testing.T) {
}

// the object didn't change, we shouldn't observe it
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{}, sets.String{}, sets.String{})
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"))
select {
case event := <-watcher.ResultChan():
t.Fatalf("unexpected event %v", event)
case <-time.After(3 * time.Second):
}

watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-two"), sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{})
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-two"))
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Deleted {
Expand Down
100 changes: 46 additions & 54 deletions test/integration/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestProjectMustExist(t *testing.T) {

func TestProjectWatch(t *testing.T) {
testutil.RequireEtcd(t)
_, clusterAdminKubeConfig, err := testserver.StartTestMasterAPI()
_, clusterAdminKubeConfig, err := testserver.StartTestMaster()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -181,21 +181,9 @@ func TestProjectWatch(t *testing.T) {
if _, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminClientConfig, "ns-01", "bob"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
waitForAdd("ns-01", w, t)

select {
case event := <-w.ResultChan():
if event.Type != watch.Added {
t.Errorf("expected added, got %v", event)
}
project := event.Object.(*projectapi.Project)
if project.Name != "ns-01" {
t.Fatalf("expected %v, got %#v", "ns-01", project)
}

case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}

// TEST FOR ADD/REMOVE ACCESS
joeClient, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminClientConfig, "ns-02", "joe")
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -209,58 +197,31 @@ func TestProjectWatch(t *testing.T) {
if err := addBob.AddRole(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// wait for the add
for {
select {
case event := <-w.ResultChan():
project := event.Object.(*projectapi.Project)
t.Logf("got %#v %#v", event, project)
if event.Type == watch.Added && project.Name == "ns-02" {
return
}

case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}
}
waitForAdd("ns-02", w, t)

if err := addBob.RemoveRole(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
waitForDelete("ns-02", w, t)

// wait for the delete
for {
select {
case event := <-w.ResultChan():
project := event.Object.(*projectapi.Project)
t.Logf("got %#v %#v", event, project)
if event.Type == watch.Deleted && project.Name == "ns-02" {
return
}
// TEST FOR DELETE PROJECT
if _, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminClientConfig, "ns-03", "bob"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
waitForAdd("ns-03", w, t)

case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}
if err := bobClient.Projects().Delete("ns-03"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// wait for the delete
waitForDelete("ns-03", w, t)

// test the "start from beginning watch"
beginningWatch, err := bobClient.Projects().Watch(kapi.ListOptions{ResourceVersion: "0"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
select {
case event := <-beginningWatch.ResultChan():
if event.Type != watch.Added {
t.Errorf("expected added, got %v", event)
}
project := event.Object.(*projectapi.Project)
if project.Name != "ns-01" {
t.Fatalf("expected %v, got %#v", "ns-01", project)
}

case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}
waitForAdd("ns-01", beginningWatch, t)

fromNowWatch, err := bobClient.Projects().Watch(kapi.ListOptions{})
if err != nil {
Expand All @@ -274,3 +235,34 @@ func TestProjectWatch(t *testing.T) {
}

}

func waitForDelete(projectName string, w watch.Interface, t *testing.T) {
for {
select {
case event := <-w.ResultChan():
project := event.Object.(*projectapi.Project)
t.Logf("got %#v %#v", event, project)
if event.Type == watch.Deleted && project.Name == projectName {
return
}

case <-time.After(30 * time.Second):
t.Fatalf("timeout: %v", projectName)
}
}
}
func waitForAdd(projectName string, w watch.Interface, t *testing.T) {
for {
select {
case event := <-w.ResultChan():
project := event.Object.(*projectapi.Project)
t.Logf("got %#v %#v", event, project)
if event.Type == watch.Added && project.Name == projectName {
return
}

case <-time.After(30 * time.Second):
t.Fatalf("timeout: %v", projectName)
}
}
}

0 comments on commit 4bf081b

Please sign in to comment.