文章

互斥量保护共享数据

锁保护数据,尽量缩短持锁时间,避免锁内调用耗时操作或外部代码。合理控制锁粒度与顺序,配合 std::unique_lock 等工具,防止死锁并提升并发效率。

互斥量保护共享数据

互斥量保护共享数据

在并发环境下保护共享数据的核心方法之一,就是使用互斥量(mutex)。本文围绕 C++ 标准库中的 std::mutexstd::lock_guardstd::scoped_lockstd::unique_lock 等工具展开,涵盖互斥的基本操作、接口陷阱、死锁问题与避免策略、锁的粒度优化等方面内容,并配有完整示例代码。

互斥量与 std::lock_guard

std::lock_guard

最基本的做法是用 std::mutex 保护共享数据,利用 std::lock_guard 实现 RAII 风格的自动上锁和解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <list>        // 提供 std::list 容器
#include <mutex>       // 提供 std::mutex 和 std::lock_guard 用于线程同步
#include <algorithm>   // 提供 std::find 算法

// 一个线程共享的全局列表,用于存储整数
std::list<int> some_list;

// 一个全局互斥量,用于保护对 some_list 的并发访问
std::mutex some_mutex;

// 向列表中添加一个元素,使用互斥量进行线程安全保护
void add_to_list(int new_value) {
  std::lock_guard<std::mutex> guard(some_mutex); // 加锁,在作用域结束时自动解锁
  some_list.push_back(new_value);                // 安全地向列表尾部添加一个新值
}

// 检查某个值是否存在于列表中,线程安全
bool list_contains(int value_to_find) {
  std::lock_guard<std::mutex> guard(some_mutex); // 加锁,保护整个查找过程
  // 使用 std::find 在线性列表中查找目标值
  return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}

等价的传统写法:

1
2
3
4
std::mutex m;
m.lock();
// ...访问共享资源
m.unlock();

但如果中间抛出异常,就会忘记 unlock,可能造成死锁。

使用 lock_guard 更安全:

1
2
std::lock_guard<std::mutex> guard(m);
// ...访问共享资源,退出作用域自动 unlock

C++17 起,可以使用模板参数推导:

1
std::lock_guard guard(some_mutex); // 自动推导出是 std::mutex

std::scoped_lock

或更进一步使用 std::scoped_lock

1
std::scoped_lock guard(some_mutex);

这是 C++17 引入的一种std::lock_guard 更强大的 RAII 锁

  • std::lock_guard 类似,会在构造时自动加锁,在析构时自动解锁;
  • 但它支持同时锁多个互斥量,并且内部使用 std::lock(),可以避免死锁。

用法示例:

1
2
std::mutex m1, m2;
std::scoped_lock lock(m1, m2); // 同时加锁多个互斥量,顺序无死锁

和下面这段写法等价,但更安全、简洁:

1
2
3
4
5
6
std::mutex m1, m2;

std::lock(m1, m2); // 同时锁多个互斥量,顺序自动避免死锁

std::lock_guard<std::mutex> lock1(m1, std::adopt_lock); // 告诉 lock_guard:我已经锁好了
std::lock_guard<std::mutex> lock2(m2, std::adopt_lock);

小心保护数据的“假象”

即使使用了互斥锁,也可能在间接暴露数据引用时破坏线程安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <string>
#include <mutex>

// 一个简单的数据类
class some_data {
  int a;
  std::string b;
public:
  void do_something();  // 假设这个函数会修改 a 和 b
};

// 包装器类,封装了 some_data 与一个互斥量
class data_wrapper {
private:
  some_data data;       // 被保护的数据
  std::mutex m;         // 互斥量,用于保护 data 的访问
public:
  // 模板函数:接受一个函数 func,并将 data 传给它执行
  // 错误点:虽然加了锁,但把 data 的引用暴露出去了
  template<typename Function>
  void process_data(Function func) {
    std::lock_guard<std::mutex> l(m);  // 上锁,保护 data
    func(data);                        // ⚠️ 把 data 的引用传出去,可能被保存
  }
};

// 一个全局变量,用来“偷走”对 data 的引用(模拟恶意行为)
some_data* unprotected;

// 恶意函数,拿到 protected_data 的引用,并保存起来
void malicious_function(some_data& protected_data) {
  unprotected = &protected_data;  // ⚠️ 保存引用,等锁释放后仍能访问
}

// 调用者调用 process_data 时传入恶意函数
void foo() {
  data_wrapper x;

  // 在 process_data 内部锁住了 data,并调用 malicious_function
  // malicious_function 偷偷保存了 data 的地址
  x.process_data(malicious_function);

  // ⚠️ 现在锁已经释放,但我们还持有 data 的裸指针
  // 无锁访问受保护的数据 -> 条件竞争/未定义行为
  unprotected->do_something();  // ❌ 非线程安全操作
}

