Bagaimana saya harus login sambil menggunakan multiprosesor dengan Python?

239

Saat ini saya memiliki modul pusat dalam kerangka kerja yang memunculkan beberapa proses menggunakan multiprocessingmodul Python 2.6 . Karena itu digunakan multiprocessing, ada log tingkat-sadar multiprocessing modul LOG = multiprocessing.get_logger(),. Per dokumen , logger ini memiliki kunci proses bersama sehingga Anda tidak memutarbalikkan hal-hal sys.stderr(atau file apa pun yang ditangani) dengan meminta beberapa proses menulis secara bersamaan.

Masalah yang saya miliki sekarang adalah bahwa modul-modul lain dalam kerangka kerja ini tidak sadar multiprosesor. Cara saya melihatnya, saya perlu membuat semua dependensi pada modul pusat ini menggunakan pencatatan multiprocessing-aware. Itu menjengkelkan dalam kerangka kerja, apalagi untuk semua klien kerangka kerja. Apakah ada alternatif yang tidak saya pikirkan?

cdleary
sumber
10
Dokumen yang Anda tautkan, nyatakan kebalikan dari apa yang Anda katakan, pencatat tidak memiliki proses berbagi kunci dan segala sesuatunya campur aduk - masalah yang saya alami juga.
Sebastian Blask
3
lihat contoh di dokumen stdlib: Masuk ke satu file dari banyak proses . Resep-resep ini tidak membutuhkan modul-modul lain agar sadar multi-proses.
jfs
Jadi, untuk apa gunanya multiprocessing.get_logger()? Tampaknya berdasarkan pada cara-cara lain untuk melakukan logging ini adalah fungsionalitas logging yang multiprocessingnilainya kecil.
Tim Ludwinski
4
get_logger()adalah logger yang digunakan oleh multiprocessingmodul itu sendiri. Berguna jika Anda ingin men-debug multiprocessingmasalah.
jfs

Jawaban:

69

Satu-satunya cara untuk menangani hal ini secara non-intrusively adalah dengan:

  1. Menelurkan setiap pekerja proses sehingga log-nya pergi ke deskriptor file yang berbeda (ke disk atau ke pipa.) Idealnya, semua entri log harus cap waktu.
  2. Proses controller Anda kemudian dapat melakukan salah satu dari yang berikut:
    • Jika menggunakan file disk: Menyatukan file log di akhir proses, diurutkan berdasarkan cap waktu
    • Jika menggunakan pipa (disarankan): Menyatukan entri log dengan cepat dari semua pipa, ke file log pusat. (Mis., Secara berkala selectdari deskriptor file pipa, lakukan penggabungan-sort pada entri log yang tersedia, dan siram ke log terpusat. Ulangi.)
vladr
sumber
Bagus, itu 35-an sebelum saya memikirkan itu (saya pikir saya akan menggunakan atexit:-). Masalahnya adalah itu tidak akan memberi Anda pembacaan waktu nyata. Ini mungkin merupakan bagian dari harga multiprocessing yang bertentangan dengan multithreading.
cdleary
@cdleary, menggunakan pendekatan perpipaan akan sedekat mungkin dengan waktu yang dapat diperoleh (terutama jika stderr tidak disangga dalam proses spawned.)
vladr
1
Kebetulan, asumsi besar di sini: bukan Windows. Apakah Anda menggunakan Windows?
vladr
22
Mengapa tidak menggunakan multiprocessing.Queue dan thread logging di proses utama sebagai gantinya? Tampak lebih sederhana.
Brandon Rhodes
1
@BrandonRhodes - Seperti yang saya katakan, non-intrusively . Menggunakan multiprocessing.Queuetidak akan lebih mudah jika ada banyak kode yang perlu dipasang kembali untuk digunakan multiprocessing.Queue, dan / atau jika kinerja merupakan masalah
vladr
122

Saya baru saja menulis log handler saya sendiri yang hanya memberi makan segalanya untuk proses induk melalui pipa. Saya hanya mengujinya selama sepuluh menit tetapi tampaknya bekerja dengan cukup baik.

( Catatan: Ini adalah hardcoded RotatingFileHandler, yang merupakan kasus penggunaan saya sendiri.)


Pembaruan: @javier sekarang mempertahankan pendekatan ini sebagai paket yang tersedia di Pypi - lihat multiprocessing-logging di Pypi, github di https://github.com/jruere/multiprocessing-logging


Perbarui: Implementasi!

