Skip to content

Commit

Permalink
Add pausing deployments during upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
tom1299 committed Feb 18, 2025
1 parent 574129d commit 8359d87
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 0 deletions.
78 changes: 78 additions & 0 deletions internal/pkg/handler/pause_deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package handler

import (
"context"
"time"

"github.com/sirupsen/logrus"
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/pkg/kube"
app "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func PauseDeployment(deployment *app.Deployment, clients kube.Clients, deploymentName, namespace, pauseIntervalValue string) error {
pauseDuration, err := ParsePauseDuration(pauseIntervalValue)
if err != nil {
return err
}

if !deployment.Spec.Paused {
deployment.Spec.Paused = true
logrus.Infof("Pausing Deployment '%s' in namespace '%s' for %s seconds", deploymentName, namespace, pauseDuration)

if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
deployment.Annotations[options.PauseDeploymentTimeAnnotation] = time.Now().Format(time.RFC3339)

CreateResumeTimer(deployment, clients, deploymentName, namespace, pauseDuration)
} else {
logrus.Infof("Deployment '%s' in namespace '%s' is already paused", deploymentName, namespace)
}
return nil
}

func CreateResumeTimer(deployment *app.Deployment, clients kube.Clients, deploymentName, namespace string, pauseDuration time.Duration) {
time.AfterFunc(pauseDuration, func() {
ResumeDeployment(deploymentName, namespace, clients)
})
}

func ResumeDeployment(deploymentName, namespace string, clients kube.Clients) {
deployment, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
logrus.Errorf("Failed to get deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
return
}

if !deployment.Spec.Paused {
logrus.Infof("Deployment '%s' in namespace '%s' not paused. Skipping resume", deploymentName, namespace)
return
}

pausedAtAnnotationValue := deployment.Annotations[options.PauseDeploymentTimeAnnotation]
if pausedAtAnnotationValue == "" {
logrus.Infof("Deployment '%s' in namespace '%s' was not paused by Reloader. Skipping resume", deploymentName, namespace)
return
}

deployment.Spec.Paused = false
delete(deployment.Annotations, options.PauseDeploymentTimeAnnotation)

_, err = clients.KubernetesClient.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
logrus.Errorf("Failed to resume deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
}

logrus.Infof("Successfully resumed deployment '%s' in namespace '%s'", deploymentName, namespace)
}

func ParsePauseDuration(pauseIntervalValue string) (time.Duration, error) {
pauseDuration, err := time.ParseDuration(pauseIntervalValue)
if err != nil {
logrus.Warnf("Failed to parse pause interval value '%s': %v", pauseIntervalValue, err)
return 0, err
}
return pauseDuration, nil
}
21 changes: 21 additions & 0 deletions internal/pkg/handler/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/kube"
app "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -219,6 +220,7 @@ func PerformAction(clients kube.Clients, config util.Config, upgradeFuncs callba
typedAutoAnnotationEnabledValue, foundTypedAuto := annotations[config.TypedAutoAnnotation]
excludeConfigmapAnnotationValue, foundExcludeConfigmap := annotations[options.ConfigmapExcludeReloaderAnnotation]
excludeSecretAnnotationValue, foundExcludeSecret := annotations[options.SecretExcludeReloaderAnnotation]
pauseInterval, foundPauseInterval := annotations[options.PauseDeploymentAnnotation]

