Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker, Trigger, and Namespace Controllers #788

Merged
merged 191 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
191 commits
Select commit Hold shift + click to select a range
d7ed39b
Initial API outline for Broker.
Harwayne Jan 28, 2019
080f5eb
Metadata is exported
Harwayne Jan 28, 2019
f20a2df
Initial Trigger API.
Harwayne Jan 28, 2019
163474d
Remove old comments.
Harwayne Jan 28, 2019
5f53e05
Register the types.
Harwayne Jan 28, 2019
2f6a03f
Initial work on the Broker controller.
Harwayne Jan 28, 2019
3ee0b8b
Initial work on the Trigger controller.
Harwayne Jan 31, 2019
da9fe36
Initial work on the new model and its corresponding broker controller.
Harwayne Feb 4, 2019
f8cca2e
Initial work on the Trigger controller.
Harwayne Feb 5, 2019
d7ebdcb
Add simple mains.
Harwayne Feb 5, 2019
c5f7a83
Small fixes, still not working.
Harwayne Feb 5, 2019
054c10d
Add the Istio injection annotation.
Harwayne Feb 5, 2019
942b979
Resolve subscriber in the Trigger controller.
Harwayne Feb 5, 2019
afd71af
Standardize on 'Any'.
Harwayne Feb 5, 2019
7a1f69b
Make Broker and Trigger generational
Harwayne Feb 5, 2019
2be56bd
Happy control path.
Harwayne Feb 5, 2019
10acfa3
TargetPort is 8080
Harwayne Feb 5, 2019
2108d83
Watch channels.
Harwayne Feb 5, 2019
6aa7086
Custom service account for filter (needs trigger watch).
Harwayne Feb 5, 2019
3cd236f
Increase logging in the Filter to debug level.
Harwayne Feb 5, 2019
be350a0
Use the default channel provisioner.
Harwayne Feb 6, 2019
0068232
Adding filtering using k8s label selectors
nachocano Feb 6, 2019
b4e2358
Watch namespaces and create a default Broker.
Harwayne Feb 6, 2019
3a9fd94
Merge remote-tracking branch 'broker/broker-new-model' into broker-ne…
nachocano Feb 6, 2019
5ca1b9f
Updating trigger example with filters
nachocano Feb 6, 2019
bd53cda
Broker changes cause the namespace watcher to reconcile.
Harwayne Feb 6, 2019
17889fb
Merge remote-tracking branch 'broker/broker-new-model' into broker-ne…
nachocano Feb 7, 2019
582eb4c
Merge branch 'master' into broker-new-model
Harwayne Feb 7, 2019
4f803a2
Move the Logging package import and format.
Harwayne Feb 7, 2019
eec0f8d
Updating after review comments. Only doing exact header matching.
nachocano Feb 8, 2019
ac3587e
Merge remote-tracking branch 'broker/broker-new-model' into broker-ne…
nachocano Feb 8, 2019
34d42e0
Adding filtering expressions. Currently using LabelSelectors without
nachocano Feb 8, 2019
1788a9b
Merge remote-tracking branch 'upstream/master' into broker-new
nachocano Feb 11, 2019
ad4e6f5
Changes to compile
nachocano Feb 11, 2019
bc519d8
moving filter
nachocano Feb 11, 2019
3f47238
Moving cmds to broker
nachocano Feb 11, 2019
3070157
updating controller
nachocano Feb 11, 2019
0957fda
Moving provider and reconciler to reconciler folder, and merging them…
nachocano Feb 11, 2019
ea53702
Merge branch 'master' into broker-new-model
Harwayne Feb 11, 2019
f0106aa
Merge remote-tracking branch 'broker/broker-new-model' into broker-new
nachocano Feb 11, 2019
509c8a1
Adding verbs to dispatcher
nachocano Feb 12, 2019
d33308b
Moving back mains to cmd
nachocano Feb 12, 2019
af83baa
Updating config
nachocano Feb 12, 2019
8df699c
Moving resources to reconcilers
nachocano Feb 12, 2019
bd6a8fa
Moving broker folder back where it belongs
nachocano Feb 12, 2019
d2011e6
Merge pull request #5 from nachocano/broker-new
Harwayne Feb 12, 2019
d567412
Merge branch 'broker-new-model' into broker-new
nachocano Feb 12, 2019
3b84c91
Revert "Merge branch 'broker-new-model' into broker-new"
nachocano Feb 12, 2019
1d6c759
Merge remote-tracking branch 'broker/broker-new-model' into broker-new
nachocano Feb 12, 2019
0e43a7f
Merge remote-tracking branch 'broker/broker-new-model' into filter-exp
nachocano Feb 12, 2019
2df6bdc
Updating headers and headerExpressions to attributes and attributeExp…
nachocano Feb 12, 2019
bfe61c1
Merge branch 'master' into broker-clean-up
Harwayne Feb 12, 2019
22efbe0
Reconcilers notice when mroe things change.
Harwayne Feb 13, 2019
3a3c13c
PR comment.
Harwayne Feb 13, 2019
f834f89
Remove redundant watch.
Harwayne Feb 13, 2019
698a40c
Merge pull request #6 from Harwayne/broker-clean-up
Harwayne Feb 13, 2019
6b817ef
Unit test scaffold.
Harwayne Feb 13, 2019
451b607
Merge pull request #7 from Harwayne/broker-unit-test-scaffold
Harwayne Feb 13, 2019
4097d0a
Merge remote-tracking branch 'broker/broker-new-model' into broker-new
nachocano Feb 13, 2019
97e642d
Tests for the namespace reconciler.
Harwayne Feb 13, 2019
5e0319f
Merge pull request #8 from Harwayne/ns-test
Harwayne Feb 13, 2019
4bb7fda
Merge remote-tracking branch 'broker/broker-new-model' into broker-new
nachocano Feb 13, 2019
da32bae
Exact matching for filters without using k8s selectors-based syntax.
nachocano Feb 13, 2019
7bc8b7a
Merge remote-tracking branch 'broker/broker-new-model' into filter-exp
nachocano Feb 13, 2019
8ccfcca
Removing t3 as we don't have set expressions
nachocano Feb 13, 2019
29acf36
Updates after code review
nachocano Feb 13, 2019
e75141c
Attempt to reconcile broker in trigger controller
nachocano Feb 13, 2019
e79b8f7
Broker controller unit tests.
Harwayne Feb 13, 2019
ae665bd
Merge pull request #9 from Harwayne/broker-test
Harwayne Feb 13, 2019
af2f475
Merge remote-tracking branch 'broker/broker-new-model' into broker-new
nachocano Feb 13, 2019
dbe650a
Merge pull request #4 from nachocano/filter-exp
Harwayne Feb 14, 2019
30f1c74
Deleting and re-creating subscription object as the backing channel
nachocano Feb 14, 2019
f363e02
Merge remote-tracking branch 'broker/broker-new-model' into broker-new
nachocano Feb 14, 2019
e45d19a
Adding comment
nachocano Feb 14, 2019
1628333
Rename Trigger.Spec.Filter.ExactMatch to Trigger.Spec.Filter.SourceAn…
Harwayne Feb 14, 2019
a6d2650
Merge pull request #11 from Harwayne/filter-test
Harwayne Feb 14, 2019
434bf9d
Merge remote-tracking branch 'broker/broker-new-model' into broker-new
nachocano Feb 14, 2019
cbbd066
Adding event messages
nachocano Feb 14, 2019
c33c569
Using the broker's namespacedNamed as key to the triggers map.
nachocano Feb 14, 2019
66f6f44
Merge pull request #10 from nachocano/broker-new
Harwayne Feb 14, 2019
40ca8da
Adding some tests to trigger
nachocano Feb 14, 2019
650ef47
More UTs
nachocano Feb 15, 2019
d2c1ef8
More UTs
nachocano Feb 15, 2019
ae04007
More UTs
nachocano Feb 15, 2019
6d6c488
Merge pull request #12 from nachocano/broker-new-trigger-ut
Harwayne Feb 15, 2019
9c63dbc
Namespace reconciler automatically creates the Broker Filter's Servic…
Harwayne Feb 15, 2019
3ad193f
Remove no longer needed label.
Harwayne Feb 15, 2019
d18acab
Broker and trigger types UTs
nachocano Feb 15, 2019
2a02ba5
WIP early E2E test
grantr Feb 15, 2019
54cdb3c
Changes after code review. Adding trigger defaults and validation tests
nachocano Feb 19, 2019
11a1734
Cleaner trigger validation
nachocano Feb 19, 2019
d713eeb
Adding dummy tests for broker validation...
nachocano Feb 19, 2019
89cdd58
Merge pull request #14 from nachocano/broker-new-trigger-ut
Harwayne Feb 19, 2019
bd4345a
Merge remote-tracking branch 'grantr/broker-new-model-e2e' into broke…
nachocano Feb 19, 2019
49fd1a0
Compiling and moving things around
nachocano Feb 19, 2019
4cdfd58
Updating test
nachocano Feb 20, 2019
b42d359
More updates
nachocano Feb 20, 2019
29be515
Waiting for potentially multiple contents.
nachocano Feb 20, 2019
1edbf0a
Compiling
nachocano Feb 20, 2019
9c415e6
Fixing compilation
nachocano Feb 20, 2019
ac9b485
Fixing compilation errors. Adding AnnotateNamespace function.
nachocano Feb 20, 2019
2b39168
Adding ns
nachocano Feb 20, 2019
ed9fcbe
Adding logs. Changing to lowercase any otherwise the pod name is invalid
nachocano Feb 20, 2019
4848952
Removing namespace when creating trigger subscriber spec.
nachocano Feb 20, 2019
311e174
Checking if all triggers are ready
nachocano Feb 20, 2019
d361afb
Updated logs
nachocano Feb 20, 2019
d1198a9
Working
nachocano Feb 20, 2019
6bd659c
Adding logs... Still not receiving the events.
nachocano Feb 21, 2019
0d80f9c
More logs
nachocano Feb 21, 2019
cd862ff
Adding build constraint
nachocano Feb 21, 2019
2ea7aa3
Removing unnecessary stuff
nachocano Feb 21, 2019
aa60a8a
Removing ugly structs
nachocano Feb 21, 2019
6471b4f
More logs
nachocano Feb 21, 2019
6bc7f64
Merge pull request #13 from Harwayne/ns-rbac
Harwayne Feb 21, 2019
56ad43c
Merge remote-tracking branch 'broker/broker-new-model' into broker-e2e
nachocano Feb 21, 2019
c4959e2
Removing quotes
nachocano Feb 21, 2019
69ebf18
More logs
nachocano Feb 21, 2019
1dccad8
Adding delay
nachocano Feb 21, 2019
2b67eb8
Listing triggers in receiver when we create it, so that we don't miss
nachocano Feb 22, 2019
a44d6d2
Adding delay to sender pod
nachocano Feb 22, 2019
41ab0a8
Removing withDelay method and just sleep for a while
nachocano Feb 22, 2019
6a832ea
Improve log...
nachocano Feb 22, 2019
446022a
Updates after code review.
nachocano Feb 23, 2019
7f24f14
Adding some more logs and trailing dots.
nachocano Feb 23, 2019
2a3eb19
Merge branch 'master' into broker-controller-runtime
Harwayne Feb 25, 2019
9debaf2
Switch import order.
Harwayne Feb 25, 2019
abf3427
Merge pull request #16 from Harwayne/broker-controller-runtime
Harwayne Feb 25, 2019
a83ca82
Updating comments.
nachocano Feb 25, 2019
fda6550
Updating comments.
nachocano Feb 25, 2019
3ee6343
Merge remote-tracking branch 'broker/broker-new-model' into broker-ge…
nachocano Feb 25, 2019
2aae7eb
Replace the bad errgroup usage with the runnableServer.
Harwayne Feb 25, 2019
e15431d
Merge pull request #18 from Harwayne/codegen
Harwayne Feb 25, 2019
5f201fa
Namespace scoped the Broker Filter's client.
Harwayne Feb 25, 2019
ba726b3
Fix unit tests.
Harwayne Feb 25, 2019
e7ff5b2
Fix yaml
Harwayne Feb 25, 2019
d640c7a
Merge pull request #19 from Harwayne/namespace-rolebinding
Harwayne Feb 25, 2019
dc91427
Merge remote-tracking branch 'broker/broker-new-model' into broker-e2e
nachocano Feb 25, 2019
0ea0402
Merge branch 'broker-e2e' of github.com:nachocano/eventing into broke…
nachocano Feb 25, 2019
dcb8774
Setting source to source not type.
nachocano Feb 25, 2019
df88eab
Merge remote-tracking branch 'broker/broker-new-model' into broker-ge…
nachocano Feb 25, 2019
86f7330
Merge pull request #15 from nachocano/broker-e2e
Harwayne Feb 26, 2019
b6fe6bd
Merge pull request #17 from nachocano/broker-gen-pass
Harwayne Feb 26, 2019
71c5250
Switch from annotating the namespace to labeling it, to match Istio.
Harwayne Feb 26, 2019
b12da7e
Merge pull request #20 from Harwayne/ns-eventing-label
Harwayne Feb 26, 2019
8d6df67
General clean up.
Harwayne Feb 27, 2019
c27d283
Merge pull request #21 from Harwayne/broker-cleanup
Harwayne Feb 27, 2019
d9a607c
Merge branch 'master' into broker-merge-master
Harwayne Feb 27, 2019
77a69f9
Merge pull request #22 from Harwayne/broker-merge-master
Harwayne Feb 27, 2019
edcab3e
Initial docs for the Broker.
Harwayne Mar 6, 2019
26586ab
Fill some of the usage section.
Harwayne Mar 6, 2019
a3e485f
Add instructions for installing the Broker without using Namespace an…
Harwayne Mar 7, 2019
3e5f80e
Merge branch 'master' into broker-merge
Harwayne Mar 8, 2019
4bdde4e
Merge pull request #29 from Harwayne/broker-merge
Harwayne Mar 8, 2019
19f9653
Merge pull request #28 from Harwayne/broker-docs
Harwayne Mar 8, 2019
a367206
Create example_{brokers,triggers}.yaml to document how they can be used.
Harwayne Mar 8, 2019
3447c4d
Merge pull request #30 from Harwayne/broker-cleanup
Harwayne Mar 8, 2019
a0599d6
Fix MD linter issues.
Harwayne Mar 8, 2019
a099404
Merge pull request #31 from Harwayne/md-fix
Harwayne Mar 8, 2019
31f5e52
Fix MD linter issues.
Harwayne Mar 8, 2019
d0bd9dc
Merge pull request #32 from Harwayne/md-fix
Harwayne Mar 8, 2019
ac179cd
Fix MD linter issues.
Harwayne Mar 8, 2019
ac4e3f8
Merge pull request #33 from Harwayne/md-fix
Harwayne Mar 8, 2019
dfa159f
Minor clean up.
Harwayne Mar 8, 2019
17ddb6c
Merge pull request #34 from Harwayne/broker-cleanup
Harwayne Mar 8, 2019
8c779be
Merge branch 'master' into broker-merge
Harwayne Mar 12, 2019
30f9031
Clean up some spots the merge didn't catch.
Harwayne Mar 12, 2019
c6a05c6
Merge pull request #35 from Harwayne/broker-merge
Harwayne Mar 12, 2019
ce0b024
Merge branch 'master' into broker-merge-2
Harwayne Mar 13, 2019
0a34a1a
Merge branch 'master' into broker-new-model
Harwayne Mar 13, 2019
ac38587
Fix the bad merge by replacing logger.BaseLogger with logger.FormatLo…
Harwayne Mar 13, 2019
923dc01
Merge pull request #37 from Harwayne/broker-merge-2
Harwayne Mar 13, 2019
542d4a7
Add extra columns when using kubectl get.
Harwayne Mar 13, 2019
abf2495
MarkBrokerDoesNotExist
Harwayne Mar 13, 2019
bd5252d
Rename extra columns.
Harwayne Mar 13, 2019
215fdd9
Merge pull request #38 from Harwayne/broker-pr-comments
Harwayne Mar 13, 2019
5f9a5d4
Replace the Trigger reconciler's in-memory map with a simple list, ut…
Harwayne Mar 14, 2019
6927fc6
Merge pull request #39 from Harwayne/trigger-list
Harwayne Mar 14, 2019
b9a7d0f
Accept v0.1 and v0.2 cloud events. Adding UTs.
nachocano Mar 14, 2019
0a09e11
Merge branch 'master' into broker-fix-merge
Harwayne Mar 14, 2019
7d8ee9e
Change to resolve.SubscriberSpec().
Harwayne Mar 14, 2019
8691236
Merge remote-tracking branch 'broker/broker-new-model' into cloud_events
nachocano Mar 14, 2019
99e7e45
Remove restClient as it wasn't actually used.
Harwayne Mar 14, 2019
24e5285
Merge pull request #42 from Harwayne/remove-rest-config
Harwayne Mar 14, 2019
24d7603
Only reconcile the Namespace if the specific resource we care about c…
Harwayne Mar 14, 2019
7f7d554
Merge pull request #43 from Harwayne/namespace-resources
Harwayne Mar 14, 2019
c27bf22
lowercase
nachocano Mar 14, 2019
0d169db
Merge pull request #40 from nachocano/cloud_events
Harwayne Mar 14, 2019
37f0e4a
Mark the Broker's Ingress and Filter status condidionts failed when t…
Harwayne Mar 18, 2019
aef69fd
Merge pull request #45 from Harwayne/broker-mark-failed
Harwayne Mar 18, 2019
6abe39c
Do not DeepCopy() in Reconcile(), as controller-runtime already did i…
Harwayne Mar 18, 2019
0a60069
Merge pull request #46 from Harwayne/broker-no-deepcopy
Harwayne Mar 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions b.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: eventing.knative.dev/v1alpha1
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
kind: Broker
metadata:
name: default
spec:
channelTemplate:
provisioner:
apiVersion: eventing.knative.dev/v1alpha1
kind: ClusterChannelProvisioner
name: gcp-pubsub

