生产者消费者模型是线程模型中一个经典问题:生产者和消费者在同一时间内共享同一个容器,生产者向容器添加产品,消费者从容器中取走产品,当容器满时,生产者阻塞,当容器为空时,消费者阻塞

生产者消费者模型示意图

一、Code

1.1 synchronized实现

1.1.1 容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Container {
private Queue<Integer> container = new LinkedList<>();
private int containerSize = 5; //容器的大小

/**
* @Description: 生产者生产内容
* @Param: [val] 添加的内容
* @return: void
* @Author: cloudr
* @Date: 2020/11/22
*/
public synchronized void add(int val) throws InterruptedException {
if (container.size() > containerSize) {
wait(); //阻塞生产者,不让其继续生产
}
container.add(val);
notify(); //通知消费者继续消费
}

/**
* @Description: 消费者消费内容
* @Param: []
* @return: int 返回生产者所生产的
* @Author: cloudr
* @Date: 2020/11/22
*/
public synchronized int get() throws InterruptedException {
if (container.size() == 0) {
wait(); //阻塞生消费者,不让其继续消费
}
int returnRes = container.poll();
notify(); //通知生产者继续生产
return returnRes;
}
}

1.1.2 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Producer extends Thread{
private Container container;

public Producer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
container.add(i); //生产内容添加进容器中
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

1.1.3 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Consumer extends Thread {
private Container container;

public Consumer(Container container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
int val = 0;
try {
val = container.get(); //从容器中获取产品消费
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(val);
}
}
}

1.1.4 Demo

1
2
3
4
5
6
7
8
9
public class ProducerAndConsumerDemo {
public static void main(String[] args) {
Container container = new Container();
Producer producer = new Producer(container);
Consumer consumer = new Consumer(container);
producer.start();
consumer.start();
}
}

1.2 BlockingQueue实现

使用BlockingQueue时,我们再也不必关心什么时候应该阻塞线程,什么时候应该唤醒线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.cc.step1.ProducerAndConsumerModel;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* @author shangjing
* @date 2018/11/22 4:05 PM
* @describe32
*/
public class PCByBlockingQueue {
private static int count = 0;

private final BlockingQueue blockingQueue = new LinkedBlockingQueue(10); //容器

class Producer implements Runnable {

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(new Random().nextInt(1000)); //模拟生产者和消费者效率不一致
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
int val = new Random().nextInt(1000);
blockingQueue.put(val); //生产内容进入容器
count++;
System.out.println(Thread.currentThread().getName() + "-生产者生产" + val + ",容器中数量为:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Consumer implements Runnable {

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(new Random().nextInt(500)); //模拟生产者和消费者效率不一致
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
int val = (int) blockingQueue.take(); //消费者从容器消费产品
count--;
System.out.println(Thread.currentThread().getName() + "-消费者消费" + val + ",容器中数量为:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
PCByBlockingQueue blockingQueueTest = new PCByBlockingQueue();
new Thread(blockingQueueTest.new Producer()).start();
new Thread(blockingQueueTest.new Consumer()).start();
// 模拟多个生产者和消费者
// new Thread(blockingQueueTest.new Producer()).start();
// new Thread(blockingQueueTest.new Consumer()).start();
// new Thread(blockingQueueTest.new Producer()).start();
// new Thread(blockingQueueTest.new Consumer()).start();
}
}

参考

https://www.jianshu.com/p/f53fb95b5820