Threading pool mirip dengan Pool multiprosesing?

347

Apakah ada kelas Pool untuk thread pekerja , mirip dengan kelas Pool modul multiprosesor ?

Saya suka misalnya cara mudah untuk memparalelkan fungsi peta

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

namun saya ingin melakukannya tanpa overhead untuk menciptakan proses baru.

Saya tahu tentang GIL. Namun, dalam usecase saya, fungsi tersebut akan menjadi fungsi C terikat IO di mana pembungkus python akan melepaskan GIL sebelum pemanggilan fungsi yang sebenarnya.

Apakah saya harus menulis kolam threading sendiri?

Martin
sumber
Berikut ini sesuatu yang terlihat menjanjikan lebih dalam Python Cookbook: Resep 576.519: Thread pool dengan API yang sama seperti (multi) processing.Pool (Python)
otherchirps
1
Saat itu built-in: from multiprocessing.pool import ThreadPool.
martineau
Bisakah Anda menguraikan ini I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call.?
mrgloom

Jawaban:

448

Saya baru tahu bahwa sebenarnya adalah antarmuka Renang benang berbasis di multiprocessingmodul, namun tersembunyi agak dan tidak didokumentasikan dengan baik.

Itu dapat diimpor melalui

from multiprocessing.pool import ThreadPool

Ini diimplementasikan menggunakan kelas Proses dummy yang membungkus thread python. Kelas Proses berbasis utas ini dapat ditemukan di multiprocessing.dummymana disebutkan secara singkat dalam dokumen . Modul dummy ini seharusnya menyediakan seluruh antarmuka multi-pemrosesan berdasarkan utas.

Martin
sumber
5
Itu luar biasa. Saya punya masalah membuat ThreadPools di luar utas utama, Anda dapat menggunakannya dari utas anak yang pernah dibuat sekalipun. Saya memasukkan masalah ke dalamnya: bugs.python.org/issue10015
Olson
82
Saya tidak mengerti mengapa kelas ini tidak memiliki dokumentasi. Kelas pembantu seperti itu sangat penting saat ini.
Wernight
18
@Wernight: ini bukan publik terutama karena tidak ada yang menawarkan patch yang menyediakannya (atau yang serupa) sebagai threading.ThreadPool, termasuk dokumentasi dan tes. Memang akan menjadi baterai yang baik untuk dimasukkan dalam perpustakaan standar, tetapi itu tidak akan terjadi jika tidak ada yang menulisnya. Salah satu keuntungan bagus dari implementasi yang ada ini dalam multi-pemrosesan, adalah bahwa hal itu akan membuat tambalan threading seperti itu lebih mudah untuk ditulis ( docs.python.org/devguide )
ncoghlan
3
@ daniel.gindi: multiprocessing.dummy.Pool/ multiprocessing.pool.ThreadPooladalah hal yang sama, dan keduanya kolam utas. Mereka meniru antarmuka dari kumpulan proses, tetapi mereka diimplementasikan sepenuhnya dalam hal threading. Baca ulang dokumen, Anda mendapatkannya mundur.
ShadowRanger
9
@ daniel.gindi: Baca lebih lanjut : " multiprocessing.dummymereplikasi API multiprocessingtetapi tidak lebih dari pembungkus threadingmodul." multiprocessingsecara umum adalah tentang proses, tetapi untuk memungkinkan peralihan antara proses dan utas, mereka (kebanyakan) mereplikasi multiprocessingAPI multiprocessing.dummy, tetapi didukung dengan utas, bukan proses. Tujuannya adalah agar Anda import multiprocessing.dummy as multiprocessingdapat mengubah kode berbasis proses menjadi berbasis thread.
ShadowRanger
236

Di Python 3 Anda bisa menggunakan concurrent.futures.ThreadPoolExecutor, yaitu:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Lihat dokumen untuk info lebih lanjut dan contoh.

Adrian Adamiak
sumber
6
untuk menggunakan modul berjangka yang di-backport, jalankansudo pip install futures
yair
itu cara paling efisien dan tercepat untuk multi pemrosesan
Haritsinh Gohil
2
Apa perbedaan antara menggunakan ThreadPoolExecutordan multiprocessing.dummy.Pool?
Jay
2
dari concurrent.futures import ThreadPoolExecutor
stackOverlord
63

