diff --git a/plugins/osdn/common.go b/plugins/osdn/common.go index b9df94e9aa6d..8a4b936df845 100644 --- a/plugins/osdn/common.go +++ b/plugins/osdn/common.go @@ -42,12 +42,10 @@ type OsdnController struct { localSubnet *osapi.HostSubnet HostName string subnetAllocator *netutils.SubnetAllocator - sig chan struct{} podNetworkReady chan struct{} VNIDMap map[string]uint netIDManager *netutils.NetIDAllocator adminNamespaces []string - services map[string]*kapi.Service } // Called by plug factory functions to initialize the generic plugin instance @@ -84,10 +82,8 @@ func (oc *OsdnController) BaseInit(registry *Registry, pluginHooks PluginHooks, oc.localIP = selfIP oc.HostName = hostname oc.VNIDMap = make(map[string]uint) - oc.sig = make(chan struct{}) oc.podNetworkReady = make(chan struct{}) oc.adminNamespaces = make([]string, 0) - oc.services = make(map[string]*kapi.Service) return nil } @@ -119,7 +115,7 @@ func (oc *OsdnController) validateNetworkConfig(clusterNetwork, serviceNetwork * } // Ensure each host subnet is within the cluster network - subnets, _, err := oc.Registry.GetSubnets() + subnets, err := oc.Registry.GetSubnets() if err != nil { return fmt.Errorf("Error in initializing/fetching subnets: %v", err) } @@ -135,7 +131,7 @@ func (oc *OsdnController) validateNetworkConfig(clusterNetwork, serviceNetwork * } // Ensure each service is within the services network - services, _, err := oc.Registry.GetServices() + services, err := oc.Registry.GetServices() if err != nil { return err } @@ -242,57 +238,6 @@ func (oc *OsdnController) WaitForPodNetworkReady() error { return fmt.Errorf("SDN pod network is not ready(timeout: 2 mins)") } -func (oc *OsdnController) Stop() { - close(oc.sig) -} - -// Wait for ready signal from Watch interface for the given resource -// Closes the ready channel as we don't need it anymore after this point -func waitForWatchReadiness(ready chan bool, resourceName string) { - timeout := time.Minute - select { - case <-ready: - close(ready) - case <-time.After(timeout): - log.Fatalf("Watch for resource %s is not ready(timeout: %v)", resourceName, timeout) - } - return -} - -type watchWatcher func(oc *OsdnController, ready chan<- bool, start <-chan string) -type watchGetter func(registry *Registry) (interface{}, string, error) - -// watchAndGetResource will fetch current items in etcd and watch for any new -// changes for the given resource. -// Supported resources: nodes, subnets, namespaces, services, netnamespaces, and pods. -// -// To avoid any potential race conditions during this process, these steps are followed: -// 1. Initiator(master/node): Watch for a resource as an async op, lets say WatchProcess -// 2. WatchProcess: When ready for watching, send ready signal to initiator -// 3. Initiator: Wait for watch resource to be ready -// This is needed as step-1 is an asynchronous operation -// 4. WatchProcess: Collect new changes in the queue but wait for initiator -// to indicate which version to start from -// 5. Initiator: Get existing items with their latest version for the resource -// 6. Initiator: Send version from step-5 to WatchProcess -// 7. WatchProcess: Ignore any items with version <= start version got from initiator on step-6 -// 8. WatchProcess: Handle new changes -func (oc *OsdnController) watchAndGetResource(resourceName string, watcher watchWatcher, getter watchGetter) (interface{}, error) { - ready := make(chan bool) - start := make(chan string) - - go watcher(oc, ready, start) - waitForWatchReadiness(ready, strings.ToLower(resourceName)) - getOutput, version, err := getter(oc.Registry) - if err != nil { - return nil, err - } - - start <- version - - return getOutput, nil -} - type FirewallRule struct { table string chain string diff --git a/plugins/osdn/registry.go b/plugins/osdn/registry.go index 0cc03b4fbb9e..ab71f3e4c14b 100644 --- a/plugins/osdn/registry.go +++ b/plugins/osdn/registry.go @@ -3,21 +3,17 @@ package osdn import ( "fmt" "net" - "strconv" "strings" - "time" log "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" kclient "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" pconfig "k8s.io/kubernetes/pkg/proxy/config" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/watch" @@ -29,8 +25,8 @@ import ( ) type Registry struct { - oClient osclient.Interface - kClient kclient.Interface + oClient *osclient.Client + kClient *kclient.Client podsByIP map[string]*kapi.Pod serviceNetwork *net.IPNet clusterNetwork *net.IPNet @@ -43,9 +39,8 @@ type Registry struct { type EventType string const ( - Added EventType = "ADDED" - Deleted EventType = "DELETED" - Modified EventType = "MODIFIED" + Added EventType = "ADDED" + Deleted EventType = "DELETED" ) type HostSubnetEvent struct { @@ -81,12 +76,12 @@ func NewRegistry(osClient *osclient.Client, kClient *kclient.Client) *Registry { } } -func (registry *Registry) GetSubnets() ([]osapi.HostSubnet, string, error) { +func (registry *Registry) GetSubnets() ([]osapi.HostSubnet, error) { hostSubnetList, err := registry.oClient.HostSubnets().List(kapi.ListOptions{}) if err != nil { - return nil, "", err + return nil, err } - return hostSubnetList.Items, hostSubnetList.ListMeta.ResourceVersion, nil + return hostSubnetList.Items, nil } func (registry *Registry) GetSubnet(nodeName string) (*osapi.HostSubnet, error) { @@ -109,12 +104,11 @@ func (registry *Registry) CreateSubnet(nodeName, nodeIP, subnetCIDR string) erro return err } -func (registry *Registry) WatchSubnets(receiver chan<- *HostSubnetEvent, ready chan<- bool, start <-chan string, stop <-chan bool) error { - eventQueue, startVersion := registry.createAndRunEventQueue("HostSubnet", ready, start) +func (registry *Registry) WatchSubnets(receiver chan<- *HostSubnetEvent) error { + eventQueue := registry.runEventQueue("HostSubnets") - checkCondition := true for { - eventType, obj, err := getEvent(eventQueue, startVersion, &checkCondition) + eventType, obj, err := eventQueue.Pop() if err != nil { return err } @@ -129,26 +123,23 @@ func (registry *Registry) WatchSubnets(receiver chan<- *HostSubnetEvent, ready c } } -func (registry *Registry) GetPods() ([]kapi.Pod, string, error) { +func (registry *Registry) PopulatePodsByIP() error { podList, err := registry.kClient.Pods(kapi.NamespaceAll).List(kapi.ListOptions{}) if err != nil { - return nil, "", err + return err } for _, pod := range podList.Items { - if pod.Status.PodIP != "" { - registry.trackPod(&pod) - } + registry.trackPod(&pod) } - return podList.Items, podList.ListMeta.ResourceVersion, nil + return nil } -func (registry *Registry) WatchPods(ready chan<- bool, start <-chan string, stop <-chan bool) error { - eventQueue, startVersion := registry.createAndRunEventQueue("Pod", ready, start) +func (registry *Registry) WatchPods() error { + eventQueue := registry.runEventQueue("Pods") - checkCondition := true for { - eventType, obj, err := getEvent(eventQueue, startVersion, &checkCondition) + eventType, obj, err := eventQueue.Pop() if err != nil { return err } @@ -203,41 +194,12 @@ func (registry *Registry) GetPod(nodeName, namespace, podName string) (*kapi.Pod return nil, nil } -func (registry *Registry) GetNodes() ([]kapi.Node, string, error) { - nodes, err := registry.kClient.Nodes().List(kapi.ListOptions{}) - if err != nil { - return nil, "", err - } - - return nodes.Items, nodes.ListMeta.ResourceVersion, nil -} - -func (registry *Registry) getNodeAddressMap() (map[types.UID]string, error) { +func (registry *Registry) WatchNodes(receiver chan<- *NodeEvent) error { + eventQueue := registry.runEventQueue("Nodes") nodeAddressMap := map[types.UID]string{} - nodes, err := registry.kClient.Nodes().List(kapi.ListOptions{}) - if err != nil { - return nodeAddressMap, err - } - for _, node := range nodes.Items { - if len(node.Status.Addresses) > 0 { - nodeAddressMap[node.ObjectMeta.UID] = node.Status.Addresses[0].Address - } - } - return nodeAddressMap, nil -} - -func (registry *Registry) WatchNodes(receiver chan<- *NodeEvent, ready chan<- bool, start <-chan string, stop <-chan bool) error { - eventQueue, startVersion := registry.createAndRunEventQueue("Node", ready, start) - - nodeAddressMap, err := registry.getNodeAddressMap() - if err != nil { - return err - } - - checkCondition := true for { - eventType, obj, err := getEvent(eventQueue, startVersion, &checkCondition) + eventType, obj, err := eventQueue.Pop() if err != nil { return err } @@ -254,16 +216,13 @@ func (registry *Registry) WatchNodes(receiver chan<- *NodeEvent, ready chan<- bo } switch eventType { - case watch.Added: - receiver <- &NodeEvent{Type: Added, Node: node} - nodeAddressMap[node.ObjectMeta.UID] = nodeIP - case watch.Modified: + case watch.Added, watch.Modified: oldNodeIP, ok := nodeAddressMap[node.ObjectMeta.UID] - if ok && oldNodeIP != nodeIP { - // Node Added event will handle update subnet if there is ip mismatch - receiver <- &NodeEvent{Type: Added, Node: node} - nodeAddressMap[node.ObjectMeta.UID] = nodeIP + if ok && (oldNodeIP == nodeIP) { + continue } + receiver <- &NodeEvent{Type: Added, Node: node} + nodeAddressMap[node.ObjectMeta.UID] = nodeIP case watch.Deleted: receiver <- &NodeEvent{Type: Deleted, Node: node} delete(nodeAddressMap, node.ObjectMeta.UID) @@ -342,56 +301,30 @@ func (registry *Registry) GetClusterNetwork() (*net.IPNet, error) { return registry.clusterNetwork, nil } -func (registry *Registry) GetHostSubnetLength() (int, error) { - if err := registry.cacheClusterNetwork(); err != nil { - return -1, err - } - return registry.hostSubnetLength, nil -} - -func (registry *Registry) GetServicesNetwork() (*net.IPNet, error) { - if err := registry.cacheClusterNetwork(); err != nil { - return nil, err - } - return registry.serviceNetwork, nil -} - -func (registry *Registry) GetNamespaces() ([]kapi.Namespace, string, error) { - namespaceList, err := registry.kClient.Namespaces().List(kapi.ListOptions{}) - if err != nil { - return nil, "", err - } - return namespaceList.Items, namespaceList.ListMeta.ResourceVersion, nil -} - -func (registry *Registry) WatchNamespaces(receiver chan<- *NamespaceEvent, ready chan<- bool, start <-chan string, stop <-chan bool) error { - eventQueue, startVersion := registry.createAndRunEventQueue("Namespace", ready, start) +func (registry *Registry) WatchNamespaces(receiver chan<- *NamespaceEvent) error { + eventQueue := registry.runEventQueue("Namespaces") - checkCondition := true for { - eventType, obj, err := getEvent(eventQueue, startVersion, &checkCondition) + eventType, obj, err := eventQueue.Pop() if err != nil { return err } ns := obj.(*kapi.Namespace) switch eventType { - case watch.Added: + case watch.Added, watch.Modified: receiver <- &NamespaceEvent{Type: Added, Namespace: ns} case watch.Deleted: receiver <- &NamespaceEvent{Type: Deleted, Namespace: ns} - case watch.Modified: - // Ignore, we don't need to update SDN in case of namespace updates } } } -func (registry *Registry) WatchNetNamespaces(receiver chan<- *NetNamespaceEvent, ready chan<- bool, start <-chan string, stop <-chan bool) error { - eventQueue, startVersion := registry.createAndRunEventQueue("NetNamespace", ready, start) +func (registry *Registry) WatchNetNamespaces(receiver chan<- *NetNamespaceEvent) error { + eventQueue := registry.runEventQueue("NetNamespaces") - checkCondition := true for { - eventType, obj, err := getEvent(eventQueue, startVersion, &checkCondition) + eventType, obj, err := eventQueue.Pop() if err != nil { return err } @@ -406,12 +339,12 @@ func (registry *Registry) WatchNetNamespaces(receiver chan<- *NetNamespaceEvent, } } -func (registry *Registry) GetNetNamespaces() ([]osapi.NetNamespace, string, error) { +func (registry *Registry) GetNetNamespaces() ([]osapi.NetNamespace, error) { netNamespaceList, err := registry.oClient.NetNamespaces().List(kapi.ListOptions{}) if err != nil { - return nil, "", err + return nil, err } - return netNamespaceList.Items, netNamespaceList.ListMeta.ResourceVersion, nil + return netNamespaceList.Items, nil } func (registry *Registry) GetNetNamespace(name string) (*osapi.NetNamespace, error) { @@ -434,18 +367,17 @@ func (registry *Registry) DeleteNetNamespace(name string) error { } func (registry *Registry) GetServicesForNamespace(namespace string) ([]kapi.Service, error) { - services, _, err := registry.getServices(namespace) - return services, err + return registry.getServices(namespace) } -func (registry *Registry) GetServices() ([]kapi.Service, string, error) { +func (registry *Registry) GetServices() ([]kapi.Service, error) { return registry.getServices(kapi.NamespaceAll) } -func (registry *Registry) getServices(namespace string) ([]kapi.Service, string, error) { +func (registry *Registry) getServices(namespace string) ([]kapi.Service, error) { kServList, err := registry.kClient.Services(namespace).List(kapi.ListOptions{}) if err != nil { - return nil, "", err + return nil, err } servList := make([]kapi.Service, 0, len(kServList.Items)) @@ -455,15 +387,14 @@ func (registry *Registry) getServices(namespace string) ([]kapi.Service, string, } servList = append(servList, service) } - return servList, kServList.ListMeta.ResourceVersion, nil + return servList, nil } -func (registry *Registry) WatchServices(receiver chan<- *ServiceEvent, ready chan<- bool, start <-chan string, stop <-chan bool) error { - eventQueue, startVersion := registry.createAndRunEventQueue("Service", ready, start) +func (registry *Registry) WatchServices(receiver chan<- *ServiceEvent) error { + eventQueue := registry.runEventQueue("Services") - checkCondition := true for { - eventType, obj, err := getEvent(eventQueue, startVersion, &checkCondition) + eventType, obj, err := eventQueue.Pop() if err != nil { return err } @@ -475,154 +406,46 @@ func (registry *Registry) WatchServices(receiver chan<- *ServiceEvent, ready cha } switch eventType { - case watch.Added: + case watch.Added, watch.Modified: receiver <- &ServiceEvent{Type: Added, Service: serv} case watch.Deleted: receiver <- &ServiceEvent{Type: Deleted, Service: serv} - case watch.Modified: - receiver <- &ServiceEvent{Type: Modified, Service: serv} } } } // Run event queue for the given resource -func (registry *Registry) runEventQueue(resourceName string) (*oscache.EventQueue, *cache.Reflector) { - eventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc) - lw := &cache.ListWatch{} +func (registry *Registry) runEventQueue(resourceName string) *oscache.EventQueue { + var client cache.Getter var expectedType interface{} + switch strings.ToLower(resourceName) { - case "hostsubnet": + case "hostsubnets": expectedType = &osapi.HostSubnet{} - lw.ListFunc = func(options kapi.ListOptions) (runtime.Object, error) { - return registry.oClient.HostSubnets().List(options) - } - lw.WatchFunc = func(options kapi.ListOptions) (watch.Interface, error) { - return registry.oClient.HostSubnets().Watch(options) - } - case "node": + client = registry.oClient + case "netnamespaces": + expectedType = &osapi.NetNamespace{} + client = registry.oClient + case "nodes": expectedType = &kapi.Node{} - lw.ListFunc = func(options kapi.ListOptions) (runtime.Object, error) { - return registry.kClient.Nodes().List(options) - } - lw.WatchFunc = func(options kapi.ListOptions) (watch.Interface, error) { - return registry.kClient.Nodes().Watch(options) - } - case "namespace": + client = registry.kClient + case "namespaces": expectedType = &kapi.Namespace{} - lw.ListFunc = func(options kapi.ListOptions) (runtime.Object, error) { - return registry.kClient.Namespaces().List(options) - } - lw.WatchFunc = func(options kapi.ListOptions) (watch.Interface, error) { - return registry.kClient.Namespaces().Watch(options) - } - case "netnamespace": - expectedType = &osapi.NetNamespace{} - lw.ListFunc = func(options kapi.ListOptions) (runtime.Object, error) { - return registry.oClient.NetNamespaces().List(options) - } - lw.WatchFunc = func(options kapi.ListOptions) (watch.Interface, error) { - return registry.oClient.NetNamespaces().Watch(options) - } - case "service": + client = registry.kClient + case "services": expectedType = &kapi.Service{} - lw.ListFunc = func(options kapi.ListOptions) (runtime.Object, error) { - return registry.kClient.Services(kapi.NamespaceAll).List(options) - } - lw.WatchFunc = func(options kapi.ListOptions) (watch.Interface, error) { - return registry.kClient.Services(kapi.NamespaceAll).Watch(options) - } - case "pod": + client = registry.kClient + case "pods": expectedType = &kapi.Pod{} - lw.ListFunc = func(options kapi.ListOptions) (runtime.Object, error) { - return registry.kClient.Pods(kapi.NamespaceAll).List(options) - } - lw.WatchFunc = func(options kapi.ListOptions) (watch.Interface, error) { - return registry.kClient.Pods(kapi.NamespaceAll).Watch(options) - } + client = registry.kClient default: log.Fatalf("Unknown resource %s during initialization of event queue", resourceName) } - reflector := cache.NewReflector(lw, expectedType, eventQueue, 4*time.Minute) - reflector.Run() - return eventQueue, reflector -} - -// Ensures given event queue is ready for watching new changes -// and unblock other end of the ready channel -func sendWatchReadiness(reflector *cache.Reflector, ready chan<- bool) { - // timeout: 1min - retries := 120 - retryInterval := 500 * time.Millisecond - // Try every retryInterval and bail-out if it exceeds max retries - for i := 0; i < retries; i++ { - // Reflector does list and watch of the resource - // when listing of the resource is done, resourceVersion will be populated - // and the event queue will be ready to watch any new changes - version := reflector.LastSyncResourceVersion() - if len(version) > 0 { - ready <- true - return - } - time.Sleep(retryInterval) - } - log.Fatalf("SDN event queue is not ready for watching new changes(timeout: 1min)") -} - -// Get resource version from start channel -// Watch interface for the resource will process any item after this version -func getStartVersion(start <-chan string, resourceName string) uint64 { - var version uint64 - var err error - - timeout := time.Minute - select { - case rv := <-start: - version, err = strconv.ParseUint(rv, 10, 64) - if err != nil { - log.Fatalf("Invalid start version %s for %s, error: %v", rv, resourceName, err) - } - case <-time.After(timeout): - log.Fatalf("Error fetching resource version for %s (timeout: %v)", resourceName, timeout) - } - return version -} -// createAndRunEventQueue will create and run event queue and also returns start version for watching any new changes -func (registry *Registry) createAndRunEventQueue(resourceName string, ready chan<- bool, start <-chan string) (*oscache.EventQueue, uint64) { - eventQueue, reflector := registry.runEventQueue(resourceName) - sendWatchReadiness(reflector, ready) - startVersion := getStartVersion(start, resourceName) - return eventQueue, startVersion -} - -// getEvent returns next item in the event queue which satisfies item version greater than given start version -// checkCondition is an optimization that ignores version check when it is not needed -func getEvent(eventQueue *oscache.EventQueue, startVersion uint64, checkCondition *bool) (watch.EventType, interface{}, error) { - if *checkCondition { - // Ignore all events with version <= given start version - for { - eventType, obj, err := eventQueue.Pop() - if err != nil { - return watch.Error, nil, err - } - accessor, err := meta.Accessor(obj) - if err != nil { - return watch.Error, nil, err - } - currentVersion, err := strconv.ParseUint(accessor.GetResourceVersion(), 10, 64) - if err != nil { - return watch.Error, nil, err - } - if currentVersion <= startVersion { - log.V(5).Infof("Ignoring %s with version %d, start version: %d", accessor.GetName(), currentVersion, startVersion) - continue - } - *checkCondition = false - return eventType, obj, nil - } - } else { - return eventQueue.Pop() - } + lw := cache.NewListWatchFromClient(client, strings.ToLower(resourceName), kapi.NamespaceAll, fields.Everything()) + eventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc) + cache.NewReflector(lw, expectedType, eventQueue, 0).Run() + return eventQueue } // FilteringEndpointsConfigHandler implementation diff --git a/plugins/osdn/subnets.go b/plugins/osdn/subnets.go index b8439f4a66f7..b393e6830a94 100644 --- a/plugins/osdn/subnets.go +++ b/plugins/osdn/subnets.go @@ -9,13 +9,11 @@ import ( "github.com/openshift/openshift-sdn/pkg/netutils" osapi "github.com/openshift/origin/pkg/sdn/api" - - kapi "k8s.io/kubernetes/pkg/api" ) func (oc *OsdnController) SubnetStartMaster(clusterNetwork *net.IPNet, hostSubnetLength uint) error { subrange := make([]string, 0) - subnets, _, err := oc.Registry.GetSubnets() + subnets, err := oc.Registry.GetSubnets() if err != nil { log.Errorf("Error in initializing/fetching subnets: %v", err) return err @@ -33,40 +31,7 @@ func (oc *OsdnController) SubnetStartMaster(clusterNetwork *net.IPNet, hostSubne return err } - getNodes := func(registry *Registry) (interface{}, string, error) { - return registry.GetNodes() - } - result, err := oc.watchAndGetResource("Node", watchNodes, getNodes) - if err != nil { - return err - } - - // Make sure each node has a Subnet allocated - nodes := result.([]kapi.Node) - for _, node := range nodes { - nodeIP, err := GetNodeIP(&node) - if err != nil { - // Don't error out; just warn so the error can be corrected by admin - log.Errorf("Failed to get Node %s IP: %v", node.Name, err) - continue - } - - err = oc.validateNode(nodeIP) - if err != nil { - // Don't error out; just warn so the error can be corrected by admin - log.Errorf("Failed to validate Node %s: %v", node.Name, err) - continue - } - _, err = oc.Registry.GetSubnet(node.Name) - if err == nil { - log.V(5).Infof("HostSubnet for node %q already exists", node.Name) - continue - } - err = oc.addNode(node.Name, nodeIP) - if err != nil { - return err - } - } + go watchNodes(oc) return nil } @@ -124,20 +89,7 @@ func (oc *OsdnController) SubnetStartNode(mtu uint) (bool, error) { return false, err } - getSubnets := func(registry *Registry) (interface{}, string, error) { - return registry.GetSubnets() - } - result, err := oc.watchAndGetResource("HostSubnet", watchSubnets, getSubnets) - if err != nil { - return false, err - } - subnets := result.([]osapi.HostSubnet) - for _, s := range subnets { - if s.HostIP != oc.localIP { - oc.pluginHooks.AddHostSubnetRules(&s) - } - } - + go watchSubnets(oc) return networkChanged, nil } @@ -171,87 +123,76 @@ func (oc *OsdnController) initSelfSubnet() error { } // Only run on the master -func watchNodes(oc *OsdnController, ready chan<- bool, start <-chan string) { - stop := make(chan bool) +func watchNodes(oc *OsdnController) { nodeEvent := make(chan *NodeEvent) - go oc.Registry.WatchNodes(nodeEvent, ready, start, stop) + go oc.Registry.WatchNodes(nodeEvent) for { - select { - case ev := <-nodeEvent: - switch ev.Type { - case Added: - nodeIP, nodeErr := GetNodeIP(ev.Node) - if nodeErr == nil { - nodeErr = oc.validateNode(nodeIP) - } + ev := <-nodeEvent + switch ev.Type { + case Added: + nodeIP, nodeErr := GetNodeIP(ev.Node) + if nodeErr == nil { + nodeErr = oc.validateNode(nodeIP) + } - sub, err := oc.Registry.GetSubnet(ev.Node.Name) - if err != nil { - if nodeErr == nil { - // subnet does not exist already - err = oc.addNode(ev.Node.Name, nodeIP) - if err != nil { - log.Errorf("Error adding node: %v", err) - } - } else { - log.Errorf("Ignoring invalid node %s/%s: %v", ev.Node.Name, nodeIP, nodeErr) + sub, err := oc.Registry.GetSubnet(ev.Node.Name) + if err != nil { + if nodeErr == nil { + // subnet does not exist already + err = oc.addNode(ev.Node.Name, nodeIP) + if err != nil { + log.Errorf("Error adding node: %v", err) + continue } } else { - if sub.HostIP != nodeIP { - err = oc.Registry.DeleteSubnet(ev.Node.Name) + log.Errorf("Ignoring invalid node %s/%s: %v", ev.Node.Name, nodeIP, nodeErr) + } + } else { + if sub.HostIP != nodeIP { + err = oc.Registry.DeleteSubnet(ev.Node.Name) + if err != nil { + log.Errorf("Error deleting subnet for node %s, old ip %s: %v", ev.Node.Name, sub.HostIP, err) + continue + } + if nodeErr == nil { + err = oc.Registry.CreateSubnet(ev.Node.Name, nodeIP, sub.Subnet) if err != nil { - log.Errorf("Error deleting subnet for node %s, old ip %s: %v", ev.Node.Name, sub.HostIP, err) + log.Errorf("Error creating subnet for node %s, ip %s: %v", ev.Node.Name, sub.HostIP, err) continue } - if nodeErr == nil { - err = oc.Registry.CreateSubnet(ev.Node.Name, nodeIP, sub.Subnet) - if err != nil { - log.Errorf("Error creating subnet for node %s, ip %s: %v", ev.Node.Name, sub.HostIP, err) - continue - } - } else { - log.Errorf("Ignoring creating invalid node %s/%s: %v", ev.Node.Name, nodeIP, nodeErr) - } + } else { + log.Errorf("Ignoring creating invalid node %s/%s: %v", ev.Node.Name, nodeIP, nodeErr) } } - case Deleted: - err := oc.deleteNode(ev.Node.Name) - if err != nil { - log.Errorf("Error deleting node: %v", err) - } } - case <-oc.sig: - log.Error("Signal received. Stopping watching of nodes.") - stop <- true - return + case Deleted: + err := oc.deleteNode(ev.Node.Name) + if err != nil { + log.Errorf("Error deleting node: %v", err) + continue + } } } } // Only run on the nodes -func watchSubnets(oc *OsdnController, ready chan<- bool, start <-chan string) { - stop := make(chan bool) +func watchSubnets(oc *OsdnController) { clusterEvent := make(chan *HostSubnetEvent) - go oc.Registry.WatchSubnets(clusterEvent, ready, start, stop) + go oc.Registry.WatchSubnets(clusterEvent) for { - select { - case ev := <-clusterEvent: - if ev.HostSubnet.HostIP == oc.localIP { + ev := <-clusterEvent + if ev.HostSubnet.HostIP == oc.localIP { + continue + } + switch ev.Type { + case Added: + if err := oc.validateNode(ev.HostSubnet.HostIP); err != nil { + log.Errorf("Ignoring invalid subnet for node %s: %v", ev.HostSubnet.HostIP, err) continue } - switch ev.Type { - case Added: - if err := oc.validateNode(ev.HostSubnet.HostIP); err != nil { - log.Errorf("Ignoring invalid subnet for node %s: %v", ev.HostSubnet.HostIP, err) - continue - } - oc.pluginHooks.AddHostSubnetRules(ev.HostSubnet) - case Deleted: - oc.pluginHooks.DeleteHostSubnetRules(ev.HostSubnet) - } - case <-oc.sig: - stop <- true - return + oc.pluginHooks.AddHostSubnetRules(ev.HostSubnet) + case Deleted: + oc.pluginHooks.DeleteHostSubnetRules(ev.HostSubnet) } } } diff --git a/plugins/osdn/vnids.go b/plugins/osdn/vnids.go index 57df8da5770d..c77a985f4882 100644 --- a/plugins/osdn/vnids.go +++ b/plugins/osdn/vnids.go @@ -6,7 +6,6 @@ import ( log "github.com/golang/glog" "github.com/openshift/openshift-sdn/pkg/netutils" - osapi "github.com/openshift/origin/pkg/sdn/api" kapi "k8s.io/kubernetes/pkg/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/container" @@ -19,29 +18,33 @@ const ( AdminVNID = uint(0) ) -func (oc *OsdnController) VnidStartMaster() error { - nets, _, err := oc.Registry.GetNetNamespaces() +func populateVNIDMap(oc *OsdnController) error { + nets, err := oc.Registry.GetNetNamespaces() if err != nil { return err } - inUse := make([]uint, 0) + for _, net := range nets { - if net.NetID != AdminVNID { - inUse = append(inUse, net.NetID) - } oc.VNIDMap[net.Name] = net.NetID } - // VNID: 0 reserved for default namespace and can reach any network in the cluster - // VNID: 1 to 9 are internally reserved for any special cases in the future - oc.netIDManager, err = netutils.NewNetIDAllocator(10, MaxVNID, inUse) + return nil +} + +func (oc *OsdnController) VnidStartMaster() error { + err := populateVNIDMap(oc) if err != nil { return err } - getNamespaces := func(registry *Registry) (interface{}, string, error) { - return registry.GetNamespaces() + inUse := make([]uint, 0) + for _, netid := range oc.VNIDMap { + if netid != AdminVNID { + inUse = append(inUse, netid) + } } - result, err := oc.watchAndGetResource("Namespace", watchNamespaces, getNamespaces) + // VNID: 0 reserved for default namespace and can reach any network in the cluster + // VNID: 1 to 9 are internally reserved for any special cases in the future + oc.netIDManager, err = netutils.NewNetIDAllocator(10, MaxVNID, inUse) if err != nil { return err } @@ -49,30 +52,7 @@ func (oc *OsdnController) VnidStartMaster() error { // 'default' namespace is currently always an admin namespace oc.adminNamespaces = append(oc.adminNamespaces, "default") - // Handle existing namespaces - namespaces := result.([]kapi.Namespace) - for _, ns := range namespaces { - nsName := ns.ObjectMeta.Name - // Revoke invalid VNID for admin namespaces - if oc.isAdminNamespace(nsName) { - netid, ok := oc.VNIDMap[nsName] - if ok && (netid != AdminVNID) { - err := oc.revokeVNID(nsName) - if err != nil { - return err - } - } - } - _, found := oc.VNIDMap[nsName] - // Assign VNID for the namespace if it doesn't exist - if !found { - err := oc.assignVNID(nsName) - if err != nil { - return err - } - } - } - + go watchNamespaces(oc) return nil } @@ -150,73 +130,44 @@ func (oc *OsdnController) revokeVNID(namespaceName string) error { return nil } -func watchNamespaces(oc *OsdnController, ready chan<- bool, start <-chan string) { +func watchNamespaces(oc *OsdnController) { nsevent := make(chan *NamespaceEvent) - stop := make(chan bool) - go oc.Registry.WatchNamespaces(nsevent, ready, start, stop) + go oc.Registry.WatchNamespaces(nsevent) for { - select { - case ev := <-nsevent: - switch ev.Type { - case Added: - err := oc.assignVNID(ev.Namespace.Name) - if err != nil { - log.Errorf("Error assigning Net ID: %v", err) - continue - } - case Deleted: - err := oc.revokeVNID(ev.Namespace.Name) - if err != nil { - log.Errorf("Error revoking Net ID: %v", err) - continue - } + ev := <-nsevent + switch ev.Type { + case Added: + err := oc.assignVNID(ev.Namespace.Name) + if err != nil { + log.Errorf("Error assigning Net ID: %v", err) + continue + } + case Deleted: + err := oc.revokeVNID(ev.Namespace.Name) + if err != nil { + log.Errorf("Error revoking Net ID: %v", err) + continue } - case <-oc.sig: - log.Error("Signal received. Stopping watching of nodes.") - stop <- true - return } } } func (oc *OsdnController) VnidStartNode() error { - getNetNamespaces := func(registry *Registry) (interface{}, string, error) { - return registry.GetNetNamespaces() - } - result, err := oc.watchAndGetResource("NetNamespace", watchNetNamespaces, getNetNamespaces) + // Populate vnid map synchronously so that existing services can fetch vnid + err := populateVNIDMap(oc) if err != nil { return err } - nslist := result.([]osapi.NetNamespace) - for _, ns := range nslist { - oc.VNIDMap[ns.Name] = ns.NetID - } - getServices := func(registry *Registry) (interface{}, string, error) { - return registry.GetServices() - } - result, err = oc.watchAndGetResource("Service", watchServices, getServices) + // Populate pod info map synchronously so that kube proxy can filter endpoints to support isolation + err = oc.Registry.PopulatePodsByIP() if err != nil { return err } - services := result.([]kapi.Service) - for _, svc := range services { - netid, found := oc.VNIDMap[svc.Namespace] - if !found { - return fmt.Errorf("Error fetching Net ID for namespace: %s", svc.Namespace) - } - oc.services[string(svc.UID)] = &svc - oc.pluginHooks.AddServiceRules(&svc, netid) - } - - getPods := func(registry *Registry) (interface{}, string, error) { - return registry.GetPods() - } - _, err = oc.watchAndGetResource("Pod", watchPods, getPods) - if err != nil { - return err - } + go watchNetNamespaces(oc) + go watchPods(oc) + go watchServices(oc) return nil } @@ -246,95 +197,76 @@ func (oc *OsdnController) updatePodNetwork(namespace string, netID uint) error { return nil } -func watchNetNamespaces(oc *OsdnController, ready chan<- bool, start <-chan string) { - stop := make(chan bool) +func watchNetNamespaces(oc *OsdnController) { netNsEvent := make(chan *NetNamespaceEvent) - go oc.Registry.WatchNetNamespaces(netNsEvent, ready, start, stop) + go oc.Registry.WatchNetNamespaces(netNsEvent) for { - select { - case ev := <-netNsEvent: - switch ev.Type { - case Added: - // Skip this event if the old and new network ids are same - if oldNetID, ok := oc.VNIDMap[ev.NetNamespace.NetName]; ok && (oldNetID == ev.NetNamespace.NetID) { - continue - } - oc.VNIDMap[ev.NetNamespace.Name] = ev.NetNamespace.NetID - err := oc.updatePodNetwork(ev.NetNamespace.NetName, ev.NetNamespace.NetID) - if err != nil { - log.Errorf("Failed to update pod network for namespace '%s', error: %s", ev.NetNamespace.NetName, err) - } - case Deleted: - err := oc.updatePodNetwork(ev.NetNamespace.NetName, AdminVNID) - if err != nil { - log.Errorf("Failed to update pod network for namespace '%s', error: %s", ev.NetNamespace.NetName, err) - } - delete(oc.VNIDMap, ev.NetNamespace.NetName) + ev := <-netNsEvent + switch ev.Type { + case Added: + // Skip this event if the old and new network ids are same + if oldNetID, ok := oc.VNIDMap[ev.NetNamespace.NetName]; ok && (oldNetID == ev.NetNamespace.NetID) { + continue + } + oc.VNIDMap[ev.NetNamespace.Name] = ev.NetNamespace.NetID + err := oc.updatePodNetwork(ev.NetNamespace.NetName, ev.NetNamespace.NetID) + if err != nil { + log.Errorf("Failed to update pod network for namespace '%s', error: %s", ev.NetNamespace.NetName, err) + } + case Deleted: + err := oc.updatePodNetwork(ev.NetNamespace.NetName, AdminVNID) + if err != nil { + log.Errorf("Failed to update pod network for namespace '%s', error: %s", ev.NetNamespace.NetName, err) + } + delete(oc.VNIDMap, ev.NetNamespace.NetName) + } + } +} + +func isServiceChanged(oldsvc, newsvc *kapi.Service) bool { + if len(oldsvc.Spec.Ports) == len(newsvc.Spec.Ports) { + for i := range oldsvc.Spec.Ports { + if oldsvc.Spec.Ports[i].Protocol != newsvc.Spec.Ports[i].Protocol || + oldsvc.Spec.Ports[i].Port != newsvc.Spec.Ports[i].Port { + return true } - case <-oc.sig: - log.Error("Signal received. Stopping watching of NetNamespaces.") - stop <- true - return } + return false } + return true } -func watchServices(oc *OsdnController, ready chan<- bool, start <-chan string) { - stop := make(chan bool) +func watchServices(oc *OsdnController) { svcevent := make(chan *ServiceEvent) - go oc.Registry.WatchServices(svcevent, ready, start, stop) + services := make(map[string]*kapi.Service) + go oc.Registry.WatchServices(svcevent) + for { - select { - case ev := <-svcevent: - var netid uint - if ev.Type != Deleted { - var found bool - netid, found = oc.VNIDMap[ev.Service.Namespace] - if !found { - log.Errorf("Error fetching Net ID for namespace: %s, skipped serviceEvent: %v", ev.Service.Namespace, ev) - continue - } + ev := <-svcevent + switch ev.Type { + case Added: + netid, found := oc.VNIDMap[ev.Service.Namespace] + if !found { + log.Errorf("Error fetching Net ID for namespace: %s, skipped serviceEvent: %v", ev.Service.Namespace, ev) + continue } - switch ev.Type { - case Added: - oc.services[string(ev.Service.UID)] = ev.Service - oc.pluginHooks.AddServiceRules(ev.Service, netid) - case Deleted: - delete(oc.services, string(ev.Service.UID)) - oc.pluginHooks.DeleteServiceRules(ev.Service) - case Modified: - oldsvc, exists := oc.services[string(ev.Service.UID)] - if exists && len(oldsvc.Spec.Ports) == len(ev.Service.Spec.Ports) { - same := true - for i := range oldsvc.Spec.Ports { - if oldsvc.Spec.Ports[i].Protocol != ev.Service.Spec.Ports[i].Protocol || oldsvc.Spec.Ports[i].Port != ev.Service.Spec.Ports[i].Port { - same = false - break - } - } - if same { - continue - } - } - if exists { - oc.pluginHooks.DeleteServiceRules(oldsvc) + + oldsvc, exists := services[string(ev.Service.UID)] + if exists { + if !isServiceChanged(oldsvc, ev.Service) { + continue } - oc.services[string(ev.Service.UID)] = ev.Service - oc.pluginHooks.AddServiceRules(ev.Service, netid) + oc.pluginHooks.DeleteServiceRules(oldsvc) } - case <-oc.sig: - log.Error("Signal received. Stopping watching of services.") - stop <- true - return + services[string(ev.Service.UID)] = ev.Service + oc.pluginHooks.AddServiceRules(ev.Service, netid) + case Deleted: + delete(services, string(ev.Service.UID)) + oc.pluginHooks.DeleteServiceRules(ev.Service) } } } -func watchPods(oc *OsdnController, ready chan<- bool, start <-chan string) { - stop := make(chan bool) - go oc.Registry.WatchPods(ready, start, stop) - - <-oc.sig - log.Error("Signal received. Stopping watching of pods.") - stop <- true +func watchPods(oc *OsdnController) { + oc.Registry.WatchPods() }