Merge pull request #12650 from danwinship/multicast-fixes-and-annotation

Merged by openshift-bot
OpenShift Bot authored Jan 28, 2017
2 parents 6ceb412 + 8ba9e66 commit 4bc0be6
Showing 12 changed files with 221 additions and 339 deletions.
3 changes: 3 additions & 0 deletions pkg/sdn/api/plugin.go
@@ -17,6 +17,9 @@ const (
 	// HostSubnet annotations. (Note: should be "hostsubnet.network.openshift.io/", but the incorrect name is now part of the API.)
 	AssignHostSubnetAnnotation = "pod.network.openshift.io/assign-subnet"
 	FixedVNIDHostAnnotation    = "pod.network.openshift.io/fixed-vnid-host"
+
+	// NetNamespace annotations
+	MulticastEnabledAnnotation = "netnamespace.network.openshift.io/multicast-enabled"
 )
 
 func IsOpenShiftNetworkPlugin(pluginName string) bool {
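For illustration only (not part of this commit): a minimal sketch of how node-side code might consult the new annotation on a NetNamespace. It assumes the osapi import alias used elsewhere in this PR and that the literal value "true" marks multicast as enabled; the real plugin code may parse the value differently.

	// Hypothetical helper, not from this PR: reports whether a NetNamespace has
	// opted in to multicast via the annotation added above. Assumes the value
	// "true" means enabled.
	func multicastEnabled(netns *osapi.NetNamespace) bool {
		return netns.Annotations[osapi.MulticastEnabledAnnotation] == "true"
	}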
10 changes: 8 additions & 2 deletions pkg/sdn/plugin/controller.go
@@ -313,10 +313,16 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
 	// eg, "table=100, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop
 	otx.AddFlow("table=100, priority=0, actions=output:2")
 
-	// Table 110: multicast delivery from local pods to the VXLAN
+	// Table 110: outbound multicast filtering, updated by updateLocalMulticastFlows() in pod.go
+	// eg, "table=110, priority=100, reg0=${tenant_id}, actions=goto_table:111
 	otx.AddFlow("table=110, priority=0, actions=drop")
 
-	// Table 120: multicast delivery to local pods (either from VXLAN or local pods)
+	// Table 111: multicast delivery from local pods to the VXLAN; only one rule, updated by updateVXLANMulticastRules() in subnets.go
+	// eg, "table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:${remote_node_ip_1}->tun_dst,output:1,set_field:${remote_node_ip_2}->tun_dst,output:1,goto_table:120"
+	otx.AddFlow("table=111, priority=0, actions=drop")
+
+	// Table 120: multicast delivery to local pods (either from VXLAN or local pods); updated by updateLocalMulticastFlows() in pod.go
+	// eg, "table=120, priority=100, reg0=${tenant_id}, actions=output:${ovs_port_1},output:${ovs_port_2}"
 	otx.AddFlow("table=120, priority=0, actions=drop")
 
 	err = otx.EndTransaction()
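To make the table layout described above concrete, here is a small, self-contained Go sketch (not from this commit) that prints the kind of per-tenant rules those comments describe once a namespace has multicast enabled. The VNID and the OVS port numbers are invented for the example; the priority-0 drop rules installed above remain the defaults.

	package main

	import "fmt"

	func main() {
		vnid := uint32(42) // example tenant VNID (made up)
		// Table 110: let outbound multicast from this tenant continue on to table 111.
		fmt.Printf("table=110, priority=100, reg0=%d, actions=goto_table:111\n", vnid)
		// Table 120: deliver multicast (from the VXLAN or local pods) to this tenant's local pods.
		fmt.Printf("table=120, priority=100, reg0=%d, actions=output:3,output:4\n", vnid)
	}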
55 changes: 34 additions & 21 deletions pkg/sdn/plugin/multitenant.go
@@ -64,35 +64,41 @@ func (mp *multiTenantPlugin) updatePodNetwork(namespace string, oldNetID, netID
 		services = &kapi.ServiceList{}
 	}
 
-	movedVNIDRefs := 0
+	if oldNetID != netID {
+		movedVNIDRefs := 0
 
-	// Update OF rules for the existing/old pods in the namespace
-	for _, pod := range pods {
-		err = mp.node.UpdatePod(pod)
-		if err == nil {
-			movedVNIDRefs++
-		} else {
-			glog.Errorf("Could not update pod %q in namespace %q: %v", pod.Name, namespace, err)
-		}
-	}
-
-	// Update OF rules for the old services in the namespace
-	for _, svc := range services.Items {
-		if !kapi.IsServiceIPSet(&svc) {
-			continue
-		}
-
-		mp.node.DeleteServiceRules(&svc)
-		mp.node.AddServiceRules(&svc, netID)
-		movedVNIDRefs++
-	}
-
-	if movedVNIDRefs > 0 {
-		mp.moveVNIDRefs(movedVNIDRefs, oldNetID, netID)
-	}
-
-	// Update namespace references in egress firewall rules
-	mp.node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID)
+		// Update OF rules for the existing/old pods in the namespace
+		for _, pod := range pods {
+			err = mp.node.UpdatePod(pod)
+			if err == nil {
+				movedVNIDRefs++
+			} else {
+				glog.Errorf("Could not update pod %q in namespace %q: %v", pod.Name, namespace, err)
+			}
+		}
+
+		// Update OF rules for the old services in the namespace
+		for _, svc := range services.Items {
+			if !kapi.IsServiceIPSet(&svc) {
+				continue
+			}
+
+			mp.node.DeleteServiceRules(&svc)
+			mp.node.AddServiceRules(&svc, netID)
+			movedVNIDRefs++
+		}
+
+		if movedVNIDRefs > 0 {
+			mp.moveVNIDRefs(movedVNIDRefs, oldNetID, netID)
+		}
+
+		// Update namespace references in egress firewall rules
+		mp.node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID)
+	}
+
+	// Update local multicast rules
+	mp.node.podManager.UpdateLocalMulticastRules(oldNetID)
+	mp.node.podManager.UpdateLocalMulticastRules(netID)
 }
 
 func (mp *multiTenantPlugin) AddNetNamespace(netns *osapi.NetNamespace) {
@@ -115,6 +121,13 @@ func (mp *multiTenantPlugin) GetNamespaces(vnid uint32) []string {
 	return mp.vnids.GetNamespaces(vnid)
 }
 
+func (mp *multiTenantPlugin) GetMulticastEnabled(vnid uint32) bool {
+	if vnid == osapi.GlobalVNID {
+		return false
+	}
+	return mp.vnids.GetMulticastEnabled(vnid)
+}
+
 func (mp *multiTenantPlugin) RefVNID(vnid uint32) {
 	if vnid == 0 {
 		return
10 changes: 9 additions & 1 deletion pkg/sdn/plugin/networkpolicy.go
@@ -160,7 +160,11 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *osapi.NetNamespace) {
 }
 
 func (np *networkPolicyPlugin) UpdateNetNamespace(netns *osapi.NetNamespace, oldNetID uint32) {
-	glog.Warning("Got UpdateNetNamespace for namespace %s (%d) while using %s plugin", netns.NetName, netns.NetID, osapi.NetworkPolicyPluginName)
+	if netns.NetID != oldNetID {
+		glog.Warning("Got VNID change for namespace %s while using %s plugin", netns.NetName, osapi.NetworkPolicyPluginName)
+	}
+
+	np.node.podManager.UpdateLocalMulticastRules(netns.NetID)
 }
 
 func (np *networkPolicyPlugin) DeleteNetNamespace(netns *osapi.NetNamespace) {
@@ -178,6 +182,10 @@ func (np *networkPolicyPlugin) GetNamespaces(vnid uint32) []string {
 	return np.vnids.GetNamespaces(vnid)
 }
 
+func (np *networkPolicyPlugin) GetMulticastEnabled(vnid uint32) bool {
+	return np.vnids.GetMulticastEnabled(vnid)
+}
+
 func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
 	inUse := npns.refs > 0
 	if !inUse && !npns.inUse {
1 change: 1 addition & 0 deletions pkg/sdn/plugin/node.go
@@ -40,6 +40,7 @@ type osdnPolicy interface {
 
 	GetVNID(namespace string) (uint32, error)
 	GetNamespaces(vnid uint32) []string
+	GetMulticastEnabled(vnid uint32) bool
 
 	RefVNID(vnid uint32)
 	UnrefVNID(vnid uint32)
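Purely as an illustration of the new interface method (not code from this commit), the smallest possible implementation is a policy that never enables multicast, which mirrors how the multitenant plugin treats the global VNID:

	// denyMulticastPolicy is a hypothetical stand-in used only to show the
	// shape of the new osdnPolicy method; it always reports multicast as off.
	type denyMulticastPolicy struct{}

	func (p *denyMulticastPolicy) GetMulticastEnabled(vnid uint32) bool {
		return false
	}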
185 changes: 91 additions & 94 deletions pkg/sdn/plugin/pod.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net"
 	"sort"
+	"sync"
 
 	"github.com/openshift/origin/pkg/sdn/plugin/cniserver"
 	"github.com/openshift/origin/pkg/util/netutils"
@@ -21,7 +22,7 @@
 
 type podHandler interface {
 	setup(req *cniserver.PodRequest) (*cnitypes.Result, *runningPod, error)
-	update(req *cniserver.PodRequest) (*runningPod, error)
+	update(req *cniserver.PodRequest) (uint32, error)
 	teardown(req *cniserver.PodRequest) error
 }
 
@@ -38,7 +39,8 @@ type podManager struct {
 	// Request queue for pod operations incoming from the CNIServer
 	requests chan (*cniserver.PodRequest)
 	// Tracks pod :: IP address for hostport handling
-	runningPods map[string]*runningPod
+	runningPods     map[string]*runningPod
+	runningPodsLock sync.Mutex
 
 	// Live pod setup/teardown stuff not used in testing code
 	kClient *kclientset.Clientset
@@ -99,7 +101,7 @@ func getIPAMConfig(clusterNetwork *net.IPNet, localSubnet string) ([]byte, error
 		IPAM *hostLocalIPAM `json:"ipam"`
 	}
 
-	mcaddr := net.ParseIP("224.0.0.0")
+	_, mcnet, _ := net.ParseCIDR("224.0.0.0/3")
 	return json.Marshal(&cniNetworkConfig{
 		Name: "openshift-sdn",
 		Type: "openshift-sdn",
@@ -111,19 +113,20 @@ func getIPAMConfig(clusterNetwork *net.IPNet, localSubnet string) ([]byte, error
 			},
 			Routes: []cnitypes.Route{
 				{
 					// Default route
 					Dst: net.IPNet{
 						IP:   net.IPv4zero,
 						Mask: net.IPMask(net.IPv4zero),
 					},
 					GW: netutils.GenerateDefaultGateway(nodeNet),
 				},
-				{Dst: *clusterNetwork},
 				{
-					Dst: net.IPNet{
-						IP:   mcaddr,
-						Mask: net.IPMask(mcaddr),
-					},
+					// Cluster network
+					Dst: *clusterNetwork,
+				},
+				{
+					// Multicast
+					Dst: *mcnet,
 				},
 			},
 		},
@@ -179,112 +182,106 @@ func (m *podManager) handleCNIRequest(request *cniserver.PodRequest) ([]byte, er
 	return result.Response, result.Err
 }
 
-type runningPodsSlice []*runningPod
-
-func (l runningPodsSlice) Len() int           { return len(l) }
-func (l runningPodsSlice) Less(i, j int) bool { return l[i].ofport < l[j].ofport }
-func (l runningPodsSlice) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }
-
-// FIXME: instead of calculating all this ourselves, figure out a way to pass
-// the old VNID through the Update() call (or get it from somewhere else).
-func updateMulticastFlows(runningPods map[string]*runningPod, ovs *ovs.Interface, podKey string, changedPod *runningPod) error {
-	// FIXME: prevents TestPodUpdate() from crashing. (We separately test this function anyway.)
-	if ovs == nil {
-		return nil
-	}
-
-	// Build map of pods by their VNID, excluding the changed pod
-	podsByVNID := make(map[uint32]runningPodsSlice)
-	for key, runningPod := range runningPods {
-		if key != podKey {
-			podsByVNID[runningPod.vnid] = append(podsByVNID[runningPod.vnid], runningPod)
-		}
-	}
-
-	// Figure out what two VNIDs changed so we can update only those two flows
-	changedVNIDs := make([]uint32, 0)
-	oldPod, exists := runningPods[podKey]
-	if changedPod != nil {
-		podsByVNID[changedPod.vnid] = append(podsByVNID[changedPod.vnid], changedPod)
-		changedVNIDs = append(changedVNIDs, changedPod.vnid)
-		if exists {
-			// VNID changed
-			changedVNIDs = append(changedVNIDs, oldPod.vnid)
-		}
-	} else if exists {
-		// Pod deleted
-		changedVNIDs = append(changedVNIDs, oldPod.vnid)
-	}
-
-	if len(changedVNIDs) == 0 {
-		// Shouldn't happen, but whatever
-		return fmt.Errorf("Multicast update requested but not required!")
-	}
-
-	otx := ovs.NewTransaction()
-	for _, vnid := range changedVNIDs {
-		// Sort pod array to ensure consistent ordering for testcases and readability
-		pods := podsByVNID[vnid]
-		sort.Sort(pods)
-
-		// build up list of ports on this VNID
-		outputs := ""
-		for _, pod := range pods {
-			if len(outputs) > 0 {
-				outputs += ","
-			}
-			outputs += fmt.Sprintf("output:%d", pod.ofport)
-		}
-
-		// Update or delete the flows for the vnid
-		if len(outputs) > 0 {
-			otx.AddFlow("table=120, priority=100, reg0=%d, actions=%s", vnid, outputs)
-		} else {
-			otx.DeleteFlows("table=120, reg0=%d", vnid)
-		}
-	}
-	return otx.EndTransaction()
+func localMulticastOutputs(runningPods map[string]*runningPod, vnid uint32) string {
+	var ofports []int
+	for _, pod := range runningPods {
+		if pod.vnid == vnid {
+			ofports = append(ofports, pod.ofport)
+		}
+	}
+	if len(ofports) == 0 {
+		return ""
+	}
+
+	sort.Ints(ofports)
+	outputs := ""
+	for _, ofport := range ofports {
+		if len(outputs) > 0 {
+			outputs += ","
+		}
+		outputs += fmt.Sprintf("output:%d", ofport)
+	}
+	return outputs
+}
+
+func (m *podManager) updateLocalMulticastRulesWithLock(vnid uint32) {
+	var outputs string
+	otx := m.ovs.NewTransaction()
+	if m.policy.GetMulticastEnabled(vnid) {
+		outputs = localMulticastOutputs(m.runningPods, vnid)
+		otx.AddFlow("table=110, reg0=%d, actions=goto_table:111", vnid)
+	} else {
+		otx.DeleteFlows("table=110, reg0=%d", vnid)
+	}
+	if len(outputs) > 0 {
+		otx.AddFlow("table=120, priority=100, reg0=%d, actions=%s", vnid, outputs)
+	} else {
+		otx.DeleteFlows("table=120, reg0=%d", vnid)
+	}
+	if err := otx.EndTransaction(); err != nil {
+		glog.Errorf("Error updating OVS multicast flows for VNID %d: %v", vnid, err)
+	}
+}
+
+// Update multicast OVS rules for the given vnid
+func (m *podManager) UpdateLocalMulticastRules(vnid uint32) {
+	m.runningPodsLock.Lock()
+	defer m.runningPodsLock.Unlock()
+	m.updateLocalMulticastRulesWithLock(vnid)
 }
 
 // Process all CNI requests from the request queue serially. Our OVS interaction
 // and scripts currently cannot run in parallel, and doing so greatly complicates
 // setup/teardown logic
 func (m *podManager) processCNIRequests() {
 	for request := range m.requests {
-		pk := getPodKey(request)
-
-		var pod *runningPod
-		var ipamResult *cnitypes.Result
-
 		glog.V(5).Infof("Processing pod network request %v", request)
-		result := &cniserver.PodResult{}
-		switch request.Command {
-		case cniserver.CNI_ADD:
-			ipamResult, pod, result.Err = m.podHandler.setup(request)
-			if ipamResult != nil {
-				result.Response, result.Err = json.Marshal(ipamResult)
-			}
-		case cniserver.CNI_UPDATE:
-			pod, result.Err = m.podHandler.update(request)
-		case cniserver.CNI_DEL:
-			result.Err = m.podHandler.teardown(request)
-		default:
-			result.Err = fmt.Errorf("unhandled CNI request %v", request.Command)
-		}
-
-		if result.Err == nil {
-			if err := updateMulticastFlows(m.runningPods, m.ovs, pk, pod); err != nil {
-				glog.Warningf("Failed to update multicast flows: %v", err)
-			}
-			if pod != nil {
-				m.runningPods[pk] = pod
-			} else {
-				delete(m.runningPods, pk)
-			}
-		}
-
+		result := m.processRequest(request)
 		glog.V(5).Infof("Processed pod network request %v, result %s err %v", request, string(result.Response), result.Err)
 		request.Result <- result
 	}
 	panic("stopped processing CNI pod requests!")
 }
+
+func (m *podManager) processRequest(request *cniserver.PodRequest) *cniserver.PodResult {
+	m.runningPodsLock.Lock()
+	defer m.runningPodsLock.Unlock()
+
+	pk := getPodKey(request)
+	result := &cniserver.PodResult{}
+	switch request.Command {
+	case cniserver.CNI_ADD:
+		ipamResult, runningPod, err := m.podHandler.setup(request)
+		if ipamResult != nil {
+			result.Response, err = json.Marshal(ipamResult)
+			if result.Err == nil {
+				m.runningPods[pk] = runningPod
+				if m.ovs != nil {
+					m.updateLocalMulticastRulesWithLock(runningPod.vnid)
+				}
+			}
+		}
+		if err != nil {
+			result.Err = err
+		}
+	case cniserver.CNI_UPDATE:
+		vnid, err := m.podHandler.update(request)
+		if err == nil {
+			if runningPod, exists := m.runningPods[pk]; exists {
+				runningPod.vnid = vnid
+			}
+		}
+		result.Err = err
+	case cniserver.CNI_DEL:
+		if runningPod, exists := m.runningPods[pk]; exists {
+			delete(m.runningPods, pk)
+			if m.ovs != nil {
+				m.updateLocalMulticastRulesWithLock(runningPod.vnid)
+			}
+		}
+		result.Err = m.podHandler.teardown(request)
+	default:
+		result.Err = fmt.Errorf("unhandled CNI request %v", request.Command)
+	}
+	return result
+}
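For a concrete sense of what the new localMulticastOutputs helper returns, here is a small, self-contained example (not part of the commit); the helper body is copied from the new code above, and the pod keys, ports, and VNID are invented:

	package main

	import (
		"fmt"
		"sort"
	)

	// Minimal stand-in for the runningPod type used above.
	type runningPod struct {
		vnid   uint32
		ofport int
	}

	// Same logic as the new localMulticastOutputs in pod.go: collect the OVS
	// ports of the pods on the given VNID, sort them, and render them as an
	// "output:N,output:M" action list.
	func localMulticastOutputs(runningPods map[string]*runningPod, vnid uint32) string {
		var ofports []int
		for _, pod := range runningPods {
			if pod.vnid == vnid {
				ofports = append(ofports, pod.ofport)
			}
		}
		if len(ofports) == 0 {
			return ""
		}

		sort.Ints(ofports)
		outputs := ""
		for _, ofport := range ofports {
			if len(outputs) > 0 {
				outputs += ","
			}
			outputs += fmt.Sprintf("output:%d", ofport)
		}
		return outputs
	}

	func main() {
		pods := map[string]*runningPod{
			"ns1/pod-a": {vnid: 7, ofport: 12},
			"ns1/pod-b": {vnid: 7, ofport: 3},
			"ns2/pod-c": {vnid: 9, ofport: 5},
		}
		fmt.Println(localMulticastOutputs(pods, 7)) // prints "output:3,output:12"
	}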