当前位置 : 首页 > 徐州软件开发技术论坛 > 无界阻塞队列LinkedBlockingQueue原理探究

无界阻塞队列LinkedBlockingQueue原理探究

作者:徐州软件公司  文章来源:徐州软件公司   阅读次数:374 次
2017-06-19

此文章是徐州软件公司整理的技术资料,由徐州软件公司总务部发布,邮箱:zongwu@xuzhousoft.com

江苏徐软信息科技有限公司江苏徐软信息科技有限公司

 

一、前言

前面介绍了使用CAS实现的非阻塞队列ConcurrentLinkedQueue,下面徐州软件开发公司徐州徐软信息科技有限公司就来介绍下使用独占锁实现的阻塞队列LinkedBlockingQueue的实现。

二、 LinkedBlockingQueue类图结构

如图LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列元素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。另外notEmpty和notFull用来实现入队和出队的同步。 另外由于出入队是两个非公平独占锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。

/** Lock held by take, poll, etc */

 

private final ReentrantLock takeLock = new ReentrantLock();

 

/** Wait queue for waiting takes */

 

private final Condition notEmpty = takeLock.newCondition();

 

/** Lock held by put, offer, etc */

 

private final ReentrantLock putLock = new ReentrantLock();

 

/** Wait queue for waiting puts */

 

private final Condition notFull = putLock.newCondition();

