Baca non-blocking pada sebuah subprocess.PIPE dengan python

507

Saya menggunakan modul subproses untuk memulai subproses dan terhubung ke aliran output (stdout). Saya ingin dapat menjalankan bacaan non-pemblokiran di stdout-nya. Apakah ada cara untuk membuat .readline tanpa pemblokiran atau untuk memeriksa apakah ada data di aliran sebelum saya memohon .readline? Saya ingin ini portabel atau setidaknya berfungsi di Windows dan Linux.

di sini adalah bagaimana saya melakukannya untuk saat ini (Ini memblokir .readlinejika tidak ada data tersedia):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Mathieu Pagé
sumber
14
(Berasal dari google?) Semua PIPE akan menemui jalan buntu ketika salah satu buffer PIPE terisi dan tidak terbaca. misal kebuntuan stdout ketika stderr diisi. Jangan pernah melewati PIPE yang tidak ingin Anda baca.
Nasser Al-Wohaibi
@ NasserAl-Wohaibi apakah ini berarti lebih baik untuk selalu membuat file?
Charlie Parker
sesuatu yang saya ingin tahu adalah mengapa itu memblokir di tempat pertama ... Saya bertanya karena saya telah melihat komentar:To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()
Charlie Parker
Yaitu, "dengan desain", menunggu untuk menerima input.
Mathieu Pagé

Jawaban:

403

fcntl, select, asyncprocTidak akan membantu dalam kasus ini.

