C++ 并发队列的原理简介与开源库concurrentqueue安利

由于最近在做一个项目,但是框架本身有个不合理的设计。其中的代码是单线程的,数据的读取和计算都在一个线程里面完成。也就是说,我们的程序有很大的一部分时间在读取文件数据,导致最终的运行速度很慢。这里就可以使用多线程来优化。

这里需要使用最基本的生产者消费者模式。使用若干个线程作为生产者,负责数据的读取和预处理,这部分任务是IO密集型的,也就是不太占CPU,但是比较占带宽,而且有延时。在处理完数据之后,将数据放到一个队列中。

同时,使用若干个线程充当消费者,从这个队列里面获取数据,然后进行计算。计算的部分是CPU密集型的(其实我这里计算是GPU做的,就只有一个消费者),计算完成之后输出结果。

那么贯穿这一整套方案的,就是我们的队列。

在并发任务中,通常都需要一个队列机制,将并行的任务转化成串行的任务,或者将串行的任务提供给并行工作的线程。这个队列会同时被多个线程读写,因此也必须是线程安全的。

一、线程安全的实现策略

对于线程安全的队列的实现,似乎经常成为企业的面试题,常见的实现方法就是互斥量和条件变量,本质上就是锁的机制。同一时间只有一个线程具有读写的权限。锁的机制在并发量不大情况下,十分的清晰有效。在并发量较大的时候,会因为对锁的竞争而越发不高效。同时,锁本身也需要维护一定的资源,也需要消耗性能。

这时候,大家肯定会想问,不使用锁机制,还可以处理这种并发的情况吗?

答案是肯定的,首先我们知道锁主要有两种,悲观锁和乐观锁。

对于悲观锁,它永远会假定最糟糕的情况,就像我们上面说到的互斥机制,每次我们都假定会有其他的线程和我们竞争资源,因此必须要先拿到锁,之后才放心的进行我们的操作,这就使得争夺锁成为了我们每次操作的第一步。乐观锁则不同,乐观锁假定在很多情况下,资源都不需要竞争,因此可以直接进行读写,但是如果碰巧出现了多线程同时操控数据的情况,那么就多试几次,直到成功(也可以设置重试的次数)。

我们生活的时候,总会碰到很多的不顺心的事情,比如模型训练崩了,被某些库搞得头大,或者女票又生气了什么的,不妨学习一下乐观锁的精神,再训一次?再编译一次?大不了再哄一次。一次不行就两次。

回到乐观锁上,乐观锁中,每次读写都不考虑锁的存在,那么他是如何知道自己这次操作和其他线程是冲突的呢?这就是Lock-free队列的关键——原子操作。原子操作可以保证一次操作在执行的过程中不会被其他线程打断,因此在多线程程序中也不需要同步操作。在C++的STL中其实也提供了atomic这个库,可以保证多线程在操控同一个变量的时候,即使不加锁也能保证起最终结果的正确性。而我们乐观锁需要的一个原子操作就是CAS(Compare And Swap),绝大多数的CPU都支持这个操作。

CAS操作的定义如下(STL中的一个):

bool atomic_compare_exchange_weak (atomic* obj, T* expected, T val);

首先函数会将 objexpected 的内容作比较:

  1. 如果相等,那么将交换 objval 的值,并返回 true
  2. 如果不相等,则什么也不做,之后返回 false

那么使用这个奇怪的操作,为什么就可以实现乐观锁了呢?这里我们看一个例子。这也是我学习的时候看的例子。

struct list {
    std::atomic<node*> head;
};

void append(list* s, node* n)
{
    node* head;
    do {
        head = s->head;
        n->next = head;
    } while (!std::atomic_compare_exchange_weak(&(s->head), &head, n));
    // or while (!s->head.compare_exchange_weak(head, n));
}

在我们向list中插入元素的时候,首先获取到当前的头指针的值head,然后我们在写数据的时候,首先和此刻的头指针值作对比,如果相同,那么就把新的节点插入。如果不相同,说明有线程先我们一步成功了,那么我们就多尝试一次,直到写入成功。

以上就是使用CAS操作实现的乐观锁。上面的这个append就是最简单的Lock-free且线程安全的操作。

