文章

线程等待与条件变量

线程等待通过条件变量实现高效同步,避免轮询和睡眠造成的CPU浪费与响应延迟。

线程等待与条件变量

线程等待与条件变量

想象下在夜间乘火车下车,有三种选择:

  1. 整夜保持清醒 → 持续检查,浪费资源。
  2. 设置闹钟 → 有延迟风险(误差/早醒/电池无电)。
  3. 司机到站通知你 → 最理想,对应线程条件变量。

这三种方案类比线程等待事件的方式:

  • 持续轮询 + 加锁 会造成 CPU 消耗 + 锁竞争。
  • 间歇休眠(sleep_for 改善一点,但依旧不精确。
  • 条件变量 std::condition_variable 提供高效等待/唤醒机制。

sleep + lock 的简单轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

bool flag = false;
std::mutex m;

void waiting_thread() {
    while (true) {
        m.lock();
        if (flag) {
            m.unlock();
            std::cout << "Flag is set, proceeding...\n";
            break;
        }
        m.unlock();
        // 线程睡眠一段时间,避免持续占用CPU,但不是精确等待
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void setting_thread() {
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟延迟
    std::lock_guard<std::mutex> lg(m);
    flag = true;
    std::cout << "Flag set by setting_thread\n";
}

int main() {
    std::thread t1(waiting_thread);
    std::thread t2(setting_thread);

    t1.join();
    t2.join();

    return 0;
}
  • waiting_thread 通过循环加锁检查共享变量 flag
  • 如果没设置,解锁后调用 sleep_for(100ms) 睡眠,避免CPU过度占用;
  • 但这个“等待”是不精准的,因为最长可能延迟100ms后才发现 flag 变为 true
  • 另外,这种不断轮询锁和解锁依然存在一定CPU浪费;
  • 如果把睡眠时间缩短,CPU浪费会更严重;如果睡眠时间拉长,响应变慢。

sleep + lock 等待事件,虽然写法简单,但存在两大缺点:

  • 响应不及时(不精准),影响程序实时性;
  • 浪费 CPU 资源,降低性能。

使用条件变量等待条件达成

基本结构

1
2
3
4
5
6
7
8
// 互斥量,用于保护对共享数据 data_queue 的并发访问
std::mutex mut;

// 数据队列:生产者线程将数据压入队列,消费者线程从队列取出数据
std::queue<data_chunk> data_queue;

// 条件变量:用于线程间的等待/通知机制,等待队列非空时唤醒处理线程
std::condition_variable data_cond;

准备线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void data_preparation_thread() {
  // 不断准备数据,直到没有更多数据需要处理
  while (more_data_to_prepare()) {
    // 准备一个数据块(假设是耗时操作)
    data_chunk const data = prepare_data();

    // 加锁以安全地访问共享队列
    std::lock_guard<std::mutex> lk(mut);

    // 将准备好的数据压入队列
    data_queue.push(data);

    // 通知一个正在等待条件变量的线程:队列中有新数据了
    data_cond.notify_one();
  }
}
  • std::lock_guard<std::mutex>:在作用域内自动上锁并在退出时自动释放,确保异常安全。
  • data_queue.push(data):是共享资源访问的关键点,必须保护。
  • notify_one():仅唤醒一个等待线程,因为只压入了一个数据块,多线程时避免“惊群”。

处理线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void data_processing_thread() {
  // 循环处理数据,直到遇到最后一个数据块
  while (true) {
    // 加锁互斥量以访问共享队列
    std::unique_lock<std::mutex> lk(mut);

    // 等待条件变量满足:队列非空时才继续执行
    // 如果条件不满足,线程会释放互斥锁并进入阻塞,直到被唤醒
    data_cond.wait(lk, [] { return !data_queue.empty(); });

    // 获取队首数据块(这里已确保队列非空)
    data_chunk data = data_queue.front();

    // 弹出已读取的数据块
    data_queue.pop();

    // 处理数据前先释放锁,避免处理耗时操作时阻塞生产者线程
    lk.unlock();

    // 处理数据块(可能是耗时任务)
    process(data);

    // 如果处理的是最后一个数据块,则结束循环
    if (is_last_chunk(data))
      break;
  }
}
为什么使用 std::unique_lock<std::mutex>

因为 std::condition_variable::wait() 需要能够:

  1. 在内部临时释放互斥锁(让生产者线程能获取锁);
  2. 等条件满足后再重新加锁

std::lock_guard<std::mutex> 是轻量 RAII 锁,不支持 unlock() 和 lock() 操作(不可释放锁,只能随作用域自动释放),所以它不能用于 wait()

换句话说,wait() 要求传入的锁对象是可控的、可解锁的,而 unique_lock 满足这个条件。

wait()
1
data_cond.wait(lk, [] { return !data_queue.empty(); });

等价于:

1
2
3
4
5
6
while (!data_queue.empty()) {
    // 如果条件不满足:
    // 1. 释放 lk 锁(允许其他线程进入临界区)
    // 2. 让当前线程进入“等待状态”(挂起)
    // 3. 被唤醒后,再次加锁,并重新检查条件
}

构建线程安全队列

将前面代码中的 push 和 wait_and_pop 提取出来,构建一个线程安全队列模板:

接口设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 线程安全的队列模板类,适用于多线程之间的数据通信
template<typename T>
class threadsafe_queue {
public:
  // 默认构造函数
  threadsafe_queue();

  // 拷贝构造函数,允许从已有队列中构造一个新队列
  threadsafe_queue(const threadsafe_queue&);

  // 禁用拷贝赋值操作,防止多个队列对象共享内部资源(如互斥量)
  threadsafe_queue& operator=(const threadsafe_queue&) = delete;

  // 将元素压入队列,线程安全
  void push(T new_value);

  // 尝试从队列中弹出一个元素(非阻塞)
  // 若成功,将值赋给 value 并返回 true;否则返回 false
  bool try_pop(T& value);

  // 尝试从队列中弹出一个元素(非阻塞)
  // 若成功,返回包含值的 shared_ptr;否则返回空指针
  std::shared_ptr<T> try_pop();

  // 等待直到队列非空,弹出一个元素并赋值给 value(阻塞)
  void wait_and_pop(T& value);

  // 等待直到队列非空,弹出一个元素并通过 shared_ptr 返回(阻塞)
  std::shared_ptr<T> wait_and_pop();

  // 检查队列是否为空(线程安全)
  bool empty() const;
};
  • 一个线程不断生产数据 push()
  • 多个线程从中 wait_and_pop() 处理数据;
  • 不会出现竞态条件,确保线程安全。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

// 一个线程安全的队列模板类,适用于多线程生产者-消费者模型
template<typename T>
class threadsafe_queue {
private:
  // 可变互斥量,用于保护共享数据 data_queue
  // 必须声明为 mutable,以便在 const 函数中加锁
  mutable std::mutex mut;

  // 实际用于存储数据的队列
  std::queue<T> data_queue;

  // 条件变量,用于线程间同步(等待和通知)
  std::condition_variable data_cond;

public:
  // 默认构造函数
  threadsafe_queue() {}

  // 拷贝构造函数,使用互斥锁保护复制操作
  threadsafe_queue(threadsafe_queue const& other) {
    std::lock_guard<std::mutex> lk(other.mut);  // 加锁保护
    data_queue = other.data_queue;              // 拷贝数据
  }

  // 将元素压入队列,并通知一个等待线程(如果有)
  void push(T new_value) {
    std::lock_guard<std::mutex> lk(mut);            // 加锁以访问共享队列
    data_queue.push(std::move(new_value));          // 移动语义更高效
    data_cond.notify_one();                         // 通知一个等待线程
  }

  // 阻塞式弹出:等待直到队列非空,然后将元素赋值给引用参数
  void wait_and_pop(T& value) {
    std::unique_lock<std::mutex> lk(mut);           // 使用 unique_lock 支持条件变量自动解锁
    data_cond.wait(lk, [this] { return !data_queue.empty(); }); // 等待条件达成
    value = data_queue.front();                     // 获取队首元素
    data_queue.pop();                               // 弹出元素
  }

  // 阻塞式弹出:返回一个包含弹出元素的 shared_ptr
  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] { return !data_queue.empty(); });
    std::shared_ptr<T> res = std::make_shared<T>(data_queue.front());
    data_queue.pop();
    return res;
  }

  // 非阻塞尝试弹出:若队列非空,将元素赋值给引用参数并返回 true;否则返回 false
  bool try_pop(T& value) {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return false;
    value = data_queue.front();
    data_queue.pop();
    return true;
  }

  // 非阻塞尝试弹出:返回 shared_ptr,如果为空则返回空指针
  std::shared_ptr<T> try_pop() {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return std::shared_ptr<T>();  // 返回空指针表示没有数据
    std::shared_ptr<T> res = std::make_shared<T>(data_queue.front());
    data_queue.pop();
    return res;
  }

  // 查询队列是否为空,线程安全
  bool empty() const {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};
成员函数特点功能说明
push非阻塞 + 唤醒等待线程压入新元素
wait_and_pop阻塞直到有元素可取安全弹出
try_pop非阻塞,立即返回结果安全尝试弹出
empty安全判断队列是否为空状态检查
shared_ptr 返回避免返回局部变量引用更灵活的返回方式
使用引用参数版本 (bool try_pop(T& value)):
  • 调用时传入一个已有变量,函数内部赋值;
  • 简单、直观,但要求调用者事先有变量准备接收数据;
  • 适合数据类型拷贝或赋值开销不大的场景。
使用智能指针版本 (std::shared_ptr<T> try_pop()):
  • 返回堆上的智能指针,调用者可以直接使用返回值;
  • 适合:
    • 数据类型比较大,不希望拷贝或赋值;
    • 需要传递对象所有权或延长生命周期的场景;
    • 也适合返回“空”状态(用空指针表示无数据);
  • 但会涉及堆分配和引用计数开销。

notify_one vs notify_all

  • notify_one():唤醒一个等待线程(适用于生产者-消费者模式)。
  • notify_all():唤醒所有线程(适用于初始化同步或广播事件)。

例如,多个线程等待某个共享数据初始化完毕,适合用 notify_all()

1
2
std::condition_variable cv;
cv.notify_all();

总结

技术优点缺点
sleep + lock简单不精准、CPU浪费
wait() 条件变量高效、精准唤醒实现略复杂,需要 Lambda 判断条件
threadsafe_queue结构清晰、模块化、线程安全适合跨线程的数据传递封装
本文由作者按照 CC BY 4.0 进行授权