5 changes: 5 additions & 0 deletions b2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apiVersion: eventing.knative.dev/v1alpha1
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
kind: Broker
metadata:
name: default
spec: {}
1 change: 1 addition & 0 deletions cmd/broker/filter/kodata/HEAD
1 change: 1 addition & 0 deletions cmd/broker/filter/kodata/LICENSE
1 change: 1 addition & 0 deletions cmd/broker/filter/kodata/VENDOR-LICENSE
69 changes: 69 additions & 0 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2018 The Knative Authors
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main
Harwayne marked this conversation as resolved.
Show resolved Hide resolved

import (
"flag"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/broker"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
logConfig := provisioners.NewLoggingConfig()
logConfig.LoggingLevel["provisioner"] = zapcore.DebugLevel
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer logger.Sync()

flag.Parse()

logger.Info("Starting...")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
logger.Fatal("Error starting up.", zap.Error(err))
}

// Add custom types to this array to get them into the manager's scheme.
eventingv1alpha1.AddToScheme(mgr.GetScheme())

// We are running both the receiver (takes messages in from the cluster and writes them to
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
// PubSub) and the dispatcher (takes messages in PubSub and sends them in cluster) in this
// binary.

_, runnable := broker.New(logger, mgr.GetClient())
err = mgr.Add(runnable)
if err != nil {
logger.Fatal("Unable to start the receivers runnable", zap.Error(err), zap.Any("runnable", runnable))
}

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

