Add asynchronous deployer pod invariant checker for every test
tnozicka committed Nov 29, 2017
1 parent 705e69b commit 020235f
Showing 3 changed files with 207 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pkg/apps/controller/deployer/deployer_controller.go
@@ -379,7 +379,8 @@ func (c *DeploymentController) makeDeployerPod(deployment *v1.ReplicationControl
ObjectMeta: metav1.ObjectMeta{
Name: deployutil.DeployerPodNameForDeployment(deployment.Name),
Annotations: map[string]string{
deployapi.DeploymentAnnotation: deployment.Name,
deployapi.DeploymentAnnotation: deployment.Name,
deployapi.DeploymentConfigAnnotation: deployutil.DeploymentConfigNameFor(deployment),
},
Labels: map[string]string{
deployapi.DeployerPodForDeploymentLabel: deployment.Name,
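
The annotation added above lets any observer map a deployer pod back to the DeploymentConfig it belongs to; the invariant checker introduced in test/extended/deployments/util.go below keys its cache on exactly this annotation. Not part of the commit: a minimal, self-contained sketch of that lookup, assuming the usual annotation key "openshift.io/deployment-config.name" for deployapi.DeploymentConfigAnnotation and a hypothetical helper name.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// deploymentConfigAnnotation stands in for deployapi.DeploymentConfigAnnotation (assumed value).
const deploymentConfigAnnotation = "openshift.io/deployment-config.name"

// dcNameForDeployerPod recovers the owning DeploymentConfig name from a deployer pod.
func dcNameForDeployerPod(pod *corev1.Pod) (string, bool) {
	name, ok := pod.Annotations[deploymentConfigAnnotation]
	return name, ok
}

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:        "frontend-1-deploy",
		Annotations: map[string]string{deploymentConfigAnnotation: "frontend"},
	}}
	name, ok := dcNameForDeployerPod(pod)
	fmt.Println(name, ok) // frontend true
}
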
48 changes: 47 additions & 1 deletion test/extended/deployments/deployments.go
@@ -1,6 +1,7 @@
package deployments

import (
"context"
"errors"
"fmt"
"math/rand"
@@ -27,10 +28,55 @@ import (
const deploymentRunTimeout = 5 * time.Minute
const deploymentChangeTimeout = 30 * time.Second

type dicEntry struct {
dic *deployerPodInvariantChecker
ctx context.Context
cancel func()
}

var _ = g.Describe("[Feature:DeploymentConfig] deploymentconfigs", func() {
defer g.GinkgoRecover()

dicMap := make(map[string]dicEntry)
var oc *exutil.CLI

g.JustBeforeEach(func() {
namespace := oc.Namespace()
o.Expect(namespace).NotTo(o.BeEmpty())
o.Expect(dicMap).NotTo(o.HaveKey(namespace))

dic := NewDeployerPodInvariantChecker(namespace, oc.AdminKubeClient())
ctx, cancel := context.WithCancel(context.Background())
dic.Start(ctx)

dicMap[namespace] = dicEntry{
dic: dic,
ctx: ctx,
cancel: cancel,
}
})

// This has to be registered before we create the kube framework (NewCLI).
// It is probably a bug in Ginkgo, because the AfterEach documentation says the innermost block runs first,
// but it actually runs the outermost first.
g.AfterEach(func() {
namespace := oc.Namespace()
o.Expect(namespace).NotTo(o.BeEmpty(), "There is something wrong with the testing framework or the AfterEach functions have been registered in the wrong order")
o.Expect(dicMap).To(o.HaveKey(namespace))

// Give some time to the checker to catch up
time.Sleep(2 * time.Second)

entry := dicMap[namespace]
delete(dicMap, namespace)

entry.cancel()
entry.dic.Wait()
})

oc = exutil.NewCLI("cli-deployment", exutil.KubeConfigPath())

var (
oc = exutil.NewCLI("cli-deployment", exutil.KubeConfigPath())
deploymentFixture = exutil.FixturePath("testdata", "deployments", "test-deployment-test.yaml")
simpleDeploymentFixture = exutil.FixturePath("testdata", "deployments", "deployment-simple.yaml")
customDeploymentFixture = exutil.FixturePath("testdata", "deployments", "custom-deployment.yaml")
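
The wiring above creates one deployerPodInvariantChecker per test namespace in JustBeforeEach, starts it under a cancelable context, and in AfterEach cancels that context and waits for the checker goroutine to drain before the namespace is torn down. Not part of the commit: a self-contained sketch of that start/cancel/wait lifecycle, with hypothetical names and timings and no OpenShift dependencies.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// checker mimics the shape of the invariant checker: a background loop tied to a
// context, with a WaitGroup so callers can wait for a clean shutdown.
type checker struct {
	wg sync.WaitGroup
}

func (c *checker) Start(ctx context.Context) {
	c.wg.Add(1) // register before the goroutine can call Done
	go func() {
		defer c.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return // AfterEach cancelled the context
			case <-time.After(100 * time.Millisecond):
				// a real checker would consume watch events and verify invariants here
			}
		}
	}()
}

func (c *checker) Wait() { c.wg.Wait() }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := &checker{}
	c.Start(ctx)                       // JustBeforeEach
	time.Sleep(300 * time.Millisecond) // the test body runs
	cancel()                           // AfterEach: stop the checker...
	c.Wait()                           // ...and wait until it has fully exited
	fmt.Println("checker drained cleanly")
}
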
165 changes: 158 additions & 7 deletions test/extended/deployments/util.go
@@ -1,21 +1,28 @@
package deployments

import (
"context"
"fmt"
"io/ioutil"
"reflect"
"sort"
"strings"
"sync"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/ghodss/yaml"

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
kapi "k8s.io/kubernetes/pkg/api"
kapiv1 "k8s.io/kubernetes/pkg/api/v1"
@@ -426,15 +433,33 @@ func rCConditionFromMeta(condition func(metav1.Object) (bool, error)) func(rc *c
}
}

func waitForPodModification(oc *exutil.CLI, namespace string, name string, timeout time.Duration, resourceVersion string, condition func(pod *corev1.Pod) (bool, error)) (*corev1.Pod, error) {
watcher, err := oc.KubeClient().CoreV1().Pods(namespace).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name, ResourceVersion: resourceVersion}))
if err != nil {
return nil, err
}

event, err := watch.Until(timeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Modified && (resourceVersion == "" && event.Type != watch.Added) {
return true, fmt.Errorf("different kind of event appeared while waiting for Pod modification: event: %#v", event)
}
return condition(event.Object.(*corev1.Pod))
})
if err != nil {
return nil, err
}
return event.Object.(*corev1.Pod), nil
}

func waitForRCModification(oc *exutil.CLI, namespace string, name string, timeout time.Duration, resourceVersion string, condition func(rc *corev1.ReplicationController) (bool, error)) (*corev1.ReplicationController, error) {
watcher, err := oc.KubeClient().CoreV1().ReplicationControllers(namespace).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name, ResourceVersion: resourceVersion}))
if err != nil {
return nil, err
}

event, err := watch.Until(timeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Modified {
return false, fmt.Errorf("different kind of event appeared while waiting for modification: event: %#v", event)
if event.Type != watch.Modified && (resourceVersion == "" && event.Type != watch.Added) {
return true, fmt.Errorf("different kind of event appeared while waiting for RC modification: event: %#v", event)
}
return condition(event.Object.(*corev1.ReplicationController))
})
@@ -454,17 +479,14 @@ func waitForDCModification(oc *exutil.CLI, namespace string, name string, timeou
}

event, err := watch.Until(timeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Modified {
return false, fmt.Errorf("different kind of event appeared while waiting for modification: event: %#v", event)
if event.Type != watch.Modified && (resourceVersion == "" && event.Type != watch.Added) {
return true, fmt.Errorf("different kind of event appeared while waiting for DC modification: event: %#v", event)
}
return condition(event.Object.(*deployapi.DeploymentConfig))
})
if err != nil {
return nil, err
}
if event.Type != watch.Modified {
return nil, fmt.Errorf("waiting for DC modification failed: event: %v", event)
}
return event.Object.(*deployapi.DeploymentConfig), nil
}

@@ -623,3 +645,132 @@ func readDCFixtureOrDie(path string) *deployapi.DeploymentConfig {
}
return data
}

type deployerPodInvariantChecker struct {
ctx context.Context
wg sync.WaitGroup
namespace string
client kubernetes.Interface
cache map[string][]*corev1.Pod
}

func NewDeployerPodInvariantChecker(namespace string, client kubernetes.Interface) *deployerPodInvariantChecker {
return &deployerPodInvariantChecker{
namespace: namespace,
client: client,
cache: make(map[string][]*corev1.Pod),
}
}

func (d *deployerPodInvariantChecker) getCacheKey(pod *corev1.Pod) string {
dcName, found := pod.Annotations[deployapi.DeploymentConfigAnnotation]
o.Expect(found).To(o.BeTrue(), fmt.Sprintf("internal error - deployer pod is missing %q annotation\npod: %#v", deployapi.DeploymentConfigAnnotation, pod))
o.Expect(dcName).NotTo(o.BeEmpty())

return fmt.Sprintf("%s/%s", pod.Namespace, dcName)
}

func (d *deployerPodInvariantChecker) getPodIndex(list []*corev1.Pod, pod *corev1.Pod) int {
for i, p := range list {
if p.Name == pod.Name && p.Namespace == pod.Namespace {
// Internal check
o.Expect(p.UID).To(o.Equal(pod.UID))
return i
}
}

// Internal check
o.Expect(fmt.Errorf("couldn't find pod %#v \n\n in list %#v", pod, list)).NotTo(o.HaveOccurred())
return -1
}

func (d *deployerPodInvariantChecker) checkInvariants(dc string, pods []*corev1.Pod) {
var unterminatedPods []*corev1.Pod
for _, pod := range pods {
if pod.Status.Phase != corev1.PodSucceeded && pod.Status.Phase != corev1.PodFailed {
unterminatedPods = append(unterminatedPods, pod)
}
}

// INVARIANT: There can be no more than one unterminated deployer pod present
message := fmt.Sprintf("Deployer pod invariant broken! More than one unterminated deployer pod exists for DC %s!", dc)
o.Expect(len(unterminatedPods)).To(o.BeNumerically("<=", 1), spew.Sprintf(`%v: %s
List of unterminated pods: %#+v
`, time.Now(), message, unterminatedPods))
}

func (d *deployerPodInvariantChecker) AddPod(pod *corev1.Pod) {
key := d.getCacheKey(pod)
d.cache[key] = append(d.cache[key], pod)

d.checkInvariants(key, d.cache[key])
}

func (d *deployerPodInvariantChecker) RemovePod(pod *corev1.Pod) {
key := d.getCacheKey(pod)
index := d.getPodIndex(d.cache[key], pod)

d.cache[key] = append(d.cache[key][:index], d.cache[key][index+1:]...)

d.checkInvariants(key, d.cache[key])
}

func (d *deployerPodInvariantChecker) UpdatePod(pod *corev1.Pod) {
key := d.getCacheKey(pod)
index := d.getPodIndex(d.cache[key], pod)

// Check for sanity.
// This is not paranoid; kubelet has already been broken this way:
// https://github.com/openshift/origin/issues/17011
oldPhase := d.cache[key][index].Status.Phase
oldPhaseIsTerminated := oldPhase == corev1.PodSucceeded || oldPhase == corev1.PodFailed
o.Expect(oldPhaseIsTerminated && pod.Status.Phase != oldPhase).To(o.BeFalse(),
fmt.Sprintf("%v: detected deployer pod transition from terminated phase: %q -> %q", time.Now(), oldPhase, pod.Status.Phase))

d.cache[key][index] = pod

d.checkInvariants(key, d.cache[key])
}

func (d *deployerPodInvariantChecker) doChecking() {
defer g.GinkgoRecover()

watcher, err := d.client.CoreV1().Pods(d.namespace).Watch(metav1.ListOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
defer d.wg.Done()
defer watcher.Stop()

for {
select {
case <-d.ctx.Done():
return
case event := <-watcher.ResultChan():
t := event.Type
if t != watch.Added && t != watch.Modified && t != watch.Deleted {
o.Expect(fmt.Errorf("unexpected event: %#v", event)).NotTo(o.HaveOccurred())
}
pod := event.Object.(*corev1.Pod)
if !strings.HasSuffix(pod.Name, "-deploy") {
continue
}

switch t {
case watch.Added:
d.AddPod(pod)
case watch.Modified:
d.UpdatePod(pod)
case watch.Deleted:
d.RemovePod(pod)
}
}
}
}

func (d *deployerPodInvariantChecker) Start(ctx context.Context) {
d.ctx = ctx
// Add to the WaitGroup before starting the goroutine so Wait cannot race a Done.
d.wg.Add(1)
go d.doChecking()
}

func (d *deployerPodInvariantChecker) Wait() {
d.wg.Wait()
}
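
For reference, the invariant enforced by checkInvariants above is: among all deployer pods cached for one DeploymentConfig, at most one may be unterminated, where Succeeded and Failed are the only terminal phases. Not part of the commit: a small, self-contained sketch of that counting rule.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// countUnterminated mirrors the filtering in checkInvariants: pods that are not
// Succeeded and not Failed still count as running a deployment.
func countUnterminated(pods []*corev1.Pod) int {
	n := 0
	for _, p := range pods {
		if p.Status.Phase != corev1.PodSucceeded && p.Status.Phase != corev1.PodFailed {
			n++
		}
	}
	return n
}

func main() {
	pods := []*corev1.Pod{
		{Status: corev1.PodStatus{Phase: corev1.PodSucceeded}}, // previous deployment, finished
		{Status: corev1.PodStatus{Phase: corev1.PodRunning}},   // current deployment
	}
	fmt.Println("invariant holds:", countUnterminated(pods) <= 1) // true

	pods = append(pods, &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodPending}})
	fmt.Println("invariant holds:", countUnterminated(pods) <= 1) // false: two live deployer pods
}
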
