什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
如果队列是空的,消费者会一直等待,当生产者添加元素时候,消费者是如何知道当前队列有元素的呢?如果让你来设计阻塞队列你会如何设计,让生产者和消费者能够高效率的进行通讯呢?JDK是使用通知模式实现的。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

英文单词 synchronized

synchronized
英 ['sɪŋkrənaɪzd] 美 ['sɪŋkrənaɪzd]
adj. 同步的;同步化的
v. 使协调(synchronize的过去分词);同时发生;校准

概念

维基百科)
管程,对应的英文是 Monitor,很多 Java 领域的同学都喜欢将其翻译成“监视器”,这是直译。操作系统领域一般都翻译成“管程”,这个是意译,而我自己也更倾向于使用“管程”。所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。翻译为 Java 领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的。那管程是怎么管的呢?
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。

互斥的解决

管程解决互斥问题的思路很简单,就是将共享变量及其对共享变量的操作统一封装起来。在下图中,管程 X 将共享变量 queue 这个队列(阻塞队列)和相关的操作入队 enq()、出队 deq() 都封装起来了;线程 A 和线程 B 如果想访问共享变量 queue,只能通过调用管程提供的 enq()、deq() 方法来实现;enq()、deq() 保证互斥性,只允许一个线程进入管程。不知你有没有发现,管程模型和面向对象高度契合的。估计这也是 Java 选择管程的原因吧。而我在前面章节介绍的互斥锁用法,其背后的模型其实就是它。

同步的解决

这个就比较复杂了,不过你可以借鉴一下我们曾经提到过的就医流程,它可以帮助你快速地理解这个问题。为进一步便于你理解,在下面,我展示了一幅 MESA 管程模型示意图,它详细描述了 MESA 模型的主要组成部分。在管程模型里,共享变量和对共享变量的操作是被封装起来的,图中最外层的框就代表封装的意思。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。这个过程类似就医流程的分诊,只允许一个患者就诊,其他患者都在门口等待。管程里还引入了条件变量的概念,而且每个条件变量都对应有一个等待队列,如下图,条件变量 A 和条件变量 B 分别都有自己的等待队列。
管程模型
那条件变量和等待队列的作用是什么呢?其实就是解决线程同步问题。

图中有个队列没画出来,就是入队出队的阻塞队列

你也可以结合上面提到的入队出队例子加深一下理解。假设有个线程 T1 执行出队(阻塞队列)操作,不过需要注意的是执行出队操作,有个前提条件,就是队列不能是空的,而队列不空这个前提条件就是管程里的条件变量。 如果线程 T1 进入管程后恰好发现队列(阻塞队列)是空的,那怎么办呢?等待啊,去哪里等呢?就去条件变量对应的等待队列里面等。此时线程 T1 就去“队列不空”这个条件变量的等待队列中等待。这个过程类似于大夫发现你要去验个血,于是给你开了个验血的单子,你呢就去验血的队伍里排队。线程 T1 进入条件变量的等待队列后,是允许其他线程进入管程的。这和你去验血的时候,医生可以给其他患者诊治,道理都是一样的。再假设之后另外一个线程 T2 执行入队操作,入队操作执行成功之后,“队列不空”这个条件对于线程 T1 来说已经满足了,此时线程 T2 要通知 T1,告诉它需要的条件已经满足了。当线程 T1 得到通知后,会从等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。这个过程类似你验血完,回来找大夫,需要重新分诊。条件变量及其等待队列我们讲清楚了。

其实就是阻塞队列是否可以入队,出队的条件

下面再说说 wait()、notify()、notifyAll() 这三个操作。前面提到线程 T1 发现“队列不空”这个条件不满足,需要进到对应的等待队列里等待。这个过程就是通过调用 wait() 来实现的。如果我们用对象 A 代表“队列不空”这个条件,那么线程 T1 需要调用 A.wait()。同理当“队列不空”这个条件满足时,线程 T2 需要调用 A.notify() 来通知 A 等待队列中的一个线程,此时这个队列里面只有线程 T1。至于 notifyAll() 这个方法,它可以通知等待队列中的所有线程。
这里我还是来一段代码再次说明一下吧。下面的代码实现的是一个阻塞队列,阻塞队列有两个操作分别是入队和出队,这两个方法都是先获取互斥锁,类比管程模型中的入口。对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await()。对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await()。如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列。如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列。