if !found && !foundAuto && !foundTypedAuto && !foundSearchAnn {
annotations = upgradeFuncs.PodAnnotationsFunc(i)
Expand Down Expand Up @@ -274,6 +276,25 @@ func PerformAction(clients kube.Clients, config util.Config, upgradeFuncs callba
}

if result == constants.Updated {

if foundPauseInterval {

accessor, err := meta.Accessor(i)
if err != nil {
return err
}

itemName := accessor.GetName()
itemNamespace := accessor.GetNamespace()

deployment, ok := i.(*app.Deployment)
if !ok {
logrus.Warnf("Annotation '%s' only applicable for deployments", options.PauseDeploymentAnnotation)
} else {
PauseDeployment(deployment, clients, itemName, itemNamespace, pauseInterval)

Check failure on line 294 in internal/pkg/handler/upgrade.go

View workflow job for this annotation

GitHub Actions / Build

Error return value is not checked (errcheck)
}
}

accessor, err := meta.Accessor(i)
if err != nil {
return err
Expand Down
142 changes: 142 additions & 0 deletions internal/pkg/handler/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stakater/Reloader/internal/pkg/testutil"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -49,6 +50,7 @@ var (
arsConfigmapWithConfigMapAutoAnnotation = "testconfigmapwithconfigmapautoannotationdeployment-handler-" + testutil.RandSeq(5)
arsSecretWithExcludeSecretAnnotation = "testsecretwithsecretexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
arsConfigmapWithExcludeConfigMapAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
arsConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5)

ersNamespace = "test-handler-" + testutil.RandSeq(5)
ersConfigmapName = "testconfigmap-handler-" + testutil.RandSeq(5)
Expand All @@ -72,6 +74,7 @@ var (
ersConfigmapWithConfigMapAutoAnnotation = "testconfigmapwithconfigmapautoannotationdeployment-handler-" + testutil.RandSeq(5)
ersSecretWithSecretExcludeAnnotation = "testsecretwithsecretexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
ersConfigmapWithConfigMapExcludeAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
ersConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5)
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -200,6 +203,12 @@ func setupArs() {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating configmap for testing pausing deployments
_, err = testutil.CreateConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment, "www.google.com")
if err != nil {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating secret used with secret auto annotation
_, err = testutil.CreateSecret(clients.KubernetesClient, arsNamespace, arsSecretWithExcludeSecretAnnotation, data)
if err != nil {
Expand Down Expand Up @@ -421,6 +430,12 @@ func setupArs() {
if err != nil {
logrus.Errorf("Error in Deployment with both annotations: %v", err)
}

// Creating Deployment with pause annotation
_, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, arsConfigmapWithPausedDeployment, arsNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false)
if err != nil {
logrus.Errorf("Error in Deployment with configmap creation: %v", err)
}
}

func teardownArs() {
Expand Down Expand Up @@ -622,6 +637,12 @@ func teardownArs() {
logrus.Errorf("Error while deleting statefulSet with secret as env var source %v", statefulSetError)
}

// Deleting Deployment with pasuse annotation
deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment)
if deploymentError != nil {
logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError)
}

// Deleting Configmap
err := testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapName)
if err != nil {
Expand Down Expand Up @@ -735,6 +756,12 @@ func teardownArs() {
logrus.Errorf("Error while deleting the configmap used with configmap auto annotations: %v", err)
}

// Deleting configmap for testing pausing deployments
err = testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment)
if err != nil {
logrus.Errorf("Error while deleting the configmap: %v", err)
}

// Deleting namespace
testutil.DeleteNamespace(arsNamespace, clients.KubernetesClient)

Expand Down Expand Up @@ -794,6 +821,12 @@ func setupErs() {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating configmap for testing pausing deployments
_, err = testutil.CreateConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment, "www.google.com")
if err != nil {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating secret
_, err = testutil.CreateSecret(clients.KubernetesClient, ersNamespace, ersSecretWithInitEnv, data)
if err != nil {
Expand Down Expand Up @@ -970,6 +1003,12 @@ func setupErs() {
logrus.Errorf("Error in Deployment with configmap and with configmap exclude annotation: %v", err)
}

// Creating Deployment with pause annotation
_, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, ersConfigmapWithPausedDeployment, ersNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false)
if err != nil {
logrus.Errorf("Error in Deployment with configmap creation: %v", err)
}

// Creating DaemonSet with configmap
_, err = testutil.CreateDaemonSet(clients.KubernetesClient, ersConfigmapName, ersNamespace, true)
if err != nil {
Expand Down Expand Up @@ -1254,6 +1293,12 @@ func teardownErs() {
logrus.Errorf("Error while deleting statefulSet with secret as env var source %v", statefulSetError)
}

// Deleting Deployment for testing pausing deployments
deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment)
if deploymentError != nil {
logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError)
}

// Deleting Configmap
err := testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapName)
if err != nil {
Expand Down Expand Up @@ -1367,6 +1412,12 @@ func teardownErs() {
logrus.Errorf("Error while deleting the configmap used with configmap exclude annotation: %v", err)
}

// Deleting ConfigMap for testins pausing deployments
err = testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment)
if err != nil {
logrus.Errorf("Error while deleting the configmap: %v", err)
}

// Deleting namespace
testutil.DeleteNamespace(ersNamespace, clients.KubernetesClient)

Expand Down Expand Up @@ -3548,3 +3599,94 @@ func TestFailedRollingUpgradeUsingErs(t *testing.T) {
t.Errorf("Counter by namespace was not increased")
}
}

func TestPausingDeploymentUsingErs(t *testing.T) {
options.ReloadStrategy = constants.EnvVarsReloadStrategy
testPausingDeployment(t, options.ReloadStrategy, ersConfigmapWithPausedDeployment, ersNamespace)
}

func TestPausingDeploymentUsingArs(t *testing.T) {
options.ReloadStrategy = constants.AnnotationsReloadStrategy
testPausingDeployment(t, options.ReloadStrategy, arsConfigmapWithPausedDeployment, arsNamespace)
}

