Java parallel stream - urutan pemanggilan metode parallel () [ditutup]

11
AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

Ketika saya menulis ini, saya berasumsi bahwa utas akan memunculkan hanya panggilan peta karena paralel ditempatkan setelah peta. Tetapi beberapa baris dalam file mendapatkan nomor catatan yang berbeda untuk setiap eksekusi.

Saya membaca dokumentasi stream Java resmi dan beberapa situs web untuk memahami bagaimana stream bekerja di bawah tenda.

Beberapa pertanyaan:

  • Java parallel stream bekerja berdasarkan SplitIterator , yang diimplementasikan oleh setiap koleksi seperti ArrayList, LinkedList dll. Ketika kita membangun aliran paralel dari koleksi-koleksi itu, iterator split yang sesuai akan digunakan untuk membagi dan mengulangi koleksi. Ini menjelaskan mengapa paralelisme terjadi pada tingkat sumber input asli (File lines) bukan pada hasil peta (yaitu Rekam pojo). Apakah pemahaman saya benar?

  • Dalam kasus saya, inputnya adalah aliran file IO. Iterator split mana yang akan digunakan?

  • Tidak masalah di mana kita menempatkan parallel()pipa. Sumber input asli akan selalu dipecah dan operasi perantara yang tersisa akan diterapkan.

    Dalam hal ini, Java seharusnya tidak mengizinkan pengguna untuk menempatkan operasi paralel di mana saja dalam pipa kecuali pada sumber aslinya. Karena, ini memberikan pemahaman yang salah bagi mereka yang tidak tahu bagaimana java stream bekerja secara internal. Saya tahu parallel()operasi akan ditentukan untuk tipe objek Stream dan karenanya, ia bekerja dengan cara ini. Tetapi, lebih baik memberikan beberapa solusi alternatif.

  • Dalam cuplikan kode di atas, saya mencoba menambahkan nomor baris ke setiap catatan dalam file input dan karenanya harus dipesan. Namun, saya ingin menerapkannya doSomeOperation()secara paralel karena ini adalah logika yang berat. Satu-satunya cara untuk mencapai adalah menulis iterator split saya sendiri. Apakah ada cara lain?

penjelajah
sumber
2
Ini lebih berkaitan dengan bagaimana pembuat Java memutuskan untuk mendesain antarmuka. Anda menempatkan permintaan Anda ke saluran pipa dan segala sesuatu yang bukan merupakan operasi final akan dikumpulkan terlebih dahulu. parallel()tidak lebih dari permintaan pengubah umum yang diterapkan ke objek aliran yang mendasarinya. Ingat bahwa hanya ada satu sumber-aliran jika Anda tidak menerapkan operasi akhir ke pipa, yaitu selama tidak ada yang "dieksekusi". Karena itu, Anda pada dasarnya hanya mempertanyakan pilihan desain Java. Yang berdasarkan opini dan kami tidak bisa membantu dengan itu.
Zabuzard
1
Saya benar-benar mengerti maksud dan kebingungan Anda tetapi saya tidak berpikir bahwa ada solusi yang jauh lebih baik. Metode ini ditawarkan dalam Streamantarmuka secara langsung dan karena cascading yang bagus setiap operasi memberikan kembali Streamlagi. Bayangkan seseorang ingin memberi Anda Streamtetapi sudah menerapkan beberapa operasi seperti mapitu. Anda, sebagai pengguna, masih ingin dapat memutuskan apakah akan menjalankannya secara paralel atau tidak. Jadi Anda harus parallel()tetap menelepon , meskipun streaming sudah ada.
Zabuzard
1
Selain itu, saya lebih suka mempertanyakan mengapa Anda ingin menjalankan bagian dari aliran secara berurutan dan kemudian, beralih ke paralel. Jika aliran sudah cukup besar untuk memenuhi syarat untuk eksekusi paralel, maka ini mungkin juga berlaku untuk semua yang sebelumnya dalam pipa. Jadi mengapa tidak menggunakan eksekusi paralel untuk bagian itu juga? Saya mendapatkan bahwa ada kasus tepi seperti jika Anda secara dramatis meningkatkan ukuran dengan flatMapatau jika Anda menjalankan metode thread-tidak aman atau serupa.
Zabuzard
1
@Zabuza Saya tidak mempersoalkan pilihan desain java tetapi saya hanya menyampaikan kekhawatiran saya. Setiap pengguna java stream dasar bisa mendapatkan kebingungan yang sama kecuali mereka memahami cara kerja stream. Saya sangat setuju dengan komentar kedua Anda. Saya baru saja menyoroti satu kemungkinan solusi yang dapat memiliki kelemahannya sendiri seperti yang telah Anda sebutkan. Tapi, kita bisa melihat apakah itu bisa diselesaikan dengan cara lain. Mengenai komentar Anda yang ke-3, saya telah menyebutkan kasus penggunaan saya pada poin terakhir dari uraian saya
explorer
1
@Eugene ketika Pathada pada sistem file lokal dan Anda menggunakan JDK baru-baru ini, spliterator akan memiliki kemampuan pemrosesan paralel yang lebih baik daripada kumpulan batch 1024. Tetapi pemisahan yang seimbang mungkin bahkan kontraproduktif dalam beberapa findFirstskenario ...
Holger

