Skip to content

Commit

Permalink
Implement NetworkPolicies with PodSelectors
Browse files Browse the repository at this point in the history
  • Loading branch information
danwinship committed Jan 19, 2017
1 parent a26e3e3 commit 55ae2ce
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 17 deletions.
42 changes: 28 additions & 14 deletions pkg/sdn/plugin/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,22 @@ const (
NetworkPolicies ResourceName = "NetworkPolicies"
)

// Run event queue for the given resource. The 'process' function is called
// repeatedly with each available cache.Delta that describes state changes
// to an object. If the process function returns an error queued changes
// for that object are dropped but processing continues with the next available
// object's cache.Deltas. The error is logged with call stack information.
func runEventQueueForResource(client kcache.Getter, resourceName ResourceName, expectedType interface{}, selector fields.Selector, process ProcessEventFunc) {
func newEventQueue(client kcache.Getter, resourceName ResourceName, expectedType interface{}, namespace string) *EventQueue {
rn := strings.ToLower(string(resourceName))
lw := kcache.NewListWatchFromClient(client, rn, kapi.NamespaceAll, selector)
lw := kcache.NewListWatchFromClient(client, rn, namespace, fields.Everything())
eventQueue := NewEventQueue(DeletionHandlingMetaNamespaceKeyFunc)
// Repopulate event queue every 30 mins
// Existing items in the event queue will have watch.Modified event type
kcache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run()

// Run the queue
for {
eventQueue.Pop(process, expectedType)
}
return eventQueue
}

// Run event queue for the given resource.
// Run event queue for the given resource. The 'process' function is called
// repeatedly with each available cache.Delta that describes state changes
// to an object. If the process function returns an error queued changes
// for that object are dropped but processing continues with the next available
// object's cache.Deltas. The error is logged with call stack information.
//
// NOTE: this function will handle DeletedFinalStateUnknown delta objects
// automatically, which may not always be what you want since the now-deleted
// object may be stale.
Expand Down Expand Up @@ -145,5 +141,23 @@ func RunEventQueue(client kcache.Getter, resourceName ResourceName, process Proc
glog.Fatalf("Unknown resource %s during initialization of event queue", resourceName)
}

runEventQueueForResource(client, resourceName, expectedType, fields.Everything(), process)
eventQueue := newEventQueue(client, resourceName, expectedType, kapi.NamespaceAll)
for {
eventQueue.Pop(process, expectedType)
}
}

func RunNamespacedPodEventQueue(client kcache.Getter, namespace string, closeChan chan struct{}, process ProcessEventFunc) {
eventQueue := newEventQueue(client, Pods, &kapi.Pod{}, namespace)
// Loop calling eventQueue.Pop() until closeChan is closed. process() will be called
// once after closeChan is closed; this possibility is unavoidable anyway due to race
// conditions.
for {
select {
case <-closeChan:
return
default:
eventQueue.Pop(process, &kapi.Pod{})
}
}
}
131 changes: 128 additions & 3 deletions pkg/sdn/plugin/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ type npNamespace struct {
inUse bool

policies map[ktypes.UID]*npPolicy

pods map[ktypes.UID]kapi.Pod
stopPodWatch chan struct{}
}

// npPolicy is a parsed version of a single NetworkPolicy object
type npPolicy struct {
policy extensions.NetworkPolicy
watchesNamespaces bool
watchesPods bool

flows []string
}
Expand Down Expand Up @@ -230,6 +234,92 @@ func (np *networkPolicyPlugin) UnrefVNID(vnid uint32) {
np.syncNamespace(npns)
}

// watchPods watches Pod changes in npns until stopPodWatch is triggered. pods
// and stopPodWatch are passed in as arguments rather than being read from npns
// because it's possible another thread will already have cancelled the watch
// (and changed the npns fields) before this function runs.
func (np *networkPolicyPlugin) watchPods(npns *npNamespace, pods map[ktypes.UID]kapi.Pod, stopPodWatch chan struct{}) {
RunNamespacedPodEventQueue(np.node.kClient.CoreClient.RESTClient(), npns.name, stopPodWatch, func(delta cache.Delta) error {
pod := delta.Object.(*kapi.Pod)
glog.V(5).Infof("Watch %s event for Pod %s/%s", delta.Type, pod.Namespace, pod.Name)

// We don't want to grab np.namespacesLock for every Pod.Status change...
// But it's safe to look up oldPod without locking here because no other
// threads modify this map.
oldPod, podExisted := pods[pod.UID]
if pod.Status.PodIP == "" {
delta.Type = cache.Deleted
}
switch delta.Type {
case cache.Sync, cache.Added, cache.Updated:
if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return nil
}
case cache.Deleted:
if !podExisted {
return nil
}
}

glog.V(5).Infof("Re-checking policies after pod %s", delta.Type)
np.lock.Lock()
defer np.lock.Unlock()

// RunNamespacedPodEventQueue() will call this function at least once more
// after the watch is stopped, so verify that our watch is still running
// before changing anything.
if stopPodWatch != npns.stopPodWatch {
return nil
}

if delta.Type == cache.Deleted {
delete(pods, pod.UID)
} else {
pods[pod.UID] = *pod
}

changed := false
for _, npp := range npns.policies {
if npp.watchesPods {
if np.updateNetworkPolicy(npns, &npp.policy) {
changed = true
}
}
}
if changed {
np.syncNamespace(npns)
}

return nil
})
}

