From 0900e929f14dc540da86d0456fa66bc6264e6efe Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 6 Jun 2018 09:12:12 -0400 Subject: [PATCH 1/3] Simplify egress IP change tracking --- pkg/network/node/egressip.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/pkg/network/node/egressip.go b/pkg/network/node/egressip.go index 994631411a26..52ecf68032cb 100644 --- a/pkg/network/node/egressip.go +++ b/pkg/network/node/egressip.go @@ -53,8 +53,8 @@ type egressIPWatcher struct { namespacesByVNID map[uint32]*namespaceEgress egressIPs map[string]*egressIPInfo - changedEgressIPs []*egressIPInfo - changedNamespaces []*namespaceEgress + changedEgressIPs map[*egressIPInfo]bool + changedNamespaces map[*namespaceEgress]bool localEgressLink netlink.Link localEgressNet *net.IPNet @@ -70,6 +70,9 @@ func newEgressIPWatcher(oc *ovsController, localIP string, masqueradeBit *int32) nodesByNodeIP: make(map[string]*nodeEgress), namespacesByVNID: make(map[uint32]*namespaceEgress), egressIPs: make(map[string]*egressIPInfo), + + changedEgressIPs: make(map[*egressIPInfo]bool), + changedNamespaces: make(map[*namespaceEgress]bool), } if masqueradeBit != nil { eip.masqueradeBit = 1 << uint32(*masqueradeBit) @@ -114,9 +117,9 @@ func (eip *egressIPWatcher) ensureEgressIPInfo(egressIP string) *egressIPInfo { } func (eip *egressIPWatcher) egressIPChanged(eg *egressIPInfo) { - eip.changedEgressIPs = append(eip.changedEgressIPs, eg) + eip.changedEgressIPs[eg] = true for _, ns := range eg.namespaces { - eip.changedNamespaces = append(eip.changedNamespaces, ns) + eip.changedNamespaces[ns] = true } } @@ -285,28 +288,18 @@ func (eip *egressIPWatcher) deleteNamespaceEgress(vnid uint32) { } func (eip *egressIPWatcher) syncEgressIPs() { - changedEgressIPs := make(map[*egressIPInfo]bool) - for _, eg := range eip.changedEgressIPs { - changedEgressIPs[eg] = true - } - eip.changedEgressIPs = eip.changedEgressIPs[:0] - - changedNamespaces := make(map[*namespaceEgress]bool) - for _, ns := range eip.changedNamespaces { - changedNamespaces[ns] = true - } - eip.changedNamespaces = eip.changedNamespaces[:0] - - for eg := range changedEgressIPs { + for eg := range eip.changedEgressIPs { eip.syncEgressNodeState(eg) } + eip.changedEgressIPs = make(map[*egressIPInfo]bool) - for ns := range changedNamespaces { + for ns := range eip.changedNamespaces { err := eip.syncEgressNamespaceState(ns) if err != nil { utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules for VNID %d: %v", ns.vnid, err)) } } + eip.changedNamespaces = make(map[*namespaceEgress]bool) } func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) { From 6044355a8613d166ac5d8e1110a56e7ffb4384a8 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 1 May 2018 10:38:17 +0200 Subject: [PATCH 2/3] Basic high-availability for auto egress IPs If a namespace has multiple egress IPs, monitor egress traffic and switch to an alternate egress IP if the currently-selected one appears dead. --- pkg/network/node/egressip.go | 109 +++++++--- pkg/network/node/egressip_test.go | 77 +++---- pkg/network/node/vxlan_monitor.go | 265 +++++++++++++++++++++++++ pkg/network/node/vxlan_monitor_test.go | 153 ++++++++++++++ 4 files changed, 538 insertions(+), 66 deletions(-) create mode 100644 pkg/network/node/vxlan_monitor.go create mode 100644 pkg/network/node/vxlan_monitor_test.go diff --git a/pkg/network/node/egressip.go b/pkg/network/node/egressip.go index 52ecf68032cb..b010e2107de9 100644 --- a/pkg/network/node/egressip.go +++ b/pkg/network/node/egressip.go @@ -15,13 +15,16 @@ import ( networkapi "github.com/openshift/origin/pkg/network/apis/network" "github.com/openshift/origin/pkg/network/common" networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion" + "github.com/openshift/origin/pkg/util/netutils" "github.com/vishvananda/netlink" ) type nodeEgress struct { nodeIP string + sdnIP string requestedIPs sets.String + offline bool } type namespaceEgress struct { @@ -48,6 +51,7 @@ type egressIPWatcher struct { networkInformers networkinformers.SharedInformerFactory iptables *NodeIPTables + vxlanMonitor *egressVXLANMonitor nodesByNodeIP map[string]*nodeEgress namespacesByVNID map[uint32]*namespaceEgress @@ -90,6 +94,10 @@ func (eip *egressIPWatcher) Start(networkInformers networkinformers.SharedInform eip.networkInformers = networkInformers eip.iptables = iptables + updates := make(chan *egressVXLANNode) + eip.vxlanMonitor = newEgressVXLANMonitor(eip.oc.ovs, updates) + go eip.watchVXLAN(updates) + eip.watchHostSubnets() eip.watchNetNamespaces() return nil @@ -125,11 +133,7 @@ func (eip *egressIPWatcher) egressIPChanged(eg *egressIPInfo) { func (eip *egressIPWatcher) addNode(egressIP string, node *nodeEgress) { eg := eip.ensureEgressIPInfo(egressIP) - if len(eg.nodes) != 0 { - utilruntime.HandleError(fmt.Errorf("Multiple nodes claiming EgressIP %q (nodes %q, %q)", eg.ip, node.nodeIP, eg.nodes[0].nodeIP)) - } eg.nodes = append(eg.nodes, node) - eip.egressIPChanged(eg) } @@ -150,11 +154,7 @@ func (eip *egressIPWatcher) deleteNode(egressIP string, node *nodeEgress) { func (eip *egressIPWatcher) addNamespace(egressIP string, ns *namespaceEgress) { eg := eip.ensureEgressIPInfo(egressIP) - if len(eg.namespaces) != 0 { - utilruntime.HandleError(fmt.Errorf("Multiple namespaces claiming EgressIP %q (NetIDs %d, %d)", eg.ip, ns.vnid, eg.namespaces[0].vnid)) - } eg.namespaces = append(eg.namespaces, ns) - eip.egressIPChanged(eg) } @@ -182,17 +182,23 @@ func (eip *egressIPWatcher) handleAddOrUpdateHostSubnet(obj, _ interface{}, even hs := obj.(*networkapi.HostSubnet) glog.V(5).Infof("Watch %s event for HostSubnet %q", eventType, hs.Name) - eip.updateNodeEgress(hs.HostIP, hs.EgressIPs) + _, cidr, err := net.ParseCIDR(hs.Subnet) + if err != nil { + utilruntime.HandleError(fmt.Errorf("could not parse HostSubnet %q CIDR: %v", hs.Name, err)) + } + sdnIP := netutils.GenerateDefaultGateway(cidr).String() + + eip.updateNodeEgress(hs.HostIP, sdnIP, hs.EgressIPs) } func (eip *egressIPWatcher) handleDeleteHostSubnet(obj interface{}) { hs := obj.(*networkapi.HostSubnet) glog.V(5).Infof("Watch %s event for HostSubnet %q", watch.Deleted, hs.Name) - eip.updateNodeEgress(hs.HostIP, nil) + eip.updateNodeEgress(hs.HostIP, "", nil) } -func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []string) { +func (eip *egressIPWatcher) updateNodeEgress(nodeIP, sdnIP string, nodeEgressIPs []string) { eip.Lock() defer eip.Unlock() @@ -203,11 +209,18 @@ func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []stri } node = &nodeEgress{ nodeIP: nodeIP, + sdnIP: sdnIP, requestedIPs: sets.NewString(), } eip.nodesByNodeIP[nodeIP] = node + if eip.vxlanMonitor != nil && node.nodeIP != eip.localIP { + eip.vxlanMonitor.AddNode(node.nodeIP, node.sdnIP) + } } else if len(nodeEgressIPs) == 0 { delete(eip.nodesByNodeIP, nodeIP) + if eip.vxlanMonitor != nil { + eip.vxlanMonitor.RemoveNode(node.nodeIP) + } } oldRequestedIPs := node.requestedIPs node.requestedIPs = sets.NewString(nodeEgressIPs...) @@ -232,11 +245,6 @@ func (eip *egressIPWatcher) handleAddOrUpdateNetNamespace(obj, _ interface{}, ev netns := obj.(*networkapi.NetNamespace) glog.V(5).Infof("Watch %s event for NetNamespace %q", eventType, netns.Name) - if len(netns.EgressIPs) != 0 { - if len(netns.EgressIPs) > 1 { - glog.Warningf("Ignoring extra EgressIPs (%v) in NetNamespace %q", netns.EgressIPs[1:], netns.Name) - } - } eip.updateNamespaceEgress(netns.NetID, netns.EgressIPs) } @@ -287,9 +295,32 @@ func (eip *egressIPWatcher) deleteNamespaceEgress(vnid uint32) { eip.updateNamespaceEgress(vnid, nil) } +func (eip *egressIPWatcher) egressIPActive(eg *egressIPInfo) (bool, error) { + if len(eg.nodes) == 0 || len(eg.namespaces) == 0 { + return false, nil + } + if len(eg.nodes) > 1 { + return false, fmt.Errorf("Multiple nodes (%s, %s) claiming EgressIP %s", eg.nodes[0].nodeIP, eg.nodes[1].nodeIP, eg.ip) + } + if len(eg.namespaces) > 1 { + return false, fmt.Errorf("Multiple namespaces (%d, %d) claiming EgressIP %s", eg.namespaces[0].vnid, eg.namespaces[1].vnid, eg.ip) + } + for _, ip := range eg.namespaces[0].requestedIPs { + eg2 := eip.egressIPs[ip] + if eg2 != eg && len(eg2.nodes) == 1 && eg2.nodes[0] == eg.nodes[0] { + return false, fmt.Errorf("Multiple EgressIPs (%s, %s) for VNID %d on node %s", eg.ip, eg2.ip, eg.namespaces[0].vnid, eg.nodes[0].nodeIP) + } + } + return true, nil +} + func (eip *egressIPWatcher) syncEgressIPs() { for eg := range eip.changedEgressIPs { - eip.syncEgressNodeState(eg) + active, err := eip.egressIPActive(eg) + if err != nil { + utilruntime.HandleError(err) + } + eip.syncEgressNodeState(eg, active) } eip.changedEgressIPs = make(map[*egressIPInfo]bool) @@ -302,12 +333,8 @@ func (eip *egressIPWatcher) syncEgressIPs() { eip.changedNamespaces = make(map[*namespaceEgress]bool) } -func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) { - // The egressIPInfo should have an assigned node IP if and only if the - // egress IP is active (ie, it is assigned to exactly 1 node and exactly - // 1 namespace, and it's the first one for its namespace). - egressIPActive := (len(eg.nodes) == 1 && len(eg.namespaces) == 1 && eg.ip == eg.namespaces[0].requestedIPs[0]) - if egressIPActive && eg.assignedNodeIP != eg.nodes[0].nodeIP { +func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo, active bool) { + if active && eg.assignedNodeIP != eg.nodes[0].nodeIP { glog.V(4).Infof("Assigning egress IP %s to node %s", eg.ip, eg.nodes[0].nodeIP) eg.assignedNodeIP = eg.nodes[0].nodeIP eg.assignedIPTablesMark = getMarkForVNID(eg.namespaces[0].vnid, eip.masqueradeBit) @@ -317,7 +344,7 @@ func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) { eg.assignedNodeIP = "" } } - } else if !egressIPActive && eg.assignedNodeIP != "" { + } else if !active && eg.assignedNodeIP != "" { glog.V(4).Infof("Removing egress IP %s from node %s", eg.ip, eg.assignedNodeIP) if eg.assignedNodeIP == eip.localIP { if err := eip.releaseEgressIP(eg.ip, eg.assignedIPTablesMark); err != nil { @@ -326,8 +353,6 @@ func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) { } eg.assignedNodeIP = "" eg.assignedIPTablesMark = "" - } else if !egressIPActive { - glog.V(4).Infof("Egress IP %s is not assignable (%d namespaces, %d nodes)", eg.ip, len(eg.namespaces), len(eg.nodes)) } } @@ -337,7 +362,7 @@ func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error } var active *egressIPInfo - for i, ip := range ns.requestedIPs { + for _, ip := range ns.requestedIPs { eg := eip.egressIPs[ip] if eg == nil { continue @@ -347,9 +372,11 @@ func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error glog.V(4).Infof("VNID %d gets no egress due to multiply-assigned egress IP %s", ns.vnid, eg.ip) break } - if active == nil && i == 0 { + if active == nil { if eg.assignedNodeIP == "" { glog.V(4).Infof("VNID %d cannot use unassigned egress IP %s", ns.vnid, eg.ip) + } else if len(ns.requestedIPs) > 1 && eg.nodes[0].offline { + glog.V(4).Infof("VNID %d cannot use egress IP %s on offline node %s", ns.vnid, eg.ip, eg.assignedNodeIP) } else { active = eg } @@ -429,3 +456,29 @@ func (eip *egressIPWatcher) releaseEgressIP(egressIP, mark string) error { return nil } + +func (eip *egressIPWatcher) watchVXLAN(updates chan *egressVXLANNode) { + for node := range updates { + eip.updateNode(node.nodeIP, node.offline) + } +} + +func (eip *egressIPWatcher) updateNode(nodeIP string, offline bool) { + eip.Lock() + defer eip.Unlock() + + node := eip.nodesByNodeIP[nodeIP] + if node == nil { + eip.vxlanMonitor.RemoveNode(nodeIP) + return + } + + node.offline = offline + for _, ip := range node.requestedIPs.UnsortedList() { + eg := eip.egressIPs[ip] + if eg != nil { + eip.egressIPChanged(eg) + } + } + eip.syncEgressIPs() +} diff --git a/pkg/network/node/egressip_test.go b/pkg/network/node/egressip_test.go index bc9c3fa63ba9..6d28b5295bc6 100644 --- a/pkg/network/node/egressip_test.go +++ b/pkg/network/node/egressip_test.go @@ -142,8 +142,8 @@ func setupEgressIPWatcher(t *testing.T) (*egressIPWatcher, []string) { func TestEgressIP(t *testing.T) { eip, flows := setupEgressIPWatcher(t) - eip.updateNodeEgress("172.17.0.3", []string{}) - eip.updateNodeEgress("172.17.0.4", []string{}) + eip.updateNodeEgress("172.17.0.3", "", []string{}) + eip.updateNodeEgress("172.17.0.4", "", []string{}) eip.deleteNamespaceEgress(42) eip.deleteNamespaceEgress(43) @@ -168,7 +168,7 @@ func TestEgressIP(t *testing.T) { t.Fatalf("%v", err) } - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) // Added .100 + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) // Added .100 err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -179,8 +179,8 @@ func TestEgressIP(t *testing.T) { } // Assign HostSubnet.EgressIP first, then NetNamespace.EgressIP, with a remote EgressIP - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.101", "172.17.0.100"}) // Added .101 - eip.updateNodeEgress("172.17.0.5", []string{"172.17.0.105"}) // Added .105 + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.101", "172.17.0.100"}) // Added .101 + eip.updateNodeEgress("172.17.0.5", "", []string{"172.17.0.105"}) // Added .105 err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -222,7 +222,7 @@ func TestEgressIP(t *testing.T) { t.Fatalf("%v", err) } - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.102", "172.17.0.104"}) // Added .102, .104 + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.102", "172.17.0.104"}) // Added .102, .104 err = assertNetlinkChange(eip, "claim 172.17.0.104") if err != nil { t.Fatalf("%v", err) @@ -245,7 +245,7 @@ func TestEgressIP(t *testing.T) { } // Assign HostSubnet.EgressIP first, then NetNamespace.EgressIP, with a local EgressIP - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.102", "172.17.0.103"}) // Added .103, Dropped .104 + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.102", "172.17.0.103"}) // Added .103, Dropped .104 err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -288,7 +288,7 @@ func TestEgressIP(t *testing.T) { } // Drop remote node EgressIP - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) // Dropped .101 + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) // Dropped .101 err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -299,7 +299,7 @@ func TestEgressIP(t *testing.T) { } // Drop local node EgressIP - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.102"}) // Dropped .103 + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.102"}) // Dropped .103 err = assertNetlinkChange(eip, "release 172.17.0.103") if err != nil { t.Fatalf("%v", err) @@ -310,7 +310,7 @@ func TestEgressIP(t *testing.T) { } // Add them back, swapped - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100", "172.17.0.103"}) // Added .103 + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100", "172.17.0.103"}) // Added .103 err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -320,7 +320,7 @@ func TestEgressIP(t *testing.T) { t.Fatalf("%v", err) } - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.101", "172.17.0.102"}) // Added .101 + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.101", "172.17.0.102"}) // Added .101 err = assertNetlinkChange(eip, "claim 172.17.0.101") if err != nil { t.Fatalf("%v", err) @@ -335,7 +335,7 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { eip, flows := setupEgressIPWatcher(t) eip.updateNamespaceEgress(42, []string{"172.17.0.100"}) - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) err := assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.3"}, ) @@ -343,18 +343,15 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { t.Fatalf("%v", err) } - // Prepending a second, unavailable, namespace egress IP should block the working - // one, because we still only support using the first EgressIP + // Prepending a second, unavailable, namespace egress IP should have no effect eip.updateNamespaceEgress(42, []string{"172.17.0.101", "172.17.0.100"}) - err = assertOVSChanges(eip, &flows, - egressOVSChange{vnid: 42, egress: Dropped}, - ) + err = assertNoOVSChanges(eip, &flows) if err != nil { t.Fatalf("%v", err) } - // Assigning that IP to a node should now make it work - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.101"}) + // Now assigning that IP to a node should switch OVS to use that since it's first in the list + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.101"}) err = assertNetlinkChange(eip, "claim 172.17.0.101") if err != nil { t.Fatalf("%v", err) @@ -366,9 +363,9 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { t.Fatalf("%v", err) } - // Swapping the order in the NetNamespace should bring the original Egress IP back + // Swapping the order in the NetNamespace should swap back eip.updateNamespaceEgress(42, []string{"172.17.0.100", "172.17.0.101"}) - err = assertNetlinkChange(eip, "release 172.17.0.101") + err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) } @@ -380,8 +377,8 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { } // Removing the inactive egress IP from its node should have no effect - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.200"}) - err = assertNoNetlinkChanges(eip) + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.200"}) + err = assertNetlinkChange(eip, "release 172.17.0.101") if err != nil { t.Fatalf("%v", err) } @@ -393,7 +390,11 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { } // Removing the remaining egress IP should now kill the namespace - eip.updateNodeEgress("172.17.0.3", nil) + eip.updateNodeEgress("172.17.0.3", "", nil) + err = assertNoNetlinkChanges(eip) + if err != nil { + t.Fatalf("%v", err) + } err = assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Dropped}, ) @@ -402,9 +403,9 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { } // Now add the egress IPs back... - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.101"}) - err = assertNoNetlinkChanges(eip) + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.101"}) + err = assertNetlinkChange(eip, "claim 172.17.0.101") if err != nil { t.Fatalf("%v", err) } @@ -444,7 +445,7 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { } eip.updateNamespaceEgress(44, []string{"172.17.0.101"}) - err = assertNoNetlinkChanges(eip) + err = assertNetlinkChange(eip, "release 172.17.0.101") if err != nil { t.Fatalf("%v", err) } @@ -457,7 +458,7 @@ func TestMultipleNamespaceEgressIPs(t *testing.T) { } eip.deleteNamespaceEgress(44) - err = assertNoNetlinkChanges(eip) + err = assertNetlinkChange(eip, "claim 172.17.0.101") if err != nil { t.Fatalf("%v", err) } @@ -474,7 +475,7 @@ func TestNodeIPAsEgressIP(t *testing.T) { eip, flows := setupEgressIPWatcher(t) // Trying to assign node IP as egress IP should fail. (It will log an error but this test doesn't notice that.) - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.4", "172.17.0.102"}) + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.4", "172.17.0.102"}) err := assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -489,7 +490,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) { eip, flows := setupEgressIPWatcher(t) eip.updateNamespaceEgress(42, []string{"172.17.0.100"}) - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) err := assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.3"}) if err != nil { t.Fatalf("%v", err) @@ -498,7 +499,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) { // Adding the Egress IP to another node should not work and should cause the // namespace to start dropping traffic. (And in particular, even though we're // adding the Egress IP to the local node, there should not be a netlink change.) - eip.updateNodeEgress("172.17.0.4", []string{"172.17.0.100"}) + eip.updateNodeEgress("172.17.0.4", "", []string{"172.17.0.100"}) err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -509,7 +510,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) { } // Removing the duplicate node egressIP should restore traffic to the broken namespace - eip.updateNodeEgress("172.17.0.4", []string{}) + eip.updateNodeEgress("172.17.0.4", "", []string{}) err = assertNoNetlinkChanges(eip) if err != nil { t.Fatalf("%v", err) @@ -520,7 +521,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) { } // As above, but with a remote node IP - eip.updateNodeEgress("172.17.0.5", []string{"172.17.0.100"}) + eip.updateNodeEgress("172.17.0.5", "", []string{"172.17.0.100"}) err = assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Dropped}) if err != nil { t.Fatalf("%v", err) @@ -542,7 +543,7 @@ func TestDuplicateNodeEgressIPs(t *testing.T) { // Removing the original egress node should result in the "duplicate" egress node // now being used. - eip.updateNodeEgress("172.17.0.3", []string{}) + eip.updateNodeEgress("172.17.0.3", "", []string{}) err = assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.5"}) if err != nil { t.Fatalf("%v", err) @@ -553,7 +554,7 @@ func TestDuplicateNamespaceEgressIPs(t *testing.T) { eip, flows := setupEgressIPWatcher(t) eip.updateNamespaceEgress(42, []string{"172.17.0.100"}) - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) err := assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Remote, remote: "172.17.0.3"}) if err != nil { t.Fatalf("%v", err) @@ -595,7 +596,7 @@ func TestDuplicateNamespaceEgressIPs(t *testing.T) { // cause the rules to get deleted and then added back in the opposite order, // which assertNoOVSChanges() would complain about, so we have to use // assertOVSChanges() instead. - eip.updateNodeEgress("172.17.0.3", []string{}) + eip.updateNodeEgress("172.17.0.3", "", []string{}) err = assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Dropped}, egressOVSChange{vnid: 43, egress: Dropped}, @@ -604,7 +605,7 @@ func TestDuplicateNamespaceEgressIPs(t *testing.T) { t.Fatalf("%v", err) } - eip.updateNodeEgress("172.17.0.3", []string{"172.17.0.100"}) + eip.updateNodeEgress("172.17.0.3", "", []string{"172.17.0.100"}) err = assertOVSChanges(eip, &flows, egressOVSChange{vnid: 42, egress: Dropped}, egressOVSChange{vnid: 43, egress: Dropped}, diff --git a/pkg/network/node/vxlan_monitor.go b/pkg/network/node/vxlan_monitor.go new file mode 100644 index 000000000000..7f9efb876393 --- /dev/null +++ b/pkg/network/node/vxlan_monitor.go @@ -0,0 +1,265 @@ +package node + +import ( + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/golang/glog" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilwait "k8s.io/apimachinery/pkg/util/wait" + + "github.com/openshift/origin/pkg/util/ovs" +) + +// egressVXLANMonitor monitors the health of automatic egress IPs by periodically checking +// "ovs-ofctl dump-flows" output and noticing if the number of packets sent over VXLAN to +// an egress node has increased, but the number of packets received over VXLAN from the +// node hasn't. If that happens, it marks the node as being offline. +// +// Specifically, evm.poll() is called every defaultPollInterval and calls evm.check() to +// check the packet counts. If any nodes are seen to be potentially-offline, then +// evm.poll() will call evm.check() again after repollInterval, up to maxRetries more +// times. If the incoming packet count still hasn't increased at that point, then the node +// is considered to be offline. (The retries are needed because it's possible that we +// polled just a few milliseconds after some packets went out, in which case we obviously +// need to give the remote node more time to respond before declaring it offline.) +// +// When the monitor decides a node has gone offline, it alerts its owner via the updates +// channel, and then starts periodically pinging the node's SDN IP address, until the +// incoming packet count increases, at which point it marks the node online again. +// +// The fact that we (normally) use pod-to-egress traffic to do the monitoring rather than +// actively pinging the nodes means that if an egress node falls over while no one is +// using an egress IP on it, we won't notice the problem until someone does try to use it. +// So, eg, the first pod created in a namespace might spend several seconds trying to talk +// to a dead egress node before falling over to its backup egress IP. +type egressVXLANMonitor struct { + sync.Mutex + + ovsif ovs.Interface + updates chan<- *egressVXLANNode + pollInterval time.Duration + + monitorNodes map[string]*egressVXLANNode + stop chan struct{} +} + +type egressVXLANNode struct { + nodeIP string + sdnIP string + offline bool + + in uint64 + out uint64 + + retries int +} + +const ( + // See egressVXLANMonitor docs above for information about these + defaultPollInterval = 5 * time.Second + repollInterval = time.Second + maxRetries = 2 +) + +func newEgressVXLANMonitor(ovsif ovs.Interface, updates chan<- *egressVXLANNode) *egressVXLANMonitor { + return &egressVXLANMonitor{ + ovsif: ovsif, + updates: updates, + pollInterval: defaultPollInterval, + monitorNodes: make(map[string]*egressVXLANNode), + } +} + +func (evm *egressVXLANMonitor) AddNode(nodeIP, sdnIP string) { + evm.Lock() + defer evm.Unlock() + + if evm.monitorNodes[nodeIP] != nil { + return + } + glog.V(4).Infof("Monitoring node %s (SDN IP %s)", nodeIP, sdnIP) + + evm.monitorNodes[nodeIP] = &egressVXLANNode{ + nodeIP: nodeIP, + sdnIP: sdnIP, + } + if len(evm.monitorNodes) == 1 && evm.pollInterval != 0 { + evm.stop = make(chan struct{}) + go utilwait.PollUntil(evm.pollInterval, evm.poll, evm.stop) + } +} + +func (evm *egressVXLANMonitor) RemoveNode(nodeIP string) { + evm.Lock() + defer evm.Unlock() + + if evm.monitorNodes[nodeIP] == nil { + return + } + glog.V(4).Infof("Unmonitoring node %s", nodeIP) + + delete(evm.monitorNodes, nodeIP) + if len(evm.monitorNodes) == 0 && evm.stop != nil { + close(evm.stop) + evm.stop = nil + } +} + +func parseNPackets(of *ovs.OvsFlow) (uint64, error) { + str, _ := of.FindField("n_packets") + if str == nil { + return 0, fmt.Errorf("no packet count") + } + nPackets, err := strconv.ParseUint(str.Value, 10, 64) + if err != nil { + return 0, fmt.Errorf("bad packet count: %v", err) + } + return nPackets, nil +} + +// Assumes the mutex is held +func (evm *egressVXLANMonitor) check(retryOnly bool) bool { + inFlows, err := evm.ovsif.DumpFlows("table=10") + if err != nil { + utilruntime.HandleError(err) + return false + } + outFlows, err := evm.ovsif.DumpFlows("table=100") + if err != nil { + utilruntime.HandleError(err) + return false + } + + inTraffic := make(map[string]uint64) + for _, flow := range inFlows { + parsed, err := ovs.ParseFlow(ovs.ParseForDump, flow) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Error parsing VXLAN input flow: %v", err)) + continue + } + tunSrc, _ := parsed.FindField("tun_src") + if tunSrc == nil { + continue + } + if evm.monitorNodes[tunSrc.Value] == nil { + continue + } + nPackets, err := parseNPackets(parsed) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Could not parse %q: %v", flow, err)) + continue + } + inTraffic[tunSrc.Value] = nPackets + } + + outTraffic := make(map[string]uint64) + for _, flow := range outFlows { + parsed, err := ovs.ParseFlow(ovs.ParseForDump, flow) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Error parsing VXLAN output flow: %v", err)) + continue + } + tunDst := "" + for _, act := range parsed.Actions { + if act.Name == "set_field" && strings.HasSuffix(act.Value, "->tun_dst") { + tunDst = strings.TrimSuffix(act.Value, "->tun_dst") + break + } + } + if tunDst == "" { + continue + } + if evm.monitorNodes[tunDst] == nil { + continue + } + nPackets, err := parseNPackets(parsed) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Could not parse %q: %v", flow, err)) + continue + } + outTraffic[tunDst] = nPackets + } + + retry := false + for _, node := range evm.monitorNodes { + if retryOnly && node.retries == 0 { + continue + } + + in := inTraffic[node.nodeIP] + out := outTraffic[node.nodeIP] + + // If `in` was missing from the OVS output then `out` should be missing + // too, so both variables will be 0 and we won't end up doing anything + // below. If `out` is missing but `in` isn't then that means we know about + // the node but aren't currently routing egress traffic to it. If it is + // currently marked offline, then we'll keep monitoring for it to come + // back online (by watching `in`) but if it is currently marked online + // then we *won't* notice if it goes offline. + // + // Note also that the code doesn't have to worry about n_packets + // overflowing and rolling over; the worst that can happen is that if + // `out` rolls over at the same time as the node goes offline then we + // won't notice the node being offline until the next poll. + + if node.offline { + if in > node.in { + glog.Infof("Node %s is back online", node.nodeIP) + node.offline = false + evm.updates <- node + } else if node.sdnIP != "" { + go ping(node.sdnIP) + } + } else { + if out > node.out && in == node.in { + node.retries++ + if node.retries > maxRetries { + glog.Warningf("Node %s is offline", node.nodeIP) + node.retries = 0 + node.offline = true + evm.updates <- node + } else { + glog.V(2).Infof("Node %s may be offline... retrying", node.nodeIP) + retry = true + continue + } + } + } + + node.in = in + node.out = out + } + + return retry +} + +func (evm *egressVXLANMonitor) poll() (bool, error) { + evm.Lock() + defer evm.Unlock() + + retry := evm.check(false) + for retry { + time.Sleep(repollInterval) + retry = evm.check(true) + } + return false, nil +} + +// ping a node by trying to open a TCP connection to the "discard" service (port 9). If +// the node is still offline, the attempt will time out with no response. But if the node +// has come back then we'll get a TCP RST (indicating that the node is not listening on +// that port), or possibly an ACK (if for some strange reason it *was* listening); either +// way, the input n_packets counter will increase by 1, causing the next poll to mark the +// node as being back online. +func ping(ip string) { + conn, _ := net.DialTimeout("tcp", ip+":9", defaultPollInterval) + if conn != nil { + conn.Close() + } +} diff --git a/pkg/network/node/vxlan_monitor_test.go b/pkg/network/node/vxlan_monitor_test.go new file mode 100644 index 000000000000..a25199305b80 --- /dev/null +++ b/pkg/network/node/vxlan_monitor_test.go @@ -0,0 +1,153 @@ +package node + +import ( + "testing" + + "github.com/openshift/origin/pkg/util/ovs" +) + +func setPacketCounts(ovsif ovs.Interface, nodeIP string, sent, received int64) { + otx := ovsif.NewTransaction() + otx.DeleteFlows("table=10, tun_src=%s", nodeIP) + otx.DeleteFlows("table=100, dummy=%s", nodeIP) + if received >= 0 { + otx.AddFlow("table=10, n_packets=%d, tun_src=%s, actions=goto_table:30", received, nodeIP) + } + if sent >= 0 { + otx.AddFlow("table=100, n_packets=%d, dummy=%s, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", sent, nodeIP, nodeIP) + } + err := otx.Commit() + if err != nil { + panic("can't happen: " + err.Error()) + } +} + +func peekUpdate(updates chan *egressVXLANNode) *egressVXLANNode { + select { + case update := <-updates: + return update + default: + return nil + } +} + +func TestEgressVXLANMonitor(t *testing.T) { + ovsif := ovs.NewFake(Br0) + ovsif.AddBridge() + setPacketCounts(ovsif, "192.168.1.1", 0, 0) + setPacketCounts(ovsif, "192.168.1.2", -1, 0) + setPacketCounts(ovsif, "192.168.1.3", 0, 0) + setPacketCounts(ovsif, "192.168.1.4", -1, 0) + setPacketCounts(ovsif, "192.168.1.5", 0, 0) + + updates := make(chan *egressVXLANNode, 10) + evm := newEgressVXLANMonitor(ovsif, updates) + evm.pollInterval = 0 + + evm.AddNode("192.168.1.1", "") + evm.AddNode("192.168.1.3", "") + evm.AddNode("192.168.1.5", "") + + // Everything should be fine at startup + retry := evm.check(false) + if update := peekUpdate(updates); update != nil { + t.Fatalf("Initial check showed updated node %#v", update) + } + if retry { + t.Fatalf("Initial check requested retry") + } + + // Send and receive some traffic + setPacketCounts(ovsif, "192.168.1.1", 10, 10) + setPacketCounts(ovsif, "192.168.1.2", -1, 20) + setPacketCounts(ovsif, "192.168.1.3", 10, 30) + setPacketCounts(ovsif, "192.168.1.4", -1, 40) + setPacketCounts(ovsif, "192.168.1.5", 70, 50) + + retry = evm.check(false) + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed updated node %#v", update) + } + if retry { + t.Fatalf("Check erroneously requested retry") + } + + // Send some more traffic to .3 but don't receive any. Receive some more + // traffic from 5 but don't send any. + setPacketCounts(ovsif, "192.168.1.3", 20, 30) + setPacketCounts(ovsif, "192.168.1.5", 70, 100) + + retry = evm.check(false) + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed updated node %#v", update) + } + if !retry { + t.Fatalf("Check erroneously failed to request retry") + } + retry = evm.check(true) + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed updated node %#v", update) + } + if !retry { + t.Fatalf("Check erroneously failed to request retry") + } + + // Since we're only doing retries, it should ignore this + setPacketCounts(ovsif, "192.168.1.1", 20, 10) + + retry = evm.check(true) + if update := peekUpdate(updates); update == nil { + t.Fatalf("Check failed to fail after maxRetries") + } else if update.nodeIP != "192.168.1.3" || !update.offline { + t.Fatalf("Unexpected update node %#v", update) + } + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed additional updated node %#v", update) + } + if retry { + t.Fatalf("Check erroneously requested retry") + } + + setPacketCounts(ovsif, "192.168.1.1", 20, 20) + retry = evm.check(false) + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed updated node %#v", update) + } + if retry { + t.Fatalf("Check erroneously requested retry") + } + + // Have .1 lag a bit but then catch up + setPacketCounts(ovsif, "192.168.1.1", 30, 20) + retry = evm.check(false) + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed updated node %#v", update) + } + if !retry { + t.Fatalf("Check erroneously failed to request retry") + } + + setPacketCounts(ovsif, "192.168.1.1", 30, 30) + retry = evm.check(true) + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed updated node %#v", update) + } + if retry { + t.Fatalf("Check erroneously requested retry") + } + + // Now bring back the failed node + setPacketCounts(ovsif, "192.168.1.3", 50, 40) + retry = evm.check(false) + if update := peekUpdate(updates); update == nil { + t.Fatalf("Node failed to recover") + } else if update.nodeIP != "192.168.1.3" || update.offline { + t.Fatalf("Unexpected updated node %#v", update) + } + if update := peekUpdate(updates); update != nil { + t.Fatalf("Check erroneously showed additional updated node %#v", update) + } + if retry { + t.Fatalf("Check erroneously requested retry") + } +} From 0439a830dd9633aa373037f7f12180a7eb0d3c88 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 1 May 2018 12:23:20 +0200 Subject: [PATCH 3/3] Log ovs dump-flows at 5, not 4 Most dump-flows calls are part of health checks and don't normally need to be logged about unless they fail. --- pkg/util/ovs/ovs.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/util/ovs/ovs.go b/pkg/util/ovs/ovs.go index b3b4d6abbece..a48df4377cb5 100644 --- a/pkg/util/ovs/ovs.go +++ b/pkg/util/ovs/ovs.go @@ -139,8 +139,12 @@ func New(execer exec.Interface, bridge string, minVersion string) (Interface, er } func (ovsif *ovsExec) execWithStdin(cmd string, stdinArgs []string, args ...string) (string, error) { + logLevel := glog.Level(4) switch cmd { case OVS_OFCTL: + if args[0] == "dump-flows" { + logLevel = glog.Level(5) + } args = append([]string{"-O", "OpenFlow13"}, args...) case OVS_VSCTL: args = append([]string{"--timeout=30"}, args...) @@ -152,9 +156,9 @@ func (ovsif *ovsExec) execWithStdin(cmd string, stdinArgs []string, args ...stri stdin := bytes.NewBufferString(stdinString) kcmd.SetStdin(stdin) - glog.V(4).Infof("Executing: %s %s <<\n%s", cmd, strings.Join(args, " "), stdinString) + glog.V(logLevel).Infof("Executing: %s %s <<\n%s", cmd, strings.Join(args, " "), stdinString) } else { - glog.V(4).Infof("Executing: %s %s", cmd, strings.Join(args, " ")) + glog.V(logLevel).Infof("Executing: %s %s", cmd, strings.Join(args, " ")) } output, err := kcmd.CombinedOutput()