Skip to content

Commit

Permalink
Basic high-availability for auto egress IPs
Browse files Browse the repository at this point in the history
If a namespace has multiple egress IPs, monitor egress traffic and
switch to an alternate egress IP if the currently-selected one appears
dead.
  • Loading branch information
danwinship committed Jun 7, 2018
1 parent 0900e92 commit 6044355
Show file tree
Hide file tree
Showing 4 changed files with 538 additions and 66 deletions.
109 changes: 81 additions & 28 deletions pkg/network/node/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ import (
networkapi "github.com/openshift/origin/pkg/network/apis/network"
"github.com/openshift/origin/pkg/network/common"
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
"github.com/openshift/origin/pkg/util/netutils"

"github.com/vishvananda/netlink"
)

// nodeEgress holds the egress-IP-related state tracked for one node.
// Instances are stored in egressIPWatcher.nodesByNodeIP, keyed by nodeIP.
type nodeEgress struct {
// nodeIP is the node's host IP (taken from HostSubnet.HostIP).
nodeIP string
// sdnIP is the default gateway address generated from the node's HostSubnet
// CIDR; it is handed to the VXLAN monitor together with nodeIP.
sdnIP string
// requestedIPs is the set of egress IPs listed on the node's HostSubnet.
requestedIPs sets.String
// offline is set from VXLAN monitor updates; a namespace with more than one
// requested egress IP will not use an egress IP hosted on an offline node.
offline bool
}