func (np *networkPolicyPlugin) podWatchUntilStopped(npns *npNamespace) {
pods := npns.pods
stop := npns.stopPodWatch
go utilwait.Until(func() { np.watchPods(npns, pods, stop) }, 0, stop)
}

func (np *networkPolicyPlugin) updatePodWatch(npns *npNamespace) {
watchesPods := false
for _, npp := range npns.policies {
if npp.watchesPods {
watchesPods = true
break
}
}

if watchesPods && (npns.stopPodWatch == nil) {
npns.pods = make(map[ktypes.UID]kapi.Pod)
npns.stopPodWatch = make(chan struct{})
np.podWatchUntilStopped(npns)
} else if !watchesPods && (npns.stopPodWatch != nil) {
close(npns.stopPodWatch)
npns.stopPodWatch = nil
npns.pods = nil
}
}

func (np *networkPolicyPlugin) selectNamespaces(lsel *unversioned.LabelSelector) []uint32 {
vnids := []uint32{}
sel, err := unversioned.LabelSelectorAsSelector(lsel)
Expand All @@ -248,10 +338,36 @@ func (np *networkPolicyPlugin) selectNamespaces(lsel *unversioned.LabelSelector)
return vnids
}

func (np *networkPolicyPlugin) selectPods(npns *npNamespace, lsel *unversioned.LabelSelector) []string {
ips := []string{}
sel, err := unversioned.LabelSelectorAsSelector(lsel)
if err != nil {
// Shouldn't happen
glog.Errorf("ValidateNetworkPolicy() failure! Invalid PodSelector: %v", err)
return ips
}
for _, pod := range npns.pods {
if sel.Matches(labels.Set(pod.Labels)) {
ips = append(ips, pod.Status.PodIP)
}
}
return ips
}

func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *extensions.NetworkPolicy) (*npPolicy, error) {
npp := &npPolicy{policy: *policy}
allowAll := false

var destFlows []string
if len(policy.Spec.PodSelector.MatchLabels) > 0 || len(policy.Spec.PodSelector.MatchExpressions) > 0 {
npp.watchesPods = true
for _, ip := range np.selectPods(npns, &policy.Spec.PodSelector) {
destFlows = append(destFlows, fmt.Sprintf("ip, nw_dst=%s, ", ip))
}
} else {
destFlows = []string{""}
}

for _, rule := range policy.Spec.Ingress {
if len(rule.Ports) == 0 && len(rule.From) == 0 {
allowAll = true
Expand Down Expand Up @@ -294,6 +410,11 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *ext
if len(peer.PodSelector.MatchLabels) == 0 && len(peer.PodSelector.MatchExpressions) == 0 {
// The PodSelector is empty, meaning it selects all pods in this namespace
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ", npns.vnid))
} else {
npp.watchesPods = true
for _, ip := range np.selectPods(npns, peer.PodSelector) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", npns.vnid, ip))
}
}
} else {
if len(peer.NamespaceSelector.MatchLabels) == 0 && len(peer.NamespaceSelector.MatchExpressions) == 0 {
Expand All @@ -308,9 +429,11 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *ext
}
}

for _, peerFlow := range peerFlows {
for _, portFlow := range portFlows {
npp.flows = append(npp.flows, fmt.Sprintf("%s%s", peerFlow, portFlow))
for _, destFlow := range destFlows {
for _, peerFlow := range peerFlows {
for _, portFlow := range portFlows {
npp.flows = append(npp.flows, fmt.Sprintf("%s%s%s", destFlow, peerFlow, portFlow))
}
}
}
}
Expand All @@ -333,6 +456,7 @@ func (np *networkPolicyPlugin) updateNetworkPolicy(npns *npNamespace, policy *ex

oldNPP, existed := npns.policies[policy.UID]
npns.policies[policy.UID] = npp
np.updatePodWatch(npns)

changed := !existed || !reflect.DeepEqual(oldNPP.flows, npp.flows)
if !changed {
Expand Down Expand Up @@ -369,6 +493,7 @@ func (np *networkPolicyPlugin) watchNetworkPolicies() {
delete(npns.policies, policy.UID)
np.syncNamespace(npns)
}

return nil
})
}
Expand Down

0 comments on commit 55ae2ce

Please sign in to comment.