Spark, secara optimal membagi RDD tunggal menjadi dua

10

Saya memiliki dataset besar yang perlu saya bagi menjadi beberapa kelompok sesuai dengan parameter tertentu. Saya ingin pekerjaan diproses seefisien mungkin. Saya dapat membayangkan dua cara untuk melakukannya

Opsi 1 - Buat peta dari RDD asli dan filter

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Opsi 2 - Saring RDD asli secara langsung

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

Metode tinju harus mengulangi semua catatan dari data asli yang ditetapkan 3 kali, di mana yang kedua hanya harus melakukannya dua kali, dalam keadaan normal, namun, percikan melakukan beberapa di belakang layar grafik bangunan, jadi saya bisa membayangkan bahwa mereka efektif dilakukan dengan cara yang sama. Pertanyaan saya adalah: a.) Apakah satu metode lebih efisien daripada yang lain, atau apakah bangunan grafik percikan membuatnya setara b.) Apakah mungkin untuk melakukan split ini dalam satu pass tunggal

Jagartner
sumber
Saya juga menemukan diri saya dengan masalah yang sangat mirip, dan tidak benar-benar menemukan solusi. Tetapi apa yang sebenarnya terjadi tidak jelas dari kode ini, karena percikan memiliki 'evaluasi malas' dan seharusnya hanya mampu mengeksekusi apa yang benar-benar perlu dijalankan, dan juga menggabungkan peta, filter, dan apa pun yang dapat dilakukan bersama. Jadi mungkin apa yang Anda gambarkan dapat terjadi dalam satu kali lintasan. Namun, tidak cukup akrab dengan mekanisme evaluasi malas untuk diceritakan. Sebenarnya saya hanya memperhatikan .cache (). Mungkin ada cara hanya melakukan satu .cache () dan mendapatkan hasil lengkap?
user3780968

Jawaban:

9

Pertama-tama izinkan saya memberi tahu Anda bahwa saya bukan ahli Spark; Saya telah menggunakannya cukup banyak dalam beberapa bulan terakhir, dan saya percaya saya sekarang memahaminya, tetapi saya mungkin salah.

Jadi, jawablah pertanyaan Anda:

a.) mereka setara, tetapi tidak dalam cara Anda melihatnya; Spark tidak akan mengoptimalkan grafik jika Anda bertanya-tanya, tetapi percikan customMappermasih akan dieksekusi dua kali dalam kedua kasus; ini disebabkan oleh fakta bahwa untuk percikan, rdd1dan rdd2dua RDD yang sama sekali berbeda, dan itu akan membangun grafik transformasi dari bawah ke atas mulai dari daun; jadi Opsi 1 akan diterjemahkan ke:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Seperti yang Anda katakan, customMapperdijalankan dua kali (apalagi, juga rddInakan dibaca dua kali, yang berarti bahwa jika itu berasal dari database, mungkin akan lebih lambat).

b.) ada cara, Anda hanya perlu bergerak cache()di tempat yang tepat:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Dengan melakukan ini, kami memberitahu percikan bahwa itu dapat menyimpan hasil sebagian dari mappedRdd; kemudian akan menggunakan hasil parsial ini untuk rdd1dan rdd2. Dari sudut pandang percikan ini setara dengan:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
StefanoP
sumber