Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Add pausing deployment rollouts #838

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions internal/pkg/handler/pause_deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package handler

import (
"context"
"time"

"github.com/sirupsen/logrus"
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/pkg/kube"
app "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func PauseDeployment(deployment *app.Deployment, clients kube.Clients, deploymentName, namespace, pauseIntervalValue string) error {
pauseDuration, err := ParsePauseDuration(pauseIntervalValue)
if err != nil {
return err
}

if !deployment.Spec.Paused {
deployment.Spec.Paused = true
logrus.Infof("Pausing Deployment '%s' in namespace '%s' for %s seconds", deploymentName, namespace, pauseDuration)

if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
deployment.Annotations[options.PauseDeploymentTimeAnnotation] = time.Now().Format(time.RFC3339)

CreateResumeTimer(deployment, clients, deploymentName, namespace, pauseDuration)
} else {
logrus.Infof("Deployment '%s' in namespace '%s' is already paused", deploymentName, namespace)
}
return nil
}

func CreateResumeTimer(deployment *app.Deployment, clients kube.Clients, deploymentName, namespace string, pauseDuration time.Duration) {
time.AfterFunc(pauseDuration, func() {
ResumeDeployment(deploymentName, namespace, clients)
})
}

func ResumeDeployment(deploymentName, namespace string, clients kube.Clients) {
deployment, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
logrus.Errorf("Failed to get deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
return
}

if !deployment.Spec.Paused {
logrus.Infof("Deployment '%s' in namespace '%s' not paused. Skipping resume", deploymentName, namespace)
return
}

pausedAtAnnotationValue := deployment.Annotations[options.PauseDeploymentTimeAnnotation]
if pausedAtAnnotationValue == "" {
logrus.Infof("Deployment '%s' in namespace '%s' was not paused by Reloader. Skipping resume", deploymentName, namespace)
return
}

deployment.Spec.Paused = false
delete(deployment.Annotations, options.PauseDeploymentTimeAnnotation)

_, err = clients.KubernetesClient.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
logrus.Errorf("Failed to resume deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
}

logrus.Infof("Successfully resumed deployment '%s' in namespace '%s'", deploymentName, namespace)
}

func ParsePauseDuration(pauseIntervalValue string) (time.Duration, error) {
pauseDuration, err := time.ParseDuration(pauseIntervalValue)
if err != nil {
logrus.Warnf("Failed to parse pause interval value '%s': %v", pauseIntervalValue, err)
return 0, err
}
return pauseDuration, nil
}
25 changes: 25 additions & 0 deletions internal/pkg/handler/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/kube"
app "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -219,6 +220,7 @@ func PerformAction(clients kube.Clients, config util.Config, upgradeFuncs callba
typedAutoAnnotationEnabledValue, foundTypedAuto := annotations[config.TypedAutoAnnotation]
excludeConfigmapAnnotationValue, foundExcludeConfigmap := annotations[options.ConfigmapExcludeReloaderAnnotation]
excludeSecretAnnotationValue, foundExcludeSecret := annotations[options.SecretExcludeReloaderAnnotation]
pauseInterval, foundPauseInterval := annotations[options.PauseDeploymentAnnotation]

