Menyebarkan stdin ke proses paralel

13

Saya punya tugas yang memproses daftar file di stdin. Waktu memulai program sangat besar, dan jumlah waktu yang dibutuhkan setiap file sangat bervariasi. Saya ingin menelurkan sejumlah besar proses ini, kemudian mengirim pekerjaan ke mana saja yang tidak sibuk. Ada beberapa alat commandline berbeda yang hampir melakukan apa yang saya inginkan, saya mempersempitnya menjadi dua opsi yang hampir berfungsi:

find . -type f | split -n r/24 -u --filter="myjob"
find . -type f | parallel --pipe -u -l 1 myjob

Masalahnya adalah splitapakah melakukan round-robin murni, sehingga salah satu proses tertinggal dan tetap di belakang, menunda penyelesaian seluruh operasi; sementara parallelingin menelurkan satu proses per N baris atau byte input dan akhirnya saya menghabiskan terlalu banyak waktu untuk overhead startup.

Apakah ada sesuatu seperti ini yang akan menggunakan kembali proses dan memberi makan garis ke proses mana saja yang telah membuka blokir stdins?

BCoates
sumber
Dari mana splitperintah itu? Nama tersebut bertentangan dengan utilitas pemrosesan teks standar .
Gilles 'SANGAT berhenti menjadi jahat'
@Gilles, ini GNU: "split (GNU coreutils) 8.13" . Menggunakannya sebagai alternatif aneh untuk xargs mungkin bukan penggunaan yang dimaksudkan tetapi yang paling dekat dengan apa yang saya inginkan saya temukan.
BCoates
2
Saya telah memikirkan hal itu, dan masalah mendasar adalah mengetahui bahwa instance myjobsiap untuk menerima lebih banyak input. Tidak ada cara untuk mengetahui bahwa suatu program siap untuk memproses lebih banyak input, yang dapat Anda ketahui adalah bahwa beberapa buffer di suatu tempat (buffer pipa, buffer stdio) siap menerima input lebih banyak. Bisakah Anda mengatur program Anda untuk mengirim beberapa jenis permintaan (mis. Tampilkan konfirmasi) ketika sudah siap?
Gilles 'SANGAT berhenti menjadi jahat'
Dengan anggapan bahwa program tidak menggunakan bufering pada stdin, sistem file FUSE yang bereaksi terhadap readpanggilan akan melakukan trik. Itu upaya pemrograman yang cukup besar.
Gilles 'SANGAT berhenti menjadi jahat'
mengapa kau menggunakan -l 1di parallelargs? IIRC, yang memberitahukan secara paralel untuk memproses satu baris input per pekerjaan (yaitu satu nama file per garpu myjob, sehingga banyak overhead startup).
cas

Jawaban:

1

Itu tidak terlihat mungkin dalam kasus umum seperti itu. Ini menyiratkan Anda memiliki buffer untuk setiap proses dan Anda dapat menonton buffer dari luar untuk memutuskan di mana menempatkan entri berikutnya (penjadwalan) ... Tentu saja Anda dapat menulis sesuatu (atau menggunakan sistem batch seperti slurm)

Tetapi tergantung pada apa prosesnya, Anda mungkin dapat melakukan pra-proses input. Sebagai contoh jika Anda ingin mengunduh file, memperbarui entri dari DB, atau serupa, tetapi 50% dari mereka akan berakhir dilewati (dan karenanya Anda memiliki perbedaan pemrosesan yang besar tergantung pada input) lalu, cukup siapkan pra-prosesor yang memverifikasi entri mana yang akan memakan waktu lama (file ada, data telah diubah, dll), jadi apa pun yang datang dari pihak lain dijamin akan memakan waktu yang cukup sama. Bahkan jika heuristik tidak sempurna Anda mungkin berakhir dengan peningkatan yang cukup besar. Anda dapat membuang yang lain ke file dan memprosesnya dengan cara yang sama.

Tapi itu tergantung pada kasus penggunaan Anda.

estani
sumber
1

Tidak, tidak ada solusi umum. Operator Anda perlu tahu kapan setiap program siap membaca baris lain, dan tidak ada standar yang saya ketahui yang memungkinkan untuk itu. Yang bisa Anda lakukan adalah memasang STDOUT dan menunggu sesuatu untuk mengkonsumsinya; benar-benar tidak ada cara yang baik bagi produsen untuk memastikan apakah konsumen berikutnya siap atau tidak.