Ini sekarang menggunakan antrian untuk penanganan konkurensi yang benar, dan juga pulih dari kesalahan dengan benar. Saya sekarang telah menggunakan ini dalam produksi selama beberapa bulan, dan versi saat ini di bawah ini berfungsi tanpa masalah.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
zzzeek
sumber
4
Penangan di atas melakukan semua penulisan file dari proses induk dan menggunakan hanya satu utas untuk menerima pesan yang dikirimkan dari proses anak. Jika Anda memanggil pawang itu sendiri dari proses anak yang baru lahir maka itu salah menggunakannya, dan Anda akan mendapatkan semua masalah yang sama seperti RotatingFileHandler. Saya telah menggunakan kode di atas selama bertahun-tahun tanpa masalah.
zzzeek
9
Sayangnya pendekatan ini tidak berfungsi di Windows. Dari docs.python.org/library/multiprocessing.html 16.6.2.12 "Perhatikan bahwa pada Windows proses anak hanya akan mewarisi tingkat logger proses induk - kustomisasi lain dari logger tidak akan diwarisi." Subproses tidak akan mewarisi penangan, dan Anda tidak dapat meneruskannya secara eksplisit karena tidak dapat dipilih.
Noah Yetter
2
Perlu dicatat bahwa multiprocessing.Queuemenggunakan utas untuk masuk put(). Jadi jangan memohon put(mis. Log msg menggunakan MultiProcessingLoghandler) sebelum membuat semua subproses. Kalau tidak, utas akan mati dalam proses anak. Salah satu solusinya adalah menelepon Queue._after_fork()pada awal setiap proses anak, atau menggunakannya multiprocessing.queues.SimpleQueue, yang tidak melibatkan utas tetapi menghalangi.
Danqi Wang
5
Bisakah Anda menambahkan contoh sederhana yang menunjukkan inisialisasi, serta penggunaan dari proses anak hipotetis? Saya tidak yakin bagaimana proses anak seharusnya mendapatkan akses ke antrian tanpa membuat instance dari kelas Anda.
JesseBuesking
11
@zzzeek, ​​solusi ini bagus tetapi saya tidak dapat menemukan paket dengan itu atau sesuatu yang serupa jadi saya membuat satu yang disebut multiprocessing-logging.
Javier
30

QueueHandleradalah asli dalam Python 3.2+, dan melakukan hal ini. Ini mudah direplikasi di versi sebelumnya.

Python docs memiliki dua contoh lengkap: Masuk ke satu file dari beberapa proses

Bagi mereka yang menggunakan Python <3.2, cukup salin QueueHandlerke kode Anda sendiri dari: https://gist.github.com/vsajip/591589 atau impor logutils alternatif .

Setiap proses (termasuk proses induk) menempatkan logging pada Queue, dan kemudian listenerutas atau proses (satu contoh disediakan untuk masing-masing) mengambil mereka dan menulis semuanya ke file - tidak ada risiko korupsi atau kekacauan.

fantabolous
sumber
21

Di bawah ini adalah solusi lain dengan fokus pada kesederhanaan untuk orang lain (seperti saya) yang datang ke sini dari Google. Penebangan harus mudah! Hanya untuk 3,2 atau lebih tinggi.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
pengguna2133814
sumber
2
The QueueHandlerdan QueueListenerkelas dapat digunakan pada Python 2,7 juga, tersedia dalam logutilspaket.
Lev Levitsky
5
Logger dari proses utama juga harus menggunakan QueueHandler. Dalam kode Anda saat ini, proses utama melewati antrian sehingga bisa ada kondisi balapan antara proses utama dan yang pekerja. Setiap orang harus masuk ke antrian (melalui QueueHandler) dan hanya QueueListener yang boleh masuk ke StreamHandler.
Ismael EL ATIFI
Juga, Anda tidak harus menginisialisasi logger pada setiap anak. Inisialkan logger di proses induk, dan dapatkan logger di setiap proses anak.
okwap
20

Namun alternatif lain mungkin berbagai penangan logging berbasis file dalam loggingpaket :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(dan lain-lain)

Dengan cara ini, Anda dapat dengan mudah memiliki daemon logging di suatu tempat yang dapat Anda tulis dengan aman dan akan menangani hasilnya dengan benar. (Misalnya, server soket sederhana yang hanya mengurai pesan dan memancarkannya ke penangan file rotasinya sendiri.)

The SyslogHandlerakan mengurus ini untuk Anda juga. Tentu saja, Anda bisa menggunakan contoh Anda sendiri syslog, bukan yang sistem.

