Merangkai beberapa pekerjaan MapReduce di Hadoop

124

Dalam banyak situasi kehidupan nyata di mana Anda menerapkan MapReduce, algoritme akhir menjadi beberapa langkah MapReduce.

yaitu Map1, Reduce1, Map2, Reduce2, dan seterusnya.

Jadi Anda memiliki keluaran dari pengurangan terakhir yang diperlukan sebagai masukan untuk peta berikutnya.

Data perantara adalah sesuatu yang (secara umum) tidak ingin Anda simpan setelah pipeline berhasil diselesaikan. Juga karena data perantara ini pada umumnya adalah beberapa struktur data (seperti 'peta' atau 'kumpulan'), Anda tidak ingin terlalu banyak berusaha dalam menulis dan membaca pasangan nilai-kunci ini.

Apa cara yang disarankan untuk melakukan itu di Hadoop?

Apakah ada contoh (sederhana) yang menunjukkan cara menangani data perantara ini dengan cara yang benar, termasuk pembersihan sesudahnya?

Niels Basjes
sumber
2
menggunakan framework mapreduce yang mana?
Skaffman
1
Saya mengedit pertanyaan untuk mengklarifikasi yang saya bicarakan tentang Hadoop.
Niels Basjes
Saya akan merekomendasikan permata penggembala babi untuk ini: github.com/Ganglion/swineherd terbaik, Tobias
Tobias

Jawaban:

57

Saya rasa tutorial di jaringan pengembang Yahoo ini akan membantu Anda dalam hal ini: Merangkai Pekerjaan

Anda menggunakan JobClient.runJob(). Jalur keluaran data dari pekerjaan pertama menjadi jalur masukan ke pekerjaan kedua Anda. Ini perlu diteruskan sebagai argumen ke pekerjaan Anda dengan kode yang sesuai untuk menguraikannya dan menyiapkan parameter untuk pekerjaan itu.

Saya pikir metode di atas mungkin bagaimanapun cara API mapred yang sekarang lebih tua melakukannya, tetapi itu masih harus berfungsi. Akan ada metode serupa di API mapreduce baru tapi saya tidak yakin apa itu.

Sejauh menghapus data perantara setelah pekerjaan selesai, Anda dapat melakukan ini di kode Anda. Cara saya melakukannya sebelumnya menggunakan sesuatu seperti:

FileSystem.delete(Path f, boolean recursive);

Dimana path adalah lokasi pada data HDFS. Anda perlu memastikan bahwa Anda hanya menghapus data ini setelah tidak ada pekerjaan lain yang membutuhkannya.

Biner Nerd
sumber
3
Terima kasih untuk link ke tutorial Yahoo. The Chaining Jobs memang yang Anda inginkan jika keduanya berjalan dalam satu waktu. Yang saya cari adalah cara yang mudah dilakukan jika Anda ingin menjalankannya secara terpisah. Dalam tutorial yang disebutkan saya menemukan SequenceFileOutputFormat "Menulis file biner yang cocok untuk dibaca ke dalam pekerjaan MapReduce berikutnya" dan SequenceFileInputFormat yang cocok yang membuat semuanya sangat mudah dilakukan. Terima kasih.
Niels Basjes
20

Ada banyak cara untuk melakukannya.

(1) Pekerjaan bertingkat

Buat objek JobConf "job1" untuk pekerjaan pertama dan setel semua parameter dengan "input" sebagai direktori input dan "temp" sebagai direktori output. Jalankan pekerjaan ini:

JobClient.run(job1).

Tepat di bawahnya, buat objek JobConf "job2" untuk pekerjaan kedua dan setel semua parameter dengan "temp" sebagai direktori masukan dan "keluaran" sebagai direktori keluaran. Jalankan pekerjaan ini:

JobClient.run(job2).

(2) Buat dua objek JobConf dan setel semua parameter di dalamnya seperti (1) kecuali Anda tidak menggunakan JobClient.run.

Kemudian buat dua objek Job dengan jobconfs sebagai parameter:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

Dengan menggunakan objek jobControl, Anda menentukan dependensi pekerjaan, lalu menjalankan pekerjaan:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Jika Anda membutuhkan struktur seperti Map + | Kurangi | Map *, Anda dapat menggunakan kelas ChainMapper dan ChainReducer yang disertakan dengan Hadoop versi 0.19 dan seterusnya.

pengguna381928
sumber
7

Sebenarnya ada beberapa cara untuk melakukan ini. Saya akan fokus pada dua.