解决方法:绝不返回指针/引用不将保护数据传出锁作用域外

改法一:不允许传出引用,仅提供封装接口

如果只需要在类内部访问数据,而不需要暴露数据结构,可以将数据访问逻辑也封装在类内部:

1
2
3
4
5
6
7
8
9
10
11
class data_wrapper {
private:
    some_data data;          // 被保护的数据,只能通过类访问
    std::mutex m;            // 用于保护 data 的互斥量

public:
    void safe_do_something() {
        std::lock_guard<std::mutex> lock(m);  // 加锁
        data.do_something();                  // 在加锁范围内安全访问/修改 data
    }
};
  • data 是私有成员,外部代码无法直接访问它;
  • 所有访问 data 的操作都必须通过类提供的接口;

调用者只能这样使用:

1
2
data_wrapper x;
x.safe_do_something(); // 安全,不能越权访问 data

改法二:传值(拷贝)而不是传引用

如果确实需要执行用户函数,可以传入一个 数据的拷贝,避免将原始引用泄露:

1
2
3
4
5
6
template<typename Function>
void process_data(Function func) {
    std::lock_guard<std::mutex> lock(m);
    some_data copy = data; // 复制 data
    func(copy);            // 给用户的只是拷贝
}
  • 用户函数 func 操作的是 data 的副本 copy
  • 用户无法修改原始的 data(即使 func 修改了 copy,原始 data 也不会变);
  • 线程安全,但不可写(只读场景适用)。

这样,即使 func 试图持久化这个副本,也不会影响原始的受保护数据。

改法三:传闭包,仅在锁保护范围内执行

如果要执行的操作很简单(如只修改 data 某些字段),可以让 process_data() 来控制执行过程,而不是传整个对象:

1
2
3
4
5
template<typename Function>
void process_data(Function func) {
    std::lock_guard<std::mutex> lock(m);
    func(data); // func 不能保存引用,只在当前作用域内调用
}

然后用户保证不保留引用:

1
2
3
x.process_data([](some_data& d) {
    d.do_something(); // 不能返回引用、不能保留下来
});

注意:这种方式是有风险的,前提是你信任 func 不会偷偷保存引用。最保险的仍然是传值或封装接口

这个方法的用处不在于提升线程安全,而是方便调用者在锁保护范围内灵活执行操作,避免每次调用都要自己写锁,提高代码简洁性和可维护性。

最强防护(推荐):完全不让用户接触内部数据

1
2
3
4
5
6
7
8
9
10
class data_wrapper {
private:
    some_data data;
    std::mutex m;
public:
    void do_task() {
        std::lock_guard<std::mutex> lock(m);
        data.do_something(); // 只暴露受控操作
    }
};

接口设计中的条件竞争

使用互斥量或其他机制保护共享数据后,并不意味着就完全避免了条件竞争。仍需确认共享数据和整个操作是否被真正保护。

条件竞争示例:线程安全链表删除操作

例如,线程安全地删除双链表中某个节点,需要保证对该节点及其前后邻节点的访问都受到保护。

  • 如果仅仅保护指针的访问(例如对指针操作加锁),但没保护整个结构和操作,则条件竞争依旧存在,线程间可能产生竞态。
  • 最简单的解决方案是用一个互斥量保护整个链表,保证删除操作的原子性。

条件竞争在栈接口中的体现

以实现一个类似 std::stack 的线程安全栈为例。

除了构造函数和 swap(),它需要实现以下操作:

  • push():推入新元素
  • pop():弹出栈顶元素
  • top():查看栈顶元素
  • empty():判断是否为空
  • size():获取元素个数
典型条件竞争示例

尽管单个操作安全,但接口设计上的条件竞争依然存在:

1
2
3
4
5
6
stack<int> s;
if (!s.empty()) {      // 1. 检查是否为空
  int const value = s.top();   // 2. 获取栈顶元素
  s.pop();              // 3. 弹出栈顶元素
  do_something(value);
}
  • 在单线程环境下安全,也符合 top() 对空栈未定义行为的预期。
  • 但在多线程共享环境下,调用顺序不安全
    • empty()top() 之间,其他线程可能调用 pop() 弹出最后一个元素,导致 top() 访问空栈,出现条件竞争。
其他竞态示例

两个线程可能同时调用 top() 两次,没有任何线程在调用 pop() 期间修改栈,导致同一元素被重复处理:

线程 A线程 B
if (!s.empty()) 
 if (!s.empty())
int value = s.top(); 
 int value = s.top();
s.pop(); 
do_something(value);s.pop();
 do_something(value);

这类条件竞争更难发现。

