diff --git a/tools/portforward/fallback_dialer_test.go b/tools/portforward/fallback_dialer_test.go index 1a6805f122..70583958ea 100644 --- a/tools/portforward/fallback_dialer_test.go +++ b/tools/portforward/fallback_dialer_test.go @@ -17,10 +17,12 @@ limitations under the License. package portforward import ( + "errors" "fmt" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/httpstream" ) @@ -36,7 +38,7 @@ func TestFallbackDialer(t *testing.T) { assert.True(t, primary.dialed, "no fallback; primary should have dialed") assert.False(t, secondary.dialed, "no fallback; secondary should *not* have dialed") assert.Equal(t, primaryProtocol, negotiated, "primary negotiated protocol returned") - assert.Nil(t, err, "error from primary dialer should be nil") + require.NoError(t, err, "error from primary dialer should be nil") // If primary dialer error is upgrade error, then fallback returning secondary dial response. primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{}} secondary = &fakeDialer{dialed: false, negotiatedProtocol: secondaryProtocol} @@ -45,7 +47,18 @@ func TestFallbackDialer(t *testing.T) { assert.True(t, primary.dialed, "fallback; primary should have dialed") assert.True(t, secondary.dialed, "fallback; secondary should have dialed") assert.Equal(t, secondaryProtocol, negotiated, "negotiated protocol is from secondary dialer") - assert.Nil(t, err, "error from secondary dialer should be nil") + require.NoError(t, err, "error from secondary dialer should be nil") + // If primary dialer error is https proxy dialing error, then fallback returning secondary dial response. + primary = &fakeDialer{negotiatedProtocol: primaryProtocol, err: errors.New("proxy: unknown scheme: https")} + secondary = &fakeDialer{negotiatedProtocol: secondaryProtocol} + fallbackDialer = NewFallbackDialer(primary, secondary, func(err error) bool { + return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) + }) + _, negotiated, err = fallbackDialer.Dial(protocols...) + assert.True(t, primary.dialed, "fallback; primary should have dialed") + assert.True(t, secondary.dialed, "fallback; secondary should have dialed") + assert.Equal(t, secondaryProtocol, negotiated, "negotiated protocol is from secondary dialer") + require.NoError(t, err, "error from secondary dialer should be nil") // If primary dialer returns non-upgrade error, then primary error is returned. nonUpgradeErr := fmt.Errorf("This is a non-upgrade error") primary = &fakeDialer{dialed: false, err: nonUpgradeErr} diff --git a/tools/remotecommand/fallback_test.go b/tools/remotecommand/fallback_test.go index 52e1f7b160..0dcca20634 100644 --- a/tools/remotecommand/fallback_test.go +++ b/tools/remotecommand/fallback_test.go @@ -20,15 +20,19 @@ import ( "bytes" "context" "crypto/rand" + "crypto/tls" "io" "net/http" "net/http/httptest" "net/url" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/httpstream" + utilnettesting "k8s.io/apimachinery/pkg/util/net/testing" "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" @@ -165,6 +169,178 @@ func TestFallbackClient_SPDYSecondarySucceeds(t *testing.T) { } } +// localhostCert was generated from crypto/tls/generate_cert.go with the following command: +// +// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var localhostCert = []byte(`-----BEGIN CERTIFICATE----- +MIIDGTCCAgGgAwIBAgIRALL5AZcefF4kkYV1SEG6YrMwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEP +ADCCAQoCggEBALQ/FHcyVwdFHxARbbD2KBtDUT7Eni+8ioNdjtGcmtXqBv45EC1C +JOqqGJTroFGJ6Q9kQIZ9FqH5IJR2fOOJD9kOTueG4Vt1JY1rj1Kbpjefu8XleZ5L +SBwIWVnN/lEsEbuKmj7N2gLt5AH3zMZiBI1mg1u9Z5ZZHYbCiTpBrwsq6cTlvR9g +dyo1YkM5hRESCzsrL0aUByoo0qRMD8ZsgANJwgsiO0/M6idbxDwv1BnGwGmRYvOE +Hxpy3v0Jg7GJYrvnpnifJTs4nw91N5X9pXxR7FFzi/6HTYDWRljvTb0w6XciKYAz +bWZ0+cJr5F7wB7ovlbm7HrQIR7z7EIIu2d8CAwEAAaNoMGYwDgYDVR0PAQH/BAQD +AgKkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEwEB/wQFMAMBAf8wLgYDVR0R +BCcwJYILZXhhbXBsZS5jb22HBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwDQYJKoZI +hvcNAQELBQADggEBAFPPWopNEJtIA2VFAQcqN6uJK+JVFOnjGRoCrM6Xgzdm0wxY +XCGjsxY5dl+V7KzdGqu858rCaq5osEBqypBpYAnS9C38VyCDA1vPS1PsN8SYv48z +DyBwj+7R2qar0ADBhnhWxvYO9M72lN/wuCqFKYMeFSnJdQLv3AsrrHe9lYqOa36s +8wxSwVTFTYXBzljPEnSaaJMPqFD8JXaZK1ryJPkO5OsCNQNGtatNiWAf3DcmwHAT +MGYMzP0u4nw47aRz9shB8w+taPKHx2BVwE1m/yp3nHVioOjXqA1fwRQVGclCJSH1 +D2iq3hWVHRENgjTjANBPICLo9AZ4JfN6PH19mnU= +-----END CERTIFICATE-----`) + +// localhostKey is the private key for localhostCert. +var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAtD8UdzJXB0UfEBFtsPYoG0NRPsSeL7yKg12O0Zya1eoG/jkQ +LUIk6qoYlOugUYnpD2RAhn0WofkglHZ844kP2Q5O54bhW3UljWuPUpumN5+7xeV5 +nktIHAhZWc3+USwRu4qaPs3aAu3kAffMxmIEjWaDW71nllkdhsKJOkGvCyrpxOW9 +H2B3KjViQzmFERILOysvRpQHKijSpEwPxmyAA0nCCyI7T8zqJ1vEPC/UGcbAaZFi +84QfGnLe/QmDsYliu+emeJ8lOzifD3U3lf2lfFHsUXOL/odNgNZGWO9NvTDpdyIp +gDNtZnT5wmvkXvAHui+VubsetAhHvPsQgi7Z3wIDAQABAoIBAGmw93IxjYCQ0ncc +kSKMJNZfsdtJdaxuNRZ0nNNirhQzR2h403iGaZlEpmdkhzxozsWcto1l+gh+SdFk +bTUK4MUZM8FlgO2dEqkLYh5BcMT7ICMZvSfJ4v21E5eqR68XVUqQKoQbNvQyxFk3 +EddeEGdNrkb0GDK8DKlBlzAW5ep4gjG85wSTjR+J+muUv3R0BgLBFSuQnIDM/IMB +LWqsja/QbtB7yppe7jL5u8UCFdZG8BBKT9fcvFIu5PRLO3MO0uOI7LTc8+W1Xm23 +uv+j3SY0+v+6POjK0UlJFFi/wkSPTFIfrQO1qFBkTDQHhQ6q/7GnILYYOiGbIRg2 +NNuP52ECgYEAzXEoy50wSYh8xfFaBuxbm3ruuG2W49jgop7ZfoFrPWwOQKAZS441 +VIwV4+e5IcA6KkuYbtGSdTYqK1SMkgnUyD/VevwAqH5TJoEIGu0pDuKGwVuwqioZ +frCIAV5GllKyUJ55VZNbRr2vY2fCsWbaCSCHETn6C16DNuTCe5C0JBECgYEA4JqY +5GpNbMG8fOt4H7hU0Fbm2yd6SHJcQ3/9iimef7xG6ajxsYrIhg1ft+3IPHMjVI0+ +9brwHDnWg4bOOx/VO4VJBt6Dm/F33bndnZRkuIjfSNpLM51P+EnRdaFVHOJHwKqx +uF69kihifCAG7YATgCveeXImzBUSyZUz9UrETu8CgYARNBimdFNG1RcdvEg9rC0/ +p9u1tfecvNySwZqU7WF9kz7eSonTueTdX521qAHowaAdSpdJMGODTTXaywm6cPhQ +jIfj9JZZhbqQzt1O4+08Qdvm9TamCUB5S28YLjza+bHU7nBaqixKkDfPqzCyilpX +yVGGL8SwjwmN3zop/sQXAQKBgC0JMsESQ6YcDsRpnrOVjYQc+LtW5iEitTdfsaID +iGGKihmOI7B66IxgoCHMTws39wycKdSyADVYr5e97xpR3rrJlgQHmBIrz+Iow7Q2 +LiAGaec8xjl6QK/DdXmFuQBKqyKJ14rljFODP4QuE9WJid94bGqjpf3j99ltznZP +4J8HAoGAJb4eb4lu4UGwifDzqfAPzLGCoi0fE1/hSx34lfuLcc1G+LEu9YDKoOVJ +9suOh0b5K/bfEy9KrVMBBriduvdaERSD8S3pkIQaitIz0B029AbE4FLFf9lKQpP2 +KR8NJEkK99Vh/tew6jAMll70xFrE7aF8VLXJVE7w4sQzuvHxl9Q= +-----END RSA PRIVATE KEY----- +`) + +// See (https://github.com/kubernetes/kubernetes/issues/126134). +func TestFallbackClient_WebSocketHTTPSProxyCausesSPDYFallback(t *testing.T) { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Errorf("https (valid hostname): proxy_test: %v", err) + } + + var proxyCalled atomic.Int64 + proxyHandler := utilnettesting.NewHTTPProxyHandler(t, func(req *http.Request) bool { + proxyCalled.Add(1) + return true + }) + defer proxyHandler.Wait() + + proxyServer := httptest.NewUnstartedServer(proxyHandler) + proxyServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}} + proxyServer.StartTLS() + defer proxyServer.Close() //nolint:errcheck + + proxyLocation, err := url.Parse(proxyServer.URL) + require.NoError(t, err) + + // Create fake SPDY server. Copy received STDIN data back onto STDOUT stream. + spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + var stdin, stdout bytes.Buffer + ctx, err := createHTTPStreams(w, req, &StreamOptions{ + Stdin: &stdin, + Stdout: &stdout, + }) + if err != nil { + w.WriteHeader(http.StatusForbidden) + return + } + defer ctx.conn.Close() //nolint:errcheck + _, err = io.Copy(ctx.stdoutStream, ctx.stdinStream) + if err != nil { + t.Fatalf("error copying STDIN to STDOUT: %v", err) + } + })) + defer spdyServer.Close() //nolint:errcheck + + backendLocation, err := url.Parse(spdyServer.URL) + require.NoError(t, err) + + clientConfig := &rest.Config{ + Host: spdyServer.URL, + TLSClientConfig: rest.TLSClientConfig{CAData: localhostCert}, + Proxy: func(req *http.Request) (*url.URL, error) { + return proxyLocation, nil + }, + } + + // Websocket with https proxy will fail in dialing (falling back to SPDY). + websocketExecutor, err := NewWebSocketExecutor(clientConfig, "GET", backendLocation.String()) + require.NoError(t, err) + spdyExecutor, err := NewSPDYExecutor(clientConfig, "POST", backendLocation) + require.NoError(t, err) + // Fallback to spdyExecutor with websocket https proxy error; spdyExecutor succeeds against fake spdy server. + sawHTTPSProxyError := false + exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(err error) bool { + if httpstream.IsUpgradeFailure(err) { + t.Errorf("saw upgrade failure: %v", err) + return true + } + if httpstream.IsHTTPSProxyError(err) { + sawHTTPSProxyError = true + t.Logf("saw https proxy error: %v", err) + return true + } + return false + }) + require.NoError(t, err) + + // Generate random data, and set it up to stream on STDIN. The data will be + // returned on the STDOUT buffer. + randomSize := 1024 * 1024 + randomData := make([]byte, randomSize) + if _, err := rand.Read(randomData); err != nil { + t.Errorf("unexpected error reading random data: %v", err) + } + var stdout bytes.Buffer + options := &StreamOptions{ + Stdin: bytes.NewReader(randomData), + Stdout: &stdout, + } + errorChan := make(chan error) + go func() { + errorChan <- exec.StreamWithContext(context.Background(), *options) + }() + + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("expect stream to be closed after connection is closed.") + case err := <-errorChan: + if err != nil { + t.Errorf("unexpected error") + } + } + + data, err := io.ReadAll(bytes.NewReader(stdout.Bytes())) + if err != nil { + t.Errorf("error reading the stream: %v", err) + return + } + // Check the random data sent on STDIN was the same returned on STDOUT. + if !bytes.Equal(randomData, data) { + t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData)) + } + + // Ensure the https proxy error was observed + if !sawHTTPSProxyError { + t.Errorf("expected to see https proxy error") + } + // Ensure the proxy was called once + if e, a := int64(1), proxyCalled.Load(); e != a { + t.Errorf("expected %d proxy call, got %d", e, a) + } +} + func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) { // Create fake WebSocket server. Copy received STDIN data back onto STDOUT stream. websocketServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/tools/remotecommand/websocket_test.go b/tools/remotecommand/websocket_test.go index 4a333b0b24..e1b69c0045 100644 --- a/tools/remotecommand/websocket_test.go +++ b/tools/remotecommand/websocket_test.go @@ -39,6 +39,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/httpstream/wsstream" "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/wait" @@ -814,6 +815,42 @@ func TestWebSocketClient_BadHandshake(t *testing.T) { } } +// See (https://github.com/kubernetes/kubernetes/issues/126134). +func TestWebSocketClient_HTTPSProxyErrorExpected(t *testing.T) { + urlStr := "http://127.0.0.1/never-used" + "?" + "stdin=true" + "&" + "stdout=true" + websocketLocation, err := url.Parse(urlStr) + if err != nil { + t.Fatalf("Unable to parse WebSocket server URL: %s", urlStr) + } + // proxy url with https scheme will trigger websocket dialing error. + httpsProxyFunc := func(req *http.Request) (*url.URL, error) { return url.Parse("https://127.0.0.1") } + exec, err := NewWebSocketExecutor(&rest.Config{Host: websocketLocation.Host, Proxy: httpsProxyFunc}, "GET", urlStr) + if err != nil { + t.Errorf("unexpected error creating websocket executor: %v", err) + } + var stdout bytes.Buffer + options := &StreamOptions{ + Stdout: &stdout, + } + errorChan := make(chan error) + go func() { + // Start the streaming on the WebSocket "exec" client. + errorChan <- exec.StreamWithContext(context.Background(), *options) + }() + + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("expect stream to be closed after connection is closed.") + case err := <-errorChan: + if err == nil { + t.Errorf("expected error but received none") + } + if !httpstream.IsHTTPSProxyError(err) { + t.Errorf("expected https proxy error, got (%s)", err) + } + } +} + // TestWebSocketClient_HeartbeatTimeout tests the heartbeat by forcing a // timeout by setting the ping period greater than the deadline. func TestWebSocketClient_HeartbeatTimeout(t *testing.T) {