Cara tepercaya untuk membaca aliran tanpa memblokir apa pun sistem operasinya adalah dengan menggunakan Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
jfs
sumber
6
Ya ini bekerja untuk saya, saya telah menghapus banyak. Ini termasuk praktik yang baik tetapi tidak selalu perlu. Python 3.x 2.X compat dan close_fds mungkin dihilangkan, itu masih akan berfungsi. Tetapi sadarilah apa yang semuanya lakukan dan jangan menyalinnya secara membabi buta, meskipun itu hanya berhasil! (Sebenarnya solusi paling sederhana adalah dengan menggunakan utas dan melakukan readline seperti yang dilakukan Seb, Qeues hanyalah cara mudah untuk mendapatkan data, ada yang lain, utas adalah jawabannya!)
Aki
3
Di dalam utas, panggilan untuk out.readlinememblokir utas, dan utas utama, dan saya harus menunggu sampai readline kembali sebelum semuanya berlanjut. Adakah cara mudah untuk mengatasi itu? (Saya membaca beberapa baris dari proses saya, yang juga merupakan file .py lain yang melakukan DB dan hal-hal lain)
Justin
3
@Justin: 'out.readline' tidak memblokir utas utama yang dijalankan di utas lainnya.
jfs
4
bagaimana jika saya gagal mematikan subproses, mis. karena pengecualian? utas pembaca stdout tidak akan mati dan python akan menggantung, bahkan jika utas utama keluar, bukan? bagaimana orang bisa mengatasi ini? python 2.x tidak mendukung membunuh utas, yang lebih buruk, tidak mendukung menyela mereka. :( (jelas orang harus menangani pengecualian untuk memastikan subproses dimatikan, tapi kalau-kalau tidak, apa yang dapat Anda lakukan?)
n611x007
3
Saya telah membuat beberapa pembungkus yang bersahabat dalam paket ini shelljob pypi.python.org/pypi/shelljob
edA-qa mort-ora-y
77

Saya sering mengalami masalah yang sama; Program python yang saya tulis sering harus memiliki kemampuan untuk menjalankan beberapa fungsi utama sekaligus menerima input pengguna dari baris perintah (stdin). Cukup dengan menempatkan fungsi penanganan input pengguna di utas lain tidak menyelesaikan masalah karena readline()blok dan tidak memiliki batas waktu. Jika fungsi utama selesai dan tidak perlu lagi menunggu input pengguna lebih lanjut, saya biasanya ingin program saya keluar, tetapi tidak bisa karena readline()masih memblokir di utas lainnya menunggu garis. Solusi yang saya temukan untuk masalah ini adalah membuat stdin file non-blocking menggunakan modul fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

Menurut pendapat saya ini sedikit lebih bersih daripada menggunakan modul pilih atau sinyal untuk menyelesaikan masalah ini tetapi sekali lagi itu hanya bekerja pada UNIX ...

Jesse
sumber
1
Menurut dokumen, fcntl () dapat menerima deskriptor file, atau objek yang memiliki metode .fileno ().
Denilson Sá Maia
10
Jawaban Jesse tidak benar. Menurut Guido, readline tidak bekerja dengan benar dengan mode non-blocking, dan itu tidak akan sebelum Python 3000. bugs.python.org/issue1175#msg56041 Jika Anda ingin menggunakan fcntl untuk mengatur file ke mode non-blocking, Anda harus menggunakan os.read tingkat rendah () dan memisahkan sendiri garis-garisnya. Mencampur fcntl dengan panggilan tingkat tinggi yang melakukan penyanggaan jalur meminta masalah.
Anonnn
2
Penggunaan readline tampaknya salah dalam Python 2. Lihat jawaban anonnn, stackoverflow.com/questions/375427/…
Catalin Iacob
10
Tolong, jangan gunakan loop sibuk. Gunakan jajak pendapat () dengan batas waktu untuk menunggu data.
Ivo Danihelka
@Stefano apa yang buffer_sizedidefinisikan sebagai?
kucing
39

Python 3.4 memperkenalkan API sementara baru untuk asynciomodul IO - asinkron .

Pendekatannya mirip dengan twistedjawaban berbasis-oleh @Bryan Ward - mendefinisikan protokol dan metode-metodenya dipanggil segera setelah data siap:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Lihat "Subproses" di dokumen .

Ada antarmuka tingkat tinggi asyncio.create_subprocess_exec()yang mengembalikan Processobjek yang memungkinkan untuk membaca baris asynchroniosly menggunakan StreamReader.readline()coroutine (dengan sintaks async/ awaitPython 3.5+ ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() melakukan tugas-tugas berikut:

  • mulai subproses, arahkan stdout ke pipa
  • membaca baris dari stdout subproses secara tidak sinkron
  • bunuh subproses
  • tunggu sampai keluar

Setiap langkah dapat dibatasi oleh batas waktu detik jika perlu.

jfs
sumber
Ketika saya mencoba sesuatu seperti ini menggunakan corthon python 3.4, saya hanya mendapatkan output setelah seluruh skrip berjalan. Saya ingin melihat garis output dicetak, segera setelah subproses mencetak garis. Inilah yang saya dapatkan: pastebin.com/qPssFGep .
flutefreak7
1
@ flutefreak7: masalah buffer tidak terkait dengan pertanyaan saat ini. Ikuti tautan untuk solusi yang memungkinkan.
jfs
Terima kasih! Memecahkan masalah untuk skrip saya dengan hanya menggunakan print(text, flush=True)sehingga teks yang dicetak akan segera tersedia untuk pemantau panggilan readline. Ketika saya mengujinya dengan executable berbasis Fortran saya benar-benar ingin membungkus / menonton, itu tidak menyangga output itu, sehingga berperilaku seperti yang diharapkan.
flutefreak7
Apakah mungkin untuk membiarkan subproses bertahan dan melakukan operasi baca / tulis lebih lanjut. readline_and_kill, dalam skrip kedua Anda, berfungsi sangat mirip subprocess.comunicatedengan itu mengakhiri proses setelah satu operasi baca / tulis. Saya juga melihat bahwa Anda menggunakan satu pipa stdout, yang ditangani oleh subproses sebagai non-pemblokiran. Mencoba menggunakan keduanya stdoutdan stderr saya menemukan saya akhirnya memblokir .
Carel
@Carel kode dalam jawaban berfungsi sebagaimana dimaksud seperti yang dijelaskan dalam jawaban secara eksplisit. Dimungkinkan untuk menerapkan perilaku lain jika diinginkan. Kedua pipa sama-sama nonblocking jika digunakan, berikut adalah contoh cara membaca dari kedua pipa secara bersamaan .
jfs
19

Coba modul asyncproc . Sebagai contoh:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Modul ini menangani semua threading seperti yang disarankan oleh S.Lott.

Nuh
sumber
1
Sangat brilian. Jauh lebih mudah daripada modul subproses mentah. Berfungsi sempurna untuk saya di Ubuntu.
Cerin
12
asyncproc tidak bekerja di windows, dan windows tidak mendukung os.WNOHANG :-(
Bryan Oakley
26
asyncproc adalah GPL, yang selanjutnya membatasi penggunaannya :-(
Bryan Oakley
Terima kasih. Satu hal kecil: Tampaknya mengganti tab dengan 8 spasi di asyncproc.py adalah cara untuk pergi :)
benjaoming
Sepertinya Anda tidak bisa mendapatkan kode pengembalian dari proses yang Anda luncurkan melalui modul asyncproc; hanya output yang dihasilkannya.
grayaii
17

Anda dapat melakukannya dengan sangat mudah di Twisted . Tergantung pada basis kode Anda yang ada, ini mungkin tidak mudah digunakan, tetapi jika Anda sedang membangun aplikasi bengkok, maka hal-hal seperti ini menjadi hampir sepele. Anda membuat ProcessProtocolkelas, dan mengganti outReceived()metode. Memutar (tergantung pada reaktor yang digunakan) biasanya hanya select()loop besar dengan callback diinstal untuk menangani data dari deskriptor file yang berbeda (sering soket jaringan). Jadi outReceived()metode ini hanya menginstal panggilan balik untuk menangani data yang berasal STDOUT. Contoh sederhana yang menunjukkan perilaku ini adalah sebagai berikut:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

The twisted dokumentasi memiliki beberapa informasi yang baik ini.

Jika Anda membangun seluruh aplikasi Anda di sekitar Twisted, itu membuat komunikasi tidak sinkron dengan proses lain, lokal atau jauh, sangat elegan seperti ini. Di sisi lain, jika program Anda tidak dibangun di atas Twisted, ini tidak benar-benar bermanfaat. Semoga ini dapat bermanfaat bagi pembaca lain, bahkan jika itu tidak berlaku untuk aplikasi khusus Anda.

Bryan Ward
sumber
tidak baik. selectseharusnya tidak bekerja pada windows dengan deskriptor file, menurutdocs
n611x007
2
@naxa Saya tidak berpikir yang select()dia maksud adalah sama dengan Anda. Saya menganggap ini karena Twistedberfungsi pada windows ...
notbad.jpeg
1
"Memuntir (tergantung pada reaktor yang digunakan) biasanya hanya sebuah pilih besar () loop" berarti ada beberapa reaktor untuk dipilih. Yang select()satu adalah yang paling portabel di unix dan suka-unix, tetapi ada juga dua reaktor yang tersedia untuk Windows: twistedmatrix.com/documents/current/core/howto/…
clacke
14

Gunakan pilih & baca (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Untuk readline () - seperti:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
Andy Jackson
sumber
6
tidak baik. selectseharusnya tidak bekerja di windows dengan deskriptor file, menurut docs
n611x007
OH TUHAN. Baca megabyte, atau mungkin gigabytes satu karakter pada suatu waktu ... itu adalah ide terburuk yang pernah saya lihat dalam waktu yang lama ... tidak perlu disebutkan, kode ini tidak berfungsi, karena proc.stdout.read()sekecil apa pun argumennya. panggilan pemblokiran.
wvxvw
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed
nmz787
8

Salah satu solusinya adalah membuat proses lain untuk melakukan pembacaan proses Anda, atau membuat utas proses dengan batas waktu.

Inilah versi utas dari fungsi batas waktu:

http://code.activestate.com/recipes/473878/

Namun, apakah Anda perlu membaca stdout saat datang? Solusi lain mungkin dengan membuang output ke file dan menunggu proses selesai menggunakan p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
monkut
sumber
Sepertinya utas recpie tidak akan keluar setelah batas waktu habis dan membunuhnya tergantung pada kemampuan untuk membunuh subproses (sg. jika tidak terkait dalam hal ini) bunyinya (hal yang Anda harus dapat tetapi jika Anda tidak bisa ..) .
n611x007
7

Penafian: ini hanya berfungsi untuk tornado

Anda dapat melakukan ini dengan mengatur fd menjadi nonblocking dan kemudian menggunakan ioloop untuk mendaftarkan panggilan balik. Saya telah mengemas ini dalam telur yang disebut tornado_subprocess dan Anda dapat menginstalnya melalui PyPI:

easy_install tornado_subprocess

sekarang kamu bisa melakukan sesuatu seperti ini:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

Anda juga dapat menggunakannya dengan RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Vukasin Toroman
sumber
Terima kasih untuk fitur yang bagus! Hanya untuk memperjelas, mengapa kita tidak bisa hanya menggunakan threading.Threaduntuk membuat proses non-blocking baru? Saya menggunakannya dalam on_messagecontoh websocket Tornado, dan itu berhasil dengan baik.
VisioN
1
threading sebagian besar tidak dianjurkan dalam tornado. mereka baik-baik saja untuk fungsi berjalan kecil dan pendek. Anda dapat membacanya di sini: stackoverflow.com/questions/7846323/tornado-web-and-threads github.com/facebook/tornado/wiki/Threading-and-concurrency
Vukasin Toroman
@ VukasinToroman Anda benar-benar menyelamatkan saya di sini dengan ini. terima kasih banyak atas modul tornado_subprocess :)
James Gentes
apakah ini bekerja di windows? (perhatikan bahwa select, dengan deskriptor file, tidak )
n611x007
Lib ini tidak menggunakan selectpanggilan. Saya belum mencoba ini di Windows tetapi Anda mungkin akan mengalami masalah karena lib menggunakan fcntlmodul. Singkatnya: tidak, ini mungkin tidak akan berfungsi di Windows.
Vukasin Toroman
6

Solusi yang ada tidak berhasil untuk saya (detail di bawah). Yang akhirnya berhasil adalah menerapkan readline menggunakan read (1) (berdasarkan jawaban ini ). Yang terakhir tidak memblokir:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Mengapa solusi yang ada tidak berfungsi:

  1. Solusi yang memerlukan readline (termasuk yang berbasis Antrian) selalu dicekal. Sulit (mustahil?) Untuk membunuh utas yang mengeksekusi readline. Itu hanya terbunuh ketika proses yang membuatnya selesai, tetapi tidak ketika proses produksi-output dibunuh.
  2. Mencampur fcntl tingkat rendah dengan panggilan readline tingkat tinggi mungkin tidak berfungsi sebagaimana mestinya.
  3. Menggunakan select.poll () rapi, tetapi tidak berfungsi pada Windows menurut python docs.
  4. Menggunakan perpustakaan pihak ketiga tampaknya berlebihan untuk tugas ini dan menambahkan dependensi tambahan.
Vikram Pudi
sumber
1
1. q.get_nowait()dari jawaban saya tidak boleh memblokir, pernah, itulah gunanya menggunakannya. 2. Thread yang mengeksekusi readline ( enqueue_output()fungsi ) keluar pada EOF misalnya, termasuk kasus ketika proses produksi-output dimatikan. Jika Anda percaya tidak demikian; tolong, berikan contoh kode minimal lengkap yang menunjukkan sebaliknya (mungkin sebagai pertanyaan baru ).
jfs
1
@sebastian Saya menghabiskan satu jam atau lebih mencoba untuk memberikan contoh minimal. Pada akhirnya saya harus setuju bahwa jawaban Anda menangani semua kasus. Saya kira itu tidak bekerja lebih awal untuk saya karena ketika saya mencoba untuk mematikan proses produksi, itu sudah mati dan memberikan kesalahan yang sulit untuk debug. Jamnya dihabiskan dengan baik, karena ketika datang dengan contoh minimal, saya bisa datang dengan solusi yang lebih sederhana.
Vikram Pudi
Bisakah Anda memposting solusi yang lebih sederhana juga? :) (jika berbeda dari Sebastian)
n611x007
@ hazard89: Saya kira dcmpid = myprocess.
ViFI
Dalam kondisi setelah membaca () memanggil (sesaat setelah Benar): keluar tidak akan pernah menjadi string kosong karena Anda membaca setidaknya string / byte dengan panjang 1.
sergzach
6

Berikut adalah kode saya, digunakan untuk menangkap setiap output dari ASAP subproses, termasuk garis parsial. Ini memompa pada waktu yang sama dan stdout dan stderr dalam urutan yang hampir benar.

Diuji dan bekerja dengan benar di Python 2.7 linux & windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
datacompboy
sumber
Salah satu dari sedikit jawaban yang memungkinkan Anda membaca hal-hal yang tidak harus diakhiri dengan baris baru.
totaam
5

Saya menambahkan masalah ini untuk membaca beberapa subprocess.Popen stdout. Berikut adalah solusi baca yang tidak menghalangi saya:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Sebastien Claeys
sumber
5
fcntl tidak berfungsi di windows, menurut dokumen .
n611x007
@anatolytechtonik gunakan msvcrt.kbhit()saja
kucing
4

Versi non-blocking read ini tidak memerlukan modul khusus dan akan bekerja secara otomatis di sebagian besar distro Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Tom Lime
sumber
3

Berikut adalah solusi sederhana berdasarkan utas yang:

  • bekerja di Linux dan Windows (tidak mengandalkan select ).
  • membaca keduanya stdoutdanstderr sinkron.
  • tidak bergantung pada polling aktif dengan waktu tunggu sewenang-wenang (ramah CPU).
  • tidak menggunakan asyncio(yang mungkin bertentangan dengan perpustakaan lain).
  • berjalan sampai proses anak berakhir.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Olivier Michel
sumber
2

Menambahkan jawaban ini di sini karena memberikan kemampuan untuk mengatur pipa non-blocking pada Windows dan Unix.

Semua ctypesdetailnya berkat jawaban @ techtonik .

Ada versi yang sedikit dimodifikasi untuk digunakan pada sistem Unix dan Windows.

  • Kompatibel dengan Python3 (hanya diperlukan sedikit perubahan) .
  • Termasuk versi posix, dan mendefinisikan pengecualian untuk digunakan.

Dengan cara ini Anda dapat menggunakan fungsi dan pengecualian yang sama untuk kode Unix dan Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /programming/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Untuk menghindari membaca data yang tidak lengkap, saya akhirnya menulis generator readline saya sendiri (yang mengembalikan string byte untuk setiap baris).

Ini generator sehingga Anda dapat misalnya ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
gagasanman42
sumber
(1) komentar ini menunjukkan bahwa readline()tidak berfungsi dengan pipa non-blocking (seperti diatur menggunakan fcntl) pada Python 2 - apakah menurut Anda itu tidak lagi benar? (jawaban saya berisi tautan ( fcntl) yang memberikan info yang sama tetapi sepertinya dihapus sekarang). (2) Lihat bagaimana multiprocessing.connection.PipemenggunakanSetNamedPipeHandleState
jfs
Saya hanya menguji ini di Python3. Tetapi melihat informasi ini juga dan berharap itu tetap valid. Saya juga menulis kode saya sendiri untuk menggunakan readline di tempat, saya telah memperbarui jawaban saya untuk memasukkannya.
ideasman42
2

Saya memiliki masalah dengan penanya asli, tetapi tidak ingin meminta utas. Saya mencampur solusi Jesse dengan membaca langsung () dari pipa, dan buffer-handler saya sendiri untuk membaca baris (namun, sub-proses saya - ping - selalu menulis baris penuh <ukuran halaman sistem). Saya menghindari kesibukan-menunggu dengan hanya membaca di io arloji terdaftar-gobject. Hari-hari ini saya biasanya menjalankan kode dalam gobject MainLoop untuk menghindari utas.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

Pengamat itu

def watch(f, *other):
print 'reading',f.read()
return True

Dan program utama mengatur ping dan kemudian memanggil loop email gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Pekerjaan lain dilampirkan ke callback di gobject.

Dave Kitchen
sumber
2

Banyak hal yang jauh lebih baik dalam Python modern.

Berikut program anak sederhana, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

Dan program untuk berinteraksi dengannya:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

Itu mencetak:

b'hello bob\n'
b'hello alice\n'

Perhatikan bahwa pola aktual, yang juga oleh hampir semua jawaban sebelumnya, baik di sini maupun dalam pertanyaan terkait, adalah untuk mengatur deskriptor file stdout anak ke non-blocking dan kemudian polling dalam semacam loop pilih. Hari-hari ini, tentu saja, loop itu disediakan oleh asyncio.

pengguna240515
sumber
1

The pilih modul membantu Anda menentukan di mana masukan yang berguna berikutnya adalah.

Namun, Anda hampir selalu lebih bahagia dengan utas terpisah. Satu tidak memblokir membaca stdin, yang lain tidak di mana pun Anda tidak ingin diblokir.

S.Lott
sumber
11
Saya pikir jawaban ini tidak membantu karena dua alasan: (a) Modul pilih tidak akan bekerja pada pipa di bawah Windows (seperti yang dinyatakan tautan yang jelas), yang mengalahkan niat OP untuk memiliki solusi portabel. (B) Asinkron utas tidak memungkinkan untuk dialog sinkron antara proses induk dan anak. Bagaimana jika proses orang tua ingin mengirim tindakan selanjutnya sesuai dengan baris berikutnya yang dibaca dari anak ?!
ThomasH
4
select juga tidak berguna karena pembacaan Python akan memblokir bahkan setelah select, karena itu tidak memiliki semantik C standar dan tidak akan mengembalikan data parsial.
Helmut Grohne
Sebuah thresd terpisah untuk membaca dari output anak memecahkan masalah saya yang mirip dengan ini. Jika Anda membutuhkan interaksi sinkron, saya kira Anda tidak dapat menggunakan solusi ini (kecuali jika Anda tahu output yang diharapkan) Saya akan menerima jawaban ini
Emiliano
1

mengapa mengganggu utas & antrian? tidak seperti readline (), BufferedReader.read1 () tidak akan menunggu \ r \ n, ia mengembalikan ASAP jika ada output yang masuk.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
mfmain
sumber
Apakah akan mengembalikan ASAP jika tidak ada yang masuk? Jika tidak, itu memblokir.
Mathieu Pagé
@ MathieuPagé benar. read1akan memblokir jika blok baca yang mendasari pertama, yang terjadi ketika pipa masih terbuka tetapi input tidak tersedia.
Jack O'Connor
1

Dalam kasus saya, saya membutuhkan modul logging yang menangkap output dari aplikasi latar belakang dan menambahnya (menambahkan perangko waktu, warna, dll.).

Saya berakhir dengan utas latar belakang yang melakukan I / O yang sebenarnya. Kode berikut hanya untuk platform POSIX. Saya menanggalkan bagian yang tidak penting.

Jika seseorang akan menggunakan binatang ini untuk jangka panjang, pertimbangkan untuk mengelola deskriptor terbuka. Dalam kasus saya itu bukan masalah besar.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
Dmytro
sumber
1

Masalah saya agak berbeda karena saya ingin mengumpulkan stdout dan stderr dari proses yang berjalan, tetapi pada akhirnya sama karena saya ingin membuat output dalam widget seperti yang dihasilkan.

Saya tidak ingin menggunakan banyak solusi yang diusulkan menggunakan Antrian atau Utas tambahan karena mereka tidak perlu melakukan tugas umum seperti menjalankan skrip lain dan mengumpulkan hasilnya.

Setelah membaca solusi yang diusulkan dan dokumen python saya menyelesaikan masalah saya dengan implementasi di bawah ini. Ya itu hanya berfungsi untuk POSIX karena saya menggunakan selectpemanggilan fungsi.

Saya setuju bahwa dokumen membingungkan dan implementasinya canggung untuk tugas scripting yang umum. Saya percaya bahwa versi python yang lebih lama memiliki standar Popendan penjelasan yang berbeda sehingga menciptakan banyak kebingungan. Ini tampaknya bekerja dengan baik untuk Python 2.7.12 dan 3.5.2.

Kuncinya adalah mengatur bufsize=1buffer line dan kemudian universal_newlines=Truememproses sebagai file teks, bukan biner yang tampaknya menjadi default saat pengaturan bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG dan VERBOSE hanyalah makro yang mencetak output ke terminal.

Solusi ini adalah IMHO 99,99% efektif karena masih menggunakan readlinefungsi pemblokiran , jadi kami menganggap sub prosesnya bagus dan menampilkan garis yang lengkap.

Saya menyambut umpan balik untuk meningkatkan solusi karena saya masih baru di Python.

Brooke Wallace
sumber
Dalam kasus khusus ini, Anda dapat mengatur stderr = subprocess.STDOUT di konstruktor Popen, dan mendapatkan semua output dari cmd.stdout.readline ().
Aaron
Contoh jelas yang bagus. Sedang mengalami masalah dengan select.select () tetapi ini menyelesaikannya untuk saya.
maharvey67
0

Bekerja dari jawaban JF Sebastian, dan beberapa sumber lain, saya telah mengumpulkan manajer subproses sederhana. Ini menyediakan permintaan pembacaan non-blocking, serta menjalankan beberapa proses secara paralel. Itu tidak menggunakan panggilan khusus OS (yang saya tahu) dan karenanya harus bekerja di mana saja.

Ini tersedia dari pypi, jadi adil pip install shelljob. Lihat halaman proyek untuk contoh dan dokumen lengkap.

edA-qa mort-ora-y
sumber
0

EDIT: Implementasi ini masih memblokir. Gunakan jawaban JFSebastian sebagai gantinya.

Saya mencoba jawaban teratas , tetapi risiko tambahan dan pemeliharaan kode utas mengkhawatirkan.

Melihat melalui modul io (dan terbatas pada 2.6), saya menemukan BufferedReader. Ini adalah solusi tanpa-ulir saya yang tanpa ulir.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
romc
sumber
sudahkah kamu mencoba for line in iter(p.stdout.readline, ""): # do stuff with the line ? Itu tanpa benang (utas tunggal) dan memblokir ketika kode Anda diblokir.
jfs
@ jf-sebastian Ya, saya akhirnya kembali ke jawaban Anda. Implementasi saya kadang-kadang masih diblokir. Saya akan mengedit jawaban saya untuk memperingatkan orang lain agar tidak turun rute ini.
romc
0

Saya baru-baru ini menemukan pada masalah yang sama saya perlu membaca satu baris pada waktu dari aliran (ekor berjalan dalam proses) dalam mode non-blocking Saya ingin menghindari masalah berikutnya: tidak membakar cpu, jangan membaca aliran dengan satu byte ( seperti readline lakukan), dll

Berikut ini adalah implementasi saya https://gist.github.com/grubberr/5501e1a9760c3eab5e0a itu tidak mendukung windows (polling), tidak menangani EOF, tetapi berfungsi dengan baik untuk saya

grubberr
sumber
jawaban benang berbasis tidak tidak membakar cpu (Anda dapat menentukan sewenang-wenang timeoutseperti dalam solusi Anda) dan .readline()membaca lebih dari satu byte pada suatu waktu ( bufsize=1berarti garis -buffered (hanya relevan untuk menulis)). Apa masalah lain yang Anda temukan? Jawaban hanya tautan tidak terlalu berguna.
jfs
0

Ini adalah contoh untuk menjalankan perintah interaktif dalam subproses, dan stdout bersifat interaktif dengan menggunakan terminal pseudo. Anda dapat merujuk ke: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Liao
sumber
0

Solusi ini menggunakan select modul untuk "membaca data apa pun yang tersedia" dari aliran IO. Fungsi ini awalnya memblokir sampai data tersedia, tetapi kemudian hanya membaca data yang tersedia dan tidak memblokir lebih lanjut.

Mengingat fakta bahwa ia menggunakan selectmodul, ini hanya berfungsi pada Unix.

Kode ini sepenuhnya sesuai dengan PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
Bradley Odell
sumber
0

Saya juga menghadapi masalah yang dijelaskan oleh Jesse dan menyelesaikannya dengan menggunakan "pilih" seperti yang dilakukan Bradley , Andy dan lainnya tetapi dalam mode pemblokiran untuk menghindari loop sibuk. Ia menggunakan dummy Pipe sebagai stdin palsu. Pilih blok dan tunggu stdin atau pipa siap. Ketika tombol ditekan stdin membuka blokir pilih dan nilai kunci dapat diambil dengan membaca (1). Ketika utas yang berbeda menulis ke pipa maka pipa membuka blokir pilih dan dapat diambil sebagai indikasi bahwa kebutuhan untuk stdin sudah berakhir. Berikut ini beberapa kode referensi:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
gonzaedu61
sumber
CATATAN: Agar ini berfungsi di Windows, pipa harus diganti dengan soket. Saya belum mencobanya tetapi harus bekerja sesuai dengan dokumentasi.
gonzaedu61
0

Coba wexpect , yang merupakan alternatif windows dari pexpect .

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()
antara horizontal
sumber
0

Pada sistem seperti Unix dan Python 3.5+ ada os.set_blockingyang melakukan persis apa yang dikatakannya.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

Output ini:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

Dengan os.set_blockingberkomentar itu:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
saaj
sumber
-2

Berikut adalah modul yang mendukung pembacaan non-blocking dan penulisan latar belakang dengan python:

https://pypi.python.org/pypi/python-nonblock

Menyediakan fungsi,

nonblock_read yang akan membaca data dari stream, jika tersedia, jika tidak mengembalikan string kosong (atau Tidak ada jika stream ditutup di sisi lain dan semua data yang mungkin telah dibaca)

Anda juga dapat mempertimbangkan modul python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

yang menambah modul subproses. Jadi pada objek yang dikembalikan dari "subprocess.Popen" ditambahkan metode tambahan, jalankanInBackground. Ini memulai utas dan mengembalikan objek yang secara otomatis akan diisi ketika barang ditulis ke stdout / stderr, tanpa memblokir utas utama Anda.

Nikmati!

Tim Savannah
sumber
Saya ingin mencoba modul nonblock ini , tetapi saya relatif baru di beberapa prosedur Linux. Bagaimana cara saya menginstal rutinitas ini? Saya menjalankan Raspbian Jessie, rasa Debian Linux untuk Raspberry Pi. Saya mencoba 'sudo apt-get install nonblock' dan python-nonblock dan keduanya melempar kesalahan - tidak ditemukan. Saya telah mengunduh file zip dari situs ini pypi.python.org/pypi/python-nonblock , tetapi tidak tahu apa yang harus dilakukan dengannya. Terima kasih .... RDK
RDK