@@ -93,11 +93,10 @@ var _ = Describe("Controllerworkqueue", func() {
9393 q .AddWithOpts (AddOpts {}, "foo" )
9494 q .AddWithOpts (AddOpts {}, "foo" )
9595
96- Consistently (q .Len ). Should (Equal (1 ))
96+ Expect (q .Len ()). To (Equal (1 ))
9797
98- cwq := q .(* priorityqueue [string ])
99- cwq .lockedLock .Lock ()
100- Expect (cwq .locked .Len ()).To (Equal (0 ))
98+ q .lockedLock .Lock ()
99+ Expect (q .locked .Len ()).To (Equal (0 ))
101100
102101 Expect (metrics .depth ["test" ]).To (Equal (map [int ]int {0 : 1 }))
103102 Expect (metrics .adds ["test" ]).To (Equal (1 ))
@@ -156,22 +155,13 @@ var _ = Describe("Controllerworkqueue", func() {
156155 })
157156
158157 It ("returns an item only after after has passed" , func () {
159- q , metrics := newQueue ()
158+ q , metrics , forwardQueueTimeBy := newQueueWithTimeForwarder ()
160159 defer q .ShutDown ()
161160
162- now := time .Now ().Round (time .Second )
163- nowLock := sync.Mutex {}
164- tick := make (chan time.Time )
165-
166- cwq := q .(* priorityqueue [string ])
167- cwq .now = func () time.Time {
168- nowLock .Lock ()
169- defer nowLock .Unlock ()
170- return now
171- }
172- cwq .tick = func (d time.Duration ) <- chan time.Time {
161+ originalTick := q .tick
162+ q .tick = func (d time.Duration ) <- chan time.Time {
173163 Expect (d ).To (Equal (time .Second ))
174- return tick
164+ return originalTick ( d )
175165 }
176166
177167 retrievedItem := make (chan struct {})
@@ -186,10 +176,7 @@ var _ = Describe("Controllerworkqueue", func() {
186176
187177 Consistently (retrievedItem ).ShouldNot (BeClosed ())
188178
189- nowLock .Lock ()
190- now = now .Add (time .Second )
191- nowLock .Unlock ()
192- tick <- now
179+ forwardQueueTimeBy (time .Second )
193180 Eventually (retrievedItem ).Should (BeClosed ())
194181
195182 Expect (metrics .depth ["test" ]).To (Equal (map [int ]int {0 : 0 }))
@@ -223,20 +210,11 @@ var _ = Describe("Controllerworkqueue", func() {
223210 })
224211
225212 It ("returns multiple items with after in correct order" , func () {
226- q , metrics := newQueue ()
213+ q , metrics , forwardQueueTimeBy := newQueueWithTimeForwarder ()
227214 defer q .ShutDown ()
228215
229- now := time .Now ().Round (time .Second )
230- nowLock := sync.Mutex {}
231- tick := make (chan time.Time )
232-
233- cwq := q .(* priorityqueue [string ])
234- cwq .now = func () time.Time {
235- nowLock .Lock ()
236- defer nowLock .Unlock ()
237- return now
238- }
239- cwq .tick = func (d time.Duration ) <- chan time.Time {
216+ originalTick := q .tick
217+ q .tick = func (d time.Duration ) <- chan time.Time {
240218 // What a bunch of bs. Deferring in here causes
241219 // ginkgo to deadlock, presumably because it
242220 // never returns after the defer. Not deferring
@@ -254,7 +232,7 @@ var _ = Describe("Controllerworkqueue", func() {
254232 Expect (d ).To (Or (Equal (200 * time .Millisecond ), Equal (time .Second )))
255233 }()
256234 <- done
257- return tick
235+ return originalTick ( d )
258236 }
259237
260238 retrievedItem := make (chan struct {})
@@ -276,10 +254,7 @@ var _ = Describe("Controllerworkqueue", func() {
276254
277255 Consistently (retrievedItem ).ShouldNot (BeClosed ())
278256
279- nowLock .Lock ()
280- now = now .Add (time .Second )
281- nowLock .Unlock ()
282- tick <- now
257+ forwardQueueTimeBy (time .Second )
283258 Eventually (retrievedItem ).Should (BeClosed ())
284259 Eventually (retrievedSecondItem ).Should (BeClosed ())
285260
@@ -462,21 +437,12 @@ var _ = Describe("Controllerworkqueue", func() {
462437 })
463438
464439 It ("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items" , func () {
465- q , metrics := newQueue ()
440+ q , metrics , forwardQueueTimeBy := newQueueWithTimeForwarder ()
466441 defer q .ShutDown ()
467442
468- now := time .Now ().Round (time .Second )
469- nowLock := sync.Mutex {}
470- tick := make (chan time.Time )
471-
472- cwq := q .(* priorityqueue [string ])
473- cwq .rateLimiter = workqueue .NewTypedItemExponentialFailureRateLimiter [string ](5 * time .Millisecond , 1000 * time .Second )
474- cwq .now = func () time.Time {
475- nowLock .Lock ()
476- defer nowLock .Unlock ()
477- return now
478- }
479- cwq .tick = func (d time.Duration ) <- chan time.Time {
443+ q .rateLimiter = workqueue .NewTypedItemExponentialFailureRateLimiter [string ](5 * time .Millisecond , 1000 * time .Second )
444+ originalTick := q .tick
445+ q .tick = func (d time.Duration ) <- chan time.Time {
480446 done := make (chan struct {})
481447 go func () {
482448 defer GinkgoRecover ()
@@ -485,7 +451,7 @@ var _ = Describe("Controllerworkqueue", func() {
485451 Expect (d ).To (Or (Equal (5 * time .Millisecond ), Equal (635 * time .Millisecond )))
486452 }()
487453 <- done
488- return tick
454+ return originalTick ( d )
489455 }
490456
491457 retrievedItem := make (chan struct {})
@@ -504,22 +470,16 @@ var _ = Describe("Controllerworkqueue", func() {
504470
505471 // after 7 calls, the next When("bar") call will return 640ms.
506472 for range 7 {
507- cwq .rateLimiter .When ("bar" )
473+ q .rateLimiter .When ("bar" )
508474 }
509475 q .AddWithOpts (AddOpts {RateLimited : true }, "foo" , "bar" )
510476
511477 Consistently (retrievedItem ).ShouldNot (BeClosed ())
512- nowLock .Lock ()
513- now = now .Add (5 * time .Millisecond )
514- nowLock .Unlock ()
515- tick <- now
478+ forwardQueueTimeBy (5 * time .Millisecond )
516479 Eventually (retrievedItem ).Should (BeClosed ())
517480
518481 Consistently (retrievedSecondItem ).ShouldNot (BeClosed ())
519- nowLock .Lock ()
520- now = now .Add (635 * time .Millisecond )
521- nowLock .Unlock ()
522- tick <- now
482+ forwardQueueTimeBy (635 * time .Millisecond )
523483 Eventually (retrievedSecondItem ).Should (BeClosed ())
524484
525485 Expect (metrics .depth ["test" ]).To (Equal (map [int ]int {0 : 0 }))
@@ -692,7 +652,31 @@ func TestFuzzPriorityQueue(t *testing.T) {
692652 wg .Wait ()
693653}
694654
695- func newQueue () (PriorityQueue [string ], * fakeMetricsProvider ) {
655+ func newQueueWithTimeForwarder () (_ * priorityqueue [string ], _ * fakeMetricsProvider , forwardQueueTime func (time.Duration )) {
656+ q , m := newQueue ()
657+
658+ now := time .Now ().Round (time .Second )
659+ nowLock := sync.Mutex {}
660+ tick := make (chan time.Time )
661+
662+ q .now = func () time.Time {
663+ nowLock .Lock ()
664+ defer nowLock .Unlock ()
665+ return now
666+ }
667+ q .tick = func (d time.Duration ) <- chan time.Time {
668+ return tick
669+ }
670+
671+ return q , m , func (d time.Duration ) {
672+ nowLock .Lock ()
673+ now = now .Add (d )
674+ nowLock .Unlock ()
675+ tick <- now
676+ }
677+ }
678+
679+ func newQueue () (* priorityqueue [string ], * fakeMetricsProvider ) {
696680 metrics := newFakeMetricsProvider ()
697681 q := New ("test" , func (o * Opts [string ]) {
698682 o .MetricProvider = metrics
@@ -710,7 +694,7 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
710694 }
711695 return upstreamTick (d )
712696 }
713- return q , metrics
697+ return q .( * priorityqueue [ string ]) , metrics
714698}
715699
716700type btreeInteractionValidator struct {
0 commit comments