Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OVS test flake on release-1.5 #13200

Merged
merged 2 commits into from
Mar 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/sdn/plugin/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package plugin
import (
"fmt"
"net"
"sort"
"strconv"
"strings"

log "github.com/golang/glog"

Expand Down Expand Up @@ -266,13 +268,14 @@ func (plugin *OsdnNode) updateVXLANMulticastRules(subnets hostSubnetMap) {
otx := plugin.ovs.NewTransaction()

// Build the list of all nodes for multicast forwarding
tun_dsts := ""
tun_dsts := make([]string, 0, len(subnets))
for _, subnet := range subnets {
if subnet.HostIP != plugin.localIP {
tun_dsts += fmt.Sprintf(",set_field:%s->tun_dst,output:1", subnet.HostIP)
tun_dsts = append(tun_dsts, fmt.Sprintf(",set_field:%s->tun_dst,output:1", subnet.HostIP))
}
}
otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31]%s,goto_table:120", tun_dsts)
sort.Strings(sort.StringSlice(tun_dsts))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need the sort.StringSlice() ? Doesn't sort.Strings() take a []string ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. However, this is a cherry-pick from master, so we should keep it the same. (Or fix it there as well I guess.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I say send a PR to fix in master and we'll allow the double casting since it apparently doesn't hurt much.

otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31]%s,goto_table:120", strings.Join(tun_dsts, ""))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are going to keep each string in the slice why not handle the commas in the strings.Join() and make the fmt a little less finiky? (Not that the fmt is wrong)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we don't want "a comma between each element", we want "a comma before each element". If you look at how the tun_dsts get fit into the rest of the flow string, the string has to be "" when there are no tun_dsts, but it has to start with "," otherwise.


if err := otx.EndTransaction(); err != nil {
log.Errorf("Error updating OVS VXLAN multicast flows: %v", err)
Expand Down
149 changes: 101 additions & 48 deletions test/extended/networking/ovs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package networking

import (
"fmt"
"net"
"reflect"
"regexp"
Expand All @@ -12,6 +13,7 @@ import (

kapi "k8s.io/kubernetes/pkg/api"
kapiunversioned "k8s.io/kubernetes/pkg/api/unversioned"
utilwait "k8s.io/kubernetes/pkg/util/wait"
e2e "k8s.io/kubernetes/test/e2e/framework"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -40,27 +42,20 @@ var _ = Describe("[networking] OVS", func() {
ipPort := e2e.LaunchWebserverPod(f1, podName, deployNodeName)
ip := strings.Split(ipPort, ":")[0]

newFlows := getFlowsForAllNodes(oc, nodes.Items)
for _, node := range nodes.Items {
if node.Name != deployNodeName {
Expect(reflect.DeepEqual(origFlows[node.Name], newFlows[node.Name])).To(BeTrue(), "Flows on non-deployed-to nodes should be unchanged")
}
}

foundPodFlow := false
for _, flow := range newFlows[deployNodeName] {
if strings.Contains(flow, "="+ip+",") || strings.Contains(flow, "="+ip+" ") {
foundPodFlow = true
break
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, newFlows []string) error {
if nodeName == deployNodeName {
return findFlowOrError("Should have flows referring to pod IP address", newFlows, ip)
} else {
return matchFlowsOrError("Flows on non-deployed-to nodes should be unchanged", newFlows, origFlows[nodeName])
}
}
Expect(foundPodFlow).To(BeTrue(), "Should have flows referring to pod IP address")
})

err := f1.ClientSet.Core().Pods(f1.Namespace.Name).Delete(podName, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

postDeleteFlows := getFlowsForNode(oc, deployNodeName)
Expect(reflect.DeepEqual(origFlows[deployNodeName], postDeleteFlows)).To(BeTrue(), "Flows after deleting pod should be same as before creating it")
checkFlowsForNode(oc, deployNodeName, func(nodeName string, flows []string) error {
return matchFlowsOrError("Flows after deleting pod should be same as before creating it", flows, origFlows[nodeName])
})
})

It("should add and remove flows when nodes are added and removed", func() {
Expand Down Expand Up @@ -122,17 +117,9 @@ var _ = Describe("[networking] OVS", func() {
}
Expect(err).NotTo(HaveOccurred())

newFlows := getFlowsForAllNodes(oc, nodes.Items)
for nodeName := range newFlows {
foundNodeFlow := false
for _, flow := range newFlows[nodeName] {
if strings.Contains(flow, "="+newNodeIP+",") || strings.Contains(flow, "="+newNodeIP+" ") {
foundNodeFlow = true
break
}
}
Expect(foundNodeFlow).To(BeTrue(), "Should have flows referring to node IP address")
}
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, newFlows []string) error {
return findFlowOrError("Should have flows referring to node IP address", newFlows, newNodeIP)
})

err = f1.ClientSet.Core().Nodes().Delete(node.Name, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
Expand All @@ -145,8 +132,9 @@ var _ = Describe("[networking] OVS", func() {
}
Expect(err).NotTo(BeNil())

postDeleteFlows := getFlowsForAllNodes(oc, nodes.Items)
Expect(reflect.DeepEqual(origFlows, postDeleteFlows)).To(BeTrue(), "Flows after deleting node should be same as before creating it")
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, flows []string) error {
return matchFlowsOrError("Flows after deleting node should be same as before creating it", flows, origFlows[nodeName])
})
})
})

