Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

imc-dispatcher starts a TLS server, accepts host based routing on http receiver and path based routing on https receiver #6954

Merged
merged 5 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,32 @@ roleRef:
kind: ClusterRole
name: imc-dispatcher
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: imc-dispatcher-tls-role-binding
namespace: knative-eventing
subjects:
- kind: ServiceAccount
name: imc-dispatcher
apiGroup: ""
roleRef:
kind: Role
name: imc-dispatcher-tls-role
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: imc-dispatcher-tls-role
namespace: knative-eventing
rules:
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: https-dispatcher
port: 443
protocol: TCP
targetPort: 8443
- name: http-metrics
port: 9090
targetPort: 9090
3 changes: 3 additions & 0 deletions config/channels/in-memory-channel/deployments/dispatcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ spec:
- containerPort: 8080
name: http
protocol: TCP
- containerPort: 8443
name: https
protocol: TCP
- containerPort: 9090
name: metrics
securityContext:
Expand Down
4 changes: 2 additions & 2 deletions pkg/channel/fanout/fanout_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type FanoutMessageHandler struct {

// NewMessageHandler creates a new fanout.MessageHandler.

func NewFanoutMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDispatcher, config Config, reporter channel.StatsReporter) (*FanoutMessageHandler, error) {
func NewFanoutMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDispatcher, config Config, reporter channel.StatsReporter, receiverOpts ...channel.MessageReceiverOptions) (*FanoutMessageHandler, error) {
handler := &FanoutMessageHandler{
logger: logger,
dispatcher: messageDispatcher,
Expand All @@ -101,7 +101,7 @@ func NewFanoutMessageHandler(logger *zap.Logger, messageDispatcher channel.Messa
copy(handler.subscriptions, config.Subscriptions)
// The receiver function needs to point back at the handler itself, so set it up after
// initialization.
receiver, err := channel.NewMessageReceiver(createMessageReceiverFunction(handler), logger, reporter)
receiver, err := channel.NewMessageReceiver(createMessageReceiverFunction(handler), logger, reporter, receiverOpts...)
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/channel/fanout/fanout_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
t.Fatal(err)
}

calledChan := make(chan bool, 1)
recvOptionFunc := func(*channel.MessageReceiver) error {
calledChan <- true
return nil
}

h, err := NewFanoutMessageHandler(
logger,
channel.NewMessageDispatcher(logger),
Expand All @@ -333,7 +339,9 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
AsyncHandler: async,
},
reporter,
recvOptionFunc,
)
<-calledChan
if err != nil {
t.Fatal("NewHandler failed =", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/channel/multichannelfanout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ type ChannelConfig struct {
Namespace string
Name string
HostName string
Path string
FanoutConfig fanout.Config
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ type MultiChannelMessageHandler interface {
CountChannelHandlers() int
}

// makeChannelKeyFromConfig creates the channel key for a given channelConfig. It is a helper around
// MakeChannelKey.
func makeChannelKeyFromConfig(config ChannelConfig) string {
return config.HostName
}

// Handler is an http.Handler that introspects the incoming request to determine what Channel it is
// on, and then delegates handling of that request to the single fanout.FanoutMessageHandler corresponding to
// that Channel.
Expand All @@ -70,21 +64,26 @@ func NewMessageHandler(_ context.Context, logger *zap.Logger, messageDispatcher

// NewMessageHandlerWithConfig creates a new Handler with the specified configuration. This is really meant for tests
// where you want to apply a fully specified configuration for tests. Reconciler operates on single channel at a time.
func NewMessageHandlerWithConfig(_ context.Context, logger *zap.Logger, messageDispatcher channel.MessageDispatcher, conf Config, reporter channel.StatsReporter) (*MessageHandler, error) {
func NewMessageHandlerWithConfig(_ context.Context, logger *zap.Logger, messageDispatcher channel.MessageDispatcher, conf Config, reporter channel.StatsReporter, recvOptions ...channel.MessageReceiverOptions) (*MessageHandler, error) {
handlers := make(map[string]fanout.MessageHandler, len(conf.ChannelConfigs))

for _, cc := range conf.ChannelConfigs {
key := makeChannelKeyFromConfig(cc)
handler, err := fanout.NewFanoutMessageHandler(logger, messageDispatcher, cc.FanoutConfig, reporter)
if err != nil {
logger.Error("Failed creating new fanout handler.", zap.Error(err))
return nil, err
keys := []string{cc.HostName, cc.Path}
for _, key := range keys {
if key == "" {
continue
}
handler, err := fanout.NewFanoutMessageHandler(logger, messageDispatcher, cc.FanoutConfig, reporter, recvOptions...)
if err != nil {
logger.Error("Failed creating new fanout handler.", zap.Error(err))
return nil, err
}
if _, present := handlers[key]; present {
logger.Error("Duplicate channel key", zap.String("channelKey", key))
return nil, fmt.Errorf("duplicate channel key: %v", key)
}
handlers[key] = handler
}
if _, present := handlers[key]; present {
logger.Error("Duplicate channel key", zap.String("channelKey", key))
return nil, fmt.Errorf("duplicate channel key: %v", key)
}
handlers[key] = handler
}
return &MessageHandler{
logger: logger,
Expand Down Expand Up @@ -120,6 +119,17 @@ func (h *MessageHandler) CountChannelHandlers() int {
// request's channel key.
func (h *MessageHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
channelKey := request.Host

if request.URL.Path != "/" {
channelRef, err := channel.ParseChannelFromPath(request.URL.Path)
if err != nil {
h.logger.Error("unable to retrieve channel from path")
response.WriteHeader(http.StatusBadRequest)
return
}
channelKey = fmt.Sprintf("%s/%s", channelRef.Namespace, channelRef.Name)
}

fh := h.GetChannelHandler(channelKey)
if fh == nil {
h.logger.Info("Unable to find a handler for request", zap.String("channelKey", channelKey))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,63 @@ func TestServeHTTPMessageHandler(t *testing.T) {
config Config
eventID *string
respStatusCode int
key string
hostKey string
pathKey string
recvOptions []channel.MessageReceiverOptions
expectedStatusCode int
}{
"non-existent channel": {
"non-existent channel host based": {
config: Config{},
key: "default.does-not-exist",
hostKey: "default.does-not-exist",
expectedStatusCode: http.StatusInternalServerError,
},
"non-existent channel path based": {
hostKey: "first-channel.default",
config: Config{
ChannelConfigs: []ChannelConfig{
{
Namespace: "ns",
Name: "name",
HostName: "first-channel.default",
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
Reply: replaceDomain,
},
},
},
},
},
},
pathKey: "some-namespace/wrong-channel",
expectedStatusCode: http.StatusInternalServerError,
},
"bad host": {
config: Config{},
key: "no-dot",
hostKey: "no-dot",
expectedStatusCode: http.StatusInternalServerError,
},
"malformed path": {
hostKey: "first-channel.default",
config: Config{
ChannelConfigs: []ChannelConfig{
{
Namespace: "ns",
Name: "name",
HostName: "first-channel.default",
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
Reply: replaceDomain,
},
},
},
},
},
},
pathKey: "missing-forward-slash",
expectedStatusCode: http.StatusBadRequest,
},
"pass through failure": {
config: Config{
ChannelConfigs: []ChannelConfig{
Expand All @@ -153,7 +197,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
},
},
respStatusCode: http.StatusInternalServerError,
key: "first-channel.default",
hostKey: "first-channel.default",
expectedStatusCode: http.StatusInternalServerError,
},
"invalid event": {
Expand Down Expand Up @@ -188,7 +232,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
},
eventID: ptr.String(""), // invalid id
respStatusCode: http.StatusOK,
key: "second-channel.default",
hostKey: "second-channel.default",
expectedStatusCode: http.StatusBadRequest,
},
"choose channel": {
Expand Down Expand Up @@ -222,8 +266,33 @@ func TestServeHTTPMessageHandler(t *testing.T) {
},
},
respStatusCode: http.StatusOK,
key: "second-channel.default",
hostKey: "second-channel.default",
expectedStatusCode: http.StatusAccepted,
},
"path based": {
config: Config{
ChannelConfigs: []ChannelConfig{
{

Namespace: "ns",
Name: "name",
HostName: "first-channel.default",
Path: "default/first-channel",
FanoutConfig: fanout.Config{
Subscriptions: []fanout.Subscription{
{
Subscriber: replaceDomain,
},
},
},
},
},
},
respStatusCode: http.StatusOK,
hostKey: "host.should.be.ignored",
pathKey: "default/first-channel",
expectedStatusCode: http.StatusAccepted,
recvOptions: []channel.MessageReceiverOptions{channel.ResolveMessageChannelFromPath(channel.ParseChannelFromPath)},
},
}
for n, tc := range testCases {
Expand All @@ -236,7 +305,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
reporter := channel.NewStatsReporter("testcontainer", "testpod")
h, err := NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), tc.config, reporter)
h, err := NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), tc.config, reporter, tc.recvOptions...)
if err != nil {
t.Fatalf("Unexpected NewHandler error: '%v'", err)
}
Expand All @@ -255,7 +324,7 @@ func TestServeHTTPMessageHandler(t *testing.T) {
event.SetSource("testsource")
event.SetData(cloudevents.ApplicationJSON, "{}")

req := httptest.NewRequest(http.MethodPost, "http://"+tc.key+"/", nil)
req := httptest.NewRequest(http.MethodPost, "http://"+tc.hostKey+"/"+tc.pathKey, nil)
err = bindingshttp.WriteRequest(ctx, binding.ToMessage(&event), req)
if err != nil {
t.Fatal(err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/eventingtls/eventingtls.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,16 @@ func GetCertificateFromSecret(ctx context.Context, informer coreinformersv1.Secr
},
})

// Store the current value so that we have certHolder initialized.
// If the Secret already exists, store its value
firstValue, err := informer.Lister().Secrets(secret.Namespace).Get(secret.Name)
if err != nil {
// Try to get the secret from the API Server when the lister failed.
firstValue, err = kube.CoreV1().Secrets(secret.Namespace).Get(ctx, secret.Name, metav1.GetOptions{})
if err != nil {
logger.Fatal(err.Error())
}
// Ignore any errors as the Secret may not be available yet.
firstValue, _ = kube.CoreV1().Secrets(secret.Namespace).Get(ctx, secret.Name, metav1.GetOptions{})
}
if firstValue != nil {
store(firstValue)
}
store(firstValue)

return func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
cert := certHolder.Load()
Expand Down
4 changes: 4 additions & 0 deletions pkg/inmemorychannel/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (d *InMemoryMessageDispatcher) GetHandler(ctx context.Context) multichannel
return d.handler
}

func (d *InMemoryMessageDispatcher) GetReceiver() kncloudevents.HTTPMessageReceiver {
return *d.httpBindingsReceiver
}

// Start starts the inmemory dispatcher's message processing.
// This is a blocking call.
func (d *InMemoryMessageDispatcher) Start(ctx context.Context) error {
Expand Down
Loading