Perbedaan antara reduce dan foldLeft / fold dalam pemrograman fungsional (terutama Scala dan Scala API)?

Jawaban:

260

mengurangi vs foldLeft

Perbedaan besar yang besar, tidak disebutkan dalam jawaban stackoverflow lain yang berkaitan dengan topik ini dengan jelas, adalah bahwa reduceharus diberi monoid komutatif , yaitu operasi yang bersifat komutatif dan asosiatif. Artinya, pengoperasiannya bisa diparalelkan.

Perbedaan ini sangat penting untuk komputasi Big Data / MPP / terdistribusi, dan seluruh alasan mengapa reduceada. Koleksi dapat dipotong-potong dan reducedapat dioperasikan pada setiap potongan, kemudian reducedapat beroperasi pada hasil dari setiap potongan - pada kenyataannya, tingkat potongan tidak perlu berhenti sedalam satu tingkat. Kami juga bisa memotong setiap bagian. Inilah sebabnya mengapa menjumlahkan bilangan bulat dalam daftar adalah O (log N) jika diberi jumlah CPU yang tak terbatas.

Jika Anda hanya melihat tanda tangan, tidak ada alasan untuk reduceada karena Anda dapat mencapai semua yang Anda bisa reducedengan file foldLeft. Fungsionalitas dari foldLeftlebih besar dari pada fungsionalitas reduce.

Tetapi Anda tidak dapat memparalelkan a foldLeft, jadi waktu prosesnya selalu O (N) (meskipun Anda memasukkan monoid komutatif). Ini karena diasumsikan bahwa operasi tersebut bukan monoid komutatif sehingga nilai kumulatif akan dihitung oleh serangkaian agregasi berurutan.

foldLefttidak mengasumsikan komutatifitas atau asosiatif. Keterkaitanlah yang memberikan kemampuan untuk memotong koleksi, dan sifat komutatif yang membuat pengumpulan menjadi mudah karena urutan tidak penting (jadi tidak masalah urutan mana yang mengumpulkan setiap hasil dari setiap potongan). Sebenarnya, komutatifitas tidak diperlukan untuk paralelisasi, misalnya algoritme pengurutan terdistribusi, ini hanya membuat logika lebih mudah karena Anda tidak perlu mengatur potongan Anda.

Jika Anda melihat dokumentasi Spark untuk reduceitu secara khusus mengatakan "... operator biner komutatif dan asosiatif"

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

Berikut adalah bukti bahwa reduceBUKAN hanya kasus khususfoldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

kurangi vs lipat

Sekarang di sinilah ia menjadi sedikit lebih dekat ke FP / akar matematika, dan sedikit lebih rumit untuk dijelaskan. Mengurangi didefinisikan secara formal sebagai bagian dari paradigma MapReduce, yang berhubungan dengan koleksi tak beraturan (multisets), Fold secara resmi didefinisikan dalam istilah rekursi (lihat katamorfisme) dan dengan demikian mengasumsikan struktur / urutan ke koleksi.

Tidak ada foldmetode dalam Scalding karena di bawah model pemrograman Map Reduce (ketat) kita tidak dapat mendefinisikan foldkarena potongan tidak memiliki urutan dan foldhanya membutuhkan asosiativitas, bukan komutatif.

Sederhananya, reducebekerja tanpa urutan kumulasi, foldmembutuhkan urutan kumulasi dan urutan kumulasi itulah yang mengharuskan nilai nol BUKAN keberadaan nilai nol yang membedakannya. Sebenarnya reduce harus bekerja pada koleksi kosong, karena nilai nolnya dapat disimpulkan dengan mengambil nilai arbitrer xdan kemudian menyelesaikannya x op y = x, tetapi itu tidak berfungsi dengan operasi non-komutatif karena mungkin ada nilai nol kiri dan kanan yang berbeda. (yaitu x op y != y op x). Tentu saja Scala tidak repot-repot mencari tahu apa nilai nol ini karena itu memerlukan melakukan beberapa matematika (yang mungkin tidak dapat dihitung), jadi lemparkan saja pengecualian.

