From fe8eebd02f7cd6024766d403179f8893536a6e67 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 13:39:07 -0400 Subject: [PATCH 1/8] UPSTREAM: 28294: kubectl: don't display empty list when trying to get a single resource that isn't found --- vendor/k8s.io/kubernetes/pkg/kubectl/cmd/get.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vendor/k8s.io/kubernetes/pkg/kubectl/cmd/get.go b/vendor/k8s.io/kubernetes/pkg/kubectl/cmd/get.go index 57a8faaea30a..64689f1a7f62 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubectl/cmd/get.go +++ b/vendor/k8s.io/kubernetes/pkg/kubectl/cmd/get.go @@ -248,6 +248,9 @@ func RunGet(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string singular := false infos, err := r.IntoSingular(&singular).Infos() if err != nil { + if singular { + return err + } allErrs = append(allErrs, err) } From edc443d55da42fcec9a718630d72ffbeee5e780d Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 13:39:50 -0400 Subject: [PATCH 2/8] UPSTREAM: 28886: Add ForgetPod to SchedulerCache --- .../plugin/pkg/scheduler/scheduler.go | 3 +++ .../pkg/scheduler/schedulercache/cache.go | 24 +++++++++++++++++++ .../scheduler/schedulercache/cache_test.go | 9 +++++++ .../pkg/scheduler/schedulercache/interface.go | 17 +++++++------ .../pkg/scheduler/testing/fake_cache.go | 2 ++ .../pkg/scheduler/testing/pods_to_cache.go | 6 ++--- 6 files changed, 51 insertions(+), 10 deletions(-) diff --git a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/scheduler.go b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/scheduler.go index 067cd41a0ba8..09a4887fbf09 100644 --- a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/scheduler.go +++ b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/scheduler.go @@ -140,6 +140,9 @@ func (s *Scheduler) scheduleOne() { err := s.config.Binder.Bind(b) if err != nil { glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name) + if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil { + glog.Errorf("scheduler cache ForgetPod failed: %v", err) + } s.config.Error(pod, err) s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err) s.config.PodConditionUpdater.Update(pod, &api.PodCondition{ diff --git a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache.go b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache.go index a55b0e9feaee..b8e286099875 100644 --- a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache.go +++ b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache.go @@ -126,6 +126,30 @@ func (cache *schedulerCache) assumePod(pod *api.Pod, now time.Time) error { return nil } +func (cache *schedulerCache) ForgetPod(pod *api.Pod) error { + key, err := getPodKey(pod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + _, ok := cache.podStates[key] + switch { + // Only assumed pod can be forgotten. + case ok && cache.assumedPods[key]: + err := cache.removePod(pod) + if err != nil { + return err + } + delete(cache.podStates, key) + default: + return fmt.Errorf("pod state wasn't assumed but get forgotten. 
Pod key: %v", key) + } + return nil +} + func (cache *schedulerCache) AddPod(pod *api.Pod) error { key, err := getPodKey(pod) if err != nil { diff --git a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache_test.go b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache_test.go index 950ff38bcb5e..e42b93c7605f 100644 --- a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -95,6 +95,15 @@ func TestAssumePodScheduled(t *testing.T) { if !reflect.DeepEqual(n, tt.wNodeInfo) { t.Errorf("#%d: node info get=%s, want=%s", i, n, tt.wNodeInfo) } + + for _, pod := range tt.pods { + if err := cache.ForgetPod(pod); err != nil { + t.Fatalf("ForgetPod failed: %v", err) + } + } + if cache.nodes[nodeName] != nil { + t.Errorf("NodeInfo should be cleaned for %s", nodeName) + } } } diff --git a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/interface.go b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/interface.go index 07330a2a8971..09e89972f757 100644 --- a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/interface.go +++ b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache/interface.go @@ -36,13 +36,13 @@ import ( // | | | | Update // + Assume Add v v | //Initial +--------> Assumed +------------+---> Added <--+ -// + | + -// | | | -// | Add | | Remove -// | | | -// | + | -// +-------------> Expired +----> Deleted -// Expire +// ^ + + | + +// | | | | | +// | | | Add | | Remove +// | | | | | +// | | | + | +// +----------------+ +-----------> Expired +----> Deleted +// Forget Expire // // // Note that an assumed pod can expire, because if we haven't received Add event notifying us @@ -61,6 +61,9 @@ type Cache interface { // After expiration, its information would be subtracted. AssumePod(pod *api.Pod) error + // ForgetPod removes an assumed pod from cache. + ForgetPod(pod *api.Pod) error + // AddPod either confirms a pod if it's assumed, or adds it back if it's expired. // If added back, the pod's information would be added again. 
AddPod(pod *api.Pod) error diff --git a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/fake_cache.go b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/fake_cache.go index 02c76d3d6506..45bfeae646f1 100644 --- a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/fake_cache.go +++ b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/fake_cache.go @@ -32,6 +32,8 @@ func (f *FakeCache) AssumePod(pod *api.Pod) error { return nil } +func (f *FakeCache) ForgetPod(pod *api.Pod) error { return nil } + func (f *FakeCache) AddPod(pod *api.Pod) error { return nil } func (f *FakeCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil } diff --git a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/pods_to_cache.go b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/pods_to_cache.go index 99fe15ee7030..da8735ec0fdc 100644 --- a/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/pods_to_cache.go +++ b/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/testing/pods_to_cache.go @@ -25,9 +25,9 @@ import ( // PodsToCache is used for testing type PodsToCache []*api.Pod -func (p PodsToCache) AssumePod(pod *api.Pod) error { - return nil -} +func (p PodsToCache) AssumePod(pod *api.Pod) error { return nil } + +func (p PodsToCache) ForgetPod(pod *api.Pod) error { return nil } func (p PodsToCache) AddPod(pod *api.Pod) error { return nil } From ca86d3b8f52fb8ec10db08d90781f329a41e04b8 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 13:31:14 -0400 Subject: [PATCH 3/8] Update RunNodeController to match upstream change --- pkg/cmd/server/kubernetes/master.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/server/kubernetes/master.go b/pkg/cmd/server/kubernetes/master.go index c1ae32f33495..d32e140eb41d 100644 --- a/pkg/cmd/server/kubernetes/master.go +++ b/pkg/cmd/server/kubernetes/master.go @@ -337,7 +337,7 @@ func (c *MasterConfig) RunNodeController() { _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR) _, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR) - controller := nodecontroller.NewNodeController( + controller, err := nodecontroller.NewNodeController( c.CloudProvider, clientadapter.FromUnversionedClient(c.KubeClient), s.PodEvictionTimeout.Duration, @@ -356,6 +356,9 @@ func (c *MasterConfig) RunNodeController() { s.AllocateNodeCIDRs, ) + if err != nil { + glog.Fatalf("Unable to start node controller: %v", err) + } controller.Run(s.NodeSyncPeriod.Duration) } From 845888da02cb5cf7080ffacec9ccbf0dee9d69bd Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 10:55:27 -0400 Subject: [PATCH 4/8] UPSTREAM: 29063: Automated cherry pick of #28604 #29062 Move CIDR allocation logic away from nodecontroller.go List all nodes and occupy cidr map before starting allocations --- .../kubernetes/cmd/integration/integration.go | 5 +- .../app/controllermanager.go | 5 +- .../pkg/controller/node/cidr_allocator.go | 252 +++++--- .../controller/node/cidr_allocator_test.go | 610 ++++++++++-------- .../pkg/controller/node/cidr_set.go | 142 ++++ .../pkg/controller/node/cidr_set_test.go | 335 ++++++++++ .../pkg/controller/node/nodecontroller.go | 175 ++--- .../controller/node/nodecontroller_test.go | 171 +---- .../pkg/controller/node/test_utils.go | 237 +++++++ 9 files changed, 1257 insertions(+), 675 deletions(-) create mode 100644 vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go create mode 100644 vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set_test.go create mode 100644 vendor/k8s.io/kubernetes/pkg/controller/node/test_utils.go 
diff --git a/vendor/k8s.io/kubernetes/cmd/integration/integration.go b/vendor/k8s.io/kubernetes/cmd/integration/integration.go index 85277c41d6df..703aaaa4201d 100644 --- a/vendor/k8s.io/kubernetes/cmd/integration/integration.go +++ b/vendor/k8s.io/kubernetes/cmd/integration/integration.go @@ -199,8 +199,11 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string go podInformer.Run(wait.NeverStop) - nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), + nodeController, err := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, nil, 0, false) + if err != nil { + glog.Fatalf("Failed to initialize nodecontroller: %v", err) + } nodeController.Run(5 * time.Second) cadvisorInterface := new(cadvisortest.Fake) diff --git a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go index 9947e453437b..7ce336537526 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go +++ b/vendor/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go @@ -238,10 +238,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if err != nil { glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) } - nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), + nodeController, err := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)), flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)), s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) + if err != nil { + glog.Fatalf("Failed to initialize nodecontroller: %v", err) + } nodeController.Run(s.NodeSyncPeriod.Duration) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go index f05792a1c43e..95170aeb7caa 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go @@ -17,144 +17,202 @@ limitations under the License. package node import ( - "encoding/binary" "errors" "fmt" - "math/big" "net" - "sync" + + "k8s.io/kubernetes/pkg/api" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/util/wait" + + "github.com/golang/glog" +) + +// TODO: figure out the good setting for those constants. +const ( + // controls how many NodeSpec updates NC can process concurrently. + cidrUpdateWorkers = 10 + cidrUpdateQueueSize = 5000 + // podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update. 
+ podCIDRUpdateRetry = 5 ) var errCIDRRangeNoCIDRsRemaining = errors.New("CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range") +type nodeAndCIDR struct { + cidr *net.IPNet + nodeName string +} + // CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDR for nodes. type CIDRAllocator interface { - AllocateNext() (*net.IPNet, error) - Occupy(*net.IPNet) error - Release(*net.IPNet) error + AllocateOrOccupyCIDR(node *api.Node) error + ReleaseCIDR(node *api.Node) error } type rangeAllocator struct { - clusterCIDR *net.IPNet - clusterIP net.IP - clusterMaskSize int - subNetMaskSize int - maxCIDRs int - used big.Int - lock sync.Mutex - nextCandidate int + client clientset.Interface + cidrs *cidrSet + clusterCIDR *net.IPNet + maxCIDRs int + // Channel that is used to pass updating Nodes with assigned CIDRs to the background + // This increases a throughput of CIDR assignment by not blocking on long operations. + nodeCIDRUpdateChannel chan nodeAndCIDR + recorder record.EventRecorder } // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size. -func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator { - clusterMask := clusterCIDR.Mask - clusterMaskSize, _ := clusterMask.Size() +// Caller must always pass in a list of existing nodes so the new allocator +// can initialize its CIDR map. NodeList is only nil in testing. +func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *api.NodeList) (CIDRAllocator, error) { + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cidrAllocator"}) + eventBroadcaster.StartLogging(glog.Infof) ra := &rangeAllocator{ - clusterCIDR: clusterCIDR, - clusterIP: clusterCIDR.IP.To4(), - clusterMaskSize: clusterMaskSize, - subNetMaskSize: subNetMaskSize, - maxCIDRs: 1 << uint32(subNetMaskSize-clusterMaskSize), - nextCandidate: 0, - } - return ra -} + client: client, + cidrs: newCIDRSet(clusterCIDR, subNetMaskSize), + clusterCIDR: clusterCIDR, + nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), + recorder: recorder, + } -func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) { - r.lock.Lock() - defer r.lock.Unlock() + if serviceCIDR != nil { + ra.filterOutServiceRange(serviceCIDR) + } else { + glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.") + } - nextUnused := -1 - for i := 0; i < r.maxCIDRs; i++ { - candidate := (i + r.nextCandidate) % r.maxCIDRs - if r.used.Bit(candidate) == 0 { - nextUnused = candidate - break + if nodeList != nil { + for _, node := range nodeList.Items { + if node.Spec.PodCIDR == "" { + glog.Infof("Node %v has no CIDR, ignoring", node.Name) + continue + } else { + glog.Infof("Node %v has CIDR %s, occupying it in CIDR map", node.Name, node.Spec.PodCIDR) + } + if err := ra.occupyCIDR(&node); err != nil { + // This will happen if: + // 1. We find garbage in the podCIDR field. Retrying is useless. + // 2. CIDR out of range: This means a node CIDR has changed. + // This error will keep crashing controller-manager. 
+ return nil, err + } } } - if nextUnused == -1 { - return nil, errCIDRRangeNoCIDRsRemaining + for i := 0; i < cidrUpdateWorkers; i++ { + go func(stopChan <-chan struct{}) { + for { + select { + case workItem, ok := <-ra.nodeCIDRUpdateChannel: + if !ok { + glog.Warning("NodeCIDRUpdateChannel read returned false.") + return + } + ra.updateCIDRAllocation(workItem) + case <-stopChan: + return + } + } + }(wait.NeverStop) } - r.nextCandidate = (nextUnused + 1) % r.maxCIDRs - r.used.SetBit(&r.used, nextUnused, 1) - - j := uint32(nextUnused) << uint32(32-r.subNetMaskSize) - ipInt := (binary.BigEndian.Uint32(r.clusterIP)) | j - ip := make([]byte, 4) - binary.BigEndian.PutUint32(ip, ipInt) - - return &net.IPNet{ - IP: ip, - Mask: net.CIDRMask(r.subNetMaskSize, 32), - }, nil + return ra, nil } -func (r *rangeAllocator) Release(cidr *net.IPNet) error { - used, err := r.getIndexForCIDR(cidr) +func (r *rangeAllocator) occupyCIDR(node *api.Node) error { + if node.Spec.PodCIDR == "" { + return nil + } + _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR) if err != nil { - return err + return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) } + if err := r.cidrs.occupy(podCIDR); err != nil { + return fmt.Errorf("failed to mark cidr as occupied: %v", err) + } + return nil +} - r.lock.Lock() - defer r.lock.Unlock() - r.used.SetBit(&r.used, used, 0) +// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR +// if it doesn't currently have one or mark the CIDR as used if the node already have one. +func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error { + if node.Spec.PodCIDR != "" { + return r.occupyCIDR(node) + } + podCIDR, err := r.cidrs.allocateNext() + if err != nil { + recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + return fmt.Errorf("failed to allocate cidr: %v", err) + } + glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR) + r.nodeCIDRUpdateChannel <- nodeAndCIDR{ + nodeName: node.Name, + cidr: podCIDR, + } return nil } -func (r *rangeAllocator) MaxCIDRs() int { - return r.maxCIDRs +// ReleaseCIDR releases the CIDR of the removed node +func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error { + if node.Spec.PodCIDR == "" { + return nil + } + _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR) + if err != nil { + return fmt.Errorf("Failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) + } + + glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR) + if err = r.cidrs.release(podCIDR); err != nil { + return fmt.Errorf("Failed to release cidr: %v", err) + } + return err } -func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) { - begin, end := 0, r.maxCIDRs - cidrMask := cidr.Mask - maskSize, _ := cidrMask.Size() +// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, so that they won't be +// assignable. +func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { + // Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either + // clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR) + // or vice versa (which means that serviceCIDR contains clusterCIDR). 
+ if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) { + return + } - if !r.clusterCIDR.Contains(cidr.IP.Mask(r.clusterCIDR.Mask)) && !cidr.Contains(r.clusterCIDR.IP.Mask(cidr.Mask)) { - return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, r.clusterCIDR) + if err := r.cidrs.occupy(serviceCIDR); err != nil { + glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err) } +} - if r.clusterMaskSize < maskSize { - subNetMask := net.CIDRMask(r.subNetMaskSize, 32) - begin, err = r.getIndexForCIDR(&net.IPNet{ - IP: cidr.IP.To4().Mask(subNetMask), - Mask: subNetMask, - }) +// Assigns CIDR to Node and sends an update to the API server. +func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { + var err error + var node *api.Node + for rep := 0; rep < podCIDRUpdateRetry; rep++ { + // TODO: change it to using PATCH instead of full Node updates. + node, err = r.client.Core().Nodes().Get(data.nodeName) + glog.Infof("Got Node: %v", node) if err != nil { - return err + glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err) + continue } - - ip := make([]byte, 4) - ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask)) - binary.BigEndian.PutUint32(ip, ipInt) - end, err = r.getIndexForCIDR(&net.IPNet{ - IP: net.IP(ip).To4().Mask(subNetMask), - Mask: subNetMask, - }) - if err != nil { - return err + node.Spec.PodCIDR = data.cidr.String() + if _, err := r.client.Core().Nodes().Update(node); err != nil { + glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err) + } else { + break } } - - r.lock.Lock() - defer r.lock.Unlock() - for i := begin; i <= end; i++ { - r.used.SetBit(&r.used, i, 1) - } - - return nil -} - -func (r *rangeAllocator) getIndexForCIDR(cidr *net.IPNet) (int, error) { - cidrIndex := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize) - - if cidrIndex >= uint32(r.maxCIDRs) { - return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr) + if err != nil { + recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err) + if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil { + glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr) + } } - - return int(cidrIndex), nil + return err } diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator_test.go b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator_test.go index 37cfdf67bab8..8d43713f9855 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator_test.go @@ -17,334 +17,380 @@ limitations under the License. 
package node import ( - "github.com/golang/glog" - "math/big" "net" - "reflect" "testing" -) - -func TestRangeAllocatorFullyAllocated(t *testing.T) { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/30") - a := NewCIDRRangeAllocator(clusterCIDR, 30) - p, err := a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if p.String() != "127.123.234.0/30" { - t.Fatalf("unexpected allocated cidr: %s", p.String()) - } - - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - - a.Release(p) - p, err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if p.String() != "127.123.234.0/30" { - t.Fatalf("unexpected allocated cidr: %s", p.String()) - } - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } -} + "time" -func TestRangeAllocator_RandomishAllocation(t *testing.T) { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") - a := NewCIDRRangeAllocator(clusterCIDR, 24) + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/util/wait" +) - // allocate all the CIDRs - var err error - cidrs := make([]*net.IPNet, 256) +const ( + nodePollInterval = 100 * time.Millisecond +) - for i := 0; i < 256; i++ { - cidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) +func waitForUpdatedNodeWithTimeout(nodeHandler *FakeNodeHandler, number int, timeout time.Duration) error { + return wait.Poll(nodePollInterval, timeout, func() (bool, error) { + if len(nodeHandler.getUpdatedNodesCopy()) >= number { + return true, nil } - } + return false, nil + }) +} - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - // release them all - for i := 0; i < 256; i++ { - a.Release(cidrs[i]) +func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { + testCases := []struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDR string + allocatedCIDRs []string + }{ + { + description: "When there's no ServiceCIDR return first CIDR in range", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + expectedAllocatedCIDR: "127.123.234.0/30", + }, + { + description: "Correctly filter out ServiceCIDR", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/24") + return clusterCIDR + }(), + serviceCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return clusterCIDR + }(), + subNetMaskSize: 30, + // it should return first /30 CIDR after service range + expectedAllocatedCIDR: "127.123.234.64/30", + }, + { + description: "Correctly ignore already allocated CIDRs", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := 
net.ParseCIDR("127.123.234.0/24") + return clusterCIDR + }(), + serviceCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/26") + return clusterCIDR + }(), + subNetMaskSize: 30, + allocatedCIDRs: []string{"127.123.234.64/30", "127.123.234.68/30", "127.123.234.72/30", "127.123.234.80/30"}, + expectedAllocatedCIDR: "127.123.234.76/30", + }, } - // allocate the CIDRs again - rcidrs := make([]*net.IPNet, 256) - for i := 0; i < 256; i++ { - rcidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %d, %v", i, err) + testFunc := func(tc struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDR string + allocatedCIDRs []string + }) { + allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil) + // this is a bit of white box testing + for _, allocated := range tc.allocatedCIDRs { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + rangeAllocator, ok := allocator.(*rangeAllocator) + if !ok { + t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) + return + } + if err = rangeAllocator.cidrs.occupy(cidr); err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } } - } - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - - if !reflect.DeepEqual(cidrs, rcidrs) { - t.Fatalf("expected re-allocated cidrs are the same collection") - } -} - -func TestRangeAllocator_AllocationOccupied(t *testing.T) { - _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") - a := NewCIDRRangeAllocator(clusterCIDR, 24) - - // allocate all the CIDRs - var err error - cidrs := make([]*net.IPNet, 256) - - for i := 0; i < 256; i++ { - cidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %v", err) + if err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { + t.Errorf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) } - } - - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") - } - // release them all - for i := 0; i < 256; i++ { - a.Release(cidrs[i]) - } - // occupy the last 128 CIDRs - for i := 128; i < 256; i++ { - a.Occupy(cidrs[i]) - } - - // allocate the first 128 CIDRs again - rcidrs := make([]*net.IPNet, 128) - for i := 0; i < 128; i++ { - rcidrs[i], err = a.AllocateNext() - if err != nil { - t.Fatalf("unexpected error: %d, %v", i, err) + if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { + t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) + } + found := false + seenCIDRs := []string{} + for _, updatedNode := range tc.fakeNodeHandler.getUpdatedNodesCopy() { + seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) + if updatedNode.Spec.PodCIDR == tc.expectedAllocatedCIDR { + found = true + break + } + } + if !found { + t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", + tc.description, tc.expectedAllocatedCIDR, seenCIDRs) } - } - _, err = a.AllocateNext() - if err == nil { - t.Fatalf("expected error because of fully-allocated range") } - // check Occupy() work properly - for i := 128; i < 256; i++ { - 
rcidrs = append(rcidrs, cidrs[i]) - } - if !reflect.DeepEqual(cidrs, rcidrs) { - t.Fatalf("expected re-allocated cidrs are the same collection") + for _, tc := range testCases { + testFunc(tc) } } -func TestGetBitforCIDR(t *testing.T) { - cases := []struct { - clusterCIDRStr string - subNetMaskSize int - subNetCIDRStr string - expectedBit int - expectErr bool +func TestAllocateOrOccupyCIDRFailure(t *testing.T) { + testCases := []struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + allocatedCIDRs []string }{ { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/16", - expectedBit: 0, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.123.0.0/16", - expectedBit: 123, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.168.0.0/16", - expectedBit: 168, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.224.0.0/16", - expectedBit: 224, - expectErr: false, - }, - { - clusterCIDRStr: "192.168.0.0/16", - subNetMaskSize: 24, - subNetCIDRStr: "192.168.12.0/24", - expectedBit: 12, - expectErr: false, - }, - { - clusterCIDRStr: "192.168.0.0/16", - subNetMaskSize: 24, - subNetCIDRStr: "192.168.151.0/24", - expectedBit: 151, - expectErr: false, - }, - { - clusterCIDRStr: "192.168.0.0/16", - subNetMaskSize: 24, - subNetCIDRStr: "127.168.224.0/24", - expectErr: true, + description: "When there's no ServiceCIDR return first CIDR in range", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + allocatedCIDRs: []string{"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, }, } - for _, tc := range cases { - _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) - clusterMask := clusterCIDR.Mask - clusterMaskSize, _ := clusterMask.Size() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - ra := &rangeAllocator{ - clusterIP: clusterCIDR.IP.To4(), - clusterMaskSize: clusterMaskSize, - subNetMaskSize: tc.subNetMaskSize, - maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize), + testFunc := func(tc struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + allocatedCIDRs []string + }) { + allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil) + // this is a bit of white box testing + for _, allocated := range tc.allocatedCIDRs { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + rangeAllocator, ok := allocator.(*rangeAllocator) + if !ok { + t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) + return + } + err = rangeAllocator.cidrs.occupy(cidr) + if err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } } - - _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if err := 
allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err == nil { + t.Errorf("%v: unexpected success in AllocateOrOccupyCIDR: %v", tc.description, err) } - - got, err := ra.getIndexForCIDR(subnetCIDR) - if err == nil && tc.expectErr { - glog.Errorf("expected error but got null") - continue + // We don't expect any updates, so just sleep for some time + time.Sleep(time.Second) + if len(tc.fakeNodeHandler.getUpdatedNodesCopy()) != 0 { + t.Fatalf("%v: unexpected update of nodes: %v", tc.description, tc.fakeNodeHandler.getUpdatedNodesCopy()) } - - if err != nil && !tc.expectErr { - glog.Errorf("unexpected error: %v", err) - continue + seenCIDRs := []string{} + for _, updatedNode := range tc.fakeNodeHandler.getUpdatedNodesCopy() { + if updatedNode.Spec.PodCIDR != "" { + seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) + } } - - if got != tc.expectedBit { - glog.Errorf("expected %v, but got %v", tc.expectedBit, got) + if len(seenCIDRs) != 0 { + t.Errorf("%v: Seen assigned CIDRs when not expected: %v", + tc.description, seenCIDRs) } } + for _, tc := range testCases { + testFunc(tc) + } } -func TestOccupy(t *testing.T) { - cases := []struct { - clusterCIDRStr string - subNetMaskSize int - subNetCIDRStr string - expectedUsedBegin int - expectedUsedEnd int - expectErr bool +func TestReleaseCIDRSuccess(t *testing.T) { + testCases := []struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDRFirstRound string + expectedAllocatedCIDRSecondRound string + allocatedCIDRs []string + cidrsToRelease []string }{ { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/8", - expectedUsedBegin: 0, - expectedUsedEnd: 256, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/2", - expectedUsedBegin: 0, - expectedUsedEnd: 256, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/16", - expectedUsedBegin: 0, - expectedUsedEnd: 0, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/8", - subNetMaskSize: 32, - subNetCIDRStr: "127.0.0.0/16", - expectedUsedBegin: 0, - expectedUsedEnd: 65535, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/7", - subNetMaskSize: 16, - subNetCIDRStr: "127.0.0.0/15", - expectedUsedBegin: 256, - expectedUsedEnd: 257, - expectErr: false, + description: "Correctly release preallocated CIDR", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + allocatedCIDRs: []string{"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30", "127.123.234.12/30"}, + expectedAllocatedCIDRFirstRound: "", + cidrsToRelease: []string{"127.123.234.4/30"}, + expectedAllocatedCIDRSecondRound: "127.123.234.4/30", }, { - clusterCIDRStr: "127.0.0.0/7", - subNetMaskSize: 15, - subNetCIDRStr: "127.0.0.0/15", - expectedUsedBegin: 128, - expectedUsedEnd: 128, - expectErr: false, - }, - { - clusterCIDRStr: "127.0.0.0/7", - subNetMaskSize: 18, - subNetCIDRStr: "127.0.0.0/15", - expectedUsedBegin: 1024, - expectedUsedEnd: 1031, - expectErr: false, + description: "Correctly recycle CIDR", + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: 
api.ObjectMeta{ + Name: "node0", + }, + }, + }, + Clientset: fake.NewSimpleClientset(), + }, + clusterCIDR: func() *net.IPNet { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28") + return clusterCIDR + }(), + serviceCIDR: nil, + subNetMaskSize: 30, + expectedAllocatedCIDRFirstRound: "127.123.234.0/30", + cidrsToRelease: []string{"127.123.234.0/30"}, + expectedAllocatedCIDRSecondRound: "127.123.234.0/30", }, } - for _, tc := range cases { - _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) - if err != nil { - t.Fatalf("unexpected error: %v", err) + testFunc := func(tc struct { + description string + fakeNodeHandler *FakeNodeHandler + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + subNetMaskSize int + expectedAllocatedCIDRFirstRound string + expectedAllocatedCIDRSecondRound string + allocatedCIDRs []string + cidrsToRelease []string + }) { + allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil) + // this is a bit of white box testing + for _, allocated := range tc.allocatedCIDRs { + _, cidr, err := net.ParseCIDR(allocated) + if err != nil { + t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err) + } + rangeAllocator, ok := allocator.(*rangeAllocator) + if !ok { + t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description) + return + } + err = rangeAllocator.cidrs.occupy(cidr) + if err != nil { + t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err) + } } - clusterMask := clusterCIDR.Mask - clusterMaskSize, _ := clusterMask.Size() - - ra := &rangeAllocator{ - clusterCIDR: clusterCIDR, - clusterIP: clusterCIDR.IP.To4(), - clusterMaskSize: clusterMaskSize, - subNetMaskSize: tc.subNetMaskSize, - maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize), + err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]) + if tc.expectedAllocatedCIDRFirstRound != "" { + if err != nil { + t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) + } + if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { + t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) + } + } else { + if err == nil { + t.Fatalf("%v: unexpected success in AllocateOrOccupyCIDR: %v", tc.description, err) + } + // We don't expect any updates here + time.Sleep(time.Second) + if len(tc.fakeNodeHandler.getUpdatedNodesCopy()) != 0 { + t.Fatalf("%v: unexpected update of nodes: %v", tc.description, tc.fakeNodeHandler.getUpdatedNodesCopy()) + } } - _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) - if err != nil { - t.Fatalf("unexpected error: %v", err) + for _, cidrToRelease := range tc.cidrsToRelease { + nodeToRelease := api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "node0", + }, + } + nodeToRelease.Spec.PodCIDR = cidrToRelease + err = allocator.ReleaseCIDR(&nodeToRelease) + if err != nil { + t.Fatalf("%v: unexpected error in ReleaseCIDR: %v", tc.description, err) + } } - err = ra.Occupy(subnetCIDR) - if err == nil && tc.expectErr { - t.Errorf("expected error but got none") - continue + if err = allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil { + t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) } - if err != nil && !tc.expectErr { - t.Errorf("unexpected error: %v", err) - continue + if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err 
!= nil { + t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err) } - expectedUsed := big.Int{} - for i := tc.expectedUsedBegin; i <= tc.expectedUsedEnd; i++ { - expectedUsed.SetBit(&expectedUsed, i, 1) + found := false + seenCIDRs := []string{} + for _, updatedNode := range tc.fakeNodeHandler.getUpdatedNodesCopy() { + seenCIDRs = append(seenCIDRs, updatedNode.Spec.PodCIDR) + if updatedNode.Spec.PodCIDR == tc.expectedAllocatedCIDRSecondRound { + found = true + break + } } - if expectedUsed.Cmp(&ra.used) != 0 { - t.Errorf("error") + if !found { + t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", + tc.description, tc.expectedAllocatedCIDRSecondRound, seenCIDRs) } } + for _, tc := range testCases { + testFunc(tc) + } } diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go new file mode 100644 index 000000000000..5f013d92f02c --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go @@ -0,0 +1,142 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "encoding/binary" + "fmt" + "math/big" + "net" + "sync" +) + +type cidrSet struct { + sync.Mutex + clusterCIDR *net.IPNet + clusterIP net.IP + clusterMaskSize int + maxCIDRs int + nextCandidate int + used big.Int + subNetMaskSize int +} + +func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet { + clusterMask := clusterCIDR.Mask + clusterMaskSize, _ := clusterMask.Size() + maxCIDRs := 1 << uint32(subNetMaskSize-clusterMaskSize) + return &cidrSet{ + clusterCIDR: clusterCIDR, + clusterIP: clusterCIDR.IP.To4(), + clusterMaskSize: clusterMaskSize, + maxCIDRs: maxCIDRs, + subNetMaskSize: subNetMaskSize, + } +} + +func (s *cidrSet) allocateNext() (*net.IPNet, error) { + s.Lock() + defer s.Unlock() + + nextUnused := -1 + for i := 0; i < s.maxCIDRs; i++ { + candidate := (i + s.nextCandidate) % s.maxCIDRs + if s.used.Bit(candidate) == 0 { + nextUnused = candidate + break + } + } + if nextUnused == -1 { + return nil, errCIDRRangeNoCIDRsRemaining + } + s.nextCandidate = (nextUnused + 1) % s.maxCIDRs + + s.used.SetBit(&s.used, nextUnused, 1) + + j := uint32(nextUnused) << uint32(32-s.subNetMaskSize) + ipInt := (binary.BigEndian.Uint32(s.clusterIP)) | j + ip := make([]byte, 4) + binary.BigEndian.PutUint32(ip, ipInt) + + return &net.IPNet{ + IP: ip, + Mask: net.CIDRMask(s.subNetMaskSize, 32), + }, nil +} + +func (s *cidrSet) release(cidr *net.IPNet) error { + used, err := s.getIndexForCIDR(cidr) + if err != nil { + return err + } + + s.Lock() + defer s.Unlock() + s.used.SetBit(&s.used, used, 0) + + return nil +} + +func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { + begin, end := 0, s.maxCIDRs + cidrMask := cidr.Mask + maskSize, _ := cidrMask.Size() + + if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) { + return fmt.Errorf("cidr %v is out the range of cluster 
cidr %v", cidr, s.clusterCIDR) + } + + if s.clusterMaskSize < maskSize { + subNetMask := net.CIDRMask(s.subNetMaskSize, 32) + begin, err = s.getIndexForCIDR(&net.IPNet{ + IP: cidr.IP.To4().Mask(subNetMask), + Mask: subNetMask, + }) + if err != nil { + return err + } + + ip := make([]byte, 4) + ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask)) + binary.BigEndian.PutUint32(ip, ipInt) + end, err = s.getIndexForCIDR(&net.IPNet{ + IP: net.IP(ip).To4().Mask(subNetMask), + Mask: subNetMask, + }) + if err != nil { + return err + } + } + + s.Lock() + defer s.Unlock() + for i := begin; i <= end; i++ { + s.used.SetBit(&s.used, i, 1) + } + + return nil +} + +func (s *cidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) { + cidrIndex := (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-s.subNetMaskSize) + + if cidrIndex >= uint32(s.maxCIDRs) { + return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr) + } + + return int(cidrIndex), nil +} diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set_test.go b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set_test.go new file mode 100644 index 000000000000..5738da0a96c8 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set_test.go @@ -0,0 +1,335 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package node + +import ( + "github.com/golang/glog" + "math/big" + "net" + "reflect" + "testing" +) + +func TestCIDRSetFullyAllocated(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/30") + a := newCIDRSet(clusterCIDR, 30) + + p, err := a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p.String() != "127.123.234.0/30" { + t.Fatalf("unexpected allocated cidr: %s", p.String()) + } + + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + a.release(p) + p, err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p.String() != "127.123.234.0/30" { + t.Fatalf("unexpected allocated cidr: %s", p.String()) + } + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } +} + +func TestCIDRSet_RandomishAllocation(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") + a := newCIDRSet(clusterCIDR, 24) + // allocate all the CIDRs + var err error + cidrs := make([]*net.IPNet, 256) + + for i := 0; i < 256; i++ { + cidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + // release them all + for i := 0; i < 256; i++ { + a.release(cidrs[i]) + } + + // allocate the CIDRs again + rcidrs := make([]*net.IPNet, 256) + for i := 0; i < 256; i++ { + rcidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %d, %v", i, err) + } + } + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + if !reflect.DeepEqual(cidrs, rcidrs) { + t.Fatalf("expected re-allocated cidrs are the same collection") + } +} + +func TestCIDRSet_AllocationOccupied(t *testing.T) { + _, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16") + a := newCIDRSet(clusterCIDR, 24) + + // allocate all the CIDRs + var err error + cidrs := make([]*net.IPNet, 256) + + for i := 0; i < 256; i++ { + cidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + } + + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + // release them all + for i := 0; i < 256; i++ { + a.release(cidrs[i]) + } + // occupy the last 128 CIDRs + for i := 128; i < 256; i++ { + a.occupy(cidrs[i]) + } + + // allocate the first 128 CIDRs again + rcidrs := make([]*net.IPNet, 128) + for i := 0; i < 128; i++ { + rcidrs[i], err = a.allocateNext() + if err != nil { + t.Fatalf("unexpected error: %d, %v", i, err) + } + } + _, err = a.allocateNext() + if err == nil { + t.Fatalf("expected error because of fully-allocated range") + } + + // check Occupy() work properly + for i := 128; i < 256; i++ { + rcidrs = append(rcidrs, cidrs[i]) + } + if !reflect.DeepEqual(cidrs, rcidrs) { + t.Fatalf("expected re-allocated cidrs are the same collection") + } +} + +func TestGetBitforCIDR(t *testing.T) { + cases := []struct { + clusterCIDRStr string + subNetMaskSize int + subNetCIDRStr string + expectedBit int + expectErr bool + }{ + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/16", + expectedBit: 0, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.123.0.0/16", + expectedBit: 123, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, 
+ subNetCIDRStr: "127.168.0.0/16", + expectedBit: 168, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.224.0.0/16", + expectedBit: 224, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "192.168.12.0/24", + expectedBit: 12, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "192.168.151.0/24", + expectedBit: 151, + expectErr: false, + }, + { + clusterCIDRStr: "192.168.0.0/16", + subNetMaskSize: 24, + subNetCIDRStr: "127.168.224.0/24", + expectErr: true, + }, + } + + for _, tc := range cases { + _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + + _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + got, err := cs.getIndexForCIDR(subnetCIDR) + if err == nil && tc.expectErr { + glog.Errorf("expected error but got null") + continue + } + + if err != nil && !tc.expectErr { + glog.Errorf("unexpected error: %v", err) + continue + } + + if got != tc.expectedBit { + glog.Errorf("expected %v, but got %v", tc.expectedBit, got) + } + } +} + +func TestOccupy(t *testing.T) { + cases := []struct { + clusterCIDRStr string + subNetMaskSize int + subNetCIDRStr string + expectedUsedBegin int + expectedUsedEnd int + expectErr bool + }{ + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/8", + expectedUsedBegin: 0, + expectedUsedEnd: 256, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/2", + expectedUsedBegin: 0, + expectedUsedEnd: 256, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/16", + expectedUsedBegin: 0, + expectedUsedEnd: 0, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/8", + subNetMaskSize: 32, + subNetCIDRStr: "127.0.0.0/16", + expectedUsedBegin: 0, + expectedUsedEnd: 65535, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/7", + subNetMaskSize: 16, + subNetCIDRStr: "127.0.0.0/15", + expectedUsedBegin: 256, + expectedUsedEnd: 257, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/7", + subNetMaskSize: 15, + subNetCIDRStr: "127.0.0.0/15", + expectedUsedBegin: 128, + expectedUsedEnd: 128, + expectErr: false, + }, + { + clusterCIDRStr: "127.0.0.0/7", + subNetMaskSize: 18, + subNetCIDRStr: "127.0.0.0/15", + expectedUsedBegin: 1024, + expectedUsedEnd: 1031, + expectErr: false, + }, + } + + for _, tc := range cases { + _, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize) + + _, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + err = cs.occupy(subnetCIDR) + if err == nil && tc.expectErr { + t.Errorf("expected error but got none") + continue + } + if err != nil && !tc.expectErr { + t.Errorf("unexpected error: %v", err) + continue + } + + expectedUsed := big.Int{} + for i := tc.expectedUsedBegin; i <= tc.expectedUsedEnd; i++ { + expectedUsed.SetBit(&expectedUsed, i, 1) + } + if expectedUsed.Cmp(&cs.used) != 0 { + t.Errorf("error") + } + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go b/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go index 
4549202298ca..1bb3402905b4 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go @@ -57,13 +57,10 @@ var ( const ( // nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update. nodeStatusUpdateRetry = 5 - // podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update. - podCIDRUpdateRetry = 5 // controls how often NodeController will try to evict Pods from non-responsive Nodes. nodeEvictionPeriod = 100 * time.Millisecond - // controls how many NodeSpec updates NC can process in any moment. - cidrUpdateWorkers = 10 - cidrUpdateQueueSize = 5000 + // The amount of time the nodecontroller polls on the list nodes endpoint. + apiserverStartupGracePeriod = 10 * time.Minute ) type nodeStatusData struct { @@ -72,11 +69,6 @@ type nodeStatusData struct { status api.NodeStatus } -type nodeAndCIDR struct { - nodeName string - cidr *net.IPNet -} - type NodeController struct { allocateNodeCIDRs bool cloud cloudprovider.Interface @@ -142,11 +134,12 @@ type NodeController struct { // It is enabled when all Nodes observed by the NodeController are NotReady and disabled // when NC sees any healthy Node. This is a temporary fix for v1.3. networkSegmentationMode bool - - nodeCIDRUpdateChannel chan nodeAndCIDR } // NewNodeController returns a new node controller to sync instances from cloudprovider. +// This method returns an error if it is unable to initialize the CIDR bitmap with +// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes +// currently, this should be handled as a fatal error. func NewNodeController( cloud cloudprovider.Interface, kubeClient clientset.Interface, @@ -159,7 +152,7 @@ func NewNodeController( clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, nodeCIDRMaskSize int, - allocateNodeCIDRs bool) *NodeController { + allocateNodeCIDRs bool) (*NodeController, error) { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}) eventBroadcaster.StartLogging(glog.Infof) @@ -206,7 +199,6 @@ func NewNodeController( allocateNodeCIDRs: allocateNodeCIDRs, forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) }, nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) }, - nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), } nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer( @@ -233,8 +225,20 @@ func NewNodeController( nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{} if nc.allocateNodeCIDRs { nodeEventHandlerFuncs = framework.ResourceEventHandlerFuncs{ - AddFunc: nc.allocateOrOccupyCIDR, - DeleteFunc: nc.recycleCIDR, + AddFunc: func(obj interface{}) { + node := obj.(*api.Node) + err := nc.cidrAllocator.AllocateOrOccupyCIDR(node) + if err != nil { + glog.Errorf("Error allocating CIDR: %v", err) + } + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*api.Node) + err := nc.cidrAllocator.ReleaseCIDR(node) + if err != nil { + glog.Errorf("Error releasing CIDR: %v", err) + } + }, } } @@ -267,22 +271,34 @@ func NewNodeController( ) if allocateNodeCIDRs { - nc.cidrAllocator = NewCIDRRangeAllocator(clusterCIDR, nodeCIDRMaskSize) + var nodeList *api.NodeList + var err error + // We must poll because apiserver might not be up. This error causes + // controller manager to restart. 
+ if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) { + nodeList, err = kubeClient.Core().Nodes().List(api.ListOptions{ + FieldSelector: fields.Everything(), + LabelSelector: labels.Everything(), + }) + if err != nil { + glog.Errorf("Failed to list all nodes: %v", err) + return false, nil + } + return true, nil + }); pollErr != nil { + return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod) + } + nc.cidrAllocator, err = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList) + if err != nil { + return nil, err + } } - return nc + return nc, nil } // Run starts an asynchronous loop that monitors the status of cluster nodes. func (nc *NodeController) Run(period time.Duration) { - if nc.allocateNodeCIDRs { - if nc.serviceCIDR != nil { - nc.filterOutServiceRange() - } else { - glog.Info("No Service CIDR provided. Skipping filtering out service addresses.") - } - } - go nc.nodeController.Run(wait.NeverStop) go nc.podController.Run(wait.NeverStop) go nc.daemonSetController.Run(wait.NeverStop) @@ -351,107 +367,6 @@ func (nc *NodeController) Run(period time.Duration) { }, nodeEvictionPeriod, wait.NeverStop) go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop) - - for i := 0; i < cidrUpdateWorkers; i++ { - go func(stopChan <-chan struct{}) { - for { - select { - case workItem, ok := <-nc.nodeCIDRUpdateChannel: - if !ok { - glog.Warning("NodeCIDRUpdateChannel read returned false.") - return - } - nc.updateCIDRAllocation(workItem) - case <-stopChan: - glog.V(0).Info("StopChannel is closed.") - return - } - } - }(wait.NeverStop) - } -} - -func (nc *NodeController) filterOutServiceRange() { - if !nc.clusterCIDR.Contains(nc.serviceCIDR.IP.Mask(nc.clusterCIDR.Mask)) && !nc.serviceCIDR.Contains(nc.clusterCIDR.IP.Mask(nc.serviceCIDR.Mask)) { - return - } - - if err := nc.cidrAllocator.Occupy(nc.serviceCIDR); err != nil { - glog.Errorf("Error filtering out service cidr: %v", err) - } -} - -func (nc *NodeController) updateCIDRAllocation(data nodeAndCIDR) { - var err error - var node *api.Node - for rep := 0; rep < podCIDRUpdateRetry; rep++ { - node, err = nc.kubeClient.Core().Nodes().Get(data.nodeName) - if err != nil { - glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err) - continue - } - node.Spec.PodCIDR = data.cidr.String() - if _, err := nc.kubeClient.Core().Nodes().Update(node); err != nil { - glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err) - } else { - break - } - } - if err != nil { - nc.recordNodeStatusChange(node, "CIDRAssignmentFailed") - glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err) - err := nc.cidrAllocator.Release(data.cidr) - glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, err) - } -} - -// allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR -// if it doesn't currently have one or mark the CIDR as used if the node already have one. 
-func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) { - node := obj.(*api.Node) - if node.Spec.PodCIDR != "" { - _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR) - if err != nil { - glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) - return - } - if err := nc.cidrAllocator.Occupy(podCIDR); err != nil { - glog.Errorf("failed to mark cidr as occupied :%v", err) - return - } - return - } - podCIDR, err := nc.cidrAllocator.AllocateNext() - if err != nil { - nc.recordNodeStatusChange(node, "CIDRNotAvailable") - return - } - - glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR) - nc.nodeCIDRUpdateChannel <- nodeAndCIDR{ - nodeName: node.Name, - cidr: podCIDR, - } -} - -// recycleCIDR recycles the CIDR of a removed node -func (nc *NodeController) recycleCIDR(obj interface{}) { - node := obj.(*api.Node) - - if node.Spec.PodCIDR == "" { - return - } - - _, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR) - if err != nil { - glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR) - return - } - - glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR) - if err := nc.cidrAllocator.Release(podCIDR); err != nil { - glog.Errorf("failed to release cidr: %v", err) - } } var gracefulDeletionVersion = version.MustParse("v1.1.0") @@ -626,7 +541,7 @@ func (nc *NodeController) monitorNodeStatus() error { // Report node event. if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue { - nc.recordNodeStatusChange(node, "NodeNotReady") + recordNodeStatusChange(nc.recorder, node, "NodeNotReady") if err = nc.markAllPodsNotReady(node.Name); err != nil { utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) } @@ -722,7 +637,7 @@ func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event str nc.recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event) } -func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status string) { +func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) { ref := &api.ObjectReference{ Kind: "Node", Name: node.Name, @@ -732,7 +647,7 @@ func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status stri glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name) // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. - nc.recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) + recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status) } // For a given node checks its conditions and tries to update it. Returns grace period to which given node diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller_test.go index 5c60e028df93..a87beeb0373b 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller_test.go @@ -17,24 +17,19 @@ limitations under the License. 
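Two side effects of the hunks above are worth spelling out before the test diff: recordNodeStatusChange now takes the record.EventRecorder explicitly instead of hanging off *NodeController, so any component holding a recorder can emit the same node events, and the test changes that follow are largely mechanical adaptations to the two-value NewNodeController return. The CIDR range allocator relies on the former; the calls below appear verbatim in cidr_allocator.go later in this series:

// r.recorder here is the range allocator's own EventRecorder, not the NodeController's.
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")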
package node import ( - "errors" - "sync" "testing" "time" "k8s.io/kubernetes/pkg/api" - apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" - "k8s.io/kubernetes/pkg/watch" ) const ( @@ -43,129 +38,6 @@ const ( testNodeMonitorPeriod = 5 * time.Second ) -// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It -// allows test cases to have fine-grained control over mock behaviors. We also need -// PodsInterface and PodInterface to test list & delet pods, which is implemented in -// the embedded client.Fake field. -type FakeNodeHandler struct { - *fake.Clientset - - // Input: Hooks determine if request is valid or not - CreateHook func(*FakeNodeHandler, *api.Node) bool - Existing []*api.Node - - // Output - CreatedNodes []*api.Node - DeletedNodes []*api.Node - UpdatedNodes []*api.Node - UpdatedNodeStatuses []*api.Node - RequestCount int - - // Synchronization - createLock sync.Mutex - deleteWaitChan chan struct{} -} - -type FakeLegacyHandler struct { - unversionedcore.CoreInterface - n *FakeNodeHandler -} - -func (c *FakeNodeHandler) Core() unversionedcore.CoreInterface { - return &FakeLegacyHandler{c.Clientset.Core(), c} -} - -func (m *FakeLegacyHandler) Nodes() unversionedcore.NodeInterface { - return m.n -} - -func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) { - m.createLock.Lock() - defer func() { - m.RequestCount++ - m.createLock.Unlock() - }() - for _, n := range m.Existing { - if n.Name == node.Name { - return nil, apierrors.NewAlreadyExists(api.Resource("nodes"), node.Name) - } - } - if m.CreateHook == nil || m.CreateHook(m, node) { - nodeCopy := *node - m.CreatedNodes = append(m.CreatedNodes, &nodeCopy) - return node, nil - } else { - return nil, errors.New("Create error.") - } -} - -func (m *FakeNodeHandler) Get(name string) (*api.Node, error) { - return nil, nil -} - -func (m *FakeNodeHandler) List(opts api.ListOptions) (*api.NodeList, error) { - defer func() { m.RequestCount++ }() - var nodes []*api.Node - for i := 0; i < len(m.UpdatedNodes); i++ { - if !contains(m.UpdatedNodes[i], m.DeletedNodes) { - nodes = append(nodes, m.UpdatedNodes[i]) - } - } - for i := 0; i < len(m.Existing); i++ { - if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.Existing[i], nodes) { - nodes = append(nodes, m.Existing[i]) - } - } - for i := 0; i < len(m.CreatedNodes); i++ { - if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.CreatedNodes[i], nodes) { - nodes = append(nodes, m.CreatedNodes[i]) - } - } - nodeList := &api.NodeList{} - for _, node := range nodes { - nodeList.Items = append(nodeList.Items, *node) - } - return nodeList, nil -} - -func (m *FakeNodeHandler) Delete(id string, opt *api.DeleteOptions) error { - defer func() { - if m.deleteWaitChan != nil { - m.deleteWaitChan <- struct{}{} - } - }() - m.DeletedNodes = append(m.DeletedNodes, newNode(id)) - m.RequestCount++ - return nil -} - -func (m *FakeNodeHandler) DeleteCollection(opt *api.DeleteOptions, listOpts api.ListOptions) error { - return nil -} - -func (m *FakeNodeHandler) 
Update(node *api.Node) (*api.Node, error) { - nodeCopy := *node - m.UpdatedNodes = append(m.UpdatedNodes, &nodeCopy) - m.RequestCount++ - return node, nil -} - -func (m *FakeNodeHandler) UpdateStatus(node *api.Node) (*api.Node, error) { - nodeCopy := *node - m.UpdatedNodeStatuses = append(m.UpdatedNodeStatuses, &nodeCopy) - m.RequestCount++ - return node, nil -} - -func (m *FakeNodeHandler) PatchStatus(nodeName string, data []byte) (*api.Node, error) { - m.RequestCount++ - return &api.Node{}, nil -} - -func (m *FakeNodeHandler) Watch(opts api.ListOptions) (watch.Interface, error) { - return nil, nil -} - func TestMonitorNodeStatusEvictPods(t *testing.T) { fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) evictionTimeout := 10 * time.Minute @@ -663,7 +535,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, + nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } @@ -733,7 +605,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}), deleteWaitChan: make(chan struct{}), } - nodeController := NewNodeController(nil, fnh, 10*time.Minute, + nodeController, _ := NewNodeController(nil, fnh, 10*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) @@ -967,7 +839,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), + nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1117,7 +989,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), + nodeController, _ := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1199,7 +1071,7 @@ func TestNodeDeletion(t *testing.T) { Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), } - nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), + nodeController, _ := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, 
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1303,7 +1175,7 @@ func TestCheckPod(t *testing.T) { }, } - nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false) + nc, _ := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false) nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc) nc.nodeStore.Store.Add(&api.Node{ ObjectMeta: api.ObjectMeta{ @@ -1380,7 +1252,7 @@ func TestCleanupOrphanedPods(t *testing.T) { newPod("b", "bar"), newPod("c", "gone"), } - nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false) + nc, _ := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false) nc.nodeStore.Store.Add(newNode("foo")) nc.nodeStore.Store.Add(newNode("bar")) @@ -1405,32 +1277,3 @@ func TestCleanupOrphanedPods(t *testing.T) { t.Fatalf("expected deleted pod name to be 'c', but got: %q", deletedPodName) } } - -func newNode(name string) *api.Node { - return &api.Node{ - ObjectMeta: api.ObjectMeta{Name: name}, - Spec: api.NodeSpec{ - ExternalID: name, - }, - Status: api.NodeStatus{ - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceCPU): resource.MustParse("10"), - api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), - }, - }, - } -} - -func newPod(name, host string) *api.Pod { - return &api.Pod{ObjectMeta: api.ObjectMeta{Name: name}, Spec: api.PodSpec{NodeName: host}, - Status: api.PodStatus{Conditions: []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}}}} -} - -func contains(node *api.Node, nodes []*api.Node) bool { - for i := 0; i < len(nodes); i++ { - if node.Name == nodes[i].Name { - return true - } - } - return false -} diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/test_utils.go b/vendor/k8s.io/kubernetes/pkg/controller/node/test_utils.go new file mode 100644 index 000000000000..94e50c5b40b9 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/test_utils.go @@ -0,0 +1,237 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "errors" + "sync" + + "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + "k8s.io/kubernetes/pkg/watch" +) + +// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It +// allows test cases to have fine-grained control over mock behaviors. We also need +// PodsInterface and PodInterface to test list & delet pods, which is implemented in +// the embedded client.Fake field. 
+type FakeNodeHandler struct { + *fake.Clientset + + // Input: Hooks determine if request is valid or not + CreateHook func(*FakeNodeHandler, *api.Node) bool + Existing []*api.Node + + // Output + CreatedNodes []*api.Node + DeletedNodes []*api.Node + UpdatedNodes []*api.Node + UpdatedNodeStatuses []*api.Node + RequestCount int + + // Synchronization + lock sync.Mutex + deleteWaitChan chan struct{} +} + +type FakeLegacyHandler struct { + unversionedcore.CoreInterface + n *FakeNodeHandler +} + +func (c *FakeNodeHandler) getUpdatedNodesCopy() []*api.Node { + c.lock.Lock() + defer c.lock.Unlock() + updatedNodesCopy := make([]*api.Node, len(c.UpdatedNodes), len(c.UpdatedNodes)) + for i, ptr := range c.UpdatedNodes { + updatedNodesCopy[i] = ptr + } + return updatedNodesCopy +} + +func (c *FakeNodeHandler) Core() unversionedcore.CoreInterface { + return &FakeLegacyHandler{c.Clientset.Core(), c} +} + +func (m *FakeLegacyHandler) Nodes() unversionedcore.NodeInterface { + return m.n +} + +func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + for _, n := range m.Existing { + if n.Name == node.Name { + return nil, apierrors.NewAlreadyExists(api.Resource("nodes"), node.Name) + } + } + if m.CreateHook == nil || m.CreateHook(m, node) { + nodeCopy := *node + m.CreatedNodes = append(m.CreatedNodes, &nodeCopy) + return node, nil + } else { + return nil, errors.New("Create error.") + } +} + +func (m *FakeNodeHandler) Get(name string) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + for i := range m.Existing { + if m.Existing[i].Name == name { + nodeCopy := *m.Existing[i] + return &nodeCopy, nil + } + } + return nil, nil +} + +func (m *FakeNodeHandler) List(opts api.ListOptions) (*api.NodeList, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + var nodes []*api.Node + for i := 0; i < len(m.UpdatedNodes); i++ { + if !contains(m.UpdatedNodes[i], m.DeletedNodes) { + nodes = append(nodes, m.UpdatedNodes[i]) + } + } + for i := 0; i < len(m.Existing); i++ { + if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.Existing[i], nodes) { + nodes = append(nodes, m.Existing[i]) + } + } + for i := 0; i < len(m.CreatedNodes); i++ { + if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.CreatedNodes[i], nodes) { + nodes = append(nodes, m.CreatedNodes[i]) + } + } + nodeList := &api.NodeList{} + for _, node := range nodes { + nodeList.Items = append(nodeList.Items, *node) + } + return nodeList, nil +} + +func (m *FakeNodeHandler) Delete(id string, opt *api.DeleteOptions) error { + m.lock.Lock() + defer func() { + m.RequestCount++ + if m.deleteWaitChan != nil { + m.deleteWaitChan <- struct{}{} + } + m.lock.Unlock() + }() + m.DeletedNodes = append(m.DeletedNodes, newNode(id)) + return nil +} + +func (m *FakeNodeHandler) DeleteCollection(opt *api.DeleteOptions, listOpts api.ListOptions) error { + return nil +} + +func (m *FakeNodeHandler) Update(node *api.Node) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + nodeCopy := *node + m.UpdatedNodes = append(m.UpdatedNodes, &nodeCopy) + return node, nil +} + +func (m *FakeNodeHandler) UpdateStatus(node *api.Node) (*api.Node, error) { + m.lock.Lock() + defer func() { + m.RequestCount++ + m.lock.Unlock() + }() + nodeCopy := *node + m.UpdatedNodeStatuses = append(m.UpdatedNodeStatuses, &nodeCopy) + return node, nil +} + +func (m *FakeNodeHandler) 
PatchStatus(nodeName string, data []byte) (*api.Node, error) { + m.RequestCount++ + return &api.Node{}, nil +} + +func (m *FakeNodeHandler) Watch(opts api.ListOptions) (watch.Interface, error) { + return nil, nil +} + +func (m *FakeNodeHandler) Patch(name string, pt api.PatchType, data []byte) (*api.Node, error) { + return nil, nil +} + +func newNode(name string) *api.Node { + return &api.Node{ + ObjectMeta: api.ObjectMeta{Name: name}, + Spec: api.NodeSpec{ + ExternalID: name, + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + }, + }, + } +} + +func newPod(name, host string) *api.Pod { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: name, + }, + Spec: api.PodSpec{ + NodeName: host, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + } + + return pod +} + +func contains(node *api.Node, nodes []*api.Node) bool { + for i := 0; i < len(nodes); i++ { + if node.Name == nodes[i].Name { + return true + } + } + return false +} From d91abb80da0c48c79a06abbceb95c5dbeb0095e4 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 10:58:13 -0400 Subject: [PATCH 5/8] UPSTREAM: 29246: Kubelet: Set PruneChildren when removing image --- .../k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go b/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go index f5fb370e8913..65641703bb77 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go @@ -862,7 +862,7 @@ func (dm *DockerManager) IsImagePresent(image kubecontainer.ImageSpec) (bool, er // Removes the specified image. func (dm *DockerManager) RemoveImage(image kubecontainer.ImageSpec) error { // TODO(harryz) currently Runtime interface does not provide other remove options. 
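For context on the one-line change just below: PruneChildren in engine-api's ImageRemoveOptions asks the daemon to also delete the removed image's untagged parent layers, which is what docker rmi does by default, so kubelet image garbage collection actually reclaims the layer space instead of leaving dangling parents behind. After the patch the call is effectively:

// Prune untagged parent layers along with the image itself; the daemon's
// response listing what was deleted is still discarded.
_, err := dm.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{PruneChildren: true})
return err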
- _, err := dm.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{}) + _, err := dm.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{PruneChildren: true}) return err } From 4060a4fd3826d537173fc63d26669fa9a9525f26 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 11:00:07 -0400 Subject: [PATCH 6/8] UPSTREAM: 29576: Retry assigning CIDRs in case of failure --- .../pkg/controller/node/cidr_allocator.go | 50 +++++++++++++++++-- .../pkg/controller/node/cidr_set.go | 44 +++++++++------- .../pkg/controller/node/nodecontroller.go | 28 +++++++++++ 3 files changed, 99 insertions(+), 23 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go index 95170aeb7caa..9a1b900afa71 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_allocator.go @@ -20,10 +20,13 @@ import ( "errors" "fmt" "net" + "sync" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" @@ -60,6 +63,9 @@ type rangeAllocator struct { // This increases a throughput of CIDR assignment by not blocking on long operations. nodeCIDRUpdateChannel chan nodeAndCIDR recorder record.EventRecorder + // Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation + sync.Mutex + nodesInProcessing sets.String } // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node @@ -77,6 +83,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s clusterCIDR: clusterCIDR, nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), recorder: recorder, + nodesInProcessing: sets.NewString(), } if serviceCIDR != nil { @@ -122,7 +129,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s return ra, nil } +func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool { + r.Lock() + defer r.Unlock() + if r.nodesInProcessing.Has(nodeName) { + return false + } + r.nodesInProcessing.Insert(nodeName) + return true +} + +func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) { + r.Lock() + defer r.Unlock() + r.nodesInProcessing.Delete(nodeName) +} + func (r *rangeAllocator) occupyCIDR(node *api.Node) error { + defer r.removeNodeFromProcessing(node.Name) if node.Spec.PodCIDR == "" { return nil } @@ -138,12 +162,22 @@ func (r *rangeAllocator) occupyCIDR(node *api.Node) error { // AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR // if it doesn't currently have one or mark the CIDR as used if the node already have one. +// WARNING: If you're adding any return calls or defer any more work from this function +// you have to handle correctly nodesInProcessing. 
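// To elaborate on the warning above: every path out of the allocation flow has
// to end in removeNodeFromProcessing, either directly or via occupyCIDR or
// updateCIDRAllocation (both defer it), otherwise insertNodeToProcessing keeps
// returning false for that node and later Add/Update events for it are dropped
// after a V(2) log. A minimal sketch of the guard pattern, using the real method
// names but simplified control flow:
//
//	if !r.insertNodeToProcessing(node.Name) {
//		return nil // another goroutine is already handling this node
//	}
//	defer r.removeNodeFromProcessing(node.Name)
//
// The function below cannot simply defer the removal itself, because the actual
// Node.Spec.PodCIDR write happens asynchronously via nodeCIDRUpdateChannel and
// is only finished once updateCIDRAllocation runs on a worker goroutine.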
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error { + if node == nil { + return nil + } + if !r.insertNodeToProcessing(node.Name) { + glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name) + return nil + } if node.Spec.PodCIDR != "" { return r.occupyCIDR(node) } podCIDR, err := r.cidrs.allocateNext() if err != nil { + r.removeNodeFromProcessing(node.Name) recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } @@ -173,8 +207,8 @@ func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error { return err } -// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, so that they won't be -// assignable. +// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, +// so that they won't be assignable. func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { // Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either // clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR) @@ -192,6 +226,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) { func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { var err error var node *api.Node + defer r.removeNodeFromProcessing(data.nodeName) for rep := 0; rep < podCIDRUpdateRetry; rep++ { // TODO: change it to using PATCH instead of full Node updates. node, err = r.client.Core().Nodes().Get(data.nodeName) @@ -209,9 +244,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { } if err != nil { recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") - glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err) - if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil { - glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr) + // We accept the fact that we may leek CIDRs here. This is safer than releasing + // them in case when we don't know if request went through. + // NodeController restart will return all falsely allocated CIDRs to the pool. + if !apierrors.IsServerTimeout(err) { + glog.Errorf("CIDR assignment for node %v failed: %v. 
Releasing allocated CIDR", data.nodeName, err) + if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil { + glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr) + } } } return err diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go index 5f013d92f02c..02f2a63eb2e9 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/cidr_set.go @@ -78,26 +78,13 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) { }, nil } -func (s *cidrSet) release(cidr *net.IPNet) error { - used, err := s.getIndexForCIDR(cidr) - if err != nil { - return err - } - - s.Lock() - defer s.Unlock() - s.used.SetBit(&s.used, used, 0) - - return nil -} - -func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { - begin, end := 0, s.maxCIDRs +func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) { + begin, end = 0, s.maxCIDRs cidrMask := cidr.Mask maskSize, _ := cidrMask.Size() if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) { - return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR) + return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR) } if s.clusterMaskSize < maskSize { @@ -107,7 +94,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { Mask: subNetMask, }) if err != nil { - return err + return -1, -1, err } ip := make([]byte, 4) @@ -118,9 +105,30 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { Mask: subNetMask, }) if err != nil { - return err + return -1, -1, err } } + return begin, end, nil +} + +func (s *cidrSet) release(cidr *net.IPNet) error { + begin, end, err := s.getBeginingAndEndIndices(cidr) + if err != nil { + return err + } + s.Lock() + defer s.Unlock() + for i := begin; i <= end; i++ { + s.used.SetBit(&s.used, i, 0) + } + return nil +} + +func (s *cidrSet) occupy(cidr *net.IPNet) (err error) { + begin, end, err := s.getBeginingAndEndIndices(cidr) + if err != nil { + return err + } s.Lock() defer s.Unlock() diff --git a/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go b/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go index 1bb3402905b4..7f0c24cab70c 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/node/nodecontroller.go @@ -232,6 +232,34 @@ func NewNodeController( glog.Errorf("Error allocating CIDR: %v", err) } }, + UpdateFunc: func(_, obj interface{}) { + node := obj.(*api.Node) + // If the PodCIDR is not empty we either: + // - already processed a Node that already had a CIDR after NC restarted + // (cidr is marked as used), + // - already processed a Node successfully and allocated a CIDR for it + // (cidr is marked as used), + // - already processed a Node but we did saw a "timeout" response and + // request eventually got through in this case we haven't released + // the allocated CIDR (cidr is still marked as used). 
+ // There's a possible error here: + // - NC sees a new Node and assigns a CIDR X to it, + // - Update Node call fails with a timeout, + // - Node is updated by some other component, NC sees an update and + // assigns CIDR Y to the Node, + // - Both CIDR X and CIDR Y are marked as used in the local cache, + // even though Node sees only CIDR Y + // The problem here is that in in-memory cache we see CIDR X as marked, + // which prevents it from being assigned to any new node. The cluster + // state is correct. + // Restart of NC fixes the issue. + if node.Spec.PodCIDR == "" { + err := nc.cidrAllocator.AllocateOrOccupyCIDR(node) + if err != nil { + glog.Errorf("Error allocating CIDR: %v", err) + } + } + }, DeleteFunc: func(obj interface{}) { node := obj.(*api.Node) err := nc.cidrAllocator.ReleaseCIDR(node) From ca09928c0ec94a56ee5d91c70ed36f48573f7702 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 11:06:23 -0400 Subject: [PATCH 7/8] UPSTREAM: 29961: Allow PVs to specify supplemental GIDs --- .../pkg/kubelet/container/helpers.go | 4 + .../pkg/kubelet/dockertools/docker_manager.go | 3 +- .../dockertools/docker_manager_test.go | 12 +- .../kubernetes/pkg/kubelet/kubelet_getters.go | 16 + .../kubelet/rkt/fake_rkt_interface_test.go | 4 + .../k8s.io/kubernetes/pkg/kubelet/rkt/rkt.go | 16 +- .../kubernetes/pkg/kubelet/rkt/rkt_test.go | 48 +-- .../kubelet/volumemanager/volume_manager.go | 107 +++---- .../volumemanager/volume_manager_test.go | 276 ++++++++++++++++++ .../kubernetes/pkg/securitycontext/fake.go | 2 +- .../pkg/securitycontext/provider.go | 24 +- .../pkg/securitycontext/provider_test.go | 40 ++- .../kubernetes/pkg/securitycontext/types.go | 6 +- 13 files changed, 444 insertions(+), 114 deletions(-) create mode 100644 vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager_test.go diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/container/helpers.go b/vendor/k8s.io/kubernetes/pkg/kubelet/container/helpers.go index ed0311df2a8b..8aacbea11b35 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/container/helpers.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/container/helpers.go @@ -44,6 +44,10 @@ type RuntimeHelper interface { GetClusterDNS(pod *api.Pod) (dnsServers []string, dnsSearches []string, err error) GetPodDir(podUID types.UID) string GeneratePodHostNameAndDomain(pod *api.Pod) (hostname string, hostDomain string, err error) + // GetExtraSupplementalGroupsForPod returns a list of the extra + // supplemental groups for the Pod. These extra supplemental groups come + // from annotations on persistent volumes that the pod depends on. + GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 } // ShouldContainerBeRestarted checks whether a container needs to be restarted. 
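The new RuntimeHelper method above is the seam that lets per-volume GID annotations reach the container runtimes. A condensed view of the plumbing, assembled from code that appears verbatim in later hunks of this same patch:

// kubelet_getters.go: the kubelet satisfies RuntimeHelper by asking the volume
// manager, which collects GID annotations from the pod's persistent volumes.
func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
	return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod)
}

// dockertools/docker_manager.go: the docker runtime fetches the extra GIDs and
// hands them to the security context provider, which folds them into
// HostConfig.GroupAdd.
supplementalGids := dm.runtimeHelper.GetExtraSupplementalGroupsForPod(pod)
securityContextProvider.ModifyHostConfig(pod, container, dockerOpts.HostConfig, supplementalGids)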
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go b/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go index 65641703bb77..5eac29ebadc5 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager.go @@ -689,9 +689,10 @@ func (dm *DockerManager) runContainer( glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd) + supplementalGids := dm.runtimeHelper.GetExtraSupplementalGroupsForPod(pod) securityContextProvider := securitycontext.NewSimpleSecurityContextProvider() securityContextProvider.ModifyContainerConfig(pod, container, dockerOpts.Config) - securityContextProvider.ModifyHostConfig(pod, container, dockerOpts.HostConfig) + securityContextProvider.ModifyHostConfig(pod, container, dockerOpts.HostConfig, supplementalGids) createResp, err := dm.client.CreateContainer(dockerOpts) if err != nil { dm.recorder.Eventf(ref, api.EventTypeWarning, kubecontainer.FailedToCreateContainer, "Failed to create docker container with error: %v", err) diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager_test.go index 88877694a74c..2088bb9a6001 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager_test.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/dockertools/docker_manager_test.go @@ -103,6 +103,10 @@ func (f *fakeRuntimeHelper) GetPodDir(kubetypes.UID) string { return "" } +func (f *fakeRuntimeHelper) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 { + return nil +} + func createTestDockerManager(fakeHTTPClient *fakeHTTP, fakeDocker *FakeDockerClient) (*DockerManager, *FakeDockerClient) { if fakeHTTPClient == nil { fakeHTTPClient = &fakeHTTP{} @@ -112,7 +116,13 @@ func createTestDockerManager(fakeHTTPClient *fakeHTTP, fakeDocker *FakeDockerCli } fakeRecorder := &record.FakeRecorder{} containerRefManager := kubecontainer.NewRefManager() - networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8") + networkPlugin, _ := network.InitNetworkPlugin( + []network.NetworkPlugin{}, + "", + nettest.NewFakeHost(nil), + componentconfig.HairpinNone, + "10.0.0.0/8") + dockerManager := NewFakeDockerManager( fakeDocker, fakeRecorder, diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_getters.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_getters.go index 6c594f537339..8d3377c0846c 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_getters.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_getters.go @@ -218,3 +218,19 @@ func (kl *Kubelet) GetHostIP() (net.IP, error) { } return nodeutil.GetNodeHostIP(node) } + +// getHostIPAnyway attempts to return the host IP from kubelet's nodeInfo, or the initialNodeStatus +func (kl *Kubelet) getHostIPAnyWay() (net.IP, error) { + node, err := kl.getNodeAnyWay() + if err != nil { + return nil, err + } + return nodeutil.GetNodeHostIP(node) +} + +// GetExtraSupplementalGroupsForPod returns a list of the extra +// supplemental groups for the Pod. These extra supplemental groups come +// from annotations on persistent volumes that the pod depends on. 
+func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 { + return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod) +} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/fake_rkt_interface_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/fake_rkt_interface_test.go index a7070b454267..389f1fb9e915 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/fake_rkt_interface_test.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/fake_rkt_interface_test.go @@ -175,6 +175,10 @@ func (f *fakeRuntimeHelper) GetPodDir(podUID types.UID) string { return "/poddir/" + string(podUID) } +func (f *fakeRuntimeHelper) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 { + return nil +} + type fakeRktCli struct { sync.Mutex cmds []string diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt.go b/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt.go index 8118c8fbbf4c..a64aefe45daa 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt.go @@ -509,9 +509,11 @@ func verifyNonRoot(app *appctypes.App, ctx *api.SecurityContext) error { return nil } -func setSupplementaryGIDs(app *appctypes.App, podCtx *api.PodSecurityContext) { - if podCtx != nil { +func setSupplementalGIDs(app *appctypes.App, podCtx *api.PodSecurityContext, supplementalGids []int64) { + if podCtx != nil || len(supplementalGids) != 0 { app.SupplementaryGIDs = app.SupplementaryGIDs[:0] + } + if podCtx != nil { for _, v := range podCtx.SupplementalGroups { app.SupplementaryGIDs = append(app.SupplementaryGIDs, int(v)) } @@ -519,10 +521,13 @@ func setSupplementaryGIDs(app *appctypes.App, podCtx *api.PodSecurityContext) { app.SupplementaryGIDs = append(app.SupplementaryGIDs, int(*podCtx.FSGroup)) } } + for _, v := range supplementalGids { + app.SupplementaryGIDs = append(app.SupplementaryGIDs, int(v)) + } } // setApp merges the container spec with the image's manifest. -func setApp(imgManifest *appcschema.ImageManifest, c *api.Container, opts *kubecontainer.RunContainerOptions, ctx *api.SecurityContext, podCtx *api.PodSecurityContext) error { +func setApp(imgManifest *appcschema.ImageManifest, c *api.Container, opts *kubecontainer.RunContainerOptions, ctx *api.SecurityContext, podCtx *api.PodSecurityContext, supplementalGids []int64) error { app := imgManifest.App // Set up Exec. @@ -563,7 +568,7 @@ func setApp(imgManifest *appcschema.ImageManifest, c *api.Container, opts *kubec if ctx != nil && ctx.RunAsUser != nil { app.User = strconv.Itoa(int(*ctx.RunAsUser)) } - setSupplementaryGIDs(app, podCtx) + setSupplementalGIDs(app, podCtx, supplementalGids) // If 'User' or 'Group' are still empty at this point, // then apply the root UID and GID. 
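The append order in setSupplementalGIDs above is pod SupplementalGroups, then FSGroup, then the volume-derived extras, and the image manifest's own supplementary GIDs are cleared whenever the pod carries a security context or any extra GIDs were supplied. A small self-contained illustration of the merge (mergeGIDs is a hypothetical stand-in, not code from this patch); it reproduces the SupplementaryGIDs []int{1, 2, 3, 4} expectation in the rkt_test.go hunks further down:

package main

import "fmt"

// mergeGIDs mirrors the append order used by setSupplementalGIDs.
func mergeGIDs(supplementalGroups []int64, fsGroup *int64, extra []int64) []int {
	var gids []int
	for _, g := range supplementalGroups {
		gids = append(gids, int(g))
	}
	if fsGroup != nil {
		gids = append(gids, int(*fsGroup))
	}
	for _, g := range extra {
		gids = append(gids, int(g))
	}
	return gids
}

func main() {
	fsGroup := int64(3)
	fmt.Println(mergeGIDs([]int64{1, 2}, &fsGroup, []int64{4})) // prints [1 2 3 4]
}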
@@ -786,8 +791,9 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, podIP string, c api.Container, }) } + supplementalGids := r.runtimeHelper.GetExtraSupplementalGroupsForPod(pod) ctx := securitycontext.DetermineEffectiveSecurityContext(pod, &c) - if err := setApp(imgManifest, &c, opts, ctx, pod.Spec.SecurityContext); err != nil { + if err := setApp(imgManifest, &c, opts, ctx, pod.Spec.SecurityContext, supplementalGids); err != nil { return err } diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt_test.go index cdc90c0ba108..1b277349a52c 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt_test.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/rkt/rkt_test.go @@ -940,21 +940,23 @@ func TestSetApp(t *testing.T) { fsgid := int64(3) tests := []struct { - container *api.Container - opts *kubecontainer.RunContainerOptions - ctx *api.SecurityContext - podCtx *api.PodSecurityContext - expect *appctypes.App - err error + container *api.Container + opts *kubecontainer.RunContainerOptions + ctx *api.SecurityContext + podCtx *api.PodSecurityContext + supplementalGids []int64 + expect *appctypes.App + err error }{ // Nothing should change, but the "User" and "Group" should be filled. { - container: &api.Container{}, - opts: &kubecontainer.RunContainerOptions{}, - ctx: nil, - podCtx: nil, - expect: baseAppWithRootUserGroup(t), - err: nil, + container: &api.Container{}, + opts: &kubecontainer.RunContainerOptions{}, + ctx: nil, + podCtx: nil, + supplementalGids: nil, + expect: baseAppWithRootUserGroup(t), + err: nil, }, // error verifying non-root. @@ -965,9 +967,10 @@ func TestSetApp(t *testing.T) { RunAsNonRoot: &runAsNonRootTrue, RunAsUser: &rootUser, }, - podCtx: nil, - expect: nil, - err: fmt.Errorf("container has no runAsUser and image will run as root"), + podCtx: nil, + supplementalGids: nil, + expect: nil, + err: fmt.Errorf("container has no runAsUser and image will run as root"), }, // app's args should be changed. 
@@ -975,9 +978,10 @@ func TestSetApp(t *testing.T) { container: &api.Container{ Args: []string{"foo"}, }, - opts: &kubecontainer.RunContainerOptions{}, - ctx: nil, - podCtx: nil, + opts: &kubecontainer.RunContainerOptions{}, + ctx: nil, + podCtx: nil, + supplementalGids: nil, expect: &appctypes.App{ Exec: appctypes.Exec{"/bin/foo", "foo"}, User: "0", @@ -1036,11 +1040,12 @@ func TestSetApp(t *testing.T) { SupplementalGroups: []int64{1, 2}, FSGroup: &fsgid, }, + supplementalGids: []int64{4}, expect: &appctypes.App{ Exec: appctypes.Exec{"/bin/bar", "foo"}, User: "42", Group: "0", - SupplementaryGIDs: []int{1, 2, 3}, + SupplementaryGIDs: []int{1, 2, 3, 4}, WorkingDirectory: tmpDir, Environment: []appctypes.EnvironmentVariable{ {"env-foo", "bar"}, @@ -1099,11 +1104,12 @@ func TestSetApp(t *testing.T) { SupplementalGroups: []int64{1, 2}, FSGroup: &fsgid, }, + supplementalGids: []int64{4}, expect: &appctypes.App{ Exec: appctypes.Exec{"/bin/hello", "foo", "hello", "world", "bar"}, User: "42", Group: "0", - SupplementaryGIDs: []int{1, 2, 3}, + SupplementaryGIDs: []int{1, 2, 3, 4}, WorkingDirectory: tmpDir, Environment: []appctypes.EnvironmentVariable{ {"env-foo", "foo"}, @@ -1128,7 +1134,7 @@ func TestSetApp(t *testing.T) { for i, tt := range tests { testCaseHint := fmt.Sprintf("test case #%d", i) img := baseImageManifest(t) - err := setApp(img, tt.container, tt.opts, tt.ctx, tt.podCtx) + err := setApp(img, tt.container, tt.opts, tt.ctx, tt.podCtx, tt.supplementalGids) if err == nil && tt.err != nil || err != nil && tt.err == nil { t.Errorf("%s: expect %v, saw %v", testCaseHint, tt.err, err) } diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager.go b/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager.go index 5a7c4c650def..c3293327f0d6 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager.go @@ -98,16 +98,10 @@ type VolumeManager interface { // volumes. GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap - // GetVolumesForPodAndApplySupplementalGroups, like GetVolumesForPod returns - // a VolumeMap containing the volumes referenced by the specified pod that - // are successfully attached and mounted. The key in the map is the - // OuterVolumeSpecName (i.e. pod.Spec.Volumes[x].Name). - // It returns an empty VolumeMap if pod has no volumes. - // In addition for every volume that specifies a VolumeGidValue, it appends - // the SecurityContext.SupplementalGroups for the specified pod. - // XXX: https://github.com/kubernetes/kubernetes/issues/27197 mutating the - // pod object is bad, and should be avoided. - GetVolumesForPodAndAppendSupplementalGroups(pod *api.Pod) container.VolumeMap + // GetExtraSupplementalGroupsForPod returns a list of the extra + // supplemental groups for the Pod. These extra supplemental groups come + // from annotations on persistent volumes that the pod depends on. 
+ GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 // Returns a list of all volumes that implement the volume.Attacher // interface and are currently in use according to the actual and desired @@ -224,12 +218,34 @@ func (vm *volumeManager) Run(stopCh <-chan struct{}) { func (vm *volumeManager) GetMountedVolumesForPod( podName types.UniquePodName) container.VolumeMap { - return vm.getVolumesForPodHelper(podName, nil /* pod */) + podVolumes := make(container.VolumeMap) + for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) { + podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{Mounter: mountedVolume.Mounter} + } + return podVolumes } -func (vm *volumeManager) GetVolumesForPodAndAppendSupplementalGroups( - pod *api.Pod) container.VolumeMap { - return vm.getVolumesForPodHelper("" /* podName */, pod) +func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 { + podName := volumehelper.GetUniquePodName(pod) + supplementalGroups := sets.NewString() + + for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) { + if mountedVolume.VolumeGidValue != "" { + supplementalGroups.Insert(mountedVolume.VolumeGidValue) + } + } + + result := make([]int64, 0, supplementalGroups.Len()) + for _, group := range supplementalGroups.List() { + iGroup, extra := getExtraSupplementalGid(group, pod) + if !extra { + continue + } + + result = append(result, int64(iGroup)) + } + + return result } func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName { @@ -276,33 +292,6 @@ func (vm *volumeManager) MarkVolumesAsReportedInUse( vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse) } -// getVolumesForPodHelper is a helper method implements the common logic for -// the GetVolumesForPod methods. -// XXX: https://github.com/kubernetes/kubernetes/issues/27197 mutating the pod -// object is bad, and should be avoided. -func (vm *volumeManager) getVolumesForPodHelper( - podName types.UniquePodName, pod *api.Pod) container.VolumeMap { - if pod != nil { - podName = volumehelper.GetUniquePodName(pod) - } - podVolumes := make(container.VolumeMap) - for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) { - podVolumes[mountedVolume.OuterVolumeSpecName] = - container.VolumeInfo{Mounter: mountedVolume.Mounter} - if pod != nil { - err := applyPersistentVolumeAnnotations( - mountedVolume.VolumeGidValue, pod) - if err != nil { - glog.Errorf("applyPersistentVolumeAnnotations failed for pod %q volume %q with: %v", - podName, - mountedVolume.VolumeName, - err) - } - } - } - return podVolumes -} - func (vm *volumeManager) WaitForAttachAndMount(pod *api.Pod) error { expectedVolumes := getExpectedVolumes(pod) if len(expectedVolumes) == 0 { @@ -392,32 +381,26 @@ func getExpectedVolumes(pod *api.Pod) []string { return expectedVolumes } -// applyPersistentVolumeAnnotations appends a pod -// SecurityContext.SupplementalGroups if a GID annotation is provided. -// XXX: https://github.com/kubernetes/kubernetes/issues/27197 mutating the pod -// object is bad, and should be avoided. 
-func applyPersistentVolumeAnnotations( - volumeGidValue string, pod *api.Pod) error { - if volumeGidValue != "" { - gid, err := strconv.ParseInt(volumeGidValue, 10, 64) - if err != nil { - return fmt.Errorf( - "Invalid value for %s %v", - volumehelper.VolumeGidAnnotationKey, - err) - } +// getExtraSupplementalGid returns the value of an extra supplemental GID as +// defined by an annotation on a volume and a boolean indicating whether the +// volume defined a GID that the pod doesn't already request. +func getExtraSupplementalGid(volumeGidValue string, pod *api.Pod) (int64, bool) { + if volumeGidValue == "" { + return 0, false + } - if pod.Spec.SecurityContext == nil { - pod.Spec.SecurityContext = &api.PodSecurityContext{} - } + gid, err := strconv.ParseInt(volumeGidValue, 10, 64) + if err != nil { + return 0, false + } + + if pod.Spec.SecurityContext != nil { for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups { if gid == existingGid { - return nil + return 0, false } } - pod.Spec.SecurityContext.SupplementalGroups = - append(pod.Spec.SecurityContext.SupplementalGroups, gid) } - return nil + return gid, true } diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager_test.go new file mode 100644 index 000000000000..ec9bf6224f6d --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager_test.go @@ -0,0 +1,276 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package volumemanager + +import ( + "os" + "reflect" + "strconv" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/pod" + kubepod "k8s.io/kubernetes/pkg/kubelet/pod" + podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" + "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/kubernetes/pkg/volume/util/volumehelper" +) + +const ( + testHostname = "test-hostname" +) + +func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") + if err != nil { + t.Fatalf("can't make a temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + + node, pod, pv, claim := createObjects() + kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) + + manager, err := newTestVolumeManager(tmpDir, podManager, kubeClient) + if err != nil { + t.Fatalf("Failed to initialize volume manager: %v", err) + } + + stopCh := make(chan struct{}) + go manager.Run(stopCh) + defer close(stopCh) + + podManager.SetPods([]*api.Pod{pod}) + + // Fake node status update + go simulateVolumeInUseUpdate( + api.UniqueVolumeName(node.Status.VolumesAttached[0].Name), + stopCh, + manager) + + err = manager.WaitForAttachAndMount(pod) + if err != nil { + t.Errorf("Expected success: %v", err) + } + + expectedMounted := pod.Spec.Volumes[0].Name + actualMounted := manager.GetMountedVolumesForPod(types.UniquePodName(pod.ObjectMeta.UID)) + if _, ok := actualMounted[expectedMounted]; !ok || (len(actualMounted) != 1) { + t.Errorf("Expected %v to be mounted to pod but got %v", expectedMounted, actualMounted) + } + + expectedInUse := []api.UniqueVolumeName{api.UniqueVolumeName(node.Status.VolumesAttached[0].Name)} + actualInUse := manager.GetVolumesInUse() + if !reflect.DeepEqual(expectedInUse, actualInUse) { + t.Errorf("Expected %v to be in use but got %v", expectedInUse, actualInUse) + } +} + +func TestGetExtraSupplementalGroupsForPod(t *testing.T) { + tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") + if err != nil { + t.Fatalf("can't make a temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + + node, pod, _, claim := createObjects() + + existingGid := pod.Spec.SecurityContext.SupplementalGroups[0] + + cases := []struct { + gidAnnotation string + expected []int64 + }{ + { + gidAnnotation: "777", + expected: []int64{777}, + }, + { + gidAnnotation: strconv.FormatInt(existingGid, 10), + expected: []int64{}, + }, + { + gidAnnotation: "a", + expected: []int64{}, + }, + { + gidAnnotation: "", + expected: []int64{}, + }, + } + + for _, tc := range cases { + pv := &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + Name: "pvA", + Annotations: map[string]string{ + volumehelper.VolumeGidAnnotationKey: tc.gidAnnotation, + }, + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: "fake-device", + }, + }, + ClaimRef: &api.ObjectReference{ + Name: claim.ObjectMeta.Name, + }, + }, + } + kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) + + 
manager, err := newTestVolumeManager(tmpDir, podManager, kubeClient) + if err != nil { + t.Errorf("Failed to initialize volume manager: %v", err) + continue + } + + stopCh := make(chan struct{}) + go manager.Run(stopCh) + + podManager.SetPods([]*api.Pod{pod}) + + // Fake node status update + go simulateVolumeInUseUpdate( + api.UniqueVolumeName(node.Status.VolumesAttached[0].Name), + stopCh, + manager) + + err = manager.WaitForAttachAndMount(pod) + if err != nil { + t.Errorf("Expected success: %v", err) + continue + } + + actual := manager.GetExtraSupplementalGroupsForPod(pod) + if !reflect.DeepEqual(tc.expected, actual) { + t.Errorf("Expected supplemental groups %v, got %v", tc.expected, actual) + } + + close(stopCh) + } +} + +func newTestVolumeManager( + tmpDir string, + podManager pod.Manager, + kubeClient internalclientset.Interface) (VolumeManager, error) { + plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil} + plugMgr := &volume.VolumePluginMgr{} + plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil, "" /* rootContext */)) + + vm, err := NewVolumeManager( + true, + testHostname, + podManager, + kubeClient, + plugMgr, + &containertest.FakeRuntime{}) + return vm, err +} + +// createObjects returns objects for making a fake clientset. The pv is +// already attached to the node and bound to the claim used by the pod. +func createObjects() (*api.Node, *api.Pod, *api.PersistentVolume, *api.PersistentVolumeClaim) { + node := &api.Node{ + ObjectMeta: api.ObjectMeta{Name: testHostname}, + Status: api.NodeStatus{ + VolumesAttached: []api.AttachedVolume{ + { + Name: "fake/pvA", + DevicePath: "fake/path", + }, + }}, + Spec: api.NodeSpec{ExternalID: testHostname}, + } + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "abc", + Namespace: "nsA", + UID: "1234", + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "vol1", + VolumeSource: api.VolumeSource{ + PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{ + ClaimName: "claimA", + }, + }, + }, + }, + SecurityContext: &api.PodSecurityContext{ + SupplementalGroups: []int64{555}, + }, + }, + } + pv := &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + Name: "pvA", + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: "fake-device", + }, + }, + ClaimRef: &api.ObjectReference{ + Name: "claimA", + }, + }, + } + claim := &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "claimA", + Namespace: "nsA", + }, + Spec: api.PersistentVolumeClaimSpec{ + VolumeName: "pvA", + }, + Status: api.PersistentVolumeClaimStatus{ + Phase: api.ClaimBound, + }, + } + return node, pod, pv, claim +} + +func simulateVolumeInUseUpdate( + volumeName api.UniqueVolumeName, + stopCh <-chan struct{}, + volumeManager VolumeManager) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + volumeManager.MarkVolumesAsReportedInUse( + []api.UniqueVolumeName{volumeName}) + case <-stopCh: + return + } + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/securitycontext/fake.go b/vendor/k8s.io/kubernetes/pkg/securitycontext/fake.go index 804f1690599d..a4bf2fa7073c 100644 --- a/vendor/k8s.io/kubernetes/pkg/securitycontext/fake.go +++ b/vendor/k8s.io/kubernetes/pkg/securitycontext/fake.go @@ -41,5 +41,5 @@ type FakeSecurityContextProvider struct{} func (p FakeSecurityContextProvider) ModifyContainerConfig(pod *api.Pod, 
container *api.Container, config *dockercontainer.Config) { } -func (p FakeSecurityContextProvider) ModifyHostConfig(pod *api.Pod, container *api.Container, hostConfig *dockercontainer.HostConfig) { +func (p FakeSecurityContextProvider) ModifyHostConfig(pod *api.Pod, container *api.Container, hostConfig *dockercontainer.HostConfig, supplementalGids []int64) { } diff --git a/vendor/k8s.io/kubernetes/pkg/securitycontext/provider.go b/vendor/k8s.io/kubernetes/pkg/securitycontext/provider.go index e31914960f99..4a187676bb09 100644 --- a/vendor/k8s.io/kubernetes/pkg/securitycontext/provider.go +++ b/vendor/k8s.io/kubernetes/pkg/securitycontext/provider.go @@ -47,12 +47,12 @@ func (p SimpleSecurityContextProvider) ModifyContainerConfig(pod *api.Pod, conta } } -// ModifyHostConfig is called before the Docker runContainer call. -// The security context provider can make changes to the HostConfig, affecting +// ModifyHostConfig is called before the Docker runContainer call. The +// security context provider can make changes to the HostConfig, affecting // security options, whether the container is privileged, volume binds, etc. -func (p SimpleSecurityContextProvider) ModifyHostConfig(pod *api.Pod, container *api.Container, hostConfig *dockercontainer.HostConfig) { - // Apply pod security context - if container.Name != leaky.PodInfraContainerName && pod.Spec.SecurityContext != nil { +func (p SimpleSecurityContextProvider) ModifyHostConfig(pod *api.Pod, container *api.Container, hostConfig *dockercontainer.HostConfig, supplementalGids []int64) { + // Apply supplemental groups + if container.Name != leaky.PodInfraContainerName { // TODO: We skip application of supplemental groups to the // infra container to work around a runc issue which // requires containers to have the '/etc/group'. For @@ -60,15 +60,17 @@ func (p SimpleSecurityContextProvider) ModifyHostConfig(pod *api.Pod, container // https://github.com/opencontainers/runc/pull/313 // This can be removed once the fix makes it into the // required version of docker. 
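The restructured block below applies the same ordering on the docker side: the pod's SupplementalGroups, then FSGroup, then the volume-derived supplementalGids, all rendered as strings into HostConfig.GroupAdd, and still skipped for the infra container. A fragment in the style of the updated provider_test.go, with pod and container assumed to be set up as in those tests:

provider := securitycontext.NewSimpleSecurityContextProvider()
hostConfig := &dockercontainer.HostConfig{}
pod.Spec.SecurityContext = &api.PodSecurityContext{SupplementalGroups: []int64{2222}}

// The last argument carries the GIDs derived from persistent volume annotations.
provider.ModifyHostConfig(pod, container, hostConfig, []int64{1234})

// hostConfig.GroupAdd is now ["2222", "1234"], matching the
// "ExtraSupplementalGroup + SupplementalGroups" expectation in the tests below;
// with a nil pod security context only "1234" would be added.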
- if pod.Spec.SecurityContext.SupplementalGroups != nil { - hostConfig.GroupAdd = make([]string, len(pod.Spec.SecurityContext.SupplementalGroups)) - for i, group := range pod.Spec.SecurityContext.SupplementalGroups { - hostConfig.GroupAdd[i] = strconv.Itoa(int(group)) + if pod.Spec.SecurityContext != nil { + for _, group := range pod.Spec.SecurityContext.SupplementalGroups { + hostConfig.GroupAdd = append(hostConfig.GroupAdd, strconv.Itoa(int(group))) + } + if pod.Spec.SecurityContext.FSGroup != nil { + hostConfig.GroupAdd = append(hostConfig.GroupAdd, strconv.Itoa(int(*pod.Spec.SecurityContext.FSGroup))) } } - if pod.Spec.SecurityContext.FSGroup != nil { - hostConfig.GroupAdd = append(hostConfig.GroupAdd, strconv.Itoa(int(*pod.Spec.SecurityContext.FSGroup))) + for _, group := range supplementalGids { + hostConfig.GroupAdd = append(hostConfig.GroupAdd, strconv.Itoa(int(group))) } } diff --git a/vendor/k8s.io/kubernetes/pkg/securitycontext/provider_test.go b/vendor/k8s.io/kubernetes/pkg/securitycontext/provider_test.go index 5d70c338de0b..9f7bff6e0de0 100644 --- a/vendor/k8s.io/kubernetes/pkg/securitycontext/provider_test.go +++ b/vendor/k8s.io/kubernetes/pkg/securitycontext/provider_test.go @@ -166,7 +166,7 @@ func TestModifyHostConfig(t *testing.T) { dummyContainer.SecurityContext = tc.sc dockerCfg := &dockercontainer.HostConfig{} - provider.ModifyHostConfig(pod, dummyContainer, dockerCfg) + provider.ModifyHostConfig(pod, dummyContainer, dockerCfg, nil) if e, a := tc.expected, dockerCfg; !reflect.DeepEqual(e, a) { t.Errorf("%v: unexpected modification of host config\nExpected:\n\n%#v\n\nGot:\n\n%#v", tc.name, e, a) @@ -181,32 +181,50 @@ func TestModifyHostConfigPodSecurityContext(t *testing.T) { supplementalGroupHC.GroupAdd = []string{"2222"} fsGroupHC := fullValidHostConfig() fsGroupHC.GroupAdd = []string{"1234"} + extraSupplementalGroupHC := fullValidHostConfig() + extraSupplementalGroupHC.GroupAdd = []string{"1234"} bothHC := fullValidHostConfig() bothHC.GroupAdd = []string{"2222", "1234"} fsGroup := int64(1234) + extraSupplementalGroup := []int64{1234} testCases := map[string]struct { - securityContext *api.PodSecurityContext - expected *dockercontainer.HostConfig + securityContext *api.PodSecurityContext + expected *dockercontainer.HostConfig + extraSupplementalGroups []int64 }{ "nil": { - securityContext: nil, - expected: fullValidHostConfig(), + securityContext: nil, + expected: fullValidHostConfig(), + extraSupplementalGroups: nil, }, "SupplementalGroup": { - securityContext: supplementalGroupsSC, - expected: supplementalGroupHC, + securityContext: supplementalGroupsSC, + expected: supplementalGroupHC, + extraSupplementalGroups: nil, }, "FSGroup": { - securityContext: &api.PodSecurityContext{FSGroup: &fsGroup}, - expected: fsGroupHC, + securityContext: &api.PodSecurityContext{FSGroup: &fsGroup}, + expected: fsGroupHC, + extraSupplementalGroups: nil, }, "FSGroup + SupplementalGroups": { securityContext: &api.PodSecurityContext{ SupplementalGroups: []int64{2222}, FSGroup: &fsGroup, }, - expected: bothHC, + expected: bothHC, + extraSupplementalGroups: nil, + }, + "ExtraSupplementalGroup": { + securityContext: nil, + expected: extraSupplementalGroupHC, + extraSupplementalGroups: extraSupplementalGroup, + }, + "ExtraSupplementalGroup + SupplementalGroups": { + securityContext: supplementalGroupsSC, + expected: bothHC, + extraSupplementalGroups: extraSupplementalGroup, }, } @@ -220,7 +238,7 @@ func TestModifyHostConfigPodSecurityContext(t *testing.T) { for k, v := range testCases { 
dummyPod.Spec.SecurityContext = v.securityContext dockerCfg := &dockercontainer.HostConfig{} - provider.ModifyHostConfig(dummyPod, dummyContainer, dockerCfg) + provider.ModifyHostConfig(dummyPod, dummyContainer, dockerCfg, v.extraSupplementalGroups) if !reflect.DeepEqual(v.expected, dockerCfg) { t.Errorf("unexpected modification of host config for %s. Expected: %#v Got: %#v", k, v.expected, dockerCfg) } diff --git a/vendor/k8s.io/kubernetes/pkg/securitycontext/types.go b/vendor/k8s.io/kubernetes/pkg/securitycontext/types.go index 6f45b19684ff..87b5063fcc72 100644 --- a/vendor/k8s.io/kubernetes/pkg/securitycontext/types.go +++ b/vendor/k8s.io/kubernetes/pkg/securitycontext/types.go @@ -33,7 +33,11 @@ type SecurityContextProvider interface { // security options, whether the container is privileged, volume binds, etc. // An error is returned if it's not possible to secure the container as requested // with a security context. - ModifyHostConfig(pod *api.Pod, container *api.Container, hostConfig *dockercontainer.HostConfig) + // + // - pod: the pod to modify the docker hostconfig for + // - container: the container to modify the hostconfig for + // - supplementalGids: additional supplemental GIDs associated with the pod's volumes + ModifyHostConfig(pod *api.Pod, container *api.Container, hostConfig *dockercontainer.HostConfig, supplementalGids []int64) } const ( From f73592b1f99e720592a67761038ab74002e0042b Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 5 Aug 2016 11:07:21 -0400 Subject: [PATCH 8/8] UPSTREAM: google/cadvisor: 1359: Make ThinPoolWatcher loglevel consistent --- .../github.com/google/cadvisor/container/docker/handler.go | 5 ++++- .../google/cadvisor/devicemapper/thin_pool_watcher.go | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/vendor/github.com/google/cadvisor/container/docker/handler.go b/vendor/github.com/google/cadvisor/container/docker/handler.go index fd3e2fdc4ba9..c1fb15864167 100644 --- a/vendor/github.com/google/cadvisor/container/docker/handler.go +++ b/vendor/github.com/google/cadvisor/container/docker/handler.go @@ -275,7 +275,10 @@ func (h *dockerFsHandler) Usage() (uint64, uint64) { if h.thinPoolWatcher != nil { thinPoolUsage, err := h.thinPoolWatcher.GetUsage(h.deviceID) if err != nil { - glog.Errorf("unable to get fs usage from thin pool for device %v: %v", h.deviceID, err) + // TODO: ideally we should keep track of how many times we failed to get the usage for this + // device vs how many refreshes of the cache there have been, and display an error e.g. if we've + // had at least 1 refresh and we still can't find the device. 
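For orientation, the handler.go hunk that continues below changes dockerFsHandler.Usage so a failed thin-pool lookup is no longer reported as an error: the failure is logged at V(5) and the handler simply keeps the base filesystem usage it already had. The following is a self-contained sketch of that fallback shape; usageFromThinPool, the device name dm-3 and the 1024 figure are invented stand-ins for the real thinPoolWatcher.GetUsage call.

package main

import (
	"errors"
	"flag"
	"fmt"

	"github.com/golang/glog"
)

// usageFromThinPool stands in for thinPoolWatcher.GetUsage; it always fails
// here so the fallback path is exercised.
func usageFromThinPool(deviceID string) (uint64, error) {
	return 0, errors.New("no entry for device " + deviceID)
}

func main() {
	flag.Parse() // glog registers -v and -logtostderr as standard flags

	baseUsage := uint64(1024)
	usage := baseUsage

	if thinPoolUsage, err := usageFromThinPool("dm-3"); err != nil {
		// Downgraded from an error: the noisy lookup is only visible at -v=5.
		glog.V(5).Infof("unable to get fs usage from thin pool for device %s: %v", "dm-3", err)
	} else {
		baseUsage = thinPoolUsage
		usage += thinPoolUsage
	}

	fmt.Println(baseUsage, usage) // 1024 1024: the handler falls back to the base usage
	glog.Flush()
}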
+ glog.V(5).Infof("unable to get fs usage from thin pool for device %s: %v", h.deviceID, err) } else { baseUsage = thinPoolUsage usage += thinPoolUsage diff --git a/vendor/github.com/google/cadvisor/devicemapper/thin_pool_watcher.go b/vendor/github.com/google/cadvisor/devicemapper/thin_pool_watcher.go index bf2300a33bd3..6f5666a02fe3 100644 --- a/vendor/github.com/google/cadvisor/devicemapper/thin_pool_watcher.go +++ b/vendor/github.com/google/cadvisor/devicemapper/thin_pool_watcher.go @@ -74,7 +74,7 @@ func (w *ThinPoolWatcher) Start() { // print latency for refresh duration := time.Since(start) - glog.V(3).Infof("thin_ls(%d) took %s", start.Unix(), duration) + glog.V(5).Infof("thin_ls(%d) took %s", start.Unix(), duration) } } } @@ -115,7 +115,7 @@ func (w *ThinPoolWatcher) Refresh() error { } if currentlyReserved { - glog.V(4).Infof("metadata for %v is currently reserved; releasing", w.poolName) + glog.V(5).Infof("metadata for %v is currently reserved; releasing", w.poolName) _, err = w.dmsetup.Message(w.poolName, 0, releaseMetadataMessage) if err != nil { err = fmt.Errorf("error releasing metadata snapshot for %v: %v", w.poolName, err) @@ -123,7 +123,7 @@ func (w *ThinPoolWatcher) Refresh() error { } } - glog.Infof("reserving metadata snapshot for thin-pool %v", w.poolName) + glog.V(5).Infof("reserving metadata snapshot for thin-pool %v", w.poolName) // NOTE: "0" in the call below is for the 'sector' argument to 'dmsetup // message'. It's not needed for thin pools. if output, err := w.dmsetup.Message(w.poolName, 0, reserveMetadataMessage); err != nil {
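The thin_pool_watcher.go hunks in this patch only adjust log levels so that all of the thin-pool bookkeeping sits at V(5). A small sketch of how that verbosity gating behaves with glog; the pool name docker-pool is a placeholder.

package main

import (
	"flag"
	"time"

	"github.com/golang/glog"
)

func main() {
	flag.Parse() // glog registers -v, -vmodule and -logtostderr as standard flags

	start := time.Now()
	duration := time.Since(start)

	// Before the patch these three messages were emitted at three different
	// levels (V(3), V(4) and an unconditional Infof); now a single -v=5
	// shows or hides all of them together.
	glog.V(5).Infof("thin_ls(%d) took %s", start.Unix(), duration)
	glog.V(5).Infof("metadata for %v is currently reserved; releasing", "docker-pool")
	glog.V(5).Infof("reserving metadata snapshot for thin-pool %v", "docker-pool")

	glog.Flush()
}

Running with something like -logtostderr -v=5 prints all three lines; at -v=4 or below none of them appear, which is the consistency the patch is after.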