Menggabungkan input dari banyak file / pipa tanpa mengganggu atau memblokir?

9

Apakah ada alat yang akan mengambil input dari banyak file atau pipa dan menulisnya ke stdout, tanpa memblokir bacaan, sehingga masing-masing jalur input keluar utuh? Saya pada dasarnya ingin multiplex banyak input ke satu output tanpa garis clobbering.

$ combine file1 <(prog2) ... > nice-output.txt
  1. Saya tidak peduli dengan urutan output
  2. Seharusnya tidak memblokir selama beberapa input memiliki data
  3. Itu harus efisien (yaitu, saya dapat meng-downvote Perl Anda satu-liner;)
Jay Hacker
sumber

Jawaban:

4

Anda harus dapat melakukan ini dengan multitailcukup mudah.

Caleb
sumber
1
Bisakah Anda menyarankan argumen apa yang akan saya gunakan dengan multitail? Tampaknya tidak memiliki mode non-interaktif, hang mencoba menulis ke stdout, dan crash membaca dari sebuah pipa.
Jay Hacker
Mulailah dengan -Luntuk menjalankan perintah dan menggabungkan output dengan aliran saat ini dan -auntuk menulis output ke file. Saya akan mencari lagi besok. Jika Anda memberikan contoh yang lebih terperinci, saya akan mencoba mengerjakannya.
Caleb
4

Jika proses menulis baris dalam satu writepanggilan, yang membutuhkan proses untuk menggunakan buffer garis (biasanya dimatikan jika output standar mereka bukan terminal), Anda bisa mengarahkan semuanya ke pipa.

{ { sleep .1; echo one; sleep .1; echo two; } &
  { echo hello; sleep .15; echo world; };
  wait; } | cat

Jika proses hanya melakukan buffering baris saat menulis ke terminal, cara mudah adalah menggunakan script. Agak canggung: hanya bisa menulis ke file.

script -q -c '
    { { sleep .1; echo one; sleep .1; echo two; } &
      { echo hello; sleep .15; echo world; };
      wait; }'
tail -n +2 typescript

Jika program menulis garis panjang atau tidak menggunakan buffer garis, pendekatan ini tidak akan berhasil. Anda akan memerlukan program kolektor yang membaca dan menyangga baris dari setiap input secara terpisah dan melakukan sinkronisasi pada akhir baris. Tidak ada utilitas standar dengan fungsi ini. Saya menyarankan saran Calebmultitail kedua .

Berikut ini adalah skrip Python yang membaca baris yang dihasilkan oleh beberapa perintah dan meludahkannya pada output standar, tanpa memecah baris. Saya belum mengujinya banyak, jadi pengguna peringatan. Saya belum membandingkannya sama sekali.

#!/usr/bin/env python
import Queue, itertools, os, subprocess, sys, threading
# Queue of (producer_id, line). line==None indicates the end of a producer.
lq = Queue.Queue()

# Line producer
def run_task(i, cmd):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    line = p.stdout.readline()
    while line <> "":
        lq.put((i, line))
        line = p.stdout.readline()
    lq.put((i, None))

# Start a producer for each command passed as an argument
for i in range(1,len(sys.argv)):
    threading.Thread(target=run_task, args=(i, sys.argv[i])).start()
sources = len(sys.argv) - 1
# Consumer: print lines as they come in, until no producer is left.
while sources > 0:
    (k, line) = lq.get()
    if line == None: sources -= 1
    else: sys.stdout.write(str(k) + ":" + line)

Penggunaan sampel:

./collect.py 'sleep 1; ls /; sleep 1; ls /' \
             '/bin/echo -n foo; sleep 1; /bin/echo -n bar; sleep 1; /bin/echo qux'
Gilles 'SANGAT berhenti menjadi jahat'
sumber
1

Ya multitail tampaknya terikat pada gagasan "jendela" sebagai bagian dari terminal; Saya tidak bisa membuatnya bermain bagus sebagai komponen pipa.

Jadi terlihat seperti kita hafta lakukan sendiri ini retak buku-buku jari

/* Copyright © 2015 [email protected]
** Use/modify as you see fit but leave this attribution.
** If you change the interface and want to distribute the
** result please change the binary name too! */
#include <err.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>

/* typedefs are for pussies */
struct {
    char *filename; /* for clarity of errors */
    char *data;
    long len;
    long cap;
} saved[FD_SETSIZE] = {0};

