NOTE:本文还未完成!

本文及下一篇文章将主要介绍一些不基于锁的线程安全的c++数据结构的实现,c++中无锁的数据结构主要基于原子操作和内存序实现。本文将主要介绍无锁栈的实现。本文会使用到一些c++17或c++20的新特性。

参考书目:《c++并发编程实战》

完整代码在GitHub:Github

基于锁的数据结构的实现:C++ concurrent data structures based on lock

建议先看:C++ Lock-free thread-safe data structures(1)- Atomic operations and memory ordering

前言

  无锁的数据结构主要基于原子操作与内存序实现。还是从自旋锁这个例子看起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class spinlock_mutex{
std::atomic_flag flag;

public:
spinlock_mutex(): flag(ATOMIC_FLAG_INIT) {}

void lock(){
while(flag.test_and_set(std::memeory_order_acquire));
}

void unlock(){
flag.clear(std::memeory_order_release);
}
};

这段代码没有调用任何阻塞函数,lock()只是让循环持续调用test_and_set(),并返回false。代码“自旋”于循环当中。所以没有阻塞调用。

如何判断一个数据结构是否无锁(无锁数据结构的分类):
无阻碍 —— 如果其他线程都暂停了,任何给定的线程都将在一定时间内完成操作。
无锁 —— 如果多个线程对一个数据结构进行操作,经过一定时间后,其中一个线程将完成其操作。
无等待 —— 即使有其他线程也在对该数据结构进行操作,每个线程都将在一定的时间内完成操作。

NOTE:大多数情况下无阻塞算法用的很少——其他线程都暂停的情况太少见了,因此这种方式用于描述一个失败的无锁实现更为合适。

无锁数据结构

  定义:无锁数据结构的核心特性是保证在有限的步骤内,至少有一个线程能够完成操作,而不必等待其他线程完成其操作。这种设计使得即使某些线程被阻塞或挂起,其他线程仍然可以继续执行,从而提高了系统的并发性和响应能力。

为什么使用无锁的数据结构:
最大化并发:使用基于锁的容器,会让线程阻塞或等待,并且互斥锁削弱了结构的并发性。无锁数据结构中,某些线程可以逐步执行。无等待数据结构中,每一个线程都可以独自向前运行,这种理想的方式实现起来很难。结构太简单,反而不容易实现。
鲁棒性:当一个线程在持有锁时被终止,那么数据结构将会永久性的破坏。但是,当线程在无锁数据结构上执行操作,在执行到一半终止时,数据结构上的数据不会丢失(除了线程本身的数据),其他线程依旧可以正常执行。
无死锁:因为没有任何锁(有可能存在活锁),死锁问题不会困扰无锁数据结构。

活锁:
两个线程同时尝试修改数据结构,但每个线程所做的修改操作都会让另一个线程重启,所以两个线程就会陷入循环,多次的尝试完成自己的操作。不过活锁的存在时间并不久,因为其依赖于线程调度。所以只是对性能有所消耗,而不是一个长期问题,但这个问题仍需要关注。根据定义,因其操作执行步骤有上限,无等待的代码不会受活锁所困扰。换句话说,无等待的算法要比等待算法的复杂度高,即使没有其他线程访问数据结构,也可能需要更多步骤来完成相应操作。

无锁-无等待代码的缺点:
虽然提高了并发访问的能力,减少了单个线程的等待时间,但是其可能会将整体性能拉低。首先,原子操作的无锁代码要慢于无原子操作的代码,原子操作就相当于无锁数据结构中的锁。不仅如此,硬件必须通过同一个原子变量对线程间的数据进行同步。所以,无论是基于锁的数据结构,还是无锁的数据结构,对性能的检查都很重要(最坏的等待时间,平均等待时间,整体执行时间或者其他指标)。

栈(Stack)

  栈的要求很简单:查询顺序是添加顺序的逆序——先入后出(LIFO)。所以,确保值能安全的入栈就十分重要,因为可能在入栈后,会马上被其他线程访问,同时确保只有一个线程能访问到指定数据。最简单的栈就是链表,head指针指向第一个节点(可能是下一个被访问到的节点),并且每个节点依次指向下一个节点。
接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template<typename T>
class lock_free_stack{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head;

void increase_head_count(counted_node_ptr& old_counter);

public:
~lock_free_stack();
void push(T const& data);
std::shared_ptr<T> pop();
};

push操作

添加一个节点的操作非常简单:

  1. 创建新节点node。
  2. 让node->next指向head->next。
  3. head->next指向node。

  单线程中这种方式没有问题,不过使用多线程对栈进行修改时,这几步就不够用了。有两个线程同时添加节点时,第2步和第3步会产生条件竞争:一个线程可能在修改head值时,另一个线程正在执行第2步,并且在第3步中对head进行更新。就会使之前那个线程的结果丢弃,亦或是造成更加糟糕的后果。还要注意当head更新并指向了新节点时,另一个线程就能读取到这个节点,因此head设置在s指向新节点前。因为在这之后就不能对节点进行修改了,所以新节点准备就绪就很重要。为了解决条件竞争,我们应该在第3步的时候使用原子“比较/交换”操作,来保证步骤2对head进行读取时,不会对head进行修改,有修改时可以循环“比较/交换”操作。

push操作如下:

1
2
3
4
5
void push(T const& data){
node* const new_node = new node(data);
new_node->next = head.laod();
while(!head.compare_exchange_weak(new_node->next, new_node));
}

