首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >必须将std::std::vector<std::thread>中的线程连接两次,以避免线程dtor终止

必须将std::std::vector<std::thread>中的线程连接两次,以避免线程dtor终止
EN

Stack Overflow用户
提问于 2019-04-06 04:04:54
回答 1查看 430关注 0票数 1

我编写了一个并行程序来找出两个求和问题:

代码语言:javascript
复制
#include <iostream>
#include <vector>
#include <algorithm>
#include <utility>
#include <functional>

#include <thread>
#include <atomic>
#include <future>

using std::vector;
class Solution
{
private:
    using val_t = std::pair<int, int>;
    using Container = vector<val_t>;
    using It = typename Container::iterator;
    using size_t = typename Container::size_type;

    static bool cmp(const val_t &x, const val_t &y) noexcept
    {
        return x.first < y.first;
    }

    // Precondition: data.size() == 0, nums.size() != 0
    static void make_data(Container &data, const vector<int> &nums)
    {
        data.reserve(nums.size());

        int i = 0;
        for (auto &each: nums)
            data.emplace_back(each, i++);

        std::sort(data.begin(), data.end(), &cmp);
    }

    // launch_threads will launch threads in parallel.
    // It will call F with an int(thread id for accessing data) and std::forward<Args>(args)...
    template <class F>
    static void launch_threads(std::reference_wrapper<vector<std::thread>> pool, int thread_cnt, F &&f)
    {
        auto launch_other_threads = [=, f = std::forward<F>(f)]()
        {
            // This is thread 0!
            for (int i = 1; i != thread_cnt; ++i)
                pool.get().emplace_back(f, i);

            return std::invoke(f, 0);
        };

        pool.get().reserve(thread_cnt);
        pool.get().emplace_back(launch_other_threads);
    }

    // number of threads this process will ever have(including the main thread)
    int total_threads;
    // Thread pool and how much data each will process
    vector<std::thread> pool;
    size_t n;

    // The data for processing
    int target;
    Container data;
    // is_ready indicates whether the data is ready
    std::shared_future<void> is_ready;

    // Where result will be put. 
    // Since there is only one result, no atomic variable is needed
    vector<int> ret;
    // Synchronization of the event to terminate other running threads
    std::atomic_bool finished;

    void findtwoSum_impl(int thread_id, It beg, It end) noexcept
    {
        for (; beg != end && !finished.load(std::memory_order_acquire); ++beg) {
            int val = target - beg->first;
            auto it = std::lower_bound(data.begin(), data.end(), val_t{val, 0}, &cmp);

            if (it != data.end() && it->first == val &&
                (it->second != beg->second || (++it)->first == val)) {

                finished.store(true, std::memory_order_release);
                ret = {beg->second, it->second};
                break;
            }
        }
    }

    void findtwoSum(int thread_id) noexcept
    {
        std::shared_future<void>{is_ready}.wait();

        // Calculate the data that this thread will process
        auto beg = data.begin() + n * thread_id;
        auto end = beg + n;

        return findtwoSum_impl(thread_id, beg, end);
    }

    // thread_cnt must > 0
    void launch_threads_and_prepare_data(const vector<int> &nums, int thread_cnt)
    {
        // Initial the notification mechanism
        std::promise<void> promise;
        is_ready = promise.get_future().share();

        // Launch threads
        launch_threads(pool, thread_cnt, [this](int id) noexcept {
            return findtwoSum(id);
        });

        // Prepare data
        make_data(data, nums);

        n = data.size() / total_threads;

        // Notify threads that the data is ready
        promise.set_value();
    }

    // do_last_thread_cleanup process the remaining data and join threads
    void do_last_thread_cleanup()
    {
        // The main thred is the last thread.
        int thread_id = total_threads - 1;

        // findtwoSum_impl returns if any thread find the result.
        findtwoSum_impl(thread_id, data.begin() + n * thread_id, data.end());

        // The original join loop
        // Wait for other threads to finish.
        for (auto &thread: pool)
            thread.join();

        // Clear containers
        pool.clear();
        data.clear();
    }

public:
    vector<int> twoSum(const vector<int> &nums, int target)
    {
        // Initialize class variables
        // I know that total_threads should be much lower depending on the size of input
        total_threads = 8;

        this->target = target;
        ret.reserve(2);
        finished.store(false, std::memory_order_release);

        // Initialize class variable pool, n, data and is_ready
        launch_threads_and_prepare_data(nums, total_threads - 1);

        do_last_thread_cleanup();

        return std::move(ret);
    }
};

int main()
{
    Solution s;
    s.twoSum({3, 2, 4}, 6);
    return 0;
}

我用clang++-8 -std=c++17 -O1 -g -fsanitizer=address -lpthread -o debug.out编译了它,当我运行./debug.out时,它毫无例外地被终止。

我尝试通过添加std::cerr << "!@@@" << std::endl;来调试它,在join()std::vector<std::thread>中的std::threads之后,在Solution s超出范围之前;结果显示,pool.clear()是导致这种情况的代码。

我完全糊涂了,因为我在打电话给join之前就做了pool.clear()。为了发现问题,我将原始join()的代码修改为以下代码:

代码语言:javascript
复制
// The first loop
for (auto &thread: pool) {
    thread.join();
    std::cerr << " 1" << thread.get_id() << " is joinable? " << thread.joinable() << std::endl;
}

// The second loop
for (auto &thread: pool)
    if (thread.joinable())
        thread.join();
// The third loop
for (auto &thread: pool)
    std::cerr << thread.get_id() << " is joinable? " << thread.joinable() << std::endl;

而且,让我再次感到惊讶的是,我发现了join的第一个循环--线程根本不工作:

代码语言:javascript
复制
 1thread::id of a non-executing thread is joinable? 0
thread::id of a non-executing thread is joinable? 0
thread::id of a non-executing thread is joinable? 0
thread::id of a non-executing thread is joinable? 0
thread::id of a non-executing thread is joinable? 0
thread::id of a non-executing thread is joinable? 0
thread::id of a non-executing thread is joinable? 0
thread::id of a non-executing thread is joinable? 0

我认为我编写的第一个循环有问题,所以我注释掉了,然后再次运行它:

代码语言:javascript
复制
thread::id of a non-executing thread is joinable? 0
140634635626240 is joinable? 1
140634627233536 is joinable? 1
140634618840832 is joinable? 1
140634610448128 is joinable? 1
140634602055424 is joinable? 1
140634593662720 is joinable? 1
terminate called without an active exception

我完全糊涂了,也不知道如何解决这个问题。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-12 14:04:53

您的bug

您的线程从n = 0开始,因为data.size()total_threads小,整数除法n = data.size() / total_threads;将给出零。

代码语言:javascript
复制
void findtwoSum(int thread_id) noexcept
{
    std::shared_future<void>{is_ready}.wait();

    // Calculate the data that this thread will process
    auto beg = data.begin() + n * thread_id;
    auto end = beg + n;

    return findtwoSum_impl(thread_id, beg, end);
}

您有以下beg == end == data.begin()此时没有线程执行任何计算并退出

代码语言:javascript
复制
thread::id of a non-executing thread is joinable? 0

这是因为线程已经完成。

代码语言:javascript
复制
140634635626240 is joinable? 1

这是线程仍在运行\runnable的时候。

由于线程的调度是完全随机的,所以输出总是会变化的。这是正常的,即使您的代码中没有任何错误。

请注意:在启动线程之前设置所有内容:这避免了对std::shared_future<void> is_ready;的尴尬依赖

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55545782

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档