Kesalahan penanganan dengan stream node.js

164

Apa cara yang benar untuk menangani kesalahan dengan streaming? Saya sudah tahu ada acara 'kesalahan' yang bisa Anda dengarkan, tetapi saya ingin tahu lebih banyak detail tentang situasi rumit yang sewenang-wenang.

Sebagai permulaan, apa yang Anda lakukan ketika ingin melakukan rantai pipa sederhana:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

Dan bagaimana Anda benar membuat salah satu dari transformasi itu sehingga kesalahan ditangani dengan benar?

Pertanyaan terkait lainnya:

  • ketika kesalahan terjadi, apa yang terjadi pada acara 'akhir'? Apakah tidak pernah dipecat? Apakah kadang-kadang dipecat? Apakah ini tergantung pada transformasi / aliran? Apa standarnya di sini?
  • apakah ada mekanisme untuk menyebarkan kesalahan melalui pipa?
  • apakah domain menyelesaikan masalah ini secara efektif? Contohnya akan menyenangkan.
  • apakah kesalahan yang keluar dari peristiwa 'kesalahan' memiliki jejak tumpukan? Terkadang? Tidak pernah? apakah ada cara untuk mendapatkannya dari mereka?
BT
sumber
1
Ini bukan hal sepele. Promisekerangka kerja membuatnya lebih sederhana
slezica
27
Sayangnya janji / masa depan tidak bisa membantu Anda dengan streaming ...
BT

Jawaban:

222

mengubah

Transform stream dapat dibaca dan ditulis, dan karenanya benar-benar bagus 'stream tengah'. Karena alasan ini, mereka kadang-kadang disebut sebagai throughstream. Mereka mirip dengan aliran duplex dengan cara ini, kecuali mereka menyediakan antarmuka yang bagus untuk memanipulasi data daripada hanya mengirimkannya. Tujuan dari aliran transformasi adalah untuk memanipulasi data saat disalurkan melalui aliran. Anda mungkin ingin melakukan beberapa panggilan async, misalnya, atau menurunkan beberapa bidang, memetakan beberapa hal, dll.


Di mana Anda dapat menempatkan aliran transformasi


Untuk cara membuat aliran transformasi lihat di sini dan di sini . Yang harus Anda lakukan adalah:

  1. termasuk modul stream
  2. instantiate (atau mewarisi dari) kelas Transform
  3. menerapkan _transformmetode yang membutuhkan a (chunk, encoding, callback).

Potongan adalah data Anda. Sebagian besar waktu Anda tidak perlu khawatir tentang penyandian jika Anda bekerja objectMode = true. Callback dipanggil saat Anda selesai memproses chunk. Potongan ini kemudian didorong ke aliran berikutnya.

Jika Anda menginginkan modul pembantu yang bagus yang akan memungkinkan Anda melakukan melalui aliran dengan sangat mudah, saya sarankan through2 .

Untuk penanganan kesalahan, terus membaca.

pipa

Dalam rantai pipa, penanganan kesalahan memang tidak mudah. Menurut utas ini .pipe () tidak dibangun untuk meneruskan kesalahan. Jadi sesuatu seperti ...

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

... hanya akan mendengarkan kesalahan pada aliran c. Jika acara kesalahan dipancarkan a, itu tidak akan diturunkan dan, pada kenyataannya, akan melempar. Untuk melakukan ini dengan benar:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

Sekarang, meskipun cara kedua lebih bertele-tele, Anda setidaknya bisa menjaga konteks di mana kesalahan Anda terjadi. Ini biasanya hal yang baik.

Satu perpustakaan saya menemukan membantu jika Anda memiliki kasus di mana Anda hanya ingin menangkap kesalahan di tempat tujuan dan Anda tidak begitu peduli tentang di mana itu terjadi adalah event-stream .

akhir

Ketika acara kesalahan dipecat, acara akhir tidak akan dipecat (secara eksplisit). Memancarkan peristiwa kesalahan akan mengakhiri aliran.

domain

Dalam pengalaman saya, sebagian besar domain berfungsi dengan sangat baik. Jika Anda memiliki peristiwa kesalahan yang tidak ditangani (yaitu memancarkan kesalahan pada aliran tanpa pendengar), server dapat macet. Sekarang, seperti yang ditunjukkan artikel di atas, Anda dapat membungkus aliran dalam domain yang seharusnya menangkap semua kesalahan.

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });

Keindahan domain adalah bahwa mereka akan melestarikan jejak tumpukan. Meskipun event-stream melakukan pekerjaan dengan baik juga.

Untuk bacaan lebih lanjut, lihat stream-handbook . Cukup mendalam, tetapi sangat berguna dan memberikan beberapa tautan bagus ke banyak modul bermanfaat.

