Penyatuan utas dalam C ++ 11

131

Pertanyaan yang relevan :

Tentang C ++ 11:

Tentang Peningkatan:


Bagaimana cara mendapatkan kumpulan utas untuk mengirim tugas , tanpa membuat dan menghapusnya berulang-ulang? Ini berarti utas persisten untuk melakukan sinkronisasi ulang tanpa bergabung.


Saya memiliki kode yang terlihat seperti ini:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

Alih-alih membuat dan menggabungkan utas setiap iterasi, saya lebih suka mengirim tugas ke pekerja saya utas setiap iterasi dan hanya membuatnya sekali.

Yktula
sumber
1
ini pertanyaan terkait dan jawaban saya.
didierc
1
berpikir untuk menggunakan tbb (ini Intel, tetapi gratis & open source, dan melakukan persis apa yang Anda inginkan: Anda cukup mengirimkan tugas (dapat dibagi secara rekursif) dan tidak khawatir tentang utasnya)?
Walter
2
Proyek FOSS ini adalah upaya saya untuk membuat perpustakaan kumpulan thread, periksa jika Anda mau. -> code.google.com/p/threadpool11
Etherealone
Apa yang salah dengan menggunakan tbb?
Walter

Jawaban:

84

Anda dapat menggunakan Perpustakaan C ++ Thread Pool, https://github.com/vit-vit/ctpl .

Maka kode yang Anda tulis dapat diganti dengan yang berikut

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

Anda akan mendapatkan jumlah utas yang diinginkan dan tidak akan membuat dan menghapusnya berulang-ulang di iterasi.

vit-vit
sumber
11
Ini seharusnya jawabannya; pustaka C ++ 11 satu tajuk, mudah dibaca, lugas, ringkas, dan sesuai standar. Kerja bagus!
Jonathan H
@ vit-vit bisakah Anda memberi contoh dengan suatu fungsi? bagaimana Anda mendorong fungsi anggota kelas diresults[j] = p.push([&arr, j](int){ arr[j] +=2; });
Hani Goc
1
@HaniGoc Cukup ambil instance dengan referensi.
Jonathan H
@ vit-vit Mengirimkan Anda permintaan tarik untuk meningkatkan versi STL.
Jonathan H
@ vit-vit: Sulit menghubungi pengelola perpustakaan itu dengan pertanyaan, petunjuk.
einpoklum
83

Ini disalin dari jawaban saya ke posting lain yang sangat mirip, semoga bisa membantu:

1) Mulai dengan jumlah utas maksimum yang dapat didukung sistem:

int Num_Threads =  thread::hardware_concurrency();

2) Untuk implementasi threadpool yang efisien, setelah utas dibuat sesuai dengan Num_Threads, lebih baik tidak membuat yang baru, atau menghancurkan yang lama (dengan bergabung). Akan ada penalti kinerja, bahkan mungkin membuat aplikasi Anda berjalan lebih lambat daripada versi serial.

Setiap utas C ++ 11 harus menjalankan fungsinya dengan loop tak terbatas, terus-menerus menunggu tugas baru untuk diraih dan dijalankan.

Berikut ini cara melampirkan fungsi tersebut ke kumpulan utas:

int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{  Pool.push_back(thread(Infinite_loop_function));}

3) Fungsi Infinite_loop_

Ini adalah loop "while (true)" menunggu antrian tugas

void The_Pool:: Infinite_loop_function()
{
    while(true)
    {
        {
            unique_lock<mutex> lock(Queue_Mutex);

            condition.wait(lock, []{return !Queue.empty() || terminate_pool});
            Job = Queue.front();
            Queue.pop();
        }
        Job(); // function<void()> type
    }
};

4) Buat fungsi untuk menambahkan pekerjaan ke Antrian Anda

void The_Pool:: Add_Job(function<void()> New_Job)
{
    {
        unique_lock<mutex> lock(Queue_Mutex);
        Queue.push(New_Job);
    }
    condition.notify_one();
}

