Skip to content

Commit

Permalink
Refactor imc dispatcher (#4359)
Browse files Browse the repository at this point in the history
* checkpoint

* surgical strike

* new editor, whodis?

* rebase, fix ut

* update codegen

* ut, fix slice handling. duh

* cleanups

* ut, fix removals

* goimports

* remove unnecessary file added by mistake

* remove old cruft

* add tracking issues as per pr review
  • Loading branch information
vaikas authored Oct 26, 2020
1 parent 7734430 commit 8dca90e
Show file tree
Hide file tree
Showing 15 changed files with 667 additions and 681 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ rules:
verbs:
- create
- patch
# Updates the status to reflect subscribable status.
# Updates the finalizer so we can remove our handlers when channel is deleted
- apiGroups:
- messaging.knative.dev
resources:
- inmemorychannels/status
- inmemorychannels/finalizers
- inmemorychannels
verbs:
- update
- patch
- apiGroups:
- coordination.k8s.io
resources:
Expand Down
90 changes: 65 additions & 25 deletions pkg/channel/fanout/fanout_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
nethttp "net/http"
"net/url"
"sync"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -57,9 +58,24 @@ type Config struct {
AsyncHandler bool `json:"asyncHandler,omitempty"`
}

// MessageHandler is an http.Handler but has methods for managing
// the fanout Subscriptions. Get/Set methods are synchronized, and
// GetSubscriptions returns a copy of the Subscriptions, so you can
// use it to fetch a snapshot and use it after that safely.
type MessageHandler interface {
nethttp.Handler
SetSubscriptions(ctx context.Context, subs []Subscription)
GetSubscriptions(ctx context.Context) []Subscription
}

// MessageHandler is a http.Handler that takes a single request in and fans it out to N other servers.
type MessageHandler struct {
config Config
type FanoutMessageHandler struct {
// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
asyncHandler bool

subscriptionsMutex sync.RWMutex
subscriptions []Subscription

receiver *channel.MessageReceiver
dispatcher channel.MessageDispatcher
Expand All @@ -73,13 +89,18 @@ type MessageHandler struct {
}

// NewMessageHandler creates a new fanout.MessageHandler.
func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDispatcher, config Config, reporter channel.StatsReporter) (*MessageHandler, error) {
handler := &MessageHandler{
logger: logger,
config: config,
dispatcher: messageDispatcher,
timeout: defaultTimeout,
reporter: reporter,

func NewFanoutMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDispatcher, config Config, reporter channel.StatsReporter) (*FanoutMessageHandler, error) {
handler := &FanoutMessageHandler{
logger: logger,
dispatcher: messageDispatcher,
timeout: defaultTimeout,
reporter: reporter,
asyncHandler: config.AsyncHandler,
}
handler.subscriptions = make([]Subscription, len(config.Subscriptions))
for i := range config.Subscriptions {
handler.subscriptions[i] = config.Subscriptions[i]
}
// The receiver function needs to point back at the handler itself, so set it up after
// initialization.
Expand All @@ -105,6 +126,8 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript

var deadLetter *url.URL
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
// TODO: Bug(?) this does not seem to support refing the Ref field.
// https://github.com/knative/eventing/issues/4376
deadLetter = sub.Delivery.DeadLetterSink.URI.URL()
}

Expand All @@ -120,10 +143,28 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript
return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil
}

func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
if f.config.AsyncHandler {
func (f *FanoutMessageHandler) SetSubscriptions(ctx context.Context, subs []Subscription) {
f.subscriptionsMutex.Lock()
defer f.subscriptionsMutex.Unlock()
s := make([]Subscription, len(subs))
copy(s, subs)
f.subscriptions = s
}

func (f *FanoutMessageHandler) GetSubscriptions(ctx context.Context) []Subscription {
f.subscriptionsMutex.RLock()
defer f.subscriptionsMutex.RUnlock()
ret := make([]Subscription, len(f.subscriptions))
copy(ret, f.subscriptions)
return ret
}

func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
if f.asyncHandler {
return func(ctx context.Context, ref channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
if len(f.config.Subscriptions) == 0 {
subs := f.GetSubscriptions(ctx)

if len(subs) == 0 {
// Nothing to do here, finish the message and return
_ = message.Finish(nil)
return nil
Expand All @@ -149,14 +190,15 @@ func createMessageReceiverFunction(f *MessageHandler) func(context.Context, chan
// Run async dispatch with background context.
ctx = trace.NewContext(context.Background(), s)
// Any returned error is already logged in f.dispatch().
dispatchResultForFanout := f.dispatch(ctx, m, h)
dispatchResultForFanout := f.dispatch(ctx, subs, m, h)
_ = parseFanoutResultAndReportMetrics(dispatchResultForFanout, *r, *args)
}(bufferedMessage, additionalHeaders, parentSpan, &f.reporter, &reportArgs)
return nil
}
}
return func(ctx context.Context, ref channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
if len(f.config.Subscriptions) == 0 {
subs := f.GetSubscriptions(ctx)
if len(subs) == 0 {
// Nothing to do here, finish the message and return
_ = message.Finish(nil)
return nil
Expand All @@ -175,12 +217,12 @@ func createMessageReceiverFunction(f *MessageHandler) func(context.Context, chan
reportArgs := channel.ReportArgs{}
reportArgs.EventType = string(te)
reportArgs.Ns = ref.Namespace
dispatchResultForFanout := f.dispatch(ctx, bufferedMessage, additionalHeaders)
dispatchResultForFanout := f.dispatch(ctx, subs, bufferedMessage, additionalHeaders)
return parseFanoutResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
}
}

func (f *MessageHandler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
func (f *FanoutMessageHandler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
f.receiver.ServeHTTP(response, request)
}

Expand All @@ -203,16 +245,14 @@ func parseFanoutResultAndReportMetrics(result dispatchResult, reporter channel.S
return err
}

// dispatch takes the event, fans it out to each subscription in f.config. If all the fanned out
// dispatch takes the event, fans it out to each subscription in subs. If all the fanned out
// events return successfully, then return nil. Else, return an error.
func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.Message, additionalHeaders nethttp.Header) dispatchResult {
subs := len(f.config.Subscriptions)

func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) dispatchResult {
// Bind the lifecycle of the buffered message to the number of subs
bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, subs)
bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, len(subs))

errorCh := make(chan dispatchResult, subs)
for _, sub := range f.config.Subscriptions {
errorCh := make(chan dispatchResult, len(subs))
for _, sub := range subs {
go func(s Subscription) {
dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
errorCh <- dispatchResult{err: err, info: dispatchedResultPerSub}
Expand All @@ -226,7 +266,7 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M
ResponseCode: channel.NoResponse,
},
}
for range f.config.Subscriptions {
for range subs {
select {
case dispatchResult := <-errorCh:
if dispatchResult.info != nil {
Expand Down Expand Up @@ -257,7 +297,7 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M

// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) (*channel.DispatchExecutionInfo, error) {
func (f *FanoutMessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) (*channel.DispatchExecutionInfo, error) {
return f.dispatcher.DispatchMessageWithRetries(
ctx,
message,
Expand Down
74 changes: 73 additions & 1 deletion pkg/channel/fanout/fanout_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/kncloudevents"
pkgduckv1 "knative.dev/pkg/apis/duck/v1"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
bindingshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
Expand All @@ -43,6 +49,72 @@ var (
replaceReplier = apis.HTTP("replaceReplier").URL()
)

func TestSubscriberSpecToFanoutConfig(t *testing.T) {
three := int32(3)
linear := eventingduckv1.BackoffPolicyLinear
delay := "PT1S"
spec := &eventingduckv1.SubscriberSpec{
SubscriberURI: apis.HTTP("subscriber.example.com"),
ReplyURI: apis.HTTP("reply.example.com"),
Delivery: &eventingduckv1.DeliverySpec{
DeadLetterSink: &pkgduckv1.Destination{
Ref: &pkgduckv1.KReference{
Kind: "mykind",
Namespace: "mynamespace",
Name: "myname",
APIVersion: "myapiversion",
},
URI: apis.HTTP("dls.example.com"),
},
Retry: &three,
BackoffPolicy: &linear,
BackoffDelay: &delay,
},
}
want := Subscription{
Subscriber: apis.HTTP("subscriber.example.com").URL(),
Reply: apis.HTTP("reply.example.com").URL(),
DeadLetter: apis.HTTP("dls.example.com").URL(),
RetryConfig: &kncloudevents.RetryConfig{
RetryMax: 3,
},
}
got, err := SubscriberSpecToFanoutConfig(*spec)
if err != nil {
t.Error("Failed to convert using SubscriberSpecToFanoutConfig:", err)
}
if diff := cmp.Diff(&want, got, cmpopts.IgnoreFields(kncloudevents.RetryConfig{}, "Backoff", "CheckRetry")); diff != "" {
t.Error("Unexpected diff", diff)
}
}

func TestGetSetSubscriptions(t *testing.T) {
h := &FanoutMessageHandler{subscriptions: make([]Subscription, 0)}
subs := h.GetSubscriptions(context.TODO())
if len(subs) != 0 {
t.Error("Wanted 0 subs, got: ", len(subs))
}
h.SetSubscriptions(context.TODO(), []Subscription{{Subscriber: apis.HTTP("subscriber.example.com").URL()}})
subs = h.GetSubscriptions(context.TODO())
if len(subs) != 1 {
t.Error("Wanted 1 subs, got: ", len(subs))

}
h.SetSubscriptions(context.TODO(), []Subscription{{Subscriber: apis.HTTP("subscriber.example.com").URL()}, {Subscriber: apis.HTTP("subscriber2.example.com").URL()}})
subs = h.GetSubscriptions(context.TODO())
if len(subs) != 2 {
t.Error("Wanted 2 subs, got: ", len(subs))

}
h.SetSubscriptions(context.TODO(), []Subscription{{Subscriber: apis.HTTP("subscriber.example.com").URL()}, {Subscriber: apis.HTTP("subscriber3.example.com").URL()}})
subs = h.GetSubscriptions(context.TODO())
if len(subs) != 2 {
t.Error("Wanted 2 subs, got: ", len(subs))

}

}

func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
testCases := map[string]struct {
receiverFunc channel.UnbufferedMessageReceiverFunc
Expand Down Expand Up @@ -251,7 +323,7 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
t.Fatal(err)
}

h, err := NewMessageHandler(
h, err := NewFanoutMessageHandler(
logger,
channel.NewMessageDispatcher(logger),
Config{
Expand Down
2 changes: 0 additions & 2 deletions pkg/channel/multichannelfanout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
"knative.dev/eventing/pkg/channel/fanout"
)

// Config for a multichannelfanout.Handler.
type Config struct {
// The configuration of each channel in this handler.
ChannelConfigs []ChannelConfig
}

Expand Down
Loading

0 comments on commit 8dca90e

Please sign in to comment.