Mengurai file log besar di Node.js - baca baris demi baris

126

Saya perlu melakukan beberapa parsing file log besar (5-10 Gb) di Javascript / Node.js (Saya menggunakan Cube).

Logline terlihat seperti ini:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

Kita perlu membaca setiap baris, melakukan beberapa parsing (misalnya strip out 5, 7dan SUCCESS), lalu memompa data ini ke dalam Cube ( https://github.com/square/cube ) menggunakan klien JS mereka.

Pertama, apa cara kanonik di Node untuk membaca dalam file, baris demi baris?

Tampaknya pertanyaan online yang cukup umum:

Banyak jawaban yang tampaknya mengarah ke sekumpulan modul pihak ketiga:

Namun, ini tampak seperti tugas yang cukup mendasar - tentunya, ada cara sederhana dalam stdlib untuk membaca dalam file teks, baris demi baris?

Kedua, saya kemudian perlu memproses setiap baris (misalnya mengubah cap waktu menjadi objek Tanggal, dan mengekstrak bidang yang berguna).

Apa cara terbaik untuk melakukan ini, memaksimalkan hasil? Adakah cara yang tidak akan memblokir pembacaan di setiap baris, atau saat mengirimkannya ke Cube?

Ketiga - Saya menebak menggunakan pemisahan string, dan JS yang setara dengan berisi (IndexOf! = -1?) Akan jauh lebih cepat daripada regex? Adakah yang punya banyak pengalaman dalam mengurai sejumlah besar data teks di Node.js?

Cheers, Victor

victorhooi
sumber
Saya membangun parser log di node yang mengambil banyak string regex dengan 'menangkap' bawaan dan output ke JSON. Anda bahkan dapat memanggil fungsi pada setiap pengambilan jika Anda ingin menghitung. Itu mungkin melakukan apa yang Anda inginkan: npmjs.org/package/logax
Jess

Jawaban:

209

Saya mencari solusi untuk mengurai file yang sangat besar (gbs) baris demi baris menggunakan aliran. Semua pustaka dan contoh pihak ketiga tidak sesuai dengan kebutuhan saya karena mereka memproses file tidak baris demi baris (seperti 1, 2, 3, 4 ..) atau membaca seluruh file ke memori

Solusi berikut dapat mengurai file yang sangat besar, baris demi baris menggunakan aliran & pipa. Untuk pengujian saya menggunakan file 2.1 gb dengan 17.000.000 catatan. Penggunaan ram tidak melebihi 60 mb.

Pertama, instal paket aliran acara :

npm install event-stream

Kemudian:

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

masukkan deskripsi gambar di sini

Tolong beritahu saya bagaimana kelanjutannya!

Gerard
sumber
6
FYI, kode ini tidak sinkron. Ini asinkron. Jika Anda memasukkan console.log(lineNr)setelah baris terakhir kode Anda, itu tidak akan menampilkan jumlah baris terakhir karena file dibaca secara asynchronous.
jfriend00
4
Terima kasih, ini adalah satu-satunya solusi yang dapat saya temukan yang benar-benar berhenti dan dilanjutkan ketika seharusnya. Readline tidak.
Brent
3
Contoh yang luar biasa, dan itu benar-benar berhenti. Selain itu, jika Anda memutuskan untuk menghentikan pembacaan file lebih awal, Anda dapat menggunakans.end();
zipzit
2
Bekerja seperti pesona. Digunakan untuk mengindeks 150 juta dokumen ke indeks elasticsearch. readlinemodul adalah rasa sakit. Itu tidak berhenti dan selalu menyebabkan kegagalan setelah 40-50 juta. Membuang satu hari. Terima kasih banyak atas jawabannya. Yang ini bekerja dengan sempurna
Mandeep Singh
3
aliran acara telah disusupi: medium.com/intrinsic/… tetapi 4+ tampaknya aman blog.npmjs.org/post/180565383195/…
John Vandivier
72

Anda dapat menggunakan readlinepaket bawaan , lihat dokumen di sini . Saya menggunakan aliran untuk membuat aliran keluaran baru.

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});

File besar akan membutuhkan waktu untuk diproses. Beri tahu apakah itu berhasil.