dannysauer
sumber
0

Saya kira tidak. Di majalah favorit saya adalah sebuah artikel tentang pemrograman bash yang melakukan apa yang Anda inginkan. Saya bersedia percaya bahwa jika ada alat untuk melakukan itu mereka akan menyebutkannya. Jadi, Anda menginginkan sesuatu seperti:

set -m # enable job control
max_processes=8
concurrent_processes=0

child_has_ended() { concurrent_processes=$((concurrent_processes - 1)) }

trap child_has_ended SIGCHLD # that's magic calling our bash function when a child processes ends

for i in $(find . -type f)
do
  # don't do anything while there are max_processes running
  while [ ${concurrent_processes} -ge ${max_processes}]; do sleep 0.5; done 
  # increase the counter
  concurrent_processes=$((concurrent_processes + 1))
  # start a child process to actually deal with one file
  /path/to/script/to/handle/one/file $i &
done

Jelas Anda dapat mengubah permintaan untuk skrip yang aktif sesuai keinginan Anda. Majalah yang saya sebutkan awalnya melakukan hal-hal seperti memasang pipa dan benar-benar memulai thread pekerja. Lihat mkfifoitu, tetapi rute itu jauh lebih rumit karena proses pekerja perlu memberi sinyal pada proses master bahwa mereka siap menerima lebih banyak data. Jadi Anda memerlukan satu fifo untuk setiap proses pekerja untuk mengirimkan data dan satu fifo untuk proses master untuk menerima barang dari pekerja.

DISCLAIMER Saya menulis naskah itu dari atas kepala saya. Mungkin ada beberapa masalah sintaksis.

Bananguin
sumber
1
Ini tampaknya tidak memenuhi persyaratan: Anda memulai contoh program yang berbeda untuk setiap item.
Gilles 'SANGAT berhenti menjadi jahat'
Biasanya lebih baik digunakan find . -type f | while read idaripada for i in $(find . -type f).
0

Untuk GNU Parallel Anda dapat mengatur ukuran blok menggunakan --block. Namun, itu mengharuskan Anda memiliki cukup memori untuk menyimpan 1 blok dalam memori untuk setiap proses yang berjalan.

Saya mengerti ini bukan apa yang Anda cari, tetapi mungkin ini merupakan solusi yang dapat diterima untuk saat ini.

Jika tugas Anda rata-rata membutuhkan waktu yang sama, maka Anda mungkin dapat menggunakan mbuffer:

find . -type f | split -n r/24 -u --filter="mbuffer -m 2G | myjob"
Ole Tange
sumber
0

Coba ini:

mkfifo untuk setiap proses.

Kemudian bertahan tail -f | myjobdi setiap fifo.

Misalnya menyiapkan pekerja (proses pekerjaanku)

mkdir /tmp/jobs
for X in 1 2 3 4
do
   mkfifo pipe$X
   tail -f pipe$X | myjob &
   jobs -l| awk '/pipe'$X'/ {print $2, "'pipe$X'"}' >> pipe-job-mapping
done

Tergantung pada aplikasi Anda (pekerjaan saya) Anda mungkin dapat menggunakan pekerjaan -s untuk menemukan pekerjaan yang dihentikan. Kalau tidak, daftarkan proses yang diurutkan berdasarkan CPU dan pilih sumber daya yang paling sedikit dikonsumsi. Tentu memiliki laporan pekerjaan itu sendiri, misalnya dengan menetapkan bendera di sistem file ketika ingin lebih banyak pekerjaan.

Dengan asumsi pekerjaan berhenti ketika menunggu input, gunakan

jobs -sl untuk mengetahui pid dari pekerjaan yang berhenti dan menetapkannya bekerja, misalnya

grep "^$STOPPED_PID" pipe-to-job-mapping | while read PID PIPE
do
   cat workset > $PIPE
done

Saya menguji ini dengan

garfield:~$ cd /tmp
garfield:/tmp$ mkfifo f1
garfield:/tmp$ mkfifo f2
garfield:/tmp$ tail -f f1 | sed 's/^/1 /' &
[1] 21056
garfield:/tmp$ tail -f f2 | sed 's/^/2 /' &
[2] 21058
garfield:/tmp$ echo hello > f1
1 hello
garfield:/tmp$ echo what > f2
2 what
garfield:/tmp$ echo yes > f1
1 yes

