Skip to content

Commit

Permalink
Add a test case for conflict detection and write backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
smarterclayton committed Mar 8, 2018
1 parent 9cfd001 commit 82c6b10
Showing 1 changed file with 361 additions and 0 deletions.
361 changes: 361 additions & 0 deletions test/extended/router/stress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,361 @@
package images

import (
"bytes"
"fmt"
"os"
"strings"
"text/tabwriter"
"time"

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
rbacv1 "k8s.io/api/rbac/v1"
kapierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
e2e "k8s.io/kubernetes/test/e2e/framework"

routev1 "github.com/openshift/api/route/v1"
routeclientset "github.com/openshift/client-go/route/clientset/versioned"
exutil "github.com/openshift/origin/test/extended/util"
)

var _ = g.Describe("[Conformance][Area:Networking][Feature:Router]", func() {
defer g.GinkgoRecover()
var (
routerImage string

oc = exutil.NewCLI("router-stress", exutil.KubeConfigPath())
)

g.BeforeEach(func() {
dc, err := oc.AdminAppsClient().Apps().DeploymentConfigs("default").Get("router", metav1.GetOptions{})
if kapierrs.IsNotFound(err) {
g.Skip("no router installed on the cluster")
imagePrefix := os.Getenv("OS_IMAGE_PREFIX")
if len(imagePrefix) == 0 {
imagePrefix = "openshift/origin"
}
routerImage = imagePrefix + "-haproxy-router:latest"
return
}
o.Expect(err).NotTo(o.HaveOccurred())
routerImage = dc.Spec.Template.Spec.Containers[0].Image

_, err = oc.AdminKubeClient().Rbac().RoleBindings(oc.Namespace()).Create(&rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "router",
},
Subjects: []rbacv1.Subject{
{
Kind: "ServiceAccount",
Name: "default",
},
},
RoleRef: rbacv1.RoleRef{
Kind: "ClusterRole",
Name: "system:router",
},
})
o.Expect(err).NotTo(o.HaveOccurred())
})

g.AfterEach(func() {
if g.CurrentGinkgoTestDescription().Failed {
client := routeclientset.NewForConfigOrDie(oc.AdminConfig()).Route().Routes(oc.Namespace())
if routes, _ := client.List(metav1.ListOptions{}); routes != nil {
outputIngress(routes.Items...)
}
exutil.DumpPodLogsStartingWith("router-", oc)
}
})

