Skip to content

Commit

Permalink
Merge pull request #19175 from smarterclayton/fix_lost_unique_hosts
Browse files Browse the repository at this point in the history
The unique_host router filter can lose routes
  • Loading branch information
openshift-merge-robot authored Apr 23, 2018
2 parents a622c07 + 624bf21 commit 1a712f2
Show file tree
Hide file tree
Showing 22 changed files with 1,842 additions and 484 deletions.
10 changes: 7 additions & 3 deletions pkg/cmd/infra/router/f5.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
projectinternalclientset "github.com/openshift/origin/pkg/project/generated/internalclientset"
routeapi "github.com/openshift/origin/pkg/route/apis/route"
routeinternalclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/router/controller"
f5plugin "github.com/openshift/origin/pkg/router/f5"
Expand Down Expand Up @@ -230,6 +231,9 @@ func (o *F5RouterOptions) Run() error {
return err
}

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
factory.RouteModifierFn = o.RouteUpdate

var plugin router.Plugin = f5Plugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
Expand All @@ -238,17 +242,17 @@ func (o *F5RouterOptions) Run() error {
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
if o.ExtendedValidation {
plugin = controller.NewExtendedValidator(plugin, recorder)
}
plugin = controller.NewUniqueHost(plugin, o.RouteSelectionFunc(), o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
plugin = controller.NewUniqueHost(plugin, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
plugin = controller.NewHostAdmitter(plugin, o.F5RouteAdmitterFunc(), o.AllowWildcardRoutes, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
watchNodes := (len(o.InternalAddress) != 0 && len(o.VxlanGateway) != 0)
controller := factory.Create(plugin, watchNodes)
controller.Run()
Expand Down
41 changes: 21 additions & 20 deletions pkg/cmd/infra/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,31 @@ func (o *RouterSelection) Bind(flag *pflag.FlagSet) {
flag.StringVar(&o.ListenAddr, "listen-addr", cmdutil.Env("ROUTER_LISTEN_ADDR", ""), "The name of an interface to listen on to expose metrics and health checking. If not specified, will not listen. Overrides stats port.")
}

// RouteSelectionFunc returns a func that identifies the host for a route.
func (o *RouterSelection) RouteSelectionFunc() controller.RouteHostFunc {
// RouteUpdate updates the route before it is seen by the cache.
func (o *RouterSelection) RouteUpdate(route *routeapi.Route) {
if len(o.HostnameTemplate) == 0 {
return controller.HostForRoute
return
}
return func(route *routeapi.Route) string {
if !o.OverrideHostname && len(route.Spec.Host) > 0 {
return route.Spec.Host
}
s, err := variable.ExpandStrict(o.HostnameTemplate, func(key string) (string, bool) {
switch key {
case "name":
return route.Name, true
case "namespace":
return route.Namespace, true
default:
return "", false
}
})
if err != nil {
return ""
if !o.OverrideHostname && len(route.Spec.Host) > 0 {
return
}
s, err := variable.ExpandStrict(o.HostnameTemplate, func(key string) (string, bool) {
switch key {
case "name":
return route.Name, true
case "namespace":
return route.Namespace, true
default:
return "", false
}
return strings.Trim(s, "\"'")
})
if err != nil {
return
}

s = strings.Trim(s, "\"'")
glog.V(4).Infof("changing route %s to %s", route.Spec.Host, s)
route.Spec.Host = s
}

func (o *RouterSelection) AdmissionCheck(route *routeapi.Route) error {
Expand Down
12 changes: 8 additions & 4 deletions pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
cmdversion "github.com/openshift/origin/pkg/cmd/version"
projectinternalclientset "github.com/openshift/origin/pkg/project/generated/internalclientset"
routeinternalclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
routelisters "github.com/openshift/origin/pkg/route/generated/listers/route/internalversion"
"github.com/openshift/origin/pkg/router"
"github.com/openshift/origin/pkg/router/controller"
"github.com/openshift/origin/pkg/router/metrics"
Expand Down Expand Up @@ -405,12 +406,15 @@ func (o *TemplateRouterOptions) Run() error {
return err
}

svcFetcher := templateplugin.NewListWatchServiceLookup(kc.Core(), o.ResyncInterval)
svcFetcher := templateplugin.NewListWatchServiceLookup(kc.Core(), o.ResyncInterval, o.Namespace)
templatePlugin, err := templateplugin.NewTemplatePlugin(pluginCfg, svcFetcher)
if err != nil {
return err
}

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
factory.RouteModifierFn = o.RouteUpdate

var plugin router.Plugin = templatePlugin
var recorder controller.RejectionRecorder = controller.LogRejections
if o.UpdateStatus {
Expand All @@ -419,17 +423,17 @@ func (o *TemplateRouterOptions) Run() error {
tracker := controller.NewSimpleContentionTracker(o.ResyncInterval / 10)
tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. Route status will not be updated as long as conflicts are detected.", o.RouterName))
go tracker.Run(wait.NeverStop)
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), o.RouterName, o.RouterCanonicalHostname, lease, tracker)
routeLister := routelisters.NewRouteLister(factory.CreateRoutesSharedInformer().GetIndexer())
status := controller.NewStatusAdmitter(plugin, routeclient.Route(), routeLister, o.RouterName, o.RouterCanonicalHostname, lease, tracker)
recorder = status
plugin = status
}
if o.ExtendedValidation {
plugin = controller.NewExtendedValidator(plugin, recorder)
}
plugin = controller.NewUniqueHost(plugin, o.RouteSelectionFunc(), o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
plugin = controller.NewUniqueHost(plugin, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)
plugin = controller.NewHostAdmitter(plugin, o.RouteAdmissionFunc(), o.AllowWildcardRoutes, o.RouterSelection.DisableNamespaceOwnershipCheck, recorder)

factory := o.RouterSelection.NewFactory(routeclient, projectclient.Project().Projects(), kc)
controller := factory.Create(plugin, false)
controller.Run()

Expand Down
46 changes: 35 additions & 11 deletions pkg/router/controller/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type RouterControllerFactory struct {
FieldSelector string
NamespaceLabels labels.Selector
ProjectLabels labels.Selector
RouteModifierFn func(route *routeapi.Route)

informers map[reflect.Type]kcache.SharedIndexInformer
}
Expand Down Expand Up @@ -96,13 +97,13 @@ func (f *RouterControllerFactory) Create(plugin router.Plugin, watchNodes bool)

func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
f.createNamespacesSharedInformer(rc)
f.createNamespacesSharedInformer()
}
f.createEndpointsSharedInformer(rc)
f.createRoutesSharedInformer(rc)
f.createEndpointsSharedInformer()
f.CreateRoutesSharedInformer()

if rc.WatchNodes {
f.createNodesSharedInformer(rc)
f.createNodesSharedInformer()
}

// Start informers
Expand All @@ -122,6 +123,7 @@ func (f *RouterControllerFactory) registerInformerEventHandlers(rc *routercontro
f.registerSharedInformerEventHandlers(&kapi.Namespace{}, rc.HandleNamespace)
}
f.registerSharedInformerEventHandlers(&kapi.Endpoints{}, rc.HandleEndpoints)

f.registerSharedInformerEventHandlers(&routeapi.Route{}, rc.HandleRoute)

if rc.WatchNodes {
Expand Down Expand Up @@ -187,7 +189,7 @@ func (f *RouterControllerFactory) setSelectors(options *v1.ListOptions) {
options.FieldSelector = f.FieldSelector
}

func (f *RouterControllerFactory) createEndpointsSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) createEndpointsSharedInformer() {
// we do not scope endpoints by labels or fields because the route labels != endpoints labels
lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
Expand All @@ -204,39 +206,61 @@ func (f *RouterControllerFactory) createEndpointsSharedInformer(rc *routercontro
f.informers[objType] = informer
}

func (f *RouterControllerFactory) createRoutesSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) CreateRoutesSharedInformer() kcache.SharedIndexInformer {
rt := &routeapi.Route{}
objType := reflect.TypeOf(rt)
if informer, ok := f.informers[objType]; ok {
return informer
}

lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
f.setSelectors(&options)
routeList, err := f.RClient.Route().Routes(f.Namespace).List(options)
if err != nil {
return nil, err
}
if f.RouteModifierFn != nil {
for i := range routeList.Items {
f.RouteModifierFn(&routeList.Items[i])
}
}
// Return routes in order of age to avoid rejections during resync
sort.Sort(routeAge(routeList.Items))
return routeList, nil
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
f.setSelectors(&options)
return f.RClient.Route().Routes(f.Namespace).Watch(options)
w, err := f.RClient.Route().Routes(f.Namespace).Watch(options)
if err != nil {
return nil, err
}
if f.RouteModifierFn != nil {
w = watch.Filter(w, func(in watch.Event) (watch.Event, bool) {
if route, ok := in.Object.(*routeapi.Route); ok {
f.RouteModifierFn(route)
}
return in, true
})
}
return w, nil
},
}
rt := &routeapi.Route{}
objType := reflect.TypeOf(rt)
indexers := kcache.Indexers{kcache.NamespaceIndex: kcache.MetaNamespaceIndexFunc}
informer := kcache.NewSharedIndexInformer(lw, rt, f.ResyncInterval, indexers)
f.informers[objType] = informer
return informer
}

func (f *RouterControllerFactory) createNodesSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) createNodesSharedInformer() {
// Use stock node informer as we don't need namespace/labels/fields filtering on nodes
ifactory := informerfactory.NewSharedInformerFactory(f.KClient, f.ResyncInterval)
informer := ifactory.Core().InternalVersion().Nodes().Informer()
objType := reflect.TypeOf(&kapi.Node{})
f.informers[objType] = informer
}

func (f *RouterControllerFactory) createNamespacesSharedInformer(rc *routercontroller.RouterController) {
func (f *RouterControllerFactory) createNamespacesSharedInformer() {
lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = f.NamespaceLabels.String()
Expand Down
9 changes: 7 additions & 2 deletions pkg/router/controller/host_admitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,12 @@ func makeRoute(ns, name, host, path string, wildcard bool, creationTimestamp met
policy = routeapi.WildcardPolicySubdomain
}
return &routeapi.Route{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns, CreationTimestamp: creationTimestamp},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
CreationTimestamp: creationTimestamp,
UID: types.UID(fmt.Sprintf("%d_%s_%s", creationTimestamp.Time.Unix(), ns, name)),
},
Spec: routeapi.RouteSpec{
Host: host,
Path: path,
Expand Down Expand Up @@ -850,7 +855,7 @@ func TestDisableOwnershipChecksFuzzing(t *testing.T) {

admitAll := func(route *routeapi.Route) error { return nil }
recorder := rejectionRecorder{rejections: make(map[string]string)}
uniqueHostPlugin := NewUniqueHost(p, HostForRoute, true, recorder)
uniqueHostPlugin := NewUniqueHost(p, true, recorder)
admitter := NewHostAdmitter(uniqueHostPlugin, RouteAdmissionFunc(admitAll), true, true, recorder)

oldest := metav1.Time{Time: time.Now()}
Expand Down
Loading

0 comments on commit 1a712f2

Please sign in to comment.