Ini harus saya akui hanya diciptakan jadi ymmv.

Johan
sumber
0

Apa yang benar-benar diperlukan untuk menyelesaikan ini adalah mekanisme antrian dari beberapa jenis.

Apakah mungkin untuk memiliki pekerjaan membaca input mereka dari Antrian, seperti antrian pesan SYSV, dan kemudian memiliki program yang dijalankan secara paralel hanya dengan mendorong nilai-nilai ke antrian?

Kemungkinan lain adalah menggunakan direktori untuk antrian, seperti ini:

  1. output find menciptakan symlink ke setiap file untuk diproses dalam direktori, pending
  2. setiap proses pekerjaan melakukan satu mvdari file pertama yang dilihatnya di direktori ke direktori saudara pending, bernama inprogress.
  3. jika pekerjaan berhasil memindahkan file, ia melakukan pemrosesan; jika tidak, ia kembali untuk mencari dan memindahkan nama file lain daripending
Abu
sumber
0

menguraikan jawaban @ ash, Anda dapat menggunakan antrian pesan SYSV untuk mendistribusikan pekerjaan. Jika Anda tidak ingin menulis program Anda sendiri di C ada sebuah utilitas bernama ipcmdyang dapat membantu. Inilah yang saya kumpulkan untuk mengirimkan output find $DIRECTORY -type fke $PARALLELsejumlah proses:

set -o errexit
set -o nounset

export IPCMD_MSQID=$(ipcmd msgget)

DIRECTORY=$1
PARALLEL=$2

# clean up message queue on exit
trap 'ipcrm -q $IPCMD_MSQID' EXIT

for i in $(seq $PARALLEL); do
   {
      while true
      do
          message=$(ipcmd msgrcv) || exit
          [ -f $message ] || break
          sleep $((RANDOM/3000))
      done
   } &
done

find "$DIRECTORY" -type f | xargs ipcmd msgsnd

for i in $(seq $PARALLEL); do
   ipcmd msgsnd "/dev/null/bar"
done
wait

Inilah uji coba:

$ for i in $(seq 20 10 100) ; do time parallel.sh /usr/lib/ $i ; done
parallel.sh /usr/lib/ $i  0.30s user 0.67s system 0% cpu 1:57.23 total
parallel.sh /usr/lib/ $i  0.28s user 0.69s system 1% cpu 1:09.58 total
parallel.sh /usr/lib/ $i  0.19s user 0.80s system 1% cpu 1:05.29 total
parallel.sh /usr/lib/ $i  0.29s user 0.73s system 2% cpu 44.417 total
parallel.sh /usr/lib/ $i  0.25s user 0.80s system 2% cpu 37.353 total
parallel.sh /usr/lib/ $i  0.21s user 0.85s system 3% cpu 32.354 total
parallel.sh /usr/lib/ $i  0.30s user 0.82s system 3% cpu 28.542 total
parallel.sh /usr/lib/ $i  0.27s user 0.88s system 3% cpu 30.219 total
parallel.sh /usr/lib/ $i  0.34s user 0.84s system 4% cpu 26.535 total
kouk
sumber
0

Kecuali Anda dapat memperkirakan berapa lama file input tertentu akan diproses dan proses pekerja tidak memiliki cara untuk melaporkan kembali ke penjadwal (seperti yang mereka lakukan dalam skenario komputasi paralel paralel - biasanya melalui MPI ), Anda umumnya kurang beruntung - membayar denda dari beberapa pekerja yang memproses input lebih lama dari yang lain (karena ketidaksetaraan input), atau membayar denda karena menelurkan satu proses baru untuk setiap file input.

peterph
sumber
0

GNU Parallel telah berubah dalam 7 tahun terakhir. Jadi hari ini dapat melakukannya:

Contoh ini menunjukkan bahwa lebih banyak blok diberikan pada proses 11 dan 10 daripada proses 4 dan 5 karena 4 dan 5 dibaca lebih lambat:

seq 1000000 |
  parallel -j8 --tag --roundrobin --pipe --block 1k 'pv -qL {}0000 | wc' ::: 11 4 5 6 9 8 7 10
Ole Tange
sumber