pengguna568109
sumber
2
Seperti yang tertulis, baris kedua hingga terakhir gagal karena kubus tidak ditentukan.
Greg
2
Dengan menggunakan readline, apakah mungkin untuk menjeda / melanjutkan aliran baca untuk melakukan tindakan asinkron di area "lakukan barang"?
jchook
1
@jchook readlinememberi saya banyak masalah ketika saya mencoba jeda / melanjutkan. Itu tidak menghentikan aliran dengan benar menciptakan banyak masalah jika proses hilir lebih lambat
Mandeep Singh
31

Saya sangat menyukai jawaban @gerard yang sebenarnya pantas menjadi jawaban yang benar di sini. Saya membuat beberapa perbaikan:

  • Kode ada di kelas (modular)
  • Parsing disertakan
  • Kemampuan untuk melanjutkan diberikan ke luar jika ada pekerjaan asinkron yang dirantai untuk membaca CSV seperti memasukkan ke DB, atau permintaan HTTP
  • Membaca dalam ukuran potongan / batch yang dapat dideklarasikan oleh pengguna. Saya juga menangani pengkodean dalam aliran, jika Anda memiliki file dalam pengkodean yang berbeda.

Berikut kodenya:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader

Jadi pada dasarnya, inilah cara Anda menggunakannya:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

Saya menguji ini dengan file CSV 35GB dan itu berhasil untuk saya dan itulah mengapa saya memilih untuk membangunnya berdasarkan jawaban @gerard , umpan balik disambut.

ambodi
sumber
berapa lama waktu yang dibutuhkan?
Z. Khullah
Rupanya, ini tidak ada pause()panggilan, bukan?
Vanuan
Selain itu, ini tidak memanggil fungsi panggilan balik. Jadi jika batchSize adalah 100, ukuran file adalah 150, hanya 100 item yang akan diproses. Apakah aku salah?
Vanuan
16

Saya menggunakan https://www.npmjs.com/package/line-by-line untuk membaca lebih dari 1.000.000 baris dari file teks. Dalam hal ini, kapasitas RAM yang terisi sekitar 50-60 megabyte.

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });
Eugene Ilyushin
sumber
'baris demi baris' lebih hemat memori daripada jawaban yang dipilih. Untuk 1 juta baris dalam csv, jawaban yang dipilih memiliki proses node saya di 800s megabyte yang rendah. Menggunakan 'baris demi baris' itu secara konsisten di 700-an rendah. Modul ini juga menjaga kode tetap bersih dan mudah dibaca. Secara total, saya perlu membaca sekitar 18 juta jadi setiap mb berarti!
Neo
sayang sekali ini menggunakan event 'line' itu sendiri dan bukan 'chunk' standar, yang berarti Anda tidak akan bisa menggunakan 'pipe'.
Rene Wooller
Setelah berjam-jam pengujian dan pencarian, ini adalah satu-satunya solusi yang benar-benar berhenti pada lr.cancel()metode. Membaca 1000 baris pertama dari file 5Gig dalam 1ms. Luar biasa !!!!
Perez Lamed van Niekerk
6

Selain membaca file besar baris demi baris, Anda juga dapat membacanya potongan demi potongan. Untuk lebih lanjut lihat artikel ini

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
Kris Roofe
sumber
Mungkinkah, bahwa berikut harus menjadi perbandingan bukan tugas: if(bytesRead = chunkSize)?
Stefan Rein
4

Dokumentasi Node.js menawarkan contoh yang sangat elegan menggunakan modul Readline.

Contoh: Baca Aliran File Baris demi Baris

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});

Catatan: kami menggunakan opsi crlfDelay untuk mengenali semua contoh CR LF ('\ r \ n') sebagai satu baris baru.

Jaime Gómez
sumber
3

Saya memiliki masalah yang sama. Setelah membandingkan beberapa modul yang tampaknya memiliki fitur ini, saya memutuskan untuk melakukannya sendiri, ini lebih sederhana dari yang saya kira.

intinya: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

Ini mencakup file yang dibuka dalam penutupan, yang fetchBlock()dikembalikan akan mengambil blok dari file, dan berakhir dibagi menjadi array (akan menangani segmen dari pengambilan terakhir).

Saya telah mengatur ukuran blok menjadi 1024 untuk setiap operasi baca. Ini mungkin memiliki bug, tetapi logika kodenya jelas, coba sendiri.

deemstone
sumber
2

node-byline menggunakan aliran, jadi saya lebih suka yang itu untuk file besar Anda.

untuk konversi tanggal Anda, saya akan menggunakan moment.js .