g.Describe("The HAProxy router", func() {
g.It("converges when multiple routers are writing status", func() {
g.By("deploying a scaled out namespace scoped router")
client := routeclientset.NewForConfigOrDie(oc.AdminConfig()).Route().Routes(oc.Namespace())
var rv string
for i := 0; i < 10; i++ {
_, err := client.Create(&routev1.Route{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", i),
},
Spec: routev1.RouteSpec{
To: routev1.RouteTargetReference{Name: "test"},
Port: &routev1.RoutePort{
TargetPort: intstr.FromInt(8080),
},
},
})
o.Expect(err).NotTo(o.HaveOccurred())
}
rs, err := oc.KubeClient().Extensions().ReplicaSets(oc.Namespace()).Create(
scaledRouter(
routerImage,
[]string{
"--loglevel=4",
fmt.Sprintf("--namespace=%s", oc.Namespace()),
"--resync-interval=2m",
"--name=namespaced",
},
),
)
o.Expect(err).NotTo(o.HaveOccurred())
err = e2e.WaitForReadyReplicaSet(oc.KubeClient(), oc.Namespace(), rs.Name)
o.Expect(err).NotTo(o.HaveOccurred())

g.By("waiting for all routes to have a status")
err = wait.Poll(time.Second, 2*time.Minute, func() (bool, error) {
routes, err := client.List(metav1.ListOptions{})
if err != nil {
return false, err
}
o.Expect(routes.Items).To(o.HaveLen(10))
for _, route := range routes.Items {
ingress := findIngress(&route, "namespaced")
if ingress == nil {
return false, nil
}
o.Expect(ingress.Host).NotTo(o.BeEmpty())
o.Expect(ingress.Conditions).NotTo(o.BeEmpty())
o.Expect(ingress.Conditions[0].LastTransitionTime).NotTo(o.BeNil())
o.Expect(ingress.Conditions[0].Type).To(o.Equal(routev1.RouteAdmitted))
o.Expect(ingress.Conditions[0].Status).To(o.Equal(corev1.ConditionTrue))
}
outputIngress(routes.Items...)
rv = routes.ResourceVersion
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that we don't continue to write")
writes := 0
w, err := client.Watch(metav1.ListOptions{Watch: true, ResourceVersion: rv})
o.Expect(err).NotTo(o.HaveOccurred())
defer w.Stop()
timer := time.NewTimer(10 * time.Second)
ch := w.ResultChan()
Wait:
for i := 0; ; i++ {
select {
case _, ok := <-ch:
writes++
o.Expect(ok).To(o.BeTrue())
o.Expect(i).To(o.BeNumerically("<", 10))
case <-timer.C:
break Wait
}
}
e2e.Logf("Recorded %d writes total", writes)

verifyCommandEquivalent(oc.KubeClient(), rs, "md5sum /var/lib/haproxy/conf/*")
})

g.It("converges when multiple routers are writing conflicting status", func() {
g.By("deploying a scaled out namespace scoped router")

client := routeclientset.NewForConfigOrDie(oc.AdminConfig()).Route().Routes(oc.Namespace())
var rv string
for i := 0; i < 10; i++ {
_, err := client.Create(&routev1.Route{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", i),
},
Spec: routev1.RouteSpec{
To: routev1.RouteTargetReference{Name: "test"},
Port: &routev1.RoutePort{
TargetPort: intstr.FromInt(8080),
},
},
})
o.Expect(err).NotTo(o.HaveOccurred())
}

rs, err := oc.KubeClient().Extensions().ReplicaSets(oc.Namespace()).Create(
scaledRouter(
routerImage,
[]string{
"--loglevel=4",
fmt.Sprintf("--namespace=%s", oc.Namespace()),
"--resync-interval=2m",
"--name=conflicting",
"--override-hostname",
// causes each pod to have a different value
"--hostname-template=${name}-${namespace}.$(NAME).local",
},
),
)
o.Expect(err).NotTo(o.HaveOccurred())
err = e2e.WaitForReadyReplicaSet(oc.KubeClient(), oc.Namespace(), rs.Name)
o.Expect(err).NotTo(o.HaveOccurred())

g.By("waiting for sufficient routes to have a status")
err = wait.Poll(time.Second, 2*time.Minute, func() (bool, error) {
routes, err := client.List(metav1.ListOptions{})
if err != nil {
return false, err
}
o.Expect(routes.Items).To(o.HaveLen(10))
conflicting := 0
for _, route := range routes.Items {
ingress := findIngress(&route, "conflicting")
if ingress == nil {
continue
}
conflicting++
o.Expect(ingress.Host).NotTo(o.BeEmpty())
o.Expect(ingress.Conditions).NotTo(o.BeEmpty())
o.Expect(ingress.Conditions[0].LastTransitionTime).NotTo(o.BeNil())
o.Expect(ingress.Conditions[0].Type).To(o.Equal(routev1.RouteAdmitted))
o.Expect(ingress.Conditions[0].Status).To(o.Equal(corev1.ConditionTrue))
}
if conflicting < 3 {
return false, nil
}
outputIngress(routes.Items...)
rv = routes.ResourceVersion
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that we stop writing conflicts rapidly")
writes := 0
w, err := client.Watch(metav1.ListOptions{Watch: true, ResourceVersion: rv})
o.Expect(err).NotTo(o.HaveOccurred())
func() {
defer w.Stop()
timer := time.NewTimer(10 * time.Second)
ch := w.ResultChan()
Wait:
for i := 0; ; i++ {
select {
case _, ok := <-ch:
writes++
o.Expect(ok).To(o.BeTrue())
o.Expect(i).To(o.BeNumerically("<", 10))
case <-timer.C:
break Wait
}
}
e2e.Logf("Recorded %d writes total", writes)
}()

// the os_http_be.map file will vary, so only check the haproxy config
verifyCommandEquivalent(oc.KubeClient(), rs, "md5sum /var/lib/haproxy/conf/haproxy.config")

g.By("clearing a single route's status")
route, err := client.Patch("9", types.MergePatchType, []byte(`{"status":{"ingress":[]}}`), "status")
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that only get a few updates")
writes = 0
w, err = client.Watch(metav1.ListOptions{Watch: true, ResourceVersion: route.ResourceVersion})
o.Expect(err).NotTo(o.HaveOccurred())
func() {
defer w.Stop()
timer := time.NewTimer(10 * time.Second)
ch := w.ResultChan()
Wait:
for i := 0; ; i++ {
select {
case _, ok := <-ch:
writes++
o.Expect(ok).To(o.BeTrue())
o.Expect(i).To(o.BeNumerically("<", 3))
case <-timer.C:
break Wait
}
}
e2e.Logf("Recorded %d writes total", writes)
}()
})
})
})