Tampaknya (seperti yang sering terjadi dalam etimologi) bahwa makna matematika asli ini telah hilang, karena satu-satunya perbedaan yang jelas dalam pemrograman adalah tanda tangannya. Hasilnya adalah yang reducetelah menjadi sinonim untuk fold, daripada mempertahankan makna aslinya dari MapReduce. Sekarang istilah ini sering digunakan secara bergantian dan berperilaku sama di sebagian besar implementasi (mengabaikan koleksi kosong). Keanehan diperburuk oleh keanehan, seperti di Spark, yang sekarang akan kita bahas.

Jadi Spark memang memiliki a fold, tetapi urutan hasil sub (satu untuk setiap partisi) digabungkan (pada saat penulisan) adalah urutan yang sama di mana tugas diselesaikan - dan dengan demikian non-deterministik. Terima kasih kepada @CafeFeed untuk menunjukkan foldkegunaannya runJob, yang setelah membaca kode saya menyadari bahwa itu non-deterministik. Kebingungan lebih lanjut dibuat oleh Spark yang memiliki treeReducetapi tidak treeFold.

Kesimpulan

Ada perbedaan antara reducedan foldbahkan saat diterapkan ke urutan yang tidak kosong. Yang pertama didefinisikan sebagai bagian dari paradigma pemrograman MapReduce pada koleksi dengan urutan sewenang-wenang ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) dan orang harus menganggap operator komutatif selain menjadi asosiatif untuk memberikan hasil deterministik. Yang terakhir didefinisikan dalam istilah katomorfisme dan mengharuskan koleksi memiliki gagasan tentang urutan (atau didefinisikan secara rekursif, seperti daftar tertaut), sehingga tidak memerlukan operator komutatif.

Dalam prakteknya karena sifat pemrograman yang tidak matematis, reducedan foldcenderung berperilaku dengan cara yang sama, baik dengan benar (seperti di Scala) atau salah (seperti di Spark).

Ekstra: Opini Saya Tentang Spark API

Pendapat saya adalah bahwa kebingungan akan dihindari jika penggunaan istilah foldtersebut benar-benar dihentikan di Spark. Setidaknya percikan memiliki catatan dalam dokumentasinya:

Ini berperilaku agak berbeda dari operasi lipatan yang diimplementasikan untuk koleksi yang tidak terdistribusi dalam bahasa fungsional seperti Scala.

samthebest
sumber
2
Itu sebabnya foldLeftmengandung Leftdalam namanya dan mengapa ada juga metode yang disebut fold.
kiritsuku
1
@Cloudtech Itu adalah kebetulan dari penerapan single threadnya, bukan dalam spesifikasinya. Pada mesin 4-inti saya, jika saya mencoba menambahkan .par, maka (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)saya mendapatkan hasil yang berbeda setiap kali.
Samthebest
2
@AlexDean dalam konteks ilmu komputer, tidak benar-benar membutuhkan identitas karena koleksi kosong cenderung hanya membuang pengecualian. Tapi secara matematis lebih elegan (dan akan lebih elegan jika koleksi melakukan ini) jika elemen identitas dikembalikan saat koleksi kosong. Dalam matematika, "melempar pengecualian" tidak ada.
samthebest
3
@samthebest: Apakah Anda yakin tentang komutatifitasnya? github.com/apache/spark/blob/… mengatakan "Untuk fungsi yang tidak komutatif, hasilnya mungkin berbeda dari lipatan yang diterapkan ke koleksi yang tidak terdistribusi."
Buat 42
1
@ Make42 Benar, seseorang dapat menulis reallyFoldmucikari mereka sendiri , karena :, rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)ini tidak perlu f untuk bolak-balik.
samthebest
10

Jika saya tidak salah, meskipun Spark API tidak memerlukannya, lipat juga membutuhkan f untuk menjadi komutatif. Karena urutan sekumpulan partisi tidak dijamin. Misalnya pada kode berikut hanya cetakan pertama yang diurutkan:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

Cetak:

abcdefghijklmnopqrstuvwxyz

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

Mishael Rosenthal
sumber
Setelah beberapa kali bolak-balik, kami yakin Anda benar. Urutan penggabungan adalah first come first serve. Jika Anda menjalankan sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)dengan 2+ core beberapa kali, saya pikir Anda akan melihatnya menghasilkan urutan acak (berdasarkan partisi). Saya telah memperbarui jawaban saya.
Samthebest
3

