Skip to content

Commit

Permalink
Merge pull request #14322 from tnozicka/set-owner-refs-in-rcs-owned-b…
Browse files Browse the repository at this point in the history
…y-dc

Merged by openshift-bot
  • Loading branch information
OpenShift Bot authored Jun 1, 2017
2 parents b6fd9db + 13326de commit 1bf8a17
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 57 deletions.
7 changes: 6 additions & 1 deletion pkg/cmd/server/bootstrappolicy/controller_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func init() {
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + InfraDeployerControllerServiceAccountName},
Rules: []rbac.PolicyRule{
rbac.NewRule("create", "get", "list", "watch", "patch", "delete").Groups(kapiGroup).Resources("pods").RuleOrDie(),

// "delete" is required here for compatibility with older deployer images
// (see https://github.com/openshift/origin/pull/14322#issuecomment-303968976)
// TODO: remove the "delete" rule a few releases after 3.6
rbac.NewRule("delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
rbac.NewRule("get", "list", "watch", "update").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
eventsRule(),
},
Expand All @@ -73,7 +78,7 @@ func init() {
addControllerRole(rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + InfraDeploymentConfigControllerServiceAccountName},
Rules: []rbac.PolicyRule{
rbac.NewRule("create", "get", "list", "watch", "update", "delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
rbac.NewRule("create", "get", "list", "watch", "update", "patch", "delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
rbac.NewRule("update").Groups(deployGroup, legacyDeployGroup).Resources("deploymentconfigs/status").RuleOrDie(),
rbac.NewRule("get", "list", "watch").Groups(deployGroup, legacyDeployGroup).Resources("deploymentconfigs").RuleOrDie(),
eventsRule(),
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/server/bootstrappolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,10 @@ func GetOpenshiftBootstrapClusterRoles() []authorizationapi.ClusterRole {
},
},
Rules: []authorizationapi.PolicyRule{
// "delete" is required here for compatibility with older deployer images
// (see https://github.com/openshift/origin/pull/14322#issuecomment-303968976)
// TODO: remove the "delete" rule a few releases after 3.6
authorizationapi.NewRule("delete").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
authorizationapi.NewRule("get", "list", "watch", "update").Groups(kapiGroup).Resources("replicationcontrollers").RuleOrDie(),
authorizationapi.NewRule("get", "list", "watch", "create").Groups(kapiGroup).Resources("pods").RuleOrDie(),
authorizationapi.NewRule("get").Groups(kapiGroup).Resources("pods/log").RuleOrDie(),
Expand Down
175 changes: 175 additions & 0 deletions pkg/controller/controller_ref_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package controller

import (
"fmt"

"github.com/golang/glog"
kerrors "k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
kschema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
kapi "k8s.io/kubernetes/pkg/api"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcontroller "k8s.io/kubernetes/pkg/controller"
)

// RCControlInterface is an interface that knows how to patch
// ReplicationControllers. It is used by the DeploymentConfig controller
// to ease testing of the actions that it takes.
type RCControlInterface interface {
	// PatchReplicationController applies a strategic merge patch to the
	// named ReplicationController.
	PatchReplicationController(namespace, name string, data []byte) error
}

// RealRCControl is the default implementation of RCControlInterface.
// It sends the patches through the given Kubernetes client.
type RealRCControl struct {
	// KubeClient is the client used to issue the patch requests.
	KubeClient kclientset.Interface
	// Recorder records events; it is not used by PatchReplicationController
	// itself, but is kept for parity with other controller control structs.
	Recorder record.EventRecorder
}

// Compile-time check that RealRCControl implements RCControlInterface.
var _ RCControlInterface = &RealRCControl{}

// PatchReplicationController executes a strategic merge patch contained in
// 'data' on the ReplicationController specified by 'namespace' and 'name'.
func (r RealRCControl) PatchReplicationController(namespace, name string, data []byte) error {
	rcClient := r.KubeClient.Core().ReplicationControllers(namespace)
	if _, err := rcClient.Patch(name, types.StrategicMergePatchType, data); err != nil {
		return err
	}
	return nil
}

// RCControllerRefManager manages the controllerRef of ReplicationControllers
// on behalf of a single owning controller (e.g. a DeploymentConfig).
type RCControllerRefManager struct {
	kcontroller.BaseControllerRefManager
	// controllerKind is the GroupVersionKind of the owning controller; it is
	// written into the ownerReferences of adopted ReplicationControllers.
	controllerKind kschema.GroupVersionKind
	// rcControl issues the actual patch requests.
	rcControl RCControlInterface
}

// NewRCControllerRefManager returns a RCControllerRefManager that exposes
// methods to manage the controllerRef of ReplicationControllers.
//
// The canAdopt function can perform a potentially expensive check (such as
// a live GET from the API server) before the first adoption is attempted; it
// is invoked at most once, and only if an adoption is actually attempted.
// If canAdopt returns a non-nil error, every adoption will fail.
//
// NOTE: Once canAdopt has been called it will not be called again by the
// same RCControllerRefManager instance. Create a new instance if it makes
// sense to re-check canAdopt (e.g. in a different sync pass).
func NewRCControllerRefManager(
	rcControl RCControlInterface,
	controller kmetav1.Object,
	selector klabels.Selector,
	controllerKind kschema.GroupVersionKind,
	canAdopt func() error,
) *RCControllerRefManager {
	base := kcontroller.BaseControllerRefManager{
		Controller:   controller,
		Selector:     selector,
		CanAdoptFunc: canAdopt,
	}
	m := RCControllerRefManager{
		BaseControllerRefManager: base,
		controllerKind:           controllerKind,
		rcControl:                rcControl,
	}
	return &m
}

// ClaimReplicationController tries to take ownership of a ReplicationController.
//
// It reconciles the following:
//   * Adopt the ReplicationController if it is an orphan.
//   * Release an owned ReplicationController if the selector no longer matches.
//
// A non-nil error means some form of reconciliation was attempted and failed;
// callers should usually retry later. If the error is nil, either the
// reconciliation succeeded or none was necessary, and the returned boolean
// reports whether the caller now owns the object.
func (m *RCControllerRefManager) ClaimReplicationController(rc *kapi.ReplicationController) (bool, error) {
	return m.ClaimObject(rc,
		func(obj kmetav1.Object) bool {
			return m.Selector.Matches(klabels.Set(obj.GetLabels()))
		},
		func(obj kmetav1.Object) error {
			return m.AdoptReplicationController(obj.(*kapi.ReplicationController))
		},
		func(obj kmetav1.Object) error {
			return m.ReleaseReplicationController(obj.(*kapi.ReplicationController))
		},
	)
}

// ClaimReplicationControllers tries to take ownership of a list of
// ReplicationControllers.
//
// It reconciles the following:
//   * Adopt orphans whose labels match the selector.
//   * Release owned objects whose labels no longer match the selector.
//
// A non-nil (aggregate) error means some form of reconciliation was attempted
// and failed; callers should usually retry later. If the error is nil, either
// reconciliation succeeded or none was necessary. The returned slice contains
// the ReplicationControllers now owned by the caller.
func (m *RCControllerRefManager) ClaimReplicationControllers(rcs []*kapi.ReplicationController) ([]*kapi.ReplicationController, error) {
	var claimed []*kapi.ReplicationController
	var errs []error

	for _, rc := range rcs {
		owned, err := m.ClaimReplicationController(rc)
		switch {
		case err != nil:
			// Collect the failure and keep going; the aggregate is returned.
			errs = append(errs, err)
		case owned:
			claimed = append(claimed, rc)
		}
	}
	return claimed, kutilerrors.NewAggregate(errs)
}

// AdoptReplicationController sends a patch to take control of the
// ReplicationController: it sets this manager's controller as the single
// controller ownerReference (with blockOwnerDeletion) and adds the
// FinalizerDeleteDependents finalizer. It returns the error if the
// patching fails.
//
// The parameter is named rc for consistency with the sibling methods; the
// original rs name was a leftover from the ReplicaSet-based ref manager.
func (m *RCControllerRefManager) AdoptReplicationController(rc *kapi.ReplicationController) error {
	if err := m.CanAdopt(); err != nil {
		return fmt.Errorf("can't adopt ReplicationController %s/%s (%s): %v", rc.Namespace, rc.Name, rc.UID, err)
	}
	// Note that ValidateOwnerReferences() will reject this patch if another
	// OwnerReference exists with controller=true.
	// The RC's own UID is included in the patch so that it fails (422) if the
	// RC was deleted and recreated under the same name in the meantime.
	addControllerPatch := fmt.Sprintf(
		`{"metadata":{
			"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],
			"uid":"%s",
			"finalizers": ["%s"]
			}
		}`,
		m.controllerKind.GroupVersion(), m.controllerKind.Kind,
		m.Controller.GetName(), m.Controller.GetUID(), rc.UID,
		kmetav1.FinalizerDeleteDependents)
	return m.rcControl.PatchReplicationController(rc.Namespace, rc.Name, []byte(addControllerPatch))
}

// ReleaseReplicationController sends a patch that removes this manager's
// controller from the ReplicationController's ownerReferences, freeing it
// from the control of the Deployment controller. It returns the error if the
// patching fails; 404 and 422 errors are ignored.
func (m *RCControllerRefManager) ReleaseReplicationController(rc *kapi.ReplicationController) error {
	glog.V(4).Infof("patching ReplicationController %s/%s to remove its controllerRef to %s/%s:%s",
		rc.Namespace, rc.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
	patch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), rc.UID)

	err := m.rcControl.PatchReplicationController(rc.Namespace, rc.Name, []byte(patch))
	switch {
	case err == nil:
		return nil
	case kerrors.IsNotFound(err):
		// The ReplicationController no longer exists; nothing to release.
		return nil
	case kerrors.IsInvalid(err):
		// Invalid is returned in two cases: 1. the ReplicationController has
		// no owner reference; 2. the UID of the ReplicationController doesn't
		// match, meaning it was deleted and then recreated. Both are ignorable.
		return nil
	default:
		return err
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

kapierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
kutilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -17,9 +18,11 @@ import (
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/client/retry"
kcontroller "k8s.io/kubernetes/pkg/controller"

osclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
oscontroller "github.com/openshift/origin/pkg/controller"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
Expand Down Expand Up @@ -68,6 +71,8 @@ type DeploymentConfigController struct {
rcLister kcorelisters.ReplicationControllerLister
// rcListerSynced makes sure the rc shared informer is synced before reconcling any deployment config.
rcListerSynced func() bool
// rcControl is used for adopting/releasing replication controllers.
rcControl oscontroller.RCControlInterface

// codec is used to build deployments from configs.
codec runtime.Codec
Expand All @@ -84,11 +89,29 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
return c.updateStatus(config, []*kapi.ReplicationController{})
}

// Find all deployments owned by the deployment config.
// List all ReplicationControllers to find also those we own but that no longer match our selector.
// They will be orphaned by ClaimReplicationControllers().
rcList, err := c.rcLister.ReplicationControllers(config.Namespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("error while deploymentConfigController listing replication controllers: %v", err)
}
selector := deployutil.ConfigSelector(config.Name)
existingDeployments, err := c.rcLister.ReplicationControllers(config.Namespace).List(selector)
// If any adoptions are attempted, we should first recheck for deletion with
// an uncached quorum read sometime after listing ReplicationControllers (see Kubernetes #42639).
canAdoptFunc := kcontroller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := c.dn.DeploymentConfigs(config.Namespace).Get(config.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != config.UID {
return nil, fmt.Errorf("original DeploymentConfig %v/%v is gone: got uid %v, wanted %v", config.Namespace, config.Name, fresh.UID, config.UID)
}
return fresh, nil
})
cm := oscontroller.NewRCControllerRefManager(c.rcControl, config, selector, deployutil.ControllerKind, canAdoptFunc)
existingDeployments, err := cm.ClaimReplicationControllers(rcList)
if err != nil {
return err
return fmt.Errorf("error while deploymentConfigController claiming replication controllers: %v", err)
}

// In case the deployment config has been marked for deletion, merely update its status with
Expand Down Expand Up @@ -125,6 +148,15 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
if err != nil {
return err
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
if !isOurs {
return nil
}

copied, err := deployutil.DeploymentDeepCopy(rc)
if err != nil {
return err
Expand Down Expand Up @@ -157,7 +189,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
return c.updateStatus(config, existingDeployments)
}

return c.reconcileDeployments(existingDeployments, config)
return c.reconcileDeployments(existingDeployments, config, cm)
}
// If the config is paused we shouldn't create new deployments for it.
if config.Spec.Paused {
Expand All @@ -177,10 +209,26 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
}
created, err := c.rn.ReplicationControllers(config.Namespace).Create(deployment)
if err != nil {
// If the deployment was already created, just move on. The cache could be
// stale, or another process could have already handled this update.
// We need to find out if our controller owns that deployment and report error if not
if kapierrors.IsAlreadyExists(err) {
return c.updateStatus(config, existingDeployments)
rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name)
if err != nil {
return fmt.Errorf("error while deploymentConfigController getting the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller: %v", err)
}
if isOurs {
// If the deployment was already created, just move on. The cache could be
// stale, or another process could have already handled this update.
return c.updateStatus(config, existingDeployments)
} else {
err = fmt.Errorf("replication controller %s already exists and deployment config is not allowed to claim it.", deployment.Name)
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %v", config.Status.LatestVersion, err)
return c.updateStatus(config, existingDeployments)
}
}
c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err)
// We don't care about this error since we need to report the create failure.
Expand Down Expand Up @@ -208,7 +256,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig)
// successful deployment, not necessarily the latest in terms of the config
// version. The active deployment replica count should follow the config, and
// all other deployments should be scaled to zero.
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig) error {
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []*kapi.ReplicationController, config *deployapi.DeploymentConfig, cm *oscontroller.RCControllerRefManager) error {
activeDeployment := deployutil.ActiveDeployment(existingDeployments)

// Reconcile deployments. The active deployment follows the config, and all
Expand Down Expand Up @@ -239,6 +287,18 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments []
if err != nil {
return err
}
// We need to make sure we own that RC or adopt it if possible
isOurs, err := cm.ClaimReplicationController(rc)
if err != nil {
return fmt.Errorf("error while deploymentConfigController claiming the replication controller %s/%s: %v", rc.Namespace, rc.Name, err)
}
if !isOurs {
return fmt.Errorf("deployment config %s/%s (%v) no longer owns replication controller %s/%s (%v)",
config.Namespace, config.Name, config.UID,
deployment.Namespace, deployment.Name, deployment.UID,
)
}

copied, err = deployutil.DeploymentDeepCopy(rc)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions pkg/deploy/controller/deploymentconfig/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
kcontroller "k8s.io/kubernetes/pkg/controller"

osclient "github.com/openshift/origin/pkg/client"
oscontroller "github.com/openshift/origin/pkg/controller"
deployapi "github.com/openshift/origin/pkg/deploy/api"
)

Expand Down Expand Up @@ -51,6 +52,10 @@ func NewDeploymentConfigController(

rcLister: rcInformer.Lister(),
rcListerSynced: rcInformer.Informer().HasSynced,
rcControl: oscontroller.RealRCControl{
KubeClient: internalKubeClientset,
Recorder: recorder,
},

recorder: recorder,
codec: codec,
Expand Down
Loading

0 comments on commit 1bf8a17

Please sign in to comment.