异常安全问题

原始的 T pop() 写法
1
2
3
4
5
6
7
8
T pop() {
  std::lock_guard<std::mutex> lock(m);
  if (data.empty()) throw empty_stack();

  T value = data.top();  // ① 可能抛异常(拷贝构造)
  data.pop();            // ② 删除栈顶元素
  return value;          // ③ 返回时可能抛异常(移动/拷贝构造)
}
异常安全问题
  • T value = data.top(); 抛异常,data.pop() 不执行,数据仍在栈中,函数半成功,调用者不确定是否真的弹出了元素,造成“部分执行失败”,难排查。
  • return value; 构造返回值抛异常,但 data.pop() 已执行,数据丢失但调用者未成功获得值,极其危险。

这就是异常安全性破坏:异常导致数据丢失或状态不确定。

接口原子性问题

top()pop() 是两个独立操作:

1
2
3
4
if (!s.empty()) {
  auto val = s.top(); // 线程 A 看到非空
  s.pop();            // 线程 B 已经 pop 了 → top() 返回空元素或未定义行为
}

调用者无法保证“取值+弹出”这两个动作是原子的,多线程环境存在条件竞争。

为什么 std::stack 设计成 top() + pop() 而不是单独 pop() 返回值?

  • 强异常安全保证top() 抛异常时,数据仍在栈中未被弹出,允许调用者在异常恢复后重试。
  • 缺点:多线程环境下存在条件竞争,因为“读取+删除”不是原子操作。

解决接口条件竞争的几种选项

传入引用参数获取弹出值
1
2
3
4
5
6
7
void pop(T& out) {
  std::lock_guard<std::mutex> lock(m);
  if (data.empty()) throw empty_stack();

  out = data.top();  // 先拷贝
  data.pop();        // 拷贝成功后再弹出
}
  • 优点
    • 原子完成“取值 + 弹出”操作;
    • 异常安全,拷贝失败时栈不变。
  • 缺点
    • 要求提前创建变量;
    • 如果 T 不支持赋值或构造代价大,使用受限。

为什么 out = data.top(); data.pop(); 更安全? 因为异常只可能发生在明确写出的赋值那一步,pop() 后不会执行,状态清晰。而 T pop() 返回值形式的异常可能在无法控制的返回值构造阶段抛出,导致状态不明确。

使用无异常抛出的拷贝/移动构造
1
static_assert(std::is_nothrow_move_constructible<T>::value, "T must be nothrow move constructible");
  • 编译时保证安全,允许返回值方式实现 pop()
  • 缺点:限制过死,不适用于大多数用户自定义类型。
返回指向弹出值的智能指针
1
std::shared_ptr<T> pop();
  • 优点
    • 线程安全;
    • 异常安全(make_shared 失败会抛异常,数据不丢失);
    • 不要求 T 支持赋值。
  • 缺点
    • 堆分配开销;
    • 对简单类型显得重。
组合使用
1
2
std::shared_ptr<T> pop(); // 推荐主接口
void pop(T& out);         // 高性能版
  • 用户可根据需求选择性能或安全性。

线程安全栈示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <exception>
#include <memory>
#include <mutex>
#include <stack>

// 自定义异常类型,表示栈为空时弹出失败
struct empty_stack : std::exception {
  const char* what() const throw() override {
    return "empty stack!";
  }
};

template<typename T>
class threadsafe_stack {
private:
  std::stack<T> data;        // 底层存储容器,非线程安全的标准栈
  mutable std::mutex m;      // 保护 data 的互斥量,mutable 允许 const 方法加锁

public:
  // 默认构造函数,初始化空栈
  threadsafe_stack() : data(std::stack<T>()) {}

  // 拷贝构造函数,线程安全地复制另一个栈的数据
  threadsafe_stack(const threadsafe_stack& other) {
    std::lock_guard<std::mutex> lock(other.m);  // 锁住 other 的互斥量,防止数据被修改
    data = other.data;                           // 安全地复制数据
  }

  // 禁止赋值操作,防止线程安全问题和资源冲突
  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  // 向栈中压入一个新元素,传值参数,支持移动语义
  void push(T new_value) {
    std::lock_guard<std::mutex> lock(m);        // 加锁保护栈数据
    data.push(std::move(new_value));             // 利用移动构造入栈,避免不必要拷贝
  }

  // 弹出栈顶元素,返回智能指针指向该元素
  std::shared_ptr<T> pop() {
    std::lock_guard<std::mutex> lock(m);        // 加锁保护
    if (data.empty())                            // 空栈抛异常
      throw empty_stack();

    // 先拷贝栈顶元素到堆中,构造 shared_ptr,保证异常安全
    std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
    data.pop();                                 // 弹出栈顶元素
    return res;                                 // 返回拷贝的智能指针
  }

