Skip to content

Commit c70ee03

Browse files
tomekl007pedja4
authored andcommitted
BAEL-846 code for a syncrhonous queue article (eugenp#1699)
* code for the unsafe article * more descriptive example * proper eng * better test name * free memory call * java 8 style * BAEL-814 Added call to freeMemory() in off-heap test * BAEL-846 code for a syncrhonous queue article * BAEL-814 Switched from Random to ThreadLocalRandom
1 parent 2648a4e commit c70ee03

File tree

1 file changed

+83
-0
lines changed

1 file changed

+83
-0
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.baeldung.synchronousqueue;
2+
3+
import org.junit.FixMethodOrder;
4+
import org.junit.Test;
5+
import org.junit.runners.MethodSorters;
6+
7+
import java.util.concurrent.*;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
10+
import static junit.framework.TestCase.assertEquals;
11+
12+
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
13+
public class SynchronousQueueTest {
14+
15+
@Test
16+
public void givenTwoThreads_whenWantToExchangeUsingLockGuardedVariable_thenItSucceed() throws InterruptedException {
17+
//given
18+
ExecutorService executor = Executors.newFixedThreadPool(2);
19+
AtomicInteger sharedState = new AtomicInteger();
20+
CountDownLatch countDownLatch = new CountDownLatch(1);
21+
22+
Runnable producer = () -> {
23+
Integer producedElement = ThreadLocalRandom.current().nextInt();
24+
System.out.println("Saving an element: " + producedElement + " to the exchange point");
25+
sharedState.set(producedElement);
26+
countDownLatch.countDown();
27+
};
28+
29+
Runnable consumer = () -> {
30+
try {
31+
countDownLatch.await();
32+
Integer consumedElement = sharedState.get();
33+
System.out.println("consumed an element: " + consumedElement + " from the exchange point");
34+
} catch (InterruptedException ex) {
35+
ex.printStackTrace();
36+
}
37+
};
38+
39+
//when
40+
executor.execute(producer);
41+
executor.execute(consumer);
42+
43+
//then
44+
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
45+
executor.shutdown();
46+
assertEquals(countDownLatch.getCount(), 0);
47+
}
48+
49+
@Test
50+
public void givenTwoThreads_whenWantToExchangeUsingSynchronousQueue_thenItSucceed() throws InterruptedException {
51+
//given
52+
ExecutorService executor = Executors.newFixedThreadPool(2);
53+
final SynchronousQueue<Integer> queue = new SynchronousQueue<>();
54+
55+
Runnable producer = () -> {
56+
Integer producedElement = ThreadLocalRandom.current().nextInt();
57+
try {
58+
System.out.println("Saving an element: " + producedElement + " to the exchange point");
59+
queue.put(producedElement);
60+
} catch (InterruptedException ex) {
61+
ex.printStackTrace();
62+
}
63+
};
64+
65+
Runnable consumer = () -> {
66+
try {
67+
Integer consumedElement = queue.take();
68+
System.out.println("consumed an element: " + consumedElement + " from the exchange point");
69+
} catch (InterruptedException ex) {
70+
ex.printStackTrace();
71+
}
72+
};
73+
74+
//when
75+
executor.execute(producer);
76+
executor.execute(consumer);
77+
78+
//then
79+
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
80+
executor.shutdown();
81+
assertEquals(queue.size(), 0);
82+
}
83+
}

0 commit comments

Comments
 (0)