untuk memaksimalkan throughput Anda, Anda dapat memikirkan tentang menggunakan cluster perangkat lunak. ada beberapa nice-modules yang membungkus cluster-module node-native dengan cukup baik. saya suka cluster-master dari isaacs. misalnya Anda bisa membuat sebuah cluster x pekerja yang semuanya menghitung sebuah file.

untuk benchmarking split vs regexes gunakan benchmark.js . saya belum mengujinya sampai sekarang. benchmark.js tersedia sebagai modul-node

di sini dan sekarang78
sumber
2

Berdasarkan jawaban pertanyaan ini , saya menerapkan kelas yang dapat Anda gunakan untuk membaca file secara sinkron baris demi baris fs.readSync(). Anda bisa membuat ini "pause" dan "resume" dengan menggunakan sebuah Qpromise ( jQuerysepertinya membutuhkan DOM jadi tidak bisa menjalankannya dengan nodejs):

var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {      
  this.moreLinesAvailable = true;
  this.fd = undefined;
  this.bufferSize = 1024*1024;
  this.buffer = new Buffer(this.bufferSize);
  this.leftOver = '';

  this.read = undefined;
  this.idxStart = undefined;
  this.idx = undefined;

  this.lineNumber = 0;

  this._bundleOfLines = [];

  this.open = function() {
    this.fd = fs.openSync(filename, 'r');
  };

  this.readNextLine = function () {
    if (this._bundleOfLines.length === 0) {
      this._readNextBundleOfLines();
    }
    this.lineNumber++;
    var lineToReturn = this._bundleOfLines[0];
    this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
    return lineToReturn;
  };

  this.getLineNumber = function() {
    return this.lineNumber;
  };

  this._readNextBundleOfLines = function() {
    var line = "";
    while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
      this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
      this.idxStart = 0
      while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
        line = this.leftOver.substring(this.idxStart, this.idx);
        this._bundleOfLines.push(line);        
        this.idxStart = this.idx + 1;
      }
      this.leftOver = this.leftOver.substring(this.idxStart);
      if (line !== "") {
        break;
      }
    }
  }; 
}
Benvorth
sumber
0
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
  [s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
  protected file: string;
  protected csvOptions = {
    delimiter: ',',
    headers: true,
    ignoreEmpty: true,
    trim: true
  };
  constructor(file: string, csvOptions = {}) {
    if (!fs.existsSync(file)) {
      throw new Error(`File ${file} not found.`);
    }
    this.file = file;
    this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
  }
  public read(callback: RowCallBack): Promise < Array < object >> {
    return new Promise < Array < object >> (resolve => {
      const readStream = fs.createReadStream(this.file);
      const results: Array < any > = [];
      let index = 0;
      const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
        index++;
        results.push(await callback(data, index));
      }).on('error', (err: Error) => {
        console.error(err.message);
        throw err;
      }).on('end', () => {
        resolve(results);
      });
      readStream.pipe(csvStream);
    });
  }
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
  const reader = new CSVReader('./database/migrations/csv/users.csv');
  const users = await reader.read(async data => {
    return {
      username: data.username,
      name: data.name,
      email: data.email,
      cellPhone: data.cell_phone,
      homePhone: data.home_phone,
      roleId: data.role_id,
      description: data.description,
      state: data.state,
    };
  });
  console.log(users);
})();
Raza
sumber
-1

Saya telah membuat modul node untuk membaca file besar teks asinkron atau JSON. Diuji pada file besar.

var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');

module.exports = FileReader;

function FileReader(){

}

FileReader.prototype.read = function(pathToFile, callback){
    var returnTxt = '';
    var s = fs.createReadStream(pathToFile)
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        //console.log('reading line: '+line);
        returnTxt += line;        

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(){
        console.log('Error while reading file.');
    })
    .on('end', function(){
        console.log('Read entire file.');
        callback(returnTxt);
    })
);
};

FileReader.prototype.readJSON = function(pathToFile, callback){
    try{
        this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
    }
    catch(err){
        throw new Error('json file is not valid! '+err.stack);
    }
};

Simpan saja file tersebut sebagai file-reader.js, dan gunakan seperti ini:

var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
Eyal Zoref
sumber
7
Sepertinya Anda menyalin dari jawaban Gerard. Anda harus memberikan kredit kepada Gerard untuk bagian yang Anda salin.
Paul Lynch