Salah satunya adalah melalui Riffle ( http://github.com/cwensel/riffle ) pustaka anotasi untuk mengidentifikasi hal-hal yang bergantung dan 'mengeksekusinya' dalam urutan ketergantungan (topologis).

Atau Anda dapat menggunakan Cascade (dan MapReduceFlow) di Cascading ( http://www.cascading.org/ ). Versi mendatang akan mendukung anotasi Riffle, tetapi berfungsi dengan baik sekarang dengan pekerjaan MR JobConf mentah.

Variannya adalah tidak mengelola tugas MR secara manual, tetapi mengembangkan aplikasi Anda menggunakan Cascading API. Kemudian JobConf dan rantai pekerjaan ditangani secara internal melalui perencana Cascading dan kelas Flow.

Dengan cara ini Anda menghabiskan waktu Anda untuk fokus pada masalah Anda, bukan pada mekanisme mengelola pekerjaan Hadoop dll. Anda bahkan dapat melapisi bahasa yang berbeda di atas (seperti clojure atau jruby) untuk lebih menyederhanakan pengembangan dan aplikasi Anda. http://www.cascading.org/modules.html

cwensel.dll
sumber
6

Saya telah melakukan rangkaian pekerjaan menggunakan objek JobConf satu demi satu. Saya mengambil contoh WordCount untuk merangkai pekerjaan. Satu pekerjaan menghitung berapa kali sebuah kata diulangi dalam keluaran yang diberikan. Pekerjaan kedua mengambil keluaran pekerjaan pertama sebagai masukan dan menghitung total kata dalam masukan yang diberikan. Di bawah ini adalah kode yang perlu ditempatkan di kelas Driver.

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

Perintah untuk menjalankan pekerjaan ini adalah:

bin / toples hadoop TotalWords.

Kita perlu memberikan nama tugas akhir untuk perintah tersebut. Dalam kasus di atas, ini adalah TotalWords.

psrklr.dll
sumber
5

Anda dapat menjalankan rantai MR dengan cara seperti yang diberikan dalam kode.

HARAP DICATAT : Hanya kode driver yang telah disediakan

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

THE SEQUENCE IS

( JOB1 ) MAP-> REDUCE-> ( JOB2 ) MAP
Ini dilakukan untuk menyortir kunci namun ada lebih banyak cara seperti menggunakan peta hierarki.
Namun saya ingin memfokuskan perhatian Anda ke cara Pekerjaan telah dirantai! !
Terima kasih

Aniruddha Sinha
sumber
3

Kita dapat menggunakan waitForCompletion(true)metode Job untuk menentukan ketergantungan di antara pekerjaan.

Dalam skenario saya, saya memiliki 3 pekerjaan yang bergantung satu sama lain. Di kelas pengemudi saya menggunakan kode di bawah ini dan berfungsi seperti yang diharapkan.

public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }
Shivaprasad
sumber
Jawaban Anda adalah tentang bagaimana bergabung dengan pekerjaan ini dalam hal pelaksanaan. Pertanyaan aslinya adalah tentang struktur data terbaik. Jadi jawaban Anda tidak relevan untuk pertanyaan khusus ini.
Niels Basjes
2

Kelas baru org.apache.hadoop.mapreduce.lib.chain.ChainMapper membantu skenario ini

Xavi
sumber
1
Jawabannya bagus - tetapi Anda harus menambahkan beberapa detail lebih lanjut tentang fungsinya atau setidaknya tautan ke referensi API sehingga orang dapat memilih
Jeremy Hajek
ChainMapper dan ChainReducer digunakan untuk memiliki 1 atau lebih pembuat peta sebelum Reduce dan 0 atau lebih pembuat peta setelah Reduce, spec. (Mapper +) Kurangi (Mapper *). Koreksi saya jika saya salah jelas tetapi saya tidak berpikir pendekatan ini menyelesaikan rangkaian pekerjaan secara serial seperti yang diminta OP.
oczkoisse
1

Meskipun ada mesin alur kerja Hadoop berbasis server yang kompleks misalnya, oozie, saya memiliki pustaka java sederhana yang memungkinkan eksekusi beberapa pekerjaan Hadoop sebagai alur kerja. Konfigurasi pekerjaan dan alur kerja yang menentukan ketergantungan antar pekerjaan dikonfigurasikan dalam file JSON. Semuanya dapat dikonfigurasi secara eksternal dan tidak memerlukan perubahan apa pun dalam peta yang ada, kurangi implementasi menjadi bagian dari alur kerja.

Detailnya dapat ditemukan disini. Kode sumber dan jar tersedia di github.

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

Pranab

Pranab
sumber
1

Menurut saya, oozie membantu pekerjaan selanjutnya untuk menerima masukan langsung dari pekerjaan sebelumnya. Ini untuk menghindari operasi I / o yang dilakukan dengan kontrol kerja.

stholy
sumber
1

Jika Anda ingin merantai pekerjaan secara terprogram, Anda perlu menggunakan JobControl. Penggunaannya cukup sederhana:

JobControl jobControl = new JobControl(name);

Setelah itu Anda menambahkan instance ControlledJob. ControlledJob mendefinisikan pekerjaan dengan dependensinya, sehingga secara otomatis memasukkan input dan output agar sesuai dengan "rantai" pekerjaan.

    jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));

    jobControl.run();

memulai rantai. Anda akan ingin memasukkannya ke dalam utas yang lebih cepat. Ini memungkinkan untuk memeriksa status rantai Anda saat berjalan:

    while (!jobControl.allFinished()) {
        System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
        System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
        System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
        List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
        System.out.println("Jobs in success state: " + successfulJobList.size());
        List<ControlledJob> failedJobList = jobControl.getFailedJobList();
        System.out.println("Jobs in failed state: " + failedJobList.size());
    }
Erik Schmiegelow
sumber
0

Seperti yang telah Anda sebutkan dalam kebutuhan Anda bahwa Anda ingin output daya MRJob1 menjadi i / p MRJob2 dan seterusnya, Anda dapat mempertimbangkan untuk menggunakan alur kerja oozie untuk kasus penggunaan ini. Anda juga dapat mempertimbangkan untuk menulis data perantara Anda ke HDFS karena akan digunakan oleh MRJob berikutnya. Dan setelah pekerjaan selesai, Anda dapat membersihkan data perantara Anda.

<start to="mr-action1"/>
<action name="mr-action1">
   <!-- action for MRJob1-->
   <!-- set output path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="mr-action2">
   <!-- action for MRJob2-->
   <!-- set input path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="success">
        <!-- action for success-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="fail">
        <!-- action for fail-->
    <ok to="end"/>
    <error to="end"/>
</action>

<end name="end"/>

Neha Kumari
sumber