diff --git a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/fake/fake.go b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/fake/fake.go index 6b4ee6825620..f0818b90b6e1 100644 --- a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/fake/fake.go +++ b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/fake/fake.go @@ -22,6 +22,7 @@ import ( "net" "regexp" "sync" + "time" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -55,6 +56,7 @@ type FakeCloud struct { Calls []string Addresses []v1.NodeAddress + addressesMux sync.Mutex ExtID map[types.NodeName]string InstanceTypes map[types.NodeName]string Machines []types.NodeName @@ -70,6 +72,8 @@ type FakeCloud struct { addCallLock sync.Mutex cloudprovider.Zone VolumeLabelMap map[string]map[string]string + + RequestDelay time.Duration } type FakeRoute struct { @@ -80,6 +84,9 @@ type FakeRoute struct { func (f *FakeCloud) addCall(desc string) { f.addCallLock.Lock() defer f.addCallLock.Unlock() + + time.Sleep(f.RequestDelay) + f.Calls = append(f.Calls, desc) } @@ -198,9 +205,17 @@ func (f *FakeCloud) CurrentNodeName(ctx context.Context, hostname string) (types // It adds an entry "node-addresses" into the internal method call record. func (f *FakeCloud) NodeAddresses(ctx context.Context, instance types.NodeName) ([]v1.NodeAddress, error) { f.addCall("node-addresses") + f.addressesMux.Lock() + defer f.addressesMux.Unlock() return f.Addresses, f.Err } +func (f *FakeCloud) SetNodeAddresses(nodeAddresses []v1.NodeAddress) { + f.addressesMux.Lock() + defer f.addressesMux.Unlock() + f.Addresses = nodeAddresses +} + // NodeAddressesByProviderID is a test-spy implementation of Instances.NodeAddressesByProviderID. // It adds an entry "node-addresses-by-provider-id" into the internal method call record. 
func (f *FakeCloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) { diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD b/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD index f0230fbb113c..d41678ffb185 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD @@ -10,6 +10,7 @@ go_library( name = "go_default_library", srcs = [ "active_deadline.go", + "cloud_request_manager.go", "doc.go", "kubelet.go", "kubelet_getters.go", @@ -145,6 +146,7 @@ go_test( name = "go_default_test", srcs = [ "active_deadline_test.go", + "cloud_request_manager_test.go", "kubelet_getters_test.go", "kubelet_network_test.go", "kubelet_node_status_test.go", diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/cloud_request_manager.go b/vendor/k8s.io/kubernetes/pkg/kubelet/cloud_request_manager.go new file mode 100644 index 000000000000..58752bf19d3c --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/cloud_request_manager.go @@ -0,0 +1,116 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "context" + "fmt" + "sync" + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/cloudprovider" + + "github.com/golang/glog" +) + +var nodeAddressesRetryPeriod = 5 * time.Second + +type cloudResourceSyncManager struct { + // Cloud provider interface. 
+	cloud cloudprovider.Interface
+	// Sync period
+	syncPeriod time.Duration
+
+	nodeAddressesMux sync.Mutex
+	nodeAddressesErr error
+	nodeAddresses    []v1.NodeAddress
+
+	nodeName types.NodeName
+}
+
+// NewCloudResourceSyncManager creates a manager responsible for collecting resources
+// from a cloud provider through requests that are sensitive to timeouts and hanging
+func NewCloudResourceSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) *cloudResourceSyncManager {
+	return &cloudResourceSyncManager{
+		cloud:      cloud,
+		syncPeriod: syncPeriod,
+		nodeName:   nodeName,
+	}
+}
+
+func (manager *cloudResourceSyncManager) getNodeAddressSafe() ([]v1.NodeAddress, error) {
+	manager.nodeAddressesMux.Lock()
+	defer manager.nodeAddressesMux.Unlock()
+
+	return manager.nodeAddresses, manager.nodeAddressesErr
+}
+
+func (manager *cloudResourceSyncManager) setNodeAddressSafe(nodeAddresses []v1.NodeAddress, err error) {
+	manager.nodeAddressesMux.Lock()
+	defer manager.nodeAddressesMux.Unlock()
+
+	manager.nodeAddresses = nodeAddresses
+	manager.nodeAddressesErr = err
+}
+
+// NodeAddresses blocks until the sync loop has collected node addresses (or an
+// error) at least once, then returns the most recently collected result.
+func (manager *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) { + // wait until there is something + for { + nodeAddresses, err := manager.getNodeAddressSafe() + if len(nodeAddresses) == 0 && err == nil { + glog.V(5).Infof("Waiting for %v for cloud provider to provide node addresses", nodeAddressesRetryPeriod) + time.Sleep(nodeAddressesRetryPeriod) + continue + } + return nodeAddresses, err + } +} + +func (manager *cloudResourceSyncManager) collectNodeAddresses(ctx context.Context, nodeName types.NodeName) { + glog.V(2).Infof("Requesting node addresses from cloud provider for node %q", nodeName) + + instances, ok := manager.cloud.Instances() + if !ok { + manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get instances from cloud provider")) + return + } + + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? + // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface + // TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below? 
+ + nodeAddresses, err := instances.NodeAddresses(ctx, nodeName) + if err != nil { + manager.setNodeAddressSafe(nil, fmt.Errorf("failed to get node address from cloud provider: %v", err)) + glog.V(2).Infof("Node addresses from cloud provider for node %q not collected", nodeName) + } else { + manager.setNodeAddressSafe(nodeAddresses, nil) + glog.V(2).Infof("Node addresses from cloud provider for node %q collected", nodeName) + } +} + +func (manager *cloudResourceSyncManager) Run(stopCh <-chan struct{}) { + wait.Until(func() { + manager.collectNodeAddresses(context.TODO(), manager.nodeName) + }, manager.syncPeriod, stopCh) +} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/cloud_request_manager_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/cloud_request_manager_test.go new file mode 100644 index 000000000000..f29293c504f4 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/cloud_request_manager_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubelet + +import ( + "fmt" + "reflect" + "testing" + "time" + + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" +) + +func collectNodeAddresses(manager *cloudResourceSyncManager) ([]v1.NodeAddress, error) { + var nodeAddresses []v1.NodeAddress + var err error + + collected := make(chan struct{}, 1) + go func() { + nodeAddresses, err = manager.NodeAddresses() + close(collected) + }() + + select { + case <-collected: + return nodeAddresses, err + case <-time.Tick(2 * nodeAddressesRetryPeriod): + return nil, fmt.Errorf("Timeout after %v waiting for address to appear", 2*nodeAddressesRetryPeriod) + } +} + +func createNodeInternalIPAddress(address string) []v1.NodeAddress { + return []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: address, + }, + } +} + +func TestNodeAddressesRequest(t *testing.T) { + syncPeriod := 300 * time.Millisecond + maxRetry := 5 + cloud := &fake.FakeCloud{ + Addresses: createNodeInternalIPAddress("10.0.1.12"), + // Set the request delay so the manager timeouts and collects the node addresses later + RequestDelay: 400 * time.Millisecond, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + manager := NewCloudResourceSyncManager(cloud, "defaultNode", syncPeriod) + go manager.Run(stopCh) + + nodeAddresses, err := collectNodeAddresses(manager) + if err != nil { + t.Errorf("Unexpected err: %q\n", err) + } + if !reflect.DeepEqual(nodeAddresses, cloud.Addresses) { + t.Errorf("Unexpected list of node addresses %#v, expected %#v: %v", nodeAddresses, cloud.Addresses, err) + } + + // Change the IP address + cloud.SetNodeAddresses(createNodeInternalIPAddress("10.0.1.13")) + + // Wait until the IP address changes + for i := 0; i < maxRetry; i++ { + nodeAddresses, err := collectNodeAddresses(manager) + t.Logf("nodeAddresses: %#v, err: %v", nodeAddresses, err) + if err != nil { + t.Errorf("Unexpected err: %q\n", err) + } + // It is safe to read cloud.Addresses since no routine is changing the 
+		// value at the same time
+		if err == nil && nodeAddresses[0].Address != cloud.Addresses[0].Address {
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+		if err != nil {
+			t.Errorf("Unexpected err: %q\n", err)
+		}
+		return
+	}
+	t.Errorf("Timeout waiting for %q address to appear", cloud.Addresses[0].Address)
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go
index e5c87dd2afad..fc5c93a8c96d 100644
--- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go
@@ -538,10 +538,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	}
 
 	if klet.cloud != nil {
-		klet.cloudproviderRequestParallelism = make(chan int, 1)
-		klet.cloudproviderRequestSync = make(chan int)
-		// TODO(jchaloup): Make it configurable via --cloud-provider-request-timeout
-		klet.cloudproviderRequestTimeout = 10 * time.Second
+		klet.cloudResourceSyncManager = NewCloudResourceSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
 	}
 
 	secretManager := secret.NewCachingSecretManager(
@@ -1051,18 +1048,12 @@ type Kubelet struct {
 	// Cloud provider interface.
 	cloud cloudprovider.Interface
 
+	// Handles requests to cloud provider with timeout
+	cloudResourceSyncManager *cloudResourceSyncManager
+
 	// Indicates that the node initialization happens in an external cloud controller
 	externalCloudProvider bool
 
-	// To keep exclusive access to the cloudproviderRequestParallelism
-	cloudproviderRequestMux sync.Mutex
-	// Keep the count of requests processed in parallel (expected to be 1 at most at a given time)
-	cloudproviderRequestParallelism chan int
-	// Sync with finished requests
-	cloudproviderRequestSync chan int
-	// Request timeout
-	cloudproviderRequestTimeout time.Duration
-
 	// Reference to this node.
nodeRef *v1.ObjectReference @@ -1392,6 +1383,11 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { glog.Warning("No api server defined - no node status update will be sent.") } + // Start the cloud provider sync manager + if kl.cloudResourceSyncManager != nil { + go kl.cloudResourceSyncManager.Run(wait.NeverStop) + } + if err := kl.initializeModules(); err != nil { kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error()) glog.Fatal(err) diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go index 03bf9825907d..9fa91e35d633 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go @@ -471,46 +471,10 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error { return nil } if kl.cloud != nil { - instances, ok := kl.cloud.Instances() - if !ok { - return fmt.Errorf("failed to get instances from cloud provider") - } - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface - // TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below? 
- var nodeAddresses []v1.NodeAddress - var err error - - // Make sure the instances.NodeAddresses returns even if the cloud provider API hangs for a long time - func() { - kl.cloudproviderRequestMux.Lock() - if len(kl.cloudproviderRequestParallelism) > 0 { - kl.cloudproviderRequestMux.Unlock() - return - } - kl.cloudproviderRequestParallelism <- 0 - kl.cloudproviderRequestMux.Unlock() - - go func() { - nodeAddresses, err = instances.NodeAddresses(context.TODO(), kl.nodeName) - - kl.cloudproviderRequestMux.Lock() - <-kl.cloudproviderRequestParallelism - kl.cloudproviderRequestMux.Unlock() - - kl.cloudproviderRequestSync <- 0 - }() - }() - - select { - case <-kl.cloudproviderRequestSync: - case <-time.After(kl.cloudproviderRequestTimeout): - err = fmt.Errorf("Timeout after %v", kl.cloudproviderRequestTimeout) - } + nodeAddresses, err := kl.cloudResourceSyncManager.NodeAddresses() if err != nil { - return fmt.Errorf("failed to get node address from cloud provider: %v", err) + return err } if kl.nodeIP != nil { enforcedNodeAddresses := []v1.NodeAddress{} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go index a40cd33ecf8a..c170e535036b 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go @@ -192,12 +192,12 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) { Err: nil, } kubelet.cloud = fakeCloud - kubelet.cloudproviderRequestParallelism = make(chan int, 1) - kubelet.cloudproviderRequestSync = make(chan int) - kubelet.cloudproviderRequestTimeout = 10 * time.Second + kubelet.cloudResourceSyncManager = NewCloudResourceSyncManager(kubelet.cloud, kubelet.nodeName, kubelet.nodeStatusUpdateFrequency) + stopCh := make(chan struct{}) + go kubelet.cloudResourceSyncManager.Run(stopCh) kubelet.setNodeAddress(&existingNode) - + close(stopCh) expectedAddresses := []v1.NodeAddress{ { 
Type: v1.NodeExternalIP,