Menghindari kebocoran memori dengan Scalaz 7 zipWithIndex / group enumeratees

106

Latar Belakang

Seperti disebutkan dalam pertanyaan ini , saya menggunakan Scalaz 7 iterasi untuk memproses aliran data yang besar (yaitu, tak terbatas) dalam ruang heap konstan.

Kode saya terlihat seperti ini:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

Masalah

Sepertinya saya mengalami kebocoran memori, tetapi saya tidak cukup paham dengan Scalaz / FP untuk mengetahui apakah bug tersebut ada di Scalaz atau di kode saya. Secara intuitif, saya berharap kode ini hanya membutuhkan (dengan urutan) P dikalikan dengan Chunkspasi -size.

Catatan: Saya menemukan pertanyaan serupa yang OutOfMemoryErrorditemui, tetapi kode saya tidak digunakan consume.

Menguji

Saya menjalankan beberapa tes untuk mencoba dan mengisolasi masalahnya. Untuk meringkas, kebocoran hanya muncul muncul ketika kedua zipWithIndexdan groupdigunakan.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Kode untuk tes:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Pertanyaan

  • Apakah bug ada di kode saya?
  • Bagaimana cara membuat ini berfungsi dalam ruang heap konstan?
Aaron Novstrup
sumber
6
Saya akhirnya melaporkan ini sebagai masalah di Scalaz .
Aaron Novstrup
1
Ini tidak akan menyenangkan, tetapi Anda dapat mencoba -XX:+HeapDumpOnOutOfMemoryErrordan menganalisis dump dengan eclipse MAT eclipse.org/mat untuk melihat baris kode apa yang dipegang pada array.
huynhjl
10
@huynhjl FWIW, saya mencoba menganalisis heap dengan JProfiler dan MAT tetapi sama sekali tidak dapat menjelajahi semua referensi ke kelas fungsi anonim, dll. Scala benar-benar membutuhkan alat khusus untuk hal semacam ini.
Aaron Novstrup
Bagaimana jika tidak ada kebocoran dan hanya saja apa yang Anda lakukan membutuhkan jumlah memori yang sangat meningkat? Anda dapat dengan mudah mereplikasi zipWithIndex tanpa konstruksi FP tertentu dengan hanya mempertahankan varpenghitung saat Anda pergi.
Ezekiel Victor
@EzekielVictor Saya tidak yakin saya mengerti komentarnya. Anda menyarankan bahwa menambahkan satu Longindeks per potongan akan mengubah algoritme dari ruang heap konstan menjadi non-konstan? Versi non-zip jelas menggunakan ruang heap yang konstan, karena dapat "memproses" sebanyak mungkin potongan yang ingin Anda tunggu.
Aaron Novstrup

Jawaban:

4

Ini akan menjadi sedikit penghiburan bagi siapa pun yang terjebak dengan iterateeAPI yang lebih lama , tetapi saya baru-baru ini memverifikasi bahwa tes yang setara lolos terhadap API aliran-scalaz . Ini adalah API pemrosesan aliran baru yang dimaksudkan untuk menggantikan iteratee.

Untuk kelengkapannya, berikut kode tesnya:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

Ini harus bekerja dengan nilai apa pun untuk nparameter (asalkan Anda bersedia menunggu cukup lama) - Saya menguji dengan 2 ^ 14 32MiB array (yaitu, total setengah TiB memori yang dialokasikan dari waktu ke waktu).

Aaron Novstrup
sumber