Jiang's blog

数据结构学习笔记---队列

Word count: 2.5kReading time: 10 min
2020/03/27 Share

学习自极客时间的《数据结构与算法之美》 作者:王争

队列

1. 什么是队列?

先进者先出,这就是典型的“队列”。
队列跟栈非常相似,支持的操作也很有限,最基本的操作也是两个:入队 enqueue(),放一个数据到队列尾部;出队 dequeue(),从队列头部取一个元素。
所以,队列跟栈一样,也是一种操作受限的线性表数据结构

作为一种非常基础的数据结构,队列的应用也非常广泛,特别是一些具有某些额外特性的队列,比如循环队列、阻塞队列、并发队列。它们在很多偏底层系统、框架、中间件的开发中,起着关键性的作用。比如高性能队列 Disruptor、Linux 环形缓存,都用到了循环并发队列;Java concurrent 并发包利用 ArrayBlockingQueue 来实现公平锁等。

2. 顺序队列

跟栈一样,队列可以用数组来实现,也可以用链表来实现。用数组实现的栈叫作顺序栈,用链表实现的栈叫作链式栈。同样,用数组实现的队列叫作顺序队列,用链表实现的队列叫作链式队列。

2.1 顺序队列

数组实现队列用的是数组双指针的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from typing import Optional

class ArrayQueue:

def __init__(self, capacity: int):
self._items = []
# 申请一个大小为capacity的数组
self._capacity = capacity
# head表示队头下标,tail表示队尾下标
self._head = 0
self._tail = 0

def enqueue(self, item: str) -> bool:
if self._tail == self._capacity:
return False
self._items.insert(self._tail, item)
self._tail += 1
return True

def dequeue(self) -> Optional[str]:
if self._head != self._tail:
item = self._items[self._head]
self._head += 1
return item
else:
return None

def __repr__(self) -> str:
return " ".join(item for item in self._items[self._head : self._tail])

队列需要两个指针:一个是 head 指针,指向队头;一个是 tail 指针,指向队尾。

  • 当 a、b、c、d 依次入队之后,队列中的 head 指针指向下标为 0 的位置,tail 指针指向下标为 4 的位置。
    GiNSbR.png

  • 当我们调用两次出队操作之后,队列中 head 指针指向下标为 2 的位置,tail 指针仍然指向下标为 4 的位置。
    GiNExe.png

随着不停地进行入队、出队操作,head 和 tail 都会持续往后移动。当 tail 移动到最右边,即使数组中还有空闲空间,也无法继续往队列中添加数据了。在数组那里也可以提到,可以用数据搬移。但是,每次进行出队操作都相当于删除数组下标为 0 的数据,要搬移整个队列中的数据,这样出队操作的时间复杂度就会从原来的 O(1) 变为 O(n)。

实际上,我们在出队时可以不用搬移数据。如果没有空闲空间了,我们只需要在入队时,再集中触发一次数据的搬移操作。借助这个思想,出队函数 dequeue() 保持不变,我们稍加改造一下入队函数 enqueue() 的实现,就可以轻松解决刚才的问题了。下面是具体的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def enqueue(self, item: str) -> bool:
if self._tail == self._capacity:
if self._head == 0:
return False
else:
# 数据搬移
for i in range(0, self._tail - self._head):
self._items[i] = self._items[i + self._head]
# 搬移完之后重新更新head和tail
self._tail = self._tail - self._head
self._head = 0

self._items.insert(self._tail, item)
self._tail += 1
return True

当队列的 tail 指针移动到数组的最右边后,如果有新的数据入队,我们可以将 head 到 tail 之间的数据,整体搬移到数组中 0 到 tail-head 的位置。

GiaLPP.png

完整代码可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing import Optional

class DynamicArrayQueue:

def __init__(self, capacity: int):
self._items = []
self._capacity = capacity
self._head = 0
self._tail = 0

def enqueue(self, item: str) -> bool:
if self._tail == self._capacity:
if self._head == 0: return False

self._items[0 : self._tail - self._head] = self._items[self._head : self._tail]
self._tail -= self._head
self._head = 0

if self._tail == len(self._items):
self._items.append(item)
else:
self._items[self._tail] = item
self._tail += 1
return True

def dequeue(self) -> Optional[str]:
if self._head != self._tail:
item = self._items[self._head]
self._head += 1
return item

def __repr__(self) -> str:
return " ".join(item for item in self._items[self._head:self._tail])

3. 链式队列

基于链表的实现,我们同样需要两个指针:head 指针和 tail 指针。它们分别指向链表的第一个结点和最后一个结点。如图所示,入队时,tail->next= new_node, tail = tail->next;出队时,head = head->next。

Giympn.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from typing import Optional

class Node:

def __init__(self, data: str, next=None):
self.data = data
self._next = next

class LinkedQueue:

def __init__(self):
self._head: Optional[Node] = None
self._tail: Optional[Node] = None

def enqueue(self, value: str):
new_node = Node(value)
if self._tail:
self._tail._next = new_node
else:
self._head = new_node
self._tail = new_node