5) Bind fungsi sewenang-wenang ke Antrian Anda

Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

Setelah Anda mengintegrasikan bahan-bahan ini, Anda memiliki kolam threading dinamis Anda sendiri. Utas ini selalu berjalan, menunggu pekerjaan selesai.

Saya minta maaf jika ada beberapa kesalahan sintaksis, saya mengetik kode ini dan dan saya memiliki memori yang buruk. Maaf bahwa saya tidak dapat memberikan Anda kode kumpulan utas lengkap, yang akan melanggar integritas pekerjaan saya.

Edit: untuk menghentikan pool, panggil metode shutdown ():

XXXX::shutdown(){
{
    unique_lock<mutex> lock(threadpool_mutex);
    terminate_pool = true;} // use this flag in condition.wait

    condition.notify_all(); // wake up all threads.

    // Join all threads.
    for(std::thread &every_thread : thread_vector)
    {   every_thread.join();}

    thread_vector.clear();  
    stopped = true; // use this flag in destructor, if not set, call shutdown() 
}
PhD AP EcE
sumber
Bagaimana Anda memiliki vektor <thread> ketika utas (const thread &) = hapus?
Christopher Pisz
1
@ChristopherPisz std::vectortidak memerlukan elemennya untuk dapat disalin. Anda dapat menggunakan vektor dengan jenis bergerak-satunya ( unique_ptr, thread, future, dll).
Daniel Langr
dalam contoh Anda di atas, bagaimana Anda menghentikan pool? Haruskah condition.waitjuga mencari variabel stop_dan periksa if (stop_ == true) { break;}?
John
@ John, silakan lihat metode shutdown di atas.
PhD AP EcE
2
Dalam shutdown (), itu harus thread_vector.clear (); bukannya thread_vector.empty (); Benar?
sudheerbb
63

Kumpulan utas berarti bahwa semua utas Anda berjalan, sepanjang waktu - dengan kata lain, fungsi utas tidak pernah kembali. Untuk memberi thread sesuatu yang bermakna untuk dilakukan, Anda harus merancang sistem komunikasi antar-thread, baik untuk tujuan memberi tahu thread bahwa ada sesuatu yang harus dilakukan, maupun untuk mengkomunikasikan data pekerjaan yang sebenarnya.

Biasanya ini akan melibatkan semacam struktur data bersamaan, dan setiap utas mungkin akan tidur pada beberapa jenis variabel kondisi, yang akan diberitahukan ketika ada pekerjaan yang harus dilakukan. Setelah menerima pemberitahuan, satu atau beberapa utas bangun, memulihkan tugas dari struktur data bersamaan, memprosesnya, dan menyimpan hasilnya dengan cara yang analog.

Utas kemudian akan melanjutkan untuk memeriksa apakah ada lebih banyak pekerjaan yang harus dilakukan, dan jika tidak kembali tidur.

Hasilnya adalah bahwa Anda harus merancang semua ini sendiri, karena tidak ada gagasan alami tentang "pekerjaan" yang berlaku secara universal. Ini sedikit kerja, dan ada beberapa masalah halus yang harus Anda selesaikan. (Anda dapat memprogram di Go jika Anda menyukai sistem yang menangani pengelolaan utas untuk Anda di belakang layar.)

Kerrek SB
sumber
11
"Anda harus merancang sendiri semua ini" <- itulah yang saya coba hindari. Goroutine tampak fantastis.
Yktula
2
@Yktula: Ya, ini tugas yang sangat tidak sepele. Bahkan tidak jelas dari pos Anda pekerjaan apa yang ingin Anda lakukan, dan itu sangat mendasar untuk solusinya. Anda dapat menerapkan Go in C ++, tetapi itu akan menjadi hal yang sangat spesifik, dan separuh orang akan mengeluh bahwa mereka menginginkan sesuatu yang berbeda.
Kerrek SB
19

Threadpool pada intinya adalah seperangkat utas yang semuanya terikat pada fungsi yang bekerja sebagai perulangan kejadian. Utas ini tanpa henti akan menunggu tugas untuk dieksekusi, atau penghentian mereka sendiri.

