From 213acced664bea213b747a2c32e023310f5f27ad Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 29 Aug 2020 18:04:34 -0400 Subject: [PATCH] :bug: Source.Channel: Cope with pre-existing events in the channel --- pkg/source/source.go | 6 +++--- pkg/source/source_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/pkg/source/source.go b/pkg/source/source.go index b2c6b2bbc3..d670b8ce05 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -213,12 +213,14 @@ func (cs *Channel) Start( cs.DestBufferSize = defaultBufferSize } + dst := make(chan event.GenericEvent, cs.DestBufferSize) + cs.dest = append(cs.dest, dst) + cs.once.Do(func() { // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source go cs.syncLoop() }) - dst := make(chan event.GenericEvent, cs.DestBufferSize) go func() { for evt := range dst { shouldHandle := true @@ -238,8 +240,6 @@ func (cs *Channel) Start( cs.destLock.Lock() defer cs.destLock.Unlock() - cs.dest = append(cs.dest, dst) - return nil } diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 6c6dd718d3..742eae196e 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -405,6 +405,43 @@ var _ = Describe("Source", func() { close(done) }) + It("should be able to cope with events in the channel before the source is started", func(done Done) { + ch := make(chan event.GenericEvent, 1) + processed := make(chan struct{}) + evt := event.GenericEvent{} + ch <- evt + + q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") + // Add a handler to get distribution blocked + instance := &source.Channel{Source: ch} + instance.DestBufferSize = 1 + Expect(inject.StopChannelInto(stop, instance)).To(BeTrue()) + + err := instance.Start(handler.Funcs{ + CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected CreateEvent") + }, + UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected UpdateEvent") + }, + DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Fail("Unexpected DeleteEvent") + }, + GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) { + defer GinkgoRecover() + + close(processed) + }, + }, q) + Expect(err).NotTo(HaveOccurred()) + + <-processed + + close(done) + }) It("should get error if no source specified", func(done Done) { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{ /*no source specified*/ }