React to changes in watch cache initialization
Remove some complexity in RESTOptionsGetter and add default watch cache
sizes for resources that are read by nodes.
smarterclayton committed Sep 18, 2017
1 parent 25e81b6 commit 929dc82
Showing 6 changed files with 62 additions and 68 deletions.
2 changes: 2 additions & 0 deletions contrib/completions/bash/openshift
@@ -32111,6 +32111,8 @@ _openshift_start_kubernetes_apiserver()
local_nonpersistent_flags+=("--contention-profiling")
flags+=("--cors-allowed-origins=")
local_nonpersistent_flags+=("--cors-allowed-origins=")
flags+=("--default-watch-cache-size=")
local_nonpersistent_flags+=("--default-watch-cache-size=")
flags+=("--delete-collection-workers=")
local_nonpersistent_flags+=("--delete-collection-workers=")
flags+=("--deserialization-cache-size=")
2 changes: 2 additions & 0 deletions contrib/completions/zsh/openshift
@@ -32260,6 +32260,8 @@ _openshift_start_kubernetes_apiserver()
local_nonpersistent_flags+=("--contention-profiling")
flags+=("--cors-allowed-origins=")
local_nonpersistent_flags+=("--cors-allowed-origins=")
flags+=("--default-watch-cache-size=")
local_nonpersistent_flags+=("--default-watch-cache-size=")
flags+=("--delete-collection-workers=")
local_nonpersistent_flags+=("--delete-collection-workers=")
flags+=("--deserialization-cache-size=")
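These regenerated completions simply pick up the upstream kube-apiserver flag --default-watch-cache-size, now surfaced through openshift start kubernetes apiserver. A hypothetical invocation combining it with a per-resource override (the values here are illustrative only):

    openshift start kubernetes apiserver --default-watch-cache-size=100 --watch-cache-sizes=pods#1000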
20 changes: 14 additions & 6 deletions pkg/cmd/server/kubernetes/master/master_config.go
@@ -507,6 +507,20 @@ func buildKubeApiserverConfig(
return originLongRunningRequestRE.MatchString(r.URL.Path) || kubeLongRunningFunc(r, requestInfo)
}

if apiserverOptions.Etcd.EnableWatchCache {
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", apiserverOptions.GenericServerRunOptions.TargetRAMMB)
sizes := cachesize.NewHeuristicWatchCacheSizes(apiserverOptions.GenericServerRunOptions.TargetRAMMB)
if userSpecified, err := genericoptions.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes); err == nil {
for resource, size := range userSpecified {
sizes[resource] = size
}
}
apiserverOptions.Etcd.WatchCacheSizes, err = genericoptions.WriteWatchCacheSizes(sizes)
if err != nil {
return nil, err
}
}

if err := apiserverOptions.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return nil, err
}
@@ -566,12 +580,6 @@ func buildKubeApiserverConfig(
EnableCoreControllers: true,
}

if apiserverOptions.Etcd.EnableWatchCache {
// TODO(rebase): upstream also does the following:
// cachesize.InitializeWatchCacheSizes(s.GenericServerRunOptions.TargetRAMMB)
cachesize.SetWatchCacheSizes(apiserverOptions.GenericServerRunOptions.WatchCacheSizes)
}

if kubeApiserverConfig.EnableCoreControllers {
ttl := masterConfig.KubernetesMasterConfig.MasterEndpointReconcileTTL
interval := ttl * 2 / 3
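For illustration only, a minimal, self-contained sketch of the merge the new buildKubeApiserverConfig block performs. It assumes the upstream helpers genericoptions.ParseWatchCacheSizes and genericoptions.WriteWatchCacheSizes used in the hunk above; the pods entry and both sizes are hypothetical:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	genericoptions "k8s.io/apiserver/pkg/server/options"
)

func main() {
	// Heuristic defaults, standing in for cachesize.NewHeuristicWatchCacheSizes.
	sizes := map[schema.GroupResource]int{
		{Resource: "pods"}: 500, // hypothetical heuristic value
	}

	// User-specified "<resource>#<size>" entries (the --watch-cache-sizes flag
	// format) override the heuristic; like the commit, skip them on parse error.
	if userSpecified, err := genericoptions.ParseWatchCacheSizes([]string{"pods#1000"}); err == nil {
		for resource, size := range userSpecified {
			sizes[resource] = size
		}
	}

	// Serialize the merged map back into the flag's string form, as the commit
	// does before applying the Etcd options to the generic config.
	merged, err := genericoptions.WriteWatchCacheSizes(sizes)
	if err != nil {
		panic(err)
	}
	fmt.Println(merged) // [pods#1000]
}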
3 changes: 1 addition & 2 deletions pkg/cmd/server/kubernetes/master/master_test.go
@@ -25,8 +25,7 @@ func TestNewMasterLeasesHasCorrectTTL(t *testing.T) {
}

restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
watchCacheDisabled := 0
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, &watchCacheDisabled, nil, "masterleases", nil, nil, nil, nil)
storageInterface, _ := restOptions.Decorator(kapi.Scheme, restOptions.StorageConfig, nil, "masterleases", nil, nil, nil, nil)
defer server.Terminate(t)

masterLeases := newMasterLeases(storageInterface, 15)
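The dropped third argument mirrors the upstream decorator change this commit reacts to: registry.StorageWithCacher now receives its capacity at construction time (StorageWithCacher(size) returns the decorator), so Decorator calls no longer thread a *int cache size through. The nil that follows StorageConfig here is the objectType parameter, not a size pointer.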
2 changes: 0 additions & 2 deletions pkg/security/registry/securitycontextconstraints/etcd/etcd.go
@@ -6,7 +6,6 @@ import (
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/cachesize"

securityapi "github.com/openshift/origin/pkg/security/apis/security"
"github.com/openshift/origin/pkg/security/registry/securitycontextconstraints"
@@ -30,7 +29,6 @@ func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
},
PredicateFunc: securitycontextconstraints.Matcher,
DefaultQualifiedResource: securityapi.Resource("securitycontextconstraints"),
WatchCacheSize: cachesize.GetWatchCacheSizeByResource("securitycontextconstraints"),

CreateStrategy: securitycontextconstraints.Strategy,
UpdateStrategy: securitycontextconstraints.Strategy,
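With the WatchCacheSize field gone, the securitycontextconstraints store no longer consults origin's cachesize package; its watch cache capacity now comes entirely from the Decorator supplied by the RESTOptionsGetter (see configgetter.go below).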
101 changes: 43 additions & 58 deletions pkg/util/restoptions/configgetter.go
@@ -1,22 +1,16 @@
package restoptions

import (
"fmt"
"strconv"
"strings"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"github.com/golang/glog"

"k8s.io/apimachinery/pkg/runtime/schema"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"

"github.com/golang/glog"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
)
@@ -41,7 +35,6 @@ type configRESTOptionsGetter struct {
}

// NewConfigGetter returns a restoptions.Getter implemented using information from the provided master config.
// By default, the etcd watch cache is enabled with a size of 1000 per resource type.
// TODO: this class should either not need to know about configapi.MasterConfig, or not be in pkg/util
func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig *serverstorage.ResourceConfig, resourcePrefixOverrides map[schema.GroupResource]string, enforcedStorageVersions map[schema.GroupResource]schema.GroupVersion, quorumResources map[schema.GroupResource]struct{}) (Getter, error) {
apiserverOptions, err := kubernetes.BuildKubeAPIserverOptions(masterOptions)
@@ -55,27 +48,24 @@ func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig
storageFactory.DefaultResourcePrefixes = resourcePrefixOverrides
storageFactory.StorageConfig.Prefix = masterOptions.EtcdStorageConfig.OpenShiftStoragePrefix

// TODO: refactor vendor/k8s.io/kubernetes/pkg/registry/cachesize to remove our custom cache size code
errs := []error{}
cacheSizes := map[schema.GroupResource]int{}
for _, c := range apiserverOptions.GenericServerRunOptions.WatchCacheSizes {
tokens := strings.Split(c, "#")
if len(tokens) != 2 {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s', expecting <resource>#<size> format (e.g. builds#100)", c))
continue
// perform watch cache heuristic like upstream
if apiserverOptions.Etcd.EnableWatchCache {
glog.V(2).Infof("Initializing cache sizes based on %dMB limit", apiserverOptions.GenericServerRunOptions.TargetRAMMB)
sizes := newHeuristicWatchCacheSizes(apiserverOptions.GenericServerRunOptions.TargetRAMMB)
if userSpecified, err := options.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes); err == nil {
for resource, size := range userSpecified {
sizes[resource] = size
}
}

resource := schema.ParseGroupResource(tokens[0])

size, err := strconv.Atoi(tokens[1])
apiserverOptions.Etcd.WatchCacheSizes, err = options.WriteWatchCacheSizes(sizes)
if err != nil {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s': %v", c, err))
continue
return nil, err
}
cacheSizes[resource] = size
}
if len(errs) > 0 {
return nil, kerrors.NewAggregate(errs)

cacheSizes, err := options.ParseWatchCacheSizes(apiserverOptions.Etcd.WatchCacheSizes)
if err != nil {
return nil, err
}

return &configRESTOptionsGetter{
@@ -108,41 +98,14 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
config.Quorum = true
}

configuredCacheSize, specified := g.cacheSizes[resource]
if !specified || configuredCacheSize < 0 {
configuredCacheSize = g.defaultCacheSize
}
storageWithCacher := registry.StorageWithCacher(configuredCacheSize)

decorator := func(
copier runtime.ObjectCopier,
storageConfig *storagebackend.Config,
requestedSize *int,
objectType runtime.Object,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newListFn func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFn storage.TriggerPublisherFunc,
) (storage.Interface, factory.DestroyFunc) {
// use the origin default cache size, not the one in registry.StorageWithCacher
capacity := &configuredCacheSize
if requestedSize != nil {
capacity = requestedSize
}

if *capacity == 0 || !g.cacheEnabled {
glog.V(5).Infof("using uncached watch storage for %s (quorum=%t)", resource.String(), storageConfig.Quorum)
return generic.UndecoratedStorage(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
}

glog.V(5).Infof("using watch cache storage (capacity=%v, quorum=%t) for %s %#v", *capacity, storageConfig.Quorum, resource.String(), storageConfig)
return storageWithCacher(copier, storageConfig, capacity, objectType, resourcePrefix, keyFunc, newListFn, getAttrsFunc, triggerFn)
cacheSize, ok := g.cacheSizes[resource]
if !ok {
cacheSize = g.defaultCacheSize
}

resourceOptions := generic.RESTOptions{
StorageConfig: config,
Decorator: decorator,
Decorator: registry.StorageWithCacher(cacheSize),
DeleteCollectionWorkers: g.deleteCollectionWorkers,
EnableGarbageCollection: g.enableGarbageCollection,
ResourcePrefix: g.storageFactory.ResourcePrefix(resource),
@@ -151,3 +114,25 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)

return resourceOptions, nil
}
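Net effect of this hunk: the bespoke decorator closure that chose between cached and uncached storage per resource is gone, replaced by a single registry.StorageWithCacher(cacheSize) decorator, with the per-resource size resolved once from g.cacheSizes and g.defaultCacheSize as the fallback.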

// newHeuristicWatchCacheSizes returns a map of suggested watch cache sizes based on total
// memory. It reuses the upstream heuristic and adds OpenShift specific resources.
func newHeuristicWatchCacheSizes(expectedRAMCapacityMB int) map[schema.GroupResource]int {
// TODO: Revisit this heuristic, copied from upstream
clusterSize := expectedRAMCapacityMB / 60

// default enable watch caches for resources that will have a high number of clients accessing it
// and where the write rate may be significant
watchCacheSizes := make(map[schema.GroupResource]int)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "hostsubnets"}] = maxInt(5*clusterSize, 100)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "netnamespaces"}] = maxInt(5*clusterSize, 100)
watchCacheSizes[schema.GroupResource{Group: "network.openshift.io", Resource: "egressnetworkpolicies"}] = maxInt(10*clusterSize, 100)
return watchCacheSizes
}

func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
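A worked example of the heuristic above, using a hypothetical --target-ram-mb=12000: clusterSize = 12000/60 = 200, so hostsubnets and netnamespaces each get a watch cache of max(5*200, 100) = 1000 entries, and egressnetworkpolicies gets max(10*200, 100) = 2000; these are the node-read resources the commit message calls out.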
