Bingung saat boost :: asio :: io_service menjalankan blok metode / membuka blokir

91

Menjadi pemula total untuk Boost.Asio, saya bingung io_service::run(). Saya akan sangat menghargai jika seseorang dapat menjelaskan kepada saya ketika metode ini memblokir / membuka blokir. Dokumentasi menyatakan:

The run()blok fungsi sampai semua pekerjaan telah selesai dan tidak ada lagi penangan yang akan diberangkatkan, atau sampai io_servicetelah dihentikan.

Beberapa utas dapat memanggil run()fungsi untuk menyiapkan kumpulan utas tempat io_servicepenangan dapat mengeksekusi. Semua utas yang menunggu di pangkalan adalah setara dan io_servicedapat memilih salah satu dari mereka untuk memanggil penangan.

Keluar normal dari run()fungsi menyiratkan bahwa io_serviceobjek dihentikan ( stopped()fungsi mengembalikan nilai true). Panggilan berikutnya untuk run(), run_one(), poll()atau poll_one()akan segera kembali kecuali ada panggilan sebelum reset().

Apa maksud dari pernyataan berikut?

[...] tidak ada lagi penangan yang akan dikirim [...]


Saat mencoba memahami perilaku io_service::run(), saya menemukan contoh ini (contoh 3a). Di dalamnya, saya mengamati bahwa io_service->run()memblokir dan menunggu perintah kerja.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

Namun, dalam kode berikut yang saya kerjakan, klien menghubungkan menggunakan TCP / IP dan blok metode jalankan hingga data diterima secara asinkron.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Penjelasan apa pun run()yang menggambarkan perilakunya dalam dua contoh di bawah ini akan dihargai.

MistyD
sumber

Jawaban:

238

Dasar

Mari kita mulai dengan contoh yang disederhanakan dan memeriksa Boost. Asio yang relevan:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

Apa itu Handler ?

Seorang penangan tidak lebih dari sebuah panggilan balik. Di kode contoh, ada 3 penangan:

  • The printhandler (1).
  • The handle_async_receivehandler (3).
  • The printhandler (4).

Meskipun print()fungsi yang sama digunakan dua kali, setiap penggunaan dianggap membuat penangannya sendiri yang dapat diidentifikasi secara unik. Penangan bisa datang dalam berbagai bentuk dan ukuran, mulai dari fungsi dasar seperti yang di atas hingga konstruksi yang lebih kompleks seperti fungsi yang dihasilkan dari boost::bind()dan lambda. Terlepas dari kerumitannya, handler tetap tidak lebih dari callback.

Apa Itu Pekerjaan ?

Pekerjaan adalah beberapa pemrosesan yang diminta oleh Boost.Asio untuk dilakukan atas nama kode aplikasi. Terkadang Boost.Asio dapat memulai beberapa pekerjaan segera setelah diberitahu tentangnya, dan di lain waktu mungkin menunggu untuk melakukan pekerjaan di lain waktu. Setelah menyelesaikan pekerjaan, Boost.Asio akan menginformasikan aplikasi dengan memanggil penangan yang disediakan .

Boost.Asio menjamin bahwa penangan hanya akan berjalan dalam thread yang saat menelepon run(), run_one(), poll(), atau poll_one(). Ini adalah utas yang akan berfungsi dan menangani panggilan . Oleh karena itu, dalam contoh di atas, print()tidak dipanggil saat diposting ke io_service(1). Sebaliknya, ini ditambahkan ke io_servicedan akan dipanggil di lain waktu. Dalam hal ini, itu dalam io_service.run()(5).

Apa Itu Operasi Asinkron?

Sebuah operasi asynchronous menciptakan pekerjaan dan Boost.Asio akan memanggil handler untuk menginformasikan aplikasi ketika pekerjaan telah selesai. Operasi asinkron dibuat dengan memanggil fungsi yang memiliki nama dengan awalan async_. Fungsi ini juga dikenal sebagai fungsi inisiasi .

Operasi asinkron dapat diuraikan menjadi tiga langkah unik:

  • Memulai, atau menginformasikan, pekerjaan terkait io_serviceyang perlu dilakukan. The async_receiveOperasi (3) menginformasikan io_servicebahwa akan membutuhkan data asynchronous dibaca dari soket, kemudian async_receivesegera kembali.
  • Melakukan pekerjaan yang sebenarnya. Dalam hal ini, ketika socketmenerima data, byte akan dibaca dan disalin ke buffer. Pekerjaan sebenarnya akan dilakukan di:
    • Fungsi inisiasi (3), jika Boost.Asio dapat menentukan bahwa itu tidak akan diblokir.
    • Ketika aplikasi secara eksplisit menjalankan io_service(5).
  • Menyerukan handle_async_receive ReadHandler . Sekali lagi, penangan hanya dipanggil di dalam utas yang menjalankan io_service. Jadi, terlepas dari kapan pekerjaan selesai (3 atau 5), dijamin handle_async_receive()hanya akan dipanggil dalam io_service.run()(5).

