Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ApiServerSource serve v1alpha2 #2872

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package main

import (
"knative.dev/eventing/pkg/adapter"
"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/adapter/v2"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion config/core/resources/apiserversource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ spec:
served: true
storage: true
- name: v1alpha2
served: false
served: true
storage: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused between this and the comments. Shouldn't this be flipped to storage = true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is stored as v1alpha1 still.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is lossy so I think we have to do that manual upgrade touch thing. So I am leaving it as v1alpha1 stored until we fix this issue as a whole

167 changes: 37 additions & 130 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,22 @@ package apiserver
import (
"context"
"flag"
"fmt"
"knative.dev/eventing/pkg/apis/sources/v1alpha2"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go/v1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/pkg/logging"
"knative.dev/pkg/source"

"knative.dev/eventing/pkg/adapter"
"knative.dev/eventing/pkg/adapter/v2"
)

var (
Expand All @@ -57,54 +54,28 @@ func (s *StringList) Decode(value string) error {

type envConfig struct {
adapter.EnvConfig
Name string `envconfig:"NAME" required:"true"`

Mode string `envconfig:"MODE"`
ApiVersion StringList `split_words:"true" required:"true"`
Kind StringList `required:"true"`
Controller []bool `required:"true"`
LabelSelector StringList `envconfig:"SELECTOR" required:"true"`
OwnerApiVersion StringList `envconfig:"OWNER_API_VERSION" required:"true"`
OwnerKind StringList `envconfig:"OWNER_KIND" required:"true"`
Name string `envconfig:"NAME" required:"true"`
}

const (
// RefMode produces payloads of ObjectReference
RefMode = "Ref"
// ResourceMode produces payloads of ResourceEvent
ResourceMode = "Resource"

resourceGroup = "apiserversources.sources.knative.dev"
)

// GVRC is a combination of GroupVersionResource, Controller flag, LabelSelector and OwnerRef
type GVRC struct {
GVR schema.GroupVersionResource
Controller bool
LabelSelector string
OwnerApiVersion string
OwnerKind string
Config Config `envconfig:"K_SOURCE_CONFIG" required:"true"`
}

type apiServerAdapter struct {
namespace string
ce cloudevents.Client
reporter source.StatsReporter
logger *zap.SugaredLogger

gvrcs []GVRC
k8s dynamic.Interface
source string
mode string
delegate eventDelegate
name string
config Config

k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
}

func NewEnvConfig() adapter.EnvConfigAccessor {
return &envConfig{}
}

func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client, reporter source.StatsReporter) adapter.Adapter {
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
env := processed.(*envConfig)

Expand All @@ -118,112 +89,48 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
logger.Fatal("error building dynamic client", zap.Error(err))
}

gvrcs := []GVRC(nil)

for i, apiVersion := range env.ApiVersion {
kind := env.Kind[i]
controlled := env.Controller[i]
selector := env.LabelSelector[i]
ownerApiVersion := env.OwnerApiVersion[i]
ownerKind := env.OwnerKind[i]

gv, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
logger.Fatal("error parsing APIVersion", zap.Error(err))
}
// TODO: pass down the resource and the kind so we do not have to guess.
gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version})
gvrcs = append(gvrcs, GVRC{
GVR: gvr,
Controller: controlled,
LabelSelector: selector,
OwnerApiVersion: ownerApiVersion,
OwnerKind: ownerKind,
})
}

mode := env.Mode
switch env.Mode {
case ResourceMode, RefMode:
// ok
default:
logger.Warn("unknown mode ", mode)
mode = RefMode
logger.Warn("defaulting mode to ", mode)
}

a := &apiServerAdapter{
k8s: client,
ce: ceClient,
source: cfg.Host,
logger: logger,
gvrcs: gvrcs,
namespace: env.Namespace,
mode: mode,
reporter: reporter,
name: env.Name,
k8s: client,
ce: ceClient,
source: cfg.Host,
name: env.Name,
config: env.Config,

logger: logger,
}
return a
}

type eventDelegate interface {
cache.Store
addControllerWatch(gvr schema.GroupVersionResource)
}

func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {
// Local stop channel.
stop := make(chan struct{})

resyncPeriod := time.Duration(10 * time.Hour)
var d eventDelegate
switch a.mode {
case ResourceMode:
d = &resource{
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}
resyncPeriod := 10 * time.Hour

case RefMode:
d = &ref{
ce: a.ce,
source: a.source,
logger: a.logger,
reporter: a.reporter,
namespace: a.namespace,
name: a.name,
}

default:
return fmt.Errorf("mode %q not understood", a.mode)
var delegate cache.Store = &resourceDelegate{
ce: a.ce,
source: a.source,
logger: a.logger,
ref: a.config.EventMode == v1alpha2.ReferenceMode,
}

for _, gvrc := range a.gvrcs {
lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).List, gvrc.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).Watch, gvrc.LabelSelector),
}

if gvrc.Controller {
d.addControllerWatch(gvrc.GVR)
if a.config.ResourceOwner != nil {
if a.config.ResourceOwner.APIVersion != nil && a.config.ResourceOwner.Kind != nil {
delegate = &controllerFilter{
apiVersion: *a.config.ResourceOwner.APIVersion,
kind: *a.config.ResourceOwner.Kind,
delegate: delegate,
}
}
}

var store cache.Store
if gvrc.OwnerApiVersion != "" || gvrc.OwnerKind != "" {
store = &controller{
apiVersion: gvrc.OwnerApiVersion,
kind: gvrc.OwnerKind,
delegate: store,
}
} else {
store = d
for _, gvr := range a.config.Resources {
lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(a.k8s.Resource(gvr).Namespace(a.namespace).List, a.config.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvr).Namespace(a.namespace).Watch, a.config.LabelSelector),
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, store, resyncPeriod)
reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
go reflector.Run(stop)
}

Expand Down
Loading