logo头像
书院的十三先生

Java并发编程入门(十二)生产者和消费者模式-代码模板

一、应用场景

生产者和消费者模式应用于异步处理场景,异步处理的好处是生产者和消费者解耦,不互相依赖,生产者不需要等待消费者处理完,就可以持续生产消费内容,效率大大提高。

二、代码类结构

生产者和消费者代码类结构如下:

1.BlockedQueue是一个阻塞的有界队列,用于存、取消费内容。
2.Producer是生产者,在这里是一个抽象类,子类需要实现generateTask方法。
3.Consumer是消费者,在这里是一个抽象类,子类需要实现exec方法。
4.这里的Producer和Consumer只是一个抽象后的代码模板,逻辑比较简单,落地时可根据实际需要编写合适的模板。

三、Show me code

I、BlockedQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @ClassName BlockedQueue
* @Description 阻塞任务队列,添加任务时如果已经达到容量上限,则会阻塞等待
* @Author 铿然一叶
* @Date 2019/10/5 11:32
* @Version 1.0
* javashizhan.com
**/
public class BlockedQueue<T>{

//锁
private final Lock lock = new ReentrantLock();

// 条件变量:队列不满
private final Condition notFull = lock.newCondition();

// 条件变量:队列不空
private final Condition notEmpty = lock.newCondition();

//任务集合
private Vector<T> taskQueue = new Vector<T>();

//队列容量
private final int capacity;

/**
* 构造器
* @param capacity 队列容量
*/
public BlockedQueue(int capacity) {
this.capacity = capacity;
}

/**
* 入队操作
* @param t
*/
public void enq(T t) {
lock.lock();
try {
System.out.println("size: " + taskQueue.size() + " capacity: " + capacity);
while (taskQueue.size() == this.capacity) {
// 队列满了之后等待,等待队列不满
notFull.await();
}

System.out.println(Thread.currentThread().getName() + " add task: " + t.toString());
taskQueue.add(t);

// 入队后, 通知队列不空了,可以出队
notEmpty.signal();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

/**
* 出队操作
* @return
*/
public T deq(){
lock.lock();
try {
try {
while (taskQueue.size() == 0) {
// 队列为空时等待,等待队列不空
notEmpty.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}

T t = taskQueue.remove(0);

// 出队后,通知队列不满,可以继续入队
notFull.signal();

return t;
}finally {
lock.unlock();
}
}
}

II、Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @ClassName Producer
* @Description 生产者,这个类比较简单,使用继承也省不了多少代码,可继承,也可以自行实现。
* @Author 铿然一叶
* @Date 2019/10/5 11:19
* @Version 1.0
* javashizhan.com
**/
public abstract class Producer<T> implements Runnable {

private BlockedQueue<T> taskQueue;

public Producer(BlockedQueue<T> taskQueue) {
this.taskQueue = taskQueue;
}

public void run() {
while(true) {
T[] tasks = generateTask();
if (null != tasks && tasks.length > 0) {
for(T task: tasks) {
if (null != task) {
this.taskQueue.enq(task);
}
}
}
}
}

/**
* 生成任务,使用了“模板方法”设计模式,子类只要实现此方法则可。
* @return
*/
public abstract T[] generateTask();
}

III、Consumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @ClassName Consumer
* @Description 消费者,这个类比较简单,使用继承也省不了多少代码,可继承,也可以自行实现。
* @Author 铿然一叶
* @Date 2019/10/5 11:10
* @Version 1.0
* javashizhan.com
**/
public abstract class Consumer<T> implements Runnable {

private BlockedQueue<T> taskQueue;

public Consumer(BlockedQueue<T> taskQueue) {
this.taskQueue = taskQueue;
}

public void run() {
while(true) {
T task = taskQueue.deq();
exec(task);
}
}

/**
* 执行任务,使用了“模板方法”设计模式,子类只要实现此方法则可
* @param task
*/
public abstract void exec(T task);
}

IV、使用代码例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @ClassName BlockedQueue
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/5 9:13
* @Version 1.0
* javashizhan.com
**/

public class LockTest {
public static void main(String[] args) {
BlockedQueue<String> taskQueue = new BlockedQueue<String>(10);

for (int i = 0; i < 3; i++) {
String producerName = "Producder-" + i;
Thread producer = new Thread(new Producer<String>(taskQueue) {
@Override
public String[] generateTask() {
String[] tasks = new String[20];
for (int i = 0; i < tasks.length; i++) {
long timestamp = System.currentTimeMillis();
tasks[i] = "Task_" + timestamp + "_" + i;
}
return tasks;
}
}, producerName);
producer.start();
}

for (int i = 0; i < 5; i++) {
String consumerName = "Consumer-" + i;
Thread consumer = new Thread(new Consumer<String>(taskQueue) {
@Override
public void exec(String task) {
System.out.println(Thread.currentThread().getName() + " do task [" + task + "]");
//休眠一会,模拟任务执行耗时
sleep(2000);
}

private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, consumerName);
consumer.start();
}
}
}

输出日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
size: 0 capacity: 10
Producder-1 add task: Task_1570250409102_0
size: 1 capacity: 10
Producder-1 add task: Task_1570250409103_1
size: 2 capacity: 10
Producder-1 add task: Task_1570250409103_2
size: 3 capacity: 10
Producder-1 add task: Task_1570250409103_3
size: 4 capacity: 10
Producder-1 add task: Task_1570250409103_4
size: 5 capacity: 10
Producder-1 add task: Task_1570250409103_5
size: 6 capacity: 10
Producder-1 add task: Task_1570250409103_6
size: 7 capacity: 10
Producder-1 add task: Task_1570250409103_7
size: 8 capacity: 10
Producder-1 add task: Task_1570250409103_8
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_9
size: 10 capacity: 10
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-0 do task [Task_1570250409102_0]
Consumer-4 do task [Task_1570250409103_1]
Consumer-3 do task [Task_1570250409103_2]
Producder-1 add task: Task_1570250409103_10
Consumer-1 do task [Task_1570250409103_3]
Producder-0 add task: Task_1570250409102_0
size: 8 capacity: 10
Producder-0 add task: Task_1570250409103_1
size: 9 capacity: 10
Producder-0 add task: Task_1570250409103_2
size: 10 capacity: 10
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_4]
Producder-0 add task: Task_1570250409103_3
size: 10 capacity: 10
Consumer-3 do task [Task_1570250409103_6]
Producder-2 add task: Task_1570250409103_0
Consumer-1 do task [Task_1570250409103_5]
size: 9 capacity: 10
Producder-2 add task: Task_1570250409103_1
size: 10 capacity: 10
Consumer-4 do task [Task_1570250409103_7]
Consumer-0 do task [Task_1570250409103_8]
Producder-1 add task: Task_1570250409103_11
size: 9 capacity: 10
Producder-1 add task: Task_1570250409103_12
size: 10 capacity: 10
Consumer-2 do task [Task_1570250409103_9]
Producder-1 add task: Task_1570250409103_13
size: 10 capacity: 10

四、其他说明

1.这里用到了Lock来加锁,Lock相比synchronized关键字加锁更灵活一些,如果有特殊需要,方便改造。
2.synchronized实现生产者和消费者模式的例子可参考“Java并发编程入门(七)轻松理解wait和notify以及使用场景”,那个代码还不够通用,你可以修改得通用一些。
3.就当前这个例子而言,使用Lock加锁和“Java并发编程入门(七)轻松理解wait和notify以及使用场景”中使用synchronized加锁没有多大区别,这里仅仅是为了体会下Lock的使用方法。
4.使用有界阻塞队列时需要注意生产者生产任务过程是否可控,如果是第三方不可控调用,当生产任务速度远远大于消费者处理任务速度时,可能由于阻塞导致长时间挂起,要么挂起时间过长,导致等待线程太多,要么超时失败。这时就不适合使用阻塞方式,应该在队列满时抛出异常以通知调用方不要再等待。

end.


站点: http://javashizhan.com/


微信公众号: