前言

用 synchronized 关键字实现了多线程生产者消费者模型,用 wait() 和 notify() 函数实现了进程间通信

生产者进程生产完了就通知消费者过来消费,消费者消费完了就通知生产者进行生产

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import java.util.*;

public class test {
public static void main(String[] args) {
/**
* 生产者消费者模型(主线程)
* 实现生产票和买票两个过程
*/
ticketSalePoor poor = new ticketSalePoor(10);

List<Thread> threadList = new ArrayList<>();
//生产者消费者各3个线程执行
threadList.add(new Thread(new Consume(poor),"消费者1"));
threadList.add(new Thread(new Consume(poor),"消费者2"));
threadList.add(new Thread(new Consume(poor),"消费者3"));
threadList.add(new Thread(new Product(poor),"生产厂家1"));
threadList.add(new Thread(new Product(poor),"生产厂家2"));
threadList.add(new Thread(new Product(poor),"生产厂家3"));

for (Thread thread : threadList) {
thread.start();
}

//线程池运行
// ExecutorService executorService = Executors.newFixedThreadPool(10);
// executorService.execute(new Consume(poor));
// executorService.execute(new Consume(poor));
// executorService.execute(new Consume(poor));
// executorService.execute(new Product(poor));
// executorService.execute(new Product(poor));
// executorService.execute(new Product(poor));
}
}

/**
* 售票站
*/
class ticketSalePoor{
private Stack<ticket> tickets = new Stack(); //票池
Integer max; //票池最大容量

public ticketSalePoor(Integer max) {
this.max = max;
}

/**
* 生产一张票
*/
public void addTicket (Integer num){
synchronized (tickets){
if (tickets.size() >= max){
try {
tickets.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
tickets.add(new ticket(num,Thread.currentThread().getName()+"第"+num+"张票"));
System.out.println(Thread.currentThread().getName()+"生产了一张票,此时库存中有"+tickets.size()+"张票");
tickets.notify();
}
}

/**
* 消费一张票
*/
public void sale(){
synchronized(tickets){
if (tickets.size() <= 0){
try {
tickets.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ticket step = tickets.pop();
System.out.println(Thread.currentThread().getName()+"买到了"+step.name+",此时库存中有"+tickets.size()+"张票");
tickets.notify();
}
}
}

/**
* 消费者
*/
class Consume implements Runnable{
ticketSalePoor poor; //公共售票站

public Consume(ticketSalePoor poor) {
this.poor = poor;
}

@Override
public void run() {
while (true){
poor.sale();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 生产者
*/
class Product implements Runnable{
ticketSalePoor poor; //公共售票站

public Product(ticketSalePoor poor) {
this.poor = poor;
}

@Override
public void run() {
while (true){
poor.addTicket((int) Math.random());
try {
Thread.sleep(150);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 票据类
*/
class ticket {
Integer id;
String name;

public ticket(Integer id, String name) {
this.id = id;
this.name = name;
}
}