  // 弹出栈顶元素,通过引用参数传出
  void pop(T& value) {
    std::lock_guard<std::mutex> lock(m);        // 加锁保护
    if (data.empty())                            // 空栈抛异常
      throw empty_stack();

    value = data.top();                          // 先拷贝栈顶元素
    data.pop();                                 // 成功拷贝后弹出栈顶元素
  }

  // 判断栈是否为空,线程安全
  bool empty() const {
    std::lock_guard<std::mutex> lock(m);        // 加锁保护
    return data.empty();
  }
};

这段代码核心思想:

  • 使用互斥量保证每个操作线程安全;
  • 提供两种 pop 版本:一种返回智能指针,方便安全管理弹出元素;另一种通过引用参数传出,避免堆分配,提升性能;
  • 拷贝构造函数也保证线程安全地复制数据;
  • 赋值操作被禁用,防止复杂的线程安全问题。

互斥量的粒度及死锁

  • 大粒度锁(例如一个全局互斥量保护所有共享数据)简单,但阻塞并发性能,容易成为瓶颈。
  • Linux 内核曾使用全局锁,导致多核性能低,后改进为细粒度锁。
  • 细粒度锁带来更高并发,但需要小心避免死锁,尤其当一个操作需要同时持有多个锁时。
  • 死锁:线程互相等待资源,导致无法继续执行,是与条件竞争相反的线程安全问题。

小结

  • 互斥量只能保证单个操作的原子性和线程安全,但接口设计必须避免条件竞争。
  • 设计线程安全接口时,减少多操作调用时的竞态窗口,保证操作整体的原子性。
  • 细粒度锁提高并发性能,但增加死锁风险,需要权衡。
  • 设计线程安全容器时,需综合考虑性能、安全性和接口的易用性。

死锁的解决方案:std::lock + adopt_lock

死锁问题描述

想象一个玩具由两部分组成,比如玩具鼓,需要鼓锤和鼓才能玩。有两个小孩同时想玩这个玩具。鼓和鼓锤分别放在两个不同的玩具箱里。两个小孩同时去拿玩具箱中的部分:

  • 小孩 A 找到了鼓,
  • 小孩 B 找到了鼓锤。

此时双方都手握玩具的一部分,等待对方释放另一部分才能开始玩。如果两个小孩都不让步,就会发生死锁——两个人都在等待对方,谁也没法玩鼓。

在多线程中也类似,两个或多个线程各自锁住了一个互斥量,并等待对方释放另一个锁,导致所有线程都阻塞,无法继续执行。

避免死锁的一般原则

避免死锁的一个常用建议是:

让所有互斥量都按照相同的顺序加锁

例如,总是在锁 B 之前锁 A,这样就不会发生循环等待,死锁的情况可以避免。

但实际情况复杂,当多个互斥量保护同一类的不同实例时,交换操作可能涉及两个不同实例的锁。比如两个线程分别交换两个对象的数据,如果它们锁的顺序不统一,就依然可能死锁。

C++ 标准库中的解决方案:std::lock

C++ 提供了std::lock,它可以一次性锁住多个(两个或以上)互斥量,且没有死锁风险。

以下是一个示例,展示如何使用std::lockstd::lock_guard实现交换操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <mutex>

class some_big_object; // 假设已有实现
void swap(some_big_object& lhs, some_big_object& rhs); // 假设已有实现

class X {
private:
    some_big_object some_detail;
    std::mutex m;

public:
    X(some_big_object const& sd) : some_detail(sd) {}

    friend void swap(X& lhs, X& rhs)     {
        if (&lhs == &rhs)
            return;
        std::lock(lhs.m, rhs.m); // 1. 一次性锁住两个互斥量
        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2. 管理锁的生命周期,表示锁已被拿到
        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
        swap(lhs.some_detail, rhs.some_detail); // 执行实际数据交换
    }
};
  • std::lock(lhs.m, rhs.m) 会一次性锁住两个互斥量,避免死锁。
  • std::lock_guard 的构造参数传入 std::adopt_lock,表示这把锁已经被std::lock拿到了,lock_guard 只负责管理生命周期,不再尝试上锁。
  • 如果 std::lock 在锁其中一个互斥量时失败抛异常,会自动释放已锁的互斥量,保证异常安全。

C++17 的改进:std::scoped_lock

C++17 引入了 std::scoped_lock,它结合了 std::lockstd::lock_guard 的功能,可以直接接收多个互斥量,自动安全地上锁和解锁,代码更简洁。

上述的 swap 函数可以简化为:

1
2
3
4
5
6
void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs)
        return;
    std::scoped_lock guard(lhs.m, rhs.m); // 一次性锁住所有互斥量
    swap(lhs.some_detail, rhs.some_detail);
}
  • 这里利用了 C++17 的模板参数自动推导机制,不用显式写出模板参数类型。
  • std::scoped_lock 析构时自动解锁,保证异常安全。
  • 它能接受任意数量的互斥量,替代 std::lock + 多个 lock_guard 的写法。

等价写法(显式模板参数):

1
std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);

避免死锁的进阶指导

死锁多因对锁的不当使用造成,常见于多个线程相互等待资源时陷入阻塞,导致系统无法继续运行。

死锁示例:两个线程互等

即便没有显式锁,如下代码中两个线程互相 join() 也会导致死锁:

1
2
3
4
5
6
std::thread t1([&] {
    t2.join();  // 等待 t2
});
std::thread t2([&] {
    t1.join();  // 等待 t1
});

死锁的本质是循环等待。为避免死锁,建议:不要谦让,主动规避风险。

避免死锁的常见策略

避免嵌套锁

建议线程一次只持有一个锁。若必须获取多个锁,使用:

1
2
3
std::lock(m1, m2);  // 一次性加锁多个互斥量
std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);

或使用 C++17 的 std::scoped_lock

1
std::scoped_lock lock(m1, m2);  // 自动处理所有互斥量
避免在持锁时调用外部代码

外部代码可能也尝试加锁,容易破坏锁顺序引发死锁。例如:

1
2
3
4
5
template <typename Func>
void process_data(Func func) {
    std::lock_guard<std::mutex> lock(m);
    func(data);  // 调用外部代码,风险!
}
固定顺序加锁

必须加多个锁时,确保所有线程按照统一顺序加锁。例如对链表:

  • 删除节点需锁定:当前节点 + 两个邻接节点。
  • 遍历链表时应以“手递手”方式加锁,并避免逆序访问

错误写法(会导致死锁):

1
2
std::lock_guard<std::mutex> lock1(nodeA->m); // A -> B
std::lock_guard<std::mutex> lock2(nodeB->m);
1
2
std::lock_guard<std::mutex> lock1(nodeB->m); // B -> A
std::lock_guard<std::mutex> lock2(nodeA->m);

多个线程以不同顺序加锁,一旦交叉,会互相等待,最终死锁。

死锁示意图(链表节点 A-B-C):

线程1线程2
锁住节点 A 的互斥量锁住节点 C 的互斥量
试图锁 B(失败)试图锁 B(失败)
等待 B等待 A
死锁! 

解决方案:规定遍历方向与锁顺序一致,避免交叉。

正确写法:按地址顺序加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void lock_two_nodes_safe(Node* a, Node* b) {
    if (a == b) {
        a->m.lock(); // 自锁即可
        return;
    }

    if (a < b) {
        a->m.lock();
        b->m.lock();
    } else {
        b->m.lock();
        a->m.lock();
    }
}

void unlock_two_nodes(Node* a, Node* b) {
    a->m.unlock();
    if (b != a)
        b->m.unlock();
}

或者用 std::unique_lock 自动解锁:

1
2
3
4
5
6
7
8
std::unique_lock<std::mutex> lock1, lock2;
if (nodeA < nodeB) {
    lock1 = std::unique_lock<std::mutex>(nodeA->m);
    lock2 = std::unique_lock<std::mutex>(nodeB->m);
} else {
    lock1 = std::unique_lock<std::mutex>(nodeB->m);
    lock2 = std::unique_lock<std::mutex>(nodeA->m);
}

关键点:以节点地址排序为依据统一加锁顺序,确保所有线程按相同规则执行,从根本上避免死锁

使用层次锁结构(hierarchical_mutex)

为每个互斥量设定一个层级值,要求:高层锁先加,低层锁后加。若违反此约定,则抛出异常,防止死锁发生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 定义三个互斥量,每个带有一个“层级值”。
// 层级值越高,表示越高层,必须先加高层锁,再加低层锁。
hierarchical_mutex high_level_mutex(10000); // 高层互斥量
hierarchical_mutex low_level_mutex(5000);   // 低层互斥量
hierarchical_mutex other_mutex(6000);       // 中间层互斥量

int do_low_level_stuff();

// 封装低层函数,持有 low_level_mutex(层级 5000)
int low_level_func() {
    std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // ✅ 当前线程层级从 ULONG_MAX -> 5000
    return do_low_level_stuff(); // 假设这里不再持锁
}

void high_level_stuff(int some_param);

// 高层函数,先获取 high_level_mutex(层级 10000),然后调用低层函数
void high_level_func() {
    std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // ✅ 当前线程层级从 ULONG_MAX -> 10000
    high_level_stuff(low_level_func()); // ✅ low_level_mutex 层级 5000,小于 10000,合法
}

