Skip to content

Commit

Permalink
fix: draining remote stream after port-forward connection broken
Browse files Browse the repository at this point in the history
Signed-off-by: Nic <[email protected]>
  • Loading branch information
nic-6443 authored and soltysh committed Nov 7, 2024
1 parent 847be85 commit dbe6b66
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 0 deletions.
5 changes: 5 additions & 0 deletions staging/src/k8s.io/client-go/tools/portforward/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
case <-remoteDone:
case <-localError:
}
/*
reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
we must reset dataStream before waiting on errorChan, otherwise, the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
*/
_ = dataStream.Reset()

// always expect something on errorChan (it may be nil)
err = <-errorChan
Expand Down
94 changes: 94 additions & 0 deletions test/e2e/kubectl/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"io"
"net"
"net/http"
"os/exec"
"regexp"
"strconv"
Expand Down Expand Up @@ -123,6 +124,36 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi
}
}

func testWebServerPod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: map[string]string{"name": podName},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "testwebserver",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{"test-webserver"},
Ports: []v1.ContainerPort{{ContainerPort: int32(80)}},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt32(int32(80)),
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 3,
FailureThreshold: 10,
},
},
},
},
}
}

// WaitForTerminatedContainer waits till a given container be terminated for a given pod.
func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error {
return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
Expand Down Expand Up @@ -493,6 +524,69 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
doTestOverWebSockets(ctx, "localhost", f)
})
})

ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() {
ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) {
ginkgo.By("Creating the target pod")
pod := testWebServerPod()
if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
framework.Failf("Couldn't create pod: %v", err)
}
if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
framework.Failf("Pod did not start running: %v", err)
}

ginkgo.By("Running 'kubectl port-forward'")
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
defer cmd.Stop()

ginkgo.By("Send a http request to verify port-forward working")
client := http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
if err != nil {
framework.Failf("Couldn't get http response from port-forward: %v", err)
}
if resp.StatusCode != http.StatusOK {
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}

ginkgo.By("Dialing the local port")
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
if err != nil {
framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
}

// use raw tcp connection to emulate client close connection without reading response
ginkgo.By("Request agohost binary file (40MB+)")
requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""}
for _, line := range requestLines {
if _, err := conn.Write(append([]byte(line), []byte("\r\n")...)); err != nil {
framework.Failf("Couldn't write http request to local connection: %v", err)
}
}

ginkgo.By("Read only one byte from the connection")
if _, err := conn.Read(make([]byte, 1)); err != nil {
framework.Logf("Couldn't reading from the local connection: %v", err)
}

ginkgo.By("Close client connection without reading remain data")
if err := conn.Close(); err != nil {
framework.Failf("Couldn't close local connection: %v", err)
}

ginkgo.By("Send another http request through port-forward again")
resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
if err != nil {
framework.Failf("Couldn't get http response from port-forward: %v", err)
}
if resp.StatusCode != http.StatusOK {
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}
})
})
})

func wsRead(conn *websocket.Conn) (byte, []byte, error) {
Expand Down

0 comments on commit dbe6b66

Please sign in to comment.