Pemisahan dalam ruang dan waktu antara ketiga langkah ini dikenal sebagai inversi aliran kontrol. Ini adalah salah satu kompleksitas yang membuat pemrograman asinkron menjadi sulit. Namun, ada beberapa teknik yang dapat membantu menguranginya, seperti dengan menggunakan coroutine .

Apa yang io_service.run()Dilakukan?

Saat utas memanggil io_service.run(), pekerjaan dan penangan akan dipanggil dari dalam utas ini. Dalam contoh di atas, io_service.run()(5) akan memblokir hingga:

  • Ini telah dipanggil dan dikembalikan dari kedua printpenangan, operasi penerimaan selesai dengan keberhasilan atau kegagalan, dan handle_async_receivepenangannya telah dipanggil dan dikembalikan.
  • Ini io_servicesecara eksplisit dihentikan melalui io_service::stop().
  • Pengecualian dilemparkan dari dalam sebuah handler.

Salah satu aliran psuedo-ish potensial dapat dijelaskan sebagai berikut:

buat io_service
buat soket
tambahkan penangan cetak ke io_service (1)
tunggu soket untuk terhubung (2)
tambahkan permintaan kerja baca asinkron ke io_service (3)
tambahkan penangan cetak ke io_service (4)
jalankan io_service (5)
  apakah ada pekerjaan atau penangan?
    ya, ada 1 pekerjaan dan 2 penangan
      apakah soket memiliki data? tidak, jangan lakukan apa-apa
      jalankan penangan cetak (1)
  apakah ada pekerjaan atau penangan?
    ya, ada 1 pekerjaan dan 1 penangan
      apakah soket memiliki data? tidak, jangan lakukan apa-apa
      jalankan penangan cetak (4)
  apakah ada pekerjaan atau penangan?
    ya, ada 1 pekerjaan
      apakah soket memiliki data? tidak, lanjutkan menunggu
  - soket menerima data -
      socket memiliki data, membacanya menjadi buffer
      tambahkan handler handle_async_receive ke io_service
  apakah ada pekerjaan atau penangan?
    ya, ada 1 pawang
      jalankan handle_async_receive handler (3)
  apakah ada pekerjaan atau penangan?
    tidak, setel io_service sebagai berhenti dan kembali

Perhatikan bagaimana ketika pembacaan selesai, itu menambahkan penangan lain ke io_service. Detail halus ini adalah fitur penting dari pemrograman asinkron. Ini memungkinkan pawang untuk dirantai bersama. Misalnya, jika handle_async_receivetidak mendapatkan semua data yang diharapkan, implementasinya dapat memposting operasi pembacaan asinkron lainnya, yang menghasilkan io_servicelebih banyak pekerjaan, dan dengan demikian tidak kembali dari io_service.run().

Apakah dicatat bahwa ketika io_servicememiliki berlari keluar dari pekerjaan, aplikasi harus reset()yang io_servicesebelum menjalankan lagi.


Contoh Soal dan Contoh kode 3a

Sekarang, mari kita periksa dua bagian kode yang dirujuk dalam pertanyaan itu.

Kode Pertanyaan

socket->async_receivemenambahkan pekerjaan ke io_service. Dengan demikian, io_service->run()akan memblokir hingga operasi baca selesai dengan keberhasilan atau kesalahan, dan ClientReceiveEventtelah selesai berjalan atau melontarkan pengecualian.

Contoh 3a Kode

Dengan harapan membuatnya lebih mudah untuk dipahami, berikut adalah Contoh 3a beranotasi yang lebih kecil:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

Pada level tinggi, program akan membuat 2 thread yang akan memproses io_serviceevent loop (2). Ini menghasilkan kumpulan utas sederhana yang akan menghitung angka Fibonacci (3).

