Apache Spark: map vs mapPartitions?

133

Apa perbedaan antara RDD map dan mapPartitionsmetode? Dan apakah flatMapberperilaku suka mapatau suka mapPartitions? Terima kasih.

(Sunting) yaitu apa perbedaan (baik secara semantik atau dalam hal eksekusi) antara

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

Dan:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Nicholas White
sumber
3
Setelah membaca jawaban di bawah ini, Anda dapat melihat [pengalaman ini] dibagikan oleh seseorang yang benar-benar menggunakannya. ( Bzhangusc.wordpress.com/2014/06/19/… ) bzhangusc.wordpress.com/2014/06/19 / ...
Abhidemon

Jawaban:

121

Apa perbedaan antara peta RDD dan metode mapPartitions?

Peta metode mengubah setiap elemen dari RDD sumber menjadi elemen tunggal dari hasil RDD dengan menerapkan fungsi. mapPartitions mengubah setiap partisi RDD sumber menjadi beberapa elemen dari hasil (mungkin tidak ada).

Dan apakah flatMap berperilaku seperti map atau like mapPartitions?

Baik flatMap bekerja pada elemen tunggal (as map) dan menghasilkan banyak elemen dari hasilnya (as mapPartitions).

Alexey Romanov
sumber
3
Terima kasih - apakah peta menyebabkan shuffles (atau mengubah jumlah partisi)? Apakah ini memindahkan data antar node? Saya telah menggunakan mapPartitions untuk menghindari pemindahan data antar node, tetapi tidak yakin apakah flapMap akan melakukannya.
Nicholas White
Jika Anda melihat sumbernya - github.com/apache/incubator-spark/blob/… dan github.com/apache/incubator-spark/blob/… - keduanya mapdan flatMapmemiliki partisi yang persis sama dengan induknya.
Alexey Romanov
13
Sebagai catatan, presentasi yang diberikan oleh pembicara di San Francisco Spark Summit 2013 (goo.gl/JZXDCR) menyoroti bahwa tugas dengan overhead per-rekor tinggi berkinerja lebih baik dengan mapPartition daripada dengan transformasi peta. Ini, menurut presentasi, karena tingginya biaya pengaturan tugas baru.
Mikel Urkia
1
Saya melihat yang sebaliknya - bahkan dengan operasi yang sangat kecil, lebih cepat untuk memanggil mapPartitions dan iterate daripada call map. Saya berasumsi bahwa ini hanyalah overhead untuk memulai mesin bahasa yang akan memproses tugas peta. (Saya di R, yang mungkin memiliki overhead startup lebih banyak.) Jika Anda akan melakukan beberapa operasi, maka mapPartitions tampaknya sedikit lebih cepat - saya berasumsi ini karena membaca RDD hanya sekali. Bahkan jika RDD di-cache dalam RAM, itu menghemat banyak overhead dari konversi tipe.
Bob
3
mappada dasarnya mengambil fungsi Anda f, dan meneruskannya ke iter.map(f). Jadi pada dasarnya ini adalah metode kenyamanan yang membungkus mapPartitions. Saya akan terkejut jika ada keuntungan kinerja baik cara untuk pekerjaan transformasi gaya peta murni (yaitu di mana fungsinya identik), jika Anda perlu membuat beberapa objek untuk diproses, jika objek ini dapat dibagikan maka mapPartitionsakan menguntungkan.
NightWolf
129

Imp. TIP:

Setiap kali Anda memiliki inisialisasi kelas berat yang harus dilakukan satu kali untuk banyak RDDelemen daripada satu kali per RDDelemen, dan jika inisialisasi ini, seperti pembuatan objek dari perpustakaan pihak ketiga, tidak dapat diserialisasi (sehingga Spark dapat mengirimkannya melintasi gugus ke node pekerja), gunakan mapPartitions()sebagai ganti map(). mapPartitions()menyediakan inisialisasi yang harus dilakukan sekali per tugas pekerja / utas / partisi bukannya sekali per RDDelemen data misalnya: lihat di bawah.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. apakah flatMapberperilaku seperti peta atau suka mapPartitions?

Iya. silakan lihat contoh 2 dari flatmap.. penjelasannya sendiri.

Q1. Apa perbedaan antara RDD mapdanmapPartitions

mapberfungsi dengan fungsi yang digunakan pada level per elemen sambil mapPartitionsmenjalankan fungsi di level partisi.

Skenario Contoh : jika kita memiliki elemen 100K diRDDpartisitertentumaka kita akan menjalankan fungsi yang digunakan oleh transformasi pemetaan 100K kali ketika kita gunakanmap.

Sebaliknya, jika kita menggunakan mapPartitionsmaka kita hanya akan memanggil fungsi tertentu satu kali, tetapi kita akan melewati semua catatan 100 ribu dan mendapatkan kembali semua tanggapan dalam satu panggilan fungsi.

