Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ApiServerSource serve v1alpha2 #2872

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 25 additions & 2 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,33 @@ limitations under the License.
package main

import (
"knative.dev/eventing/pkg/adapter"
"context"
"fmt"

"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"
)

const (
component = "apiserversource"
)

func main() {
adapter.Main("apiserversource", apiserver.NewEnvConfig, apiserver.NewAdapter)
ctx := signals.NewContext()
cfg := sharedmain.ParseAndGetConfigOrDie()
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

// Start the injection clients and informers.
go func(ctx context.Context) {
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
panic(fmt.Sprintf("Failed to start informers - %s", err))
}
<-ctx.Done()
}(ctx)

adapter.MainWithContext(ctx, component, apiserver.NewEnvConfig, apiserver.NewAdapter)
}
2 changes: 1 addition & 1 deletion config/core/resources/apiserversource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ spec:
served: true
storage: true
- name: v1alpha2
served: false
served: true
storage: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused between this and the comments. Shouldn't this be flipped to storage = true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is stored as v1alpha1 still.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is lossy so I think we have to do that manual upgrade touch thing. So I am leaving it as v1alpha1 stored until we fix this issue as a whole

211 changes: 36 additions & 175 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Knative Authors
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,213 +17,74 @@ limitations under the License.
package apiserver

import (
"context"
"flag"
"fmt"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go/v1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/pkg/logging"
"knative.dev/pkg/source"

"knative.dev/eventing/pkg/adapter"
)

var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/apis/sources/v1alpha2"
)

type StringList []string

// Decode splits list of strings separated by '|',
// overriding the default comma separator which is
// a valid label selector character.
func (s *StringList) Decode(value string) error {
*s = strings.Split(value, ";")
return nil
}

type envConfig struct {
adapter.EnvConfig
Name string `envconfig:"NAME" required:"true"`

Mode string `envconfig:"MODE"`
ApiVersion StringList `split_words:"true" required:"true"`
Kind StringList `required:"true"`
Controller []bool `required:"true"`
LabelSelector StringList `envconfig:"SELECTOR" required:"true"`
OwnerApiVersion StringList `envconfig:"OWNER_API_VERSION" required:"true"`
OwnerKind StringList `envconfig:"OWNER_KIND" required:"true"`
Name string `envconfig:"NAME" required:"true"`
}

const (
// RefMode produces payloads of ObjectReference
RefMode = "Ref"
// ResourceMode produces payloads of ResourceEvent
ResourceMode = "Resource"

resourceGroup = "apiserversources.sources.knative.dev"
)

// GVRC is a combination of GroupVersionResource, Controller flag, LabelSelector and OwnerRef
type GVRC struct {
GVR schema.GroupVersionResource
Controller bool
LabelSelector string
OwnerApiVersion string
OwnerKind string
ConfigJson string `envconfig:"K_SOURCE_CONFIG" required:"true"`
}

type apiServerAdapter struct {
namespace string
ce cloudevents.Client
reporter source.StatsReporter
logger *zap.SugaredLogger

gvrcs []GVRC
k8s dynamic.Interface
source string
mode string
delegate eventDelegate
name string
}

func NewEnvConfig() adapter.EnvConfigAccessor {
return &envConfig{}
}

func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client, reporter source.StatsReporter) adapter.Adapter {
logger := logging.FromContext(ctx)
env := processed.(*envConfig)

cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
logger.Fatal("error building kubeconfig", zap.Error(err))
}

client, err := dynamic.NewForConfig(cfg)
if err != nil {
logger.Fatal("error building dynamic client", zap.Error(err))
}

gvrcs := []GVRC(nil)

for i, apiVersion := range env.ApiVersion {
kind := env.Kind[i]
controlled := env.Controller[i]
selector := env.LabelSelector[i]
ownerApiVersion := env.OwnerApiVersion[i]
ownerKind := env.OwnerKind[i]

gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
logger.Fatal("error parsing APIVersion", zap.Error(err))
}
// TODO: pass down the resource and the kind so we do not have to guess.
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version})
gvrcs = append(gvrcs, GVRC{
GVR: gvr,
Controller: controlled,
LabelSelector: selector,
OwnerApiVersion: ownerApiVersion,
OwnerKind: ownerKind,
})
}
ce cloudevents.Client
logger *zap.SugaredLogger

