Skip to content

Commit

Permalink
Merge pull request #13200 from danwinship/ovs-flake-1.5
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot authored Mar 3, 2017
2 parents 1681de3 + 876affc commit 90fb864
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 51 deletions.
9 changes: 6 additions & 3 deletions pkg/sdn/plugin/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package plugin
import (
"fmt"
"net"
"sort"
"strconv"
"strings"

log "github.com/golang/glog"

Expand Down Expand Up @@ -266,13 +268,14 @@ func (plugin *OsdnNode) updateVXLANMulticastRules(subnets hostSubnetMap) {
otx := plugin.ovs.NewTransaction()

// Build the list of all nodes for multicast forwarding
tun_dsts := ""
tun_dsts := make([]string, 0, len(subnets))
for _, subnet := range subnets {
if subnet.HostIP != plugin.localIP {
tun_dsts += fmt.Sprintf(",set_field:%s->tun_dst,output:1", subnet.HostIP)
tun_dsts = append(tun_dsts, fmt.Sprintf(",set_field:%s->tun_dst,output:1", subnet.HostIP))
}
}
otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31]%s,goto_table:120", tun_dsts)
sort.Strings(sort.StringSlice(tun_dsts))
otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31]%s,goto_table:120", strings.Join(tun_dsts, ""))

if err := otx.EndTransaction(); err != nil {
log.Errorf("Error updating OVS VXLAN multicast flows: %v", err)
Expand Down
149 changes: 101 additions & 48 deletions test/extended/networking/ovs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package networking

import (
"fmt"
"net"
"reflect"
"regexp"
Expand All @@ -12,6 +13,7 @@ import (

kapi "k8s.io/kubernetes/pkg/api"
kapiunversioned "k8s.io/kubernetes/pkg/api/unversioned"
utilwait "k8s.io/kubernetes/pkg/util/wait"
e2e "k8s.io/kubernetes/test/e2e/framework"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -40,27 +42,20 @@ var _ = Describe("[networking] OVS", func() {
ipPort := e2e.LaunchWebserverPod(f1, podName, deployNodeName)
ip := strings.Split(ipPort, ":")[0]

newFlows := getFlowsForAllNodes(oc, nodes.Items)
for _, node := range nodes.Items {
if node.Name != deployNodeName {
Expect(reflect.DeepEqual(origFlows[node.Name], newFlows[node.Name])).To(BeTrue(), "Flows on non-deployed-to nodes should be unchanged")
}
}

foundPodFlow := false
for _, flow := range newFlows[deployNodeName] {
if strings.Contains(flow, "="+ip+",") || strings.Contains(flow, "="+ip+" ") {
foundPodFlow = true
break
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, newFlows []string) error {
if nodeName == deployNodeName {
return findFlowOrError("Should have flows referring to pod IP address", newFlows, ip)
} else {
return matchFlowsOrError("Flows on non-deployed-to nodes should be unchanged", newFlows, origFlows[nodeName])
}
}
Expect(foundPodFlow).To(BeTrue(), "Should have flows referring to pod IP address")
})

err := f1.ClientSet.Core().Pods(f1.Namespace.Name).Delete(podName, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

postDeleteFlows := getFlowsForNode(oc, deployNodeName)
Expect(reflect.DeepEqual(origFlows[deployNodeName], postDeleteFlows)).To(BeTrue(), "Flows after deleting pod should be same as before creating it")
checkFlowsForNode(oc, deployNodeName, func(nodeName string, flows []string) error {
return matchFlowsOrError("Flows after deleting pod should be same as before creating it", flows, origFlows[nodeName])
})
})

It("should add and remove flows when nodes are added and removed", func() {
Expand Down Expand Up @@ -122,17 +117,9 @@ var _ = Describe("[networking] OVS", func() {
}
Expect(err).NotTo(HaveOccurred())

newFlows := getFlowsForAllNodes(oc, nodes.Items)
for nodeName := range newFlows {
foundNodeFlow := false
for _, flow := range newFlows[nodeName] {
if strings.Contains(flow, "="+newNodeIP+",") || strings.Contains(flow, "="+newNodeIP+" ") {
foundNodeFlow = true
break
}
}
Expect(foundNodeFlow).To(BeTrue(), "Should have flows referring to node IP address")
}
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, newFlows []string) error {
return findFlowOrError("Should have flows referring to node IP address", newFlows, newNodeIP)
})

err = f1.ClientSet.Core().Nodes().Delete(node.Name, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
Expand All @@ -145,8 +132,9 @@ var _ = Describe("[networking] OVS", func() {
}
Expect(err).NotTo(BeNil())

postDeleteFlows := getFlowsForAllNodes(oc, nodes.Items)
Expect(reflect.DeepEqual(origFlows, postDeleteFlows)).To(BeTrue(), "Flows after deleting node should be same as before creating it")
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, flows []string) error {
return matchFlowsOrError("Flows after deleting node should be same as before creating it", flows, origFlows[nodeName])
})
})
})