// Start blocks forever.
logger.Info("Manager starting...")
err = mgr.Start(stopCh)
if err != nil {
logger.Fatal("Manager.Start() returned an error", zap.Error(err))
}
}
1 change: 1 addition & 0 deletions cmd/broker/ingress/kodata/HEAD
1 change: 1 addition & 0 deletions cmd/broker/ingress/kodata/LICENSE
1 change: 1 addition & 0 deletions cmd/broker/ingress/kodata/VENDOR-LICENSE
144 changes: 144 additions & 0 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2018 The Knative Authors
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"time"

"golang.org/x/sync/errgroup"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/pkg/signals"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var (
readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute
)

func main() {
logConfig := provisioners.NewLoggingConfig()
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer logger.Sync()
flag.Parse()

logger.Info("Starting...")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
logger.Fatal("Error starting up.", zap.Error(err))
}

// Add custom types to this array to get them into the manager's scheme.
err = eventingv1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
logger.Fatal("Unable to add scheme", zap.Error(err))
}

c := getRequiredEnv("CHANNEL")

h := NewHandler(logger, c)

s := &http.Server{
Addr: ":8080",
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
Handler: h,
ErrorLog: zap.NewStdLog(logger),
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
}

// Start both the manager (which notices ConfigMap changes) and the HTTP server.
var g errgroup.Group
g.Go(func() error {
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
// Start blocks forever, so run it in a goroutine.
return mgr.Start(stopCh)
})
logger.Info("Ingress Listening...", zap.String("Address", s.Addr))
g.Go(s.ListenAndServe)
err = g.Wait()
if err != nil {
logger.Error("HTTP server failed.", zap.Error(err))
}

ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
defer cancel()
s.Shutdown(ctx)
}