mode := env.Mode
switch env.Mode {
case ResourceMode, RefMode:
// ok
default:
logger.Warn("unknown mode ", mode)
mode = RefMode
logger.Warn("defaulting mode to ", mode)
}

a := &apiServerAdapter{
k8s: client,
ce: ceClient,
source: cfg.Host,
logger: logger,
gvrcs: gvrcs,
namespace: env.Namespace,
mode: mode,
reporter: reporter,
name: env.Name,
}
return a
}
config Config

type eventDelegate interface {
cache.Store
addControllerWatch(gvr schema.GroupVersionResource)
k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
}

func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {
// Local stop channel.
stop := make(chan struct{})

resyncPeriod := time.Duration(10 * time.Hour)
var d eventDelegate
switch a.mode {
case ResourceMode:
d = &resource{
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}
resyncPeriod := 10 * time.Hour

case RefMode:
d = &ref{
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}

default:
return fmt.Errorf("mode %q not understood", a.mode)
var delegate cache.Store = &resourceDelegate{
ce: a.ce,
source: a.source,
logger: a.logger,
ref: a.config.EventMode == v1alpha2.ReferenceMode,
}

for _, gvrc := range a.gvrcs {
lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).List, gvrc.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).Watch, gvrc.LabelSelector),
if a.config.ResourceOwner != nil {
if a.config.ResourceOwner.APIVersion != nil && a.config.ResourceOwner.Kind != nil {
a.logger.Infow("will be filtered",
zap.String("APIVersion", *a.config.ResourceOwner.APIVersion),
zap.String("Kind", *a.config.ResourceOwner.Kind))
delegate = &controllerFilter{
apiVersion: *a.config.ResourceOwner.APIVersion,
kind: *a.config.ResourceOwner.Kind,
delegate: delegate,
}
}
}

if gvrc.Controller {
d.addControllerWatch(gvrc.GVR)
}
a.logger.Infof("STARTING -- %#v", a.config)

var store cache.Store
if gvrc.OwnerApiVersion != "" || gvrc.OwnerKind != "" {
store = &controller{
apiVersion: gvrc.OwnerApiVersion,
kind: gvrc.OwnerKind,
delegate: store,
}
} else {
store = d
for _, r := range a.config.Resources {
lw := &cache.ListWatch{
// TODO: this will not work with cluster scoped resources.
n3wscott marked this conversation as resolved.
Show resolved Hide resolved
ListFunc: asUnstructuredLister(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).List, r.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).Watch, r.LabelSelector),
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, store, resyncPeriod)
reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
go reflector.Run(stop)
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/adapter/apiserver/adapter_injection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver

import (
"context"
"encoding/json"

cloudevents "github.com/cloudevents/sdk-go/v2"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"
)

func init() {
injection.Default.RegisterClient(withStoredHost)
}

// Key is used as the key for associating information
// with a context.Context.
type Key struct{}

func withStoredHost(ctx context.Context, cfg *rest.Config) context.Context {
return context.WithValue(ctx, Key{}, cfg.Host)
}

// Get extracts the k8s Host from the context.
func Get(ctx context.Context) string {
untyped := ctx.Value(Key{})
if untyped == nil {
logging.FromContext(ctx).Panic(
"Unable to fetch k8s.io/client-go/rest/Config.Host from context.")
}
return untyped.(string)
}

// ---- New

func NewEnvConfig() adapter.EnvConfigAccessor {
return &envConfig{}
}

func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
env := processed.(*envConfig)

config := Config{}
if err := json.Unmarshal([]byte(env.ConfigJson), &config); err != nil {
panic("failed to create config from json")
}

return &apiServerAdapter{
k8s: dynamicclient.Get(ctx),
ce: ceClient,
source: Get(ctx),
name: env.Name,
config: config,

logger: logger,
}
}
Loading