Commit
apps: replace kubectl scaler in deployer with direct client call and polling
mfojtik committed Apr 10, 2018
1 parent 09f841f commit 7f7125b
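For orientation, the sketch below shows the "direct client call" half of this change in isolation: setting a replication controller's replica count through the scale subresource, retrying on write conflicts. It is not the deployer code itself; it assumes a recent k8s.io/client-go (v0.18 or later, where typed clients take a context), and the helper name setScale, the namespace, and the controller name are made up for the example.

package main

import (
	"context"
	"fmt"
	"time"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/retry"
)

// setScale updates the replica count of a replication controller through the
// scale subresource, retrying on write conflicts. Leaving ResourceVersion
// empty makes the update unconditional, much like the Size: -1 precondition
// the old kubectl scaler used.
func setScale(ctx context.Context, client kubernetes.Interface, namespace, name string, replicas int32) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		scale := &autoscalingv1.Scale{
			ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
			Spec:       autoscalingv1.ScaleSpec{Replicas: replicas},
		}
		_, err := client.CoreV1().ReplicationControllers(namespace).UpdateScale(ctx, name, scale, metav1.UpdateOptions{})
		return err
	})
}

func main() {
	// Load kubeconfig from the default location; in-cluster config would work too.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := setScale(ctx, client, "default", "my-rc", 3); err != nil {
		panic(err)
	}
	fmt.Println("scale updated")
}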
Showing 2 changed files with 163 additions and 161 deletions.
63 changes: 30 additions & 33 deletions pkg/apps/strategy/recreate/recreate.go
@@ -8,17 +8,18 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/apis/autoscaling"
kapi "k8s.io/kubernetes/pkg/apis/core"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/api/errors"

appsapi "github.com/openshift/origin/pkg/apps/apis/apps"
strat "github.com/openshift/origin/pkg/apps/strategy"
@@ -48,16 +49,10 @@ type RecreateDeploymentStrategy struct {
// getUpdateAcceptor returns an UpdateAcceptor to verify the first replica
// of the deployment.
getUpdateAcceptor func(time.Duration, int32) strat.UpdateAcceptor
// scaler is used to scale replication controllers.
scaler kubectl.Scaler
// decoder is used to decode DeploymentConfigs contained in deployments.
decoder runtime.Decoder
// hookExecutor can execute a lifecycle hook.
hookExecutor stratsupport.HookExecutor
// retryPeriod is how often to try updating the replica count.
retryPeriod time.Duration
// retryParams encapsulates the retry parameters
retryParams *kubectl.RetryParams
// events records the events
events record.EventSink
}
@@ -83,10 +78,8 @@ func NewRecreateDeploymentStrategy(client kclientset.Interface, tagClient imagec
getUpdateAcceptor: func(timeout time.Duration, minReadySeconds int32) strat.UpdateAcceptor {
return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout)
},
scaler: appsutil.NewReplicationControllerV1Scaler(client),
decoder: decoder,
hookExecutor: stratsupport.NewHookExecutor(client.Core(), tagClient, client.Core(), os.Stdout, decoder),
retryPeriod: 1 * time.Second,
}
}

@@ -108,25 +101,22 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
return fmt.Errorf("couldn't decode config from deployment %s: %v", to.Name, err)
}

retryTimeout := time.Duration(appsapi.DefaultRecreateTimeoutSeconds) * time.Second
recreateTimeout := time.Duration(appsapi.DefaultRecreateTimeoutSeconds) * time.Second
params := config.Spec.Strategy.RecreateParams
rollingParams := config.Spec.Strategy.RollingParams

if params != nil && params.TimeoutSeconds != nil {
retryTimeout = time.Duration(*params.TimeoutSeconds) * time.Second
recreateTimeout = time.Duration(*params.TimeoutSeconds) * time.Second
}

// When doing the initial rollout for the rolling strategy we use recreate and for that we
// have to set the TimeoutSeconds based on the rolling strategy parameters.
if rollingParams != nil && rollingParams.TimeoutSeconds != nil {
retryTimeout = time.Duration(*rollingParams.TimeoutSeconds) * time.Second
recreateTimeout = time.Duration(*rollingParams.TimeoutSeconds) * time.Second
}

s.retryParams = kubectl.NewRetryParams(s.retryPeriod, retryTimeout)
waitParams := kubectl.NewRetryParams(s.retryPeriod, retryTimeout)

if updateAcceptor == nil {
updateAcceptor = s.getUpdateAcceptor(retryTimeout, config.Spec.MinReadySeconds)
updateAcceptor = s.getUpdateAcceptor(recreateTimeout, config.Spec.MinReadySeconds)
}

// Execute any pre-hook.
@@ -147,7 +137,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
// Scale down the from deployment.
if from != nil {
fmt.Fprintf(s.out, "--> Scaling %s down to zero\n", from.Name)
_, err := s.scaleAndWait(from, 0, s.retryParams, waitParams)
_, err := s.scaleAndWait(from, 0, recreateTimeout)
if err != nil {
return fmt.Errorf("couldn't scale %s to 0: %v", from.Name, err)
}
@@ -177,7 +167,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
// Scale up to 1 and validate the replica,
// aborting if the replica isn't acceptable.
fmt.Fprintf(s.out, "--> Scaling %s to 1 before performing acceptance check\n", to.Name)
updatedTo, err := s.scaleAndWait(to, 1, s.retryParams, waitParams)
updatedTo, err := s.scaleAndWait(to, 1, recreateTimeout)
if err != nil {
return fmt.Errorf("couldn't scale %s to 1: %v", to.Name, err)
}
@@ -195,7 +185,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
// Complete the scale up.
if to.Spec.Replicas != int32(desiredReplicas) {
fmt.Fprintf(s.out, "--> Scaling %s to %d\n", to.Name, desiredReplicas)
updatedTo, err := s.scaleAndWait(to, desiredReplicas, s.retryParams, waitParams)
updatedTo, err := s.scaleAndWait(to, desiredReplicas, recreateTimeout)
if err != nil {
return fmt.Errorf("couldn't scale %s to %d: %v", to.Name, desiredReplicas, err)
}
@@ -224,32 +214,39 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
return nil
}

func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retry *kubectl.RetryParams, retryParams *kubectl.RetryParams) (*kapi.ReplicationController, error) {
func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retryTimeout time.Duration) (*kapi.ReplicationController, error) {
if int32(replicas) == deployment.Spec.Replicas && int32(replicas) == deployment.Status.Replicas {
return deployment, nil
}
var scaleErr error
err := wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
scaleErr = s.scaler.Scale(deployment.Namespace, deployment.Name, uint(replicas), &kubectl.ScalePrecondition{Size: -1, ResourceVersion: ""}, retry, retryParams)
if scaleErr == nil {
return true, nil
}
// Update replication controller scale
err := wait.PollImmediate(1*time.Second, retryTimeout, func() (bool, error) {
updateScaleErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
scale := &autoscaling.Scale{}
scale.Spec.Replicas = int32(replicas)
_, scaleErr := s.rcClient.ReplicationControllers(deployment.Namespace).UpdateScale(deployment.Name, scale)
return scaleErr
})
// This error is returned when the lifecycle admission plugin cache is not fully
// synchronized. In that case the scaling should be retried.
//
// FIXME: The error returned from admission should not be forbidden but a come-back-later error.
if errors.IsForbidden(scaleErr) && strings.Contains(scaleErr.Error(), "not yet ready to handle request") {
if errors.IsForbidden(updateScaleErr) && strings.Contains(updateScaleErr.Error(), "not yet ready to handle request") {
return false, nil
}
return false, scaleErr
return true, updateScaleErr
})
if err == wait.ErrWaitTimeout {
return nil, fmt.Errorf("%v: %v", err, scaleErr)
}
if err != nil {
return nil, err
}

// TODO: We need to do polling as the upstream watches are broken atm.
// We should replace this with the same solution as kubectl rollout status will use.
err = wait.PollImmediate(1*time.Second, retryTimeout, func() (bool, error) {
scale, scaleErr := s.rcClient.ReplicationControllers(deployment.Namespace).GetScale(deployment.Name, metav1.GetOptions{})
if scaleErr != nil {
return true, scaleErr
}
return scale.Status.Replicas == int32(replicas), nil
})
return s.rcClient.ReplicationControllers(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
}
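The other half of the change is the wait loop: rather than relying on watches, the deployer now polls until the observed replica count matches the target. The sketch below shows that pattern as a standalone helper, under the same client-go assumption as the earlier example; waitForScale, the one-second interval, and the decision to abort on any GetScale error are illustrative choices, not the deployer's exact behavior.

package scaling

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForScale polls the scale subresource until the controller reports the
// desired number of replicas, mirroring the poll-instead-of-watch approach
// taken in this commit.
func waitForScale(ctx context.Context, client kubernetes.Interface, namespace, name string, replicas int32, timeout time.Duration) error {
	return wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		scale, err := client.CoreV1().ReplicationControllers(namespace).GetScale(ctx, name, metav1.GetOptions{})
		if err != nil {
			// A production caller might tolerate transient "not yet ready to
			// handle request" admission errors here, the way the deployer does
			// when updating the scale; this sketch simply aborts the poll.
			return false, err
		}
		return scale.Status.Replicas == replicas, nil
	})
}

A caller would typically invoke waitForScale right after the scale update, passing the strategy's recreate timeout as the deadline.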
