Saya ingin menggunakan Stream
untuk memparalelkan pemrosesan set heterogen file JSON yang disimpan dari nomor yang tidak dikenal (jumlah file tidak diketahui dimuka). Ukuran file dapat sangat bervariasi, mulai dari 1 catatan JSON per file hingga 100.000 catatan di beberapa file lainnya. Sebuah JSON catatan dalam hal ini berarti mandiri JSON objek direpresentasikan sebagai satu baris dalam file.
Saya benar-benar ingin menggunakan Streaming untuk ini dan jadi saya menerapkan ini Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
Masalah yang saya alami adalah ketika Stream memparalelkan dengan indah pada awalnya, akhirnya file terbesar dibiarkan diproses dalam satu utas. Saya percaya penyebab proksimal didokumentasikan dengan baik: spliterator "tidak seimbang".
Lebih konkret, tampak bahwa trySplit
metode ini tidak dipanggil setelah titik tertentu dalam Stream.forEach
siklus hidup, sehingga logika ekstra untuk mendistribusikan batch kecil pada akhir trySplit
jarang dieksekusi.
Perhatikan bagaimana semua spliterator yang dikembalikan dari trySplit berbagi paths
iterator yang sama . Saya pikir ini adalah cara yang sangat pintar untuk menyeimbangkan pekerjaan di semua pembagi, tapi itu belum cukup untuk mencapai paralelisme penuh.
Saya ingin pemrosesan paralel untuk melanjutkan pertama di file, dan kemudian ketika beberapa file besar masih dibiarkan membelah, saya ingin memparalelkan antar potongan file yang tersisa. Itulah maksud dari else
blok di akhir trySplit
.
Apakah ada cara yang mudah / sederhana / kanonik untuk mengatasi masalah ini?
sumber
Long.MAX_VALUE
menyebabkan pembelahan yang berlebihan dan tidak perlu, sementara setiap perkiraan selainLong.MAX_VALUE
penyebab pembelahan selanjutnya berhenti, membunuh paralelisme. Mengembalikan campuran perkiraan yang akurat tampaknya tidak mengarah ke optimasi cerdas.AbstractSpliterator
tetapi mengesampingkantrySplit()
yang merupakan kombo yang buruk untuk apa pun selainLong.MAX_VALUE
, karena Anda tidak mengadaptasi perkiraan ukuran ditrySplit()
. Setelah itutrySplit()
, estimasi ukuran harus dikurangi dengan jumlah elemen yang telah dipisahkan.Jawaban:
trySplit
Keluaran Anda harus berukuran sama, terlepas dari ukuran file yang mendasarinya. Anda harus memperlakukan semua file sebagai satu unit dan mengisiArrayList
spliterator yang dikembalikan dengan jumlah objek JSON yang sama setiap kali. Jumlah objek harus sedemikian rupa sehingga pemrosesan satu split membutuhkan waktu antara 1 dan 10 milidetik: lebih rendah dari 1 ms dan Anda mulai mendekati biaya menyerahkan batch ke thread pekerja, lebih tinggi dari itu dan Anda mulai berisiko beban CPU yang tidak merata karena tugas-tugas yang terlalu kasar.Pemisah tidak wajib melaporkan perkiraan ukuran, dan Anda sudah melakukan ini dengan benar: perkiraan Anda adalah
Long.MAX_VALUE
, yang merupakan nilai khusus yang berarti "tidak terikat". Namun, jika Anda memiliki banyak file dengan objek JSON tunggal, menghasilkan kumpulan ukuran 1, ini akan merusak kinerja Anda dalam dua cara: overhead pembukaan-membaca-menutup file mungkin menjadi hambatan dan, jika Anda berhasil melarikan diri bahwa, biaya handoff benang mungkin signifikan dibandingkan dengan biaya pemrosesan satu item, sekali lagi menyebabkan kemacetan.Lima tahun yang lalu saya memecahkan masalah yang sama, Anda dapat melihat solusi saya .
sumber
Long.MAX_VALUE
dengan benar menggambarkan ukuran yang tidak diketahui, tetapi itu tidak membantu ketika implementasi Stream yang sebenarnya berkinerja buruk saat itu. Bahkan menggunakan hasilThreadLocalRandom.current().nextInt(100, 100_000)
estimasi ukuran menghasilkan hasil yang lebih baik.ArraySpliterator
yang memiliki ukuran yang diperkirakan (bahkan ukuran yang tepat). Jadi implementasi Stream akan melihat ukuran array vsLong.MAX_VALUE
, pertimbangkan ini tidak seimbang dan pisahkan spliterator "lebih besar" (mengabaikan ituLong.MAX_VALUE
berarti "tidak diketahui"), sampai tidak dapat membelah lebih lanjut. Kemudian, jika tidak ada cukup potongan, ia akan membagi spliterator berbasis array menggunakan ukuran yang diketahui. Ya, ini bekerja dengan sangat baik, tetapi tidak bertentangan dengan pernyataan saya bahwa Anda memerlukan perkiraan ukuran, terlepas dari seberapa buruknya.Long.MAX_VALUE
akan dilakukan.Setelah banyak percobaan, saya masih tidak bisa mendapatkan paralelisme tambahan dengan bermain dengan perkiraan ukuran. Pada dasarnya, nilai apa pun selain
Long.MAX_VALUE
akan cenderung menyebabkan spliterator berakhir terlalu dini (dan tanpa pemisahan apa pun), sementara di sisi lainLong.MAX_VALUE
perkiraan akan menyebabkantrySplit
dipanggil tanpa henti hingga kembalinull
.Solusi yang saya temukan adalah berbagi sumber daya secara internal di antara para pembagi dan membiarkan mereka menyeimbangkan kembali di antara mereka sendiri.
Kode kerja:
sumber