Track node online/offline state in master egress IP allocator
and don't allocate egress IPs to offline nodes
danwinship committed Jul 31, 2018
1 parent 9d1e4a1 commit 0a40be0
Showing 3 changed files with 220 additions and 7 deletions.
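At a high level, the commit has the master probe each node that holds egress IPs, record offline transitions in the egress IP tracker, and skip offline nodes when reallocating. A rough sketch of that control flow against a hypothetical interface mirroring the tracker methods touched here (the egressTracker interface, the monitor function, and the 5-second probe timeout are illustrative, not part of the commit):

package sketch

import "time"

// egressTracker is a hypothetical stand-in for the EgressIPTracker methods
// exercised by this commit; it exists only to illustrate the control flow.
type egressTracker interface {
	Ping(ip string, timeout time.Duration) bool
	SetNodeOffline(nodeIP string, offline bool)
	ReallocateEgressIPs() map[string][]string
}

// monitor probes each node, records its online/offline state, and then
// reallocates so that offline nodes receive no egress IPs.
func monitor(t egressTracker, nodeIPs []string) map[string][]string {
	for _, ip := range nodeIPs {
		online := t.Ping(ip, 5*time.Second)
		t.SetNodeOffline(ip, !online)
	}
	return t.ReallocateEgressIPs()
}

The real manager is more careful than this sketch: it polls on a timer and only calls SetNodeOffline on actual state transitions, as the pkg/network/master/egressip.go changes below show.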
27 changes: 20 additions & 7 deletions pkg/network/common/egressip.go
@@ -442,22 +442,32 @@ func (eit *EgressIPTracker) SetNodeOffline(nodeIP string, offline bool) {
eit.egressIPChanged(eg)
}
}

if node.requestedCIDRs.Len() != 0 {
eit.updateEgressCIDRs = true
}

eit.syncEgressIPs()
}

func (eit *EgressIPTracker) lookupNodeIP(ip string) string {
eit.Lock()
defer eit.Unlock()

if node := eit.nodesByNodeIP[ip]; node != nil {
return node.sdnIP
}
return ip
}

// Ping a node and return whether or not it is online. We do this by trying to open a TCP
// connection to the "discard" service (port 9); if the node is offline, the attempt will
// time out with no response (and we will return false). If the node is online then we
// presumably will get a "connection refused" error; the code below assumes that anything
// other than timing out indicates that the node is online.
func (eit *EgressIPTracker) Ping(ip string, timeout time.Duration) bool {
eit.Lock()
defer eit.Unlock()

// If the caller used a public node IP, replace it with the SDN IP
if node := eit.nodesByNodeIP[ip]; node != nil {
ip = node.sdnIP
}
ip = eit.lookupNodeIP(ip)

conn, err := net.DialTimeout("tcp", ip+":9", timeout)
if conn != nil {
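The remainder of Ping is collapsed in the diff above. As a point of reference, here is a minimal standalone sketch of the timeout-versus-refused handling that the doc comment describes, assuming nothing beyond that comment (isNodeOnline is a hypothetical helper, not the code in this commit):

package sketch

import (
	"net"
	"time"
)

// isNodeOnline probes TCP port 9 (the "discard" service) on the node IP.
// A timed-out dial means offline; anything else (a successful connection or
// an immediate error such as "connection refused") counts as online,
// matching the doc comment above.
func isNodeOnline(ip string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", net.JoinHostPort(ip, "9"), timeout)
	if conn != nil {
		conn.Close()
	}
	if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
		return false
	}
	return true
}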
@@ -477,6 +487,9 @@ func (eit *EgressIPTracker) findEgressIPAllocation(ip net.IP, allocation map[str
otherNodes := false

for _, node := range eit.nodes {
if node.offline {
continue
}
egressIPs, exists := allocation[node.nodeName]
if !exists {
continue
@@ -524,7 +537,7 @@ func (eit *EgressIPTracker) ReallocateEgressIPs() map[string][]string {
break
}
}
if found {
if found && !node.offline {
allocation[node.nodeName] = append(allocation[node.nodeName], egressIP)
}
// (We set alreadyAllocated even if the egressIP will be removed from
109 changes: 109 additions & 0 deletions pkg/network/common/egressip_test.go
@@ -73,6 +73,20 @@ func (w *testEIPWatcher) assertNoChanges() error {
return w.assertChanges()
}

func (w *testEIPWatcher) flushChanges() {
w.changes = []string{}
}

func (w *testEIPWatcher) assertUpdateEgressCIDRsNotification() error {
for _, change := range w.changes {
if change == "update egress CIDRs" {
w.flushChanges()
return nil
}
}
return fmt.Errorf("expected change \"update egress CIDRs\", got %#v", w.changes)
}

func setupEgressIPTracker(t *testing.T) (*EgressIPTracker, *testEIPWatcher) {
watcher := &testEIPWatcher{}
return NewEgressIPTracker(watcher), watcher
@@ -998,3 +1012,98 @@ func TestEgressNodeRenumbering(t *testing.T) {
t.Fatalf("%v", err)
}
}

func TestEgressCIDRAllocationOffline(t *testing.T) {
eit, w := setupEgressIPTracker(t)

// Create nodes...
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
HostIP: "172.17.0.3",
EgressIPs: []string{},
EgressCIDRs: []string{"172.17.0.0/24", "172.17.1.0/24"},
})
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
HostIP: "172.17.0.4",
EgressIPs: []string{},
EgressCIDRs: []string{"172.17.0.0/24"},
})
updateHostSubnetEgress(eit, &networkapi.HostSubnet{
HostIP: "172.17.0.5",
EgressIPs: []string{},
EgressCIDRs: []string{"172.17.1.0/24"},
})

// Create namespaces
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 100,
EgressIPs: []string{"172.17.0.100"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 101,
EgressIPs: []string{"172.17.0.101"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 102,
EgressIPs: []string{"172.17.0.102"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 200,
EgressIPs: []string{"172.17.1.200"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 201,
EgressIPs: []string{"172.17.1.201"},
})
updateNetNamespaceEgress(eit, &networkapi.NetNamespace{
NetID: 202,
EgressIPs: []string{"172.17.1.202"},
})

// In a perfect world, we'd get 2 IPs on each node, but depending on processing
// order, this isn't guaranteed. E.g., if the three 172.17.0.x IPs get processed
// first, we could get two of them on node-3 and one on node-4. Then the first two
// 172.17.1.x IPs get assigned to node-5, and the last one could go to either
// node-3 or node-5. Regardless of order, node-3 is guaranteed to get at least
// two IPs, since there's no way either node-4 or node-5 could be assigned a
// third IP while node-3 still had only one.
allocation := eit.ReallocateEgressIPs()
node3ips := allocation["node-3"]
node4ips := allocation["node-4"]
node5ips := allocation["node-5"]
if len(node3ips) < 2 || len(node4ips) == 0 || len(node5ips) == 0 ||
len(node3ips)+len(node4ips)+len(node5ips) != 6 {
t.Fatalf("Bad IP allocation: %#v", allocation)
}
updateAllocations(eit, allocation)

w.flushChanges()

// Now take node-3 offline
eit.SetNodeOffline("172.17.0.3", true)
err := w.assertUpdateEgressCIDRsNotification()
if err != nil {
t.Fatalf("%v", err)
}

// First reallocation should empty out node-3
allocation = eit.ReallocateEgressIPs()
if node3ips, ok := allocation["node-3"]; !ok || len(node3ips) != 0 {
t.Fatalf("Bad IP allocation: %#v", allocation)
}
updateAllocations(eit, allocation)

err = w.assertUpdateEgressCIDRsNotification()
if err != nil {
t.Fatalf("%v", err)
}

// Next reallocation should reassign egress IPs to node-4 and node-5
allocation = eit.ReallocateEgressIPs()
node3ips = allocation["node-3"]
node4ips = allocation["node-4"]
node5ips = allocation["node-5"]
if len(node3ips) != 0 || len(node4ips) != 3 || len(node5ips) != 3 {
t.Fatalf("Bad IP allocation: %#v", allocation)
}
updateAllocations(eit, allocation)
}
91 changes: 91 additions & 0 deletions pkg/network/master/egressip.go
@@ -5,6 +5,8 @@ import (
"sync"
"time"

"github.com/golang/glog"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
@@ -24,6 +26,15 @@ type egressIPManager struct {

updatePending bool
updatedAgain bool

monitorNodes map[string]*egressNode
stop chan struct{}
}

type egressNode struct {
ip string
offline bool
retries int
}

func newEgressIPManager() *egressIPManager {
@@ -76,13 +87,20 @@ func (eim *egressIPManager) maybeDoUpdateEgressCIDRs() (bool, error) {
// we won't process that until this reallocation is complete.

allocation := eim.tracker.ReallocateEgressIPs()
monitorNodes := make(map[string]*egressNode, len(allocation))
for nodeName, egressIPs := range allocation {
resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
hs, err := eim.hostSubnetInformer.Lister().Get(nodeName)
if err != nil {
return err
}

if node := eim.monitorNodes[hs.HostIP]; node != nil {
monitorNodes[hs.HostIP] = node
} else {
monitorNodes[hs.HostIP] = &egressNode{ip: hs.HostIP}
}

oldIPs := sets.NewString(hs.EgressIPs...)
newIPs := sets.NewString(egressIPs...)
if !oldIPs.Equal(newIPs) {
@@ -96,9 +114,82 @@ }
}
}

eim.monitorNodes = monitorNodes
if len(monitorNodes) > 0 {
if eim.stop == nil {
eim.stop = make(chan struct{})
go eim.poll(eim.stop)
}
} else {
if eim.stop != nil {
close(eim.stop)
eim.stop = nil
}
}

return true, nil
}

const (
pollInterval = 5 * time.Second
repollInterval = time.Second
maxRetries = 2
)

func (eim *egressIPManager) poll(stop chan struct{}) {
retry := false
for {
select {
case <-stop:
return
default:
}

start := time.Now()
retry = eim.check(retry)
if !retry {
// If less than pollInterval has passed since start, then sleep until it has
time.Sleep(start.Add(pollInterval).Sub(time.Now()))
}
}
}

func (eim *egressIPManager) check(retrying bool) bool {
var timeout time.Duration
if retrying {
timeout = repollInterval
} else {
timeout = pollInterval
}

needRetry := false
for _, node := range eim.monitorNodes {
if retrying && node.retries == 0 {
continue
}

online := eim.tracker.Ping(node.ip, timeout)
if node.offline && online {
glog.Infof("Node %s is back online", node.ip)
node.offline = false
eim.tracker.SetNodeOffline(node.ip, false)
} else if !node.offline && !online {
node.retries++
if node.retries > maxRetries {
glog.Warningf("Node %s is offline", node.ip)
node.retries = 0
node.offline = true
eim.tracker.SetNodeOffline(node.ip, true)
} else {
glog.V(2).Infof("Node %s may be offline... retrying", node.ip)
needRetry = true
}
}
}

return needRetry
}
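
With these constants, a node that stops responding is declared offline after one failed probe with the 5-second pollInterval timeout plus two failed retries with the 1-second repollInterval timeout, roughly 7 seconds in total. An offline node has its retry count reset to zero, so it is skipped during retry passes and is reported back online on the first subsequent regular poll that gets a response.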

func (eim *egressIPManager) ClaimEgressIP(vnid uint32, egressIP, nodeIP string) {
}
