Skip to content

Commit

Permalink
Broker, Trigger, and Namespace Controllers (#788)
Browse files Browse the repository at this point in the history
* Initial API outline for Broker.

* Metadata is exported

* Initial Trigger API.

* Remove old comments.

* Register the types.

* Initial work on the Broker controller.

* Initial work on the Trigger controller.

* Initial work on the new model and its corresponding broker controller.

* Initial work on the Trigger controller.

* Add simple mains.

* Small fixes, still not working.

* Add the Istio injection annotation.

* Resolve subscriber in the Trigger controller.

* Standardize on 'Any'.

* Make Broker and Trigger generational

* Happy control path.

* TargetPort is 8080

* Watch channels.

* Custom service account for filter (needs trigger watch).

* Increase logging in the Filter to debug level.

* Use the default channel provisioner.

* Adding filtering using k8s label selectors

* Watch namespaces and create a default Broker.

* Updating trigger example with filters

* Broker changes cause the namespace watcher to reconcile.

* Move the Logging package import and format.

* Updating after review comments. Only doing exact header matching.
In a follow up PR, I plan to include matching of expressions. It turns
our that they are somewhat more involved than expected.

* Adding filtering expressions. Currently using LabelSelectors without
validations. Using reflection to set some unexposed fields for now.

* Changes to compile

* moving filter

* Moving cmds to broker

* updating controller

* Moving provider and reconciler to reconciler folder, and merging them into one

* Adding verbs to dispatcher

* Moving back mains to cmd

* Updating config

* Moving resources to reconcilers

* Moving broker folder back where it belongs

* Revert "Merge branch 'broker-new-model' into broker-new"

This reverts commit d567412, reversing
changes made to bd6a8fa.

* Updating headers and headerExpressions to attributes and attributeExpressions

* Reconcilers notice when mroe things change.

* PR comment.

* Remove redundant watch.

* Unit test scaffold.

* Tests for the namespace reconciler.

* Exact matching for filters without using k8s selectors-based syntax.
The matching can only be done on source and type for now.

* Removing t3 as we don't have set expressions

* Updates after code review

* Attempt to reconcile broker in trigger controller

* Broker controller unit tests.

* Deleting and re-creating subscription object as the backing channel
spec is immutable.

* Adding comment

* Rename Trigger.Spec.Filter.ExactMatch to Trigger.Spec.Filter.SourceAndType. Add unit tests for the filter binary.

* Adding event messages

* Using the broker's namespacedNamed as key to the triggers map.
With this we allow to reconcile only the triggers that belong to the
particular broker that changed

* Adding some tests to trigger

* More UTs

* More UTs

* More UTs

* Namespace reconciler automatically creates the Broker Filter's ServiceAccount and RBAC.

Sadly this doesn't work well because we have such an old version of controller-runtime that the Filter ends up trying to watch _all_ Triggers, not just those in its namespace. And it only gets permission for the Triggers in its own namespace.

* Remove no longer needed label.

* Broker and trigger types UTs

* WIP early E2E test

* Changes after code review. Adding trigger defaults and validation tests

* Cleaner trigger validation

* Adding dummy tests for broker validation...
Should be implemented

* Compiling and moving things around

* Updating test

* More updates

* Waiting for potentially multiple contents.
Removing check for corev1.Service ready.
Removing grant's great design. Just making it simpler for now.

* Compiling

* Fixing compilation

* Fixing compilation errors. Adding AnnotateNamespace function.

* Adding ns

* Adding logs. Changing to lowercase any otherwise the pod name is invalid

* Removing namespace when creating trigger subscriber spec.
Adding wait time constant for default broker creation.

* Checking if all triggers are ready

* Updated logs

* Working

* Adding logs... Still not receiving the events.

* More logs

* Adding build constraint

* Removing unnecessary stuff

* Removing ugly structs

* More logs

* Removing quotes

* More logs

* Adding delay

* Listing triggers in receiver when we create it, so that we don't miss
any message because the client couldn't find the existing trigger.
This is a problem in the in-memory-channel as it doesn't do retries.
Maybe the right solution is to add that functionality there.

* Adding delay to sender pod

* Removing withDelay method and just sleep for a while

* Improve log...

* Updates after code review.

* Adding some more logs and trailing dots.

* Switch import order.

* Updating comments.

* Updating comments.

* Replace the bad errgroup usage with the runnableServer.

* Namespace scoped the Broker Filter's client.

* Fix unit tests.

* Fix yaml

* Setting source to source not type.
Updating comment.

* Switch from annotating the namespace to labeling it, to match Istio.

* General clean up.

* Initial docs for the Broker.

* Fill some of the usage section.

* Add instructions for installing the Broker without using Namespace annotation.

* Create example_{brokers,triggers}.yaml to document how they can be used.

* Fix MD linter issues.

* Fix MD linter issues.

* Fix MD linter issues.

* Minor clean up.

* Clean up some spots the merge didn't catch.

* Fix the bad merge by replacing logger.BaseLogger with logger.FormatLogger.

* Add extra columns when using kubectl get.

* MarkBrokerDoesNotExist

* Rename extra columns.

* Replace the Trigger reconciler's in-memory map with a simple list, utilizing the fact that Controller Runtime already has the results of the List cached.

* Accept v0.1 and v0.2 cloud events. Adding UTs.
Updating initClient as well, removing unnecessary paging.

* Change to resolve.SubscriberSpec().

* Remove restClient as it wasn't actually used.

* Only reconcile the Namespace if the specific resource we care about changes.

* lowercase

* Mark the Broker's Ingress and Filter status condidionts failed when they have failed.

* Do not DeepCopy() in Reconcile(), as controller-runtime already did it for us.
  • Loading branch information
Harwayne authored and knative-prow-robot committed Mar 18, 2019
1 parent 3fc44ec commit 76603c8
Show file tree
Hide file tree
Showing 63 changed files with 7,913 additions and 29 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/broker/filter/kodata/HEAD
1 change: 1 addition & 0 deletions cmd/broker/filter/kodata/LICENSE
1 change: 1 addition & 0 deletions cmd/broker/filter/kodata/VENDOR-LICENSE
85 changes: 85 additions & 0 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2019 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"flag"
"log"
"os"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/broker"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

const (
NAMESPACE = "NAMESPACE"
)

func main() {
logConfig := provisioners.NewLoggingConfig()
logConfig.LoggingLevel["provisioner"] = zapcore.DebugLevel
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer logger.Sync()

flag.Parse()

logger.Info("Starting...")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{
Namespace: getRequiredEnv(NAMESPACE),
})
if err != nil {
logger.Fatal("Error starting up.", zap.Error(err))
}

if err = eventingv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatal("Unable to add eventingv1alpha1 scheme", zap.Error(err))
}

// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
_, runnable := broker.New(logger, mgr.GetClient())
err = mgr.Add(runnable)
if err != nil {
logger.Fatal("Unable to start the receivers runnable", zap.Error(err), zap.Any("runnable", runnable))
}

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()

// Start blocks forever.
logger.Info("Manager starting...")
err = mgr.Start(stopCh)
if err != nil {
logger.Fatal("Manager.Start() returned an error", zap.Error(err))
}
logger.Info("Exiting...")
}

func getRequiredEnv(envKey string) string {
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}
1 change: 1 addition & 0 deletions cmd/broker/ingress/kodata/HEAD
1 change: 1 addition & 0 deletions cmd/broker/ingress/kodata/LICENSE
1 change: 1 addition & 0 deletions cmd/broker/ingress/kodata/VENDOR-LICENSE
158 changes: 158 additions & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright 2019 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"time"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var (
port = 8080

readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute
)

func main() {
logConfig := provisioners.NewLoggingConfig()
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer logger.Sync()
flag.Parse()

logger.Info("Starting...")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
logger.Fatal("Error starting up.", zap.Error(err))
}

if err = eventingv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatal("Unable to add eventingv1alpha1 scheme", zap.Error(err))
}

c := getRequiredEnv("CHANNEL")

h := NewHandler(logger, c)

s := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: h,
ErrorLog: zap.NewStdLog(logger),
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
}

err = mgr.Add(&runnableServer{
logger: logger,
s: s,
})
if err != nil {
logger.Fatal("Unable to add runnableServer", zap.Error(err))
}

// Set up signals so we handle the first shutdown signal gracefully.
stopCh := signals.SetupSignalHandler()
// Start blocks forever.
if err = mgr.Start(stopCh); err != nil {
logger.Error("manager.Start() returned an error", zap.Error(err))
}
logger.Info("Exiting...")

ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
defer cancel()
if err = s.Shutdown(ctx); err != nil {
logger.Error("Shutdown returned an error", zap.Error(err))
}
}

func getRequiredEnv(envKey string) string {
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}

// http.Handler that takes a single request in and sends it out to a single destination.
type Handler struct {
receiver *provisioners.MessageReceiver
dispatcher *provisioners.MessageDispatcher
destination string

logger *zap.Logger
}

// NewHandler creates a new ingress.Handler.
func NewHandler(logger *zap.Logger, destination string) *Handler {
handler := &Handler{
logger: logger,
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
destination: fmt.Sprintf("http://%s", destination),
}
// The receiver function needs to point back at the handler itself, so set it up after
// initialization.
handler.receiver = provisioners.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar())

return handler
}

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
// TODO Filter.
return f.dispatch(m)
}
}

// http.Handler interface.
func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
f.receiver.HandleRequest(w, r)
}

