本文将介绍一些c++中基于锁实现的并发数据结构。包括线程安全的Stack、Queue、List和Map。本文代码主要基于c++14,有时也会使用一些c++17和c++20的新特性,同时会使用到一些基本的特性和并发开发工具,文中不会详细介绍,建议查阅资料或书籍《c++并发编程实战》。文中也会体现一些并发编程的基本思想和思考。
完整代码在GitHub:GitHub
如何实现并发
主要需要考虑两个方面,一个方面是确保访问的安全,另一方面是实现真正的并发。
如何确保数据的线程安全:
- 确保没有线程可以看到不变量的中间状态(修改过程对其它线程不可见)。
- 小心会引起条件竞争的接口,可以选择提供完整的操作函数,而不是单一步骤。
- 防止异常的发生而导致数据丢失。
- 防止死锁的产生。
如何防止死锁:
- 避免嵌套锁:尽量不要在持有一个锁的情况下再去请求另一个锁。
- 避免在持有锁时调用外部代码:外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反1,并造成死锁。
- 使用固定顺序获取锁:当硬性要求获取两个或两个以上的锁,并且不能使用 std::lock 单独操作来获取它们时,最好在每个线程上,用固定的顺序获取锁。(如链表中的手递手草组)
- 使用层次锁结构:对互斥量进行分层,按照优先级来获取锁。如持有一个锁后只能获取更低层次的锁,而不允许获得高层次的锁,除非释放当前锁。
如何实现高效的并发,考虑以下问题:
- 操作在锁的范围中进行,是否允许在锁外执行?
- 数据结构中不同的互斥量能否保护不同的区域?
- 所有操作都需要同级互斥量的保护吗?
- 能否对数据结构进行简单的修改,增加并发访问的概率?
Stack
我们从一个最简单的栈开始,接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| template<typename T> class threadsafe_stack{ private: std::stack<T> data; mutable std::mutex m;
public: threadsafe_stack(): data(std::stack<T>()) {} threadsafe_stack(const threadsafe_stack& other){ std::lock_guard lock(other.m); data = other.data; }
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value); std::shared_ptr<T> pop(); void pop(T& value); bool empty() const; }
|
实现中使用一个互斥量(mutex)m来对每个成员函数进行加锁保护。保证在同一时间内,只有一个线程可以访问到数据。同时会使用一些RAII模版类来进行互斥量资源的管理,如std::lock_guard。在并发访问的数据结构中,最主要的设计要素就是保证整个数据结构的异常安全,同时注意接口之间的条件竞争。
为什么没有top()?
在多线程环境中,保持数据一致性是至关重要的。top()可以让用户在不修改栈的情况下查看栈顶元素,这会导致一些特殊的情况,如:如果在调用 top() 之后立即调用 pop(),而另一个线程也在操作同一个栈,这可能导致不一致的状态。同时,注意如果我们在top()中返回了对一个共享数据的应用,那整个数据结构的并发安全就会被打破,因为持有这个引用的一方可以在不获得互斥量的情况下访问共享数据。
尽管没有实现 top() 方法,但可以通过其他方式来获取栈顶元素。例如,可以使用第二个重载的 pop(T& value) 方法来获取栈顶元素并将其存储在提供的引用变量中。这样,用户仍然可以获取栈顶元素而不必直接移除它。
接口间竞争
如我们在调用empty()后调用top(),那么有可能其它的线程已经修改了数据结构,这里的top()会得到错误的结果,也就是我们常说的存在数据竞争。
为什么有两个重载的pop()
这里主要要解决的问题是,试想这样一个数据结构std::stack<vector>,vector是个动态容器,当拷贝一个动态容器时,标准库会从堆上分配很多内存来完成这次拷贝。当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以vector的拷贝构造函数可能会抛出一个 std::bad_alloc 异常。当vector中存有大量元素时,这种情况发生的可能性更大。当pop()函数返回“弹出值”时(也就是从栈中将这个值移除),会有一个潜在的问题:这个值返回到调用函数的时候,栈才被改变。但拷贝数据的时候,调用函数抛出一个异常会怎么样? 如果真的发生了,要弹出的数据将会丢失,它的确从栈上移除了,但是拷贝失败了。为了解决这个问题,共有以下几种方法:
- 传入一个引用(void pop(T& value);),用来接收“弹出值”。它的缺点是:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。
- 返回指向弹出值的指针。指针的优势是自由拷贝,并且不会产生异常。为了避免内存泄漏,鼓励使用使用 std::shared_ptr。
所以,为了通用性,我们应当提供以上两种接口,由用户选择具体使用那种。
为什么使用mutable
为了支持在const成员函数中安全地修改该互斥量,如empty();
首先来看两个pop接口的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| std::shared_ptr<T> pop(){ std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop(); return res; }
void pop(T& value){ std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack();
value = data.top(); data.pop(); }
|
pop()第一个重载中,代码可能会抛出empty_stack异常,但是,并没有数据的修改,所以这里是异常安全的。创建res时,也可能会抛出异常(std::make_shared 无法分配出足够的内存去创建新对象,拷贝或移动构造到新分配的内存中时抛出异常),两种情况都有c++标准库和运行时库来保证不会出现内存泄漏,并且新创建的对象(如果有的话)都能正确销毁。因为没有对栈进行任何修改,所以这里也没问题。
第二个重载pop()除了在拷贝赋值或移动赋值时会抛出异常,同样,在调用data.pop()之前,没有对数据结构进行修改,所以这个函数也是异常安全的。
empty()不会对数据进行修改,所以也是异常安全的。
注意,这里的构造和析构函数并不是线程安全的。所以,用户就要保证在栈对象完成构建前,其他线程无法对其进行访问。并且,要保证在栈对象销毁后,停止所有线程的访问操作。
Queue
首先是一个大家比较能够接受的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| template<typename T> class threadsafe_queue{ private: mutable std::mutex mut; std::condition_variable data_cond; std::queue<T> data_queue;
public: threadsafe_queue() {}
void push(T data); void wait_and_pop(T& value)l; std::shared_ptr<T> wait_and_pop(); bool try_pop(T& value); std::shared_ptr<T> try_pop(); bool empty() const; }
|
这个版本使用tls库中的queue作为底层的数据结构,同时使用互斥量和调教变量来提供线程间的互斥与同步。我们来看看push()和wait_and_pop()接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void push(T data){ std::lock_guard<std::mutex> lock(mut); data_queue.push(std::move(data)); data_cond.notify_one(); }
void wait_and_pop(T& value){ std::unique_lock<std::mutex> lock(mut); data_cond.wait(lock, [this](return !data_queue.empty();)); value = std::move(data_queue.front()); data_queue.pop(); }
std::shared_ptr<T> wait_and_pop(){ std::unique_lock<std::mutex> lock(mut); data_cond.wait(lock, [this]{return !data_queue.empty();}); std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front()))); data_queue.pop(); return res; }
|
为什么使用std::move()
将左指转换为右值,从而避免不必要的拷贝。
为什么使用条件变量
- 避免忙等待。忙等待会浪费 CPU 资源,因为线程会在无效的检查中消耗计算资源。使用条件变量后,线程可以安全地进入休眠状态,直到被唤醒。
- 释放互斥锁。条件变量会自动释放与之关联的互斥锁(mutex),允许其他线程获得锁并修改共享数据。当条件满足后,等待的线程会被唤醒,并尝试重新获取锁。这种机制确保了在等待期间不会有其他线程阻塞。
为什么有try_pop
在队列中存在数据时返回pop的数据,不存在数据时返回空。
代码中存在一些潜在的问题,如data_cond.notify_one()会导致一个潜在的异常情况,比如一个线程在wait_and_pop中抛出异常,那么其它的线程将永远无法被唤醒,因为当前只会有这一个工作线程。
可以采用以下修改,比如我们的queue中存储的数据不再是原始数据,而是一个指向原始数据的指针,这样就能使wait_and_pop()安全的工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| template<typename T> class threadsafe_queue{ private: mutable std::mutex mut; std::queue<std::shared_ptr<T> > data_queue; std::condition_variable data_cond; public: void push(T new_value) { std::shared_ptr<T> data( std::make_shared<T>(std::move(new_value))); std::lock_guard<std::mutex> lk(mut); data_queue.push(data); data_cond.notify_one(); }
std::shared_ptr<T> wait_and_pop(){ std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); std::shared_ptr<T> res=data_queue.front(); data_queue.pop(); return res; } }
|
注意这里push()中加锁的位置,对data的构造是无需加锁的,提前加锁只会导致更长的等待时间。
至此,使用tls库中queue的实现就差不多结束了,但是思考这样一个问题,队列是一头只执行push操作,而另一头只执行pop操作,我们对整个队列加锁会使后续的pop和push操作均处于等待,所以我们可以考虑不使用tls库中的queue,该用其它的数据结构实现队列,同时提供更细粒度的锁来实现性能更佳的并发。
使用更细粒度的锁来实现队列可以考虑将底层的数据结构更换为单链表,因为使用单链表我们可以对单个节点加锁。所以我们可以通过减少上锁的时间来实现更大程度的并发。
接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| template<typename T> class threadsafe_queue{ private: struct node { std::shared_ptr<T> data; std::unique_ptr<node> next; };
std::mutex head_mutex,tail_mutex; std::condition_variable data_cond; std::unique_ptr head; node* tail; public: threadsafe_queue(): head(new node), tail(head.get()) {}
threadsafe_queue(const threadsafe_queue& other) = delete; threadsafe_queue& operator=(const threadsafe_queue& other) = delete;
std::shared_ptr<T> try_pop(); bool try_pop(T& value); std::shared_ptr<T> wati_and_pop(); void wait_and_pop(T& value); void push(T new_value); bool empty(); }
|
为什么构造函数中预分配节点
因为如果没有预分配的节点,那么在push和pop操作时需要持有两个互斥量,预分配节点后可以消除这种数据竞争。
为什么删除拷贝构造函数和拷贝复制运算符
实现时需要注意有些函数是需要同时访问头尾指针的,如try_pop(),他需要首先判断队列是否为空,这需要同时对头尾指针进行操作。这个时候我们可以使用一些辅助函数来得到尾指针所在的值。如:
1 2 3 4
| node* get_tail(){ std::lock_guard<std::mutex> tail_lock(tail_mutex); return tail; }
|
但是如果你使用了get_tail来判断队列是否为空时,就需要确保一些操作的加锁顺序,如pop操作,如果你在获取队头锁之前调用了get_tail,之后在获取队头锁那么有可能会有潜在的数据竞争,因为在这两个操作之间可能已经有其它的线程pop了队列,并使队列为空,那么你的操作就会存在未定义的风险。
接下来是一些接口实现:
1 2 3 4 5 6 7 8 9 10 11 12 13
| template<typename T> void threadsafe_queue<T>::push(T new_value){ std::shared_ptr<T> new_data(std::make_shared(std::move(new_value))); std::unique_ptr<node> p(new node); { std::lock_guard<std::mutex> tail_lock(tail_mutex); tail->data = new_data; node* const new_tail = p.get(); tail->next = std::move(p); tail = new_tail; } data_cond.notify_one(); }
|
注意加锁的位置和具体实现,这里提前构造好新的尾部虚拟节点,并在缓冲区内只存在指针的操作,而不存在具体的数据构造,这样可以减少锁持有的时间。
try_pop和wait_and_pop的接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| template<typename T> std::shared_ptr<T> threadsafe_queue<T>::try_pop(){ std::unique_ptr<typename threadsafe_queue<T>::node> const old_head = threadsafe_queue<T>::try_pop_head(); return old_head ? old_head -> data : std::shared_ptr<T>(); }
template<typename T> bool threadsafe_queue<T>::try_pop(T& value){ std::unique_ptr<typename threadsafe_queue<T>::node> const old_head = threadsafe_queue<T>::try_pop_head(value); return old_head; }
template<typename T> std::shared_ptr<T> threadsafe_queue<T>::wait_and_pop(){ std::unique_ptr<node> const old_head = wait_pop_head(); return old_head -> data; }
template<typename T> void threadsafe_queue<T>::wait_and_pop(T& value){ std::unique_ptr<node> const old_head = wait_pop_head(value); }
std::unique_lock<std::mutex> wait_for_data(){ std::unique_lock<std::mutex> head_lock(head_mutex); data_cond.wait(head_lock, [&]{return head.get() != get_tail();}); return std::move(head_lock); }
std::unique_ptr<node> wait_pop_head(){ std::unique_lock<std::mutex> head_lock(wait_for_data()); return pop_head(); }
std::unique_ptr<node> wait_pop_head(T& value){ std::unique_lock<std::mutex> head_lock(wait_for_data()); value = std::move(*head -> data); return pop_head(); }
std::unique_ptr<node> try_pop_head(){ std::lock_guard<std::mutex> head_lock(head_mutex); if(head.get() == get_tail()) return std::unique_ptr<node>(); return pop_head(); }
std::unique_ptr<node> try_pop_head(T& value){ std::lock_guard<std::mutex> head_lock(head_mutex); if(head.get() == get_tail()) return std::unique_ptr<node>(); value = std::move(*head -> data); return pop_head(); }
|
这里实现了一些辅助函数来完成实现,但主要还是需要注意两个锁的上锁位置和上锁顺序,在保证安全的前提下尽可能短时间的持有锁。
Map
map不同于前两种数据结构,通常我们不需要对map有太多的修改,而是有大量的查询操作。但是tls库中提供的一些map都不是线程安全的。而且考虑这样一个问题,如果一个map返回了一个迭代器,像tls库中提供的那样,那么这个迭代器指向的数据被删除时,所有对这个迭代器的访问都是未定义的。
我们对map定义四种接口:添加,修改,删除和查询。
这里我们还是考虑使用细粒度的锁来实现我们的数据结构。实现map共有三种方法,红黑树、有序数组和哈希表,前两种方法都无法细粒度化锁或者说无法彻底细粒度化,所以考虑使用哈希表实现。使用哈希表时我们可以安心的对每个桶上锁
接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| template<typename Key, typename Value, typename Hash = std::hash<Key> > class threadsafe_lookup_table{ private: class bucket_type{ private: typedef std::pair<Key, Value> bucket_value; typedef std::list<bucket_value> bucket_data; typedef typename bucket_data::iterator bucket_iterator;
bucket_data data; mutable std::shared_mutex mutex;
bucket_iterator find_entry_for(Key const& key) const public: Value value_for(Key const& key, Value const& default_value) const; void add_or_update_mapping(Key const& key, Value const& value); void remove_mapping(Key const& key); };
std::vector<std::unique<bucket_type> > buckets; Hash hasher; bucket_type& get_bucket(Key const& key) const public: typedef Key key_type; typedef Value mapped_type; typedef Hash hash_type;
threadsafe_lookup_table(unsigned num_buckets = 19, Hash const& hasher_ = Hash()) : buckets(num_buckets), hasher(hasher_){ for(unsigned i = 0; i < num_buckets; ++i) buckets[i].reset(new bucket_type); } threadsafe_lookup_table(threadsafe_lookup_table const& other) = delete; threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other) = delete;
Value value_for(Key const& key, Value const& default_value = Value()) const; void add_or_update_mapping(Key const& key, Value const& value); void remove_mapping(Key const& key); std::map<Key, Value> get_map() const; }
|
接口的定义已经非常清晰了。在构造函数中我们可以指定桶的数量,这个数通常是一个质数,因为这样会使哈希表的工作效率最高,在《算法导论》这本书中可以找到相关的证明。每个桶都会被一个std::shared_mutex保护,所以对每个桶来说,只有一个线程可以修改他。
这里的get_bucket()是无需上锁的,因为他不会修改任何数据,而且桶的数量是固定的。value_for()不会对数据进行修改,只进行查询操作,所以可以只获得读锁。get_map()用于生成当前状态的快照,该操作需要获取每个桶的锁,也就是锁住整个map(这里有更加高效的实现!)。
List
前面的map使用了stl库中的list进行实现,那么它是线程安全的吗?不是。线程安全的链表需要提供以下接口:添加、删除、查找、更新和复制。链表是天然的可以高效并发的数据结构,我们可以让每个节点持有一个互斥量,这样我们就可以对持有的节点群上锁,并在移动到下一个节点时对锁进行释放。
接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| template<typename T> class threadsafe_list{ private: struct node{ std::shared_ptr<T> data; std::unique_ptr<T> next; std::mutex m;
node(): next() {} node(T const& value): data(std::make_shared<T>(value)){}; }; node head;
public: threadsafe_list(){} ~threadsafe_list(){ std::remove_if([](T const&){return true;}); }
threadsafe_list(threadsafe_list const& other)=delete; threadsafe_list& operator=(threadsafe_list const& other)=delete;
void push_front(T const& value); template<typename Function> void for_each(Function f); template<typename Predicate> std::shared_ptr<T> find_first_if(Predicate p); template<typename Predicate> void remove_if(Predicate p) };
|
这里是一个单链表的实现。
为什么不使用标准库中的实现(for_each、remove_if和find_first_if)
因为stl类的迭代器是需要持有容器内部引用的,这可能会导致在无锁状态下对数据结构产生修改,所以使用自定义的函数。
链表中的加锁和解锁是一个手递手的过程就像下面这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| template<typename Predicate> std::shared_ptr<T> find_first_if(Predicate p){ node* current = &head; std::unique_lock<std::mutex> lk(head.m); while(node* const next=current->next.get()){ std::unique_lock<std::mutex> next_lk(next->m); lk.unlock(); if(p(*next->data)){ return next->data; } current=next; lk=std::move(next_lk); } return std::shared_ptr<T>(); }
|
我们持有下一个节点的锁,释放这个节点的锁,以此类推。
总结
写的太过简陋,大家还是去看看代码吧,可以更深入的了解。当然也可以通过内存序来实现无锁结构的线程安全的数据结构,可以参考其它资料。