func getRequiredEnv(envKey string) string {
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}

// http.Handler that takes a single request in and fans it out to N other servers.
type Handler struct {
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
receiver *provisioners.MessageReceiver
dispatcher *provisioners.MessageDispatcher
destination string

logger *zap.Logger
}

// NewHandler creates a new fanout.Handler.
func NewHandler(logger *zap.Logger, destination string) *Handler {
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
handler := &Handler{
logger: logger,
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),
destination: fmt.Sprintf("http://%s", destination),
}
// The receiver function needs to point back at the handler itself, so set it up after
// initialization.
handler.receiver = provisioners.NewMessageReceiver(createReceiverFunction(handler), logger.Sugar())

return handler
}

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
// TODO Filter.
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
return f.dispatch(m)
}
}

func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
f.receiver.HandleRequest(w, r)
}

// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out
// requests return successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {
err := f.dispatcher.DispatchMessage(msg, f.destination, "", provisioners.DispatchDefaults{})
if err != nil {
f.logger.Error("Error dispatching message", zap.String("destination", f.destination))
}
return err
}
17 changes: 16 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import (
"flag"
"log"
"net/http"
"os"
"time"

"github.com/knative/eventing/pkg/reconciler/v1alpha1/namespace"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/broker"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -110,7 +114,7 @@ func main() {
eventingv1alpha1.AddToScheme,
}
for _, schemeFunc := range schemeFuncs {
if err := schemeFunc(mgr.GetScheme()); err != nil {
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
if err = schemeFunc(mgr.GetScheme()); err != nil {
logger.Fatalf("Error adding type to manager's scheme: %v", err)
}
}
Expand All @@ -119,6 +123,9 @@ func main() {
// manager run it.
providers := []ProvideFunc{
subscription.ProvideController,
broker.ProvideController(logger.Desugar(), getRequiredEnv("INGRESS_IMAGE"), getRequiredEnv("INGRESS_SERVICE_ACCOUNT"), getRequiredEnv("FILTER_IMAGE"), getRequiredEnv("FILTER_SERVICE_ACCOUNT")),
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
trigger.ProvideController(logger.Desugar()),
namespace.ProvideController(logger.Desugar()),
}
for _, provider := range providers {
if _, err := provider(mgr); err != nil {
Expand Down Expand Up @@ -188,3 +195,11 @@ func getLoggingConfigOrDie() map[string]string {
return cm
}
}

func getRequiredEnv(envKey string) string {
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}
5 changes: 4 additions & 1 deletion cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,17 @@ func main() {
Options: options,
Handlers: map[schema.GroupVersionKind]webhook.GenericCRD{
// For group eventing.knative.dev,
eventingv1alpha1.SchemeGroupVersion.WithKind("Broker"): &eventingv1alpha1.Broker{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Channel"): &eventingv1alpha1.Channel{},
eventingv1alpha1.SchemeGroupVersion.WithKind("ClusterChannelProvisioner"): &eventingv1alpha1.ClusterChannelProvisioner{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Subscription"): &eventingv1alpha1.Subscription{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Trigger"): &eventingv1alpha1.Trigger{},
},
Logger: logger,
}
if err != nil {
logger.Fatal("Failed to create the admission controller", zap.Error(err))
}
controller.Run(stopCh)
err = controller.Run(stopCh)
logger.Errorw("Webhook stopping", zap.Error(err))
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
}
38 changes: 38 additions & 0 deletions config/300-broker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: brokers.eventing.knative.dev
spec:
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
group: eventing.knative.dev
version: v1alpha1
names:
kind: Broker
plural: brokers
singular: broker
categories:
- all
- knative
- eventing
scope: Namespaced
subresources:
status: {}
additionalPrinterColumns:
- name: Ready
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
type: string
JSONPath: ".status.conditions[?(@.type==\"Ready\")].status"
- name: Reason
type: string
JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason"
Loading