// dispatch takes the request, and sends it out the f.destination. If the dispatched
// request returns successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {
err := f.dispatcher.DispatchMessage(msg, f.destination, "", provisioners.DispatchDefaults{})
if err != nil {
f.logger.Error("Error dispatching message", zap.String("destination", f.destination))
}
return err
}

// runnableServer is a small wrapper around http.Server so that it matches the manager.Runnable
// interface.
type runnableServer struct {
logger *zap.Logger
s *http.Server
}

func (r *runnableServer) Start(<-chan struct{}) error {
r.logger.Info("Ingress Listening...", zap.String("Address", r.s.Addr))
return r.s.ListenAndServe()
}
33 changes: 27 additions & 6 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
"flag"
"log"
"net/http"
"os"
"time"

"github.com/knative/eventing/pkg/reconciler/v1alpha1/broker"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/channel"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/namespace"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -111,7 +115,7 @@ func main() {
eventingv1alpha1.AddToScheme,
}
for _, schemeFunc := range schemeFuncs {
if err := schemeFunc(mgr.GetScheme()); err != nil {
if err = schemeFunc(mgr.GetScheme()); err != nil {
logger.Fatalf("Error adding type to manager's scheme: %v", err)
}
}
Expand All @@ -121,17 +125,26 @@ func main() {
providers := []ProvideFunc{
subscription.ProvideController,
channel.ProvideController,
broker.ProvideController(
broker.ReconcilerArgs{
IngressImage: getRequiredEnv("BROKER_INGRESS_IMAGE"),
IngressServiceAccountName: getRequiredEnv("BROKER_INGRESS_SERVICE_ACCOUNT"),
FilterImage: getRequiredEnv("BROKER_FILTER_IMAGE"),
FilterServiceAccountName: getRequiredEnv("BROKER_FILTER_SERVICE_ACCOUNT"),
}),
trigger.ProvideController,
namespace.ProvideController,
}
for _, provider := range providers {
if _, err := provider(mgr, logger.Desugar()); err != nil {
if _, err = provider(mgr, logger.Desugar()); err != nil {
logger.Fatalf("Error adding controller to manager: %v", err)
}
}

// Start the Manager
go func() {
if err := mgr.Start(stopCh); err != nil {
logger.Fatalf("Error starting manager: %v", err)
if localErr := mgr.Start(stopCh); localErr != nil {
logger.Fatalf("Error starting manager: %v", localErr)
}
}()

Expand All @@ -140,8 +153,8 @@ func main() {
http.Handle(metricsScrapePath, promhttp.Handler())
go func() {
logger.Infof("Starting metrics listener at %s", metricsScrapeAddr)
if err := srv.ListenAndServe(); err != nil {
logger.Infof("Httpserver: ListenAndServe() finished with error: %s", err)
if localErr := srv.ListenAndServe(); localErr != nil {
logger.Infof("Httpserver: ListenAndServe() finished with error: %s", localErr)
}
}()

Expand Down Expand Up @@ -190,3 +203,11 @@ func getLoggingConfigOrDie() map[string]string {
return cm
}
}

func getRequiredEnv(envKey string) string {
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}
15 changes: 10 additions & 5 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ func main() {
defer logger.Sync()
logger = logger.With(zap.String(logkey.ControllerType, logconfig.Webhook))

logger.Info("Starting the Eventing Webhook")
logger.Infow("Starting the Eventing Webhook")

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

clusterConfig, err := rest.InClusterConfig()
if err != nil {
logger.Fatal("Failed to get in cluster config", zap.Error(err))
logger.Fatalw("Failed to get in cluster config", zap.Error(err))
}

kubeClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
logger.Fatal("Failed to get the client set", zap.Error(err))
logger.Fatalw("Failed to get the client set", zap.Error(err))
}

// Watch the logging config map and dynamically update logging levels.
Expand Down Expand Up @@ -96,14 +96,19 @@ func main() {
Options: options,
Handlers: map[schema.GroupVersionKind]webhook.GenericCRD{
// For group eventing.knative.dev,
eventingv1alpha1.SchemeGroupVersion.WithKind("Broker"): &eventingv1alpha1.Broker{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Channel"): &eventingv1alpha1.Channel{},
eventingv1alpha1.SchemeGroupVersion.WithKind("ClusterChannelProvisioner"): &eventingv1alpha1.ClusterChannelProvisioner{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Subscription"): &eventingv1alpha1.Subscription{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Trigger"): &eventingv1alpha1.Trigger{},
},
Logger: logger,
}
if err != nil {
logger.Fatal("Failed to create the admission controller", zap.Error(err))
logger.Fatalw("Failed to create the admission controller", zap.Error(err))
}
controller.Run(stopCh)
if err = controller.Run(stopCh); err != nil {
logger.Errorw("controller.Run() failed", zap.Error(err))
}
logger.Infow("Webhook stopping")
}
Loading

0 comments on commit 76603c8

Please sign in to comment.