Ali Afshar
sumber
13

Varian yang lain yang membuat log logging dan antrian terpisah.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
tukang besi
sumber
Saya suka ide mengambil nama logger dari catatan antrian. Hal ini memungkinkan untuk menggunakan konvensional fileConfig()di MainProcess dan logger yang hampir tidak dikonfigurasi di PoolWorkers (hanya dengan setLevel(logging.NOTSET)) Seperti yang saya sebutkan di komentar lain, saya menggunakan Pool sehingga saya harus mendapatkan Antrian (proxy) saya dari Manajer alih-alih multiprocessing sehingga bisa diambil. Ini memungkinkan saya untuk mengantri ke pekerja di dalam kamus (sebagian besar berasal dari objek argsparse menggunakan vars()). Saya merasa seperti pada akhirnya ini adalah pendekatan terbaik untuk MS Windows yang tidak memiliki fork () dan merusak solusi @zzzeak.
mlt
@mlt Saya pikir Anda juga bisa meletakkan Antrian multi-proses di init daripada menggunakan Manajer (lihat jawaban untuk stackoverflow.com/questions/25557686/… - ini tentang Kunci tetapi saya yakin ini juga berfungsi untuk Antrian)
fantabolous
@fantabolous Itu tidak akan berfungsi pada MS Windows atau platform lain yang kurang fork. Dengan begitu setiap proses akan memiliki antrian tidak berguna yang independen. Pendekatan kedua dalam Tanya Jawab terkait tidak akan berfungsi pada platform tersebut. Ini adalah cara untuk kode non-portabel.
mlt
@mlt Menarik. Saya menggunakan Windows dan tampaknya berfungsi baik untuk saya - tidak lama setelah saya terakhir berkomentar saya menyiapkan sekelompok proses berbagi multiprocessing.Queuedengan proses utama dan saya telah menggunakannya terus-menerus sejak itu. Tidak akan mengaku mengerti mengapa itu bekerja.
fantabolous
10

Semua solusi saat ini terlalu digabungkan ke konfigurasi logging dengan menggunakan handler. Solusi saya memiliki arsitektur dan fitur berikut:

  • Anda dapat menggunakan setiap konfigurasi penebangan yang Anda inginkan
  • Logging dilakukan di daemon thread
  • Shutdown yang aman dari daemon dengan menggunakan manajer konteks
  • Komunikasi ke utas pendataan dilakukan oleh multiprocessing.Queue
  • Dalam subproses, logging.Logger(dan contoh yang sudah ditentukan) ditambal untuk mengirim semua catatan ke antrian
  • Baru : memformat traceback dan pesan sebelum mengirim ke antrian untuk mencegah kesalahan pengawetan

Kode dengan contoh penggunaan dan keluaran dapat ditemukan di Intisari berikut: https://gist.github.com/schlamar/7003737

schlamar
sumber
Kecuali aku kehilangan sesuatu, ini tidak benar-benar benang daemon, karena Anda tidak pernah mengatur daemon_thread.daemonuntuk True. Saya perlu melakukan itu untuk mendapatkan program Python saya untuk keluar dengan benar ketika pengecualian terjadi dalam manajer konteks.
blah238
Saya juga diperlukan untuk menangkap, log dan menelan pengecualian dilemparkan oleh target funcdi logged_call, jika pengecualian akan mendapatkan kacau dengan output bekas tebangan lainnya. Ini versi modifikasi saya tentang ini: gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf
blah238
8

Karena kami dapat merepresentasikan multiprocess logging karena banyak penerbit dan satu pelanggan (pendengar), menggunakan ZeroMQ untuk mengimplementasikan pesan PUB-SUB memang merupakan pilihan.

Selain itu, modul PyZMQ , binding Python untuk ZMQ, mengimplementasikan PUBHandler , yang merupakan objek untuk mempublikasikan pesan logging melalui soket zmq.PUB.

Ada solusi di web , untuk logging terpusat dari aplikasi terdistribusi menggunakan PyZMQ dan PUBHandler, yang dapat dengan mudah diadopsi untuk bekerja secara lokal dengan berbagai proses penerbitan.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
Samuel
sumber
6

