@@ -446,6 +446,50 @@ var _ = Describe("Source", func() {
446446
447447 close (done )
448448 })
449+ It ("should stop when the source channel is closed" , func () {
450+ q := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "test" )
451+ // if we didn't stop, we'd start spamming the queue with empty
452+ // messages as we "received" a zero-valued GenericEvent from
453+ // the source channel
454+
455+ By ("creating a channel with one element, then closing it" )
456+ ch := make (chan event.GenericEvent , 1 )
457+ evt := event.GenericEvent {}
458+ ch <- evt
459+ close (ch )
460+
461+ By ("feeding that channel to a channel source" )
462+ src := & source.Channel {Source : ch }
463+ Expect (inject .StopChannelInto (ctx .Done (), src )).To (BeTrue ())
464+
465+ processed := make (chan struct {})
466+ defer close (processed )
467+
468+ err := src .Start (ctx , handler.Funcs {
469+ CreateFunc : func (event.CreateEvent , workqueue.RateLimitingInterface ) {
470+ defer GinkgoRecover ()
471+ Fail ("Unexpected CreateEvent" )
472+ },
473+ UpdateFunc : func (event.UpdateEvent , workqueue.RateLimitingInterface ) {
474+ defer GinkgoRecover ()
475+ Fail ("Unexpected UpdateEvent" )
476+ },
477+ DeleteFunc : func (event.DeleteEvent , workqueue.RateLimitingInterface ) {
478+ defer GinkgoRecover ()
479+ Fail ("Unexpected DeleteEvent" )
480+ },
481+ GenericFunc : func (evt event.GenericEvent , q2 workqueue.RateLimitingInterface ) {
482+ defer GinkgoRecover ()
483+
484+ processed <- struct {}{}
485+ },
486+ }, q )
487+ Expect (err ).NotTo (HaveOccurred ())
488+
489+ By ("expecting to only get one event" )
490+ Eventually (processed ).Should (Receive ())
491+ Consistently (processed ).ShouldNot (Receive ())
492+ })
449493 It ("should get error if no source specified" , func (done Done ) {
450494 q := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "test" )
451495 instance := & source.Channel { /*no source specified*/ }
@@ -461,7 +505,6 @@ var _ = Describe("Source", func() {
461505 Expect (err ).To (Equal (fmt .Errorf ("must call InjectStop on Channel before calling Start" )))
462506 close (done )
463507 })
464-
465508 })
466509 Context ("for multi sources (handlers)" , func () {
467510 It ("should provide GenericEvents for all handlers" , func (done Done ) {
0 commit comments