Jawaban:

8

Ini menjelaskan mengapa paralelisme terjadi pada tingkat sumber input asli (File lines) bukan pada hasil peta (yaitu Rekam pojo).

Seluruh aliran bersifat paralel atau berurutan. Kami tidak memilih subset operasi untuk dijalankan secara berurutan atau paralel.

Ketika operasi terminal dimulai, pipa aliran dijalankan secara berurutan atau paralel tergantung pada orientasi aliran yang digunakannya. [...] Ketika operasi terminal dimulai, pipa aliran dijalankan secara berurutan atau paralel tergantung pada mode aliran yang digunakan. sumber yang sama

Seperti yang Anda sebutkan, stream paralel menggunakan iterator split. Jelas, ini untuk mempartisi data sebelum operasi mulai berjalan.


Dalam kasus saya, inputnya adalah aliran file IO. Iterator split mana yang akan digunakan?

Melihat sumbernya, saya melihatnya menggunakan java.nio.file.FileChannelLinesSpliterator


Tidak masalah di mana kami menempatkan paralel () di dalam pipa. Sumber input asli akan selalu dipecah dan operasi perantara yang tersisa akan diterapkan.

Baik. Anda bahkan dapat menelepon parallel()dan sequential()beberapa kali. Yang dipanggil terakhir akan menang. Saat kami menelepon parallel(), kami mengaturnya untuk aliran yang dikembalikan; dan seperti yang dinyatakan di atas, semua operasi berjalan secara berurutan atau paralel.


Dalam hal ini, Java seharusnya tidak mengizinkan pengguna untuk melakukan operasi paralel di mana saja dalam pipa kecuali pada sumber aslinya ...

Ini menjadi masalah pendapat. Saya pikir Zabuza memberikan alasan yang bagus untuk mendukung pilihan desainer JDK.


Satu-satunya cara untuk mencapai adalah menulis iterator split saya sendiri. Apakah ada cara lain?

Ini tergantung pada operasi Anda

  • Jika findFirst()operasi terminal Anda sebenarnya, maka Anda bahkan tidak perlu khawatir tentang eksekusi paralel, karena bagaimanapun juga tidak akan ada banyak panggilan doSomething()( findFirst()adalah hubungan arus pendek). .parallel()sebenarnya dapat menyebabkan lebih dari satu elemen diproses, sementara findFirst()pada aliran sekuensial akan mencegahnya.
  • Jika operasi terminal Anda tidak menghasilkan banyak data, maka mungkin Anda bisa membuat Recordobjek menggunakan aliran sekuensial, kemudian memproses hasilnya secara paralel:

    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
  • Jika pipa Anda akan memuat banyak data dalam memori (yang mungkin menjadi alasan Anda menggunakan Files.lines()), maka mungkin Anda akan memerlukan iterator pemecah kustom. Sebelum saya pergi ke sana, saya akan melihat opsi lain (seperti garis simpanan dengan kolom id untuk memulai - itu hanya pendapat saya).
    Saya juga mencoba memproses catatan dalam kumpulan yang lebih kecil, seperti ini:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }

    Ini dijalankan doSomeOperation()secara paralel tanpa memuat semua data ke dalam memori. Tetapi perhatikan bahwa batchSizeperlu dipikirkan.

