LinkedBlockingQueue原理解析

LinkedBlockingQueue原理详解

简述

前面已经介绍过关于ArrayBlockingQueue相关原理性内容,我们前面讲过ArrayBlockingQueue是基于数组的方式实现的,那么LinkedBlockingQueue是基于链表的形式实现。先来看一下LinkedBlockingQueue的UML,如下所示:

image-20190407143028626

通过上面的UML可以看到,他也是BlockingQueue的实现,也就是他的核心在于Blocking(阻塞)这个上面,在讲解ArrayBlockingQueue的时候,可以清晰的得出ArrayBlockingQueue是使用了独占锁的方式,要求两个操作进行时获得当先队列的独占锁,那么take()和put()操作就不可能真正的并发。它们会彼此等待对方释放资源,在这种情况下所竞争会比较激励,从而会影响到高并发的效率,而LinkedBlockingQueue为了解决这一问题,采用锁分离的方式进行实现,take()函数和put()函数分别实现了从队列中取得数据和往队列天价收的功能,换句话说就会说take()方法有专门的锁进行控制,而put()方法也有专门的锁进行控制,由于take()方法是操作队尾,put()方法操作队首,又因为LinkedBlockingQueue是基于链表的方式实现,因此两个操作不受影响。

源码解析

首先看一下LinkedBlockingQueue中的字段信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 链表的Node节点
*/
static class Node<E> {
E item;

/**
* 下一个节点,如果节点为Null代表最后一个节点
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;

Node(E x) { item = x; }
}

/** 容量限制,如果没有指定则为Integer.MAX_VALUE */
private final int capacity;

/** 当前队列的元素个数,原子操作 */
private final AtomicInteger count = new AtomicInteger();

/**
* 头结点
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* 尾节点
* Invariant: last.next == null
*/
private transient Node<E> last;

/**take, poll的重入锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 不为空的条件 */
private final Condition notEmpty = takeLock.newCondition();

/** put, offer的重入锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 队满条件 */
private final Condition notFull = putLock.newCondition();
  1. Node节点维护链表的信息。
  2. 最大容量限制,用户可自己指定,如果没有指定则代表Integer的最大值。
  3. 包含了head头结点,tail尾节点。
  4. takeLock代表的是take,poll等出队列操作的锁。
  5. putLock代表是put,offer等入队列的操作的锁。

接下来看一下put方法是如何进行入队操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 将指定元素插入此队列的尾部, 等待队列空间可用。
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// 保持计数为负,表示失败,除非设定。
int c = -1;
Node<E> node = new Node<E>(e);
// putLock锁。
final ReentrantLock putLock = this.putLock;
// 链表长度,原子操作。
final AtomicInteger count = this.count;
// 获得锁,并且响应中断,put操作只有一个线程操作。
putLock.lockInterruptibly();
try {
// 如果链表长度等着capacity,代表队列已满,则等待队列为空。
while (count.get() == capacity) {
notFull.await();
}
// 将元素插入队列末尾。
enqueue(node);
// c为count加1前的值,这里是原子操作,它会进行CAS,因为现在是两个线程进行操作,有可能put的时候也进行take操作,所以要保证原子性。
c = count.getAndIncrement();
// 当c+1不是最大值时,通知notFull,队列未满可继续添加元素,通知其他线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c代表插入前的的值,所以队列为空的时候c=0,此时已经插入了数据所以c本来应该不为0,所以需要通知队列元素插入成功。
if (c == 0)
signalNotEmpty();
}

通过源码可以清晰得到put方法是如何进行操作的,首先获取putLock锁,获取队列的原子类型的长度,如果当前队列的长度与队列最大长度相等说明队列未满,则等待队列为空的时候插入数据,当队列未满时,可直接插入数据到队尾,c存放的事count元素加1前的值,也就是谁队列为空的时候c的长度是为0,当执行完了put方法后,实际的count为1,但是这里因为存放的是加1前的值,所有c=0,代表队列中有数据通知notEmpty可以进行take了。

enqueue方法源码很简单,就是将node节点插入到队尾,将last节点指向当前队尾。

1
2
3
4
5
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

接下来说一下take方法的源码是如何实现的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 从队头获取元素,等待队列有数据可读。
*/
public E take() throws InterruptedException {
E x;
// 本地保存变量。
int c = -1;
// 队列长度。
final AtomicInteger count = this.count;
// 获取take重入锁。
final ReentrantLock takeLock = this.takeLock;
// 获得锁,并且响应中断操作,并且只有一个线程进入take方法。
takeLock.lockInterruptibly();
try {
// 如果队列为空则等待队列不为空时进行获取操作。
while (count.get() == 0) {
notEmpty.await();
}
// 出队列操作。
x = dequeue();
// c保存减1前的值。
c = count.getAndDecrement();
// 如果队列还有元素则可通知其他线程进行take操作。
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c如果是capacity的时候代表之前队列存在过满的情况,进行take方法后则表示队列有空间可用可进行put操作,通知notFull进行put操作。
if (c == capacity)
signalNotFull();
return x;
}

通过上面的源码可以看到,take方法获取的是takeLock重入锁,并且当前线程进入到take方法后,其他线程是不允许同时进入到take方法中,首先判断队列的长度是不是为0,如果队列为0则代表队列中无数据可消费,则进行等待,等待队列中有元素时进行take后的操作,如果队列长度不为0,则进行dequeue方法,出队列操作,将head节点指向下一个节点,将当前head值返回,当c大于1时,代表还有元素可以take,通知其他线程进行take操作,c如果是capacity的时候,代表之前队列存在过满的情况,进行这次take方法后队列有空间可用,所以可以通知notFull进行put操作。

1
2
3
4
5
6
7
8
9
10
11
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // 帮助GC进行垃圾回收。
head = first;
E x = first.item;
first.item = null;
return x;
}

总结

  1. LinkedBlockingQueue是通过锁分离的方式进行控制,减少了take和put之间的锁竞争。
  2. LinkedBlockingQueue是通过链表的方式实现,所以进行锁分离时不会冲突,因为入队和出队分别作用于队尾和队首。
  3. 内部采用了原子操作类(CAS)进行控制链表长度。
  4. 入队后,如果之前队列为空时,会通知take方法,队列已有数据可进行take,反之,出队后,队列之前已满,则通知put方法,队列已有空闲位置可进行put操作。
文章目录
  1. 1. LinkedBlockingQueue原理详解
    1. 1.1. 简述
    2. 1.2. 源码解析
    3. 1.3. 总结
|
载入天数...载入时分秒...