/* Current number of elements /

private final AtomicInteger count = new AtomicInteger(0);

 

public static final int   MAX_VALUE = 0x7fffffff;

 

public LinkedBlockingQueue() {

 

this(Integer.MAX_VALUE);

 

}

 

public LinkedBlockingQueue(int capacity) {

 

if (capacity <= 0) throw new IllegalArgumentException();

 

this.capacity = capacity;

 

//初始化首尾节点

 

last = head = new Node<E>(null);

 

}

如图默认队列容量为0x7fffffff;用户也可以自己指定容量。

三、必备基础

3.1 ReentrantLock

可以参考 https://www.atatech.org/articles/80539?flag_data_from=active

3.2 条件变量(Condition

条件变量这里使用的是takeLock.newCondition()获取也就是说调用ReentrantLock的方法获取的,那么可预见Condition使用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下

 

public final long awaitNanos(long nanosTimeout)

 

throws InterruptedException {

 

//如果中断标志被设置了,则抛异常

 

if (Thread.interrupted())

 

throw new InterruptedException();

 

//添加当前线程节点到条件队列,

 

Node node = addConditionWaiter();

 

//当前线程释放独占锁

 

int savedState = fullyRelease(node);

 

long lastTime = System.nanoTime();

 

int interruptMode = 0;

 

while (!isOnSyncQueue(node)) {

 

if (nanosTimeout <= 0L) {

 

transferAfterCancelledWait(node);

 

break;

 

}

 

//挂起当前线程直到超时

 

LockSupport.parkNanos(this, nanosTimeout);

 

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

 

break;

 

long now = System.nanoTime();

 

nanosTimeout -= now – lastTime;

 

lastTime = now;

 

}

 

//unpark后,当前线程重新获取锁,有可能获取不到被放到AQS的队列

 

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

 

interruptMode = REINTERRUPT;

 

if (node.nextWaiter != null)

 

unlinkCancelledWaiters();

 

if (interruptMode != 0)

 

reportInterruptAfterWait(interruptMode);

 

return nanosTimeout – (System.nanoTime() – lastTime);

 

}

 

final int fullyRelease(Node node) {

 

boolean failed = true;

 

try {

 

int savedState = getState();

 

//释放锁,如果失败则抛异常

 

if (release(savedState)) {

 

failed = false;

 

return savedState;

 

} else {

 

throw new IllegalMonitorStateException();

 

}

 

} finally {

 

if (failed)

 

node.waitStatus = Node.CANCELLED;

 

}

 

}

首先如果当前线程中断标志被设置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件队列。

然后尝试释放当前线程拥有的锁并保存当前计数,可知如果当前线程调用awaitNano前没有使用当前条件变量所在的Reetenlock变量调用lock或者lockInterruptibly获取到锁,会抛出IllegalMonitorStateException异常。

然后调用park挂起当前线程直到超时或者其他线程调用了当前线程的unpark方法,或者调用了当前线程的interupt方法(这时候会抛异常)。

如果超时或者其他线程调用了当前线程的unpark方法,则当前线程从挂起变为激活,获取cpu资源后会继续执行,会重新获取锁。

 

public final void signal() {

 

//如果当前线程没有持有锁,抛异常

 

if (!isHeldExclusively())

 

throw new IllegalMonitorStateException();

 

//从条件队列找第一个状态为CONDITION的,然后把状态变为0

 

Node first = firstWaiter;

 

if (first != null)

 

doSignal(first);

 

}

 

private void doSignal(Node first) {

 

do {

 

if ( (firstWaiter = first.nextWaiter) == null)

 

lastWaiter = null;

 

first.nextWaiter = null;

 

} while (!transferForSignal(first) &&

 

(first = firstWaiter) != null);

 

}

 

final boolean transferForSignal(Node node) {

 

 

 

//状态为CONDITION的,然后把状态变为0

 

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

 

return false;

 

 

 

//把条件队列的上面状态为0的节点放入AQS阻塞队列

 

Node p = enq(node);

 

int ws = p.waitStatus;

 

//调用unpark激活挂起的线程

 

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

 

LockSupport.unpark(node.thread);

 

return true;

 

}

首先看调用signal的线程是不是持有了独占锁,没有则抛出异常。

然后获取在条件队列里面待的时间最长的node,把它移动到线程持有的锁所在的AQS队列。

其中enq方法就是把当前节点放入了AQS队列,但是这时候该节点还是在条件队列里面那,那么什么时候从条件队列移除那?其实在await里面的unlinkCancelledWaiters方法。

总结: 无论是条件变量的await和singal都是需要先获取独占锁才能调用,因为条件变量使用的就是独占锁里面的state管理状态,否者会报异常。

、带超时时间的offer操作-生产者

在队尾添加元素,如果队列满了,那么等待timeout时候,如果时间超时则返回false,如果在超时前队列有空余空间,则插入后返回true。

 

public boolean offer(E e, long timeout, TimeUnit unit)

 

throws InterruptedException {

 

//空元素抛空指针异常

 

if (e == null) throw new NullPointerException();

 

long nanos = unit.toNanos(timeout);

 

int c = -1;

 

final ReentrantLock putLock = this.putLock;

 

final AtomicInteger count = this.count;

 

//获取可被中断锁,只有一个线程克获取

 

putLock.lockInterruptibly();

 

try {

 

//如果队列满则进入循环

 

while (count.get() == capacity) {

 

//nanos<=0直接返回

 

if (nanos <= 0)

 

return false;

 

//否者调用await进行等待,超时则返回<=0(1)

 

nanos = notFull.awaitNanos(nanos);

 

}

 

//await在超时时间内返回则添加元素(2)

 

enqueue(new Node<E>(e));

 

c = count.getAndIncrement();

 

//队列不满则激活其他等待入队线程(3)

 

if (c + 1 < capacity)

 

notFull.signal();

 

} finally {

 

//释放锁

 

putLock.unlock();

 

}

 

//c==0说明队列里面有一个元素,这时候唤醒出队线程(4)

 

if (c == 0)

 

signalNotEmpty();

 

return true;

 

}

 

private void enqueue(Node<E> node) {

 

last = last.next = node;

 

}

 

private void signalNotEmpty() {

 

final ReentrantLock takeLock = this.takeLock;

 

takeLock.lock();

 

try {

 

notEmpty.signal();

 

} finally {

 

takeLock.unlock();

 

}

 

}

如果获取锁前面有线程调用了putLock. interrupt(),并且后面没有调用interrupted()重置中断标志,调用lockInterruptibly时候会抛出InterruptedException异常。

队列满的时候调用notFull.awaitNanos阻塞当前线程,当前线程会释放获取的锁,然后等待超时或者其他线程调用了notFull.signal()才会返回并重新获取锁,或者其他线程调用了该线程的interrupt方法设置了中断标志,这时候也会返回但是会抛出InterruptedException异常。

如果超时则直接返回false,如果超时前调用了notFull.signal()则会退出循环,执行(2)添加元素到队列,然后执行(3),(3)的目的是为了激活其他入队等待线程。(4)的话c==0说明队列里面已经有一个元素了,这时候就可以激活等待出队线程了。

另外signalNotEmpty函数是先获取独占锁,然后在调用的signal这也证明了3.2节的结论。

五、 带超时时间的poll操作-消费者

获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null

 

public E poll(long timeout, TimeUnit unit) throws InterruptedException {

 

E x = null;

 

int c = -1;

 

long nanos = unit.toNanos(timeout);

 

final AtomicInteger count = this.count;

 

final ReentrantLock takeLock = this.takeLock;

 

//出队线程获取独占锁

 

takeLock.lockInterruptibly();

 

try {

 

//循环直到队列不为空

 

while (count.get() == 0) {

 

//超时直接返回null

 

if (nanos <= 0)

 

return null;

 

nanos = notEmpty.awaitNanos(nanos);

 

}

 

//出队,计数器减一

 

x = dequeue();

 

c = count.getAndDecrement();

 

//如果出队前队列不为空则发送信号,激活其他阻塞的出队线程

 

if (c > 1)

 

notEmpty.signal();

 

} finally {

 

//释放锁

 

takeLock.unlock();

 

}

 

//当前队列容量为最大值-1则激活入队线程。

 

if (c == capacity)

 

signalNotFull();

 

return x;

 

}

首先获取独占锁,然后进入循环当当前队列有元素才会退出循环,或者超时了,直接返回null。

超时前退出循环后,就从队列移除元素,然后计数器减去一,如果减去1前队列元素大于1则说明当前移除后队列还有元素,那么就发信号激活其他可能阻塞到当前条件信号的线程。

最后如果减去1前队列元素个数=最大值,那么移除一个后会腾出一个空间来,这时候可以激活可能存在的入队阻塞线程。

六、put操作-生产者

与带超时时间的poll类似不同在于put时候如果当前队列满了它会一直等待其他线程调用notFull.signal才会被唤醒。

七、 take操作-消费者

与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒。

八、 size操作

当前队列元素个数,如代码直接使用原子变量count获取。

 

public int size() {

 

return count.get();

 

}

九、peek操作

获取但是不移除当前队列的头元素,没有则返回null

 

public E peek() {

 

//队列空,则返回null

 

if (count.get() == 0)

 

return null;

 

final ReentrantLock takeLock = this.takeLock;

 

takeLock.lock();

 

try {

 

Node<E> first = head.next;

 

if (first == null)

 

return null;

 

else

 

return first.item;

 

} finally {

 

takeLock.unlock();

 

}

 

}

十、 remove操作

删除队列里面的一个元素,有则删除返回true,没有则返回false,在删除操作时候由于要遍历队列所以加了双重锁,也就是在删除过程中不允许入队也不允许出队操作

 

public boolean remove(Object o) {

 

if (o == null) return false;

 

//双重加锁

 

fullyLock();

 

try {

 

//遍历队列找则删除返回true

 

for (Node<E> trail = head, p = trail.next;

 

p != null;

 

trail = p, p = p.next) {

 

if (o.equals(p.item)) {

 

unlink(p, trail);

 

return true;

 

}

 

}

 

//找不到返回false

 

return false;

 

} finally {

 

//解锁

 

fullyUnlock();

 

}

 

}

 

void fullyLock() {

 

putLock.lock();

 

takeLock.lock();

 

}

 

void fullyUnlock() {

 

takeLock.unlock();

 

putLock.unlock();

 

}

 

void unlink(Node<E> p, Node<E> trail) {

 

 

 

p.item = null;

 

trail.next = p.next;

 

if (last == p)

 

last = trail;

 

//如果当前队列满,删除后,也不忘记最快的唤醒等待的线程

 

if (count.getAndDecrement() == capacity)

 

notFull.signal();

 

}

十一、开源框架中使用

首先线程池Executors的newFixedThreadPool和newSingleThreadExecutor的工作队列就是使用的这个。

然后tomcat中任务队列TaskQueue是继承并扩展了的,下面看看TaskQueue

11.1 类图结构

可知TaskQueue继承了LinkedBlockingQueue并且泛化类型固定了为Runnalbe.重写了offer,poll,take方法。

11.2 TaskQueue

tomcat中有个线程池ThreadPoolExecutor,在NIOEndPoint中当acceptor线程接受到请求后,会把任务放入队列,然后poller 线程从队列里面获取任务,然后就吧任务放入线程池执行。这个ThreadPoolExecutor中的的一个参数就是TaskQueue。

先看看ThreadPoolExecutor的参数如果是普通LinkedBlockingQueue是怎么样的执行逻辑:

当调用线程池方法 execute() 方法添加一个任务时:

 

public void execute(Runnable command) {

 

if (command == null)

 

throw new NullPointerException();

 

 

 

//当前工作线程个数小于core个数则开新线程执行(1)

 

int c = ctl.get();

 

if (workerCountOf(c) < corePoolSize) {

 

if (addWorker(command, true))

 

return;

 

c = ctl.get();

 

}

 

//放入队列(2)

 

if (isRunning(c) && workQueue.offer(command)) {

 

int recheck = ctl.get();

 

if (! isRunning(recheck) && remove(command))

 

reject(command);

 

else if (workerCountOf(recheck) == 0)

 

addWorker(null, false);

 

}

 

//如果队列满了则开新线程,但是个数要不超过最大值,超过则返回false

 

//然后执行reject handler(3)

 

else if (!addWorker(command, false))

 

reject(command);

 

}

可知当当前工作线程个数为corePoolSize后,如果在来任务会把任务添加到队列,队列满了或者入队失败了则开启新线程。

然后看看TaskQueue中重写的offer方法的逻辑:

 

public boolean offer(Runnable o) {

 

// 如果parent为null则直接调用父类方法

 

if (parent==null) return super.offer(o);

 

//如果当前线程池中线程个数达到最大,则无条件调用父类方法

 

if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);

 

//如果当前提交的任务小于当前线程池线程数,说明线程用不完,没必要重新开线程

 

if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);

 

//如果当前线程池线程个数>core个数但是小于最大个数,则开新线程代替放入队列

 

if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;

 

//到了这里,无条件调用父类

 

return super.offer(o);

 

}

可知parent.getPoolSize()<parent.getMaximumPoolSize()普通队列会把当前任务放入队列,TAskQueue则是返回false,因为这会开启新线程执行任务,当然前提是当前线程个数没有达到最大值。

然后看下Worker线程中如果从队列里面获取任务执行的:

 

final void runWorker(Worker w) {

 

 

try {

 

while (task != null || (task = getTask()) != null) {

 

 

}

 

completedAbruptly = false;

 

} finally {

 

 

}

 

}

 

private Runnable getTask() {

 

boolean timedOut = false; // Did the last poll() time out?

 

for (;;) {

 

int c = ctl.get();

 

int rs = runStateOf(c);

 

 

int wc = workerCountOf(c);

 

 

try {

 

//根据timed决定调用poll还是take

 

Runnable r = timed ?

 

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

 

workQueue.take();

 

if (r != null)

 

return r;

 

timedOut = true;

 

} catch (InterruptedException retry) {

 

timedOut = false;

 

}

 

}

 

}

十二、总结

12.1 并发安全总结

仔细思考下阻塞队列是如何实现并发安全的维护队列链表的,先分析下简单的情况就是当队列里面有多个元素时候,由于同时只有一个线程(通过独占锁putLock实现)入队元素并且是操作last节点(,而同时只有一个出队线程(通过独占锁takeLock实现)操作head节点,所以不存在并发安全问题。

 

江苏徐软信息科技有限公司(简称徐州软件公司)是徐州软件公司中成立时间最长、技术能力最强、经济实力最雄厚的徐州软件开发公司之一,专业的徐州软件开发团队,从事徐州软件开发10年,一直保持着徐州软件开发行业排头兵的地位。徐州软件公司徐州软件开发行业内的众多徐州软件开发公司保持着良好的合作关系,是徐州软件开发行业的领航者之一。徐州软件公司立足徐州软件开发市场,主攻徐州软件开发徐州APP开发徐州软件公司徐州ERP软件开发徐州OA软件开发徐州CRM软件开发等领域拥有大量经典案例。更多信息请访问徐州软件公司官方网站:

徐软com:http://www.xuzhousoft.com  徐软cn:http://www.xuzhousoft.com.cn
徐软app:http://app.xuzhousoft.com  淮北徐软:http://huaibei.xuzhousoft.com.cn
济宁徐软:http://jining.xuzhousoft.com.cn  亳州徐软:http://bozhou.xuzhousoft.com.cn
菏泽徐软:http://heze.xuzhousoft.com.cn  宿州徐软:http://suzhou.xuzhousoft.com.cn
枣庄徐软:http://zaozhuang.xuzhousoft.com.cn  宿迁徐软:http://suqian.xuzhousoft.com.cn
商丘徐软:http://shangqiu.xuzhousoft.com.cn  连云港徐软:http://lianyungang.xuzhousoft.com.cn
莱芜徐软:http://laiwu.xuzhousoft.com.cn  泰安徐软:http://taian.xuzhousoft.com.cn
日照徐软:http://rizhao.xuzhousoft.com.cn  开封徐软:http://kaifeng.xuzhousoft.com.cn
周口徐软:http://zhoukou.xuzhousoft.com.cn  盐城徐软:http://yancheng.xuzhousoft.com.cn
淮安徐软:http://huaian.xuzhousoft.com.cn  阜阳徐软:http://fuyang.xuzhousoft.com.cn
蚌埠徐软:http://bengbu.xuzhousoft.com.cn  临沂徐软:http://linyi.xuzhousoft.com.cn
邳州徐软:http://pizhou.xuzhousoft.com.cn  新沂徐软:http://xinyi.xuzhousoft.com.cn
沛县徐软:http://peixian.xuzhousoft.com.cn  睢宁徐软:http://suining.xuzhousoft.com.cn
丰县徐软:http://fengxian.xuzhousoft.com.cn  萧县徐软:http://xiaoxian.xuzhousoft.com.cn
砀山徐软:http://dangshan.xuzhousoft.com.cn  微山徐软:http://weishan.xuzhousoft.com.cn
永城徐软:http://yongcheng.xuzhousoft.com.cn  网络营销:http://www.f168yingxiao.com
徐州系统集成公司:http://www.0516app.com

关键字标签:徐州软件公司 徐州软件开发公司 徐州APP软件开发公司 徐州ERP软件开发公司 徐州CRM软件开发公司 徐州OA软件开发公司

下载DOC版 下载PDF版

* 以上内容由 徐州软件公司 整理


关于我们

    江苏徐软信息科技有限公司(简称徐州软件)位于国家大学科技园内,成立于2005年,注册资金1000万元,是徐州地区最具实力的集软件开发、电子商务技术服务、门户网站建设、系统集成、网络工程为一体的高科技IT技术公司之一。

技术支持

  • 售后服务电话:0516-83003411
  • 售后服务QQ:412110939
  • 售后服务邮箱:
    service@xuzhousoft.com
  • 售后投诉电话:18795428064
徐州软件公司
    扫描微信二维码即可获得
    免费信息化咨询服务

Copyright© 2005 江苏徐软信息科技有限公司 All Rights Reserved.
苏公网安备 32030302000144号  苏ICP备11059116号-5

地址:江苏省徐州市云龙区和平路57号江苏师范大学科技园4F  徐州软件公司
电话:0516-83737996 邮箱:sales@xuzhousoft.com

江苏徐软信息科技有限公司地图
江苏徐软信息科技有限公司地图
点这里关闭本窗口
×