mshell_lauren
sumber
Itu info yang sangat bagus, terima kasih! Bisakah Anda menambahkan sedikit tentang mengapa Anda ingin membuat aliran transformasi dan mengapa itu terkait dengan pertanyaan saya?
BT
Tentu - meskipun saya pikir itu terkait karena Anda bertanya tentang hal itu; )
mshell_lauren
1
Poskan ini oleh isaccs di Google Groups- nodejs: groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ (bukan grokbase)
jpillora
Jawaban ini ditulis dengan sempurna. Saya akan menyelidiki saran domain - tampaknya merupakan solusi yang saya cari.
Titik koma
12
Perhatikan bahwa Anda tidak perlu membungkus .on('error')pawang dalam fungsi anonim yaitu a.on('error', function(e){handleError(e)})hanya bisaa.on('error', handleError)
timoxley
28

Jika Anda menggunakan simpul> = v10.0.0 Anda dapat menggunakan stream.pipeline dan stream.finished .

Sebagai contoh:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

Lihat PR github ini untuk diskusi lebih lanjut.

shusson
sumber
1
Mengapa Anda menggunakan finished, ketika pipelinesudah memiliki panggilan balik?
Marcos Pereira
4
Anda mungkin ingin menangani kesalahan secara berbeda antara pipa dan aliran individual.
shusson
25

domain tidak digunakan lagi. kamu tidak membutuhkannya.

untuk pertanyaan ini, perbedaan antara transformasi atau tulisan tidak begitu penting.

Jawaban mshell_lauren bagus, tetapi sebagai alternatif Anda juga dapat secara eksplisit mendengarkan acara kesalahan pada setiap aliran yang menurut Anda mungkin salah. dan gunakan kembali fungsi penangan jika Anda mau.

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

melakukan hal itu untuk mencegah pengecualian yang tidak tertangkap yang terkenal itu jika salah satu dari mereka menembakkan kesalahannya

Bent Cardan
sumber
3
lol bersenang-senang menangani 3 peristiwa kesalahan yang berbeda dan berdoa agar siapa pun yang menulis 3 streaming lib yang berbeda menerapkan penanganan kesalahan dengan benar
Alexander Mills
4
@Alex Mills 1) Apa masalah penanganan 3 peristiwa, dan mengapa mereka "berbeda", ketika tipenya sama - error, orang mungkin juga puas dengan fakta bahwa setiap peristiwa berbeda; 2) streaming streaming apa yang ditulis di atas, selain fungsionalitas Node.js asli? dan 3) mengapa tidak masalah bagaimana mereka menangani acara secara internal, ketika ini jelas memungkinkan siapa pun untuk melampirkan penangan kesalahan tambahan di atas apa pun yang sudah ada di sana?
amn
10

Kesalahan dari seluruh rantai dapat diperbanyak ke aliran paling kanan menggunakan fungsi sederhana:

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}

yang bisa digunakan seperti:

safePipe(readable, [ transform1, transform2, ... ]);
Gleba
sumber
5

.on("error", handler)hanya menangani kesalahan Stream tetapi jika Anda menggunakan Transform stream kustom, .on("error", handler)jangan tangkap kesalahan yang terjadi di dalam _transformfungsi. Jadi orang dapat melakukan sesuatu seperti ini untuk mengendalikan aliran aplikasi: -

thiskata kunci dalam _transformfungsi mengacu pada Streamdirinya sendiri, yang merupakan EventEmitter. Jadi Anda dapat menggunakan try catchseperti di bawah ini untuk menangkap kesalahan dan kemudian meneruskannya ke penangan acara khusus.

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)

Dengan cara ini, Anda dapat menjaga logika dan penangan kesalahan Anda terpisah. Anda juga dapat memilih untuk menangani hanya beberapa kesalahan dan mengabaikan yang lain.

UPDATE
Alternatif: RXJS diamati

Vikas Gautam
sumber
4

Gunakan paket multipipe untuk menggabungkan beberapa aliran menjadi satu aliran dupleks. Dan menangani kesalahan di satu tempat.

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)
Sergey Savenko
sumber
1

Gunakan pola Node.js dengan membuat mekanisme Transform stream dan memanggil panggilan baliknya donedengan argumen untuk menyebarkan kesalahan:

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

transformStream1.prototype._transform = function (chunk, encoding, done) {
  //var stream = this;

  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // nodejs style for propagating an error
    return done(error);
  }

  // Here, everything went well
  done();
}

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(-1);
   })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(-1);
   });
Derek
sumber
Hmm, jadi Anda mengatakan jika semua prosesor aliran dibangun seperti ini, kesalahan akan menyebar?
BT
-2

Coba tangkap tidak akan menangkap kesalahan yang terjadi di aliran karena karena mereka dilemparkan setelah kode panggilan sudah keluar. Anda dapat merujuk ke dokumentasi:

https://nodejs.org/dist/latest-v10.x/docs/api/errors.html

Mehran
sumber
Terima kasih, tetapi ini tidak menjawab pertanyaan sama sekali.
BT
Memberi saya dokumen 40 halaman tidak membantu. Menurut Anda apa yang harus saya rujuk di halaman raksasa itu? Juga, sudahkah Anda membaca pertanyaan saya? Pertanyaan saya bukan "apakah mencoba menangkap pekerjaan dengan aliran?" Saya sudah sangat sadar bahwa try-catch tidak akan berfungsi dengan kesalahan asinkron, misalnya yang dari jalur pemroses aliran.
BT