什么是生产者/消费者模型

​ 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

利用wait()/notify()

  缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.LinkedList;

public class Storage {

// 仓库容量
private final int MAX_SIZE = 10;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<>();

public void produce() {
synchronized (list) {
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
list.notifyAll();
}
}

public void consume() {
synchronized (list) {
while (list.size() == 0) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
list.notifyAll();
}
}
}

生产者调用produce()方法生产产品

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Producer implements Runnable {
private Storage storage;
public Producer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
//生产者一直生产
while(true) {
try {
Thread.sleep(1000);
storage.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

消费者调用consume()方法消耗产品

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Customer implments Runnable {
private Storage storage;
public Customer(Storage storage) {
this.storage = storage;
}
@Override
public void run() {
while(true) {
try {
Thread.sleep(3000);
storage.custome();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Main {
public static void main(String[] args) {
Storage storage = new Storage();
Thread p1 = new Thread(new Producer(storage));
Thread p2 = new Thread(new Producer(storage));
Thread p3 = new Thread(new Producer(storage));

Thread c1 = new Thread(new Consumer(storage));
Thread c2 = new Thread(new Consumer(storage));
Thread c3 = new Thread(new Consumer(storage));

p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}
}

  一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

await()/signal()方法

​ synchronized与wait()和nitofy()/notifyAll()方法相结合可以实现等待/通知模型,ReentrantLock同样可以,但是需要借助Condition,且Condition有更好的灵活性,具体体现在:

  1. 一个Lock里面可以创建多个Condition实例,实现多路通知

  2. notify()方法进行通知时,被通知的线程时Java虚拟机随机选择的,但是ReentrantLock结合Condition可以实现有选择性地通知,这是非常重要的

只改变Storage类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Storage {
private final int MAX_SIZE = 10;
private LinkedList<Object> list = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

public void produce() {
//获得锁
lock.lock();
try{
while(list.size() >= MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
notEmpty.await();
}
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
notFull.signalAll();
}
lock.unlock();
}
public void consume()
{
//获得锁
lock.lock();
try{
while(list.size() == MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库空了");
notFull.await();
}
list.remove();
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
notEmpty.signalAll();
}
lock.unlock()
}
}

​ 条件变量与用wait和notify/notifyAll实现这个模式有一个不同之处在于,Condition的signalAll方法只能唤醒在某一条件上进行阻塞的线程,比如notEmpty.signalAll()只能唤醒通过notEmpty.await()方法而阻塞的线程,在这个例子中的话也就是说生产者调用notEmpty.signalAll()只能唤醒消费者线程,而notifyAll()会唤醒所有在同一个对象锁(Object monitor)上进行等待的线程,比如说生产者和消费者都会因争夺缓冲区对象的锁而导致阻塞,同时在缓冲区满时生产者会调用wait方法进行阻塞,在缓冲区空时消费者会调用wait方法进行阻塞,一旦有线程(不管是生产者还是消费者)调用了notifyAll()方法,则所有的阻塞在同一个Object monitor上的线程都会被唤醒。也就是说Condition的signalAll()是基于某个条件变量进行唤醒,notifyAll是基于某个对象锁(Object monitor)进行唤醒。

BlockingQueue阻塞队列方法

  BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.LinkedBlockingQueue;

public class Storage {
// 仓库存储的载体
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);

public void produce() {
try{
list.put(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}

public void consume() {
try{
list.take();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费了一个产品,现库存" + list.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}

信号量

​ Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行。)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Storage {

// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 仓库的最大容量
final Semaphore notFull = new Semaphore(10);
// 将线程挂起,等待其他来触发
final Semaphore notEmpty = new Semaphore(0);
// 互斥锁
final Semaphore mutex = new Semaphore(1);

public void produce()
{
try {
notFull.acquire();
mutex.acquire();
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
}
catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notEmpty.release();
}
}

public void consume()
{
try {
notEmpty.acquire();
mutex.acquire();
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
}
}
联系我

评论