Keyboard Berinterupsi dengan Pool multi-pemrosesan python

136

Bagaimana saya bisa menangani peristiwa KeyboardInterrupt dengan Pools multi-pemrosesan python? Ini adalah contoh sederhana:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Ketika menjalankan kode di atas, KeyboardInterruptakan dinaikkan ketika saya menekan ^C, tetapi proses hanya hang pada saat itu dan saya harus membunuhnya secara eksternal.

Saya ingin dapat menekan ^Ckapan saja dan menyebabkan semua proses keluar dengan anggun.

Fragsworth
sumber
Saya memecahkan masalah saya menggunakan psutil, Anda dapat melihat solusinya di sini: stackoverflow.com/questions/32160054/…
Tiago Albineli Motta

Jawaban:

137

Ini adalah bug Python. Saat menunggu kondisi dalam threading.Condition.wait (), KeyboardInterrupt tidak pernah dikirim. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

Pengecualian KeyboardInterrupt tidak akan dikirim sampai wait () kembali, dan itu tidak pernah kembali, sehingga interupsi tidak pernah terjadi. KeyboardInterrupt hampir pasti mengganggu kondisi menunggu.

Perhatikan bahwa ini tidak terjadi jika batas waktu ditentukan; cond.wait (1) akan menerima interupsi segera. Jadi, solusinya adalah menentukan batas waktu. Untuk melakukannya, ganti

    results = pool.map(slowly_square, range(40))

dengan

    results = pool.map_async(slowly_square, range(40)).get(9999999)

atau serupa.

Glenn Maynard
sumber
3
Apakah bug ini di pelacak python resmi di mana saja? Saya kesulitan menemukannya tetapi saya mungkin tidak menggunakan istilah pencarian terbaik.
Joseph Garvin
18
Bug ini telah diajukan sebagai [Masalah 8296] [1]. [1]: bugs.python.org/issue8296
Andrey Vlasovskikh
1
Berikut ini hack yang memperbaiki pool.imap () dengan cara yang sama, membuat Ctrl-C mungkin saat iterating over imap. Tangkap pengecualian dan panggil pool.terminate () dan program Anda akan keluar. gist.github.com/626518
Alexander Ljungberg
6
Ini tidak cukup memperbaiki masalah. Kadang-kadang saya mendapatkan perilaku yang diharapkan ketika saya menekan Control + C, di waktu lain tidak. Saya tidak yakin mengapa, tapi sepertinya The KeyboardInterrupt mungkin diterima oleh salah satu proses secara acak, dan saya hanya mendapatkan perilaku yang benar jika proses induk adalah yang menangkapnya.
Ryan C. Thompson
6
Ini tidak berfungsi untuk saya dengan Python 3.6.1 di Windows. Saya mendapatkan banyak jejak tumpukan dan sampah lainnya ketika saya melakukan Ctrl-C, yaitu sama seperti tanpa solusi seperti itu. Sebenarnya tidak ada solusi yang saya coba dari utas ini yang tampaknya berfungsi ...
szx
56

Dari apa yang saya temukan baru-baru ini, solusi terbaik adalah mengatur proses pekerja untuk mengabaikan SIGINT sama sekali, dan membatasi semua kode pembersihan ke proses induk. Ini memperbaiki masalah untuk proses pekerja yang menganggur dan sibuk, dan tidak memerlukan kode penanganan kesalahan dalam proses anak Anda.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Penjelasan dan contoh kode lengkap dapat ditemukan di http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ dan http://github.com/jreese/multiprocessing-keyboardinterrupt masing-masing.