Ya, dan tampaknya memiliki (kurang lebih) API yang sama.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
peperangan
sumber
9
Jalur impor ThreadPoolberbeda Pool. Impor yang benar adalah from multiprocessing.pool import ThreadPool.
Marigold
2
Anehnya ini bukan API yang terdokumentasi, dan multiprocessing.pool hanya disebutkan secara singkat sebagai menyediakan AsyncResult. Tetapi tersedia dalam 2.x dan 3.x.
Marvin
2
Ini yang saya cari. Itu hanya satu baris impor dan perubahan kecil ke baris pool yang ada dan berfungsi dengan baik.
Danegraphics
39

Untuk sesuatu yang sangat sederhana dan ringan (sedikit dimodifikasi dari sini ):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

Untuk mendukung panggilan balik pada penyelesaian tugas, Anda bisa menambahkan panggilan balik ke tugas tuple.

Dgorissen
sumber
bagaimana utas dapat bergabung jika mereka tanpa batas tanpa batas?
Joseph Garvin
@JosephGarvin Saya sudah mengujinya, dan utasnya terus memblokir antrian kosong (karena panggilan untuk Queue.get()memblokir) sampai program berakhir, setelah itu mereka dihentikan secara otomatis.
forumulator
@ JosephephGarvin, pertanyaan bagus. Queue.join()akan benar-benar bergabung dengan antrian tugas, bukan utas pekerja. Jadi, ketika antrian kosong, wait_completionpengembalian, program berakhir, dan utas menuai oleh OS.
randomir
Jika semua kode ini terbungkus dalam fungsi yang rapi, sepertinya tidak akan menghentikan utas bahkan ketika antrian kosong dan pool.wait_completion()kembali. Hasilnya adalah bahwa utas terus membangun.
ubiquibacon
17

Hai untuk menggunakan kolam utas dengan Python Anda dapat menggunakan perpustakaan ini:

from multiprocessing.dummy import Pool as ThreadPool

dan kemudian untuk digunakan, perpustakaan ini melakukan seperti itu:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

Utas adalah jumlah utas yang Anda inginkan dan tugas adalah daftar tugas yang sebagian besar dipetakan ke layanan.

Manochehr Rasouli
sumber
Terima kasih, itu saran yang bagus! Dari dokumen: multiprocessing.dummy mereplikasi API multiprocessing tetapi tidak lebih dari pembungkus di sekitar modul threading. Satu koreksi - saya pikir Anda ingin mengatakan bahwa api pool adalah (fungsi, dapat
diubah
2
Kami melewatkan .close()dan .join()panggilan dan yang menyebabkan .map()untuk menyelesaikan sebelum semua utas selesai. Hanya sebuah peringatan.
Anatoly Scherbakov
8

Inilah hasil akhirnya saya gunakan. Ini adalah versi modifikasi dari kelas oleh dgorissen di atas.

Mengajukan: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

Untuk menggunakan kolam renang

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
forumulator
sumber
Annotion untuk pembaca lain: Kode ini Python 3 (shebang #!/usr/bin/python3)
Daniel Marschall
Mengapa Anda menggunakan for i, d in enumerate(delays):lalu mengabaikan inilainya?
martineau
@martineau - mungkin hanya peninggalan dari pengembangan di mana mereka mungkin ingin mencetak iselama menjalankan.
n1k31t4
Kenapa create_taskdisana? Untuk apa ini?
MrR
Saya tidak percaya dan menjawab dengan 4 suara di SO adalah cara untuk melakukan ThreadPooling dengan Python. Threadpool dalam distribusi python resmi masih rusak? Apa yang saya lewatkan?
MrR
2

Biaya tambahan untuk menciptakan proses baru sangat minim, terutama ketika hanya 4 dari mereka. Saya ragu ini adalah hot spot kinerja aplikasi Anda. Sederhanakan, optimalkan ke mana Anda harus pergi dan ke mana profiling menunjukkan hasil.

unbeli
sumber
5
Jika si penanya ada di bawah Windows (yang saya tidak yakin dia sebutkan), maka saya pikir proses spinup bisa menjadi biaya yang signifikan. Setidaknya pada proyek yang saya lakukan baru-baru ini. :-)
Brandon Rhodes
1

Tidak ada kolam berbasis benang yang dibangun. Namun, bisa sangat cepat untuk mengimplementasikan antrian produsen / konsumen dengan Queuekelas.

Dari: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
Yann Ramin
sumber
3
Ini tidak lagi berlaku pada concurrent.futuresmodul.
Thanatos
11
Saya tidak berpikir ini benar sama sekali lagi. from multiprocessing.pool import ThreadPool
Randall Hunt
0

cara lain dapat menambahkan proses ke kumpulan antrian

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(0, len(list_of_files) - 1):
        a = executor.submit(loop_files2, i, list_of_files2, mt_list, temp_path, mt_dicto)
pelos
sumber