Skip to content

Commit

Permalink
Merge pull request #14030 from pravisankar/sdn-use-shared-informers
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot authored May 18, 2017
2 parents 2ea9f5a + abc81c6 commit fb92178
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 248 deletions.
7 changes: 4 additions & 3 deletions pkg/cmd/server/kubernetes/node/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable
if err != nil {
return nil, fmt.Errorf("Cannot parse the provided ip-tables sync period (%s) : %v", options.IPTablesSyncPeriod, err)
}
sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, options.NodeName, options.NodeIP, iptablesSyncPeriod, options.NetworkConfig.MTU)

internalKubeInformers := kinternalinformers.NewSharedInformerFactory(kubeClient, proxyconfig.ConfigSyncPeriod)

sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, options.NodeName, options.NodeIP, iptablesSyncPeriod, options.NetworkConfig.MTU, internalKubeInformers)
if err != nil {
return nil, fmt.Errorf("SDN initialization failed: %v", err)
}
Expand Down Expand Up @@ -311,8 +314,6 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable
return nil, fmt.Errorf("SDN proxy initialization failed: %v", err)
}

internalKubeInformers := kinternalinformers.NewSharedInformerFactory(kubeClient, proxyconfig.ConfigSyncPeriod)

config := &NodeConfig{
BindAddress: options.ServingInfo.BindAddress,

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (c *MasterConfig) RunDeploymentTriggerController() {
// RunSDNController runs openshift-sdn if the said network plugin is provided
func (c *MasterConfig) RunSDNController() {
oClient, kClient := c.SDNControllerClients()
if err := sdnplugin.StartMaster(c.Options.NetworkConfig, oClient, kClient); err != nil {
if err := sdnplugin.StartMaster(c.Options.NetworkConfig, oClient, kClient, c.Informers); err != nil {
glog.Fatalf("SDN initialization failed: %v", err)
}
}
Expand Down
75 changes: 55 additions & 20 deletions pkg/sdn/plugin/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package plugin
import (
"fmt"
"net"
"reflect"
"strings"
"time"

Expand All @@ -14,9 +15,11 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
kcache "k8s.io/client-go/tools/cache"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

Expand Down Expand Up @@ -135,14 +138,6 @@ func RunEventQueue(client kcache.Getter, resourceName ResourceName, process Proc
expectedType = &osapi.HostSubnet{}
case NetNamespaces:
expectedType = &osapi.NetNamespace{}
case Nodes:
expectedType = &kapi.Node{}
case Namespaces:
expectedType = &kapi.Namespace{}
case Services:
expectedType = &kapi.Service{}
case Pods:
expectedType = &kapi.Pod{}
case EgressNetworkPolicies:
expectedType = &osapi.EgressNetworkPolicy{}
case NetworkPolicies:
Expand All @@ -157,17 +152,57 @@ func RunEventQueue(client kcache.Getter, resourceName ResourceName, process Proc
}
}

func RunNamespacedPodEventQueue(client kcache.Getter, namespace string, closeChan chan struct{}, process ProcessEventFunc) {
eventQueue := newEventQueue(client, Pods, &kapi.Pod{}, namespace)
// Loop calling eventQueue.Pop() until closeChan is closed. process() will be called
// once after closeChan is closed; this possibility is unavoidable anyway due to race
// conditions.
for {
select {
case <-closeChan:
return
default:
eventQueue.Pop(process, &kapi.Pod{})
}
// RegisterSharedInformerEventHandlers registers addOrUpdateFunc and delFunc event handlers with
// kubernetes shared informers for the given resource name.
func RegisterSharedInformerEventHandlers(kubeInformers kinternalinformers.SharedInformerFactory,
addOrUpdateFunc func(interface{}, interface{}, watch.EventType),
delFunc func(interface{}), resourceName ResourceName) {

var expectedObjType interface{}
var informer kcache.SharedIndexInformer

internalVersion := kubeInformers.Core().InternalVersion()

switch resourceName {
case Nodes:
informer = internalVersion.Nodes().Informer()
expectedObjType = &kapi.Node{}
case Namespaces:
informer = internalVersion.Namespaces().Informer()
expectedObjType = &kapi.Namespace{}
case Services:
informer = internalVersion.Services().Informer()
expectedObjType = &kapi.Service{}
case Pods:
informer = internalVersion.Pods().Informer()
expectedObjType = &kapi.Pod{}
default:
glog.Errorf("Unknown resource name: %s", resourceName)
return
}

informer.AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addOrUpdateFunc(obj, nil, watch.Added)
},
UpdateFunc: func(old, cur interface{}) {
addOrUpdateFunc(cur, old, watch.Modified)
},
DeleteFunc: func(obj interface{}) {
if reflect.TypeOf(expectedObjType) != reflect.TypeOf(obj) {
tombstone, ok := obj.(kcache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone: %+v", obj)
return
}

obj = tombstone.Obj
if reflect.TypeOf(expectedObjType) != reflect.TypeOf(obj) {
glog.Errorf("Tombstone contained object that is not a %s: %+v", resourceName, obj)
return
}
}
delFunc(obj)
},
})
}
14 changes: 11 additions & 3 deletions pkg/sdn/plugin/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (

osclient "github.com/openshift/origin/pkg/client"
osconfigapi "github.com/openshift/origin/pkg/cmd/server/api"
"github.com/openshift/origin/pkg/controller/shared"
osapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/util/netutils"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)
Expand All @@ -22,18 +24,24 @@ type OsdnMaster struct {
networkInfo *NetworkInfo
subnetAllocator *netutils.SubnetAllocator
vnids *masterVNIDMap
informers shared.InformerFactory

// Holds Node IP used in creating host subnet for a node
hostSubnetNodeIPs map[ktypes.UID]string
}

func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclient.Client, kClient kclientset.Interface) error {
func StartMaster(networkConfig osconfigapi.MasterNetworkConfig, osClient *osclient.Client, kClient kclientset.Interface, informers shared.InformerFactory) error {
if !osapi.IsOpenShiftNetworkPlugin(networkConfig.NetworkPluginName) {
return nil
}

log.Infof("Initializing SDN master of type %q", networkConfig.NetworkPluginName)

master := &OsdnMaster{
kClient: kClient,
osClient: osClient,
kClient: kClient,
osClient: osClient,
informers: informers,
hostSubnetNodeIPs: map[ktypes.UID]string{},
}

var err error
Expand Down
Loading

0 comments on commit fb92178

Please sign in to comment.