Skip to content

Commit

Permalink
fix the error when cert-mgr-mode set to kubelet
Browse files Browse the repository at this point in the history
  • Loading branch information
qclc committed Jun 18, 2021
1 parent 004e0b9 commit 0c8f3b4
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 129 deletions.
2 changes: 1 addition & 1 deletion cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewYurtHubOptions() *YurtHubOptions {
YurtHubProxyPort: "10261",
YurtHubPort: "10267",
GCFrequency: 120,
CertMgrMode: "hubself",
CertMgrMode: util.YurtHubCertificateManagerName,
LBMode: "rr",
HeartbeatFailedRetry: 3,
HeartbeatHealthyThreshold: 2,
Expand Down
11 changes: 10 additions & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/proxy"
"github.com/openyurtio/openyurt/pkg/yurthub/restconfig"
"github.com/openyurtio/openyurt/pkg/yurthub/server"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"

Expand Down Expand Up @@ -110,6 +111,14 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
healthChecker.Run()
trace++

klog.Infof("%d. new restConfig manager for %s mode", trace, cfg.CertMgrMode)
restConfigMgr, err := restconfig.NewRestConfigManager(cfg, certManager, healthChecker)
if err != nil {
klog.Errorf("could not new restConfig manager, %v", err)
return err
}
trace++

klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager)
if err != nil {
Expand All @@ -119,7 +128,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, transportManager, stopCh)
gcMgr, err := gc.NewGCManager(cfg, transportManager, restConfigMgr, stopCh)
if err != nil {
klog.Errorf("could not new gc manager, %v", err)
return err
Expand Down
101 changes: 36 additions & 65 deletions pkg/yurthub/certificate/hubself/cert_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,21 @@ import (
)

const (
CertificateManagerName = "hubself"
HubName = "yurthub"
HubRootDir = "/var/lib/"
HubPkiDirName = "pki"
HubCaFileName = "ca.crt"
HubConfigFileName = "%s.conf"
BootstrapConfigFileName = "bootstrap-hub.conf"
BootstrapUser = "token-bootstrap-client"
DefaultClusterName = "kubernetes"
ClusterInfoName = "cluster-info"
KubeconfigName = "kubeconfig"
yurtHubName = "yurthub"
hubRootDir = "/var/lib/"
hubPkiDirName = "pki"
hubCaFileName = "ca.crt"
hubConfigFileName = "%s.conf"
bootstrapConfigFileName = "bootstrap-hub.conf"
bootstrapUser = "token-bootstrap-client"
defaultClusterName = "kubernetes"
clusterInfoName = "cluster-info"
kubeconfigName = "kubeconfig"
)

// Register registers a YurtCertificateManager
func Register(cmr *hubcert.CertificateManagerRegistry) {
cmr.Register(CertificateManagerName, func(cfg *config.YurtHubConfiguration) (interfaces.YurtCertificateManager, error) {
cmr.Register(util.YurtHubCertificateManagerName, func(cfg *config.YurtHubConfiguration) (interfaces.YurtCertificateManager, error) {
return NewYurtHubCertManager(cfg)
})
}
Expand All @@ -92,12 +91,12 @@ func NewYurtHubCertManager(cfg *config.YurtHubConfiguration) (interfaces.YurtCer

hubName := projectinfo.GetHubName()
if len(hubName) == 0 {
hubName = HubName
hubName = yurtHubName
}

rootDir := cfg.RootDir
if len(rootDir) == 0 {
rootDir = filepath.Join(HubRootDir, hubName)
rootDir = filepath.Join(hubRootDir, hubName)
}

ycm := &yurtHubCertManager{
Expand Down Expand Up @@ -185,44 +184,16 @@ func (ycm *yurtHubCertManager) Update(cfg *config.YurtHubConfiguration) error {
return nil
}

// GetRestConfig get rest client config from hub agent conf file.
func (ycm *yurtHubCertManager) GetRestConfig() *restclient.Config {
healthyServer := ycm.remoteServers[0]
if healthyServer == nil {
klog.Infof("all of remote servers are unhealthy, so return nil for rest config")
return nil
}

// certificate expired, rest config can not be used to connect remote server,
// so return nil for rest config
if ycm.Current() == nil {
klog.Infof("certificate expired, so return nil for rest config")
return nil
}

hubConfFile := ycm.getHubConfFile()
if isExist, _ := util.FileExists(hubConfFile); isExist {
cfg, err := util.LoadRESTClientConfig(hubConfFile)
if err != nil {
klog.Errorf("could not get rest config for %s, %v", hubConfFile, err)
return nil
}

// re-fix host connecting healthy server
cfg.Host = healthyServer.String()
klog.Infof("re-fix hub rest config host successfully with server %s", cfg.Host)
return cfg
}

klog.Errorf("%s config file(%s) is not exist", ycm.hubName, hubConfFile)
return nil
}

// GetCaFile returns the path of ca file
func (ycm *yurtHubCertManager) GetCaFile() string {
return ycm.caFile
}

// GetConfFile returns the path of yurtHub config file path
func (ycm *yurtHubCertManager) GetConfFilePath() string {
return ycm.getHubConfFile()
}

// NotExpired returns hub client cert is expired or not.
// True: not expired
// False: expired
Expand Down Expand Up @@ -258,13 +229,13 @@ func (ycm *yurtHubCertManager) initCaCert() error {
}

// make sure configMap kube-public/cluster-info in k8s cluster beforehand
insecureClusterInfo, err := insecureClient.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.Background(), ClusterInfoName, metav1.GetOptions{})
insecureClusterInfo, err := insecureClient.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.Background(), clusterInfoName, metav1.GetOptions{})
if err != nil {
klog.Errorf("failed to get cluster-info configmap, %v", err)
return err
}

kubeconfigStr, ok := insecureClusterInfo.Data[KubeconfigName]
kubeconfigStr, ok := insecureClusterInfo.Data[kubeconfigName]
if !ok || len(kubeconfigStr) == 0 {
return fmt.Errorf("no kubeconfig in cluster-info configmap of kube-public namespace")
}
Expand Down Expand Up @@ -300,7 +271,7 @@ func (ycm *yurtHubCertManager) initBootstrap() error {
}
ycm.bootstrapConfStore = bootstrapConfStore

contents, err := ycm.bootstrapConfStore.Get(BootstrapConfigFileName)
contents, err := ycm.bootstrapConfStore.Get(bootstrapConfigFileName)
if err == storage.ErrStorageNotFound {
klog.Infof("%s bootstrap conf file does not exist, so create it", ycm.hubName)
return ycm.createBootstrapConfFile(ycm.joinToken)
Expand Down Expand Up @@ -462,39 +433,39 @@ func (ycm *yurtHubCertManager) initHubConf() error {

// getPkiDir returns the directory for storing hub agent pki
func (ycm *yurtHubCertManager) getPkiDir() string {
return filepath.Join(ycm.rootDir, HubPkiDirName)
return filepath.Join(ycm.rootDir, hubPkiDirName)
}

// getCaFile returns the path of ca file
func (ycm *yurtHubCertManager) getCaFile() string {
return filepath.Join(ycm.getPkiDir(), HubCaFileName)
return filepath.Join(ycm.getPkiDir(), hubCaFileName)
}

// getBootstrapConfFile returns the path of bootstrap conf file
func (ycm *yurtHubCertManager) getBootstrapConfFile() string {
return filepath.Join(ycm.rootDir, BootstrapConfigFileName)
return filepath.Join(ycm.rootDir, bootstrapConfigFileName)
}

// getHubConfFile returns the path of hub agent conf file.
func (ycm *yurtHubCertManager) getHubConfFile() string {
return filepath.Join(ycm.rootDir, fmt.Sprintf(HubConfigFileName, ycm.hubName))
return filepath.Join(ycm.rootDir, fmt.Sprintf(hubConfigFileName, ycm.hubName))
}

// createBasic create basic client cmd config
func createBasic(apiServerAddr string, caCert []byte) *clientcmdapi.Config {
contextName := fmt.Sprintf("%s@%s", BootstrapUser, DefaultClusterName)
contextName := fmt.Sprintf("%s@%s", bootstrapUser, defaultClusterName)

return &clientcmdapi.Config{
Clusters: map[string]*clientcmdapi.Cluster{
DefaultClusterName: {
defaultClusterName: {
Server: apiServerAddr,
CertificateAuthorityData: caCert,
},
},
Contexts: map[string]*clientcmdapi.Context{
contextName: {
Cluster: DefaultClusterName,
AuthInfo: BootstrapUser,
Cluster: defaultClusterName,
AuthInfo: bootstrapUser,
},
},
AuthInfos: map[string]*clientcmdapi.AuthInfo{},
Expand All @@ -508,7 +479,7 @@ func createInsecureRestClientConfig(remoteServer *url.URL) (*restclient.Config,
return nil, fmt.Errorf("no healthy remote server")
}
cfg := createBasic(remoteServer.String(), []byte{})
cfg.Clusters[DefaultClusterName].InsecureSkipTLSVerify = true
cfg.Clusters[defaultClusterName].InsecureSkipTLSVerify = true

restConfig, err := clientcmd.NewDefaultClientConfig(*cfg, &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
Expand Down Expand Up @@ -536,7 +507,7 @@ func createBootstrapConf(apiServerAddr, caFile, joinToken string) *clientcmdapi.
}

cfg := createBasic(apiServerAddr, caCert)
cfg.AuthInfos[BootstrapUser] = &clientcmdapi.AuthInfo{Token: joinToken}
cfg.AuthInfos[bootstrapUser] = &clientcmdapi.AuthInfo{Token: joinToken}

return cfg
}
Expand All @@ -559,7 +530,7 @@ func (ycm *yurtHubCertManager) createBootstrapConfFile(joinToken string) error {
return err
}

err = ycm.bootstrapConfStore.Update(BootstrapConfigFileName, content)
err = ycm.bootstrapConfStore.Update(bootstrapConfigFileName, content)
if err != nil {
klog.Errorf("could not create bootstrap conf file(%s), %v", ycm.getBootstrapConfFile(), err)
return err
Expand All @@ -586,21 +557,21 @@ func (ycm *yurtHubCertManager) updateBootstrapConfFile(joinToken string) error {
return fmt.Errorf("could not load bootstrap conf file(%s), %v", ycm.getBootstrapConfFile(), err)
}

if curKubeConfig.AuthInfos[BootstrapUser] != nil {
if curKubeConfig.AuthInfos[BootstrapUser].Token == joinToken {
if curKubeConfig.AuthInfos[bootstrapUser] != nil {
if curKubeConfig.AuthInfos[bootstrapUser].Token == joinToken {
klog.Infof("join token for %s bootstrap conf file is not changed", ycm.hubName)
return nil
}
}

curKubeConfig.AuthInfos[BootstrapUser] = &clientcmdapi.AuthInfo{Token: joinToken}
curKubeConfig.AuthInfos[bootstrapUser] = &clientcmdapi.AuthInfo{Token: joinToken}
content, err := clientcmd.Write(*curKubeConfig)
if err != nil {
klog.Errorf("could not update bootstrap config into bytes, %v", err)
return err
}

err = ycm.bootstrapConfStore.Update(BootstrapConfigFileName, content)
err = ycm.bootstrapConfStore.Update(bootstrapConfigFileName, content)
if err != nil {
klog.Errorf("could not update bootstrap config, %v", err)
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/yurthub/certificate/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package interfaces
import (
"github.com/openyurtio/openyurt/cmd/yurthub/app/config"

"k8s.io/client-go/rest"
"k8s.io/client-go/util/certificate"
)

// YurtCertificateManager is responsible for managing node certificate for yurthub
type YurtCertificateManager interface {
certificate.Manager
Update(cfg *config.YurtHubConfiguration) error
GetRestConfig() *rest.Config
GetConfFilePath() string
GetCaFile() string
NotExpired() bool
}
48 changes: 11 additions & 37 deletions pkg/yurthub/certificate/kubelet/cert_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,20 @@ import (
"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/util"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/klog"
)

const (
certificateManagerName = "kubelet"
defaultPairDir = "/var/lib/kubelet/pki"
defaultPairFile = "kubelet-client-current.pem"
defaultCaFile = "/etc/kubernetes/pki/ca.crt"
certVerifyDuration = 30 * time.Minute
defaultPairFile = "kubelet-client-current.pem"
certVerifyDuration = 30 * time.Minute
)

// Register registers a YurtCertificateManager
func Register(cmr *certificate.CertificateManagerRegistry) {
cmr.Register(certificateManagerName, func(cfg *config.YurtHubConfiguration) (interfaces.YurtCertificateManager, error) {
cmr.Register(util.KubeletCertificateManagerName, func(cfg *config.YurtHubConfiguration) (interfaces.YurtCertificateManager, error) {
return NewKubeletCertManager(cfg, 0, "")
})
}
Expand All @@ -59,7 +54,6 @@ type kubeletCertManager struct {
remoteServers []*url.URL
caFile string
certVerifyDuration time.Duration
checker healthchecker.HealthChecker
stopped bool
}

Expand All @@ -76,9 +70,10 @@ func NewKubeletCertManager(cfg *config.YurtHubConfiguration, period time.Duratio
}

if len(certDir) == 0 {
certDir = defaultPairDir
pairFile = util.DefaultKubeletPairFilePath
} else {
pairFile = filepath.Join(certDir, defaultPairFile)
}
pairFile = filepath.Join(certDir, defaultPairFile)

if pairFileExists, err := util.FileExists(pairFile); err != nil {
return nil, err
Expand All @@ -94,17 +89,12 @@ func NewKubeletCertManager(cfg *config.YurtHubConfiguration, period time.Duratio
pairFile: pairFile,
cert: cert,
remoteServers: cfg.RemoteServers,
caFile: defaultCaFile,
caFile: util.DefaultKubeletRootCAFilePath,
certVerifyDuration: period,
stopCh: make(chan struct{}),
}, nil
}

// SetHealthChecker set healthChecker
func (kcm *kubeletCertManager) SetHealthChecker(checker healthchecker.HealthChecker) {
kcm.checker = checker
}

// Stop stop cert manager
func (kcm *kubeletCertManager) Stop() {
kcm.certAccessLock.Lock()
Expand Down Expand Up @@ -157,25 +147,9 @@ func (kcm *kubeletCertManager) GetCaFile() string {
return kcm.caFile
}

// GetRestConfig get *rest.Config from kubelet.conf
func (kcm *kubeletCertManager) GetRestConfig() *rest.Config {
var s *url.URL
for _, server := range kcm.remoteServers {
if kcm.checker.IsHealthy(server) {
s = server
break
}
}
if s == nil {
return nil
}

cfg, err := util.LoadKubeletRestClientConfig(s)
if err != nil {
klog.Errorf("could not load kubelet rest client config, %v", err)
return nil
}
return cfg
// GetConfFile get an kube-config file path, but the kubelet mode just using the ca and pair, so return empty
func (kcm *kubeletCertManager) GetConfFilePath() string {
return ""
}

func (kcm *kubeletCertManager) NotExpired() bool {
Expand All @@ -195,7 +169,7 @@ func (kcm *kubeletCertManager) updateCert(c *tls.Certificate) {
}

// Update do nothing
func (kcm *kubeletCertManager) Update(cfg *config.YurtHubConfiguration) error {
func (kcm *kubeletCertManager) Update(_ *config.YurtHubConfiguration) error {
return nil
}

Expand Down
Loading

0 comments on commit 0c8f3b4

Please sign in to comment.