public class BlockedQueue<T>{
  final Lock lock =  new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull = lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =  lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满 
        notFull.await();
      }  
      // 省略入队操作...
      //入队后,通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }
      // 省略出队操作...
      //出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

在这段示例代码中,我们用了 Java 并发包里面的 Lock 和 Condition,如果你看着吃力,也没关系,后面我们还会详细介绍,这个例子只是先让你明白条件变量及其等待队列是怎么回事。需要注意的是:await() 和前面我们提到的 wait() 语义是一样的;signal() 和前面我们提到的 notify() 语义是一样的。

  • 管程是一模型啊,或者可以理解为synchronizd关键字的内部实现
  • 管理共享内存和操作共享内存的过程,令其支持并发。并发编程领域两大难题,互斥和同步。互斥如何保证:封装共享数据及其对应操作,每个操作任意时刻只允许一个线程执行。
  1. 管程是一种概念,任何语言都可以通用。
  2. 在java中,每个加锁的对象都绑定着一个管程(监视器)
  3. 线程访问加锁对象,就是去拥有一个监视器的过程。如一个病人去门诊室看医生,医生是共享资源,门锁锁定医生,病人去看医生,就是访问医生这个共享资源,门诊室其实是监视器(管程)。
  4. 所有线程访问共享资源,都需要先拥有监视器。就像所有病人看病都需要先拥有进入门诊室的资格。
  5. 监视器至少有两个等待队列。一个是进入监视器的等待队列一个是条件变量对应的等待队列。后者可以有多个。就像一个病人进入门诊室诊断后,需要去验血,那么它需要去抽血室排队等待。另外一个病人心脏不舒服,需要去拍胸片,去拍摄室等待。
  6. 监视器要求的条件满足后,位于条件变量下等待的线程需要重新在门诊室门外排队,等待进入监视器。就像抽血的那位,抽完后,拿到了化验单,然后,重新回到门诊室等待,然后进入看病,然后退出,医生通知下一位进入。
  7. 总结起来就是,管程就是一个对象监视器。任何线程想要访问该资源,就要排队进入监控范围。进入之后,接受检查,不符合条件,则要继续等待,直到被通知,然后继续进入监视器。

    代码实现

  8. 代码演示的阻塞队列是和生产者-消费者模型所用的队列一个道理
  9. 就是线程wait后,下次再到runnable状态的时候,是接着上次运行的那行继续执行的对吧 不是重头开始运行,接着上次运行的那一行继续执行
  • 这里的队列是阻塞队列的意思,顺便简单温习下阻塞队列(拿经典的例子生产者和消费来说): 比如生产者每生产一个产品就放到一个仓库中,消费者去这个仓库消费这个产品,对应到java中就是,产品属于共享数据,仓库属于队列。当仓库(队列)满了,就会让生产者先停止生产,就是让线程先阻塞,当消费者消费一个产品后,仓库(队列)不满了,就会唤醒一个线程(生产者)去生产产品;同理,当消费者太多 出现供不应求,就是仓库的产品被消费完了,就会让线程(消费者)阻塞,直到仓库(队列)中有产品为止,再重新唤醒消费者线程去消费。 对文中管程例子的理解:条件变量a,条件变量b是不是就可以理解为仓库没有产品,不能消费,仓库满了不能生产的情况条件变量是一个实实在在的变量
  • hasen 的策略是优先执行持有资源的操作,hoare优先执行被阻塞的操作,mesa有机会两个都同时执行,但也有可能被阻塞的不被执行
  1. 阻塞队列出队和入队操作的元素是共享变量?即队列里是什么,JVM共享一个阻塞队列吗?
  2. 管程中的阻塞队列的出队和入队,与线程给锁对象加锁和释放锁的关系

