Skip to content

Commit

Permalink
Merge pull request #187 from aojea/race_http
Browse files Browse the repository at this point in the history
Improve startup latency
  • Loading branch information
k8s-ci-robot authored Jan 15, 2025
2 parents 0ca7080 + ed431a0 commit 36dd9ee
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 59 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/k8s.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ jobs:
sudo cp ${TMP_DIR}/e2e.test /usr/local/bin/e2e.test
sudo cp ${TMP_DIR}/kubectl /usr/local/bin/kubectl
sudo cp ${TMP_DIR}/kind /usr/local/bin/kind
sudo chmod +x /usr/local/bin/*
sudo chmod +x /usr/local/bin/ginkgo
sudo chmod +x /usr/local/bin/e2e.test
sudo chmod +x /usr/local/bin/kubectl
sudo chmod +x /usr/local/bin/kind
# Create folder to store artifacts
mkdir -p _artifacts
Expand Down
111 changes: 53 additions & 58 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package controller

import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"sync"
"time"
Expand All @@ -14,6 +12,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
cloudprovider "k8s.io/cloud-provider"
nodecontroller "k8s.io/cloud-provider/controllers/node"
Expand Down Expand Up @@ -109,80 +108,76 @@ func (c *Controller) Run(ctx context.Context) {
}
}

func (c *Controller) getKubeConfig(cluster string, internal bool) (*rest.Config, error) {
kconfig, err := c.kind.KubeConfig(cluster, internal)
if err != nil {
klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err)
return nil, err
}

config, err := clientcmd.RESTConfigFromKubeConfig([]byte(kconfig))
if err != nil {
klog.Errorf("Failed to convert kubeconfig for cluster %s: %v", cluster, err)
return nil, err
}
return config, nil
}

// getKubeClient returns a kubeclient for the cluster passed as argument
// It tries first to connect to the internal endpoint.
func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) {
httpClient := &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
addresses := []string{}
internalConfig, err := c.getKubeConfig(cluster, true)
if err != nil {
klog.Errorf("Failed to get internal kubeconfig for cluster %s: %v", cluster, err)
} else {
addresses = append(addresses, internalConfig.Host)
}
externalConfig, err := c.getKubeConfig(cluster, false)
if err != nil {
klog.Errorf("Failed to get external kubeconfig for cluster %s: %v", cluster, err)
} else {
addresses = append(addresses, externalConfig.Host)
}
// prefer internal (direct connectivity) over no-internal (commonly portmap)
for _, internal := range []bool{true, false} {
kconfig, err := c.kind.KubeConfig(cluster, internal)
if err != nil {
klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err)
continue
}

config, err := clientcmd.RESTConfigFromKubeConfig([]byte(kconfig))
if err != nil {
klog.Errorf("Failed to convert kubeconfig for cluster %s: %v", cluster, err)
continue
}
if len(addresses) == 0 {
return nil, fmt.Errorf("could not find kubeconfig for cluster %s", cluster)
}

// check that the apiserver is reachable before continue
// to fail fast and avoid waiting until the client operations timeout
var ok bool
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if probeHTTP(httpClient, config.Host) {
ok = true
break
}
var host string
for i := 0; i < 5; i++ {
host, err = firstSuccessfulProbe(ctx, addresses)
if err != nil {
klog.Errorf("Failed to connect to any address in %v: %v", addresses, err)
time.Sleep(time.Second * time.Duration(i))
} else {
klog.Infof("Connected succesfully to %s", host)
break
}
if !ok {
klog.Errorf("Failed to connect to apiserver %s: %v", cluster, err)
continue
}
}

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
continue
}
var config *rest.Config
switch host {
case internalConfig.Host:
config = internalConfig
// the first cluster will give us the type of connectivity between
// cloud-provider-kind and the clusters and load balancer containers.
// In Linux or containerized cloud-provider-kind this will be direct.
once.Do(func() {
if internal {
cpkconfig.DefaultConfig.ControlPlaneConnectivity = cpkconfig.Direct
}
cpkconfig.DefaultConfig.ControlPlaneConnectivity = cpkconfig.Direct
})
return kubeClient, err
case externalConfig.Host:
config = externalConfig
default:
return nil, fmt.Errorf("restConfig for host %s not avaliable", host)
}
return nil, fmt.Errorf("can not find a working kubernetes clientset")
}

func probeHTTP(client *http.Client, address string) bool {
klog.Infof("probe HTTP address %s", address)
resp, err := client.Get(address)
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Infof("Failed to connect to HTTP address %s: %v", address, err)
return false
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
return kubeClient, err
}
defer resp.Body.Close()
// drain the body
io.ReadAll(resp.Body) // nolint:errcheck
// we only want to verify connectivity so don't need to check the http status code
// as the apiserver may not be ready
return true
return kubeClient, nil
}

// TODO: implement leader election to not have problems with multiple providers
Expand Down
74 changes: 74 additions & 0 deletions pkg/controller/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package controller

import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"sync"
"time"

"k8s.io/klog/v2"
)

func probeHTTP(ctx context.Context, address string) bool {
klog.Infof("probe HTTP address %s", address)
httpClient := &http.Client{
Timeout: 2 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
req, err := http.NewRequest("GET", address, nil)
if err != nil {
return false
}
req = req.WithContext(ctx)
resp, err := httpClient.Do(req)
if err != nil {
klog.Infof("Failed to connect to HTTP address %s: %v", address, err)
return false
}
defer resp.Body.Close()
// drain the body
io.ReadAll(resp.Body) // nolint:errcheck
// we only want to verify connectivity so don't need to check the http status code
// as the apiserver may not be ready
return true
}

// firstSuccessfulProbe probes the given addresses in parallel and returns the first address to succeed, cancelling the other probes.
func firstSuccessfulProbe(ctx context.Context, addresses []string) (string, error) {
var wg sync.WaitGroup
resultChan := make(chan string, 1)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

for _, addr := range addresses {
wg.Add(1)
go func(address string) {
defer wg.Done()
if probeHTTP(ctx, address) {
select {
case resultChan <- address:
default:
}
cancel()
}
}(addr)
}

go func() {
wg.Wait()
close(resultChan)
}()

select {
case result := <-resultChan:
return result, nil
case <-ctx.Done():
return "", fmt.Errorf("no address succeeded")
}
}
37 changes: 37 additions & 0 deletions pkg/controller/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package controller

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
)

func Test_firstSuccessfulProbe(t *testing.T) {
reqCh := make(chan struct{})
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("received connection ")
close(reqCh)
}))
ts.EnableHTTP2 = true
ts.StartTLS()
defer ts.Close()
// use an address that is not likely to exist to avoid flakes
addresses := []string{"https://127.0.1.201:12349", ts.URL}
got, err := firstSuccessfulProbe(context.Background(), addresses)
if err != nil {
t.Errorf("firstSuccessfulProbe() error = %v", err)
return
}
if got != ts.URL {
t.Errorf("firstSuccessfulProbe() = %v, want %v", got, ts.URL)
}

select {
case <-reqCh:
case <-time.After(10 * time.Second):
t.Fatalf("test timed out, no request received")
}

}

0 comments on commit 36dd9ee

Please sign in to comment.