二、concurrentqueue

最近在做这个项目的时候,就被安利了一个header only的C++并发队列库 concurrentqueue。本着不重复造轮子的原则,我在项目中用了这个库,由于它只是两个头文件,特别方便的就加入到了项目中。关于这个库的特点,项目的github上写了很多。这里直接照搬下来,不做解释。

  • Knock-your-socks-off blazing fast performance.
  • Single-header implementation. Just drop it in your project.
  • Fully thread-safe lock-free queue. Use concurrently from any number of threads.
  • C++11 implementation -- elements are moved (instead of copied) where possible.
  • Templated, obviating the need to deal exclusively with pointers -- memory is managed for you.
  • No artificial limitations on element types or maximum count.
  • Memory can be allocated once up-front, or dynamically as needed.
  • Fully portable (no assembly; all is done through standard C++11 primitives).
  • Supports super-fast bulk operations.
  • Includes a low-overhead blocking version (BlockingConcurrentQueue).
  • Exception safe.

我体验了一下,感觉最舒服的有以下几点:

  1. 这个库确实可以很好的实现线程安全队列,而且速度很快。接口也比较简单。很容易上手。
  2. 整个库就是两个头文件,而且没有其他的依赖,使用C++11实现,兼容各大平台,很容易融入项目。
  3. 这个并发队列支持阻塞和非阻塞两种。(只在获取元素的时候可以阻塞)

因为这个队列的用法十分简单,这里就直接贴上官网的介绍,然后针对一些细节,补充说明一下,下面是非阻塞队列的常用接口:

  • ConcurrentQueue(size_t initialSizeEstimate) Constructor which optionally accepts an estimate of the number of elements the queue will hold
  • enqueue(T&& item) Enqueues one item, allocating extra space if necessary
  • try_enqueue(T&& item) Enqueues one item, but only if enough memory is already allocated
  • try_dequeue(T& item) Dequeues one item, returning true if an item was found or false if the queue appeared empty
  • ConcurrentQueue(size_t initialSizeEstimate) 这个没什么好说的,一个构造函数,可以指定队列的容量。
  • enqueue(T&& item) 入队操作。比较有意思的是,如果我们的队列已经满了的话,那么这个还是会把数据放到队列里,使得队列的容量变大。所以,如果希望队列的长度不变的话,尽量还是不要使用这个函数。
  • try_enqueue(T&& item) 这个也是入队操作,与上一个不同,这个函数当队列已经满了的时候,并不会进行入队操作,而是返回一个bool类型的值,表示是否入队成功。我在使用的时候,会判断这个bool值,如果是false,就让线程等待10ms之后重试。
  • try_dequeue(T& item) 这个是出队操作,如果队列有值的话,则得到数据(放到参数item里面)。他也会返回一个bool类型的值,表示时候出队成功。

阻塞版本的队列中,主要多了如下两个函数: wait_dequeue(T&& item)wait_dequeue_timed(T&& item, std::int64_t timeout_usecs)。这两个函数的功能类似,都是进行出队操作,如果队列为空,则等待。唯一的区别是,前者永久等待,而后者可以指定等待的时间,如果超时,则会停止等待并返回false。

最后是块操作,两种模式的队列都支持批量插入的操作。这里,我没有用过这些接口,所以大家自行看文档就好。 完整的API:

# Allocates more memory if necessary
enqueue(item) : bool
enqueue(prod_token, item) : bool
enqueue_bulk(item_first, count) : bool
enqueue_bulk(prod_token, item_first, count) : bool

# Fails if not enough memory to enqueue
try_enqueue(item) : bool
try_enqueue(prod_token, item) : bool
try_enqueue_bulk(item_first, count) : bool
try_enqueue_bulk(prod_token, item_first, count) : bool

# Attempts to dequeue from the queue (never allocates)
try_dequeue(item&) : bool
try_dequeue(cons_token, item&) : bool
try_dequeue_bulk(item_first, max) : size_t
try_dequeue_bulk(cons_token, item_first, max) : size_t

# If you happen to know which producer you want to dequeue from
try_dequeue_from_producer(prod_token, item&) : bool
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t

# A not-necessarily-accurate count of the total number of elements
size_approx() : size_t