Bagaimana catatan proses Hadoop dipisahkan melintasi batas blok?

119

Menurut Hadoop - The Definitive Guide

Catatan logis yang didefinisikan FileInputFormats biasanya tidak cocok dengan baik ke dalam blok HDFS. Misalnya, catatan logis TextInputFormat adalah garis, yang akan melewati batas HDFS lebih sering daripada tidak. Ini tidak ada hubungannya dengan fungsi program Anda — garis tidak terlewat atau putus, misalnya — tetapi perlu diketahui, karena itu berarti peta data-lokal (yaitu, peta yang berjalan pada host yang sama dengan input data) akan melakukan beberapa pembacaan jarak jauh. Overhead kecil yang disebabkan ini biasanya tidak signifikan.

Misalkan baris rekaman dibagi menjadi dua blok (b1 dan b2). Pemeta yang memproses blok pertama (b1) akan melihat bahwa baris terakhir tidak memiliki pemisah EOL dan mengambil sisa baris dari blok data berikutnya (b2).

Bagaimana pemeta memproses blok kedua (b2) menentukan bahwa catatan pertama tidak lengkap dan harus memproses mulai dari catatan kedua di blok (b2)?

Praveen Sripati
sumber

Jawaban:

160

Pertanyaan menarik, saya menghabiskan beberapa waktu melihat kode untuk detailnya dan inilah pemikiran saya. Pemisahan ditangani oleh klien oleh InputFormat.getSplits, jadi lihat FileInputFormat memberikan info berikut:

  • Untuk setiap file input, dapatkan panjang file, ukuran blok dan menghitung ukuran perpecahan sebagai max(minSize, min(maxSize, blockSize))mana maxSizedapat disamakan dengan mapred.max.split.sizedan minSizeadalah mapred.min.split.size.
  • Bagilah file menjadi beberapa FileSplits berdasarkan ukuran pemisahan yang dihitung di atas. Yang penting di sini adalah masingFileSplitstart - masing diinisialisasi dengan parameter yang sesuai dengan offset dalam file masukan . Masih belum ada penanganan jalur pada saat itu. Bagian yang relevan dari kode tersebut terlihat seperti ini:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Setelah itu, jika Anda melihat LineRecordReaderyang didefinisikan oleh TextInputFormat, di situlah garis ditangani:

  • Ketika Anda menginisialisasi Anda LineRecordReadermencoba untuk membuat contoh LineReaderyang merupakan abstraksi untuk dapat membaca baris FSDataInputStream. Ada 2 kasus:
  • Jika ada yang CompressionCodecditentukan, maka codec ini bertanggung jawab untuk menangani batas-batas. Mungkin tidak relevan dengan pertanyaan Anda.
  • Namun, jika tidak ada codec, di situlah hal-hal yang menarik: jika startAnda InputSplitberbeda dari 0, maka Anda mundur 1 karakter dan kemudian lewati baris pertama yang Anda temukan yang diidentifikasi oleh \ n atau \ r \ n (Windows) ! Pelacakan mundur penting karena jika batas garis Anda sama dengan batas terpisah, ini memastikan Anda tidak melewati garis yang valid. Berikut adalah kode yang relevan:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Jadi karena pemisahan dihitung di klien, para pembuat peta tidak perlu berjalan secara berurutan, setiap pembuat peta sudah tahu apakah perlu membuang baris pertama atau tidak.