Saya juga suka jawaban zzzeek tetapi Andre benar bahwa antrian diperlukan untuk mencegah gangguan. Saya beruntung dengan pipa, tetapi memang melihat kekacauan yang agak diharapkan. Menerapkannya ternyata lebih sulit dari yang saya kira, terutama karena berjalan di Windows, di mana ada beberapa batasan tambahan tentang variabel dan hal-hal global (lihat: Bagaimana Proses Multi-Python Diimplementasikan pada Windows? )

Tapi, akhirnya aku berhasil. Contoh ini mungkin tidak sempurna, jadi komentar dan saran dipersilahkan. Itu juga tidak mendukung pengaturan formatter atau apa pun selain root logger. Pada dasarnya, Anda harus menginstal ulang logger di setiap proses kumpulan dengan antrian dan mengatur atribut lainnya pada logger.

Sekali lagi, saran tentang cara membuat kode lebih baik dipersilakan. Saya pasti belum tahu semua trik Python :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
Mike Miller
sumber
1
Saya ingin tahu apakah if 'MainProcess' == multiprocessing.current_process().name:dapat digunakan sebagai pengganti passing child?
mlt
Jika ada orang lain yang mencoba menggunakan kumpulan proses alih-alih objek proses yang terpisah pada Windows, perlu disebutkan bahwa Manajer harus digunakan untuk mengirimkan antrian ke subproses karena tidak dapat dipilih secara langsung.
mlt
Implementasi ini bekerja dengan baik untuk saya. Saya memodifikasinya agar berfungsi dengan jumlah penangan yang sewenang-wenang. Dengan cara ini Anda dapat mengonfigurasikan root handler Anda dengan cara non-multiprocessing, lalu di mana aman untuk membuat antrian, meneruskan root handler ke ini, menghapusnya, dan menjadikan ini satu-satunya handler.
Jaxor24
3

cukup terbitkan di suatu tempat instance Anda dari logger. dengan begitu, modul dan klien lain dapat menggunakan API Anda untuk mendapatkan logger tanpa harus melakukannya import multiprocessing.

Javier
sumber
1
Masalah dengan ini adalah bahwa multiprocessing logger muncul tanpa nama, sehingga Anda tidak akan dapat menguraikan aliran pesan dengan mudah. Mungkin akan mungkin untuk memberi nama mereka setelah penciptaan, yang akan membuatnya lebih masuk akal untuk melihatnya.
cdleary
baik, terbitkan satu logger untuk setiap modul, atau lebih baik, ekspor penutupan berbeda yang menggunakan logger dengan nama modul. intinya adalah membiarkan modul lain menggunakan API Anda
Javier
Sangat masuk akal (dan +1 dari saya!), Tetapi saya akan kehilangan kemampuan untuk hanya import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!')dari mana saja dan membuatnya berfungsi dengan baik.
cdleary
3
Ini adalah fenomena menarik yang saya lihat ketika saya menggunakan Python, sehingga kita menjadi terbiasa untuk dapat melakukan apa yang kita inginkan dalam 1 atau 2 baris sederhana sehingga pendekatan sederhana dan logis dalam bahasa lain (misalnya untuk menerbitkan logger atau bungkus multiprocessing itu di accessor) masih terasa seperti beban. :)
Kylotan
3

Saya menyukai jawaban zzzeek. Saya hanya akan mengganti Pipa untuk Antrian karena jika beberapa utas / proses menggunakan ujung pipa yang sama untuk menghasilkan pesan log mereka akan kacau.

André Cruz
sumber
Saya memiliki beberapa masalah dengan pawang, meskipun tidak ada pesan yang kacau, hanya semuanya akan berhenti berfungsi. Saya mengubah Pipa menjadi Antrian karena itu lebih tepat. Namun kesalahan yang saya dapatkan tidak diselesaikan dengan itu - pada akhirnya saya menambahkan coba / kecuali untuk metode accept () - sangat jarang, upaya untuk mencatat pengecualian akan gagal dan akhirnya tertangkap di sana. Setelah saya menambahkan coba / kecuali, ini berjalan selama berminggu-minggu tanpa masalah, dan file standarderr akan mengambil sekitar dua pengecualian yang salah per minggu.
zzzeek
2

Bagaimana dengan mendelegasikan semua pencatatan ke proses lain yang membaca semua entri log dari Antrian?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Cukup bagikan LOG_QUEUE melalui salah satu mekanisme multiproses atau bahkan warisan dan semuanya bekerja dengan baik!

Sawan
sumber
1