作者回复:

  1. 阻塞队列内部的数组或者链表是共享变量。阻塞队列就是个普通对象,作用域是你自己控制的。
  2. 管程内部的阻塞队列,不同的管程是不同的。
  3. 加锁释放锁,可能会操作入口等待队列。
  4. 讲到管程MESA模型,假设有个线程 T1 执行出队操作,T2执行入队操作,这个队列具体指的是哪个队列,是入口等待队列,还是条件变量对应的队列啊?
    作者回复: 和这俩队列没关系,只是个应用管程的例子而已
  5. 出队入队,具体是那个队列,没看明白,文中说有两个队列 作者回复: 指的是BlockedQueue,不是管程里的等待队列

精选留言

  • MESA模型和其他两种模型相比可以实现更好的公平性,因为唤醒只是把你放到队列里而不保证你一定可以执行,最后能不能执行还是要看你自己可不可以抢得到执行权也就是入口,其他两种模型是显式地唤醒,有点内定的意思了。
    作者回复: 内定都出来了,真是理论联系生活
  • 简单来说,一个锁实际上对应两个队列,一个是就绪队列,对应本节的入口等待队列,一个是阻塞队列,实际对应本节的条件变量等待队列,wait操作是把当前线程放入条件变量的等待队列中,而notifyall是将条件变量等待队列中的所有线程唤醒到就绪队列(入口等待队列)中,实际上哪个线程执行由jvm操作
  • 在MESA模型中,线程T1被唤醒,从条件A的等待队列中(其实是一个set,list的话可能会重复)移除,并加入入口等待队列,重新与其他的线程竞争锁的控制权。那么有这样一种可能,线程T1的优先级比较低,并且经常地有高优先级的线程加入入口等待队列。每次当它获得锁的时候,条件已经不满足了(被高优先级的线程抢先破坏了条件)。即使T1可以得到调度,但是也没办法继续执行下去。最后T1被饿死了(有点冷。。。)另外我刚才的问题想通了。不需要实现像lock一样的条件对象,并调用condition.await(). Synchronized用判断条件+wait()就可以了。
  • wait() 不加超时参数,相当于得一直等着别人叫你去门口排队,加了超时参数,相当于等一段时间,再没人叫的话,我就受不了自己去门口排队了,这样就诊的机会会大一点,是这样理解吧? 挺形象,就诊机会不一定大,但是能避免没人叫的时候傻等
  • 信号量机制是可以解决同步/互斥的问题的,但是信号量的操作分散在各个进程或线程中,不方便进行管理,因每次需调用PV操作,还可能导致死锁或破坏互斥请求的问题。管程是定义了一个数据结构和能为并发所执行的一组操作,这组操作能够进行同步和改变管程中的数据。这相当于对临界资源的同步操作都集中进行管理,凡是要访问临界资源的进程或线程,都必须先通过管程,由管程的这套机制来实现多进程或线程对同一个临界资源的互斥访问和使用。管程的同步主要通过condition类型的变量(条件变量),条件变量可执行操作wait()和signal()。管程一般是由语言编译器进行封装,体现出OOP中的封装思想,也如老师所讲的,管程模型和面向对象高度契合的。作者回复: 是的,管程只是一种解决并发问题的模型而已。
  • 管程的英文就是monitor,至于synchronized底层的实现还挺复杂的,尤其是1.6优化之后。你可以参考《java并发编程的艺术》的第二章,详细描述了具体实现。
  • 想起来前段时间看的AQS,很像是MESA的具体实现哈,不知道是否理解正确,以前看到监视器模型就没有深究它的理论模型,看来还是要追根溯源,这样才能真正理解,比如看了很多并发包的源码,但是现在加上老师的理论,理解起来会豁然开朗
  • 我觉得超时参数很有必要,老师说wait()的正确姿势是:

    while(条件不满足) {
    wait();
    }
    class Minitor{
    同步队列(一个)
    条件队列(多个)
    入队
    出队
    }

    有可能条件永远不满足,一直等下去

  • 关于思考题超时,这三个模型都是判断条件然后再执行,不同的是前两个都是马上执行,而MESA模型唤醒后是进入等待队列,如果队列为空,那跟前两个一样,如果不为空,则要考虑是否会等待过长时间,这样看更多的是考虑是给业务代码多一种选择吧

标签: none

评论已关闭