func findIngress(route *routev1.Route, name string) *routev1.RouteIngress {
for i, ingress := range route.Status.Ingress {
if ingress.RouterName == name {
return &route.Status.Ingress[i]
}
}
return nil
}

func scaledRouter(image string, args []string) *extensionsv1beta1.ReplicaSet {
one := int64(1)
scale := int32(3)
return &extensionsv1beta1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "router",
},
Spec: extensionsv1beta1.ReplicaSetSpec{
Replicas: &scale,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "router"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "router"},
},
Spec: corev1.PodSpec{
TerminationGracePeriodSeconds: &one,
Containers: []corev1.Container{
{
Env: []corev1.EnvVar{
{Name: "NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}},
},
Name: "router",
Image: image,
Args: args,
},
},
},
},
},
}
}

func outputIngress(routes ...routev1.Route) {
b := &bytes.Buffer{}
w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0)
fmt.Fprintf(w, "NAME\tROUTER\tHOST\tLAST TRANSITION\n")
for _, route := range routes {
for _, ingress := range route.Status.Ingress {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", route.Name, ingress.RouterName, ingress.Host, ingress.Conditions[0].LastTransitionTime)
}
}
w.Flush()
e2e.Logf("Routes:\n%s", b.String())
}

func verifyCommandEquivalent(c clientset.Interface, rs *extensionsv1beta1.ReplicaSet, cmd string) {
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
o.Expect(err).NotTo(o.HaveOccurred())
podList, err := c.CoreV1().Pods(rs.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
o.Expect(err).NotTo(o.HaveOccurred())

var values map[string]string
err = wait.PollImmediate(5*time.Second, time.Minute, func() (bool, error) {
values = make(map[string]string)
uniques := make(map[string]struct{})
for _, pod := range podList.Items {
stdout, err := e2e.RunHostCmdWithRetries(pod.Namespace, pod.Name, cmd, e2e.StatefulSetPoll, e2e.StatefulPodTimeout)
o.Expect(err).NotTo(o.HaveOccurred())
values[pod.Name] = stdout
uniques[stdout] = struct{}{}
}
return len(uniques) == 1, nil
})
for name, stdout := range values {
stdout = strings.TrimSuffix(stdout, "\n")
e2e.Logf(name + ": " + strings.Join(strings.Split(stdout, "\n"), fmt.Sprintf("\n%s: ", name)))
}
o.Expect(err).NotTo(o.HaveOccurred())
}

0 comments on commit 82c6b10

Please sign in to comment.