Skip to content

Commit e02c547

Browse files
committed
模拟多个消费者且消费速度较慢得场景
1 parent 4a0f4e4 commit e02c547

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

JavaConcurrent/src/main/java/com/FlammulinaBlog/Java/Concurrent/BlockingQueue/ConsumerProducer.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package com.FlammulinaBlog.Java.Concurrent.BlockingQueue;
22

3+
import java.util.concurrent.ArrayBlockingQueue;
34
import java.util.concurrent.BlockingQueue;
4-
import java.util.concurrent.LinkedBlockingDeque;
5+
import java.util.concurrent.CountDownLatch;
56

67
/**
78
*
@@ -24,20 +25,29 @@ public class ConsumerProducer {
2425
static class Consumer implements Runnable {
2526
private BlockingQueue blockingQueue;
2627

27-
public Consumer(BlockingQueue queue) {
28-
this.blockingQueue = queue;
28+
private String name;
29+
30+
private CountDownLatch latch;
31+
32+
public Consumer(BlockingQueue blockingQueue, String name, CountDownLatch latch) {
33+
this.blockingQueue = blockingQueue;
34+
this.name = name;
35+
this.latch = latch;
2936
}
3037

38+
3139
@Override
3240
public void run() {
3341
while (true) {
3442
try {
43+
Thread.sleep(500L);
3544
//阻塞式消费
3645
Object val = blockingQueue.take();
37-
System.out.println("消费信息:" + val);
46+
System.out.println(name + "消费信息:" + val);
3847
} catch (InterruptedException e) {
3948
e.printStackTrace();
4049
}
50+
4151
}
4252
}
4353
}
@@ -46,8 +56,10 @@ public void run() {
4656
static class Producer implements Runnable {
4757
private final BlockingQueue blockingQueue;
4858

49-
public Producer(BlockingQueue queue) {
59+
private CountDownLatch latch;
60+
public Producer(BlockingQueue queue, CountDownLatch latch) {
5061
this.blockingQueue = queue;
62+
this.latch = latch;
5163
}
5264

5365
@Override
@@ -56,21 +68,33 @@ public void run() {
5668
try {
5769
//生产消息(阻塞)
5870
blockingQueue.put(i + ":message");
71+
5972
System.out.println("生产消息:" + i + ":message");
73+
latch.countDown();
6074
} catch (InterruptedException e) {
6175
e.printStackTrace();
6276
}
6377
}
78+
6479
}
6580
}
6681

6782

68-
public static void main(String[] args) {
69-
BlockingQueue queue = new LinkedBlockingDeque();
70-
Consumer consumer = new Consumer(queue);
71-
Producer producer = new Producer(queue);
83+
public static void main(String[] args) throws InterruptedException {
84+
85+
CountDownLatch latch = new CountDownLatch(100);
86+
87+
88+
BlockingQueue queue = new ArrayBlockingQueue(15);
89+
Producer producer = new Producer(queue, latch);
7290
new Thread(producer).start();
73-
new Thread(consumer).start();
7491

92+
93+
for (int i = 0; i < 20; i++) {
94+
Consumer consumer = new Consumer(queue, "consumer" + i, latch);
95+
new Thread(consumer).start();
96+
}
97+
latch.await();
98+
System.out.println("执行完成");
7599
}
76100
}

0 commit comments

Comments
 (0)