Jadi pada dasarnya jika Anda memiliki 2 baris masing-masing 100Mb dalam file yang sama, dan untuk menyederhanakan katakanlah ukuran terbagi adalah 64Mb. Kemudian ketika input split dihitung, kita akan memiliki skenario sebagai berikut:

  • Pisahkan 1 yang berisi jalur dan host ke blok ini. Diinisialisasi pada awal 200-200 = 0Mb, panjang 64Mb.
  • Split 2 diinisialisasi pada awal 200-200 + 64 = 64Mb, panjang 64Mb.
  • Split 3 diinisialisasi pada awal 200-200 + 128 = 128Mb, panjang 64Mb.
  • Pisahkan 4 diinisialisasi di awal 200-200 + 192 = 192Mb, panjang 8Mb.
  • Mapper A akan memproses split 1, start adalah 0 jadi jangan lewati baris pertama, dan baca baris lengkap yang melampaui batas 64Mb sehingga perlu dibaca dari jarak jauh.
  • Mapper B akan memproses split 2, start is! = 0 jadi lewati baris pertama setelah 64Mb-1byte, yang sesuai dengan akhir baris 1 di 100Mb yang masih di split 2, kita punya 28Mb baris di split 2, jadi remote membaca 72Mb tersisa.
  • Mapper C akan memproses pembagian 3, mulai adalah! = 0 jadi lewati baris pertama setelah 128Mb-1byte, yang sesuai dengan akhir baris 2 pada 200Mb, yang merupakan akhir file jadi jangan lakukan apa pun.
  • Mapper D sama dengan mapper C kecuali ia mencari baris baru setelah 192Mb-1byte.
Charles Menguy
sumber
Juga @PraveenSripati perlu disebutkan bahwa kasus tepi di mana batas akan berada di \ r dalam \ r \ n kembali ditangani dalam LineReader.readLinefungsi, saya tidak berpikir itu relevan dengan pertanyaan Anda tetapi dapat menambahkan lebih banyak detail jika diperlukan.
Charles Menguy
Mari kita asumsikan ada dua baris dengan 64MB tepat di masukan dan jadi InputSplits terjadi tepat di batas baris. Jadi, akankah mapper selalu mengabaikan garis pada blok kedua karena start! = 0.
Praveen Sripati
6
@PraveenSripati Dalam hal ini, mapper kedua akan melihat start! = 0, jadi mundurlah 1 karakter, yang membawa Anda kembali tepat sebelum \ n dari baris pertama dan kemudian melompat ke baris berikutnya \ n. Jadi itu akan melewati baris pertama tetapi memproses baris kedua seperti yang diharapkan.
Charles Menguy
@CharlesMenguy apakah mungkin baris pertama file dilewati entah bagaimana? Secara konkret, saya memiliki baris pertama dengan key = 1, dan nilai a, lalu ada dua baris lagi dengan kunci yang sama di suatu tempat di file, key = 1, val = b dan key = 1, val = c. Masalahnya, peredam saya mendapatkan {1, [b, c]} dan {1, [a]}, bukan {1, [a, b, c]}. Ini tidak terjadi jika saya menambahkan baris baru ke awal file saya. Apa alasannya, Pak?
Kobe-Wan Kenobi
@CharlesMenguy Bagaimana jika file pada HDFS adalah file biner (sebagai lawan dari file teks, yang \r\n, \nmerepresentasikan pemotongan record)?
CᴴᴀZ
17

Algoritma Map Reduce tidak bekerja pada blok fisik file. Ia bekerja pada input split logis. Pembagian input tergantung di mana catatan itu ditulis. Sebuah rekor bisa menjangkau dua Mappers.

Cara HDFS diatur, ia memecah file yang sangat besar menjadi blok-blok besar (misalnya, berukuran 128MB), dan menyimpan tiga salinan dari blok-blok ini pada node yang berbeda dalam cluster.

HDFS tidak mengetahui konten file-file ini. Rekaman mungkin telah dimulai di Blok-a tetapi akhir catatan itu mungkin ada di Blok-b .

Untuk mengatasi masalah ini, Hadoop menggunakan representasi logis dari data yang disimpan dalam blok file, yang dikenal sebagai input split. Ketika klien pekerjaan MapReduce menghitung perpecahan input , ia mencari tahu di mana seluruh catatan pertama dalam blok dimulai dan di mana catatan terakhir di blok berakhir .

Poin utamanya:

Dalam kasus di mana record terakhir dalam blok tidak lengkap, input split mencakup informasi lokasi untuk blok berikutnya dan offset byte dari data yang diperlukan untuk menyelesaikan record.

Lihat diagram di bawah ini.

masukkan deskripsi gambar di sini

Lihat artikel ini dan pertanyaan SE terkait: Tentang pemecahan file Hadoop / HDFS

Lebih jelasnya bisa dibaca dari dokumentasi