Akan ada perolehan kinerja karena mapbekerja pada fungsi tertentu berkali-kali, terutama jika fungsi tersebut melakukan sesuatu yang mahal setiap kali itu tidak perlu dilakukan jika kita melewati semua elemen sekaligus (dalam kasus mappartitions).

peta

Menerapkan fungsi transformasi pada setiap item RDD dan mengembalikan hasilnya sebagai RDD baru.

Daftar Varian

def map [U: ClassTag] (f: T => U): RDD [U]

Contoh:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Ini adalah peta khusus yang hanya dipanggil sekali untuk setiap partisi. Seluruh konten partisi masing-masing tersedia sebagai aliran nilai berurutan melalui argumen input (Iterarator [T]). Fungsi kustom harus mengembalikan Iterator [U] lainnya. Iterator hasil gabungan secara otomatis dikonversi menjadi RDD baru. Harap dicatat, bahwa tupel (3,4) dan (6,7) hilang dari hasil berikut karena partisi yang kami pilih.

preservesPartitioningmenunjukkan apakah fungsi input mempertahankan partisi, yang seharusnya falsekecuali ini adalah RDD pasangan dan fungsi input tidak mengubah kunci.

Daftar Varian

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], preservesPartitioning: Boolean = false): RDD [U]

Contoh 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Contoh 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Program di atas juga dapat ditulis menggunakan flatMap sebagai berikut.

Contoh 2 menggunakan flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Kesimpulan:

mapPartitionstransformasi lebih cepat daripada mapkarena memanggil fungsi Anda sekali / partisi, bukan sekali / elemen ..

Bacaan lebih lanjut: foreach Vs foreachPartitions Kapan menggunakan What?

Ram Ghadiyaram
sumber
4
Saya tahu Anda dapat menggunakan mapatau mapPartitionsuntuk mencapai hasil yang sama (lihat dua contoh dalam pertanyaan); pertanyaan ini adalah tentang mengapa Anda memilih satu cara dari yang lain. Komentar di jawaban lain sangat berguna! Juga, Anda tidak menyebutkan bahwa mapdan flatMaplulus falseuntuk preservesPartitioning, dan apa implikasi dari yang.
Nicholas White
2
fungsi dieksekusi setiap kali versus fungsi dieksekusi sekali untuk parisi adalah tautan yang saya lewatkan. Memiliki akses ke lebih dari satu catatan data pada suatu waktu dengan mapPartition adalah hal yang sangat berharga. hargai jawabannya
Titik Koma dan Lakban
1
Apakah ada skenario di mana maplebih baik daripada mapPartitions? Jika mapPartitionsbegitu bagus, mengapa implementasi peta tidak standar?
ruhong
1
@oneleggedmule: keduanya untuk persyaratan yang berbeda yang harus kita gunakan dengan bijak jika Anda membuat sumber daya seperti koneksi db (seperti ditunjukkan pada contoh di atas) yang mahal maka partisi adalah pendekatan yang tepat karena satu koneksi per partisi. juga saveAsTextFile mappartitions yang digunakan secara internal lihat
Ram Ghadiyaram
@oneleggedmule Dari sudut pandang saya, map () lebih mudah dipahami dan dipelajari, dan juga merupakan metode umum dari banyak bahasa yang berbeda. Mungkin lebih mudah digunakan daripada mapPartitions () jika seseorang tidak terbiasa dengan metode spesifik Spark ini di awal. Jika tidak ada perbedaan kinerja maka saya lebih suka menggunakan peta ().
Raymond Chen
15

Peta :

  1. Ini memproses satu baris pada satu waktu, sangat mirip dengan metode map () dari MapReduce.
  2. Anda kembali dari transformasi setelah setiap baris.

MapPartitions

  1. Ini memproses partisi lengkap dalam sekali jalan.
  2. Anda dapat kembali dari fungsi hanya sekali setelah memproses seluruh partisi.
  3. Semua hasil antara harus disimpan dalam memori sampai Anda memproses seluruh partisi.
  4. Memberi Anda fungsi pengaturan () peta () dan pembersihan () di MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

KrazyGautam
sumber
mengenai 2 - jika Anda melakukan transformasi iterator-ke-iterator, dan tidak mematerialisasikan iterator ke kumpulan semacam, Anda tidak harus memegang seluruh partisi dalam memori, pada kenyataannya, dengan cara itu percikan akan dapat tumpah bagian partisi ke disk.
ilcord
4
Anda tidak harus menahan seluruh partisi dalam memori, tetapi hasilnya. Anda tidak dapat mengembalikan hasilnya sampai Anda telah memproses seluruh partisi
KrazyGautam