简化代码
简化代码
使用 future 的函数化编程
函数化编程简介
函数化编程(Functional Programming,FP)是一种编程范式,强调函数的纯粹性。纯函数的特点是:
- 函数结果只依赖于传入的参数;
- 对相同的输入参数,多次调用函数都会得到相同的结果;
- 函数不会改变任何外部状态。
C++标准库中许多数学相关函数都符合这一特性,例如:sin
、cos
、sqrt
,以及基本类型之间的运算如 3+3
、6*9
、1.3/4.7
等。
函数化编程带来的好处之一是避免条件竞争问题。因为纯函数不修改共享数据,所以不存在数据竞争,也不需要互斥量保护数据。例如,Haskell 语言中所有函数默认都是纯函数。
C++作为多范型语言,也可以支持函数式编程风格。C++11引入的Lambda表达式、std::bind
、自动类型推断(auto
关键字)等特性,使得函数式编程在C++中变得更加简洁。future
作为异步任务的结果传递机制,为FP风格的并发编程提供了重要支持。
快速排序的函数化编程实现(串行版)
下面是快速排序算法的函数式串行实现。该函数接受一个std::list<T>
,返回一个排序后的新列表,与std::sort()
不同,后者是就地排序,没有返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input) {
if(input.empty()) {
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin()); // ① 将第一个元素作为基准(pivot)移动到结果列表
T const& pivot = *result.begin(); // ② 使用引用避免拷贝
// ③ 使用std::partition根据pivot划分列表,小于pivot的元素放前面
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });
std::list<T> lower_part;
// ④ 将小于pivot的元素移动到lower_part列表
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// ⑤ 递归排序小于pivot部分
auto new_lower = sequential_quick_sort(std::move(lower_part));
// ⑥ 递归排序大于等于pivot部分
auto new_higher = sequential_quick_sort(std::move(input));
// ⑦ 结果拼接:先拼接大于部分
result.splice(result.end(), new_higher);
// ⑧ 再拼接小于部分
result.splice(result.begin(), new_lower);
return result;
}
说明:
splice()
操作避免了不必要的拷贝,通过转移链表节点实现高效移动。- 使用Lambda表达式定义
partition
的判定条件。 - 该实现的接口是函数式的,输入不被修改,输出为新的排序列表。
快速排序的函数化并发实现(并行版)
基于上述串行实现,我们可以使用std::async
并发执行排序任务,从而加速排序过程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 并行版本的快速排序,使用 std::future 异步排序某部分数据
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
// 处理空列表的情况,直接返回
if(input.empty()) {
return input;
}
std::list<T> result;
// 将 input 的第一个元素(选作 pivot)“挪”到 result 的开头
// 这也是我们后面要比较和拼接的“基准值”
result.splice(result.begin(), input, input.begin());
// 取出刚转移过来的第一个元素作为 pivot
T const& pivot = *result.begin();
// 使用 partition 把 input 中的元素按 pivot 分成两部分:
// 小于 pivot 的放前面,其余放后面
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });
std::list<T> lower_part;
// 将小于 pivot 的部分(从 input.begin() 到 divide_point)“挪”到 lower_part
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// 使用 std::async 异步地对 lower_part 执行并行快速排序
// std::move(lower_part) 避免拷贝,提高效率
// 返回一个 future,稍后调用 .get() 获取排序结果
std::future<std::list<T>> new_lower(
std::async(¶llel_quick_sort<T>, std::move(lower_part))
);
// 当前线程继续递归排序剩下部分(input中大于等于 pivot 的部分)
auto new_higher = parallel_quick_sort(std::move(input));
// 拼接排序结果:
// 先拼接大于部分到 result 尾部(顺序是:[pivot] + new_higher)
result.splice(result.end(), new_higher);
// 再通过 future.get() 获取异步排序结果,拼接到 result 头部
// 最终顺序是:[new_lower] + [pivot] + [new_higher]
result.splice(result.begin(), new_lower.get());
// 返回排序后的结果
return result;
}
说明:
std::async
会为new_lower
启动一个新线程执行排序。- 通过递归调用
parallel_quick_sort
,可以利用多核硬件实现并行计算。 - 当递归层数增加时,线程数量可能急剧上升(例如递归10层可能产生1024个线程),运行库可能会自动控制线程的创建,以避免过度开销。
new_lower.get()
会阻塞当前线程,直到异步任务完成,取得结果。
spawn_task 函数示例
相比直接使用std::async
,有时需要更灵活的任务启动方式。下面示例是使用std::packaged_task
和std::thread
封装的简单任务启动函数spawn_task
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 通用异步任务封装器:将一个函数f及其参数a包装成一个异步任务并启动线程执行,返回 future 用于获取结果
template<typename F, typename A>
std::future<typename std::result_of<F(A&&)>::type>
spawn_task(F&& f, A&& a) {
// 推导函数 f(A&&) 的返回类型作为任务结果类型(C++11 使用 result_of,C++14 起建议用 invoke_result)
typedef typename std::result_of<F(A&&)>::type result_type;
// 创建一个 packaged_task:它封装了函数 f,可异步执行并将结果存入 future 中
std::packaged_task<result_type(A&&)> task(std::move(f));
// 获取 future,用于异步结果访问
std::future<result_type> res(task.get_future());
// 启动一个线程,执行任务 task,传入参数 a
// 注意 task 和 a 都要用 std::move 以避免拷贝,提高效率
std::thread t(std::move(task), std::move(a));
// 将线程分离,交由后台运行(无需调用 join)
t.detach();
// 返回 future,供调用者稍后使用 get() 获取结果
return res;
}
说明:
std::packaged_task
包装一个可调用对象,允许通过future
获取执行结果。std::thread
启动新线程执行任务,主线程返回future
。- 这种方式方便后续改造成线程池任务提交接口。
- 直接使用
std::async
更适合已知所有任务且需要自动管理线程生命周期的情况。
额外说明
std::partition
本身是串行的,若追求最快的并行排序,可以考虑C++17中并行算法支持(后续章节会介绍)。- 函数式编程也是并发编程的一个范式,类似于通讯顺序进程(CSP),线程间通过消息传递而不是共享状态。
- Erlang语言和MPI都是基于此范式的高性能并行编程实践。
- C++通过
future
和异步任务机制支持了这种并发编程风格。
使用消息传递的同步操作
CSP简介:无共享数据的线程模型
CSP(Communicating Sequential Processes) 是一种并发模型,其核心思想是:
- 线程之间不共享数据;
- 每个线程只通过接收“消息”进行状态转换;
- 每个线程可以被建模为一个有限状态机(FSM);
- 消息传递方式极大简化了并发程序的设计与维护。
C++ 线程本质上共享地址空间,并不天然支持 CSP,但通过规范设计(比如消息传递队列)可以模拟出类似行为。
示例场景:ATM 自动取款机
设计一个 ATM 系统,其功能包括:
- 接收卡片、显示信息、响应按钮;
- 与银行通信进行 PIN 验证与账户交易;
- 吐钞、退卡。
线程分工示意:
线程 | 功能 |
---|---|
硬件接口线程 | 处理按钮、吐钞、退卡等机械 |
ATM 逻辑线程 | 状态机控制、处理用户操作 |
银行通信线程 | 与银行后端交互 |
这些线程之间通过消息传递通信,互不共享数据。
ATM 状态机模型(简化)
ATM 逻辑类代码实现
ATM 状态机类结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
struct card_inserted {
std::string account; // 表示插卡信息
};
class atm {
messaging::receiver incoming; // 消息接收器(自己的消息队列)
messaging::sender bank; // 发给银行的消息发送器
messaging::sender interface_hardware; // 发给界面/设备的消息发送器
void (atm::*state)(); // 当前状态对应的成员函数指针
std::string account; // 当前账户
std::string pin; // 当前PIN码
void waiting_for_card() // 等待插卡状态
{
interface_hardware.send(display_enter_card()); // 显示“请插卡”提示
incoming.wait() // 等待接收到消息
.handle<card_inserted>( // 处理card_inserted类型消息
[&](card_inserted const& msg) {
account = msg.account; // 保存账户
pin = ""; // 清空PIN
interface_hardware.send(display_enter_pin()); // 提示输入PIN
state = &atm::getting_pin; // 状态切换为获取PIN
});
}
void getting_pin(); // 获取PIN的状态函数(稍后给出)
public:
void run() // 主循环,反复调用当前状态函数
{
state = &atm::waiting_for_card; // 初始状态为等待插卡
try {
for (;;) {
(this->*state)(); // 调用当前状态对应的成员函数
}
} catch (messaging::close_queue const&) {
// 队列关闭异常退出循环
}
}
};
ATM 的 PIN 处理状态函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void atm::getting_pin()
{
incoming.wait()
.handle<digit_pressed>( // ① 用户按下数字键
[&](digit_pressed const& msg) {
unsigned const pin_length = 4;
pin += msg.digit; // 累加PIN码
if(pin.length() == pin_length) {
bank.send(verify_pin(account, pin, incoming)); // 向银行验证PIN
state = &atm::verifying_pin; // 状态转移
}
})
.handle<clear_last_pressed>( // ② 用户清除上一个数字
[&](clear_last_pressed const& msg) {
if(!pin.empty()) {
pin.resize(pin.length() - 1); // 删除最后一个字符
}
})
.handle<cancel_pressed>( // ③ 用户取消操作
[&](cancel_pressed const& msg) {
state = &atm::done_processing; // 转到处理完成状态
});
}
编程模型总结:Actor 模式
本系统采用了参与者模型(Actor Model),特点如下:
特性 | 说明 |
---|---|
每个参与者 | 运行在独立线程 |
通过消息通信 | 而非共享数据 |
明确的状态转移逻辑 | 更易建模和测试 |
高并发友好 | 消除锁和竞争条件 |
运行过程回顾
1
2
atm a;
a.run(); // 进入状态机主循环
- 初始状态
waiting_for_card
等待消息; - 收到
card_inserted
后进入getting_pin
; - 在
getting_pin
状态中根据不同消息做出相应反应并继续转换状态; - 每次状态转移后,
run()
会继续执行当前状态函数。
小结
通过消息传递实现同步的编程模式(如 Actor/CSP)相比传统共享内存并发方式有如下优势:
- 消除了互斥、条件变量等同步原语的复杂性;
- 每个组件职责清晰、易于测试;
- 线程安全性天然由结构保证;
- 特别适合 GUI 事件处理、网络通信、分布式计算等场景。
这种模式对设计和编码者提出更高的架构能力要求,但对大规模并发系统具有天然优势。
扩展规范中的持续性并发
使用 std::experimental::future
和 then()
实现异步操作的持续性(continuation)
背景与动机
在标准 std::future
中:
- 获取异步结果时必须调用
get()
; - 获取前需等待其“就绪”,常用
wait()
/wait_for()
等阻塞手段; - 编写异步处理逻辑较为繁琐,代码可读性差。
为了解决这些问题,C++ 并发扩展规范(TS:Technical Specification) 引入了:
std::experimental::future
std::experimental::promise
then()
:可注册一个“持续性操作(continuation)”
什么是“持续性”?
定义:一旦 future
达到“就绪态”,立即触发 then()
中绑定的回调函数。
你无需手动等待、判断状态,异步结果准备好时,自动调用后续处理逻辑。
基本用法示例
1
2
3
4
5
6
std::experimental::future<int> find_the_answer;
auto fut = find_the_answer(); // 异步启动任务
auto fut2 = fut.then(find_the_question); // 注册持续性操作
assert(!fut.valid()); // fut 被“接管”,原始 future 无效
assert(fut2.valid()); // fut2 是 then() 的返回值,持有新的 future
注意事项:
fut.then()
会“接管”原来的 future,使原始的fut
无效;then()
返回一个新的 future,持有回调的返回值;- 回调函数的参数是:原始的
future<T>
对象; - 回调函数可以通过
.get()
拿到实际值,或处理异常。
示例:持续性函数签名
1
std::string find_the_question(std::experimental::future<int> the_answer);
如果 find_the_answer()
返回 int
,那么 then()
中的函数就应接受一个 future<int>
,以便显式处理值或异常。
为什么传入 future<T>
而不是值 T
?
这样可以让 回调函数自己决定如何处理异常,而不是由运行库自动抛出。
示例:
1
2
3
4
5
6
7
8
std::string find_the_question(std::experimental::future<int> the_answer) {
try {
int value = the_answer.get(); // 显式获取值
return "The question to " + std::to_string(value) + " is ...";
} catch (const std::exception& e) {
return "Failed to get the answer: " + std::string(e.what());
}
}
自定义 async 实现(等价于 std::async
)
使用 std::experimental::promise
实现一个简易的异步任务分发函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())>
spawn_async(Func&& func) {
std::experimental::promise<decltype(std::declval<Func>()())> p;
// 获取对应 future
auto res = p.get_future();
// 启动线程执行任务
std::thread t(
[p = std::move(p), f = std::decay_t<Func>(func)]() mutable {
try {
// 设置值(线程退出时才传递,确保 thread_local 清理完)
p.set_value_at_thread_exit(f());
} catch (...) {
// 如果异常,捕获并存入 future
p.set_exception_at_thread_exit(std::current_exception());
}
});
t.detach(); // 分离线程
return res;
}
行为 | 实现方式 |
---|---|
返回 future | promise::get_future() |
在线程中执行函数 | std::thread + detach() |
捕获异常并传递到 future | set_exception_at_thread_exit() |
在 future 中设置返回值 | set_value_at_thread_exit() |
支持链式 then() 注册持续性操作 | std::experimental::future 提供支持 |
持续性链式调用(可组合)
由于 then()
返回的是新的 future
,我们可以进行 链式组合:
1
2
3
4
5
6
7
auto fut = spawn_async([] { return 42; })
.then([](auto fut) {
return fut.get() + 1;
})
.then([](auto fut) {
std::cout << "Answer is: " << fut.get() << std::endl;
});
小结
特性 | std::experimental::future 提供的能力 |
---|---|
异步结果自动处理 | 使用 then() 添加处理逻辑 |
支持链式组合 | 每个 then() 返回新的 future,可持续组合 |
显式处理异常 | 回调函数收到的是 future<T> ,可调用 .get() |
灵活的线程调度策略 | then() 不强制在哪个线程执行,由库实现决定 |
兼容 std::async 行为 | 可自定义封装成类似 async 的函数 |
持续性连接
使用 std::experimental::future
实现多步骤异步任务的链式执行与自动触发
使用场景举例
例如:用户登录应用时,需要按顺序完成以下操作:
- 发送用户名与密码进行身份验证;
- 验证成功后请求用户数据;
- 更新 UI 展示用户数据。
同步方式实现
同步实现(阻塞 UI):
1
2
3
4
5
6
7
8
9
10
void process_login(std::string const& username, std::string const& password)
{
try {
user_id const id = backend.authenticate_user(username, password);
user_data const info_to_display = backend.request_current_info(id);
update_display(info_to_display);
} catch(std::exception& e) {
display_error(e);
}
}
缺点:会阻塞调用线程(UI卡顿),不能并发处理多个用户登录。
简单异步实现
简单异步实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
std::future<void> process_login(
std::string const& username, std::string const& password)
{
return std::async(std::launch::async, [=]() {
try {
user_id const id = backend.authenticate_user(username, password);
user_data const info_to_display = backend.request_current_info(id);
update_display(info_to_display);
} catch(std::exception& e) {
display_error(e);
}
});
}
这虽是异步调用,但 整个链仍然在一个线程中顺序执行,缺乏粒度控制,也不能解耦步骤。
持续性(continuation)异步方式
链式持续性实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::experimental::future<void> process_login(
std::string const& username, std::string const& password)
{
return spawn_async([=]() {
return backend.authenticate_user(username, password);
}).then([](std::experimental::future<user_id> id) {
return backend.request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display) {
try {
update_display(info_to_display.get());
} catch(std::exception& e) {
display_error(e);
}
});
}
- 每一步执行完才触发下一步(非阻塞链式执行);
- 每个
.then()
接收future<T>
,通过.get()
取值并处理异常; - 比同步方式更加响应式,适合构建复杂 UI 异步逻辑。
引入 async 接口:完全异步执行链
上例中的 backend.authenticate_user()
和 backend.request_current_info()
仍是同步的。为了更彻底的异步化:
- 使用异步版本:
backend.async_authenticate_user()
; - 避免阻塞等待后端响应。
全异步操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::experimental::future<void> process_login(
std::string const& username, std::string const& password)
{
return backend.async_authenticate_user(username, password).then(
[](std::experimental::future<user_id> id) {
return backend.async_request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display) {
try {
update_display(info_to_display.get());
} catch(std::exception& e) {
display_error(e);
}
});
}
- 所有操作异步进行,主线程可持续响应 UI;
- 因使用了 future 展开(future-unwrapping),避免出现嵌套 future 类型(如
future<future<T>>
); - 如果支持 C++14,可用泛型 Lambda 简化类型书写。
泛型 Lambda 示例(C++14 起)
1
2
3
4
return backend.async_authenticate_user(username, password).then(
[](auto id) {
return backend.async_request_current_info(id.get());
});
共享 future 与多持续性连接
有时需要多个处理逻辑共享同一个异步结果。这时可以使用 shared_future
。
shared_future
多持续性:
1
2
3
4
5
6
7
8
9
auto fut = spawn_async(some_function).share(); // 转为共享 future
auto fut2 = fut.then([](std::experimental::shared_future<some_data> data) {
do_stuff(data);
});
auto fut3 = fut.then([](std::experimental::shared_future<some_data> data) {
return do_other_stuff(data);
});
特性 | 说明 |
---|---|
.share() | 将 future<T> 转为 shared_future<T> |
多个 .then() 可并发使用 | 每个 then 都绑定一个处理逻辑 |
参数类型需是 shared_future | 避免数据竞争和无效引用 |
注意:不能将临时 shared_future
作为持续性参数;要先保存并复用。
总结:持续性链式编程优势
对比点 | 同步方式 | 异步+future | 持续性(then)方式 |
---|---|---|---|
是否阻塞线程 | 是 | 否(取决于 get 调用) | 否,真正非阻塞 |
任务是否串行 | 是 | 是 | 可分段并行,按需触发 |
控制流清晰度 | 高 | 中 | 高(逻辑清晰、无共享状态) |
异常处理 | try/catch 包裹整段 | 需统一处理 | 每段独立处理 |
支持多个消费者 | 否 | 否 | shared_future 支持 |
线程复用能力 | 差 | 一般 | 易与线程池整合,线程利用率更高 |
等待多个 future
在并发任务中,常见的需求是:多个任务并行处理,每个任务返回一个 future
,等所有任务完成后再做统一处理。为了有效管理这些异步任务的生命周期,我们可以使用标准库或扩展库提供的工具。
问题引入:如何等待多个异步任务完成?
假设有一组数据 vec
,我们希望对其进行分片处理,每片数据由一个异步任务处理,最后收集所有结果并汇总为一个 FinalResult
。
使用 std::async
+ future.get()
传统异步处理方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::future<FinalResult> process_data(std::vector<MyData>& vec)
{
size_t const chunk_size = whatever;
std::vector<std::future<ChunkResult>> results;
for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
std::async(process_chunk, begin, begin + this_chunk_size)
);
begin += this_chunk_size;
}
return std::async([all_results = std::move(results)]() {
std::vector<ChunkResult> v;
v.reserve(all_results.size());
for (auto& f : all_results) {
v.push_back(f.get()); // ① 阻塞式获取结果
}
return gather_results(v);
});
}
- 所有异步任务完成前,最后那一个
std::async
包装的任务会阻塞等待多个future.get()
。 - 若有任务尚未完成,线程会反复唤醒检查,就绪前可能反复休眠和切换,资源浪费且调度开销大。
改进方案:使用 std::experimental::when_all
我们可以使用技术规范中的 std::experimental::when_all()
函数,它能:
- 接收一组
future
; - 返回一个新
future
,其状态在所有输入future
就绪后变为就绪; - 配合
.then()
实现异步无阻塞链式处理。
使用 when_all
实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
std::experimental::future<FinalResult> process_data(
std::vector<MyData>& vec)
{
size_t const chunk_size = whatever;
std::vector<std::experimental::future<ChunkResult>> results;
for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
spawn_async(process_chunk, begin, begin + this_chunk_size)
);
begin += this_chunk_size;
}
return std::experimental::when_all(
results.begin(), results.end()
).then( // ① 当所有结果都就绪时,触发 Lambda
[](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results) {
std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get();
std::vector<ChunkResult> v;
v.reserve(all_results.size());
for (auto& f : all_results) {
v.push_back(f.get()); // ② 安全获取所有处理结果(无阻塞)
}
return gather_results(v);
}
);
}
项目 | std::async 方式 | when_all 方式 |
---|---|---|
阻塞等待 | 是 | 否,纯异步 |
资源消耗 | 高(线程唤醒 + 等待) | 低(等待就绪后一次性触发) |
扩展性 | 差 | 好(支持大规模任务并发处理) |
编码复杂度 | 中 | 稍高(但逻辑更清晰) |
补充:when_any
—— 等第一个任务完成就触发
如果只关心任意一个任务的完成,可以使用 std::experimental::when_any
:
1
2
3
4
5
std::experimental::when_any(futures.begin(), futures.end())
.then([](auto ready_future_group){
// 只有一个任务完成,也会立即触发
// 你可以从 ready_future_group 获取已完成任务的索引或值
});
- 多任务抢占执行,谁先完成就先响应;
- 超时任务竞速,例如从多个备份服务器取最快结果。
小结
方案 | 描述 |
---|---|
std::async + future.get() | 简单易用,但在任务多时效率低下,会阻塞线程 |
when_all | 等所有任务完成后集中处理,适合全并发场景 |
when_any | 响应最快完成的任务,适合竞争式异步处理 |
持续性(.then() ) | 异步任务链式处理,避免阻塞,提高线程利用率 |
使用 when_any
等待第一个 future
在处理并行搜索任务时,如果我们只需要找到一个符合条件的结果,那么没必要等待所有任务完成。这时候就可以用 std::experimental::when_any
来提升效率。
典型场景
假设你有一大堆数据 data
,目标是找出一个满足某种条件的元素:
- 满足条件的元素可能有多个;
- 找到一个即可;
- 找到后停止其他任务的执行。
问题建模:并行搜索 + 首个命中即处理
为了高效执行,可以:
- 利用硬件并发能力将数据拆分;
- 多个异步任务并行搜索;
- 利用
when_any
等待第一个完成搜索任务的结果; - 找到即处理,未找到则继续监听下一个完成的任务。
示例代码解析
使用 when_any
查找符合条件的值:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
std::experimental::future<FinalResult>
find_and_process_value(std::vector<MyData> &data)
{
// 1. 根据硬件并发能力分配任务数量
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_tasks = (concurrency > 0) ? concurrency : 2;
std::vector<std::experimental::future<MyData*>> results;
// 2. 将数据均匀分成 num_tasks 份
auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
auto chunk_begin = data.begin();
// 3. 用共享 flag 控制其他线程是否中止搜索
std::shared_ptr<std::atomic<bool>> done_flag =
std::make_shared<std::atomic<bool>>(false);
// 4. 启动每个异步任务
for (unsigned i = 0; i < num_tasks; ++i)
{
auto chunk_end = (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
results.push_back(spawn_async([=] {
for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end); ++entry)
{
if (matches_find_criteria(*entry)) // 匹配成功
{
*done_flag = true; // 通知其他线程终止搜索
return &*entry;
}
}
return (MyData*)nullptr; // 未找到
}));
chunk_begin = chunk_end;
}
// 5. 最终结果 promise
auto final_result =
std::make_shared<std::experimental::promise<FinalResult>>();
// 6. 定义持续性操作器
struct DoneCheck {
std::shared_ptr<std::experimental::promise<FinalResult>> final_result;
DoneCheck(std::shared_ptr<std::experimental::promise<FinalResult>> fr)
: final_result(std::move(fr)) {}
void operator()(std::experimental::future<
std::experimental::when_any_result<
std::vector<std::experimental::future<MyData*>>>> results_param)
{
auto results = results_param.get(); // 获取 when_any 的结果
MyData* const ready_result = results.futures[results.index].get(); // 获取就绪 future 的值
if (ready_result)
{
// 6.1 找到了符合条件的数据
final_result->set_value(process_found_value(*ready_result));
}
else
{
// 6.2 未找到,移除当前已完成的任务
results.futures.erase(results.futures.begin() + results.index);
if (!results.futures.empty())
{
// 6.3 继续监听下一个完成任务
std::experimental::when_any(
results.futures.begin(), results.futures.end()
).then(std::move(*this));
}
else
{
// 6.4 所有任务都失败,设置异常
final_result->set_exception(std::make_exception_ptr(
std::runtime_error("Not found")));
}
}
}
};
// 7. 持续性连接
std::experimental::when_any(results.begin(), results.end())
.then(DoneCheck(final_result));
// 8. 返回最终 future
return final_result->get_future();
}
实现思路总结
步骤 | 说明 |
---|---|
数据分片 | 平均分配给每个任务,提高并行效率 |
异步执行 | 使用 spawn_async 启动任务,避免主线程阻塞 |
并发控制 | done_flag 控制多个线程之间的终止条件 |
非阻塞监听 | 使用 when_any 等待第一个任务完成并处理 |
异常处理 | 没有任何结果时主动设置异常到 promise |
补充:使用 when_all
/ when_any
的变体形式
除了使用迭代器范围,when_all
和 when_any
也可以接受一组 future
作为变参直接传入:
1
2
3
4
5
6
7
8
9
std::experimental::future<int> f1 = spawn_async(func1);
std::experimental::future<std::string> f2 = spawn_async(func2);
std::experimental::future<double> f3 = spawn_async(func3);
auto result = std::experimental::when_all(
std::move(f1), std::move(f2), std::move(f3)
);
// result 类型为 future<tuple<future<int>, future<string>, future<double>>>
future
是单次获取结果的,所以必须std::move
;- 如果要多次访问,需要使用
shared_future
; - 多个返回值封装在
std::tuple
中。
不要滥用 future
有时,你只是想等待某组线程、数据量或操作完成,不需要具体返回值。这种情况可以考虑:
std::latch
(一次性屏障);std::barrier
(可重用屏障);- 自定义计数器 + 条件变量控制同步。
小结
用法 | 说明 |
---|---|
when_all | 所有 future 完成时返回结果(适用于任务合并) |
when_any | 任意一个 future 完成即返回(适用于“抢跑”场景) |
持续性 .then() | 链式操作,避免阻塞等待,提高异步编程可读性 |
支持变参版本 | 可接受多个不同类型的 future ,封装成 tuple |
多次使用需用 shared_future | 否则使用后 future 会失效 |
锁存器和栅栏(Latch & Barrier)
在并发编程中,除了 future
、mutex
、condition_variable
之外,还有两种实用的同步机制:
Latch(锁存器)
:一次性、单向计数;Barrier(栅栏)
:可重复使用的线程同步机制。
锁存器 std::experimental::latch
概念说明
- 是一种计数同步工具;
- 初始化一个计数值,使用
count_down()
减少计数; - 任何调用
wait()
的线程都会阻塞,直到计数减为 0; - 一旦就绪(减为 0),永久保持就绪态,不再阻塞任何调用
wait()
的线程; latch
是一次性使用的,不能重置。
适用场景
- 等待多个异步任务全部完成;
- 控制多个线程在一个时间点开始任务;
- 替代简化版的
join
。
示例代码:使用 latch 同步启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <experimental/latch>
#include <thread>
#include <iostream>
std::experimental::latch sync_point(3); // 初始值为 3
void worker(int id) {
std::cout << "Worker " << id << " ready\n";
sync_point.count_down(); // 当前线程报告就绪
sync_point.wait(); // 等待所有线程就绪
std::cout << "Worker " << id << " starts running\n";
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
std::thread t3(worker, 3);
t1.join(); t2.join(); t3.join();
}
输出示意
1
2
3
4
5
6
Worker 1 ready
Worker 2 ready
Worker 3 ready
Worker 3 starts running
Worker 1 starts running
Worker 2 starts running
栅栏 std::experimental::barrier
概念说明
- 是一种可重用的同步机制;
- 每个线程调用
arrive_and_wait()
到达栅栏; - 栅栏阻塞直到所有线程都到达;
- 到达后所有线程继续执行;
- 可以设置 完成函数(phase completion function),在每轮结束时自动执行。
适用场景
- 多线程分阶段同步;
- 每一轮结束后再统一进入下一轮;
- 多线程并行处理,每轮之间协同。
示例代码:使用 barrier 进行分阶段同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <experimental/barrier>
#include <thread>
#include <iostream>
constexpr int num_threads = 3;
std::experimental::barrier sync_point(num_threads,
[] { std::cout << "--- All threads synchronized ---\n"; });
void task(int id) {
for (int round = 1; round <= 3; ++round) {
std::cout << "Thread " << id << " working in round " << round << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100 * id)); // 模拟不同进度
sync_point.arrive_and_wait(); // 栅栏同步点
}
}
int main() {
std::thread t1(task, 1);
std::thread t2(task, 2);
std::thread t3(task, 3);
t1.join(); t2.join(); t3.join();
}
输出示意
1
2
3
4
5
6
7
8
9
10
11
12
Thread 1 working in round 1
Thread 2 working in round 1
Thread 3 working in round 1
--- All threads synchronized ---
Thread 1 working in round 2
Thread 2 working in round 2
Thread 3 working in round 2
--- All threads synchronized ---
Thread 1 working in round 3
Thread 2 working in round 3
Thread 3 working in round 3
--- All threads synchronized ---
latch
与 barrier
对比总结
特性 | latch | barrier |
---|---|---|
生命周期 | 一次性 | 可重用(多轮同步) |
同步行为 | 到达点阻塞,直到计数为0 | 所有线程到达,统一继续 |
控制行为 | count_down() + wait() | arrive_and_wait() |
支持额外回调 | 否 | ✅ 有完成函数(phase completion) |
线程计数方式 | 任意线程递减 | 每线程每轮只能一次 |
应用场景 | 启动控制,任务收集 | 分阶段协同并行 |
延伸用途示例
latch 用于等待多个后台任务完成
1
2
3
4
5
6
7
8
9
10
void parallel_task(std::vector<Task> tasks) {
std::experimental::latch done(tasks.size());
for (auto& task : tasks) {
std::thread([&, task]() {
do_task(task);
done.count_down(); // 任务完成
}).detach();
}
done.wait(); // 等待所有任务完成
}
barrier 用于图像并行处理每帧
1
2
3
4
5
6
7
8
std::experimental::barrier sync_point(num_threads);
void process_frame(Frame& frame, int thread_id) {
for (int round = 0; round < max_rounds; ++round) {
compute_slice(frame, thread_id, round);
sync_point.arrive_and_wait(); // 等待所有线程完成本轮
}
}
std::experimental::latch
:基础的锁存器类型
概述
std::experimental::latch
是 C++ 提供的一种基础的同步机制,声明于 <experimental/latch>
头文件。它适用于一组线程中的某些线程需要等待其他线程完成某项任务之后才能继续执行的场景。
- 初始时设置一个计数值;
- 调用
count_down()
使计数递减; - 当计数为 0 时,
latch
进入就绪态; - 等待线程通过
wait()
阻塞,直到latch
就绪; - 可调用
is_ready()
检查是否就绪; count_down_and_wait()
表示递减后等待。
latch 是一次性同步工具,不能重置或复用。
示例代码:等待所有线程的数据准备完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <experimental/latch>
#include <vector>
#include <future>
void foo() {
unsigned const thread_count = ...; // 实际线程数
std::experimental::latch done(thread_count); // ① 初始化 latch
my_data data[thread_count];
std::vector<std::future<void>> threads;
for (unsigned i = 0; i < thread_count; ++i) {
threads.push_back(std::async(std::launch::async, [&, i] { // ② 通过值捕获 i,其他引用捕获
data[i] = make_data(i);
done.count_down(); // ③ 数据准备完毕后减少 latch 计数
do_more_stuff(); // ④ 线程继续执行其他工作
}));
}
done.wait(); // ⑤ 主线程等待所有数据准备完成
process_data(data, thread_count); // ⑥ 对收集到的数据进行处理
} // ⑦ 等待 future 析构,确保线程执行完毕
示例说明
- ① 初始化 latch:设置初始计数为线程数;
- ② 捕获方式说明:
i
使用值捕获:避免循环变量被修改引发竞争;- 其他变量(如
data
,done
)使用引用捕获;
- ③
count_down()
:线程任务完成后报告; - ④
do_more_stuff()
:线程可继续执行其他非关键任务; - ⑤
wait()
:主线程阻塞直到 latch 减为 0; - ⑥
process_data
:数据准备完毕后安全处理; - ⑦
future
析构:确保线程生命周期完整;
latch 带来的线程安全性
由于 latch
是同步对象,多个线程对 count_down()
的调用是内存可见的,因此主线程调用 wait()
可以准确检测到线程状态的变化,避免数据竞争。
使用 latch 的常见场景
场景 | 描述 |
---|---|
多线程启动前同步 | 等所有线程准备完毕后再统一开始执行 |
异步任务准备阶段同步 | 所有子任务准备完数据后再进行主线程处理 |
替代手动使用 join 等待线程 | latch + async/future 的组合是 join 的高阶替代 |
注意事项
latch
是一次性工具,使用一次后不可重置;wait()
应在所有任务调用count_down()
后调用;- 若想复用同步逻辑,请使用
std::experimental::barrier
;
std::experimental::barrier
:简单的栅栏
概述
在 <experimental/barrier>
中,C++ 提供了两种栅栏(barrier)类型:
std::experimental::barrier
(简单且高效)✅std::experimental::flex_barrier
(更灵活但代价更高)🔧
两者都能用于一组线程的阶段性同步。适用于以下场景:
“所有线程必须在某个时间点等待其他线程,然后一起继续下一阶段。”
栅栏的用途
设想:多线程同时处理数据,每个线程处理不同的部分,但必须等所有线程处理完当前批次后,才能进入下一批次。
使用 std::experimental::barrier
:
- 设置线程数;
- 每个线程调用
arrive_and_wait()
进入阻塞; - 最后一个到达的线程会唤醒所有线程;
- 所有线程可进入下一阶段;
- 栅栏可被复用!
与一次性
latch
不同,barrier
可以循环使用。
示例代码:使用 barrier
同步处理数据块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <experimental/barrier>
#include <vector>
#include <thread>
#include "joining_thread.h" // 见 2.7 章节
result_chunk process(data_chunk);
std::vector<data_chunk> divide_into_chunks(data_block data, unsigned num_threads);
void process_data(data_source& source, data_sink& sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::experimental::barrier sync(num_threads); // 创建 barrier
std::vector<joining_thread> threads(num_threads);
std::vector<data_chunk> chunks;
result_block result;
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // ⑥ 检查数据是否处理完
if (!i) { // ① 只有 0 号线程划分数据块
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
sync.arrive_and_wait(); // ② 等待所有线程数据准备完成
result.set_chunk(i, num_threads, process(chunks[i])); // ③ 各线程处理自己的块
sync.arrive_and_wait(); // ④ 等待所有线程完成计算
if (!i) { // ⑤ 0 号线程统一输出
sink.write_data(std::move(result));
}
}
});
}
} // ⑦ joining_thread 析构保证线程全部完成
示例详解
步骤 | 说明 |
---|---|
① | 只有线程 0 执行数据划分操作(串行部分) |
② | 所有线程同步等待,直到数据准备完毕 |
③ | 每个线程处理自己的 data_chunk |
④ | 所有线程再次同步等待,确保计算完成 |
⑤ | 由线程 0 输出处理结果 |
⑥ | 线程持续循环,处理下一批数据 |
⑦ | 所有 joining_thread 离开作用域后自动 join,确保线程结束 |
栅栏的复用与退出
barrier
是可复用的,每次arrive_and_wait()
后自动重置;- 若线程要退出 barrier,同步组数量要减少,可调用:
1
sync.arrive_and_drop();
这样下一个周期就会减少一个线程的等待数。
栅栏与线程安全
栅栏提供了可靠的同步点,能确保所有线程同步进入下一阶段,避免数据竞争和未完成的任务之间的不一致状态。
栅栏与 latch 的区别
特性 | latch | barrier |
---|---|---|
可复用 | ❌ 否(一次性) | ✅ 是 |
解锁方式 | 计数减为 0 后就绪 | 所有线程到达后统一释放 |
应用场景 | 线程准备阶段等待 | 多阶段并行任务同步 |
重置机制 | 无 | 自动重置每轮 |
std::experimental::flex_barrier
— 更灵活和更友好的栅栏
介绍
std::experimental::flex_barrier
和 std::experimental::barrier
相似,但多了一个强大功能:
- 构造时传入一个完整函数(callable)和线程数;
- 当所有线程到达栅栏时,由其中一个线程执行该函数(通常是线程0);
- 该函数不仅执行串行代码,还可动态修改下一轮参与栅栏同步的线程数量。
这意味着开发者可以灵活控制下一阶段线程数,支持线程数的增加或减少。
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::vector<data_chunk> chunks;
// 1. 拆分数据的 Lambda,后续会由栅栏调用
auto split_source = [&] {
if (!source.done()) {
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
};
split_source(); // 2. 初次拆分数据
result_block result;
// 3. 构造 flex_barrier,传入线程数和串行函数
std::experimental::flex_barrier sync(num_threads, [&] {
sink.write_data(std::move(result)); // 输出结果
split_source(); // 拆分下一批数据
return -1; // -1 表示线程数保持不变
});
std::vector<joining_thread> threads(num_threads);
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // 6. 循环直到数据处理完
result.set_chunk(i, num_threads, process(chunks[i]));
sync.arrive_and_wait(); // 7. 栅栏同步点
}
});
}
}
步骤 | 说明 |
---|---|
① | split_source :拆分数据的函数,被封装给 flex_barrier 使用 |
② | 程序启动时先调用一次拆分函数,准备初始数据 |
③ | 构造 flex_barrier ,指定线程数和一个串行函数,该函数由线程0执行 |
④ | 串行函数执行数据输出和拆分,并通过返回值控制下一轮参与线程数 |
⑤ | 返回 -1 表示线程数保持不变;返回其他值则可动态调整线程数 |
⑥ | 每个线程在循环内处理自己的数据块,直到数据源耗尽 |
⑦ | 每轮处理完成后调用 arrive_and_wait() 等待所有线程同步 |
flex_barrier
的优势
- 将串行逻辑统一放入栅栏的完成阶段;
- 支持动态调整下一阶段线程数量,适合复杂流水线、分阶段并行任务;
- 代码结构更简洁,主循环只需包含并行部分代码,串行部分自动处理。
典型应用场景
- 流水线处理:不同阶段线程数不同;
- 需要在每轮同步后执行串行操作,且该操作影响下一轮线程数;
- 复杂并行算法中的阶段间控制。
小结
相比 std::experimental::barrier
,flex_barrier
通过引入完成阶段函数,使得线程同步和串行操作更紧密结合,同时赋予了动态控制线程参与数量的能力,是一个更灵活、功能更强大的栅栏机制。