Framework Map-Reduce mengandalkan InputFormat tugas untuk:

  1. Validasi spesifikasi input pekerjaan.
  2. Pisahkan file masukan menjadi InputSplit logis, yang masing-masing akan ditetapkan ke Pemeta individu.
  3. Setiap InputSplit kemudian ditetapkan ke Pemeta individu untuk diproses. Split bisa jadi tuple . InputSplit[] getSplits(JobConf job,int numSplits) adalah API untuk menangani hal-hal ini.

FileInputFormat , yang memperluas metode InputFormatimplementasikan getSplits(). Lihat internal metode ini di grepcode

Ravindra babu
sumber
7

Saya melihatnya sebagai berikut: InputFormat bertanggung jawab untuk membagi data menjadi beberapa pembagian logis dengan mempertimbangkan sifat data.
Tidak ada yang mencegahnya untuk melakukannya, meskipun ini dapat menambahkan latensi yang signifikan ke pekerjaan - semua logika dan pembacaan di sekitar batas ukuran terpisah yang diinginkan akan terjadi di pelacak pekerjaan.
Format masukan sadar rekaman paling sederhana adalah TextInputFormat. Ini berfungsi sebagai berikut (sejauh yang saya pahami dari kode) - format input membuat pemisahan berdasarkan ukuran, terlepas dari garisnya, tetapi LineRecordReader selalu:
a) Lewati baris pertama dalam pemisahan (atau bagiannya), jika tidak pemisahan pertama
b) Baca satu baris setelah batas pemisahan di bagian akhir (jika datanya tersedia, jadi bukan pemisahan terakhir).

David Gruzman
sumber
Skip first line in the split (or part of it), if it is not the first split- jika record pertama di blok bukan pertama selesai, maka tidak yakin bagaimana logika ini akan bekerja.
Praveen Sripati
Sejauh yang saya lihat kode - setiap perpecahan membaca apa yang dimilikinya + baris berikutnya. Jadi jika jeda baris tidak berada pada batas blok - tidak apa-apa. Bagaimana tepatnya menangani kasus ketika jeda baris tepat di blok terikat - harus dipahami - saya akan membaca kode sedikit lagi
David Gruzman
3

Dari apa yang saya pahami, ketika FileSplitdiinisialisasi untuk blok pertama, konstruktor default dipanggil. Oleh karena itu, nilai awal dan panjang awalnya nol. Pada akhir proses blok pertama, jika baris terakhir tidak lengkap, maka nilai panjangnya akan lebih besar dari panjang perpecahan dan baris pertama blok berikutnya juga akan terbaca. Oleh karena itu nilai awal untuk blok pertama akan lebih besar dari nol dan dalam kondisi ini, LineRecordReaderakan melewati garis kepalan dari blok kedua. (Lihat sumber )

Jika baris terakhir dari balok pertama sudah selesai, maka nilai panjangnya akan sama dengan panjang balok pertama dan nilai awal balok kedua adalah nol. Dalam hal ini, LineRecordReadermereka tidak akan melewatkan baris pertama dan membaca blok kedua dari awal.

Masuk akal?

aa8y
sumber
2
Dalam skenario ini, pembuat peta harus berkomunikasi satu sama lain dan memproses blok secara berurutan ketika baris terakhir di blok tertentu tidak lengkap. Tidak yakin apakah ini cara kerjanya.
Praveen Sripati
1

Dari kode sumber hadoop LineRecordReader.java konstruktor: Saya menemukan beberapa komentar:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

dari sini saya percaya hadoop akan membaca satu baris tambahan untuk setiap pemisahan (di akhir pemisahan saat ini, baca baris berikutnya di pemisahan berikutnya), dan jika tidak dibagi pertama, baris pertama akan dibuang. sehingga tidak ada catatan garis yang hilang dan tidak lengkap

Shenghai.Geng
sumber
0

Para pembuat peta tidak harus berkomunikasi. Blok file dalam HDFS dan dapatkah mapper saat ini (RecordReader) dapat membaca blok yang memiliki sisa bagian baris. Ini terjadi di balik layar.

pengguna3507308
sumber