线程等待与条件变量
线程等待通过条件变量实现高效同步,避免轮询和睡眠造成的CPU浪费与响应延迟。
线程等待与条件变量
线程等待与条件变量
想象下在夜间乘火车下车,有三种选择:
- 整夜保持清醒 → 持续检查,浪费资源。
- 设置闹钟 → 有延迟风险(误差/早醒/电池无电)。
- 司机到站通知你 → 最理想,对应线程条件变量。
这三种方案类比线程等待事件的方式:
- 持续轮询 + 加锁 会造成 CPU 消耗 + 锁竞争。
- 间歇休眠(
sleep_for
) 改善一点,但依旧不精确。 - 条件变量
std::condition_variable
提供高效等待/唤醒机制。
sleep + lock 的简单轮询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
bool flag = false;
std::mutex m;
void waiting_thread() {
while (true) {
m.lock();
if (flag) {
m.unlock();
std::cout << "Flag is set, proceeding...\n";
break;
}
m.unlock();
// 线程睡眠一段时间,避免持续占用CPU,但不是精确等待
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void setting_thread() {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟延迟
std::lock_guard<std::mutex> lg(m);
flag = true;
std::cout << "Flag set by setting_thread\n";
}
int main() {
std::thread t1(waiting_thread);
std::thread t2(setting_thread);
t1.join();
t2.join();
return 0;
}
waiting_thread
通过循环加锁检查共享变量flag
;- 如果没设置,解锁后调用
sleep_for(100ms)
睡眠,避免CPU过度占用; - 但这个“等待”是不精准的,因为最长可能延迟100ms后才发现
flag
变为true
; - 另外,这种不断轮询锁和解锁依然存在一定CPU浪费;
- 如果把睡眠时间缩短,CPU浪费会更严重;如果睡眠时间拉长,响应变慢。
用 sleep + lock
等待事件,虽然写法简单,但存在两大缺点:
- 响应不及时(不精准),影响程序实时性;
- 浪费 CPU 资源,降低性能。
使用条件变量等待条件达成
基本结构
1
2
3
4
5
6
7
8
// 互斥量,用于保护对共享数据 data_queue 的并发访问
std::mutex mut;
// 数据队列:生产者线程将数据压入队列,消费者线程从队列取出数据
std::queue<data_chunk> data_queue;
// 条件变量:用于线程间的等待/通知机制,等待队列非空时唤醒处理线程
std::condition_variable data_cond;
准备线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void data_preparation_thread() {
// 不断准备数据,直到没有更多数据需要处理
while (more_data_to_prepare()) {
// 准备一个数据块(假设是耗时操作)
data_chunk const data = prepare_data();
// 加锁以安全地访问共享队列
std::lock_guard<std::mutex> lk(mut);
// 将准备好的数据压入队列
data_queue.push(data);
// 通知一个正在等待条件变量的线程:队列中有新数据了
data_cond.notify_one();
}
}
std::lock_guard<std::mutex>
:在作用域内自动上锁并在退出时自动释放,确保异常安全。data_queue.push(data)
:是共享资源访问的关键点,必须保护。notify_one()
:仅唤醒一个等待线程,因为只压入了一个数据块,多线程时避免“惊群”。
处理线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void data_processing_thread() {
// 循环处理数据,直到遇到最后一个数据块
while (true) {
// 加锁互斥量以访问共享队列
std::unique_lock<std::mutex> lk(mut);
// 等待条件变量满足:队列非空时才继续执行
// 如果条件不满足,线程会释放互斥锁并进入阻塞,直到被唤醒
data_cond.wait(lk, [] { return !data_queue.empty(); });
// 获取队首数据块(这里已确保队列非空)
data_chunk data = data_queue.front();
// 弹出已读取的数据块
data_queue.pop();
// 处理数据前先释放锁,避免处理耗时操作时阻塞生产者线程
lk.unlock();
// 处理数据块(可能是耗时任务)
process(data);
// 如果处理的是最后一个数据块,则结束循环
if (is_last_chunk(data))
break;
}
}
为什么使用 std::unique_lock<std::mutex>
?
因为 std::condition_variable::wait()
需要能够:
- 在内部临时释放互斥锁(让生产者线程能获取锁);
- 等条件满足后再重新加锁;
而 std::lock_guard<std::mutex>
是轻量 RAII 锁,不支持 unlock() 和 lock() 操作(不可释放锁,只能随作用域自动释放),所以它不能用于 wait()。
换句话说,wait()
要求传入的锁对象是可控的、可解锁的,而 unique_lock
满足这个条件。
wait()
1
data_cond.wait(lk, [] { return !data_queue.empty(); });
等价于:
1
2
3
4
5
6
while (!data_queue.empty()) {
// 如果条件不满足:
// 1. 释放 lk 锁(允许其他线程进入临界区)
// 2. 让当前线程进入“等待状态”(挂起)
// 3. 被唤醒后,再次加锁,并重新检查条件
}
构建线程安全队列
将前面代码中的 push 和 wait_and_pop 提取出来,构建一个线程安全队列模板:
接口设计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 线程安全的队列模板类,适用于多线程之间的数据通信
template<typename T>
class threadsafe_queue {
public:
// 默认构造函数
threadsafe_queue();
// 拷贝构造函数,允许从已有队列中构造一个新队列
threadsafe_queue(const threadsafe_queue&);
// 禁用拷贝赋值操作,防止多个队列对象共享内部资源(如互斥量)
threadsafe_queue& operator=(const threadsafe_queue&) = delete;
// 将元素压入队列,线程安全
void push(T new_value);
// 尝试从队列中弹出一个元素(非阻塞)
// 若成功,将值赋给 value 并返回 true;否则返回 false
bool try_pop(T& value);
// 尝试从队列中弹出一个元素(非阻塞)
// 若成功,返回包含值的 shared_ptr;否则返回空指针
std::shared_ptr<T> try_pop();
// 等待直到队列非空,弹出一个元素并赋值给 value(阻塞)
void wait_and_pop(T& value);
// 等待直到队列非空,弹出一个元素并通过 shared_ptr 返回(阻塞)
std::shared_ptr<T> wait_and_pop();
// 检查队列是否为空(线程安全)
bool empty() const;
};
- 一个线程不断生产数据
push()
; - 多个线程从中
wait_and_pop()
处理数据; - 不会出现竞态条件,确保线程安全。
实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
// 一个线程安全的队列模板类,适用于多线程生产者-消费者模型
template<typename T>
class threadsafe_queue {
private:
// 可变互斥量,用于保护共享数据 data_queue
// 必须声明为 mutable,以便在 const 函数中加锁
mutable std::mutex mut;
// 实际用于存储数据的队列
std::queue<T> data_queue;
// 条件变量,用于线程间同步(等待和通知)
std::condition_variable data_cond;
public:
// 默认构造函数
threadsafe_queue() {}
// 拷贝构造函数,使用互斥锁保护复制操作
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut); // 加锁保护
data_queue = other.data_queue; // 拷贝数据
}
// 将元素压入队列,并通知一个等待线程(如果有)
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut); // 加锁以访问共享队列
data_queue.push(std::move(new_value)); // 移动语义更高效
data_cond.notify_one(); // 通知一个等待线程
}
// 阻塞式弹出:等待直到队列非空,然后将元素赋值给引用参数
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut); // 使用 unique_lock 支持条件变量自动解锁
data_cond.wait(lk, [this] { return !data_queue.empty(); }); // 等待条件达成
value = data_queue.front(); // 获取队首元素
data_queue.pop(); // 弹出元素
}
// 阻塞式弹出:返回一个包含弹出元素的 shared_ptr
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] { return !data_queue.empty(); });
std::shared_ptr<T> res = std::make_shared<T>(data_queue.front());
data_queue.pop();
return res;
}
// 非阻塞尝试弹出:若队列非空,将元素赋值给引用参数并返回 true;否则返回 false
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
// 非阻塞尝试弹出:返回 shared_ptr,如果为空则返回空指针
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>(); // 返回空指针表示没有数据
std::shared_ptr<T> res = std::make_shared<T>(data_queue.front());
data_queue.pop();
return res;
}
// 查询队列是否为空,线程安全
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
成员函数 | 特点 | 功能说明 |
---|---|---|
push | 非阻塞 + 唤醒等待线程 | 压入新元素 |
wait_and_pop | 阻塞直到有元素可取 | 安全弹出 |
try_pop | 非阻塞,立即返回结果 | 安全尝试弹出 |
empty | 安全判断队列是否为空 | 状态检查 |
shared_ptr 返回 | 避免返回局部变量引用 | 更灵活的返回方式 |
使用引用参数版本 (bool try_pop(T& value)
):
- 调用时传入一个已有变量,函数内部赋值;
- 简单、直观,但要求调用者事先有变量准备接收数据;
- 适合数据类型拷贝或赋值开销不大的场景。
使用智能指针版本 (std::shared_ptr<T> try_pop()
):
- 返回堆上的智能指针,调用者可以直接使用返回值;
- 适合:
- 数据类型比较大,不希望拷贝或赋值;
- 需要传递对象所有权或延长生命周期的场景;
- 也适合返回“空”状态(用空指针表示无数据);
- 但会涉及堆分配和引用计数开销。
notify_one
vs notify_all
notify_one()
:唤醒一个等待线程(适用于生产者-消费者模式)。notify_all()
:唤醒所有线程(适用于初始化同步或广播事件)。
例如,多个线程等待某个共享数据初始化完毕,适合用 notify_all()
:
1
2
std::condition_variable cv;
cv.notify_all();
总结
技术 | 优点 | 缺点 |
---|---|---|
sleep + lock | 简单 | 不精准、CPU浪费 |
wait() 条件变量 | 高效、精准唤醒 | 实现略复杂,需要 Lambda 判断条件 |
threadsafe_queue | 结构清晰、模块化、线程安全 | 适合跨线程的数据传递封装 |
本文由作者按照 CC BY 4.0 进行授权