HA for fully-automatic egress IPs #20485

Merged
Changes from 1 commit
27 changes: 20 additions & 7 deletions pkg/network/common/egressip.go
@@ -450,22 +450,32 @@ func (eit *EgressIPTracker) SetNodeOffline(nodeIP string, offline bool) {
eit.egressIPChanged(eg)
}
}

if node.requestedCIDRs.Len() != 0 {
eit.updateEgressCIDRs = true
}

eit.syncEgressIPs()
}

func (eit *EgressIPTracker) lookupNodeIP(ip string) string {
eit.Lock()
defer eit.Unlock()

if node := eit.nodesByNodeIP[ip]; node != nil {
return node.sdnIP
}
return ip
}

// Ping a node and return whether or not it is online. We do this by trying to open a TCP
// connection to the "discard" service (port 9); if the node is offline, the attempt will
// time out with no response (and we will return false). If the node is online then we
// presumably will get a "connection refused" error; the code below assumes that anything
// other than timing out indicates that the node is online.
func (eit *EgressIPTracker) Ping(ip string, timeout time.Duration) bool {
eit.Lock()
defer eit.Unlock()

// If the caller used a public node IP, replace it with the SDN IP
if node := eit.nodesByNodeIP[ip]; node != nil {
ip = node.sdnIP
}
ip = eit.lookupNodeIP(ip)

conn, err := net.DialTimeout("tcp", ip+":9", timeout)
if conn != nil {
@@ -485,6 +495,9 @@ func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[str
otherNodes := false

for _, node := range eit.nodes {
if node.offline {
continue
}
egressIPs, exists := allocation[node.nodeName]
if !exists {
continue
@@ -532,7 +545,7 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
break
}
}
if found {
if found && !node.offline {
allocation[node.nodeName] = append(allocation[node.nodeName], egressIP)
}
// (We set alreadyAllocated even if the egressIP will be removed from
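(Aside: the liveness probe described in the Ping docstring above can be sketched standalone. This is an illustrative approximation, not code from the PR; isNodeOnline, the main wrapper, and the TEST-NET address are made up for the example.)

package main

import (
	"fmt"
	"net"
	"time"
)

// isNodeOnline dials TCP port 9 (the "discard" service) and treats anything
// other than a timeout as evidence the host is up: a live host with the port
// closed still answers with "connection refused", while a dead host lets the
// dial time out.
func isNodeOnline(ip string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", net.JoinHostPort(ip, "9"), timeout)
	if conn != nil {
		conn.Close()
	}
	if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
		return false
	}
	return true
}

func main() {
	// 192.0.2.1 is a TEST-NET-1 address, so this should print "false".
	fmt.Println(isNodeOnline("192.0.2.1", time.Second))
}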
109 changes: 109 additions & 0 deletions pkg/network/common/egressip_test.go
@@ -73,6 +73,20 @@ func (w *testEIPWatcher) assertNoChanges() error {
return w.assertChanges()
}

func (w *testEIPWatcher) flushChanges() {
w.changes = []string{}
}

func (w *testEIPWatcher) assertUpdateEgressCIDRsNotification() error {
for _, change := range w.changes {
if change == "update egress CIDRs" {
w.flushChanges()
return nil
}
}
return fmt.Errorf("expected change \"update egress CIDRs\", got %#v", w.changes)
}

func setupEgressIPTracker(t *testing.T) (*EgressIPTracker, *testEIPWatcher) {
watcher := &testEIPWatcher{}
return NewEgressIPTracker(watcher), watcher
@@ -1028,3 +1042,98 @@ func TestEgressNodeRenumbering(t *testing.T) {
t.Fatalf("%v", err)
}
}

func TestEgressCIDRAllocationOffline(t *testing.T) {
eit, w := setupEgressIPTracker(t)

// Create nodes...
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
HostIP: "172.17.0.3",
EgressIPs: []string{},
EgressCIDRs: []string{"172.17.0.0/24", "172.17.1.0/24"},
})
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
HostIP: "172.17.0.4",
EgressIPs: []string{},
EgressCIDRs: []string{"172.17.0.0/24"},
})
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
HostIP: "172.17.0.5",
EgressIPs: []string{},
EgressCIDRs: []string{"172.17.1.0/24"},
})

// Create namespaces
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 100,
EgressIPs: []string{"172.17.0.100"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 101,
EgressIPs: []string{"172.17.0.101"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 102,
EgressIPs: []string{"172.17.0.102"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 200,
EgressIPs: []string{"172.17.1.200"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 201,
EgressIPs: []string{"172.17.1.201"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 202,
EgressIPs: []string{"172.17.1.202"},
})

// In a perfect world, we'd get 2 IPs on each node, but depending on processing
// order, this isn't guaranteed. E.g., if the three 172.17.0.x IPs get processed
// first, we could get two of them on node-3 and one on node-4. Then the first two
// 172.17.1.x IPs get assigned to node-5, and the last one could go to either
// node-3 or node-5. Regardless of order, node-3 is guaranteed to get at least
// two IPs since there's no way either node-4 or node-5 could be assigned a
// third IP if node-3 still only had one.
allocation := eit.ReallocateEgressIPs()
node3ips := allocation["node-3"]
node4ips := allocation["node-4"]
node5ips := allocation["node-5"]
if len(node3ips) < 2 || len(node4ips) == 0 || len(node5ips) == 0 ||
len(node3ips)+len(node4ips)+len(node5ips) != 6 {
t.Fatalf("Bad IP allocation: %#v", allocation)
}
updateAllocations(eit, allocation)

w.flushChanges()

// Now take node-3 offline
eit.SetNodeOffline("172.17.0.3", true)
err := w.assertUpdateEgressCIDRsNotification()
if err != nil {
t.Fatalf("%v", err)
}

// First reallocation should empty out node-3
allocation = eit.ReallocateEgressIPs()
if node3ips, ok := allocation["node-3"]; !ok || len(node3ips) != 0 {
t.Fatalf("Bad IP allocation: %#v", allocation)
}
updateAllocations(eit, allocation)

err = w.assertUpdateEgressCIDRsNotification()
if err != nil {
t.Fatalf("%v", err)
}

// Next reallocation should reassign egress IPs to node-4 and node-5
allocation = eit.ReallocateEgressIPs()
node3ips = allocation["node-3"]
node4ips = allocation["node-4"]
node5ips = allocation["node-5"]
if len(node3ips) != 0 || len(node4ips) != 3 || len(node5ips) != 3 {
t.Fatalf("Bad IP allocation: %#v", allocation)
}
updateAllocations(eit, allocation)
}
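(Assuming a local checkout of the repository, this new test can be run in isolation with go test ./pkg/network/common -run TestEgressCIDRAllocationOffline.)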
91 changes: 91 additions & 0 deletions pkg/network/master/egressip.go
@@ -5,6 +5,8 @@ import (
"sync"
"time"

"github.com/golang/glog"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
@@ -24,6 +26,15 @@ type egressIPManager struct {

updatePending bool
updatedAgain bool

monitorNodes map[string]*egressNode
stop chan struct{}
}

type egressNode struct {
ip string
offline bool
retries int
}

func newEgressIPManager() *egressIPManager {
@@ -76,13 +87,20 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
// we won't process that until this reallocation is complete.

allocation := eim.tracker.ReallocateEgressIPs()
monitorNodes := make(map[string]*egressNode, len(allocation))
for nodeName, egressIPs := range allocation {
resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
hs, err := eim.hostSubnetInformer.Lister().Get(nodeName)
if err != nil {
return err
}

if node := eim.monitorNodes[hs.HostIP]; node != nil {
monitorNodes[hs.HostIP] = node
} else {
monitorNodes[hs.HostIP] = &egressNode{ip: hs.HostIP}
}

oldIPs := sets.NewString(hs.EgressIPs...)
newIPs := sets.NewString(egressIPs...)
if !oldIPs.Equal(newIPs) {
@@ -96,9 +114,82 @@
}
}

eim.monitorNodes = monitorNodes
if len(monitorNodes) > 0 {
if eim.stop == nil {
eim.stop = make(chan struct{})
go eim.poll(eim.stop)
}
} else {
if eim.stop != nil {
close(eim.stop)
eim.stop = nil
}
}

return true, nil
}

const (
pollInterval = 5 * time.Second
repollInterval = time.Second
maxRetries = 2
)

func (eim *egressIPManager) poll(stop chan struct{}) {
retry := false
for {
select {
case <-stop:
return
default:
Contributor: Would it be more idiomatic to do something like:

var timeout time.Duration
select {
case <-stop:
	return
case <-time.After(timeout):
	start := time.Now()
	retry = eim.check(retry)
	if !retry {
		// If less than pollInterval has passed since start, then sleep until it has
		timeout = time.Until(start.Add(pollInterval))
	}
}

}
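(One advantage of the select-based wait: the goroutine can observe stop while it waits, whereas with the time.Sleep in the version below, closing stop only takes effect after the sleep expires, up to pollInterval later.)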

start := time.Now()
retry = eim.check(retry)
if !retry {
// If less than pollInterval has passed since start, then sleep until it has
time.Sleep(start.Add(pollInterval).Sub(time.Now()))
}
}
}

func (eim *egressIPManager) check(retrying bool) bool {
var timeout time.Duration
if retrying {
timeout = repollInterval
} else {
timeout = pollInterval
}

needRetry := false
for _, node := range eim.monitorNodes {
if retrying && node.retries == 0 {
continue
}

online := eim.tracker.Ping(node.ip, timeout)
if node.offline && online {
glog.Infof("Node %s is back online", node.ip)
node.offline = false
eim.tracker.SetNodeOffline(node.ip, false)
} else if !node.offline && !online {
node.retries++
if node.retries > maxRetries {
glog.Warningf("Node %s is offline", node.ip)
node.retries = 0
node.offline = true
eim.tracker.SetNodeOffline(node.ip, true)
} else {
glog.V(2).Infof("Node %s may be offline... retrying", node.ip)
needRetry = true
}
}
}

return needRetry
}
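
// Note: with these constants, a node is declared offline only after three
// consecutive failed pings (one during the regular 5s poll, then two 1s
// re-polls), so a single dropped probe never triggers a reallocation on its own.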

func (eim *egressIPManager) ClaimEgressIP(vnid uint32, egressIP, nodeIP string) {
}
