[超级链接:Java并发学习系列-绪论]
[系列序章:Java并发43:并发集合系列-序章]
原文地址:http://www.importnew.com/25583.html
一、前言
前面介绍了使用CAS实现的非阻塞队列ConcurrentLinkedQueue,下面就来介绍下使用独占锁实现的阻塞队列LinkedBlockingQueue的实现。
二、 LinkedBlockingQueue类图结构
如图LinkedBlockingQueue中:
- 也有两个Node分别用来存放首尾节点,
- 并且里面有个初始值为0的原子变量count用来记录队列元素个数,
- 另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,
- 其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,
- putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。
- 另外notEmpty和notFull用来实现入队和出队的同步。
另外由于出入队是两个非公平独占锁,所以可以同时有一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);
public static final int MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾节点
last = head = new Node<E>(null);
}
如图默认队列容量为0x7fffffff
,用户也可以自己指定容量。
三、必备基础
3.1 ReentrantLock
可以参考:Java并发19:Lock系列-Lock接口基本方法学习实例
3.2 条件变量(Condition)
可以参考:Java并发20:Lock系列-Condition接口基本方法学习实例
四 、带超时时间的offer操作-生产者
在队尾添加元素,
- 如果队列满了,那么等待timeout时候,如果时间超时则返回false,
- 如果在超时前队列有空余空间,则插入后返回true。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//空元素抛空指针异常
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取可被中断锁,只有一个线程克获取
putLock.lockInterruptibly();
try {
//如果队列满则进入循环
while (count.get() == capacity) {
//nanos<=0直接返回
if (nanos <= 0)
return false;
//否者调用await进行等待,超时则返回<=0(1)
nanos = notFull.awaitNanos(nanos);
}
//await在超时时间内返回则添加元素(2)
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//队列不满则激活其他等待入队线程(3)
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//c==0说明队列里面有一个元素,这时候唤醒出队线程(4)
if (c == 0)
signalNotEmpty();
return true;
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
五、 带超时时间的poll操作-消费者
获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//出队线程获取独占锁
takeLock.lockInterruptibly();
try {
//循环直到队列不为空
while (count.get() == 0) {
//超时直接返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//出队,计数器减一
x = dequeue();
c = count.getAndDecrement();
//如果出队前队列不为空则发送信号,激活其他阻塞的出队线程
if (c > 1)
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
//当前队列容量为最大值-1则激活入队线程。
if (c == capacity)
signalNotFull();
return x;
}
六、put操作-生产者
与带超时时间的poll类似不同在于put时候如果当前队列满了它会一直等待其他线程调用notFull.signal才会被唤醒。
七、 take操作-消费者
与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒。
八、 size操作
当前队列元素个数,如代码直接使用原子变量count获取。
public int size() {
return count.get();
}
九、peek操作
获取但是不移除当前队列的头元素,没有则返回null:
public E peek() {
//队列空,则返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
十、 remove操作
删除队列里面的一个元素,有则删除返回true,没有则返回false。
在删除操作时候由于要遍历队列所以加了双重锁,也就是在删除过程中不允许入队也不允许出队操作:
public boolean remove(Object o) {
if (o == null) return false;
//双重加锁
fullyLock();
try {
//遍历队列找则删除返回true
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
//找不到返回false
return false;
} finally {
//解锁
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//如果当前队列满,删除后,也不忘记最快的唤醒等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
十一、开源框架中使用
tomcat中任务队列TaskQueue
11.1 类图结构
可知TaskQueue继承了LinkedBlockingQueue并且泛化类型固定了为Runnalbe.重写了offer、poll、take
方法。
十二、总结
LinkedBlockingQueue与ConcurrentLinkedQueue相比前者是阻塞队列,使用可重入独占的非公平锁来实现。
通过使用put锁和take锁使得入队和出队解耦可以同时进行处理,但是同时只有一个线程可以入队或者出队,其他线程必须等待。
另外引入了条件变量来进行入队和出队的同步,每个条件变量维护一个条件队列用来存放阻塞的线程。
LinkedBlockingQueue的size操作通过使用原子变量count获取能够比较精确的获取当前队列的元素个数。
另外remove方法使用双锁保证删除时候队列元素保持不变,另外其实这个是个生产者-消费者模型。
而ConcurrentLinkedQueue则使用CAS非阻塞算法来实现,使用CAS原子操作保证链表构建的安全性。
当多个线程并发时候CAS失败的线程不会被阻塞,而是使用cpu资源去轮询CAS直到成功。
size方法先比LinkedBlockingQueue的获取的个数是不精确的,因为获取size的时候是通过遍历队列进行的,而遍历过程中可能进行增加删除操作,remove方法操作时候也没有对整个队列加锁。
remove时候可能进行增加删除操作,这就可能删除了一个刚刚新增的元素,而不是删除的想要位置的。