文章

确定线程数量

CPU逻辑核心数决定并发线程上限,超线程增加逻辑核数,合理线程数一般不超硬件线程数,避免资源竞争。

确定线程数量

确定线程数量

C++标准库中的std::thread::hardware_concurrency()函数非常实用,它返回系统支持的并发线程数,通常是CPU的核心数或者硬件线程数。如果无法获取,返回值为0。

方面物理核心逻辑核心(硬件线程)
本质实际的CPU核心核心上模拟的多线程执行单元
性能独立处理能力强共享资源,性能提升有限
数量核心数核心数 × 超线程倍数(通常2)
任务调度真正并行并行+快速切换
示例4核CPU开启超线程后,变成8逻辑核心

可以利用这个函数,实现一个并行版本的std::accumulate,将整体任务分成多个小块并由多个线程并行处理,最后合并结果。此方法会设定最小任务量,避免产生过多线程;输入为空时,返回初始值。

并行版 std::accumulate 实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <thread>
#include <vector>
#include <numeric>
#include <algorithm>
#include <iterator>
#include <functional>
#include <iostream>
#include <exception>
#include <chrono> // 用于计时

// 小任务执行体,将某个范围内的元素累加进 result
template<typename Iterator, typename T>
struct accumulate_block {
    void operator()(Iterator first, Iterator last, T& result) {
        result = std::accumulate(first, last, result);
    }
};

// 并行版 accumulate:将输入范围划分为多个块,用多线程执行累加
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    unsigned long const length = std::distance(first, last);  // 元素总数

    if (length == 0) return init;  // 边界条件:输入为空直接返回初始值

    unsigned long const min_per_thread = 25;  // 每个线程处理的最少元素数
    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread; // 最多允许的线程数

    unsigned long const hardware_threads = std::thread::hardware_concurrency();  // 获取硬件支持的并发线程数

    // 实际使用的线程数:硬件线程数和 max_threads 中较小的一个
    // 若 hardware_threads 返回 0,则默认使用 2 个线程
    unsigned long const num_threads = std::min(
        hardware_threads != 0 ? hardware_threads : 2,
        max_threads
    );

    unsigned long const block_size = length / num_threads;  // 每个线程处理的块大小

    std::vector<T> results(num_threads);  // 存放每个线程的中间结果
    std::vector<std::thread> threads(num_threads - 1);  // 启动的线程容器(主线程负责最后一块)

    Iterator block_start = first;

    // 创建并启动 num_threads - 1 个线程,每个处理一个块
    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);  // 计算当前块的结束迭代器

        threads[i] = std::thread(
            accumulate_block<Iterator, T>(),  // 任务对象
            block_start,                      // 起始位置
            block_end,                        // 结束位置(不含)
            std::ref(results[i])              // 中间结果通过引用传出
        );

        block_start = block_end;  // 更新下一块的起始位置
    }

    // 主线程处理最后一块数据(可能大小略大)
    accumulate_block<Iterator, T>()(
        block_start, last, results[num_threads - 1]
    );

    // 等待所有子线程完成
    for (auto& entry : threads) {
        entry.join();
    }

    // 汇总所有线程的结果并返回
    return std::accumulate(results.begin(), results.end(), init);
}

测试用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int main() {
    // 准备测试数据
    std::vector<int> data(10'000'000); // 一千万个元素
    std::iota(data.begin(), data.end(), 1); // 生成 1~10000000

    // 计时串行 accumulate
    auto start_serial = std::chrono::high_resolution_clock::now();
    long long sum_serial = std::accumulate(data.begin(), data.end(), 0LL);
    auto end_serial = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> duration_serial = end_serial - start_serial;

    // 计时并行 accumulate
    auto start_parallel = std::chrono::high_resolution_clock::now();
    long long sum_parallel = parallel_accumulate(data.begin(), data.end(), 0LL);
    auto end_parallel = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> duration_parallel = end_parallel - start_parallel;

    // 输出结果进行对比
    std::cout << "Serial accumulate:   " << sum_serial << " in "
        << duration_serial.count() << " seconds\n";

    std::cout << "Parallel accumulate: " << sum_parallel << " in "
        << duration_parallel.count() << " seconds\n";

    // 校验结果是否一致
    if (sum_serial == sum_parallel) {
        std::cout << "结果一致,测试通过!" << std::endl;
    } else {
        std::cout << "结果不一致,测试失败!" << std::endl;
    }

    return 0;
}

输出示例(正常情况下):

1
2
3
Serial accumulate:   50000005000000 in 0.0201547 seconds
Parallel accumulate: 50000005000000 in 0.0108109 seconds
结果一致测试通过

关键点解析

  1. 输入为空时返回初始值 代码中if(length == 0)时,直接返回init
  2. 计算最大线程数 通过min_per_thread确定最小任务规模,保证线程数量不会太多,减少线程管理开销。
  3. 确定线程数 线程数取硬件支持线程数和最大线程数中的较小者。若hardware_concurrency()返回0,选择默认2个线程。
  4. 计算每个线程任务大小 将总元素数除以线程数,得到每个线程处理的块大小。
  5. 线程与结果容器 使用两个std::vector容器,一个存放线程对象,一个存放各线程计算的部分结果。线程数比num_threads少1,因为主线程也参与处理最后一块。
  6. 分块执行并行计算 循环中为每个线程分配任务区间,调用线程执行累加。最后主线程执行最后一块。
  7. 线程同步与结果合并 等待所有线程join(),再将所有线程结果累加得到最终结果。

注意事项

  • 并行累加不保证浮点数结果与串行相同,因加法不满足严格结合律(特别是浮点运算中的舍入误差)。
  • 迭代器类型要求至少是前向迭代器,因多线程分块需要多次遍历。
  • T类型需要默认构造函数(用于results容器预分配)。
  • 传递结果到线程函数时,使用std::ref确保传引用。
  • 线程间通信采用结果数组存储中间值,避免线程安全问题。
  • 未来可用std::futurestd::async等方式改进。
本文由作者按照 CC BY 4.0 进行授权