if !found && !foundAuto && !foundTypedAuto && !foundSearchAnn {
annotations = upgradeFuncs.PodAnnotationsFunc(i)
Expand Down Expand Up @@ -274,6 +276,29 @@ func PerformAction(clients kube.Clients, config util.Config, upgradeFuncs callba
}

if result == constants.Updated {

if foundPauseInterval {

accessor, err := meta.Accessor(i)
if err != nil {
return err
}

itemName := accessor.GetName()
itemNamespace := accessor.GetNamespace()

deployment, ok := i.(*app.Deployment)
if !ok {
logrus.Warnf("Annotation '%s' only applicable for deployments", options.PauseDeploymentAnnotation)
} else {
err = PauseDeployment(deployment, clients, itemName, itemNamespace, pauseInterval)
if err != nil {
logrus.Errorf("Failed to pause deployment '%s' in namespace '%s': %v", itemName, itemNamespace, err)
return err
}
}
}

accessor, err := meta.Accessor(i)
if err != nil {
return err
Expand Down
142 changes: 142 additions & 0 deletions internal/pkg/handler/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stakater/Reloader/internal/pkg/testutil"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -49,6 +50,7 @@ var (
arsConfigmapWithConfigMapAutoAnnotation = "testconfigmapwithconfigmapautoannotationdeployment-handler-" + testutil.RandSeq(5)
arsSecretWithExcludeSecretAnnotation = "testsecretwithsecretexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
arsConfigmapWithExcludeConfigMapAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
arsConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5)

ersNamespace = "test-handler-" + testutil.RandSeq(5)
ersConfigmapName = "testconfigmap-handler-" + testutil.RandSeq(5)
Expand All @@ -72,6 +74,7 @@ var (
ersConfigmapWithConfigMapAutoAnnotation = "testconfigmapwithconfigmapautoannotationdeployment-handler-" + testutil.RandSeq(5)
ersSecretWithSecretExcludeAnnotation = "testsecretwithsecretexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
ersConfigmapWithConfigMapExcludeAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5)
ersConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5)
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -200,6 +203,12 @@ func setupArs() {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating configmap for testing pausing deployments
_, err = testutil.CreateConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment, "www.google.com")
if err != nil {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating secret used with secret auto annotation
_, err = testutil.CreateSecret(clients.KubernetesClient, arsNamespace, arsSecretWithExcludeSecretAnnotation, data)
if err != nil {
Expand Down Expand Up @@ -421,6 +430,12 @@ func setupArs() {
if err != nil {
logrus.Errorf("Error in Deployment with both annotations: %v", err)
}

// Creating Deployment with pause annotation
_, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, arsConfigmapWithPausedDeployment, arsNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false)
if err != nil {
logrus.Errorf("Error in Deployment with configmap creation: %v", err)
}
}

func teardownArs() {
Expand Down Expand Up @@ -622,6 +637,12 @@ func teardownArs() {
logrus.Errorf("Error while deleting statefulSet with secret as env var source %v", statefulSetError)
}

// Deleting Deployment with pasuse annotation
deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment)
if deploymentError != nil {
logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError)
}

// Deleting Configmap
err := testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapName)
if err != nil {
Expand Down Expand Up @@ -735,6 +756,12 @@ func teardownArs() {
logrus.Errorf("Error while deleting the configmap used with configmap auto annotations: %v", err)
}

// Deleting configmap for testing pausing deployments
err = testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment)
if err != nil {
logrus.Errorf("Error while deleting the configmap: %v", err)
}

// Deleting namespace
testutil.DeleteNamespace(arsNamespace, clients.KubernetesClient)

Expand Down Expand Up @@ -794,6 +821,12 @@ func setupErs() {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating configmap for testing pausing deployments
_, err = testutil.CreateConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment, "www.google.com")
if err != nil {
logrus.Errorf("Error in configmap creation: %v", err)
}

// Creating secret
_, err = testutil.CreateSecret(clients.KubernetesClient, ersNamespace, ersSecretWithInitEnv, data)
if err != nil {
Expand Down Expand Up @@ -970,6 +1003,12 @@ func setupErs() {
logrus.Errorf("Error in Deployment with configmap and with configmap exclude annotation: %v", err)
}

// Creating Deployment with pause annotation
_, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, ersConfigmapWithPausedDeployment, ersNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false)
if err != nil {
logrus.Errorf("Error in Deployment with configmap creation: %v", err)
}

// Creating DaemonSet with configmap
_, err = testutil.CreateDaemonSet(clients.KubernetesClient, ersConfigmapName, ersNamespace, true)
if err != nil {
Expand Down Expand Up @@ -1254,6 +1293,12 @@ func teardownErs() {
logrus.Errorf("Error while deleting statefulSet with secret as env var source %v", statefulSetError)
}

// Deleting Deployment for testing pausing deployments
deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment)
if deploymentError != nil {
logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError)
}

// Deleting Configmap
err := testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapName)
if err != nil {
Expand Down Expand Up @@ -1367,6 +1412,12 @@ func teardownErs() {
logrus.Errorf("Error while deleting the configmap used with configmap exclude annotation: %v", err)
}

// Deleting ConfigMap for testins pausing deployments
err = testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment)
if err != nil {
logrus.Errorf("Error while deleting the configmap: %v", err)
}

// Deleting namespace
testutil.DeleteNamespace(ersNamespace, clients.KubernetesClient)

Expand Down Expand Up @@ -3548,3 +3599,94 @@ func TestFailedRollingUpgradeUsingErs(t *testing.T) {
t.Errorf("Counter by namespace was not increased")
}
}

func TestPausingDeploymentUsingErs(t *testing.T) {
options.ReloadStrategy = constants.EnvVarsReloadStrategy
testPausingDeployment(t, options.ReloadStrategy, ersConfigmapWithPausedDeployment, ersNamespace)
}

func TestPausingDeploymentUsingArs(t *testing.T) {
options.ReloadStrategy = constants.AnnotationsReloadStrategy
testPausingDeployment(t, options.ReloadStrategy, arsConfigmapWithPausedDeployment, arsNamespace)
}

