From ad89448c4f925f85d9f5e1b82aab32ff90da1831 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 5 Feb 2018 10:33:26 -0500 Subject: [PATCH] UPSTREAM: 56872: Fix event generation Also fix error handling in operation hooks. --- .../volume/persistentvolume/pv_controller.go | 4 +- .../kubernetes/pkg/volume/util/metrics.go | 6 +- .../nestedpendingoperations.go | 20 +- .../nestedpendingoperations_test.go | 88 ++-- .../operationexecutor/operation_executor.go | 61 +-- .../operation_executor_test.go | 128 ++++-- .../operationexecutor/operation_generator.go | 428 +++++++++++------- .../kubernetes/pkg/volume/util/types/types.go | 8 + 8 files changed, 432 insertions(+), 311 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go index b1ddcae334e2..7f663a0eb055 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go @@ -1254,7 +1254,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_delete") err = deleter.Delete() - opComplete(err) + opComplete(&err) if err != nil { // Deleter failed return false, err @@ -1377,7 +1377,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision") volume, err = provisioner.Provision() - opComplete(err) + opComplete(&err) if err != nil { strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err) glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err) diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/metrics.go b/vendor/k8s.io/kubernetes/pkg/volume/util/metrics.go index ab2d76286bb7..e3af12df8906 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/metrics.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/metrics.go @@ -49,12 +49,12 @@ func registerMetrics() { } // OperationCompleteHook returns a hook to call when an operation is completed -func OperationCompleteHook(plugin, operationName string) func(error) { +func OperationCompleteHook(plugin, operationName string) func(*error) { requestTime := time.Now() - opComplete := func(err error) { + opComplete := func(err *error) { timeTaken := time.Since(requestTime).Seconds() // Create metric with operation name and plugin name - if err != nil { + if *err != nil { storageOperationErrorMetric.WithLabelValues(plugin, operationName).Inc() } else { storageOperationMetric.WithLabelValues(plugin, operationName).Observe(timeTaken) diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 82462c1f2f6c..526ea403ceea 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -55,7 +55,7 @@ type NestedPendingOperations interface { // concatenation of volumeName and podName is removed from the list of // executing operations allowing a new operation to be started with the // volumeName without error. - Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, operationFunc func() error, operationCompleteFunc func(error)) error + Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error // Wait blocks until all operations are completed. This is typically // necessary during tests - the test should wait until all operations finish @@ -94,8 +94,7 @@ type operation struct { func (grm *nestedPendingOperations) Run( volumeName v1.UniqueVolumeName, podName types.UniquePodName, - operationFunc func() error, - operationCompleteFunc func(error)) error { + generatedOperations types.GeneratedOperations) error { grm.lock.Lock() defer grm.lock.Unlock() opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) @@ -128,15 +127,20 @@ func (grm *nestedPendingOperations) Run( }) } - go func() (err error) { + go func() (eventErr, detailedErr error) { // Handle unhandled panics (very unlikely) defer k8sRuntime.HandleCrash() // Handle completion of and error, if any, from operationFunc() - defer grm.operationComplete(volumeName, podName, &err) - defer operationCompleteFunc(err) + defer grm.operationComplete(volumeName, podName, &detailedErr) + if generatedOperations.CompleteFunc != nil { + defer generatedOperations.CompleteFunc(&detailedErr) + } + if generatedOperations.EventRecorderFunc != nil { + defer generatedOperations.EventRecorderFunc(&eventErr) + } // Handle panic, if any, from operationFunc() - defer k8sRuntime.RecoverFromPanic(&err) - return operationFunc() + defer k8sRuntime.RecoverFromPanic(&detailedErr) + return generatedOperations.OperationFunc() }() return nil diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index 8882303bde55..5865f96c21a6 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -47,10 +47,10 @@ func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err != nil { @@ -63,11 +63,11 @@ func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volume1Name := v1.UniqueVolumeName("volume1-name") volume2Name := v1.UniqueVolumeName("volume2-name") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volume1Name, "" /* operationSubName */, operation, func(error) {}) - err2 := grm.Run(volume2Name, "" /* operationSubName */, operation, func(error) {}) + err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { @@ -85,11 +85,11 @@ func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1PodName := types.UniquePodName("operation1-podname") operation2PodName := types.UniquePodName("operation2-podname") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volumeName, operation1PodName, operation, func(error) {}) - err2 := grm.Run(volumeName, operation2PodName, operation, func(error) {}) + err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation}) + err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation}) // Assert if err1 != nil { @@ -105,10 +105,10 @@ func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation := func() error { return nil } + operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, operation, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) // Assert if err != nil { @@ -122,7 +122,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -133,7 +133,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -154,7 +154,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -165,7 +165,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -185,7 +185,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -195,7 +195,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -215,7 +215,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -225,7 +225,7 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -246,14 +246,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -271,14 +271,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {}) + err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {}) + err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -296,14 +296,14 @@ func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) operationPodName := types.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, operation1, func(error) {}) + err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, operationPodName, operation2, func(error) {}) + err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -320,14 +320,14 @@ func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } operation2 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -344,7 +344,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -352,7 +352,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -367,7 +367,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -388,7 +388,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) } @@ -396,7 +396,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t operation3 := generateNoopFunc() // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, operation2, func(error) {}) + err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { @@ -411,7 +411,7 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, operation3, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) if err != nil { t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) return false, nil @@ -471,7 +471,7 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } @@ -500,7 +500,7 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, operation1, func(error) {}) + err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) if err != nil { t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) } @@ -522,28 +522,28 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { } } -func generateCallbackFunc(done chan<- interface{}) func() error { - return func() error { +func generateCallbackFunc(done chan<- interface{}) func() (error, error) { + return func() (error, error) { done <- true - return nil + return nil, nil } } -func generateWaitFunc(done <-chan interface{}) func() error { - return func() error { +func generateWaitFunc(done <-chan interface{}) func() (error, error) { + return func() (error, error) { <-done - return nil + return nil, nil } } -func generatePanicFunc() func() error { - return func() error { +func generatePanicFunc() func() (error, error) { + return func() (error, error) { panic("testing panic") } } -func generateNoopFunc() func() error { - return func() error { return nil } +func generateNoopFunc() func() (error, error) { + return func() (error, error) { return nil, nil } } func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go index 543067be6b75..7df9f43d79a9 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go @@ -571,30 +571,28 @@ func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, func (oe *operationExecutor) AttachVolume( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - attachFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_attach") return oe.pendingOperations.Run( - volumeToAttach.VolumeName, "" /* podName */, attachFunc, opCompleteFunc) + volumeToAttach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) DetachVolume( volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - detachFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_detach") return oe.pendingOperations.Run( - volumeToDetach.VolumeName, "" /* podName */, detachFunc, opCompleteFunc) + volumeToDetach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) VerifyVolumesAreAttached( @@ -661,7 +659,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( } for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode { - bulkVerifyVolumeFunc, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc( + generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc( pluginNodeVolumes, pluginName, volumeSpecMapByPlugin[pluginName], @@ -670,10 +668,9 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( glog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err) } - opCompleteFunc := util.OperationCompleteHook(pluginName, "verify_volumes_are_attached") // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin uniquePluginName := v1.UniqueVolumeName(pluginName) - err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, bulkVerifyVolumeFunc, opCompleteFunc) + err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations) if err != nil { glog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) } @@ -684,15 +681,14 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - volumesAreAttachedFunc, err := + generatedOperations, err := oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook("", "verify_volumes_are_attached_per_node") // Give an empty UniqueVolumeName so that this operation could be executed concurrently. - return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, volumesAreAttachedFunc, opCompleteFunc) + return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations) } func (oe *operationExecutor) MountVolume( @@ -700,7 +696,7 @@ func (oe *operationExecutor) MountVolume( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error { - mountFunc, plugin, err := oe.operationGenerator.GenerateMountVolumeFunc( + generatedOperations, err := oe.operationGenerator.GenerateMountVolumeFunc( waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) if err != nil { return err @@ -715,16 +711,15 @@ func (oe *operationExecutor) MountVolume( } // TODO mount_device - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_mount") return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, mountFunc, opCompleteFunc) + volumeToMount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmountVolume( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - unmountFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld) if err != nil { return err @@ -734,42 +729,40 @@ func (oe *operationExecutor) UnmountVolume( // same volume in parallel podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) - opCompleteFunc := util.OperationCompleteHook(plugin, "volume_unmount") return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, unmountFunc, opCompleteFunc) + volumeToUnmount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmountDevice( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - unmountDeviceFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "unmount_device") return oe.pendingOperations.Run( - deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc, opCompleteFunc) + deviceToDetach.VolumeName, "" /* podName */, generatedOperations) } func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error { - expandFunc, pluginName, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap) + generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap) if err != nil { return err } uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey()) - opCompleteFunc := util.OperationCompleteHook(pluginName, "expand_volume") - return oe.pendingOperations.Run(uniqueVolumeKey, "", expandFunc, opCompleteFunc) + + return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations) } func (oe *operationExecutor) MapVolume( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - mapFunc, plugin, err := oe.operationGenerator.GenerateMapVolumeFunc( + generatedOperations, err := oe.operationGenerator.GenerateMapVolumeFunc( waitForAttachTimeout, volumeToMount, actualStateOfWorld) if err != nil { return err @@ -785,15 +778,14 @@ func (oe *operationExecutor) MapVolume( podName = volumehelper.GetUniquePodName(volumeToMount.Pod) } - opCompleteFunc := util.OperationCompleteHook(plugin, "map_volume") return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, mapFunc, opCompleteFunc) + volumeToMount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmapVolume( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - unmapFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmapVolumeFunc(volumeToUnmount, actualStateOfWorld) if err != nil { return err @@ -803,16 +795,15 @@ func (oe *operationExecutor) UnmapVolume( // same volume in parallel podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) - opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_volume") return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, unmapFunc, opCompleteFunc) + volumeToUnmount.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) UnmapDevice( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) error { - unmapDeviceFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateUnmapDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) if err != nil { return err @@ -822,24 +813,22 @@ func (oe *operationExecutor) UnmapDevice( // the same volume in parallel podName := nestedpendingoperations.EmptyUniquePodName - opCompleteFunc := util.OperationCompleteHook(plugin, "unmap_device") return oe.pendingOperations.Run( - deviceToDetach.VolumeName, podName, unmapDeviceFunc, opCompleteFunc) + deviceToDetach.VolumeName, podName, generatedOperations) } func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { - verifyControllerAttachedVolumeFunc, plugin, err := + generatedOperations, err := oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld) if err != nil { return err } - opCompleteFunc := util.OperationCompleteHook(plugin, "verify_controller_attached_volume") return oe.pendingOperations.Run( - volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc, opCompleteFunc) + volumeToMount.VolumeName, "" /* podName */, generatedOperations) } // VolumeStateHandler defines a set of operations for handling mount/unmount/detach/reconstruct volume-related operations diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go index 4e06b39616be..18e68a3ab0d5 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -350,87 +350,123 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera } } -func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, - resizeMap expandcache.VolumeResizeMap) (func() error, string, error) { - return func() error { + resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( pluginNodeVolumes map[types.NodeName][]*volume.Spec, pluginNane string, volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, - actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (func() error, error) { - return func() error { + actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, }, nil } -func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } -func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) { - return func() error { +func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { startOperationAndBlock(fopg.ch, fopg.quit) - return nil - }, "", nil + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil } func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go index 2ff4b668f00b..a322ed4ca3d8 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/util/resizefs" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) @@ -83,34 +84,34 @@ func NewOperationGenerator(kubeClient clientset.Interface, // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable type OperationGenerator interface { // Generates the MountVolume function needed to perform the mount of a volume plugin - GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (func() error, string, error) + GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (volumetypes.GeneratedOperations, error) // Generates the UnmountVolume function needed to perform the unmount of a volume plugin - GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) + GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) // Generates the AttachVolume function needed to perform attach of a volume plugin - GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) + GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the DetachVolume function needed to perform the detach of a volume plugin - GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) + GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the VolumesAreAttached function needed to verify if volume plugins are attached - GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) + GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the UnMountDevice function needed to perform the unmount of a device - GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) + GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) // Generates the function needed to check if the attach_detach controller has attached the volume plugin - GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) + GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) // Generates the MapVolume function needed to perform the map of a volume plugin - GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, string, error) + GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) // Generates the UnmapVolume function needed to perform the unmap of a volume plugin - GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) + GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) // Generates the UnmapDevice function needed to perform the unmap of a device - GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, string, error) + GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) // GetVolumePluginMgr returns volume plugin manager GetVolumePluginMgr() *volume.VolumePluginMgr @@ -118,15 +119,15 @@ type OperationGenerator interface { GenerateBulkVolumeVerifyFunc( map[types.NodeName][]*volume.Spec, string, - map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (func() error, error) + map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) - GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (func() error, string, error) + GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) } func (og *operationGenerator) GenerateVolumesAreAttachedFunc( attachedVolumes []AttachedVolume, nodeName types.NodeName, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { // volumesPerPlugin maps from a volume plugin to a list of volume specs which belong // to this type of plugin @@ -154,7 +155,7 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName } - return func() error { + volumesAreAttachedFunc := func() (error, error) { // For each volume plugin, pass the list of volume specs to VolumesAreAttached to check // whether the volumes are still attached. @@ -195,7 +196,13 @@ func (og *operationGenerator) GenerateVolumesAreAttachedFunc( } } } - return nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: volumesAreAttachedFunc, + CompleteFunc: util.OperationCompleteHook("", "verify_volumes_are_attached_per_node"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error }, nil } @@ -203,9 +210,9 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( pluginNodeVolumes map[types.NodeName][]*volume.Spec, pluginName string, volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { - return func() error { + bulkVolumeVerifyFunc := func() (error, error) { attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginByName(pluginName) if err != nil || attachableVolumePlugin == nil { @@ -213,7 +220,7 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( "BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v", pluginName, err) - return nil + return nil, nil } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() @@ -223,19 +230,19 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( "BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v", attachableVolumePlugin, newAttacherErr) - return nil + return nil, nil } bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier) if !ok { glog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier) - return nil + return nil, nil } attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes) if bulkAttachErr != nil { glog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr) - return nil + return nil, nil } for nodeName, volumeSpecs := range pluginNodeVolumes { @@ -260,26 +267,43 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( } } - return nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: bulkVolumeVerifyFunc, + CompleteFunc: util.OperationCompleteHook(pluginName, "verify_volumes_are_attached"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error }, nil + } func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { // Get attacher plugin + eventRecorderFunc := func(err *error) { + if *err != nil { + for _, pod := range volumeToAttach.ScheduledPods { + og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, (*err).Error()) + } + } + } + attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, "", volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err) } volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() if newAttacherErr != nil { - return nil, attachableVolumePlugin.GetPluginName(), volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr) + eventRecorderFunc(&err) + return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.NewAttacher failed", newAttacherErr) } - return func() error { + attachVolumeFunc := func() (error, error) { // Execute attach devicePath, attachErr := volumeAttacher.Attach( volumeToAttach.VolumeSpec, volumeToAttach.NodeName) @@ -298,11 +322,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr) - for _, pod := range volumeToAttach.ScheduledPods { - og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - } - return detailedErr + return volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr) } glog.Infof(volumeToAttach.GenerateMsgDetailed("AttachVolume.Attach succeeded", "")) @@ -312,11 +332,17 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToAttach.GenerateErrorDetailed("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) } - return nil - }, attachableVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: attachVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(attachableVolumePlugin.GetPluginName(), "volume_attach"), + }, nil } func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { @@ -326,7 +352,7 @@ func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { func (og *operationGenerator) GenerateDetachVolumeFunc( volumeToDetach AttachedVolume, verifySafeToDetach bool, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { var volumeName string var attachableVolumePlugin volume.AttachableVolumePlugin var pluginName string @@ -337,13 +363,13 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, "", volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } volumeName, err = attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) if err != nil { - return nil, attachableVolumePlugin.GetPluginName(), volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err) } } else { // Get attacher plugin and the volumeName by splitting the volume unique name in case @@ -351,11 +377,11 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( // when a pod has been deleted during the controller downtime pluginName, volumeName, err = volumehelper.SplitUniqueName(volumeToDetach.VolumeName) if err != nil { - return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) } attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName) if err != nil { - return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err) } } @@ -365,10 +391,10 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( volumeDetacher, err := attachableVolumePlugin.NewDetacher() if err != nil { - return nil, pluginName, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) + return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) } - return func() error { + getVolumePluginMgrFunc := func() (error, error) { var err error if verifySafeToDetach { err = og.verifyVolumeIsSafeToDetach(volumeToDetach) @@ -380,7 +406,7 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( // On failure, add volume back to ReportAsAttached list actualStateOfWorld.AddVolumeToReportAsAttached( volumeToDetach.VolumeName, volumeToDetach.NodeName) - return volumeToDetach.GenerateErrorDetailed("DetachVolume.Detach failed", err) + return volumeToDetach.GenerateError("DetachVolume.Detach failed", err) } glog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", "")) @@ -389,25 +415,33 @@ func (og *operationGenerator) GenerateDetachVolumeFunc( actualStateOfWorld.MarkVolumeAsDetached( volumeToDetach.VolumeName, volumeToDetach.NodeName) - return nil - }, pluginName, nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: getVolumePluginMgrFunc, + CompleteFunc: util.OperationCompleteHook(pluginName, "volume_detach"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } func (og *operationGenerator) GenerateMountVolumeFunc( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, - isRemount bool) (func() error, string, error) { + isRemount bool) (volumetypes.GeneratedOperations, error) { // Get mounter plugin volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err != nil || volumePlugin == nil { - return nil, "", volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err) } affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin) if affinityErr != nil { - return nil, volumePlugin.GetPluginName(), affinityErr + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) + return volumetypes.GeneratedOperations{}, detailedErr } volumeMounter, newMounterErr := volumePlugin.NewMounter( @@ -417,13 +451,15 @@ func (og *operationGenerator) GenerateMountVolumeFunc( if newMounterErr != nil { eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr) og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return nil, volumePlugin.GetPluginName(), detailedErr + return volumetypes.GeneratedOperations{}, detailedErr } mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin) if mountCheckError != nil { - return nil, volumePlugin.GetPluginName(), mountCheckError + eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.UnsupportedMountOption, eventErr.Error()) + return volumetypes.GeneratedOperations{}, detailedErr } // Get attacher, if possible @@ -440,7 +476,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup } - return func() error { + mountVolumeFunc := func() (error, error) { if volumeAttacher != nil { // Wait for attachable volumes to finish attaching glog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath))) @@ -449,7 +485,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.WaitForAttach failed", err) + return volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err) } glog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) @@ -459,14 +495,14 @@ func (og *operationGenerator) GenerateMountVolumeFunc( resizeError := og.resizeFileSystem(volumeToMount, devicePath, volumePlugin.GetPluginName()) if resizeError != nil { - return volumeToMount.GenerateErrorDetailed("MountVolume.Resize failed", resizeError) + return volumeToMount.GenerateError("MountVolume.Resize failed", resizeError) } deviceMountPath, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.GetDeviceMountPath failed", err) + return volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err) } // Mount device to global mount path @@ -476,9 +512,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( deviceMountPath) if err != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed", err) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MountVolume.MountDevice failed", err) } glog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath))) @@ -488,7 +522,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeName) if markDeviceMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) + return volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) } } @@ -497,9 +531,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( err = fmt.Errorf( "Verify that your node machine has the required components before attempting to mount this volume type. %s", canMountErr) - eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.CanMount failed", err) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MountVolume.CanMount failed", err) } } @@ -507,9 +539,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( mountErr := volumeMounter.SetUp(fsGroup) if mountErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) } simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.SetUp succeeded", "") @@ -532,11 +562,23 @@ func (og *operationGenerator) GenerateMountVolumeFunc( volumeToMount.VolumeGidValue) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) + return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) } - return nil - }, volumePlugin.GetPluginName(), nil + return nil, nil + } + + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationFunc: mountVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "volume_mount"), + }, nil } func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devicePath string, pluginName string) error { @@ -608,26 +650,26 @@ func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devi func (og *operationGenerator) GenerateUnmountVolumeFunc( volumeToUnmount MountedVolume, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { // Get mountable plugin volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName) if err != nil || volumePlugin == nil { - return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err) } volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter( volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) if newUnmounterErr != nil { - return nil, volumePlugin.GetPluginName(), volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) } - return func() error { + unmountVolumeFunc := func() (error, error) { // Execute unmount unmountErr := volumeUnmounter.TearDown() if unmountErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmountVolume.TearDown failed", unmountErr) + return volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr) } glog.Infof( @@ -648,37 +690,43 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc( glog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error()) } - return nil - }, volumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmountVolumeFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "volume_unmount"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } func (og *operationGenerator) GenerateUnmountDeviceFunc( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, - mounter mount.Interface) (func() error, string, error) { + mounter mount.Interface) (volumetypes.GeneratedOperations, error) { // Get attacher plugin attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec) if err != nil || attachableVolumePlugin == nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindAttachablePluginBySpec failed", err) } volumeDetacher, err := attachableVolumePlugin.NewDetacher() if err != nil { - return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDetacher failed", err) } volumeAttacher, err := attachableVolumePlugin.NewAttacher() if err != nil { - return nil, attachableVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewAttacher failed", err) } - return func() error { + unmountDeviceFunc := func() (error, error) { deviceMountPath, err := volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("GetDeviceMountPath failed", err) + return deviceToDetach.GenerateError("GetDeviceMountPath failed", err) } refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath) @@ -686,13 +734,13 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( if err == nil { err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs) } - return deviceToDetach.GenerateErrorDetailed("GetDeviceMountRefs check failed", err) + return deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err) } // Execute unmount unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath) if unmountDeviceErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmountDevice failed", unmountDeviceErr) + return deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr) } // Before logging that UnmountDevice succeeded and moving on, // use mounter.PathIsDevice to check if the path is a device, @@ -700,27 +748,33 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( // else on the system. Retry if it returns true. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, mounter) if deviceOpenedErr != nil { - return deviceOpenedErr + return nil, deviceOpenedErr } // The device is still in use elsewhere. Caller will log and retry. if deviceOpened { - return deviceToDetach.GenerateErrorDetailed( + return deviceToDetach.GenerateError( "UnmountDevice failed", fmt.Errorf("the device is in use when it was no longer expected to be in use")) } - glog.Infof(deviceToDetach.GenerateMsgDetailed("UnmountDevice succeeded", "")) + glog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", "")) // Update actual state of world markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted( deviceToDetach.VolumeName) if markDeviceUnmountedErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) } - return nil - }, attachableVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmountDeviceFunc, + CompleteFunc: util.OperationCompleteHook(attachableVolumePlugin.GetPluginName(), "unmount_device"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } // GenerateMapVolumeFunc marks volume as mounted based on following steps. @@ -731,25 +785,27 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( // device map path. Once symbolic links are created, take fd lock by // loopback for the device to avoid silent volume replacement. This lock // will be realased once no one uses the device. -// If all steps are completed, the volume is marked as unmounted. +// If all steps are completed, the volume is marked as mounted. func (og *operationGenerator) GenerateMapVolumeFunc( waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { // Get block volume mapper plugin var blockVolumeMapper volume.BlockVolumeMapper blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec) if err != nil { - return nil, "", volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err) } if blockVolumePlugin == nil { - return nil, "", volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) } affinityErr := checkNodeAffinity(og, volumeToMount, blockVolumePlugin) if affinityErr != nil { - return nil, blockVolumePlugin.GetPluginName(), affinityErr + eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NodeAffinity check failed", affinityErr) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) + return volumetypes.GeneratedOperations{}, detailedErr } blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( volumeToMount.VolumeSpec, @@ -758,7 +814,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( if newMapperErr != nil { eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr) og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return nil, blockVolumePlugin.GetPluginName(), detailedErr + return volumetypes.GeneratedOperations{}, detailedErr } // Get attacher, if possible @@ -769,7 +825,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeAttacher, _ = attachableVolumePlugin.NewAttacher() } - return func() error { + mapVolumeFunc := func() (error, error) { var devicePath string if volumeAttacher != nil { // Wait for attachable volumes to finish attaching @@ -779,7 +835,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.WaitForAttach failed", err) + return volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err) } glog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) @@ -789,23 +845,21 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeName) if markDeviceMappedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) + return volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) } } // A plugin doesn't have attacher also needs to map device to global map path with SetUpDevice() pluginDevicePath, mapErr := blockVolumeMapper.SetUpDevice() if mapErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUp failed", mapErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MapVolume.SetUp failed", mapErr) } // Update devicePath for none attachable plugin case if len(devicePath) == 0 { if len(pluginDevicePath) != 0 { devicePath = pluginDevicePath } else { - return volumeToMount.GenerateErrorDetailed("MapVolume failed", fmt.Errorf("Device path of the volume is empty")) + return volumeToMount.GenerateError("MapVolume failed", fmt.Errorf("Device path of the volume is empty")) } } // Set up global map path under the given plugin directory using symbolic link @@ -813,14 +867,12 @@ func (og *operationGenerator) GenerateMapVolumeFunc( blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.GetDeviceMountPath failed", err) + return volumeToMount.GenerateError("MapVolume.GetDeviceMountPath failed", err) } mapErr = og.blkUtil.MapDevice(devicePath, globalMapPath, string(volumeToMount.Pod.UID)) if mapErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) } // Device mapping for global map path succeeded simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath)) @@ -833,9 +885,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( mapErr = og.blkUtil.MapDevice(devicePath, volumeMapPath, volName) if mapErr != nil { // On failure, return error. Caller will log and retry. - eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) - return detailedErr + return volumeToMount.GenerateError("MapVolume.MapDevice failed", mapErr) } // Take filedescriptor lock to keep a block device opened. Otherwise, there is a case @@ -844,7 +894,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( // for the block device is required. _, err = og.blkUtil.AttachFileDevice(devicePath) if err != nil { - return volumeToMount.GenerateErrorDetailed("MapVolume.AttachFileDevice failed", err) + return volumeToMount.GenerateError("MapVolume.AttachFileDevice failed", err) } // Device mapping for pod device map path succeeded @@ -864,11 +914,23 @@ func (og *operationGenerator) GenerateMapVolumeFunc( volumeToMount.VolumeGidValue) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) + return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) } - return nil - }, blockVolumePlugin.GetPluginName(), nil + return nil, nil + } + + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationFunc: mapVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(blockVolumePlugin.GetPluginName(), "map_volume"), + }, nil } // GenerateUnmapVolumeFunc marks volume as unmonuted based on following steps. @@ -877,32 +939,32 @@ func (og *operationGenerator) GenerateMapVolumeFunc( // If all steps are completed, the volume is marked as unmounted. func (og *operationGenerator) GenerateUnmapVolumeFunc( volumeToUnmount MountedVolume, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { // Get block volume unmapper plugin var blockVolumeUnmapper volume.BlockVolumeUnmapper blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginByName(volumeToUnmount.PluginName) if err != nil { - return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err) } if blockVolumePlugin == nil { - return nil, "", volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) } blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) if newUnmapperErr != nil { - return nil, blockVolumePlugin.GetPluginName(), volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr) + return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr) } - return func() error { + unmapVolumeFunc := func() (error, error) { // Try to unmap volumeName symlink under pod device map path dir // pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName} podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath() unmapDeviceErr := og.blkUtil.UnmapDevice(podDeviceUnmapPath, volName) if unmapDeviceErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmapVolume.UnmapDevice on pod device map path failed", unmapDeviceErr) + return volumeToUnmount.GenerateError("UnmapVolume.UnmapDevice on pod device map path failed", unmapDeviceErr) } // Try to unmap podUID symlink under global map path dir // plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID} @@ -910,12 +972,12 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( blockVolumeUnmapper.GetGlobalMapPath(volumeToUnmount.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmapVolume.GetGlobalUnmapPath failed", err) + return volumeToUnmount.GenerateError("UnmapVolume.GetGlobalUnmapPath failed", err) } unmapDeviceErr = og.blkUtil.UnmapDevice(globalUnmapPath, string(volumeToUnmount.PodUID)) if unmapDeviceErr != nil { // On failure, return error. Caller will log and retry. - return volumeToUnmount.GenerateErrorDetailed("UnmapVolume.UnmapDevice on global map path failed", unmapDeviceErr) + return volumeToUnmount.GenerateError("UnmapVolume.UnmapDevice on global map path failed", unmapDeviceErr) } glog.Infof( @@ -936,8 +998,14 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( glog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error()) } - return nil - }, blockVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmapVolumeFunc, + CompleteFunc: util.OperationCompleteHook(blockVolumePlugin.GetPluginName(), "unmap_volume"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } // GenerateUnmapDeviceFunc marks device as unmounted based on following steps. @@ -953,56 +1021,56 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( func (og *operationGenerator) GenerateUnmapDeviceFunc( deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, - mounter mount.Interface) (func() error, string, error) { + mounter mount.Interface) (volumetypes.GeneratedOperations, error) { // Get block volume mapper plugin var blockVolumeMapper volume.BlockVolumeMapper blockVolumePlugin, err := og.volumePluginMgr.FindMapperPluginBySpec(deviceToDetach.VolumeSpec) if err != nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed", err) } if blockVolumePlugin == nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) } blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( deviceToDetach.VolumeSpec, nil, /* Pod */ volume.VolumeOptions{}) if newMapperErr != nil { - return nil, "", deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewBlockVolumeMapper initialization failed", newMapperErr) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewBlockVolumeMapper initialization failed", newMapperErr) } blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( string(deviceToDetach.VolumeName), "" /* podUID */) if newUnmapperErr != nil { - return nil, blockVolumePlugin.GetPluginName(), deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr) + return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr) } - return func() error { + unmapDeviceFunc := func() (error, error) { // Search under globalMapPath dir if all symbolic links from pods have been removed already. // If symbolick links are there, pods may still refer the volume. globalMapPath, err := blockVolumeMapper.GetGlobalMapPath(deviceToDetach.VolumeSpec) if err != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.GetGlobalMapPath failed", err) + return deviceToDetach.GenerateError("UnmapDevice.GetGlobalMapPath failed", err) } refs, err := og.blkUtil.GetDeviceSymlinkRefs(deviceToDetach.DevicePath, globalMapPath) if err != nil { - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.GetDeviceSymlinkRefs check failed", err) + return deviceToDetach.GenerateError("UnmapDevice.GetDeviceSymlinkRefs check failed", err) } if len(refs) > 0 { err = fmt.Errorf("The device %q is still referenced from other Pods %v", globalMapPath, refs) - return deviceToDetach.GenerateErrorDetailed("UnmapDevice failed", err) + return deviceToDetach.GenerateError("UnmapDevice failed", err) } // Execute tear down device unmapErr := blockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath) if unmapErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.TearDownDevice failed", unmapErr) + return deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr) } // Plugin finished TearDownDevice(). Now globalMapPath dir and plugin's stored data @@ -1010,7 +1078,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath) if removeMapPathErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("UnmapDevice failed", removeMapPathErr) + return deviceToDetach.GenerateError("UnmapDevice failed", removeMapPathErr) } // The block volume is not referenced from Pods. Release file descriptor lock. @@ -1021,7 +1089,7 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( } else { err = og.blkUtil.RemoveLoopDevice(loopPath) if err != nil { - return deviceToDetach.GenerateErrorDetailed("UnmapDevice.AttachFileDevice failed", err) + return deviceToDetach.GenerateError("UnmapDevice.AttachFileDevice failed", err) } } @@ -1031,11 +1099,11 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( // else on the system. Retry if it returns true. deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, mounter) if deviceOpenedErr != nil { - return deviceOpenedErr + return nil, deviceOpenedErr } // The device is still in use elsewhere. Caller will log and retry. if deviceOpened { - return deviceToDetach.GenerateErrorDetailed( + return deviceToDetach.GenerateError( "UnmapDevice failed", fmt.Errorf("the device is in use when it was no longer expected to be in use")) } @@ -1047,24 +1115,30 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( deviceToDetach.VolumeName) if markDeviceUnmountedErr != nil { // On failure, return error. Caller will log and retry. - return deviceToDetach.GenerateErrorDetailed("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) + return deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) } - return nil - }, blockVolumePlugin.GetPluginName(), nil + return nil, nil + } + + return volumetypes.GeneratedOperations{ + OperationFunc: unmapDeviceFunc, + CompleteFunc: util.OperationCompleteHook(blockVolumePlugin.GetPluginName(), "unmap_device"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil } func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( volumeToMount VolumeToMount, nodeName types.NodeName, - actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, string, error) { + actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err != nil || volumePlugin == nil { - return nil, "", volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err) + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err) } - return func() error { + verifyControllerAttachedVolumeFunc := func() (error, error) { if !volumeToMount.PluginIsAttachable { // If the volume does not implement the attacher interface, it is // assumed to be attached and the actual state of the world is @@ -1074,10 +1148,10 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) + return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) } - return nil + return nil, nil } if !volumeToMount.ReportedInUse { @@ -1087,19 +1161,19 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( // periodically by kubelet, so it may take as much as 10 seconds // before this clears. // Issue #28141 to enable on demand status updates. - return volumeToMount.GenerateErrorDetailed("Volume has not been added to the list of VolumesInUse in the node's volume status", nil) + return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil) } // Fetch current node object node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(string(nodeName), metav1.GetOptions{}) if fetchErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr) + return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr) } if node == nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed( + return volumeToMount.GenerateError( "VerifyControllerAttachedVolume failed", fmt.Errorf("Node object retrieved from API server is nil")) } @@ -1111,15 +1185,22 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( glog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath))) if addVolumeNodeErr != nil { // On failure, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) + return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) } - return nil + return nil, nil } } // Volume not attached, return error. Caller will log and retry. - return volumeToMount.GenerateErrorDetailed("Volume not attached according to node status", nil) - }, volumePlugin.GetPluginName(), nil + return volumeToMount.GenerateError("Volume not attached according to node status", nil) + } + + return volumetypes.GeneratedOperations{ + OperationFunc: verifyControllerAttachedVolumeFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "verify_controller_attached_volume"), + EventRecorderFunc: nil, // nil because we do not want to generate event on error + }, nil + } func (og *operationGenerator) verifyVolumeIsSafeToDetach( @@ -1158,17 +1239,17 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach( func (og *operationGenerator) GenerateExpandVolumeFunc( pvcWithResizeRequest *expandcache.PVCWithResizeRequest, - resizeMap expandcache.VolumeResizeMap) (func() error, string, error) { + resizeMap expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) { volumeSpec := volume.NewSpecFromPersistentVolume(pvcWithResizeRequest.PersistentVolume, false) volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) if err != nil { - return nil, "", fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err) + return volumetypes.GeneratedOperations{}, fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err) } - expandFunc := func() error { + expandVolumeFunc := func() (error, error) { newSize := pvcWithResizeRequest.ExpectedSize pvSize := pvcWithResizeRequest.PersistentVolume.Spec.Capacity[v1.ResourceStorage] if pvSize.Cmp(newSize) < 0 { @@ -1178,9 +1259,8 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( pvcWithResizeRequest.CurrentSize) if expandErr != nil { - glog.Errorf("Error expanding volume %q of plugin %s : %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr) - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, expandErr.Error()) - return expandErr + detailedErr := fmt.Errorf("Error expanding volume %q of plugin %s : %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr) + return detailedErr, detailedErr } glog.Infof("ExpandVolume succeeded for volume %s", pvcWithResizeRequest.QualifiedName()) newSize = updatedSize @@ -1190,9 +1270,8 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( updateErr := resizeMap.UpdatePVSize(pvcWithResizeRequest, newSize) if updateErr != nil { - glog.V(4).Infof("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr) - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, updateErr.Error()) - return updateErr + detailedErr := fmt.Errorf("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr) + return detailedErr, detailedErr } glog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", pvcWithResizeRequest.QualifiedName()) } @@ -1205,24 +1284,32 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( err := resizeMap.MarkAsResized(pvcWithResizeRequest, newSize) if err != nil { - glog.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err) - og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, err.Error()) - return err + detailedErr := fmt.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err) + return detailedErr, detailedErr } } - return nil + return nil, nil } - return expandFunc, volumePlugin.GetPluginName(), nil + + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationFunc: expandVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "expand_volume"), + }, nil } func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { mountOptions := volume.MountOptionFromSpec(volumeToMount.VolumeSpec) if len(mountOptions) > 0 && !plugin.SupportsMountOption() { - eventErr, detailedErr := volumeToMount.GenerateError("Mount options are not supported for this volume type", nil) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.UnsupportedMountOption, eventErr.Error()) - return detailedErr + return fmt.Errorf("Mount options are not supported for this volume type") } return nil } @@ -1238,14 +1325,11 @@ func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount, plug if pv != nil { nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels() if err != nil { - return volumeToMount.GenerateErrorDetailed("Error getting node labels", err) + return err } - err = util.CheckNodeAffinity(pv, nodeLabels) if err != nil { - eventErr, detailedErr := volumeToMount.GenerateError("Storage node affinity check failed", err) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) - return detailedErr + return err } } return nil diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/types/types.go b/vendor/k8s.io/kubernetes/pkg/volume/util/types/types.go index 9375ad6750c3..9815545ff609 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/types/types.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/types/types.go @@ -24,3 +24,11 @@ type UniquePodName types.UID // UniquePVCName defines the type to key pvc off type UniquePVCName types.UID + +// GeneratedOperations contains the operation that is created as well as +// supporting functions required for the operation executor +type GeneratedOperations struct { + OperationFunc func() (eventErr error, detailedErr error) + EventRecorderFunc func(*error) + CompleteFunc func(*error) +}