-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Description
Found some race conditions in OperatorOnBackpressureBuffer. Here is the test: zsxwing@66737b4
Because it's hard to write a test to reproduce it, I have to add Thread.sleep(...) in OperatorOnBackpressureBuffer.
We are using the following pattern in OperatorOnBackpressureBuffer:
0 Update some state
1 if (wip.getAndIncrement() == 0) {
2 while(true) {
3 // do something
4 if (some_condition_about_state is false) {
5 wip.decrementAndGet();
6 }
7 }
8 }Assume thread T1 and thread T2 runs the above codes at the same time. At first, T2 is suspended in L0 and T1 runs normally. However, when T1 reaches L5 (before decreasing wip), it's suspended. Then T2 starts to run, it will update the state, and some_condition_about_state becomes true. However, T2 finds wip is not 0, so it won't enter the while loop. T2 will exit. Now T1 is back, it doesn't know some_condition_about_state becomes true, it will directlly desceases wip and exits.
If there is no thread enters the above codes after T1 and T2, OperatorOnBackpressureBuffer will swallows the values in the queue.