互斥量保护共享数据
锁保护数据,尽量缩短持锁时间,避免锁内调用耗时操作或外部代码。合理控制锁粒度与顺序,配合 std::unique_lock 等工具,防止死锁并提升并发效率。
互斥量保护共享数据
在并发环境下保护共享数据的核心方法之一,就是使用互斥量(mutex)。本文围绕 C++ 标准库中的 std::mutex
、std::lock_guard
、std::scoped_lock
、std::unique_lock
等工具展开,涵盖互斥的基本操作、接口陷阱、死锁问题与避免策略、锁的粒度优化等方面内容,并配有完整示例代码。
互斥量与 std::lock_guard
std::lock_guard
最基本的做法是用 std::mutex
保护共享数据,利用 std::lock_guard
实现 RAII 风格的自动上锁和解锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <list> // 提供 std::list 容器
#include <mutex> // 提供 std::mutex 和 std::lock_guard 用于线程同步
#include <algorithm> // 提供 std::find 算法
// 一个线程共享的全局列表,用于存储整数
std::list<int> some_list;
// 一个全局互斥量,用于保护对 some_list 的并发访问
std::mutex some_mutex;
// 向列表中添加一个元素,使用互斥量进行线程安全保护
void add_to_list(int new_value) {
std::lock_guard<std::mutex> guard(some_mutex); // 加锁,在作用域结束时自动解锁
some_list.push_back(new_value); // 安全地向列表尾部添加一个新值
}
// 检查某个值是否存在于列表中,线程安全
bool list_contains(int value_to_find) {
std::lock_guard<std::mutex> guard(some_mutex); // 加锁,保护整个查找过程
// 使用 std::find 在线性列表中查找目标值
return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}
等价的传统写法:
1
2
3
4
std::mutex m;
m.lock();
// ...访问共享资源
m.unlock();
但如果中间抛出异常,就会忘记 unlock,可能造成死锁。
使用 lock_guard
更安全:
1
2
std::lock_guard<std::mutex> guard(m);
// ...访问共享资源,退出作用域自动 unlock
C++17 起,可以使用模板参数推导:
1
std::lock_guard guard(some_mutex); // 自动推导出是 std::mutex
std::scoped_lock
或更进一步使用 std::scoped_lock
:
1
std::scoped_lock guard(some_mutex);
这是 C++17 引入的一种比 std::lock_guard
更强大的 RAII 锁:
- 和
std::lock_guard
类似,会在构造时自动加锁,在析构时自动解锁; - 但它支持同时锁多个互斥量,并且内部使用
std::lock()
,可以避免死锁。
用法示例:
1
2
std::mutex m1, m2;
std::scoped_lock lock(m1, m2); // 同时加锁多个互斥量,顺序无死锁
和下面这段写法等价,但更安全、简洁:
1
2
3
4
5
6
std::mutex m1, m2;
std::lock(m1, m2); // 同时锁多个互斥量,顺序自动避免死锁
std::lock_guard<std::mutex> lock1(m1, std::adopt_lock); // 告诉 lock_guard:我已经锁好了
std::lock_guard<std::mutex> lock2(m2, std::adopt_lock);
小心保护数据的“假象”
即使使用了互斥锁,也可能在间接暴露数据引用时破坏线程安全:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <string>
#include <mutex>
// 一个简单的数据类
class some_data {
int a;
std::string b;
public:
void do_something(); // 假设这个函数会修改 a 和 b
};
// 包装器类,封装了 some_data 与一个互斥量
class data_wrapper {
private:
some_data data; // 被保护的数据
std::mutex m; // 互斥量,用于保护 data 的访问
public:
// 模板函数:接受一个函数 func,并将 data 传给它执行
// 错误点:虽然加了锁,但把 data 的引用暴露出去了
template<typename Function>
void process_data(Function func) {
std::lock_guard<std::mutex> l(m); // 上锁,保护 data
func(data); // ⚠️ 把 data 的引用传出去,可能被保存
}
};
// 一个全局变量,用来“偷走”对 data 的引用(模拟恶意行为)
some_data* unprotected;
// 恶意函数,拿到 protected_data 的引用,并保存起来
void malicious_function(some_data& protected_data) {
unprotected = &protected_data; // ⚠️ 保存引用,等锁释放后仍能访问
}
// 调用者调用 process_data 时传入恶意函数
void foo() {
data_wrapper x;
// 在 process_data 内部锁住了 data,并调用 malicious_function
// malicious_function 偷偷保存了 data 的地址
x.process_data(malicious_function);
// ⚠️ 现在锁已经释放,但我们还持有 data 的裸指针
// 无锁访问受保护的数据 -> 条件竞争/未定义行为
unprotected->do_something(); // ❌ 非线程安全操作
}
解决方法:绝不返回指针/引用,不将保护数据传出锁作用域外。
改法一:不允许传出引用,仅提供封装接口
如果只需要在类内部访问数据,而不需要暴露数据结构,可以将数据访问逻辑也封装在类内部:
1
2
3
4
5
6
7
8
9
10
11
class data_wrapper {
private:
some_data data; // 被保护的数据,只能通过类访问
std::mutex m; // 用于保护 data 的互斥量
public:
void safe_do_something() {
std::lock_guard<std::mutex> lock(m); // 加锁
data.do_something(); // 在加锁范围内安全访问/修改 data
}
};
data
是私有成员,外部代码无法直接访问它;- 所有访问
data
的操作都必须通过类提供的接口;
调用者只能这样使用:
1
2
data_wrapper x;
x.safe_do_something(); // 安全,不能越权访问 data
改法二:传值(拷贝)而不是传引用
如果确实需要执行用户函数,可以传入一个 数据的拷贝,避免将原始引用泄露:
1
2
3
4
5
6
template<typename Function>
void process_data(Function func) {
std::lock_guard<std::mutex> lock(m);
some_data copy = data; // 复制 data
func(copy); // 给用户的只是拷贝
}
- 用户函数
func
操作的是data
的副本copy
; - 用户无法修改原始的
data
(即使func
修改了copy
,原始data
也不会变); - 线程安全,但不可写(只读场景适用)。
这样,即使 func
试图持久化这个副本,也不会影响原始的受保护数据。
改法三:传闭包,仅在锁保护范围内执行
如果要执行的操作很简单(如只修改 data
某些字段),可以让 process_data()
来控制执行过程,而不是传整个对象:
1
2
3
4
5
template<typename Function>
void process_data(Function func) {
std::lock_guard<std::mutex> lock(m);
func(data); // func 不能保存引用,只在当前作用域内调用
}
然后用户保证不保留引用:
1
2
3
x.process_data([](some_data& d) {
d.do_something(); // 不能返回引用、不能保留下来
});
注意:这种方式是有风险的,前提是你信任 func
不会偷偷保存引用。最保险的仍然是传值或封装接口。
这个方法的用处不在于提升线程安全,而是方便调用者在锁保护范围内灵活执行操作,避免每次调用都要自己写锁,提高代码简洁性和可维护性。
最强防护(推荐):完全不让用户接触内部数据
1
2
3
4
5
6
7
8
9
10
class data_wrapper {
private:
some_data data;
std::mutex m;
public:
void do_task() {
std::lock_guard<std::mutex> lock(m);
data.do_something(); // 只暴露受控操作
}
};
接口设计中的条件竞争
使用互斥量或其他机制保护共享数据后,并不意味着就完全避免了条件竞争。仍需确认共享数据和整个操作是否被真正保护。
条件竞争示例:线程安全链表删除操作
例如,线程安全地删除双链表中某个节点,需要保证对该节点及其前后邻节点的访问都受到保护。
- 如果仅仅保护指针的访问(例如对指针操作加锁),但没保护整个结构和操作,则条件竞争依旧存在,线程间可能产生竞态。
- 最简单的解决方案是用一个互斥量保护整个链表,保证删除操作的原子性。
条件竞争在栈接口中的体现
以实现一个类似 std::stack
的线程安全栈为例。
除了构造函数和 swap()
,它需要实现以下操作:
push()
:推入新元素pop()
:弹出栈顶元素top()
:查看栈顶元素empty()
:判断是否为空size()
:获取元素个数
典型条件竞争示例
尽管单个操作安全,但接口设计上的条件竞争依然存在:
1
2
3
4
5
6
stack<int> s;
if (!s.empty()) { // 1. 检查是否为空
int const value = s.top(); // 2. 获取栈顶元素
s.pop(); // 3. 弹出栈顶元素
do_something(value);
}
- 在单线程环境下安全,也符合
top()
对空栈未定义行为的预期。 - 但在多线程共享环境下,调用顺序不安全:
- 在
empty()
和top()
之间,其他线程可能调用pop()
弹出最后一个元素,导致top()
访问空栈,出现条件竞争。
- 在
其他竞态示例
两个线程可能同时调用 top()
两次,没有任何线程在调用 pop()
期间修改栈,导致同一元素被重复处理:
线程 A | 线程 B |
---|---|
if (!s.empty()) | |
if (!s.empty()) | |
int value = s.top(); | |
int value = s.top(); | |
s.pop(); | |
do_something(value); | s.pop(); |
do_something(value); |
这类条件竞争更难发现。
异常安全问题
原始的 T pop()
写法
1
2
3
4
5
6
7
8
T pop() {
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
T value = data.top(); // ① 可能抛异常(拷贝构造)
data.pop(); // ② 删除栈顶元素
return value; // ③ 返回时可能抛异常(移动/拷贝构造)
}
异常安全问题
- 若
T value = data.top();
抛异常,data.pop()
不执行,数据仍在栈中,函数半成功,调用者不确定是否真的弹出了元素,造成“部分执行失败”,难排查。 - 若
return value;
构造返回值抛异常,但data.pop()
已执行,数据丢失但调用者未成功获得值,极其危险。
这就是异常安全性破坏:异常导致数据丢失或状态不确定。
接口原子性问题
top()
和 pop()
是两个独立操作:
1
2
3
4
if (!s.empty()) {
auto val = s.top(); // 线程 A 看到非空
s.pop(); // 线程 B 已经 pop 了 → top() 返回空元素或未定义行为
}
调用者无法保证“取值+弹出”这两个动作是原子的,多线程环境存在条件竞争。
为什么 std::stack
设计成 top()
+ pop()
而不是单独 pop()
返回值?
- 强异常安全保证:
top()
抛异常时,数据仍在栈中未被弹出,允许调用者在异常恢复后重试。 - 缺点:多线程环境下存在条件竞争,因为“读取+删除”不是原子操作。
解决接口条件竞争的几种选项
传入引用参数获取弹出值
1
2
3
4
5
6
7
void pop(T& out) {
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
out = data.top(); // 先拷贝
data.pop(); // 拷贝成功后再弹出
}
- 优点:
- 原子完成“取值 + 弹出”操作;
- 异常安全,拷贝失败时栈不变。
- 缺点:
- 要求提前创建变量;
- 如果
T
不支持赋值或构造代价大,使用受限。
为什么
out = data.top(); data.pop();
更安全? 因为异常只可能发生在明确写出的赋值那一步,pop()
后不会执行,状态清晰。而T pop()
返回值形式的异常可能在无法控制的返回值构造阶段抛出,导致状态不明确。
使用无异常抛出的拷贝/移动构造
1
static_assert(std::is_nothrow_move_constructible<T>::value, "T must be nothrow move constructible");
- 编译时保证安全,允许返回值方式实现
pop()
。 - 缺点:限制过死,不适用于大多数用户自定义类型。
返回指向弹出值的智能指针
1
std::shared_ptr<T> pop();
- 优点:
- 线程安全;
- 异常安全(
make_shared
失败会抛异常,数据不丢失); - 不要求
T
支持赋值。
- 缺点:
- 堆分配开销;
- 对简单类型显得重。
组合使用
1
2
std::shared_ptr<T> pop(); // 推荐主接口
void pop(T& out); // 高性能版
- 用户可根据需求选择性能或安全性。
线程安全栈示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
// 自定义异常类型,表示栈为空时弹出失败
struct empty_stack : std::exception {
const char* what() const throw() override {
return "empty stack!";
}
};
template<typename T>
class threadsafe_stack {
private:
std::stack<T> data; // 底层存储容器,非线程安全的标准栈
mutable std::mutex m; // 保护 data 的互斥量,mutable 允许 const 方法加锁
public:
// 默认构造函数,初始化空栈
threadsafe_stack() : data(std::stack<T>()) {}
// 拷贝构造函数,线程安全地复制另一个栈的数据
threadsafe_stack(const threadsafe_stack& other) {
std::lock_guard<std::mutex> lock(other.m); // 锁住 other 的互斥量,防止数据被修改
data = other.data; // 安全地复制数据
}
// 禁止赋值操作,防止线程安全问题和资源冲突
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
// 向栈中压入一个新元素,传值参数,支持移动语义
void push(T new_value) {
std::lock_guard<std::mutex> lock(m); // 加锁保护栈数据
data.push(std::move(new_value)); // 利用移动构造入栈,避免不必要拷贝
}
// 弹出栈顶元素,返回智能指针指向该元素
std::shared_ptr<T> pop() {
std::lock_guard<std::mutex> lock(m); // 加锁保护
if (data.empty()) // 空栈抛异常
throw empty_stack();
// 先拷贝栈顶元素到堆中,构造 shared_ptr,保证异常安全
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop(); // 弹出栈顶元素
return res; // 返回拷贝的智能指针
}
// 弹出栈顶元素,通过引用参数传出
void pop(T& value) {
std::lock_guard<std::mutex> lock(m); // 加锁保护
if (data.empty()) // 空栈抛异常
throw empty_stack();
value = data.top(); // 先拷贝栈顶元素
data.pop(); // 成功拷贝后弹出栈顶元素
}
// 判断栈是否为空,线程安全
bool empty() const {
std::lock_guard<std::mutex> lock(m); // 加锁保护
return data.empty();
}
};
这段代码核心思想:
- 使用互斥量保证每个操作线程安全;
- 提供两种
pop
版本:一种返回智能指针,方便安全管理弹出元素;另一种通过引用参数传出,避免堆分配,提升性能; - 拷贝构造函数也保证线程安全地复制数据;
- 赋值操作被禁用,防止复杂的线程安全问题。
互斥量的粒度及死锁
- 大粒度锁(例如一个全局互斥量保护所有共享数据)简单,但阻塞并发性能,容易成为瓶颈。
- Linux 内核曾使用全局锁,导致多核性能低,后改进为细粒度锁。
- 细粒度锁带来更高并发,但需要小心避免死锁,尤其当一个操作需要同时持有多个锁时。
- 死锁:线程互相等待资源,导致无法继续执行,是与条件竞争相反的线程安全问题。
小结
- 互斥量只能保证单个操作的原子性和线程安全,但接口设计必须避免条件竞争。
- 设计线程安全接口时,减少多操作调用时的竞态窗口,保证操作整体的原子性。
- 细粒度锁提高并发性能,但增加死锁风险,需要权衡。
- 设计线程安全容器时,需综合考虑性能、安全性和接口的易用性。
死锁的解决方案:std::lock
+ adopt_lock
死锁问题描述
想象一个玩具由两部分组成,比如玩具鼓,需要鼓锤和鼓才能玩。有两个小孩同时想玩这个玩具。鼓和鼓锤分别放在两个不同的玩具箱里。两个小孩同时去拿玩具箱中的部分:
- 小孩 A 找到了鼓,
- 小孩 B 找到了鼓锤。
此时双方都手握玩具的一部分,等待对方释放另一部分才能开始玩。如果两个小孩都不让步,就会发生死锁——两个人都在等待对方,谁也没法玩鼓。
在多线程中也类似,两个或多个线程各自锁住了一个互斥量,并等待对方释放另一个锁,导致所有线程都阻塞,无法继续执行。
避免死锁的一般原则
避免死锁的一个常用建议是:
让所有互斥量都按照相同的顺序加锁
例如,总是在锁 B 之前锁 A,这样就不会发生循环等待,死锁的情况可以避免。
但实际情况复杂,当多个互斥量保护同一类的不同实例时,交换操作可能涉及两个不同实例的锁。比如两个线程分别交换两个对象的数据,如果它们锁的顺序不统一,就依然可能死锁。
C++ 标准库中的解决方案:std::lock
C++ 提供了std::lock
,它可以一次性锁住多个(两个或以上)互斥量,且没有死锁风险。
以下是一个示例,展示如何使用std::lock
和std::lock_guard
实现交换操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <mutex>
class some_big_object; // 假设已有实现
void swap(some_big_object& lhs, some_big_object& rhs); // 假设已有实现
class X {
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) : some_detail(sd) {}
friend void swap(X& lhs, X& rhs) {
if (&lhs == &rhs)
return;
std::lock(lhs.m, rhs.m); // 1. 一次性锁住两个互斥量
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2. 管理锁的生命周期,表示锁已被拿到
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail); // 执行实际数据交换
}
};
std::lock(lhs.m, rhs.m)
会一次性锁住两个互斥量,避免死锁。std::lock_guard
的构造参数传入std::adopt_lock
,表示这把锁已经被std::lock
拿到了,lock_guard
只负责管理生命周期,不再尝试上锁。- 如果
std::lock
在锁其中一个互斥量时失败抛异常,会自动释放已锁的互斥量,保证异常安全。
C++17 的改进:std::scoped_lock
C++17 引入了 std::scoped_lock
,它结合了 std::lock
和 std::lock_guard
的功能,可以直接接收多个互斥量,自动安全地上锁和解锁,代码更简洁。
上述的 swap
函数可以简化为:
1
2
3
4
5
6
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs)
return;
std::scoped_lock guard(lhs.m, rhs.m); // 一次性锁住所有互斥量
swap(lhs.some_detail, rhs.some_detail);
}
- 这里利用了 C++17 的模板参数自动推导机制,不用显式写出模板参数类型。
std::scoped_lock
析构时自动解锁,保证异常安全。- 它能接受任意数量的互斥量,替代
std::lock
+ 多个lock_guard
的写法。
等价写法(显式模板参数):
1
std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);
避免死锁的进阶指导
死锁多因对锁的不当使用造成,常见于多个线程相互等待资源时陷入阻塞,导致系统无法继续运行。
死锁示例:两个线程互等
即便没有显式锁,如下代码中两个线程互相 join()
也会导致死锁:
1
2
3
4
5
6
std::thread t1([&] {
t2.join(); // 等待 t2
});
std::thread t2([&] {
t1.join(); // 等待 t1
});
死锁的本质是循环等待。为避免死锁,建议:不要谦让,主动规避风险。
避免死锁的常见策略
避免嵌套锁
建议线程一次只持有一个锁。若必须获取多个锁,使用:
1
2
3
std::lock(m1, m2); // 一次性加锁多个互斥量
std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);
或使用 C++17 的 std::scoped_lock
:
1
std::scoped_lock lock(m1, m2); // 自动处理所有互斥量
避免在持锁时调用外部代码
外部代码可能也尝试加锁,容易破坏锁顺序引发死锁。例如:
1
2
3
4
5
template <typename Func>
void process_data(Func func) {
std::lock_guard<std::mutex> lock(m);
func(data); // 调用外部代码,风险!
}
固定顺序加锁
必须加多个锁时,确保所有线程按照统一顺序加锁。例如对链表:
- 删除节点需锁定:当前节点 + 两个邻接节点。
- 遍历链表时应以“手递手”方式加锁,并避免逆序访问。
错误写法(会导致死锁):
1
2
std::lock_guard<std::mutex> lock1(nodeA->m); // A -> B
std::lock_guard<std::mutex> lock2(nodeB->m);
1
2
std::lock_guard<std::mutex> lock1(nodeB->m); // B -> A
std::lock_guard<std::mutex> lock2(nodeA->m);
多个线程以不同顺序加锁,一旦交叉,会互相等待,最终死锁。
死锁示意图(链表节点 A-B-C):
线程1 | 线程2 |
---|---|
锁住节点 A 的互斥量 | 锁住节点 C 的互斥量 |
试图锁 B(失败) | 试图锁 B(失败) |
等待 B | 等待 A |
死锁! |
解决方案:规定遍历方向与锁顺序一致,避免交叉。
正确写法:按地址顺序加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void lock_two_nodes_safe(Node* a, Node* b) {
if (a == b) {
a->m.lock(); // 自锁即可
return;
}
if (a < b) {
a->m.lock();
b->m.lock();
} else {
b->m.lock();
a->m.lock();
}
}
void unlock_two_nodes(Node* a, Node* b) {
a->m.unlock();
if (b != a)
b->m.unlock();
}
或者用 std::unique_lock
自动解锁:
1
2
3
4
5
6
7
8
std::unique_lock<std::mutex> lock1, lock2;
if (nodeA < nodeB) {
lock1 = std::unique_lock<std::mutex>(nodeA->m);
lock2 = std::unique_lock<std::mutex>(nodeB->m);
} else {
lock1 = std::unique_lock<std::mutex>(nodeB->m);
lock2 = std::unique_lock<std::mutex>(nodeA->m);
}
关键点:以节点地址排序为依据统一加锁顺序,确保所有线程按相同规则执行,从根本上避免死锁。
使用层次锁结构(hierarchical_mutex)
为每个互斥量设定一个层级值,要求:高层锁先加,低层锁后加。若违反此约定,则抛出异常,防止死锁发生。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 定义三个互斥量,每个带有一个“层级值”。
// 层级值越高,表示越高层,必须先加高层锁,再加低层锁。
hierarchical_mutex high_level_mutex(10000); // 高层互斥量
hierarchical_mutex low_level_mutex(5000); // 低层互斥量
hierarchical_mutex other_mutex(6000); // 中间层互斥量
int do_low_level_stuff();
// 封装低层函数,持有 low_level_mutex(层级 5000)
int low_level_func() {
std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // ✅ 当前线程层级从 ULONG_MAX -> 5000
return do_low_level_stuff(); // 假设这里不再持锁
}
void high_level_stuff(int some_param);
// 高层函数,先获取 high_level_mutex(层级 10000),然后调用低层函数
void high_level_func() {
std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // ✅ 当前线程层级从 ULONG_MAX -> 10000
high_level_stuff(low_level_func()); // ✅ low_level_mutex 层级 5000,小于 10000,合法
}
// 线程 A,调用高层逻辑
void thread_a() {
high_level_func(); // ✅ 没有违反层级规则,运行正常
}
void do_other_stuff();
// 中间函数,先调用 high_level_func,再调用其他函数
void other_stuff() {
high_level_func(); // ⚠️ 问题在这里:调用者必须当前线程未持有其他低层互斥量!
do_other_stuff();
}
// 线程 B,先锁定中层互斥量,再调用 high_level_func
void thread_b() {
std::lock_guard<hierarchical_mutex> lk(other_mutex); // 🔒 当前线程层级从 ULONG_MAX -> 6000
other_stuff(); // ⚠️ 违反层级规则:持有 6000 后,再尝试加锁 10000(更高层)互斥量,抛出异常
}
thread_a
正确:先加高层再加低层。thread_b
错误:先加中层再加高层,违反锁顺序,运行时报错!
层次互斥量实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class hierarchical_mutex {
std::mutex internal_mutex; // 实际用于加锁的 std::mutex
unsigned long const hierarchy_value; // 当前互斥量的层级值,越大表示越“上层”
unsigned long previous_hierarchy_value; // 当前线程加锁之前的层级值(用于恢复)
// 每个线程独有的当前锁层级,初始化为 ULONG_MAX(最高)
static thread_local unsigned long this_thread_hierarchy_value;
// 检查是否违反了层级规则
void check_for_hierarchy_violation() {
// 如果当前线程持有的层级值 <= 将要加的锁的层级,说明违反了“高层先锁、低层后锁”的规则
if (this_thread_hierarchy_value <= hierarchy_value)
throw std::logic_error("mutex hierarchy violated");
}
// 更新当前线程的层级状态
void update_hierarchy_value() {
// 记录当前线程之前的层级值
previous_hierarchy_value = this_thread_hierarchy_value;
// 更新为当前加锁的互斥量的层级值
this_thread_hierarchy_value = hierarchy_value;
}
public:
// 构造函数:设置层级值
explicit hierarchical_mutex(unsigned long value)
: hierarchy_value(value), previous_hierarchy_value(0) {}
// 加锁函数
void lock() {
check_for_hierarchy_violation(); // 检查层级是否符合规范
internal_mutex.lock(); // 加内部锁
update_hierarchy_value(); // 更新线程的层级值
}
// 解锁函数
void unlock() {
// 检查当前线程的层级值是否匹配当前互斥量的层级值
// 如果不匹配说明 unlock 顺序错误(不是最后加的那个锁)
if (this_thread_hierarchy_value != hierarchy_value)
throw std::logic_error("mutex hierarchy violated");
// 恢复线程先前的层级值
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock(); // 解锁内部互斥量
}
// 尝试加锁函数(不会阻塞)
bool try_lock() {
check_for_hierarchy_violation(); // 层级检查
if (!internal_mutex.try_lock()) // 尝试加锁失败,返回 false
return false;
update_hierarchy_value(); // 成功后更新线程层级状态
return true;
}
};
// 定义线程局部变量:每个线程都维护一份自己的当前层级值,初始值为最大
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
- 每个线程维护一个
this_thread_hierarchy_value
。 - 上锁前做层级检查,确保符合顺序。
- 解锁后恢复先前层级,避免嵌套混乱。
- 使用标准库
std::lock_guard<hierarchical_mutex>
即可无缝集成。
超越锁的扩展建议
死锁也可能出现在如下情况:
join()
等待某个线程结束(而该线程还在等待锁)- 条件变量造成的环等待
建议:
- 线程间有层级顺序,如:高层线程只能等待低层线程。
- 在同一函数内创建并等待线程,有助于规避复杂依赖。
避免死锁的六大建议
建议 | 说明 |
---|---|
避免嵌套锁 | 一个线程一次只持有一个锁 |
使用 std::lock() | 一次性加锁多个互斥量,避免交叉等待 |
避免持锁调用外部函数 | 外部行为不可控,可能造成死锁 |
统一加锁顺序 | 所有线程按照固定顺序获取锁 |
使用层次锁结构 | 强制锁顺序,运行时检测违例 |
避免线程间循环等待 | 线程间应有清晰的层级关系 |
std::unique_lock
:更灵活的 RAII 锁
std::unique_lock
是一个比 std::lock_guard
更加灵活的互斥量封装器。它提供了更丰富的操作,例如延迟加锁、提前解锁、尝试加锁等功能,适用于需要精细控制互斥行为的场景。
特性概览
- 灵活性更高:可以选择在构造时不立即加锁,或者将已有的锁状态接管;
- 支持锁操作:支持
.lock()
、.try_lock()
和.unlock()
成员函数; - 可转移所有权:支持移动构造/赋值,允许将锁的所有权从一个作用域传递到另一个;
- 占用空间较大,性能略低:相比
std::lock_guard
,体积更大,操作稍慢,但换来的是更大的灵活性。
使用方式
构造 std::unique_lock
时,可以选择不同的模式:
构造方式 | 行为说明 |
---|---|
std::unique_lock<std::mutex> lock(m); | 构造时立即加锁(默认行为) |
std::unique_lock<std::mutex> lock(m, std::defer_lock); | 构造时不加锁,需手动调用 .lock() |
std::unique_lock<std::mutex> lock(m, std::adopt_lock); | 构造时接管已加锁的互斥量,不再加锁 |
示例:使用 std::unique_lock
和 std::defer_lock
实现安全交换
相比前文代码中使用 std::lock_guard
+ std::adopt_lock
的写法,以下是其等价写法 —— 使用 std::unique_lock
和 std::defer_lock
实现更清晰灵活的加锁过程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X {
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) : some_detail(sd) {}
friend void swap(X& lhs, X& rhs) {
if (&lhs == &rhs)
return;
// 1. 创建延迟加锁的 unique_lock
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
// 2. 同时锁住两个互斥量,避免死锁
std::lock(lock_a, lock_b);
// 3. 执行实际的交换操作
swap(lhs.some_detail, rhs.some_detail);
}
};
std::defer_lock
:创建unique_lock
对象但不立即加锁;std::lock(lock_a, lock_b)
:使用 C++ 提供的原子锁定多个互斥量函数,防止死锁;- 析构时自动释放锁:只要
unique_lock
对象生命周期结束,所管理的锁也将自动释放; - 灵活性更强:可随时调用
.unlock()
或.lock()
,甚至将锁对象传递出去。
所有权与状态管理
std::unique_lock
内部维护一个是否持有锁的标志,用于确定是否需要在析构时调用unlock()
;- 可以通过
.owns_lock()
成员函数检查当前是否持有锁; - 更适合需要“锁的生命周期不等于作用域生命周期”的复杂控制场景。
小结
使用场景 | 建议锁类型 |
---|---|
简单作用域自动加锁/释放 | std::lock_guard (性能好,体积小) |
需要灵活控制锁(如延迟加锁、提前解锁、锁迁移) | std::unique_lock |
多个互斥量同时加锁且自动处理死锁问题 | std::scoped_lock (C++17 新增) |
std::unique_lock
是在追求灵活性时的首选,但在性能敏感、结构简单的代码中,std::lock_guard
是更高效的选择。
在作用域之间转移锁
std::unique_lock
支持移动语义,但不支持复制赋值。这意味着它的互斥量所有权可以在不同的实例间转移,但不能被复制。
移动所有权的机制
- 当一个
std::unique_lock
对象被返回或移动构造时,锁的所有权会从源对象转移到目标对象; - 如果希望显式地转移所有权,需要使用
std::move()
; - 编译器在函数返回时自动调用移动构造函数,无需手动写
std::move()
,但如果是传参或赋值则通常需要显式std::move()
; - 移动后,源对象不再拥有互斥量,也不会在析构时解锁。
典型应用示例
函数锁住互斥量后返回 std::unique_lock
,调用者接管锁的所有权,进而在安全的锁保护范围内执行操作。
1
2
3
4
5
6
7
8
9
10
11
std::unique_lock<std::mutex> get_lock() {
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex); // 上锁
prepare_data(); // 处理数据
return lk; // ① 返回时自动调用移动构造,转移锁的所有权
}
void process_data() {
std::unique_lock<std::mutex> lk(get_lock()); // ② 接管锁的所有权
do_something(); // 在锁保护下操作数据
}
- 在
get_lock()
中,lk
是局部变量,返回时自动移动; process_data()
中通过初始化lk
,接管锁的所有权,保证后续操作的数据安全。
应用场景
这种模式常见于“网关类”设计:
- 网关类负责管理对受保护数据的访问,保证数据同步安全;
- 调用者必须先从网关类获取锁(例如调用类似
get_lock()
的函数); - 在持有锁的范围内访问数据,结束时销毁锁对象释放锁;
- 网关类可设计为可移动,从而支持函数返回锁的所有权。
std::unique_lock
的灵活锁定与释放
除了所有权转移,std::unique_lock
还允许在生命周期内选择性释放锁:
- 可以调用成员函数
.unlock()
显式释放锁,但对象仍然存在; - 当不需要继续持有锁时,提前释放锁,缩短锁的持有时间,减少线程等待,提高性能;
- 析构函数只会在拥有锁时才调用
unlock()
,防止重复释放。
例如:
1
2
3
4
5
6
std::unique_lock<std::mutex> lk(some_mutex);
// 临界区操作...
if (some_condition) {
lk.unlock(); // 提前释放锁,允许其他线程进入临界区
}
// 后续代码不受锁保护
小结
std::unique_lock
支持锁的所有权在不同域(作用域)间移动,方便复杂场景下的锁管理;- 通过移动构造或返回,可以安全地传递锁所有权,无需复制;
- 可以灵活控制加锁和解锁,缩短持锁时间,提高并发性能;
- 在性能和灵活性需求之间提供了良好平衡。
锁的粒度控制
锁的粒度是一个“华而不实的术语”(hand-waving term),用来描述一个锁保护的数据量大小:
- 细粒度锁(fine-grained lock):保护较小的数据量;
- 粗粒度锁(coarse-grained lock):保护较大的数据量。
锁的粒度对性能影响很大,合理选择粒度对保证数据安全和程序效率都至关重要。
锁粒度的现实类比
想象超市结账场景:
- 顾客正在结账时突然发现忘拿蔓越莓酱,离开收银台去拿,让后面的人等待;
- 或者顾客结账时才去翻钱包拿钱,耽误时间。
这些情况都导致等待的人时间变长。
对应到多线程程序中:
- 如果一个线程持有锁的时间过长,其他线程只能等待,降低并发效率;
- 线程应尽量缩短持锁时间,避免在持锁期间做耗时操作,比如文件 I/O;
- 文件 I/O 通常比内存读写慢百倍甚至千倍,持锁期间做 I/O 会阻塞大量线程。
释放锁以缩短持锁时间的示例
std::unique_lock
支持显式解锁和再加锁,方便缩短锁的持有时间。
1
2
3
4
5
6
7
8
void get_and_process_data() {
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process = get_next_data_chunk();
my_lock.unlock(); // ① 在调用 process() 之前释放锁,避免锁跨越长时间处理
result_type result = process(data_to_process);
my_lock.lock(); // ② 处理完后重新加锁,写入结果
write_result(data_to_process, result);
}
- 这里避免了锁跨越
process()
调用,缩短了锁的持有时间,提高了并发效率。
锁粒度影响的后果
- 使用单个锁保护整个数据结构会增加锁竞争和持锁时间;
- 持有锁时间长,阻塞其他线程;
- 因此,细粒度锁有助于减少锁竞争,提高性能;
- 合理设计锁的持有时长和保护范围是优化多线程程序的关键。
例子:比较操作中一次只锁一个互斥量
假设要比较两个对象的数据成员,数据类型是简单的 int
,拷贝廉价,且每个对象有自己的互斥量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Y {
private:
int some_detail;
mutable std::mutex m;
int get_detail() const {
std::lock_guard<std::mutex> lock_a(m); // ① 加锁保护读取
return some_detail;
}
public:
Y(int sd) : some_detail(sd) {}
friend bool operator==(Y const& lhs, Y const& rhs) {
if (&lhs == &rhs)
return true;
int const lhs_value = lhs.get_detail(); // ② 加锁读取 lhs.some_detail
int const rhs_value = rhs.get_detail(); // ③ 加锁读取 rhs.some_detail
return lhs_value == rhs_value; // ④ 比较
}
};
- 每次读取时单独加锁,减少持锁时间,避免死锁风险;
- 但这里有个潜在语义问题:读取 lhs 和 rhs 之间,数据可能被其他线程修改;
- 所以比较操作反映的是两个瞬间的状态,可能导致条件竞争和不一致的比较结果。
细粒度锁的局限与挑战
- 有时难以找到合适的锁粒度,尤其当数据结构访问需求复杂时;
- 并非所有数据访问都适合同级别的锁保护;
- 可能需要使用更高级的同步机制替代简单的互斥量,比如读写锁、原子操作等。
总结
std::mutex
是 C++ 提供的基础互斥工具,需配合lock_guard
或unique_lock
使用;- 避免传出引用或指针;
- 接口设计比实现更容易出错,关注原子性;
- 死锁可通过 lock()/scoped_lock 层级机制等方式避免;
- 粒度控制影响性能,持锁时间越短越好。