Skip to content

Commit

Permalink
Merge pull request #715 from clusterpedia-io/pod_subresource
Browse files Browse the repository at this point in the history
support pod subresources
  • Loading branch information
Iceber authored Dec 26, 2024
2 parents 03d8a3a + 49eb28a commit 8f0eb33
Show file tree
Hide file tree
Showing 31 changed files with 10,208 additions and 73 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/registry/clusterpedia/resources/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *REST) GetSingularName() string {

// ConnectMethods returns the list of HTTP methods handled by Connect
func (r *REST) ConnectMethods() []string {
return []string{"GET"}
return []string{"GET", "POST"}
}

// NewConnectOptions returns an empty options object that will be used to pass options to the Connect method.
Expand Down
14 changes: 13 additions & 1 deletion pkg/kubeapiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/discovery"
podrest "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest/pod"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
)
Expand Down Expand Up @@ -137,7 +138,18 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/api/", resourceHandler)
genericserver.Handler.NonGoRestfulMux.HandlePrefix("/apis/", resourceHandler)

_ = NewClusterResourceController(restManager, discoveryManager, c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters())
controller := NewClusterResourceController(restManager, discoveryManager, c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters())

for _, rest := range podrest.GetPodSubresourceRESTs(controller) {
restManager.preRegisterSubresource(subresource{
gr: schema.GroupResource{Group: "", Resource: "pods"},
kind: "Pod",
namespaced: true,

name: rest.Subresource(),
connecter: rest,
})
}
return genericserver, nil
}

Expand Down
65 changes: 65 additions & 0 deletions pkg/kubeapiserver/clusterresource_controller.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package kubeapiserver

import (
"context"
"errors"
"net/http"
"reflect"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
Expand Down Expand Up @@ -102,6 +107,66 @@ func (c *ClusterResourceController) removeCluster(name string) {
delete(c.clusterresources, name)
}

func (c *ClusterResourceController) resolveClusterRestConfig(name string) (*rest.Config, error) {
cluster, err := c.clusterLister.Get(name)
if err != nil {
return nil, err
}

if len(cluster.Spec.Kubeconfig) != 0 {
clientconfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Kubeconfig)
if err != nil {
return nil, err
}
return clientconfig.ClientConfig()
}

if cluster.Spec.APIServer == "" {
return nil, errors.New("Cluster APIServer Endpoint is required")
}

if len(cluster.Spec.TokenData) == 0 &&
(len(cluster.Spec.CertData) == 0 || len(cluster.Spec.KeyData) == 0) {
return nil, errors.New("Cluster APIServer's Token or Cert is required")
}

config := &rest.Config{
Host: cluster.Spec.APIServer,
}

if len(cluster.Spec.CAData) != 0 {
config.TLSClientConfig.CAData = cluster.Spec.CAData
} else {
config.TLSClientConfig.Insecure = true
}

if len(cluster.Spec.CertData) != 0 && len(cluster.Spec.KeyData) != 0 {
config.TLSClientConfig.CertData = cluster.Spec.CertData
config.TLSClientConfig.KeyData = cluster.Spec.KeyData
}

if len(cluster.Spec.TokenData) != 0 {
config.BearerToken = string(cluster.Spec.TokenData)
}
return config, nil
}

func (c *ClusterResourceController) GetClusterDefaultConnection(ctx context.Context, name string) (string, http.RoundTripper, error) {
config, err := c.resolveClusterRestConfig(name)
if err != nil {
return "", nil, err
}
transport, err := rest.TransportFor(config)
if err != nil {
return "", nil, err
}
return config.Host, transport, nil
}

func (c *ClusterResourceController) GetClusterConnectionWithTLSConfig(ctx context.Context, name string) (string, http.RoundTripper, error) {
return "", nil, errors.New("CetClusterConnectionWithTLSConfig not implemented")
}

