Skip to content

Commit

Permalink
React to changes in watch cache initialization
Browse files Browse the repository at this point in the history
Remove some complexity in RESTOptionsGetter and add default watch cache
sizes for resources that are read by nodes.
  • Loading branch information
smarterclayton committed Sep 17, 2017
1 parent 3e1eebf commit 44530e9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 66 deletions.
20 changes: 14 additions & 6 deletions pkg/cmd/server/kubernetes/master/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,20 @@ func buildKubeApiserverConfig(
return originLongRunningRequestRE.MatchString(r.URL.Path) || kubeLongRunningFunc(r, requestInfo)
}

if apiserverOptions.Etcd.EnableWatchCache {
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", apiserverOptions.GenericServerRunOptions.TargetRAMMB)
sizes := cachesize.NewHeuristicWatchCacheSizes(apiserverOptions.GenericServerRunOptions.TargetRAMMB)
if userSpecified, err := genericoptions.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes); err == nil {
for resource, size := range userSpecified {
sizes[resource] = size
}
}
apiserverOptions.Etcd.WatchCacheSizes, err = genericoptions.WriteWatchCacheSizes(sizes)
if err != nil {
return nil, err
}
}

if err := apiserverOptions.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return nil, err
}
Expand Down Expand Up @@ -566,12 +580,6 @@ func buildKubeApiserverConfig(
EnableCoreControllers: true,
}

if apiserverOptions.Etcd.EnableWatchCache {
// TODO(rebase): upstream also does the following:
// cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
cachesize.SetWatchCacheSizes(apiserverOptions.GenericServerRunOptions.WatchCacheSizes)
}

if kubeApiserverConfig.EnableCoreControllers {
ttl := masterConfig.KubernetesMasterConfig.MasterEndpointReconcileTTL
interval := ttl * 2 / 3
Expand Down
2 changes: 0 additions & 2 deletions pkg/security/registry/securitycontextconstraints/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/cachesize"

securityapi "github.com/openshift/origin/pkg/security/apis/security"
"github.com/openshift/origin/pkg/security/registry/securitycontextconstraints"
Expand All @@ -30,7 +29,6 @@ func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
},
PredicateFunc: securitycontextconstraints.Matcher,
DefaultQualifiedResource: securityapi.Resource("securitycontextconstraints"),
WatchCacheSize: cachesize.GetWatchCacheSizeByResource("securitycontextconstraints"),

CreateStrategy: securitycontextconstraints.Strategy,
UpdateStrategy: securitycontextconstraints.Strategy,
Expand Down
101 changes: 43 additions & 58 deletions pkg/util/restoptions/configgetter.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
package restoptions

import (
"fmt"
"strconv"
"strings"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"github.com/golang/glog"

"k8s.io/apimachinery/pkg/runtime/schema"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"

"github.com/golang/glog"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
)
Expand All @@ -41,7 +35,6 @@ type configRESTOptionsGetter struct {
}