Satu perbedaan utama antara Kode Pertanyaan dan kode ini adalah bahwa kode ini memanggil io_service::run()(2) sebelum pekerjaan aktual dan penangan ditambahkan ke io_service(3). Untuk mencegah agar tidak io_service::run()segera kembali, sebuah io_service::workobjek dibuat (1). Objek ini mencegah io_servicedari kehabisan pekerjaan; oleh karena itu, io_service::run()tidak akan kembali karena tidak ada pekerjaan.

Alur keseluruhan adalah sebagai berikut:

  1. Buat dan tambahkan io_service::workobjek yang ditambahkan ke io_service.
  2. Kumpulan benang dibuat yang memanggil io_service::run(). Untaian pekerja ini tidak akan kembali dari io_servicekarena io_service::workobjeknya.
  3. Tambahkan 3 penangan yang menghitung angka Fibonacci ke io_service, dan segera kembali. Rangkaian pekerja, bukan utas utama, mungkin mulai menjalankan penangan ini dengan segera.
  4. Hapus io_service::workobjek tersebut.
  5. Tunggu hingga thread pekerja selesai dijalankan. Ini hanya akan terjadi setelah ketiga penangan menyelesaikan eksekusi, karena io_servicetidak ada penangan atau pun pekerjaan.

Kode dapat ditulis berbeda, dengan cara yang sama seperti Kode Asli, di mana penangan ditambahkan ke io_service, dan kemudian io_serviceperulangan kejadian diproses. Ini menghilangkan kebutuhan untuk menggunakan io_service::work, dan menghasilkan kode berikut:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Sinkron vs. Asinkron

Meskipun kode dalam pertanyaan menggunakan operasi asinkron, kode ini berfungsi secara efektif secara sinkron, karena sedang menunggu operasi asinkron selesai:

socket.async_receive(buffer, handler)
io_service.run();

setara dengan:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

Sebagai pedoman umum, cobalah untuk menghindari pencampuran operasi sinkron dan asinkron. Seringkali, ini dapat mengubah sistem yang kompleks menjadi sistem yang rumit. Jawaban ini menyoroti keunggulan pemrograman asinkron, beberapa di antaranya juga tercakup dalam dokumentasi Boost.Asio .

Tanner Sansbury
sumber
13
Posting yang luar biasa. Saya ingin menambahkan hanya satu hal karena saya merasa tidak cukup diperhatikan: Setelah run () kembali, Anda perlu memanggil reset () di io_service sebelum Anda dapat menjalankan () lagi. Jika tidak, ia dapat langsung kembali apakah ada operasi async_ yang menunggu atau tidak.
DeVadder
Darimana buffer berasal? Apa itu?
ruipacheco
Saya masih bingung. Jika pencampuran sinkron dan asinkron tidak disarankan, lalu apa mode asinkron murni? dapatkah Anda memberikan contoh yang menunjukkan kode tanpa io_service.run () ;?
Splash
@Splash Satu dapat digunakan io_service.poll()untuk memproses event loop tanpa memblokir operasi yang luar biasa. Rekomendasi utama untuk menghindari pencampuran operasi sinkron dan asinkron adalah untuk menghindari penambahan kompleksitas yang tidak perlu, dan untuk mencegah respons yang buruk saat penangan membutuhkan waktu lama untuk menyelesaikannya. Ada beberapa kasus di mana ini aman, seperti ketika seseorang tahu bahwa operasi sinkron tidak akan memblokir.
Tanner Sansbury
Apa yang Anda maksud dengan "saat ini" dalam "Boost.Asio menjamin bahwa penangan hanya akan berjalan dalam utas yang saat ini memanggilrun() ...." ? Jika ada N thread (yang telah dipanggil run()), lalu mana yang merupakan thread "saat ini"? Bisa banyak? Atau maksud Anda utas yang telah selesai menjalankan async_*()(katakanlah async_read), dijamin akan memanggil penangannya juga?
Nawaz
19

Untuk menyederhanakan bagaimana run, anggap saja sebagai karyawan yang harus memproses setumpuk kertas; ia mengambil satu lembar, melakukan apa yang diperintahkan lembar itu, membuang lembar itu dan mengambil lembar berikutnya; ketika dia kehabisan sprei, itu meninggalkan kantor. Di setiap lembar bisa ada instruksi apa pun, bahkan menambahkan lembar baru ke tumpukan. Kembali ke ASIO: Anda dapat memberikan ke io_servicepekerjaan dalam dua cara, pada dasarnya: dengan menggunakan postdi atasnya seperti dalam sampel Anda terhubung, atau dengan menggunakan benda-benda lain yang secara internal memanggil postpada io_service, seperti socketdan yang async_*metode.

Loghorn
sumber