type resourceInfo struct {
Namespaced bool
Kind string
Expand Down
99 changes: 62 additions & 37 deletions pkg/kubeapiserver/resource_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubeapiserver

import (
"context"
"fmt"
"net/http"
"time"
Expand All @@ -12,12 +13,14 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
registryrest "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/warning"
"k8s.io/klog/v2"

clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
clusterlister "github.com/clusterpedia-io/clusterpedia/pkg/generated/listers/cluster/v1alpha2"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/discovery"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcerest"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/request"
)

Expand Down Expand Up @@ -76,8 +79,9 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

info := r.rest.GetRESTResourceInfo(gvr)
if info.Empty() {
resource, reqScope, storage, existed := r.rest.GetResourceREST(gvr, requestInfo.Subresource)
if !existed {
// TODO(iceber): Add the specialized error for subresources
err := fmt.Errorf("not found request scope or resource storage")
klog.ErrorS(err, "Failed to handle resource request", "resource", gvr)
responsewriters.ErrorNegotiated(
Expand All @@ -87,58 +91,79 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

resource, reqScope, storage := info.APIResource, info.RequestScope, info.Storage
if requestInfo.Namespace != "" && !resource.Namespaced {
r.delegate.ServeHTTP(w, req)
return
}

// Check the health of the cluster
if cluster != nil {
var msg string
healthyCondition := meta.FindStatusCondition(cluster.Status.Conditions, clusterv1alpha2.ClusterHealthyCondition)
switch {
case healthyCondition == nil:
msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate.", clusterName)
case healthyCondition.Status != metav1.ConditionTrue:
msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate, reason: %s", clusterName, healthyCondition.Reason)
checkClusterAndWarning(req.Context(), cluster)

switch storage := storage.(type) {
case *resourcerest.RESTStorage:
var handler http.Handler
switch requestInfo.Verb {
case "get":
if clusterName == "" {
responsewriters.ErrorNegotiated(
apierrors.NewBadRequest("please specify the cluster name when using the resource name to get a specific resource."),
Codecs, gvr.GroupVersion(), w, req,
)
return
}
handler = handlers.GetResource(storage, reqScope)
case "list":
handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout)
case "watch":
handler = handlers.ListResource(storage, storage, reqScope, true, r.minRequestTimeout)
default:
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb),
Codecs, gvr.GroupVersion(), w, req,
)
return
}
/*
TODO(scyda): Determine the synchronization status of a specific resource
handler.ServeHTTP(w, req)

for _, resource := range c.Status.Resources {
case registryrest.Connecter:
var supported bool
for _, method := range storage.ConnectMethods() {
if req.Method == method {
supported = true
}
*/

if msg != "" {
warning.AddWarning(req.Context(), "", msg)
}
}

var handler http.Handler
switch requestInfo.Verb {
case "get":
if clusterName == "" {
if !supported {
responsewriters.ErrorNegotiated(
apierrors.NewBadRequest("please specify the cluster name when using the resource name to get a specific resource."),
apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb),
Codecs, gvr.GroupVersion(), w, req,
)
return
}

handler = handlers.GetResource(storage, reqScope)
case "list":
handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout)
case "watch":
handler = handlers.ListResource(storage, storage, reqScope, true, r.minRequestTimeout)
default:
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb),
Codecs, gvr.GroupVersion(), w, req,
)
handlers.ConnectResource(storage, reqScope, nil, "", true).ServeHTTP(w, req)
}
}

if handler != nil {
handler.ServeHTTP(w, req)
func checkClusterAndWarning(ctx context.Context, cluster *clusterv1alpha2.PediaCluster) {
if cluster == nil {
return
}
var msg string
healthyCondition := meta.FindStatusCondition(cluster.Status.Conditions, clusterv1alpha2.ClusterHealthyCondition)
switch {
case healthyCondition == nil:
msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate.", cluster.Name)
case healthyCondition.Status != metav1.ConditionTrue:
msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate, reason: %s", cluster.Name, healthyCondition.Reason)
}
/*
TODO(scyda): Determine the synchronization status of a specific resource
for _, resource := range c.Status.Resources {
}
*/

if msg != "" {
warning.AddWarning(ctx, "", msg)
}
}
Loading

0 comments on commit 8f0eb33

Please sign in to comment.