Skip to content

Commit

Permalink
Merge pull request #20620 from danwinship/auto-egress-ip-ha
Browse files Browse the repository at this point in the history
Basic high availability for auto egress IPs
  • Loading branch information
openshift-merge-robot authored Aug 14, 2018
2 parents 7eee6f8 + 544a776 commit 1584b2d
Show file tree
Hide file tree
Showing 5 changed files with 553 additions and 84 deletions.
134 changes: 90 additions & 44 deletions pkg/network/node/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ import (
networkapi "github.com/openshift/origin/pkg/network/apis/network"
"github.com/openshift/origin/pkg/network/common"
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
"github.com/openshift/origin/pkg/util/netutils"

"github.com/vishvananda/netlink"
)

type nodeEgress struct {
nodeIP string
sdnIP string
requestedIPs sets.String
offline bool
}

type namespaceEgress struct {
Expand All @@ -48,13 +51,14 @@ type egressIPWatcher struct {

networkInformers networkinformers.SharedInformerFactory
iptables *NodeIPTables
vxlanMonitor *egressVXLANMonitor

nodesByNodeIP map[string]*nodeEgress
namespacesByVNID map[uint32]*namespaceEgress
egressIPs map[string]*egressIPInfo

changedEgressIPs []*egressIPInfo
changedNamespaces []*namespaceEgress
changedEgressIPs map[*egressIPInfo]bool
changedNamespaces map[*namespaceEgress]bool

localEgressLink netlink.Link
localEgressNet *net.IPNet
Expand All @@ -70,6 +74,9 @@ func newEgressIPWatcher(oc *ovsController, localIP string, masqueradeBit *int32)
nodesByNodeIP: make(map[string]*nodeEgress),
namespacesByVNID: make(map[uint32]*namespaceEgress),
egressIPs: make(map[string]*egressIPInfo),

changedEgressIPs: make(map[*egressIPInfo]bool),
changedNamespaces: make(map[*namespaceEgress]bool),
}
if masqueradeBit != nil {
eip.masqueradeBit = 1 << uint32(*masqueradeBit)
Expand All @@ -87,6 +94,10 @@ func (eip *egressIPWatcher) Start(networkInformers networkinformers.SharedInform
eip.networkInformers = networkInformers
eip.iptables = iptables

updates := make(chan *egressVXLANNode)
eip.vxlanMonitor = newEgressVXLANMonitor(eip.oc.ovs, updates)
go eip.watchVXLAN(updates)

eip.watchHostSubnets()
eip.watchNetNamespaces()
return nil
Expand Down Expand Up @@ -114,19 +125,15 @@ func (eip *egressIPWatcher) ensureEgressIPInfo(egressIP string) *egressIPInfo {
}

func (eip *egressIPWatcher) egressIPChanged(eg *egressIPInfo) {
eip.changedEgressIPs = append(eip.changedEgressIPs, eg)
eip.changedEgressIPs[eg] = true
for _, ns := range eg.namespaces {
eip.changedNamespaces = append(eip.changedNamespaces, ns)
eip.changedNamespaces[ns] = true
}
}

func (eip *egressIPWatcher) addNode(egressIP string, node *nodeEgress) {
eg := eip.ensureEgressIPInfo(egressIP)
if len(eg.nodes) != 0 {
utilruntime.HandleError(fmt.Errorf("Multiple nodes claiming EgressIP %q (nodes %q, %q)", eg.ip, node.nodeIP, eg.nodes[0].nodeIP))
}
eg.nodes = append(eg.nodes, node)

eip.egressIPChanged(eg)
}

Expand All @@ -147,11 +154,7 @@ func (eip *egressIPWatcher) deleteNode(egressIP string, node *nodeEgress) {

func (eip *egressIPWatcher) addNamespace(egressIP string, ns *namespaceEgress) {
eg := eip.ensureEgressIPInfo(egressIP)
if len(eg.namespaces) != 0 {
utilruntime.HandleError(fmt.Errorf("Multiple namespaces claiming EgressIP %q (NetIDs %d, %d)", eg.ip, ns.vnid, eg.namespaces[0].vnid))
}
eg.namespaces = append(eg.namespaces, ns)

eip.egressIPChanged(eg)
}

Expand Down Expand Up @@ -179,17 +182,23 @@ func (eip *egressIPWatcher) handleAddOrUpdateHostSubnet(obj, _ interface{}, even
hs := obj.(*networkapi.HostSubnet)
glog.V(5).Infof("Watch %s event for HostSubnet %q", eventType, hs.Name)

eip.updateNodeEgress(hs.HostIP, hs.EgressIPs)
_, cidr, err := net.ParseCIDR(hs.Subnet)
if err != nil {
utilruntime.HandleError(fmt.Errorf("could not parse HostSubnet %q CIDR: %v", hs.Name, err))
}
sdnIP := netutils.GenerateDefaultGateway(cidr).String()

eip.updateNodeEgress(hs.HostIP, sdnIP, hs.EgressIPs)
}

func (eip *egressIPWatcher) handleDeleteHostSubnet(obj interface{}) {
hs := obj.(*networkapi.HostSubnet)
glog.V(5).Infof("Watch %s event for HostSubnet %q", watch.Deleted, hs.Name)

eip.updateNodeEgress(hs.HostIP, nil)
eip.updateNodeEgress(hs.HostIP, "", nil)
}

func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []string) {
func (eip *egressIPWatcher) updateNodeEgress(nodeIP, sdnIP string, nodeEgressIPs []string) {
eip.Lock()
defer eip.Unlock()

Expand All @@ -200,11 +209,18 @@ func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []stri
}
node = &nodeEgress{
nodeIP: nodeIP,
sdnIP: sdnIP,
requestedIPs: sets.NewString(),
}
eip.nodesByNodeIP[nodeIP] = node
if eip.vxlanMonitor != nil && node.nodeIP != eip.localIP {
eip.vxlanMonitor.AddNode(node.nodeIP, node.sdnIP)
}
} else if len(nodeEgressIPs) == 0 {
delete(eip.nodesByNodeIP, nodeIP)
if eip.vxlanMonitor != nil {
eip.vxlanMonitor.RemoveNode(node.nodeIP)
}
}
oldRequestedIPs := node.requestedIPs
node.requestedIPs = sets.NewString(nodeEgressIPs...)
Expand All @@ -229,11 +245,6 @@ func (eip *egressIPWatcher) handleAddOrUpdateNetNamespace(obj, _ interface{}, ev
netns := obj.(*networkapi.NetNamespace)
glog.V(5).Infof("Watch %s event for NetNamespace %q", eventType, netns.Name)

if len(netns.EgressIPs) != 0 {
if len(netns.EgressIPs) > 1 {
glog.Warningf("Ignoring extra EgressIPs (%v) in NetNamespace %q", netns.EgressIPs[1:], netns.Name)
}
}
eip.updateNamespaceEgress(netns.NetID, netns.EgressIPs)
}

Expand Down Expand Up @@ -284,37 +295,46 @@ func (eip *egressIPWatcher) deleteNamespaceEgress(vnid uint32) {
eip.updateNamespaceEgress(vnid, nil)
}

func (eip *egressIPWatcher) syncEgressIPs() {
changedEgressIPs := make(map[*egressIPInfo]bool)
for _, eg := range eip.changedEgressIPs {
changedEgressIPs[eg] = true
func (eip *egressIPWatcher) egressIPActive(eg *egressIPInfo) (bool, error) {
if len(eg.nodes) == 0 || len(eg.namespaces) == 0 {
return false, nil
}
eip.changedEgressIPs = eip.changedEgressIPs[:0]

changedNamespaces := make(map[*namespaceEgress]bool)
for _, ns := range eip.changedNamespaces {
changedNamespaces[ns] = true
if len(eg.nodes) > 1 {
return false, fmt.Errorf("Multiple nodes (%s, %s) claiming EgressIP %s", eg.nodes[0].nodeIP, eg.nodes[1].nodeIP, eg.ip)
}
eip.changedNamespaces = eip.changedNamespaces[:0]
if len(eg.namespaces) > 1 {
return false, fmt.Errorf("Multiple namespaces (%d, %d) claiming EgressIP %s", eg.namespaces[0].vnid, eg.namespaces[1].vnid, eg.ip)
}
for _, ip := range eg.namespaces[0].requestedIPs {
eg2 := eip.egressIPs[ip]
if eg2 != eg && len(eg2.nodes) == 1 && eg2.nodes[0] == eg.nodes[0] {
return false, fmt.Errorf("Multiple EgressIPs (%s, %s) for VNID %d on node %s", eg.ip, eg2.ip, eg.namespaces[0].vnid, eg.nodes[0].nodeIP)
}
}
return true, nil
}

for eg := range changedEgressIPs {
eip.syncEgressNodeState(eg)
func (eip *egressIPWatcher) syncEgressIPs() {
for eg := range eip.changedEgressIPs {
active, err := eip.egressIPActive(eg)
if err != nil {
utilruntime.HandleError(err)
}
eip.syncEgressNodeState(eg, active)
}
eip.changedEgressIPs = make(map[*egressIPInfo]bool)

for ns := range changedNamespaces {
for ns := range eip.changedNamespaces {
err := eip.syncEgressNamespaceState(ns)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules for VNID %d: %v", ns.vnid, err))
}
}
eip.changedNamespaces = make(map[*namespaceEgress]bool)
}

func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) {
// The egressIPInfo should have an assigned node IP if and only if the
// egress IP is active (ie, it is assigned to exactly 1 node and exactly
// 1 namespace, and it's the first one for its namespace).
egressIPActive := (len(eg.nodes) == 1 && len(eg.namespaces) == 1 && eg.ip == eg.namespaces[0].requestedIPs[0])
if egressIPActive && eg.assignedNodeIP != eg.nodes[0].nodeIP {
func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo, active bool) {
if active && eg.assignedNodeIP != eg.nodes[0].nodeIP {
glog.V(4).Infof("Assigning egress IP %s to node %s", eg.ip, eg.nodes[0].nodeIP)
eg.assignedNodeIP = eg.nodes[0].nodeIP
eg.assignedIPTablesMark = getMarkForVNID(eg.namespaces[0].vnid, eip.masqueradeBit)
Expand All @@ -324,7 +344,7 @@ func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) {
eg.assignedNodeIP = ""
}
}
} else if !egressIPActive && eg.assignedNodeIP != "" {
} else if !active && eg.assignedNodeIP != "" {
glog.V(4).Infof("Removing egress IP %s from node %s", eg.ip, eg.assignedNodeIP)
if eg.assignedNodeIP == eip.localIP {
if err := eip.releaseEgressIP(eg.ip, eg.assignedIPTablesMark); err != nil {
Expand All @@ -333,8 +353,6 @@ func (eip *egressIPWatcher) syncEgressNodeState(eg *egressIPInfo) {
}
eg.assignedNodeIP = ""
eg.assignedIPTablesMark = ""
} else if !egressIPActive {
glog.V(4).Infof("Egress IP %s is not assignable (%d namespaces, %d nodes)", eg.ip, len(eg.namespaces), len(eg.nodes))
}
}

Expand All @@ -344,7 +362,7 @@ func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error
}

var active *egressIPInfo
for i, ip := range ns.requestedIPs {
for _, ip := range ns.requestedIPs {
eg := eip.egressIPs[ip]
if eg == nil {
continue
Expand All @@ -354,9 +372,11 @@ func (eip *egressIPWatcher) syncEgressNamespaceState(ns *namespaceEgress) error
glog.V(4).Infof("VNID %d gets no egress due to multiply-assigned egress IP %s", ns.vnid, eg.ip)
break
}
if active == nil && i == 0 {
if active == nil {
if eg.assignedNodeIP == "" {
glog.V(4).Infof("VNID %d cannot use unassigned egress IP %s", ns.vnid, eg.ip)
} else if len(ns.requestedIPs) > 1 && eg.nodes[0].offline {
glog.V(4).Infof("VNID %d cannot use egress IP %s on offline node %s", ns.vnid, eg.ip, eg.assignedNodeIP)
} else {
active = eg
}
Expand Down Expand Up @@ -436,3 +456,29 @@ func (eip *egressIPWatcher) releaseEgressIP(egressIP, mark string) error {

return nil
}

func (eip *egressIPWatcher) watchVXLAN(updates chan *egressVXLANNode) {
for node := range updates {
eip.updateNode(node.nodeIP, node.offline)
}
}

func (eip *egressIPWatcher) updateNode(nodeIP string, offline bool) {
eip.Lock()
defer eip.Unlock()

node := eip.nodesByNodeIP[nodeIP]
if node == nil {
eip.vxlanMonitor.RemoveNode(nodeIP)
return
}

node.offline = offline
for _, ip := range node.requestedIPs.UnsortedList() {
eg := eip.egressIPs[ip]
if eg != nil {
eip.egressIPChanged(eg)
}
}
eip.syncEgressIPs()
}
Loading

0 comments on commit 1584b2d

Please sign in to comment.