Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ApiServerSource serve v1alpha2 #2872

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package main

import (
"knative.dev/eventing/pkg/adapter"
"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/adapter/v2"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion config/core/resources/apiserversource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ spec:
served: true
storage: true
- name: v1alpha2
served: false
served: true
storage: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused between this and the comments. Shouldn't this be flipped to storage = true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is stored as v1alpha1 still.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is lossy so I think we have to do that manual upgrade touch thing. So I am leaving it as v1alpha1 stored until we fix this issue as a whole

202 changes: 65 additions & 137 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,25 @@ package apiserver

import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go/v1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/apis/sources/v1alpha2"
"knative.dev/pkg/logging"
"knative.dev/pkg/source"

"knative.dev/eventing/pkg/adapter"
)

var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

type StringList []string
Expand All @@ -57,173 +51,107 @@ func (s *StringList) Decode(value string) error {

type envConfig struct {
adapter.EnvConfig
Name string `envconfig:"NAME" required:"true"`

Mode string `envconfig:"MODE"`
ApiVersion StringList `split_words:"true" required:"true"`
Kind StringList `required:"true"`
Controller []bool `required:"true"`
LabelSelector StringList `envconfig:"SELECTOR" required:"true"`
OwnerApiVersion StringList `envconfig:"OWNER_API_VERSION" required:"true"`
OwnerKind StringList `envconfig:"OWNER_KIND" required:"true"`
Name string `envconfig:"NAME" required:"true"`
}

const (
// RefMode produces payloads of ObjectReference
RefMode = "Ref"
// ResourceMode produces payloads of ResourceEvent
ResourceMode = "Resource"

resourceGroup = "apiserversources.sources.knative.dev"
)

// GVRC is a combination of GroupVersionResource, Controller flag, LabelSelector and OwnerRef
type GVRC struct {
GVR schema.GroupVersionResource
Controller bool
LabelSelector string
OwnerApiVersion string
OwnerKind string
ConfigJson string `envconfig:"K_SOURCE_CONFIG" required:"true"`
}

type apiServerAdapter struct {
namespace string
ce cloudevents.Client
reporter source.StatsReporter
logger *zap.SugaredLogger

gvrcs []GVRC
k8s dynamic.Interface
source string
mode string
delegate eventDelegate
name string
config Config

k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
}

func NewEnvConfig() adapter.EnvConfigAccessor {
return &envConfig{}
}

func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client, reporter source.StatsReporter) adapter.Adapter {
logger := logging.FromContext(ctx)
env := processed.(*envConfig)
// ParseAndGetConfigOrDie parses the rest config flags and creates a client or
// dies by calling log.Fatalf.
func ParseAndGetConfigOrDie() *rest.Config {
var (
masterURL = flag.String("master", "",
"The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "",
"Path to a kubeconfig. Only required if out-of-cluster.")
)
flag.Parse()

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatal("error building kubeconfig", zap.Error(err))
log.Fatalf("Error building kubeconfig: %v", err)
}

return cfg
}

func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
env := processed.(*envConfig)

cfg := ParseAndGetConfigOrDie()
client, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatal("error building dynamic client", zap.Error(err))
}

gvrcs := []GVRC(nil)

for i, apiVersion := range env.ApiVersion {
kind := env.Kind[i]
controlled := env.Controller[i]
selector := env.LabelSelector[i]
ownerApiVersion := env.OwnerApiVersion[i]
ownerKind := env.OwnerKind[i]

gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
logger.Fatal("error parsing APIVersion", zap.Error(err))
}
// TODO: pass down the resource and the kind so we do not have to guess.
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version})
gvrcs = append(gvrcs, GVRC{
GVR: gvr,
Controller: controlled,
LabelSelector: selector,
OwnerApiVersion: ownerApiVersion,
OwnerKind: ownerKind,
})
config := Config{}
if err := json.Unmarshal([]byte(env.ConfigJson), &config); err != nil {
panic("failed to create config from json")
}

mode := env.Mode
switch env.Mode {
case ResourceMode, RefMode:
// ok
default:
logger.Warn("unknown mode ", mode)
mode = RefMode
logger.Warn("defaulting mode to ", mode)
}
return &apiServerAdapter{
k8s: client,
ce: ceClient,
source: cfg.Host,
name: env.Name,
config: config,

a := &apiServerAdapter{
k8s: client,
ce: ceClient,
source: cfg.Host,
logger: logger,
gvrcs: gvrcs,
namespace: env.Namespace,
mode: mode,
reporter: reporter,
name: env.Name,
logger: logger,
}
return a
}

type eventDelegate interface {
cache.Store
addControllerWatch(gvr schema.GroupVersionResource)
}

func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {
// Local stop channel.
stop := make(chan struct{})

resyncPeriod := time.Duration(10 * time.Hour)
var d eventDelegate
switch a.mode {
case ResourceMode:
d = &resource{
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}

case RefMode:
d = &ref{
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}
resyncPeriod := 10 * time.Hour

default:
return fmt.Errorf("mode %q not understood", a.mode)
var delegate cache.Store = &resourceDelegate{
ce: a.ce,
source: a.source,
logger: a.logger,
ref: a.config.EventMode == v1alpha2.ReferenceMode,
}

for _, gvrc := range a.gvrcs {
lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).List, gvrc.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).Watch, gvrc.LabelSelector),
if a.config.ResourceOwner != nil {
if a.config.ResourceOwner.APIVersion != nil && a.config.ResourceOwner.Kind != nil {
a.logger.Infow("will be filtered",
zap.String("APIVersion", *a.config.ResourceOwner.APIVersion),
zap.String("Kind", *a.config.ResourceOwner.Kind))
delegate = &controllerFilter{
apiVersion: *a.config.ResourceOwner.APIVersion,
kind: *a.config.ResourceOwner.Kind,
delegate: delegate,
}
}
}

if gvrc.Controller {
d.addControllerWatch(gvrc.GVR)
}
a.logger.Infof("STARTING -- %#v", a.config)

var store cache.Store
if gvrc.OwnerApiVersion != "" || gvrc.OwnerKind != "" {
store = &controller{
apiVersion: gvrc.OwnerApiVersion,
kind: gvrc.OwnerKind,
delegate: store,
}
} else {
store = d
for _, gvr := range a.config.Resources {
lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(a.k8s.Resource(gvr).Namespace(a.namespace).List, a.config.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvr).Namespace(a.namespace).Watch, a.config.LabelSelector),
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, store, resyncPeriod)
reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
go reflector.Run(stop)
}

Expand Down
Loading