// NewConfigGetter returns a restoptions.Getter implemented using information from the provided master config.
// By default, the etcd watch cache is enabled with a size of 1000 per resource type.
// TODO: this class should either not need to know about configapi.MasterConfig, or not be in pkg/util
func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig *serverstorage.ResourceConfig, resourcePrefixOverrides map[schema.GroupResource]string, enforcedStorageVersions map[schema.GroupResource]schema.GroupVersion, quorumResources map[schema.GroupResource]struct{}) (Getter, error) {
apiserverOptions, err := kubernetes.BuildKubeAPIserverOptions(masterOptions)
Expand All @@ -55,27 +48,24 @@ func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig
storageFactory.DefaultResourcePrefixes = resourcePrefixOverrides
storageFactory.StorageConfig.Prefix = masterOptions.EtcdStorageConfig.OpenShiftStoragePrefix

// TODO: refactor vendor/k8s.io/kubernetes/pkg/registry/cachesize to remove our custom cache size code
errs := []error{}
cacheSizes := map[schema.GroupResource]int{}
for _, c := range apiserverOptions.GenericServerRunOptions.WatchCacheSizes {
tokens := strings.Split(c, "#")
if len(tokens) != 2 {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s', expecting <resource>#<size> format (e.g. builds#100)", c))
continue
// perform watch cache heuristic like upstream
if apiserverOptions.Etcd.EnableWatchCache {
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", apiserverOptions.GenericServerRunOptions.TargetRAMMB)
sizes := newHeuristicWatchCacheSizes(apiserverOptions.GenericServerRunOptions.TargetRAMMB)
if userSpecified, err := options.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes); err == nil {
for resource, size := range userSpecified {
sizes[resource] = size
}
}

resource := schema.ParseGroupResource(tokens[0])

size, err := strconv.Atoi(tokens[1])
apiserverOptions.Etcd.WatchCacheSizes, err = options.WriteWatchCacheSizes(sizes)
if err != nil {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s': %v", c, err))
continue
return nil, err
}
cacheSizes[resource] = size
}
if len(errs) > 0 {
return nil, kerrors.NewAggregate(errs)

cacheSizes, err := options.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes)
if err != nil {
return nil, err
}

return &configRESTOptionsGetter{
Expand Down Expand Up @@ -108,41 +98,14 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
config.Quorum = true
}

configuredCacheSize, specified := g.cacheSizes[resource]
if !specified || configuredCacheSize < 0 {
configuredCacheSize = g.defaultCacheSize
}
storageWithCacher := registry.StorageWithCacher(configuredCacheSize)

decorator := func(
copier runtime.ObjectCopier,
storageConfig *storagebackend.Config,
requestedSize *int,
objectType runtime.Object,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newListFn func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFn storage.TriggerPublisherFunc,
) (storage.Interface, factory.DestroyFunc) {
// use the origin default cache size, not the one in registry.StorageWithCacher
capacity := &configuredCacheSize
if requestedSize != nil {
capacity = requestedSize
}

if *capacity == 0 || !g.cacheEnabled {
glog.V(5).Infof("using uncached watch storage for %s (quorum=%t)", resource.String(), storageConfig.Quorum)
return generic.UndecoratedStorage(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
}

glog.V(5).Infof("using watch cache storage (capacity=%v, quorum=%t) for %s %#v", *capacity, storageConfig.Quorum, resource.String(), storageConfig)
return storageWithCacher(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
cacheSize, ok := g.cacheSizes[resource]
if !ok {
cacheSize = g.defaultCacheSize
}

resourceOptions := generic.RESTOptions{
StorageConfig: config,
Decorator: decorator,
Decorator: registry.StorageWithCacher(cacheSize),
DeleteCollectionWorkers: g.deleteCollectionWorkers,
EnableGarbageCollection: g.enableGarbageCollection,
ResourcePrefix: g.storageFactory.ResourcePrefix(resource),
Expand All @@ -151,3 +114,25 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)

return resourceOptions, nil
}

// newHeuristicWatchCacheSizes returns a map of suggested watch cache sizes based on total
// memory. It reuses the upstream heuristic and adds OpenShift specific resources.
func newHeuristicWatchCacheSizes(expectedRAMCapacityMB int) map[schema.GroupResource]int {
// TODO: Revisit this heuristic, copied from upstream
clusterSize := expectedRAMCapacityMB / 60

// default enable watch caches for resources that will have a high number of clients accessing it
// and where the write rate may be significant
watchCacheSizes := make(map[schema.GroupResource]int)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "hostsubnets"}] = maxInt(5*clusterSize, 100)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "netnamespaces"}] = maxInt(5*clusterSize, 100)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "egressnetworkpolicies"}] = maxInt(10*clusterSize, 100)
return watchCacheSizes
}

func maxInt(a, b int) int {
if a > b {
return a
}
return b
}

0 comments on commit 44530e9

Please sign in to comment.