为什么使用compare_exchange_weak():
使用compare_exchange_weak()来保证在被存储到new_node->next的head指针和之前的一样。使用“比较/交换”操作:返回false时,因为比较失败(例如,head被其他线程锁修改),会使用head中的内容更新new_node->next(第一个参数)的内容。因为编译器会重新加载head指针,所以循环中不需要每次都重新加载head指针。

为什么不使用compare_exchange_strong():
因为循环可能直接失败,所以使用compare_exchange_weak()要好于使用compare_exchange_strong()。

pop操作

删除一个节点的操作同样简单:

  1. 获取head。
  2. 读取head->next指向的结点node。
  3. 设置head->next指向node->next。
  4. 通过node返回携带的数据data。
  5. 删除node节点。

  多线程环境下,就不那么简单了。有两个线程要从栈中移除数据时,两个线程可能在步骤1中读取到同一个head。其中一个线程处理到步骤5时,另一个线程还在处理步骤2,这个还在处理步骤2的线程将会解引用一个悬空指针,此时会发生节点泄漏。另一个问题是,当两个线程同时读取head时,他们会返回相同的值,这会违反栈结构的意图。

pop操作如下:

1
2
3
4
5
void pop(T& result){
node* old_head = head.load();
while(!head.compare_exchange_weak(old_head, old_head->next));
result = old_head->data;
}

存在内存泄漏:

  1. 当head指针是空指针时,要访问next指针时,将引起未定义行为。
  2. 在返回值时会出现异常安全问题:有异常抛出时, 复制的值将丢失。可以返回智能指针(和有锁的数据结构中的实现相同)。

解决内存泄漏

  为了避免条件竞争,我们选择在pop()时返回一个指向节点的指针,但这样操作会导致pop()出栈的节点的指针仍然被一个线程持有,这样就会存在内存泄漏的问题。解决内存泄漏需要得知何时我们可以释放被pop()节点的内存空间。

  第一种方法:我们可以使用一个可删除列表,同时维护一个计数器,当计数器数值增加时,就是有节点被推入。当计数器数值减少时,就是有节点被删除。当计数器的数值为0时,我们从可删除列表中删除节点是安全的,也就是说我们在没有线程访问pop()时将节点进行回收。当然这里的计数器必须是原子的。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
template<t_ypename T>
class lock_free_stack{
std::atomic<unsigned> threads_in_pop;
void try_reclaim(node* old_head);

public:
std::shared_ptr<T> pop() {
++thread_in_pop;
node* old_head = head.load();
while(old_head && !head.compare_exchange_wead(old_head, old_head->next));
std::shared_ptr<T> res;
if(old_head){
res.swap(old_head->data);
}
try_reclaim(old_head);
return res;
}
}

这里,threads_in_pop原子变量用来记录有多少线程试图弹出栈中的元素,调用pop()时,其值+1,调用。try_reclaim()时其值-1。当调用try_reclaim()时,说明节点已经“删除”,可以通过 swap()函数来删除节点上的数据(而非只是拷贝指针),当不再需要这些数据时,数据会自动删除,而不是持续存在(因为还有对未删除节点的引用被其它线程持有)。垃圾回收机制的具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
template<typename T>
class lock_free_stack{
private:
struct node{
std::shared_ptr<T> data;
node* next;

node(T const& data_): data(std::make_shared<T>(data_)){}
};

std::atomic<node*> head;
std::atomic<unsigned> threads_in_pop;
std::atomic<node*> to_be_deleted;

static void delete_nodes(node* nodes){
while(nodes){
node* next = nodes->next;
delete nodes;
nodes = next;
}
}

void try_reclaim(node* old_head){
if(threads_in_pop == 1){
node* nodes_to_delete = to_be_deleted.exchange(nullptr);
if(!--threads_in_pop)
delete_nodes(nodes_to_delete);
else if(nodes_to_delete)
chain_pending_nodes(nodes_to_delete);
delete old_head;
}else{
chain_pending_node(old_head);
--threads_in_pop;
}
}

void chain_pending_nodes(node* nodes){
node* last = nodes;
while(node* const next = last->next)
last = next;
chain_pending_nodes(nodes, last);
}

void chain_pending_nodes(node* first, node* last){
last->next = to_be_deleted;
while(!to_be_deleted.compare_exchange_weak(last->next, first));
}

void chain_pending_node(node* n){
chain_pending_nodes(n, n);
}
}

回收节点时,threads_in_pop是1,说明只有当前线程对pop()进行访问,就可以安全的将节点删除。当threads_in_pop不为1时删除任何节点都是不安全的,此时需要将节点加入到可删除队列中去。假设某个时刻,threads_in_pop值为1,此时可以尝试回收可删除列表中的节点,如果不回收,节点就会继续等待,直到整个栈被销毁。要做到回收,首先要通过原子exchange操作声明可删除列表,并将计数器减1。如果之后计数器的值为0,意味着没有其他线程访问可删除列表。而后,可以使用delete_nodes对链表进行迭代,并将其删除。当计数值在减后不为0时,回收节点就是不安全的。此时需要将该节点挂在可删除链表后,这种情况会发生在多个线程同时访问数据结构的时候。当有线程在当前线程检查threads_in_pop和node* nodes_to_delete = to_be_deleted.exchange(nullptr)之间调用了pop(),就可能会讲已经访问的节点重新填入到栈中。

  但是这种方法仍然存在很大的问题:当栈处于高负荷状态时,因为其他线程在初始化后都能使用pop(),所以to_ne_deleted链表将会无限增加,会再次泄露。这时我们需要考虑使用其它的方法,如:风险指针。