确定线程数量
CPU逻辑核心数决定并发线程上限,超线程增加逻辑核数,合理线程数一般不超硬件线程数,避免资源竞争。
确定线程数量
确定线程数量
C++标准库中的std::thread::hardware_concurrency()
函数非常实用,它返回系统支持的并发线程数,通常是CPU的核心数或者硬件线程数。如果无法获取,返回值为0。
方面 物理核心 逻辑核心(硬件线程) 本质 实际的CPU核心 核心上模拟的多线程执行单元 性能 独立处理能力强 共享资源,性能提升有限 数量 核心数 核心数 × 超线程倍数(通常2) 任务调度 真正并行 并行+快速切换 示例 4核CPU 开启超线程后,变成8逻辑核心
可以利用这个函数,实现一个并行版本的std::accumulate
,将整体任务分成多个小块并由多个线程并行处理,最后合并结果。此方法会设定最小任务量,避免产生过多线程;输入为空时,返回初始值。
并行版 std::accumulate
实现示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <thread>
#include <vector>
#include <numeric>
#include <algorithm>
#include <iterator>
#include <functional>
#include <iostream>
#include <exception>
#include <chrono> // 用于计时
// 小任务执行体,将某个范围内的元素累加进 result
template<typename Iterator, typename T>
struct accumulate_block {
void operator()(Iterator first, Iterator last, T& result) {
result = std::accumulate(first, last, result);
}
};
// 并行版 accumulate:将输入范围划分为多个块,用多线程执行累加
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last); // 元素总数
if (length == 0) return init; // 边界条件:输入为空直接返回初始值
unsigned long const min_per_thread = 25; // 每个线程处理的最少元素数
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread; // 最多允许的线程数
unsigned long const hardware_threads = std::thread::hardware_concurrency(); // 获取硬件支持的并发线程数
// 实际使用的线程数:硬件线程数和 max_threads 中较小的一个
// 若 hardware_threads 返回 0,则默认使用 2 个线程
unsigned long const num_threads = std::min(
hardware_threads != 0 ? hardware_threads : 2,
max_threads
);
unsigned long const block_size = length / num_threads; // 每个线程处理的块大小
std::vector<T> results(num_threads); // 存放每个线程的中间结果
std::vector<std::thread> threads(num_threads - 1); // 启动的线程容器(主线程负责最后一块)
Iterator block_start = first;
// 创建并启动 num_threads - 1 个线程,每个处理一个块
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size); // 计算当前块的结束迭代器
threads[i] = std::thread(
accumulate_block<Iterator, T>(), // 任务对象
block_start, // 起始位置
block_end, // 结束位置(不含)
std::ref(results[i]) // 中间结果通过引用传出
);
block_start = block_end; // 更新下一块的起始位置
}
// 主线程处理最后一块数据(可能大小略大)
accumulate_block<Iterator, T>()(
block_start, last, results[num_threads - 1]
);
// 等待所有子线程完成
for (auto& entry : threads) {
entry.join();
}
// 汇总所有线程的结果并返回
return std::accumulate(results.begin(), results.end(), init);
}
测试用例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int main() {
// 准备测试数据
std::vector<int> data(10'000'000); // 一千万个元素
std::iota(data.begin(), data.end(), 1); // 生成 1~10000000
// 计时串行 accumulate
auto start_serial = std::chrono::high_resolution_clock::now();
long long sum_serial = std::accumulate(data.begin(), data.end(), 0LL);
auto end_serial = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration_serial = end_serial - start_serial;
// 计时并行 accumulate
auto start_parallel = std::chrono::high_resolution_clock::now();
long long sum_parallel = parallel_accumulate(data.begin(), data.end(), 0LL);
auto end_parallel = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration_parallel = end_parallel - start_parallel;
// 输出结果进行对比
std::cout << "Serial accumulate: " << sum_serial << " in "
<< duration_serial.count() << " seconds\n";
std::cout << "Parallel accumulate: " << sum_parallel << " in "
<< duration_parallel.count() << " seconds\n";
// 校验结果是否一致
if (sum_serial == sum_parallel) {
std::cout << "结果一致,测试通过!" << std::endl;
} else {
std::cout << "结果不一致,测试失败!" << std::endl;
}
return 0;
}
输出示例(正常情况下):
1
2
3
Serial accumulate: 50000005000000 in 0.0201547 seconds
Parallel accumulate: 50000005000000 in 0.0108109 seconds
结果一致,测试通过!
关键点解析
- 输入为空时返回初始值 代码中
if(length == 0)
时,直接返回init
。 - 计算最大线程数 通过
min_per_thread
确定最小任务规模,保证线程数量不会太多,减少线程管理开销。 - 确定线程数 线程数取硬件支持线程数和最大线程数中的较小者。若
hardware_concurrency()
返回0,选择默认2个线程。 - 计算每个线程任务大小 将总元素数除以线程数,得到每个线程处理的块大小。
- 线程与结果容器 使用两个
std::vector
容器,一个存放线程对象,一个存放各线程计算的部分结果。线程数比num_threads
少1,因为主线程也参与处理最后一块。 - 分块执行并行计算 循环中为每个线程分配任务区间,调用线程执行累加。最后主线程执行最后一块。
- 线程同步与结果合并 等待所有线程
join()
,再将所有线程结果累加得到最终结果。
注意事项
- 并行累加不保证浮点数结果与串行相同,因加法不满足严格结合律(特别是浮点运算中的舍入误差)。
- 迭代器类型要求至少是前向迭代器,因多线程分块需要多次遍历。
T
类型需要默认构造函数(用于results
容器预分配)。- 传递结果到线程函数时,使用
std::ref
确保传引用。 - 线程间通信采用结果数组存储中间值,避免线程安全问题。
- 未来可用
std::future
、std::async
等方式改进。
本文由作者按照 CC BY 4.0 进行授权