type namespaceEgress struct {
Expand All @@ -48,6 +51,7 @@ type egressIPWatcher struct {

networkInformers networkinformers.SharedInformerFactory
iptables *NodeIPTables
vxlanMonitor *egressVXLANMonitor

nodesByNodeIP map[string]*nodeEgress
namespacesByVNID map[uint32]*namespaceEgress
Expand Down Expand Up @@ -90,6 +94,10 @@ func (eip *egressIPWatcher) Start(networkInformers networkinformers.SharedInform
eip.networkInformers = networkInformers
eip.iptables = iptables

updates := make(chan *egressVXLANNode)
eip.vxlanMonitor = newEgressVXLANMonitor(eip.oc.ovs, updates)
go eip.watchVXLAN(updates)

eip.watchHostSubnets()
eip.watchNetNamespaces()
return nil
Expand Down Expand Up @@ -125,11 +133,7 @@ func (eip *egressIPWatcher) egressIPChanged(eg *egressIPInfo) {

// addNode registers node as a claimant of egressIP and then calls
// egressIPChanged for that egress IP. If some other node already claims the
// IP, the conflict is reported through utilruntime.HandleError; the new node
// is appended to the claimant list regardless.
func (eip *egressIPWatcher) addNode(egressIP string, node *nodeEgress) {
	info := eip.ensureEgressIPInfo(egressIP)
	if claimants := info.nodes; len(claimants) > 0 {
		conflict := fmt.Errorf("Multiple nodes claiming EgressIP %q (nodes %q, %q)", info.ip, node.nodeIP, claimants[0].nodeIP)
		utilruntime.HandleError(conflict)
	}
	info.nodes = append(info.nodes, node)
	eip.egressIPChanged(info)
}

Expand All @@ -150,11 +154,7 @@ func (eip *egressIPWatcher) deleteNode(egressIP string, node *nodeEgress) {

// addNamespace registers ns as a claimant of egressIP and then calls
// egressIPChanged for that egress IP. If some other namespace already claims
// the IP, the conflict is reported through utilruntime.HandleError; the new
// namespace is appended to the claimant list regardless.
func (eip *egressIPWatcher) addNamespace(egressIP string, ns *namespaceEgress) {
	info := eip.ensureEgressIPInfo(egressIP)
	if claimants := info.namespaces; len(claimants) > 0 {
		conflict := fmt.Errorf("Multiple namespaces claiming EgressIP %q (NetIDs %d, %d)", info.ip, ns.vnid, claimants[0].vnid)
		utilruntime.HandleError(conflict)
	}
	info.namespaces = append(info.namespaces, ns)
	eip.egressIPChanged(info)
}

Expand Down Expand Up @@ -182,17 +182,23 @@ func (eip *egressIPWatcher) handleAddOrUpdateHostSubnet(obj, _ interface{}, even
hs := obj.(*networkapi.HostSubnet)
glog.V(5).Infof("Watch %s event for HostSubnet %q", eventType, hs.Name)

eip.updateNodeEgress(hs.HostIP, hs.EgressIPs)
_, cidr, err := net.ParseCIDR(hs.Subnet)
if err != nil {
utilruntime.HandleError(fmt.Errorf("could not parse HostSubnet %q CIDR: %v", hs.Name, err))
}
sdnIP := netutils.GenerateDefaultGateway(cidr).String()

eip.updateNodeEgress(hs.HostIP, sdnIP, hs.EgressIPs)
}

func (eip *egressIPWatcher) handleDeleteHostSubnet(obj interface{}) {
hs := obj.(*networkapi.HostSubnet)
glog.V(5).Infof("Watch %s event for HostSubnet %q", watch.Deleted, hs.Name)

eip.updateNodeEgress(hs.HostIP, nil)
eip.updateNodeEgress(hs.HostIP, "", nil)
}

func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []string) {
func (eip *egressIPWatcher) updateNodeEgress(nodeIP, sdnIP string, nodeEgressIPs []string) {
eip.Lock()
defer eip.Unlock()

Expand All @@ -203,11 +209,18 @@ func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []stri
}
node = &nodeEgress{
nodeIP: nodeIP,
sdnIP: sdnIP,
requestedIPs: sets.NewString(),
}
eip.nodesByNodeIP[nodeIP] = node
if eip.vxlanMonitor != nil && node.nodeIP != eip.localIP {
eip.vxlanMonitor.AddNode(node.nodeIP, node.sdnIP)
}
} else if len(nodeEgressIPs) == 0 {
delete(eip.nodesByNodeIP, nodeIP)
if eip.vxlanMonitor != nil {
eip.vxlanMonitor.RemoveNode(node.nodeIP)
}
}
oldRequestedIPs := node.requestedIPs
node.requestedIPs = sets.NewString(nodeEgressIPs...)
Expand All @@ -232,11 +245,6 @@ func (eip *egressIPWatcher) handleAddOrUpdateNetNamespace(obj, _ interface{}, ev
netns := obj.(*networkapi.NetNamespace)
glog.V(5).Infof("Watch %s event for NetNamespace %q", eventType, netns.Name)

if len(netns.EgressIPs) != 0 {
if len(netns.EgressIPs) > 1 {
glog.Warningf("Ignoring extra EgressIPs (%v) in NetNamespace %q", netns.EgressIPs[1:], netns.Name)
}
}
eip.updateNamespaceEgress(netns.NetID, netns.EgressIPs)
}

Expand Down Expand Up @@ -287,9 +295,32 @@ func (eip *egressIPWatcher) deleteNamespaceEgress(vnid uint32) {
eip.updateNamespaceEgress(vnid, nil)
}

// egressIPActive reports whether eg is currently usable as an egress IP.
//
// It returns (false, nil) when eg is not claimed by at least one node and at
// least one namespace. It returns (false, err) when the claim is ambiguous:
// more than one node claims it, more than one namespace claims it, or another
// egress IP requested by the same namespace is hosted solely by the same node.
// Otherwise it returns (true, nil).
func (eip *egressIPWatcher) egressIPActive(eg *egressIPInfo) (bool, error) {
	if len(eg.nodes) == 0 || len(eg.namespaces) == 0 {
		return false, nil
	}
	if len(eg.nodes) > 1 {
		return false, fmt.Errorf("Multiple nodes (%s, %s) claiming EgressIP %s", eg.nodes[0].nodeIP, eg.nodes[1].nodeIP, eg.ip)
	}
	if len(eg.namespaces) > 1 {
		return false, fmt.Errorf("Multiple namespaces (%d, %d) claiming EgressIP %s", eg.namespaces[0].vnid, eg.namespaces[1].vnid, eg.ip)
	}
	for _, ip := range eg.namespaces[0].requestedIPs {
		eg2 := eip.egressIPs[ip]
		// A requested IP may have no tracked entry; skip it rather than
		// dereferencing a nil *egressIPInfo. eg itself is never a conflict.
		if eg2 == nil || eg2 == eg {
			continue
		}
		if len(eg2.nodes) == 1 && eg2.nodes[0] == eg.nodes[0] {
			return false, fmt.Errorf("Multiple EgressIPs (%s, %s) for VNID %d on node %s", eg.ip, eg2.ip, eg.namespaces[0].vnid, eg.nodes[0].nodeIP)
		}
	}
	return true, nil
}

func (eip *egressIPWatcher) syncEgressIPs() {
for eg := range eip.changedEgressIPs {
eip.syncEgressNodeState(eg)
active, err := eip.egressIPActive(eg)
if err != nil {
utilruntime.HandleError(err)
}
eip.syncEgressNodeState(eg, active)
}
eip.changedEgressIPs = make(map[*egressIPInfo]bool)

Expand All @@ -302,12 +333,8 @@ func (eip *egressIPWatcher) syncEgressIPs() {
eip.changedNamespaces = make(map[*namespaceEgress]bool)
}

func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) {
// The egressIPInfo should have an assigned node IP if and only if the
// egress IP is active (ie, it is assigned to exactly 1 node and exactly
// 1 namespace, and it's the first one for its namespace).
egressIPActive := (len(eg.nodes) == 1 && len(eg.namespaces) == 1 && eg.ip == eg.namespaces[0].requestedIPs[0])
if egressIPActive && eg.assignedNodeIP != eg.nodes[0].nodeIP {
func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo, active bool) {
if active && eg.assignedNodeIP != eg.nodes[0].nodeIP {
glog.V(4).Infof("Assigning egress IP %s to node %s", eg.ip, eg.nodes[0].nodeIP)
eg.assignedNodeIP = eg.nodes[0].nodeIP
eg.assignedIPTablesMark = getMarkForVNID(eg.namespaces[0].vnid, eip.masqueradeBit)
Expand All @@ -317,7 +344,7 @@ func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) {
eg.assignedNodeIP = ""
}
}
} else if !egressIPActive && eg.assignedNodeIP != "" {
} else if !active && eg.assignedNodeIP != "" {
glog.V(4).Infof("Removing egress IP %s from node %s", eg.ip, eg.assignedNodeIP)
if eg.assignedNodeIP == eip.localIP {
if err := eip.releaseEgressIP(eg.ip, eg.assignedIPTablesMark); err != nil {
Expand All @@ -326,8 +353,6 @@ func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) {
}
eg.assignedNodeIP = ""
eg.assignedIPTablesMark = ""
} else if !egressIPActive {
glog.V(4).Infof("Egress IP %s is not assignable (%d namespaces, %d nodes)", eg.ip, len(eg.namespaces), len(eg.nodes))
}
}

Expand All @@ -337,7 +362,7 @@ func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error
}

var active *egressIPInfo
for i, ip := range ns.requestedIPs {
for _, ip := range ns.requestedIPs {
eg := eip.egressIPs[ip]
if eg == nil {
continue
Expand All @@ -347,9 +372,11 @@ func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error
glog.V(4).Infof("VNID %d gets no egress due to multiply-assigned egress IP %s", ns.vnid, eg.ip)
break
}
if active == nil && i == 0 {
if active == nil {
if eg.assignedNodeIP == "" {
glog.V(4).Infof("VNID %d cannot use unassigned egress IP %s", ns.vnid, eg.ip)
} else if len(ns.requestedIPs) > 1 && eg.nodes[0].offline {
glog.V(4).Infof("VNID %d cannot use egress IP %s on offline node %s", ns.vnid, eg.ip, eg.assignedNodeIP)
} else {
active = eg
}
Expand Down Expand Up @@ -429,3 +456,29 @@ func (eip *egressIPWatcher) releaseEgressIP(egressIP, mark string) error {

return nil
}

// watchVXLAN receives node status reports from updates and applies each one
// through updateNode. It returns once the updates channel is closed.
func (eip *egressIPWatcher) watchVXLAN(updates chan *egressVXLANNode) {
	for {
		status, open := <-updates
		if !open {
			return
		}
		eip.updateNode(status.nodeIP, status.offline)
	}
}

// updateNode records the offline state reported for the node with the given
// nodeIP and re-syncs every egress IP that node has requested.
//
// It takes the watcher lock for the duration of the call. If nodeIP is not a
// tracked node, the VXLAN monitor (when one is configured) is told to stop
// monitoring it and nothing else happens.
func (eip *egressIPWatcher) updateNode(nodeIP string, offline bool) {
	eip.Lock()
	defer eip.Unlock()

	node := eip.nodesByNodeIP[nodeIP]
	if node == nil {
		// vxlanMonitor is treated as optional elsewhere in this file, so
		// guard against it being unset here as well.
		if eip.vxlanMonitor != nil {
			eip.vxlanMonitor.RemoveNode(nodeIP)
		}
		return
	}

	node.offline = offline
	for _, ip := range node.requestedIPs.UnsortedList() {
		// Only egress IPs that currently have a tracked entry can be flagged.
		if eg := eip.egressIPs[ip]; eg != nil {
			eip.egressIPChanged(eg)
		}
	}
	eip.syncEgressIPs()
}
Loading

0 comments on commit 6044355

Please sign in to comment.