ernest_k
sumber
1
Terimakasih atas klarifikasinya. Adalah baik untuk mengetahui tentang solusi ke-3 yang telah Anda sorot. Saya akan memeriksanya karena saya belum pernah menggunakan takeWhile dan Supplier.
penjelajah
2
SpliteratorImplementasi kustom tidak akan lebih rumit dari ini, sementara memungkinkan pemrosesan paralel yang lebih efisien ...
Holger
1
Setiap parallelStreamoperasi internal Anda memiliki overhead tetap untuk memulai operasi dan menunggu hasil akhirnya, sementara dibatasi pada paralelisme batchSize. Pertama, Anda perlu kelipatan jumlah inti CPU yang tersedia saat ini untuk menghindari utas menganggur. Kemudian, jumlahnya harus cukup tinggi untuk mengkompensasi overhead tetap, tetapi semakin tinggi jumlahnya, semakin tinggi jeda yang diberlakukan oleh operasi pembacaan sekuensial terjadi sebelum pemrosesan paralel dimulai.
Holger
1
Mengubah paralel aliran luar akan menyebabkan gangguan buruk dengan bagian dalam implementasi saat ini, selain titik yang Stream.generatemenghasilkan aliran tidak teratur, yang tidak berfungsi dengan kasus penggunaan yang dimaksudkan OP seperti findFirst(). Sebaliknya, aliran paralel tunggal dengan spliterator yang mengembalikan bongkahan dalam trySplitpekerjaan lurus ke depan dan memungkinkan pekerja thread untuk memproses bongkahan berikutnya tanpa menunggu penyelesaian sebelumnya.
Holger
2
Tidak ada alasan untuk menganggap bahwa suatu findFirst()operasi hanya akan memproses sejumlah kecil elemen. Kecocokan pertama mungkin masih terjadi setelah memproses 90% dari semua elemen. Selanjutnya, ketika memiliki sepuluh juta baris, bahkan menemukan kecocokan setelah 10% masih membutuhkan pemrosesan satu juta baris.
Holger
7

Desain Stream asli menyertakan ide untuk mendukung tahapan pipa berikutnya dengan pengaturan eksekusi paralel yang berbeda, tetapi ide ini telah ditinggalkan. API dapat berasal dari saat ini, tetapi di sisi lain, desain API yang memaksa penelepon untuk membuat keputusan tunggal untuk eksekusi paralel atau berurutan akan jauh lebih rumit.

Yang sebenarnya Spliteratordigunakan oleh Files.lines(…)tergantung pada implementasi. Di Java 8 (Oracle atau OpenJDK), Anda selalu mendapatkan yang sama dengan BufferedReader.lines(). Di JDK yang lebih baru, jika Pathmilik filesystem default dan charset adalah salah satu yang didukung untuk fitur ini, Anda mendapatkan Stream dengan Spliteratorimplementasi khusus , the java.nio.file.FileChannelLinesSpliterator. Jika prasyarat tidak terpenuhi, Anda mendapatkan yang sama dengan BufferedReader.lines(), yang masih didasarkan pada yang Iteratorditerapkan di dalam BufferedReaderdan dibungkus melalui Spliterators.spliteratorUnknownSize.

Tugas spesifik Anda paling baik ditangani dengan kebiasaan Spliteratoryang dapat melakukan penomoran baris langsung pada sumbernya, sebelum pemrosesan paralel, untuk memungkinkan pemrosesan paralel berikutnya tanpa batasan.

public static Stream<Record> records(Path p) throws IOException {
    LineNoSpliterator sp = new LineNoSpliterator(p);
    return StreamSupport.stream(sp, false).onClose(sp);
}

private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
    int chunkSize = 100;
    SeekableByteChannel channel;
    LineNumberReader reader;

    LineNoSpliterator(Path path) throws IOException {
        channel = Files.newByteChannel(path, StandardOpenOption.READ);
        reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
    }

    @Override
    public void run() {
        try(Closeable c1 = reader; Closeable c2 = channel) {}
        catch(IOException ex) { throw new UncheckedIOException(ex); }
        finally { reader = null; channel = null; }
    }

    @Override
    public boolean tryAdvance(Consumer<? super Record> action) {
        try {
            String line = reader.readLine();
            if(line == null) return false;
            action.accept(new Record(reader.getLineNumber(), line));
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public Spliterator<Record> trySplit() {
        Record[] chunks = new Record[chunkSize];
        int read;
        for(read = 0; read < chunks.length; read++) {
            int pos = read;
            if(!tryAdvance(r -> chunks[pos] = r)) break;
        }
        return Spliterators.spliterator(chunks, 0, read, characteristics());
    }

    @Override
    public long estimateSize() {
        try {
            return (channel.size() - channel.position()) / 60;
        } catch (IOException ex) {
            return 0;
        }
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | DISTINCT;
    }
}
Holger
sumber
0

Dan berikut ini adalah demonstrasi sederhana ketika penerapan paralel diterapkan. Output dari mengintip dengan jelas menunjukkan perbedaan antara dua contoh. Catatan: mapPanggilan baru saja dilemparkan untuk menambahkan metode lain sebelum parallel.

IntStream.rangeClosed (1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).sum();
System.out.println();
IntStream.rangeClosed(1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).parallel().sum();
WJS
sumber