func testPausingDeployment(t *testing.T, reloadStrategy string, testName string, namespace string) {
options.ReloadStrategy = reloadStrategy
envVarPostfix := constants.ConfigmapEnvVarPostfix

shaData := testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause.stakater.com")
config := getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation)
deploymentFuncs := GetDeploymentRollingUpgradeFuncs()
collectors := getCollectors()

_ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy)

if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 1 {
t.Errorf("Counter was not increased")
}

if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 1 {
t.Errorf("Counter by namespace was not increased")
}

logrus.Infof("Verifying deployment has been paused")
items := deploymentFuncs.ItemsFunc(clients, config.Namespace)
deploymentPaused, err := isDeploymentPaused(items, testName)
if err != nil {
t.Errorf(err.Error())

Check failure on line 3636 in internal/pkg/handler/upgrade_test.go

View workflow job for this annotation

GitHub Actions / Build

SA1006: printf-style function with dynamic format string and no further arguments should use print-style function instead (staticcheck)
}
if !deploymentPaused {
t.Errorf("Deployment has not been paused")
}

shaData = testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause-changed.stakater.com")
config = getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation)

_ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy)

if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 2 {
t.Errorf("Counter was not increased")
}

if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 2 {
t.Errorf("Counter by namespace was not increased")
}

logrus.Infof("Verifying deployment is still paused")
items = deploymentFuncs.ItemsFunc(clients, config.Namespace)
deploymentPaused, err = isDeploymentPaused(items, testName)
if err != nil {
t.Errorf("%s", err.Error())
}
if !deploymentPaused {
t.Errorf("Deployment should still be paused")
}

logrus.Infof("Verifying deployment has been resumed after pause interval")
time.Sleep(11 * time.Second)
items = deploymentFuncs.ItemsFunc(clients, config.Namespace)
deploymentPaused, err = isDeploymentPaused(items, testName)
if err != nil {
t.Errorf("%s", err.Error())
}
if deploymentPaused {
t.Errorf("Deployment should have been resumed after pause interval")
}
}

func isDeploymentPaused(deployments []runtime.Object, deploymentName string) (bool, error) {
for _, deployment := range deployments {
accessor, err := meta.Accessor(deployment)
if err != nil {
return false, fmt.Errorf("Error getting accessor for item: %v", err)
}
if accessor.GetName() == deploymentName {
deploymentObj, ok := deployment.(*appsv1.Deployment)
if !ok {
return false, fmt.Errorf("Failed to cast to Deployment")
}
return deploymentObj.Spec.Paused, nil
}
}
return false, nil
}
6 changes: 6 additions & 0 deletions internal/pkg/options/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ var (
SearchMatchAnnotation = "reloader.stakater.com/match"
// RolloutStrategyAnnotation is an annotation to define rollout update strategy
RolloutStrategyAnnotation = "reloader.stakater.com/rollout-strategy"
// PauseDeploymentAnnotation is an annotation to define the time period to pause a deployment after
// a configmap/secret change has been detected. Valid values are described here: https://pkg.go.dev/time#ParseDuration
// only positive values are allowed
PauseDeploymentAnnotation = "deployment.reloader.stakater.com/pause-period"
// Annotation set by reloader to indicate that the deployment has been paused
PauseDeploymentTimeAnnotation = "deployment.reloader.stakater.com/paused-at"
// LogFormat is the log format to use (json, or empty string for default)
LogFormat = ""
// LogLevel is the log level to use (trace, debug, info, warning, error, fatal and panic)
Expand Down
20 changes: 20 additions & 0 deletions internal/pkg/testutil/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,26 @@ func CreateDeployment(client kubernetes.Interface, deploymentName string, namesp
return deployment, err
}

// CreateDeployment creates a deployment in given namespace and returns the Deployment
func CreateDeploymentWithAnnotations(client kubernetes.Interface, deploymentName string, namespace string, additionalAnnotations map[string]string, volumeMount bool) (*appsv1.Deployment, error) {
logrus.Infof("Creating Deployment")
deploymentClient := client.AppsV1().Deployments(namespace)
var deploymentObj *appsv1.Deployment
if volumeMount {
deploymentObj = GetDeployment(namespace, deploymentName)
} else {
deploymentObj = GetDeploymentWithEnvVars(namespace, deploymentName)
}

for annotationKey, annotationValue := range additionalAnnotations {
deploymentObj.Annotations[annotationKey] = annotationValue
}

deployment, err := deploymentClient.Create(context.TODO(), deploymentObj, metav1.CreateOptions{})
time.Sleep(3 * time.Second)
return deployment, err
}

// CreateDeploymentConfig creates a deploymentConfig in given namespace and returns the DeploymentConfig
func CreateDeploymentConfig(client appsclient.Interface, deploymentName string, namespace string, volumeMount bool) (*openshiftv1.DeploymentConfig, error) {
logrus.Infof("Creating DeploymentConfig")
Expand Down

0 comments on commit 8359d87

Please sign in to comment.