void
ewriten(int fd, char *buf, int n)
{
    int done = 0, c;
    while (done < n) {
        if ((c=write(fd, buf + done, n - done)) <= 0 && errno != EINTR) {
            err(1, "write");
        }
        done += c;
    }
}

int
empty(fd_set *fdset, int maxfd)
{
    int i;
    for (i=0; i <= maxfd; i++) {
        if (FD_ISSET(i, fdset)) return 0;
    }
    return 1;
}

void
combine(fd_set *fdset, int maxfd)
{
    char buf[4096], *cp;
    fd_set ready;
    int n, i, fd, left;
    while (!empty(fdset, maxfd)) {
        ready = *fdset;
        /* timeouts are for pussies */
        if (select(maxfd + 1, &ready, NULL, NULL, NULL) == -1) err(1, "select");
        for (fd=0; fd <= maxfd; fd++) {
            if (!FD_ISSET(fd, &ready)) continue;

            switch (n=read(fd, &buf, sizeof(buf))) {
            case -1:
                if (errno == EINTR)
                    break; /* ignore interrupts; we'll re-read next iteration */
                if (saved[fd].filename) err(1, "read: %s", saved[fd].filename);
                err(1, "read: %d", fd);
            case 0:
                if (saved[fd].len > 0) {
                    /* someone forgot their newline at EOF... */
                    ewriten(1, saved[fd].data, saved[fd].len);
                    saved[fd].data[0] = '\n'; /* put it back for them */
                    ewriten(1, saved[fd].data, 1);
                }
                free(saved[fd].data);
                FD_CLR(fd, fdset);
                break;
            default:
                for (cp=buf + n - 1; cp >= buf && *cp != '\n'; cp--); /* find last newline */
                left = n - (cp - buf + 1);
                if (cp >= buf) {
                    /* we found one! first dump any saved data from the last read */
                    if (saved[fd].len > 0) {
                        ewriten(1, saved[fd].data, saved[fd].len);
                        saved[fd].len = 0;
                    }
                    ewriten(1, buf, cp - buf + 1);
                }
                if (left > 0) {
                    /* now save any leftover data for later */
                    int need = saved[fd].len + left;
                    if (saved[fd].cap < need &&
                       (saved[fd].data=realloc(saved[fd].data, need)) == NULL) {
                        errx(1, "realloc: failed on %d bytes", need);
                        /* it was good enough for quake... */
                    }
                    saved[fd].cap = need;
                    memcpy(saved[fd].data + saved[fd].len, buf + n - 1 - left, left);
                    saved[fd].len += left;
                }
            }
        }
    }
}

void
addfd(int fd, fd_set *fdset, int *maxfd)
{
    FD_SET(fd, fdset);
    if (*maxfd < fd) {
        *maxfd = fd;
    }
}

int
main(int argc, char **argv)
{
    fd_set fdset;
    char **arg = argv + 1;
    char *cp;
    struct stat st;
    int fd, maxfd = -1;
    FD_ZERO(&fdset);
    while (*arg != NULL) {
        /* getopt is for pussies */
        if (strncmp("-u", *arg, 2) == 0) {
            *arg += 2;
            if (**arg == '\0' && *++arg == NULL ) errx(1, "-u requires argument (comma separated FD list)");
            /* reentrancy is for pussies */
            for (cp=strtok(*arg, ","); cp != NULL; cp=strtok(NULL, ",")) {
                fd = atoi(cp);
                if (fstat(fd, &st) != 0) err(1, "%d", fd);
                addfd(fd, &fdset, &maxfd);
            }
            arg++;
        } else if (strcmp("-", *arg) == 0) {
            if (fstat(0, &st) != 0) err(1, "stdin", fd);
            addfd(0, &fdset, &maxfd);
            saved[0].filename = "stdin";
            arg++;
        } else if (strcmp("--", *arg) == 0) {
            arg++;
            break;
        } else if (**arg == '-') {
            errx(1, "unrecognized argument %s", *arg);
        } else {
            break; /* treat as filename */
        }
    }
    /* remaining args are filenames */
    for (; *arg != NULL; arg++) {
        /* stdio is for pussies */
        if ((fd=open(*arg, O_RDONLY)) == -1) err(1, "open: %s", *arg);
        addfd(fd, &fdset, &maxfd);
        saved[fd].filename = *arg;
    }
    combine(&fdset, maxfd);
    return 0;
}

Ahhh itu terasa enak.

(catatan: ini diuji pada sekitar dua set input. bug mungkin ada atau tidak ada)

sqweek
sumber