def dequeue(self) -> Optional[str]:
if self._head:
value = self._head.data
self._head = self._head._next
if not self._head:
self._tail = None
return value

def __repr__(self) -> str:
values = []
current = self._head
while current:
values.append(current.data)
current = current._next
return "->".join(value for value in values)

4. 循环队列

要想实现一个循环队列,最关键的是,确定好队空和队满的判定条件。
循环队列为空的判断条件仍然是 head == tail。但队列满的判断条件就稍微有点复杂了。当队满时,(tail+1)%n=head。

GicWTS.png

当队列满时,图中的 tail 指向的位置实际上是没有存储数据的。所以,循环队列会浪费一个数组的存储空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from typing import Optional
from itertools import chain

class CircularQueue:

def __init__(self, capacity):
self._items = []
self._capacity = capacity + 1
self._head = 0
self._tail = 0

def enqueue(self, item: str) -> bool:
if (self._tail + 1) % self._capacity == self._head:
return False

self._items.append(item)
self._tail = (self._tail + 1) % self._capacity
return True

def dequeue(self) -> Optional[str]:
if self._head != self._tail:
item = self._items[self._head]
self._head = (self._head + 1) % self._capacity
return item

def __repr__(self) -> str:
if self._tail >= self._head:
return " ".join(item for item in self._items[self._head : self._tail])
else:
return " ".join(item for item in chain(self._items[self._head:], self._items[:self._tail]))

5. 阻塞队列

阻塞队列其实就是在队列基础上增加了阻塞操作。简单来说,就是在队列为空的时候,从队头取数据会被阻塞。因为此时还没有数据可取,直到队列中有了数据才能返回;如果队列已经满了,那么插入数据的操作就会被阻塞,直到队列中有空闲位置后再插入数据,然后再返回。

GigT4e.png

“生产者 - 消费者模型”就是利用的阻塞队列。这种基于阻塞队列实现的“生产者 - 消费者模型”,可以有效地协调生产和消费的速度。当“生产者”生产数据的速度过快,“消费者”来不及消费时,存储数据的队列很快就会满了。这个时候,生产者就阻塞等待,直到“消费者”消费了数据,“生产者”才会被唤醒继续“生产”。

基于阻塞队列,我们还可以通过协调“生产者”和“消费者”的个数,来提高数据的处理效率。比如前面的例子,我们可以多配置几个“消费者”,来应对一个“生产者”。

6. 并发队列

线程安全的队列我们叫作并发队列。最简单直接的实现方式是直接在 enqueue()、dequeue() 方法上加锁,但是锁粒度大并发度会比较低,同一时刻仅允许一个存或者取操作。实际上,基于数组的循环队列,利用 CAS 原子操作,可以实现非常高效的并发队列。这也是循环队列比链式队列应用更加广泛的原因。在实战篇讲 Disruptor 的时候,我会再详细讲并发队列的应用。

问题思考

一. 当我们向固定大小的线程池中请求一个线程时,如果线程池中没有空闲资源了,这个时候线程池如何处理这个请求?是拒绝请求还是排队请求?各种处理策略又是怎么实现的呢?

我们一般有两种处理策略。第一种是非阻塞的处理方式,直接拒绝任务请求;另一种是阻塞的处理方式,将请求排队,等到有空闲线程时,取出排队的请求继续处理。那如何存储排队的请求呢?

我们希望公平地处理每个排队的请求,先进者先服务,所以队列这种数据结构很适合来存储排队请求。我们前面说过,队列有基于链表和基于数组这两种实现方式。这两种实现方式对于排队请求又有什么区别呢?

  • 基于链表的实现方式,可以实现一个支持无限排队的无界队列(unbounded queue),但是可能会导致过多的请求排队等待,请求处理的响应时间过长。所以,针对响应时间比较敏感的系统,基于链表实现的无限排队的线程池是不合适的。
  • 而基于数组实现的有界队列(bounded queue),队列的大小有限,所以线程池中排队的请求超过队列大小时,接下来的请求就会被拒绝,这种方式对响应时间敏感的系统来说,就相对更加合理。不过,设置一个合理的队列大小,也是非常有讲究的。队列太大导致等待的请求太多,队列太小会导致无法充分利用系统资源、发挥最大性能。

二. 除了线程池这种池结构会用到队列排队请求,你还知道有哪些类似的池结构或者场景中会用到队列的排队请求呢?
分布式应用中的消息队列,也是一种队列结构。

三. 如何实现无锁并发队列?
可以使用 cas + 数组的方式实现。在入队前,获取tail位置,入队时比较tail是否发生变化,如果否,则允许入队,反之,本次入队失败。出队则是获取head位置,进行cas。

CATALOG
  1. 1. 队列
    1. 1.1. 1. 什么是队列?
    2. 1.2. 2. 顺序队列
      1. 1.2.1. 2.1 顺序队列
    3. 1.3. 3. 链式队列
    4. 1.4. 4. 循环队列
    5. 1.5. 5. 阻塞队列
    6. 1.6. 6. 并发队列
    7. 1.7. 问题思考