// 线程 A,调用高层逻辑
void thread_a() {
    high_level_func(); // ✅ 没有违反层级规则,运行正常
}

void do_other_stuff();

// 中间函数,先调用 high_level_func,再调用其他函数
void other_stuff() {
    high_level_func();  // ⚠️ 问题在这里:调用者必须当前线程未持有其他低层互斥量!
    do_other_stuff();
}

// 线程 B,先锁定中层互斥量,再调用 high_level_func
void thread_b() {
    std::lock_guard<hierarchical_mutex> lk(other_mutex); // 🔒 当前线程层级从 ULONG_MAX -> 6000
    other_stuff(); // ⚠️ 违反层级规则:持有 6000 后,再尝试加锁 10000(更高层)互斥量,抛出异常
}
  • thread_a 正确:先加高层再加低层。
  • thread_b 错误:先加中层再加高层,违反锁顺序,运行时报错!

层次互斥量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class hierarchical_mutex {
    std::mutex internal_mutex;  // 实际用于加锁的 std::mutex

    unsigned long const hierarchy_value;       // 当前互斥量的层级值,越大表示越“上层”
    unsigned long previous_hierarchy_value;    // 当前线程加锁之前的层级值(用于恢复)

    // 每个线程独有的当前锁层级,初始化为 ULONG_MAX(最高)
    static thread_local unsigned long this_thread_hierarchy_value;

    // 检查是否违反了层级规则
    void check_for_hierarchy_violation() {
        // 如果当前线程持有的层级值 <= 将要加的锁的层级,说明违反了“高层先锁、低层后锁”的规则
        if (this_thread_hierarchy_value <= hierarchy_value)
            throw std::logic_error("mutex hierarchy violated");
    }

    // 更新当前线程的层级状态
    void update_hierarchy_value() {
        // 记录当前线程之前的层级值
        previous_hierarchy_value = this_thread_hierarchy_value;
        // 更新为当前加锁的互斥量的层级值
        this_thread_hierarchy_value = hierarchy_value;
    }

public:
    // 构造函数:设置层级值
    explicit hierarchical_mutex(unsigned long value)
        : hierarchy_value(value), previous_hierarchy_value(0) {}

    // 加锁函数
    void lock() {
        check_for_hierarchy_violation();   // 检查层级是否符合规范
        internal_mutex.lock();             // 加内部锁
        update_hierarchy_value();          // 更新线程的层级值
    }

    // 解锁函数
    void unlock() {
        // 检查当前线程的层级值是否匹配当前互斥量的层级值
        // 如果不匹配说明 unlock 顺序错误(不是最后加的那个锁)
        if (this_thread_hierarchy_value != hierarchy_value)
            throw std::logic_error("mutex hierarchy violated");
        // 恢复线程先前的层级值
        this_thread_hierarchy_value = previous_hierarchy_value;
        internal_mutex.unlock();           // 解锁内部互斥量
    }

    // 尝试加锁函数(不会阻塞)
    bool try_lock() {
        check_for_hierarchy_violation();   // 层级检查
        if (!internal_mutex.try_lock())    // 尝试加锁失败,返回 false
            return false;
        update_hierarchy_value();          // 成功后更新线程层级状态
        return true;
    }
};

// 定义线程局部变量:每个线程都维护一份自己的当前层级值,初始值为最大
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
  • 每个线程维护一个 this_thread_hierarchy_value
  • 上锁前做层级检查,确保符合顺序。
  • 解锁后恢复先前层级,避免嵌套混乱。
  • 使用标准库 std::lock_guard<hierarchical_mutex> 即可无缝集成。

超越锁的扩展建议

死锁也可能出现在如下情况:

  • join() 等待某个线程结束(而该线程还在等待锁)
  • 条件变量造成的环等待

建议:

  • 线程间有层级顺序,如:高层线程只能等待低层线程。
  • 在同一函数内创建并等待线程,有助于规避复杂依赖。

避免死锁的六大建议

建议说明
避免嵌套锁一个线程一次只持有一个锁
使用 std::lock()一次性加锁多个互斥量,避免交叉等待
避免持锁调用外部函数外部行为不可控,可能造成死锁
统一加锁顺序所有线程按照固定顺序获取锁
使用层次锁结构强制锁顺序,运行时检测违例
避免线程间循环等待线程间应有清晰的层级关系

std::unique_lock:更灵活的 RAII 锁

std::unique_lock 是一个比 std::lock_guard 更加灵活的互斥量封装器。它提供了更丰富的操作,例如延迟加锁、提前解锁、尝试加锁等功能,适用于需要精细控制互斥行为的场景。