Pekerjaan threadpool adalah untuk menyediakan antarmuka untuk mengirimkan pekerjaan, menentukan (dan mungkin memodifikasi) kebijakan menjalankan pekerjaan ini (aturan penjadwalan, instantiasi utas, ukuran kumpulan), dan memantau status utas dan sumber daya terkait.

Jadi untuk kumpulan serbaguna, seseorang harus mulai dengan mendefinisikan apa tugas itu, bagaimana itu diluncurkan, disela, apa hasilnya (lihat gagasan tentang janji dan masa depan untuk pertanyaan itu), peristiwa seperti apa yang harus ditanggapi oleh utas-utas tersebut untuk, bagaimana mereka akan menanganinya, bagaimana peristiwa-peristiwa ini akan dibedakan dari yang ditangani oleh tugas-tugas. Ini bisa menjadi sangat rumit seperti yang Anda lihat, dan memaksakan pembatasan tentang cara kerja utas, karena solusinya menjadi semakin terlibat.

Perkakas saat ini untuk menangani peristiwa cukup barebone (*): primitif seperti mutex, variabel kondisi, dan beberapa abstraksi di atas itu (kunci, hambatan). Tetapi dalam beberapa kasus, abstrasi ini mungkin tidak layak (lihat pertanyaan terkait ini ), dan seseorang harus kembali menggunakan primitif.

Masalah lain harus dikelola juga:

  • sinyal
  • saya / o
  • perangkat keras (afinitas prosesor, pengaturan heterogen)

Bagaimana ini akan bermain di pengaturan Anda?

Jawaban untuk pertanyaan serupa ini menunjuk pada implementasi yang ada yang dimaksudkan untuk meningkatkan dan stl.

Saya menawarkan implementasi threadpool yang sangat kasar untuk pertanyaan lain, yang tidak membahas banyak masalah yang diuraikan di atas. Anda mungkin ingin membangunnya. Anda mungkin juga ingin melihat kerangka kerja yang ada dalam bahasa lain, untuk mencari inspirasi.


(*) Saya tidak melihat itu sebagai masalah, justru sebaliknya. Saya pikir itu adalah semangat C ++ yang diwarisi dari C.

didierc
sumber
4
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}
pio
sumber
2
Terima kasih! Ini benar-benar membantu saya memulai dengan operasi threading paralel. Saya akhirnya menggunakan versi implementasi Anda yang sedikit dimodifikasi.
Robbie Capps
3

Sesuatu seperti ini mungkin membantu (diambil dari aplikasi yang berfungsi).

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

Anda bisa menggunakannya seperti ini:

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

Perlu diingat bahwa menciptakan kembali mekanisme antrian asinkron yang efisien bukanlah hal sepele.

Boost :: asio :: io_service adalah implementasi yang sangat efisien, atau sebenarnya adalah kumpulan pembungkus khusus platform (misalnya membungkus port penyelesaian I / O pada Windows).

rustyx
sumber
2
Apakah dorongan sebanyak itu diperlukan dengan C ++ 11? Bukankah, katakanlah, std::threadsudah cukup?
einpoklum
Tidak ada padanan stduntuk boost::thread_group. boost::thread_groupadalah kumpulan boost::threadinstance. Tapi tentu saja, itu sangat mudah untuk mengganti boost::thread_groupdengan vectordari std::threads.
rustyx
3

Sunting: Ini sekarang membutuhkan C ++ 17 dan konsep. (Pada 9/12/16, hanya g ++ 6.0+ yang cukup.)

Pengurangan template jauh lebih akurat karena itu, jadi itu sepadan dengan upaya mendapatkan kompiler yang lebih baru. Saya belum menemukan fungsi yang membutuhkan argumen templat eksplisit.

Itu juga sekarang mengambil objek callable yang sesuai ( dan masih statis typesafe !!! ).

