Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source.Channel: Cope with pre-existing events in the channel #1146

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,14 @@ func (cs *Channel) Start(
cs.DestBufferSize = defaultBufferSize
}

dst := make(chan event.GenericEvent, cs.DestBufferSize)
cs.dest = append(cs.dest, dst)

cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop()
})

dst := make(chan event.GenericEvent, cs.DestBufferSize)
go func() {
for evt := range dst {
shouldHandle := true
Expand All @@ -238,8 +240,6 @@ func (cs *Channel) Start(
cs.destLock.Lock()
defer cs.destLock.Unlock()

cs.dest = append(cs.dest, dst)

return nil
}

Expand Down
37 changes: 37 additions & 0 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,43 @@ var _ = Describe("Source", func() {

close(done)
})
It("should be able to cope with events in the channel before the source is started", func(done Done) {
ch := make(chan event.GenericEvent, 1)
processed := make(chan struct{})
evt := event.GenericEvent{}
ch <- evt

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
// Add a handler to get distribution blocked
instance := &source.Channel{Source: ch}
instance.DestBufferSize = 1
Expect(inject.StopChannelInto(stop, instance)).To(BeTrue())

err := instance.Start(handler.Funcs{
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected CreateEvent")
},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected UpdateEvent")
},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected DeleteEvent")
},
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()

close(processed)
},
}, q)
Expect(err).NotTo(HaveOccurred())

<-processed

close(done)
})
It("should get error if no source specified", func(done Done) {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Channel{ /*no source specified*/ }
Expand Down