Pertanyaan menarik, saya menghabiskan beberapa waktu melihat kode untuk detailnya dan inilah pemikiran saya. Pemisahan ditangani oleh klien oleh InputFormat.getSplits
, jadi lihat FileInputFormat memberikan info berikut:
- Untuk setiap file input, dapatkan panjang file, ukuran blok dan menghitung ukuran perpecahan sebagai
max(minSize, min(maxSize, blockSize))
mana maxSize
dapat disamakan dengan mapred.max.split.size
dan minSize
adalah mapred.min.split.size
.
Bagilah file menjadi beberapa FileSplit
s berdasarkan ukuran pemisahan yang dihitung di atas. Yang penting di sini adalah masingFileSplit
start
- masing diinisialisasi dengan parameter yang sesuai dengan offset dalam file masukan . Masih belum ada penanganan jalur pada saat itu. Bagian yang relevan dari kode tersebut terlihat seperti ini:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
Setelah itu, jika Anda melihat LineRecordReader
yang didefinisikan oleh TextInputFormat
, di situlah garis ditangani:
- Ketika Anda menginisialisasi Anda
LineRecordReader
mencoba untuk membuat contoh LineReader
yang merupakan abstraksi untuk dapat membaca baris FSDataInputStream
. Ada 2 kasus:
- Jika ada yang
CompressionCodec
ditentukan, maka codec ini bertanggung jawab untuk menangani batas-batas. Mungkin tidak relevan dengan pertanyaan Anda.
Namun, jika tidak ada codec, di situlah hal-hal yang menarik: jika start
Anda InputSplit
berbeda dari 0, maka Anda mundur 1 karakter dan kemudian lewati baris pertama yang Anda temukan yang diidentifikasi oleh \ n atau \ r \ n (Windows) ! Pelacakan mundur penting karena jika batas garis Anda sama dengan batas terpisah, ini memastikan Anda tidak melewati garis yang valid. Berikut adalah kode yang relevan:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Jadi karena pemisahan dihitung di klien, para pembuat peta tidak perlu berjalan secara berurutan, setiap pembuat peta sudah tahu apakah perlu membuang baris pertama atau tidak.
Jadi pada dasarnya jika Anda memiliki 2 baris masing-masing 100Mb dalam file yang sama, dan untuk menyederhanakan katakanlah ukuran terbagi adalah 64Mb. Kemudian ketika input split dihitung, kita akan memiliki skenario sebagai berikut:
- Pisahkan 1 yang berisi jalur dan host ke blok ini. Diinisialisasi pada awal 200-200 = 0Mb, panjang 64Mb.
- Split 2 diinisialisasi pada awal 200-200 + 64 = 64Mb, panjang 64Mb.
- Split 3 diinisialisasi pada awal 200-200 + 128 = 128Mb, panjang 64Mb.
- Pisahkan 4 diinisialisasi di awal 200-200 + 192 = 192Mb, panjang 8Mb.
- Mapper A akan memproses split 1, start adalah 0 jadi jangan lewati baris pertama, dan baca baris lengkap yang melampaui batas 64Mb sehingga perlu dibaca dari jarak jauh.
- Mapper B akan memproses split 2, start is! = 0 jadi lewati baris pertama setelah 64Mb-1byte, yang sesuai dengan akhir baris 1 di 100Mb yang masih di split 2, kita punya 28Mb baris di split 2, jadi remote membaca 72Mb tersisa.
- Mapper C akan memproses pembagian 3, mulai adalah! = 0 jadi lewati baris pertama setelah 128Mb-1byte, yang sesuai dengan akhir baris 2 pada 200Mb, yang merupakan akhir file jadi jangan lakukan apa pun.
- Mapper D sama dengan mapper C kecuali ia mencari baris baru setelah 192Mb-1byte.
LineReader.readLine
fungsi, saya tidak berpikir itu relevan dengan pertanyaan Anda tetapi dapat menambahkan lebih banyak detail jika diperlukan.\r\n, \n
merepresentasikan pemotongan record)?Algoritma Map Reduce tidak bekerja pada blok fisik file. Ia bekerja pada input split logis. Pembagian input tergantung di mana catatan itu ditulis. Sebuah rekor bisa menjangkau dua Mappers.
Cara HDFS diatur, ia memecah file yang sangat besar menjadi blok-blok besar (misalnya, berukuran 128MB), dan menyimpan tiga salinan dari blok-blok ini pada node yang berbeda dalam cluster.
HDFS tidak mengetahui konten file-file ini. Rekaman mungkin telah dimulai di Blok-a tetapi akhir catatan itu mungkin ada di Blok-b .
Untuk mengatasi masalah ini, Hadoop menggunakan representasi logis dari data yang disimpan dalam blok file, yang dikenal sebagai input split. Ketika klien pekerjaan MapReduce menghitung perpecahan input , ia mencari tahu di mana seluruh catatan pertama dalam blok dimulai dan di mana catatan terakhir di blok berakhir .
Poin utamanya:
Dalam kasus di mana record terakhir dalam blok tidak lengkap, input split mencakup informasi lokasi untuk blok berikutnya dan offset byte dari data yang diperlukan untuk menyelesaikan record.
Lihat diagram di bawah ini.
Lihat artikel ini dan pertanyaan SE terkait: Tentang pemecahan file Hadoop / HDFS
Lebih jelasnya bisa dibaca dari dokumentasi
Framework Map-Reduce mengandalkan InputFormat tugas untuk:
InputSplit[] getSplits(JobConf job,int numSplits
) adalah API untuk menangani hal-hal ini.FileInputFormat , yang memperluas metode
InputFormat
implementasikangetSplits
(). Lihat internal metode ini di grepcodesumber
Saya melihatnya sebagai berikut: InputFormat bertanggung jawab untuk membagi data menjadi beberapa pembagian logis dengan mempertimbangkan sifat data.
Tidak ada yang mencegahnya untuk melakukannya, meskipun ini dapat menambahkan latensi yang signifikan ke pekerjaan - semua logika dan pembacaan di sekitar batas ukuran terpisah yang diinginkan akan terjadi di pelacak pekerjaan.
Format masukan sadar rekaman paling sederhana adalah TextInputFormat. Ini berfungsi sebagai berikut (sejauh yang saya pahami dari kode) - format input membuat pemisahan berdasarkan ukuran, terlepas dari garisnya, tetapi LineRecordReader selalu:
a) Lewati baris pertama dalam pemisahan (atau bagiannya), jika tidak pemisahan pertama
b) Baca satu baris setelah batas pemisahan di bagian akhir (jika datanya tersedia, jadi bukan pemisahan terakhir).
sumber
Skip first line in the split (or part of it), if it is not the first split
- jika record pertama di blok bukan pertama selesai, maka tidak yakin bagaimana logika ini akan bekerja.Dari apa yang saya pahami, ketika
FileSplit
diinisialisasi untuk blok pertama, konstruktor default dipanggil. Oleh karena itu, nilai awal dan panjang awalnya nol. Pada akhir proses blok pertama, jika baris terakhir tidak lengkap, maka nilai panjangnya akan lebih besar dari panjang perpecahan dan baris pertama blok berikutnya juga akan terbaca. Oleh karena itu nilai awal untuk blok pertama akan lebih besar dari nol dan dalam kondisi ini,LineRecordReader
akan melewati garis kepalan dari blok kedua. (Lihat sumber )Jika baris terakhir dari balok pertama sudah selesai, maka nilai panjangnya akan sama dengan panjang balok pertama dan nilai awal balok kedua adalah nol. Dalam hal ini,
LineRecordReader
mereka tidak akan melewatkan baris pertama dan membaca blok kedua dari awal.Masuk akal?
sumber
Dari kode sumber hadoop LineRecordReader.java konstruktor: Saya menemukan beberapa komentar:
dari sini saya percaya hadoop akan membaca satu baris tambahan untuk setiap pemisahan (di akhir pemisahan saat ini, baca baris berikutnya di pemisahan berikutnya), dan jika tidak dibagi pertama, baris pertama akan dibuang. sehingga tidak ada catatan garis yang hilang dan tidak lengkap
sumber
Para pembuat peta tidak harus berkomunikasi. Blok file dalam HDFS dan dapatkah mapper saat ini (RecordReader) dapat membaca blok yang memiliki sisa bagian baris. Ini terjadi di balik layar.
sumber