John Reese
sumber
4
Hai John. Solusi Anda tidak mencapai hal yang sama dengan solusi saya, ya sayangnya rumit. Itu bersembunyi di balik time.sleep(10)dalam proses utama. Jika Anda menghapus tidur itu, atau jika Anda menunggu sampai proses mencoba untuk bergabung di kolam, yang harus Anda lakukan untuk menjamin pekerjaan selesai, maka Anda masih menderita masalah yang sama yang merupakan proses utama tidak dapat menerima KeyboardInterrupt ketika sedang menunggu pada joinoperasi pemungutan suara .
bboe
Dalam kasus di mana saya menggunakan kode ini dalam produksi, time.sleep () adalah bagian dari loop yang akan memeriksa status setiap proses anak, dan kemudian memulai kembali proses tertentu pada penundaan jika perlu. Daripada bergabung () yang akan menunggu semua proses untuk diselesaikan, itu akan memeriksa mereka secara individual, memastikan bahwa proses master tetap responsif.
John Reese
2
Jadi itu lebih merupakan penantian yang sibuk (mungkin dengan sedikit tidur di antara cek) yang disurvei untuk penyelesaian proses melalui metode lain daripada bergabung? Jika itu masalahnya, mungkin akan lebih baik untuk memasukkan kode ini dalam posting blog Anda, karena Anda kemudian dapat menjamin bahwa semua pekerja telah menyelesaikan sebelum mencoba untuk bergabung.
bboe
4
Ini tidak berhasil. Hanya anak-anak yang mengirim sinyal. Orang tua tidak pernah menerimanya, jadi pool.terminate()tidak pernah dieksekusi. Membuat anak-anak mengabaikan sinyal tidak menghasilkan apa-apa. @ Jawaban Glenn memecahkan masalah.
Cerin
1
Versi saya tentang ini ada di gist.github.com/admackin/003dd646e5fadee8b8d6 ; itu tidak memanggil .join()kecuali pada interupsi - itu hanya secara manual memeriksa hasil .apply_async()menggunakan AsyncResult.ready()untuk melihat apakah sudah siap, yang berarti kita sudah selesai.
Andy MacKinlay
29

Untuk beberapa alasan, hanya pengecualian yang diwarisi dari Exceptionkelas dasar yang ditangani secara normal. Sebagai solusinya, Anda dapat menaikkan kembali KeyboardInterruptsebagai Exceptioncontoh:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Biasanya Anda akan mendapatkan output berikut:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Jadi, jika Anda menekan ^C, Anda akan mendapatkan:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Andrey Vlasovskikh
sumber
2
Tampaknya ini bukan solusi lengkap. Jika a KeyboardInterrupttiba saat multiprocessingsedang melakukan pertukaran data IPC sendiri maka try..catchtidak akan diaktifkan (jelas).
Andrey Vlasovskikh
Anda bisa mengganti raise KeyboardInterruptErrordengan return. Anda hanya perlu memastikan bahwa proses anak berakhir segera setelah KeyboardInterrupt diterima. Nilai kembali tampaknya diabaikan, mainmasih KeyboardInterrupt diterima.
Bernhard
8

Biasanya struktur sederhana ini berfungsi untuk Ctrl- Cdi Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Sebagaimana dinyatakan dalam beberapa posting serupa:

Abadikan keyboardinterrupt dalam Python tanpa coba-kecuali

igco
sumber
1
Ini harus dilakukan pada masing-masing proses pekerja juga, dan mungkin masih gagal jika KeyboardInterrupt dinaikkan ketika pustaka multiprosesing mulai dijalankan.
MarioVilas
7

Jawaban yang dipilih tidak menangani masalah inti tetapi efek samping yang serupa.

Jesse Noller, penulis perpustakaan multiprosesing, menjelaskan cara menangani CTRL + C dengan benar saat menggunakan multiprocessing.Pooldalam posting blog lama .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
noxdafox
sumber
Saya telah menemukan bahwa ProcessPoolExecutor juga memiliki masalah yang sama. Satu-satunya perbaikan yang dapat saya temukan adalah menelepon os.setpgrp()dari dalam masa depan
portforwardpodcast
1
Tentu, satu-satunya perbedaan adalah bahwa ProcessPoolExecutortidak mendukung fungsi penginisialisasi. Di Unix, Anda dapat memanfaatkan forkstrategi dengan menonaktifkan sighandler pada proses utama sebelum membuat Pool dan mengaktifkannya kembali sesudahnya. Dalam kerikil , saya membungkam SIGINTproses anak secara default. Saya tidak mengetahui alasan mereka tidak melakukan hal yang sama dengan Pools Python. Pada akhirnya, pengguna dapat mengatur ulang SIGINTpawang jika dia ingin melukai dirinya sendiri.
noxdafox
Solusi ini tampaknya mencegah Ctrl-C mengganggu proses utama juga.
Paul Harga
1
Saya baru saja menguji pada Python 3.5 dan berhasil, versi Python apa yang Anda gunakan? OS apa?
noxdafox
5