Saya punya solusi yang mirip dengan ironhacker kecuali saya menggunakan logging.exception di beberapa kode saya dan menemukan bahwa saya perlu memformat pengecualian sebelum meneruskannya kembali ke Antrean karena traceback tidak dapat dipilih:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
Richard Jones
sumber
Saya menemukan contoh lengkap di sepanjang baris ini di sini .
Aryeh Leib Taurog
1

Di bawah ini adalah kelas yang dapat digunakan di lingkungan Windows, membutuhkan ActivePython. Anda juga bisa mewarisi untuk penangan logging lainnya (StreamHandler dll.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

Dan berikut ini adalah contoh yang menunjukkan penggunaan:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
pengguna6336812
sumber
Mungkin menggunakan multiprocessing.Lock()bukannya Mutex Windows akan membuat solusi portabel.
xmedeko
1

Inilah solusi sederhana saya / pemecahan masalah ... bukan yang paling komprehensif, tetapi mudah dimodifikasi dan lebih sederhana untuk dibaca dan dipahami menurut saya daripada jawaban lain yang saya temukan sebelum menulis ini:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
nmz787
sumber
1

Ada paket bagus ini

Paket: https://pypi.python.org/pypi/multiprocessing-logging/

kode: https://github.com/jruere/multiprocessing-logging

Install:

pip install multiprocessing-logging

Kemudian tambahkan:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
juan Isaza
sumber
3
Perpustakaan ini secara harfiah didasarkan dari komentar lain pada posting SO saat ini: stackoverflow.com/a/894284/1698058 .
Chris Hunt
Origins: stackoverflow.com/a/894284/1663382 Saya menghargai contoh penggunaan modul, selain dokumentasi di beranda.
Liquidgenius
0

Salah satu alternatifnya adalah menulis penebangan mutliprocessing ke file yang dikenal dan mendaftarkan atexitpawang untuk bergabung pada proses-proses tersebut membacanya kembali di stderr; namun, Anda tidak akan mendapatkan aliran waktu-nyata ke pesan keluaran di stderr dengan cara itu.

cdleary
sumber
apakah pendekatan yang Anda usulkan di bawah ini sama dengan pendekatan dari komentar Anda di sini stackoverflow.com/questions/641420/…
iruvar
0

Jika Anda memiliki kebuntuan yang terjadi dalam kombinasi kunci, utas dan garpu dalam loggingmodul, yang dilaporkan dalam laporan bug 6721 (lihat juga pertanyaan SO terkait ).

Ada solusi perbaikan kecil yang diposting di sini .

Namun, itu hanya akan memperbaiki deadlock potensial di logging. Itu tidak akan memperbaiki hal-hal yang mungkin kacau. Lihat jawaban lain yang disajikan di sini.

Albert
sumber
0

Ide paling sederhana seperti yang disebutkan:

  • Ambil nama file dan id proses dari proses saat ini.
  • Siapkan a [WatchedFileHandler][1]. Alasan untuk pawang ini dibahas secara rinci di sini , tetapi singkatnya ada beberapa kondisi ras yang lebih buruk dengan pawang penebangan lainnya. Yang ini memiliki jendela terpendek untuk kondisi lomba.
    • Pilih jalur untuk menyimpan log seperti / var / log / ...
pengguna1460675
sumber
0

Untuk siapa pun yang mungkin membutuhkan ini, saya menulis dekorator untuk paket multiprocessing_logging yang menambahkan nama proses saat ini ke log, sehingga menjadi jelas siapa yang mencatat apa.

Itu juga menjalankan install_mp_handler () sehingga menjadi tidak berguna untuk menjalankannya sebelum membuat kumpulan.

Ini memungkinkan saya untuk melihat pekerja mana yang membuat pesan log mana.

Inilah cetak biru dengan contoh:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')
Orsiris de Jong
sumber
-5

Kepada anak-anak saya yang mengalami masalah yang sama dalam beberapa dekade dan menemukan pertanyaan ini di situs ini saya meninggalkan jawaban ini.

Kesederhanaan vs rumit. Cukup gunakan alat lain. Python memang luar biasa, tetapi itu tidak dirancang untuk melakukan beberapa hal.

Cuplikan berikut untuk daemon logrotate berfungsi untuk saya dan tidak terlalu rumit. Jadwalkan untuk menjalankan setiap jam dan

/var/log/mylogfile.log {
    size 1
    copytruncate
    create
    rotate 10
    missingok
    postrotate
        timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"`
        mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log
    endscript
}

Ini adalah bagaimana saya menginstalnya (symlink tidak berfungsi untuk logrotate):

sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname
sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate
baldr
sumber