Expand All @@ -163,29 +151,49 @@ var _ = Describe("[networking] OVS", func() {
ipPort := launchWebserverService(f1, serviceName, deployNodeName)
ip := strings.Split(ipPort, ":")[0]

newFlows := getFlowsForAllNodes(oc, nodes.Items)
for _, node := range nodes.Items {
foundServiceFlow := false
for _, flow := range newFlows[node.Name] {
if strings.Contains(flow, "nw_dst="+ip+",") || strings.Contains(flow, "nw_dst="+ip+" ") {
foundServiceFlow = true
break
}
}
Expect(foundServiceFlow).To(BeTrue(), "Each node contains a rule for the service")
}
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, newFlows []string) error {
return findFlowOrError("Should have flows referring to service IP address", newFlows, ip)
})

err := f1.ClientSet.Core().Pods(f1.Namespace.Name).Delete(serviceName, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
err = f1.ClientSet.Core().Services(f1.Namespace.Name).Delete(serviceName, &kapi.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

postDeleteFlows := getFlowsForAllNodes(oc, nodes.Items)
Expect(reflect.DeepEqual(origFlows, postDeleteFlows)).To(BeTrue(), "Flows after deleting service should be same as before creating it")
checkFlowsForAllNodes(oc, nodes.Items, func(nodeName string, flows []string) error {
return matchFlowsOrError("Flows after deleting service should be same as before creating it", flows, origFlows[nodeName])
})
})
})
})

type FlowError struct {
msg string
flows []string
expected []string
}

func (err FlowError) Error() string {
return err.msg
}

func matchFlowsOrError(msg string, flows, expected []string) error {
if reflect.DeepEqual(flows, expected) {
return nil
} else {
return FlowError{msg, flows, expected}
}
}

func findFlowOrError(msg string, flows []string, ip string) error {
for _, flow := range flows {
if strings.Contains(flow, "="+ip+",") || strings.Contains(flow, "="+ip+" ") {
return nil
}
}
return FlowError{fmt.Sprintf("%s (%s)", msg, ip), flows, nil}
}

func doGetFlowsForNode(oc *testexutil.CLI, nodeName string) ([]string, error) {
pod := &kapi.Pod{
TypeMeta: kapiunversioned.TypeMeta{
Expand Down Expand Up @@ -252,12 +260,6 @@ func doGetFlowsForNode(oc *testexutil.CLI, nodeName string) ([]string, error) {
return flows, nil
}

func getFlowsForNode(oc *testexutil.CLI, nodeName string) []string {
flows, err := doGetFlowsForNode(oc, nodeName)
expectNoError(err)
return flows
}

func getFlowsForAllNodes(oc *testexutil.CLI, nodes []kapi.Node) map[string][]string {
var err error
flows := make(map[string][]string, len(nodes))
Expand All @@ -267,3 +269,54 @@ func getFlowsForAllNodes(oc *testexutil.CLI, nodes []kapi.Node) map[string][]str
}
return flows
}

type CheckFlowFunc func(nodeName string, flows []string) error

var checkFlowBackoff = utilwait.Backoff{
Duration: time.Second,
Factor: 2,
Steps: 5,
}

func checkFlowsForNode(oc *testexutil.CLI, nodeName string, checkFlow CheckFlowFunc) {
var lastCheckErr error
e2e.Logf("Checking OVS flows for node %q up to %d times", nodeName, checkFlowBackoff.Steps)
err := utilwait.ExponentialBackoff(checkFlowBackoff, func() (bool, error) {
flows, err := doGetFlowsForNode(oc, nodeName)
if err != nil {
return false, err
}
if lastCheckErr = checkFlow(nodeName, flows); lastCheckErr != nil {
e2e.Logf("Check failed (%v)", lastCheckErr)
return false, nil
}
return true, nil
})
if err != nil && lastCheckErr != nil {
err = lastCheckErr
}
expectNoError(err)
}

func checkFlowsForAllNodes(oc *testexutil.CLI, nodes []kapi.Node, checkFlow CheckFlowFunc) {
var lastCheckErr error
e2e.Logf("Checking OVS flows for all nodes up to %d times", checkFlowBackoff.Steps)
err := utilwait.ExponentialBackoff(checkFlowBackoff, func() (bool, error) {
lastCheckErr = nil
for _, node := range nodes {
flows, err := doGetFlowsForNode(oc, node.Name)
if err != nil {
return false, err
}
if lastCheckErr = checkFlow(node.Name, flows); lastCheckErr != nil {
e2e.Logf("Check failed for node %q (%v)", node.Name, lastCheckErr)
return false, nil
}
}
return true, nil
})
if err != nil && lastCheckErr != nil {
err = lastCheckErr
}
expectNoError(err)
}