特性概览

  • 灵活性更高:可以选择在构造时不立即加锁,或者将已有的锁状态接管;
  • 支持锁操作:支持 .lock().try_lock().unlock() 成员函数;
  • 可转移所有权:支持移动构造/赋值,允许将锁的所有权从一个作用域传递到另一个;
  • 占用空间较大,性能略低:相比 std::lock_guard,体积更大,操作稍慢,但换来的是更大的灵活性。

使用方式

构造 std::unique_lock 时,可以选择不同的模式:

构造方式行为说明
std::unique_lock<std::mutex> lock(m);构造时立即加锁(默认行为)
std::unique_lock<std::mutex> lock(m, std::defer_lock);构造时不加锁,需手动调用 .lock()
std::unique_lock<std::mutex> lock(m, std::adopt_lock);构造时接管已加锁的互斥量,不再加锁

示例:使用 std::unique_lockstd::defer_lock 实现安全交换

相比前文代码中使用 std::lock_guard + std::adopt_lock 的写法,以下是其等价写法 —— 使用 std::unique_lockstd::defer_lock 实现更清晰灵活的加锁过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class some_big_object;

void swap(some_big_object& lhs, some_big_object& rhs);

class X {
private:
  some_big_object some_detail;
  std::mutex m;

public:
  X(some_big_object const& sd) : some_detail(sd) {}

  friend void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs)
      return;

    // 1. 创建延迟加锁的 unique_lock
    std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
    std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);

    // 2. 同时锁住两个互斥量,避免死锁
    std::lock(lock_a, lock_b);

    // 3. 执行实际的交换操作
    swap(lhs.some_detail, rhs.some_detail);
  }
};
  • std::defer_lock:创建 unique_lock 对象但不立即加锁;
  • std::lock(lock_a, lock_b):使用 C++ 提供的原子锁定多个互斥量函数,防止死锁;
  • 析构时自动释放锁:只要 unique_lock 对象生命周期结束,所管理的锁也将自动释放;
  • 灵活性更强:可随时调用 .unlock().lock(),甚至将锁对象传递出去。

所有权与状态管理

  • std::unique_lock 内部维护一个是否持有锁的标志,用于确定是否需要在析构时调用 unlock()
  • 可以通过 .owns_lock() 成员函数检查当前是否持有锁;
  • 更适合需要“锁的生命周期不等于作用域生命周期”的复杂控制场景。

小结

使用场景建议锁类型
简单作用域自动加锁/释放std::lock_guard(性能好,体积小)
需要灵活控制锁(如延迟加锁、提前解锁、锁迁移)std::unique_lock
多个互斥量同时加锁且自动处理死锁问题std::scoped_lock(C++17 新增)

std::unique_lock 是在追求灵活性时的首选,但在性能敏感、结构简单的代码中,std::lock_guard 是更高效的选择。

在作用域之间转移锁

std::unique_lock 支持移动语义,但不支持复制赋值。这意味着它的互斥量所有权可以在不同的实例间转移,但不能被复制。

移动所有权的机制

  • 当一个 std::unique_lock 对象被返回移动构造时,锁的所有权会从源对象转移到目标对象;
  • 如果希望显式地转移所有权,需要使用 std::move()
  • 编译器在函数返回时自动调用移动构造函数,无需手动写 std::move(),但如果是传参或赋值则通常需要显式 std::move()
  • 移动后,源对象不再拥有互斥量,也不会在析构时解锁。

典型应用示例

函数锁住互斥量后返回 std::unique_lock,调用者接管锁的所有权,进而在安全的锁保护范围内执行操作。

1
2
3
4
5
6
7
8
9
10
11
std::unique_lock<std::mutex> get_lock() {
  extern std::mutex some_mutex;
  std::unique_lock<std::mutex> lk(some_mutex);  // 上锁
  prepare_data();                              // 处理数据
  return lk;  // ① 返回时自动调用移动构造,转移锁的所有权
}

void process_data() {
  std::unique_lock<std::mutex> lk(get_lock());  // ② 接管锁的所有权
  do_something();                               // 在锁保护下操作数据
}
  • get_lock() 中,lk 是局部变量,返回时自动移动;
  • process_data() 中通过初始化 lk,接管锁的所有权,保证后续操作的数据安全。

应用场景

这种模式常见于“网关类”设计:

  • 网关类负责管理对受保护数据的访问,保证数据同步安全;
  • 调用者必须先从网关类获取锁(例如调用类似 get_lock() 的函数);
  • 在持有锁的范围内访问数据,结束时销毁锁对象释放锁;
  • 网关类可设计为可移动,从而支持函数返回锁的所有权。

std::unique_lock 的灵活锁定与释放