Expand All @@ -163,29 +151,49 @@ var _ = Describe("[networking] OVS", func() {
ipPort := launchWebserverService(f1, serviceName, deployNodeName)
ip := strings.Split(ipPort, ":")[0]

newFlows := getFlowsForAllNodes(oc, nodes.Items)
for _, node := range nodes.Items {
foundServiceFlow := false
for _, flow := range newFlows[node.Name] {
if strings.Contains(flow, "nw_dst="+ip+",") || strings.Contains(flow, "nw_dst="+ip+" ") {
foundServiceFlow = true
break
}
}
Expect(foundServiceFlow).To(BeTrue(), "Each node contains a rule for the service")
}
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, newFlows []string) error {
return findFlowOrError("Should have flows referring to service IP address", newFlows, ip)
})

err := f1.ClientSet.Core().Pods(f1.Namespace.Name).Delete(serviceName, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
err = f1.ClientSet.Core().Services(f1.Namespace.Name).Delete(serviceName, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

postDeleteFlows := getFlowsForAllNodes(oc, nodes.Items)
Expect(reflect.DeepEqual(origFlows, postDeleteFlows)).To(BeTrue(), "Flows after deleting service should be same as before creating it")
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, flows []string) error {
return matchFlowsOrError("Flows after deleting service should be same as before creating it", flows, origFlows[nodeName])
})
})
})
})

type FlowError struct {
msg string
flows []string
expected []string
}

func (err FlowError) Error() string {
return err.msg
}

func matchFlowsOrError(msg string, flows, expected []string) error {
if reflect.DeepEqual(flows, expected) {
return nil
} else {
return FlowError{msg, flows, expected}
}
}

func findFlowOrError(msg string, flows []string, ip string) error {
for _, flow := range flows {
if strings.Contains(flow, "="+ip+",") || strings.Contains(flow, "="+ip+" ") {
return nil
}
}
return FlowError{fmt.Sprintf("%s (%s)", msg, ip), flows, nil}
}

func doGetFlowsForNode(oc *testexutil.CLI, nodeName string) ([]string, error) {
pod := &kapi.Pod{
TypeMeta: kapiunversioned.TypeMeta{
Expand Down Expand Up @@ -252,12 +260,6 @@ func doGetFlowsForNode(oc *testexutil.CLI, nodeName string) ([]string, error) {
return flows, nil
}

func getFlowsForNode(oc *testexutil.CLI, nodeName string) []string {
flows, err := doGetFlowsForNode(oc, nodeName)
expectNoError(err)
return flows
}

func getFlowsForAllNodes(oc *testexutil.CLI, nodes []kapi.Node) map[string][]string {
var err error
flows := make(map[string][]string, len(nodes))
Expand All @@ -267,3 +269,54 @@ func getFlowsForAllNodes(oc *testexutil.CLI, nodes []kapi.Node) map[string][]str
}
return flows
}

type CheckFlowFunc func(nodeName string, flows []string) error

var checkFlowBackoff = utilwait.Backoff{
Duration: time.Second,
Factor: 2,
Steps: 5,
}

func checkFlowsForNode(oc *testexutil.CLI, nodeName string, checkFlow CheckFlowFunc) {
var lastCheckErr error
e2e.Logf("Checking OVS flows for node %q up to %d times", nodeName, checkFlowBackoff.Steps)
err := utilwait.ExponentialBackoff(checkFlowBackoff, func() (bool, error) {
flows, err := doGetFlowsForNode(oc, nodeName)
if err != nil {
return false, err
}
if lastCheckErr = checkFlow(nodeName, flows); lastCheckErr != nil {
e2e.Logf("Check failed (%v)", lastCheckErr)
return false, nil
}
return true, nil
})
if err != nil && lastCheckErr != nil {
err = lastCheckErr
}
expectNoError(err)
}

func checkFlowsForAllNodes(oc *testexutil.CLI, nodes []kapi.Node, checkFlow CheckFlowFunc) {
var lastCheckErr error
e2e.Logf("Checking OVS flows for all nodes up to %d times", checkFlowBackoff.Steps)
err := utilwait.ExponentialBackoff(checkFlowBackoff, func() (bool, error) {
lastCheckErr = nil
for _, node := range nodes {
flows, err := doGetFlowsForNode(oc, node.Name)
if err != nil {
return false, err
}
if lastCheckErr = checkFlow(node.Name, flows); lastCheckErr != nil {
e2e.Logf("Check failed for node %q (%v)", node.Name, lastCheckErr)
return false, nil
}
}
return true, nil
})
if err != nil && lastCheckErr != nil {
err = lastCheckErr
}
expectNoError(err)
}

0 comments on commit 90fb864

Please sign in to comment.