Tampaknya ada dua masalah yang membuat pengecualian sementara multi-proses mengganggu. Yang pertama (dicatat oleh Glenn) adalah bahwa Anda perlu menggunakan map_asyncdengan batas waktu alih-alih mapuntuk mendapatkan tanggapan langsung (yaitu, jangan selesai memproses seluruh daftar). Yang kedua (dicatat oleh Andrey) adalah bahwa multiprocessing tidak menangkap pengecualian yang tidak diwarisi dari Exception(misalnya, SystemExit). Jadi, inilah solusi saya yang menangani keduanya:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Paul Price
sumber
1
Saya tidak melihat adanya penalti performa, tetapi dalam kasus saya functionini cukup berumur panjang (ratusan detik).
Paul Price
Ini sebenarnya bukan masalahnya lagi, setidaknya dari mataku dan pengalamanku. Jika Anda menangkap pengecualian keyboard di masing-masing proses anak dan menangkapnya sekali lagi dalam proses utama, maka Anda dapat terus menggunakan mapdan semuanya baik-baik saja. @Linux Cli Aikmemberikan solusi di bawah ini yang menghasilkan perilaku ini. Penggunaan map_asynctidak selalu diinginkan jika utas utama bergantung pada hasil dari proses anak.
Kode Doggo
4

Saya menemukan, untuk saat ini, solusi terbaik adalah tidak menggunakan fitur multiprocessing.pool melainkan memutar fungsi pool Anda sendiri. Saya memberikan contoh yang menunjukkan kesalahan dengan apply_async serta contoh yang menunjukkan bagaimana cara menghindari penggunaan fungsi kumpulan sama sekali.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

bboe
sumber
Bekerja seperti pesona. Ini adalah solusi bersih dan bukan semacam hack (/ saya pikir) .btw, trik dengan .get (99999) seperti yang diusulkan oleh orang lain sangat buruk kinerjanya.
Walter
Saya tidak melihat ada penalti kinerja dari penggunaan batas waktu, meskipun saya telah menggunakan 9999, bukan 999999. Pengecualiannya adalah ketika pengecualian yang tidak diwariskan dari kelas Exception dinaikkan: maka Anda harus menunggu sampai batas waktu habis memukul. Solusi untuk itu adalah dengan menangkap semua pengecualian (lihat solusi saya).
Paul Harga
1

Saya seorang pemula di Python. Saya mencari jawaban di mana-mana dan menemukan ini dan beberapa blog dan video youtube lainnya. Saya telah mencoba untuk menyalin tempel kode penulis di atas dan mereproduksinya di python 2.7.13 saya di windows 7 64-bit. Ini dekat dengan apa yang ingin saya capai.

Saya membuat proses anak saya untuk mengabaikan ControlC dan membuat proses induk berakhir. Sepertinya melewati proses anak tidak menghindari masalah ini untuk saya.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

Bagian yang dimulai pada pool.terminate()sepertinya tidak pernah dieksekusi.

Linux Cli Aik
sumber
Saya baru saja menemukan ini juga! Jujur saya pikir ini adalah solusi terbaik untuk masalah seperti ini. Solusi yang diterima memaksa map_asyncke pengguna, yang saya khususnya tidak suka. Dalam banyak situasi, seperti punyaku, utas utama perlu menunggu proses masing-masing selesai. Ini adalah salah satu alasan mengapa mapada!
Kode Doggo
1

Anda dapat mencoba menggunakan metode apply_async dari objek Pool, seperti ini:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Keluaran:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Keuntungan dari metode ini adalah bahwa hasil yang diproses sebelum gangguan akan dikembalikan dalam kamus hasil:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
bparker856
sumber
Contoh yang mulia dan lengkap
EMTT
-5

Anehnya sepertinya Anda harus menangani KeyboardInterruptanak-anak juga. Saya berharap ini berfungsi seperti yang tertulis ... coba ubah slowly_squareke:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Itu seharusnya bekerja seperti yang Anda harapkan.

D.Shawley
sumber
1
Saya mencoba ini, dan itu tidak benar-benar menghentikan seluruh rangkaian pekerjaan. Ini menghentikan pekerjaan yang sedang berjalan, tetapi skrip masih menetapkan pekerjaan yang tersisa di panggilan pool.map seolah-olah semuanya normal.
Fragsworth
ini OK, tetapi Anda mungkin kehilangan jejak kesalahan yang terjadi. mengembalikan kesalahan dengan stacktrace mungkin berhasil sehingga proses induk dapat mengatakan bahwa kesalahan terjadi, tetapi masih tidak keluar segera ketika kesalahan terjadi.
mehtunguh