func testPausingDeployment(t *testing.T, reloadStrategy string, testName string, namespace string) {
options.ReloadStrategy = reloadStrategy
envVarPostfix := constants.ConfigmapEnvVarPostfix

shaData := testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause.stakater.com")
config := getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation)
deploymentFuncs := GetDeploymentRollingUpgradeFuncs()
collectors := getCollectors()

_ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy)

if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 1 {
t.Errorf("Counter was not increased")
}

if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 1 {
t.Errorf("Counter by namespace was not increased")
}

logrus.Infof("Verifying deployment has been paused")
items := deploymentFuncs.ItemsFunc(clients, config.Namespace)
deploymentPaused, err := isDeploymentPaused(items, testName)
if err != nil {
t.Errorf("%s", err.Error())
}
if !deploymentPaused {
t.Errorf("Deployment has not been paused")
}

shaData = testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause-changed.stakater.com")
config = getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation)

_ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy)

if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 2 {
t.Errorf("Counter was not increased")
}

if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 2 {
t.Errorf("Counter by namespace was not increased")
}

logrus.Infof("Verifying deployment is still paused")
items = deploymentFuncs.ItemsFunc(clients, config.Namespace)
deploymentPaused, err = isDeploymentPaused(items, testName)
if err != nil {
t.Errorf("%s", err.Error())
}
if !deploymentPaused {
t.Errorf("Deployment should still be paused")
}

logrus.Infof("Verifying deployment has been resumed after pause interval")
time.Sleep(11 * time.Second)
items = deploymentFuncs.ItemsFunc(clients, config.Namespace)
deploymentPaused, err = isDeploymentPaused(items, testName)
if err != nil {
t.Errorf("%s", err.Error())
}
if deploymentPaused {
t.Errorf("Deployment should have been resumed after pause interval")
}
}

func isDeploymentPaused(deployments []runtime.Object, deploymentName string) (bool, error) {
for _, deployment := range deployments {
accessor, err := meta.Accessor(deployment)
if err != nil {
return false, fmt.Errorf("Error getting accessor for item: %v", err)
}
if accessor.GetName() == deploymentName {
deploymentObj, ok := deployment.(*appsv1.Deployment)
if !ok {
return false, fmt.Errorf("Failed to cast to Deployment")
}
return deploymentObj.Spec.Paused, nil
}
}
return false, nil
}
6 changes: 6 additions & 0 deletions internal/pkg/options/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ var (
SearchMatchAnnotation = "reloader.stakater.com/match"
// RolloutStrategyAnnotation is an annotation to define rollout update strategy
RolloutStrategyAnnotation = "reloader.stakater.com/rollout-strategy"
// PauseDeploymentAnnotation is an annotation to define the time period to pause a deployment after
// a configmap/secret change has been detected. Valid values are described here: https://pkg.go.dev/time#ParseDuration
// only positive values are allowed
PauseDeploymentAnnotation = "deployment.reloader.stakater.com/pause-period"
// Annotation set by reloader to indicate that the deployment has been paused
PauseDeploymentTimeAnnotation = "deployment.reloader.stakater.com/paused-at"
// LogFormat is the log format to use (json, or empty string for default)
LogFormat = ""
// LogLevel is the log level to use (trace, debug, info, warning, error, fatal and panic)
Expand Down
20 changes: 20 additions & 0 deletions internal/pkg/testutil/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,26 @@ func CreateDeployment(client kubernetes.Interface, deploymentName string, namesp
return deployment, err
}

// CreateDeployment creates a deployment in given namespace and returns the Deployment
func CreateDeploymentWithAnnotations(client kubernetes.Interface, deploymentName string, namespace string, additionalAnnotations map[string]string, volumeMount bool) (*appsv1.Deployment, error) {
logrus.Infof("Creating Deployment")
deploymentClient := client.AppsV1().Deployments(namespace)
var deploymentObj *appsv1.Deployment
if volumeMount {
deploymentObj = GetDeployment(namespace, deploymentName)
} else {
deploymentObj = GetDeploymentWithEnvVars(namespace, deploymentName)
}

for annotationKey, annotationValue := range additionalAnnotations {
deploymentObj.Annotations[annotationKey] = annotationValue
}

deployment, err := deploymentClient.Create(context.TODO(), deploymentObj, metav1.CreateOptions{})
time.Sleep(3 * time.Second)
return deployment, err
}

// CreateDeploymentConfig creates a deploymentConfig in given namespace and returns the DeploymentConfig
func CreateDeploymentConfig(client appsclient.Interface, deploymentName string, namespace string, volumeMount bool) (*openshiftv1.DeploymentConfig, error) {
logrus.Infof("Creating DeploymentConfig")
Expand Down
Loading