除了所有权转移,std::unique_lock 还允许在生命周期内选择性释放锁

  • 可以调用成员函数 .unlock() 显式释放锁,但对象仍然存在;
  • 当不需要继续持有锁时,提前释放锁,缩短锁的持有时间,减少线程等待,提高性能;
  • 析构函数只会在拥有锁时才调用 unlock(),防止重复释放。

例如:

1
2
3
4
5
6
std::unique_lock<std::mutex> lk(some_mutex);
// 临界区操作...
if (some_condition) {
    lk.unlock();  // 提前释放锁,允许其他线程进入临界区
}
// 后续代码不受锁保护

小结

  • std::unique_lock 支持锁的所有权在不同域(作用域)间移动,方便复杂场景下的锁管理;
  • 通过移动构造或返回,可以安全地传递锁所有权,无需复制;
  • 可以灵活控制加锁和解锁,缩短持锁时间,提高并发性能;
  • 在性能和灵活性需求之间提供了良好平衡。

锁的粒度控制

锁的粒度是一个“华而不实的术语”(hand-waving term),用来描述一个锁保护的数据量大小:

  • 细粒度锁(fine-grained lock):保护较小的数据量;
  • 粗粒度锁(coarse-grained lock):保护较大的数据量。

锁的粒度对性能影响很大,合理选择粒度对保证数据安全和程序效率都至关重要。

锁粒度的现实类比

想象超市结账场景:

  • 顾客正在结账时突然发现忘拿蔓越莓酱,离开收银台去拿,让后面的人等待;
  • 或者顾客结账时才去翻钱包拿钱,耽误时间。

这些情况都导致等待的人时间变长。

对应到多线程程序中:

  • 如果一个线程持有锁的时间过长,其他线程只能等待,降低并发效率;
  • 线程应尽量缩短持锁时间,避免在持锁期间做耗时操作,比如文件 I/O;
  • 文件 I/O 通常比内存读写慢百倍甚至千倍,持锁期间做 I/O 会阻塞大量线程。

释放锁以缩短持锁时间的示例

std::unique_lock 支持显式解锁和再加锁,方便缩短锁的持有时间。

1
2
3
4
5
6
7
8
void get_and_process_data() {
  std::unique_lock<std::mutex> my_lock(the_mutex);
  some_class data_to_process = get_next_data_chunk();
  my_lock.unlock();  // ① 在调用 process() 之前释放锁,避免锁跨越长时间处理
  result_type result = process(data_to_process);
  my_lock.lock();    // ② 处理完后重新加锁,写入结果
  write_result(data_to_process, result);
}
  • 这里避免了锁跨越 process() 调用,缩短了锁的持有时间,提高了并发效率。

锁粒度影响的后果

  • 使用单个锁保护整个数据结构会增加锁竞争和持锁时间;
  • 持有锁时间长,阻塞其他线程;
  • 因此,细粒度锁有助于减少锁竞争,提高性能;
  • 合理设计锁的持有时长和保护范围是优化多线程程序的关键。

例子:比较操作中一次只锁一个互斥量

假设要比较两个对象的数据成员,数据类型是简单的 int,拷贝廉价,且每个对象有自己的互斥量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Y {
private:
  int some_detail;
  mutable std::mutex m;

  int get_detail() const   {
    std::lock_guard<std::mutex> lock_a(m);  // ① 加锁保护读取
    return some_detail;
  }

public:
  Y(int sd) : some_detail(sd) {}

  friend bool operator==(Y const& lhs, Y const& rhs)   {
    if (&lhs == &rhs)
      return true;
    int const lhs_value = lhs.get_detail();  // ② 加锁读取 lhs.some_detail
    int const rhs_value = rhs.get_detail();  // ③ 加锁读取 rhs.some_detail
    return lhs_value == rhs_value;            // ④ 比较
  }
};
  • 每次读取时单独加锁,减少持锁时间,避免死锁风险;
  • 但这里有个潜在语义问题:读取 lhs 和 rhs 之间,数据可能被其他线程修改;
  • 所以比较操作反映的是两个瞬间的状态,可能导致条件竞争和不一致的比较结果。

细粒度锁的局限与挑战

  • 有时难以找到合适的锁粒度,尤其当数据结构访问需求复杂时;
  • 并非所有数据访问都适合同级别的锁保护;
  • 可能需要使用更高级的同步机制替代简单的互斥量,比如读写锁、原子操作等。

总结

  • std::mutex 是 C++ 提供的基础互斥工具,需配合 lock_guardunique_lock 使用;
  • 避免传出引用或指针
  • 接口设计比实现更容易出错,关注原子性
  • 死锁可通过 lock()/scoped_lock 层级机制等方式避免
  • 粒度控制影响性能,持锁时间越短越好
本文由作者按照 CC BY 4.0 进行授权