folddi Apache Spark tidak sama dengan foldkoleksi yang tidak didistribusikan. Sebenarnya dibutuhkan fungsi komutatif untuk menghasilkan hasil deterministik:

Ini berperilaku agak berbeda dari operasi lipatan yang diimplementasikan untuk koleksi yang tidak terdistribusi dalam bahasa fungsional seperti Scala. Operasi lipatan ini dapat diterapkan ke partisi satu per satu, lalu melipat hasil tersebut menjadi hasil akhir, daripada menerapkan lipatan ke setiap elemen secara berurutan dalam beberapa urutan yang ditentukan. Untuk fungsi yang tidak komutatif, hasilnya mungkin berbeda dari lipatan yang diterapkan ke koleksi yang tidak terdistribusi.

Ini telah ditunjukkan oleh Mishael Rosenthal dan disarankan oleh Make42 dalam komentarnya .

Disarankan bahwa perilaku yang diamati terkait dengan HashPartitionersaat sebenarnya parallelizetidak mengocok dan tidak digunakan HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Dijelaskan:

Strukturfold untuk RDD

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

sama dengan strukturreduce untuk RDD:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

di mana runJobdilakukan dengan mengabaikan urutan partisi dan mengakibatkan kebutuhan fungsi komutatif.

foldPartitiondan reducePartitionsetara dalam hal urutan pemrosesan dan secara efektif (dengan pewarisan dan pendelegasian) dilaksanakan oleh reduceLeftdan foldLeftseterusnya TraversableOnce.

Kesimpulan: foldpada RDD tidak dapat bergantung pada urutan potongan dan kebutuhan komutatif dan asosiatif .

Pengguna 2 revs6022341
sumber
Saya harus mengakui bahwa etimologinya membingungkan dan literatur pemrograman kurang dalam definisi formal. Saya pikir aman untuk mengatakan bahwa foldpada RDDs memang benar-benar sama dengan reduce, tetapi ini tidak menghormati perbedaan matematika akar (saya telah memperbarui jawaban saya untuk menjadi lebih jelas). Meskipun saya tidak setuju bahwa kami benar-benar membutuhkan peralihan asalkan seseorang yakin apa pun yang dilakukan mitra mereka, itu menjaga ketertiban.
samthebest
Urutan lipatan yang tidak ditentukan tidak terkait dengan partisi. Ini adalah konsekuensi langsung dari implementasi runJob.
AH! Maaf saya tidak tahu apa maksud Anda, tetapi setelah membaca runJobkode saya melihat bahwa memang itu menggabungkan sesuai dengan kapan tugas selesai, BUKAN urutan partisi. Detail kunci inilah yang membuat segalanya jatuh pada tempatnya. Saya telah mengedit jawaban saya lagi dan dengan demikian mengoreksi kesalahan yang Anda tunjukkan. Tolong bisakah Anda menghapus bounty Anda karena kita sekarang setuju?
samthebest
Saya tidak dapat mengedit atau menghapus - tidak ada opsi seperti itu. Saya dapat memberikan penghargaan tetapi saya pikir Anda mendapatkan beberapa poin dari perhatian saja, apakah saya salah? Jika Anda mengonfirmasi bahwa Anda ingin saya memberi penghargaan, saya akan melakukannya dalam 24 jam ke depan. Terima kasih atas koreksi dan maaf untuk metodenya tetapi sepertinya Anda mengabaikan semua peringatan, ini adalah hal yang besar, dan jawaban telah dikutip di semua tempat.
1
Bagaimana kalau Anda memberikannya kepada @Mishael Rosenthal karena dia adalah orang pertama yang dengan jelas menyatakan kekhawatirannya. Saya tidak tertarik dengan poin-poin itu, saya hanya suka menggunakan SO untuk SEO dan organisasi.
Samthebest
2

Satu perbedaan lain untuk Scalding adalah penggunaan combiners di Hadoop.

Bayangkan operasi Anda bersifat monoid komutatif, dengan mengurangi itu akan diterapkan di sisi peta juga daripada mengocok / menyortir semua data ke pereduksi. Dengan foldLeft ini tidak terjadi.

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

Itu selalu merupakan praktik yang baik untuk mendefinisikan operasi Anda sebagai monoid di Scalding.

morazow
sumber