Ini juga sekarang termasuk kolam utas prioritas threading hijau opsional menggunakan API yang sama. Kelas ini hanya POSIX. Ini menggunakan ucontext_tAPI untuk beralih tugas userspace.


Saya membuat perpustakaan sederhana untuk ini. Contoh penggunaan diberikan di bawah ini. (Saya menjawab ini karena itu adalah salah satu hal yang saya temukan sebelum saya memutuskan untuk menulisnya sendiri.)

bool is_prime(int n){
  // Determine if n is prime.
}

int main(){
  thread_pool pool(8); // 8 threads

  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }

  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }  
}

Anda dapat melewati asyncfungsi apa pun dengan nilai pengembalian (atau batal) apa pun dan (atau tidak ada) argumen apa pun dan itu akan mengembalikan yang sesuai std::future. Untuk mendapatkan hasilnya (atau hanya menunggu sampai tugas selesai) Anda memanggil get()masa depan.

Inilah github: https://github.com/Tyler-Hardin/thread_pool .

Tyler
sumber
1
Tampak luar biasa, tetapi akan lebih baik untuk memiliki perbandingan dengan header vit-vit!
Jonathan H
1
@ Sh3ljohn, dari melihat itu, tampaknya mereka pada dasarnya sama di API. vit-vit menggunakan antrian penguncian boost, yang lebih baik dari milik saya. (Tapi tujuan saya secara khusus untuk melakukannya dengan std :: * saja. Saya kira saya dapat mengimplementasikan antrian lockfree sendiri, tetapi itu terdengar sulit dan rawan kesalahan.) Juga, vit-vit tidak memiliki .cpp terkait, yang lebih mudah digunakan untuk orang yang tidak tahu apa yang mereka lakukan. (Misalnya github.com/Tyler-Hardin/thread_pool/issues/1 )
Tyler
Ia juga memiliki solusi stl-only yang saya telah forking selama beberapa jam terakhir, pada awalnya itu terlihat lebih rumit daripada milik Anda dengan pointer bersama di semua tempat, tetapi ini sebenarnya diperlukan untuk menangani pengubahan ukuran panas dengan benar.
Jonathan H
@ Sh3ljohn, ah, saya tidak melihat ukurannya yang panas. Itu bagus. Saya memilih untuk tidak khawatir tentang hal itu karena itu tidak benar-benar dalam kasus penggunaan yang dimaksud. (Saya tidak bisa memikirkan kasus di mana saya ingin mengubah ukuran, secara pribadi, tapi itu bisa disebabkan oleh kurangnya imajinasi.)
Tyler
1
Contoh kasus penggunaan: Anda menjalankan RESTful API di server dan perlu mengurangi sementara alokasi sumber daya untuk keperluan pemeliharaan, tanpa harus mematikan layanan sepenuhnya.
Jonathan H
3

Ini adalah implementasi thread pool lain yang sangat sederhana, mudah dimengerti dan digunakan, hanya menggunakan pustaka standar C ++ 11, dan dapat dilihat atau dimodifikasi untuk penggunaan Anda, harus menjadi starter yang bagus jika Anda ingin menggunakan thread kolam:

https://github.com/progschj/ThreadPool

Tenang
sumber
3

Anda dapat menggunakan thread_pool dari boost library:

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

Anda juga dapat menggunakan threadpool dari komunitas open source:

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}
Amir Fo
sumber
1

Threadpool tanpa ketergantungan di luar STL sepenuhnya memungkinkan. Baru-baru ini saya menulis pustaka threadpool hanya header kecil untuk mengatasi masalah yang sama persis. Ini mendukung pengubahan ukuran pool dinamis (mengubah jumlah pekerja saat runtime), menunggu, berhenti, berhenti, melanjutkan dan seterusnya. Semoga bermanfaat.

tidak bisa dipercaya
sumber
sepertinya Anda telah menghapus akun github Anda (atau salah tautan). Sudahkah Anda memindahkan kode ini di tempat lain?
rtpax
1
@rtpax Saya sudah memindahkan repo - Saya memperbarui jawaban untuk mencerminkan itu.
cantordust