Cara membuat beberapa utas untuk setiap item permintaan

9

Saya mencoba memproses kode di bawah ini menggunakan multithreading di tingkat pesanan.

List<String> orders = Arrays.asList("order1", "order2", 
                   "order3", "order4", "order1");

Eksekusi sekuensial saat ini:

orders.stream().forEach(order -> {
    rules.forEach(rule -> {
        finalList.add(beanMapper.getBean(rule)
                .applyRule(createTemplate.apply(getMetaData.apply(rule), command),
                           order));
    });
});

Saya sudah mencoba menggunakan:

orders.parallelStream().forEach(order -> {}} // code snippet.

Tapi itu mengubah pesanan rules.forEach (rule -> {}} .

Misalnya:
Input:

 List<String> orders = Arrays.asList("order1", "order2", 
                         "order3", "order4", "order1");
 List<String> rules = Arrays.asList("rule1", "rule2", "rule3");

Output yang diharapkan:

order1 with rule1, rule2, rule3
order2 with rule1, rule2, rule3

Output aktual dengan parallelStream():

order1 with rule3, rule1, rule2
order1 with rule2, rule1, rule3

Saya tidak peduli tentang urutan pesanan , tetapi saya peduli tentang aturan pesanan . Pesanan dapat diproses dalam pesanan apa pun, tetapi aturan harus dijalankan dalam urutan yang sama untuk setiap pesanan.

Tolong bantu.

mayank bisht
sumber

Jawaban:

4

Kamu bisa menggunakan :

orders.stream().parallel().forEachOrdered(// Your rules logic goes here. )

ForEachOrdered menjamin untuk menjaga urutan Stream.

Jadi untuk referensi Anda:

orders.stream().parallel().forEachOrdered( order -> {

            rules.stream().parallel().forEachOrdered ( rule -> {

                 System.out.println( " Order : " + order + " rule :" + rule);
            });

        });

Catatan: Sementara kita bisa melakukan ini, kinerja harus diawasi dengan ketat karena parellelisme dan ketertiban tidak saling menikah dengan sangat baik!

Keluaran

 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order4 rule :rule1
 Order : order4 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order1 rule :rule3
Pramod S. Nikam
sumber
Terima kasih atas jawabannya. forEachOrdered menjamin urutan aliran, tetapi kemudian juga memperlambat kinerja. Saya mencobanya & aplikasi membutuhkan waktu yang mirip dengan pemrosesan sekuensial. stream (). parallel & forEachOrdered tidak saling melengkapi.
mayank bisht
Yup, saya setuju kita perlu melakukan analisis latensi penuh sebelum melakukan ini.
Pramod S. Nikam
Yup, saya mendapatkan kinerja yang sama menggunakan ini, tidak ada peningkatan.
mayank bisht
1
Dengan cermat mengikuti utas ini untuk mendapatkan solusi yang lebih baik untuk melakukan ini.
Pramod S. Nikam
Bisakah saya mencapai pemrosesan paralel menggunakan ExecutorService?
mayank bisht
1

Anda menambahkan elemen ke finalListdari utas yang berbeda secara bersamaan. Hal ini menyebabkan pencampuran hasil penerapan aturan ke pesanan yang berbeda (aturan tidak dikelompokkan berdasarkan pesanan mereka).

Anda dapat memperbaikinya dengan membuat daftar sementara untuk masing-masing orderdan kemudian secara bersamaan menggabungkan semua daftar sementara di a finalList.

Berikut ini cara melakukannya dengan menggunakan Stream-API (Java 9+):

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.flatMapping(Collection::stream, Collectors.toList()));

Catatan: Collectors.flatMapping()digunakan di sini alih-alih sederhana flatMapuntuk menjalankan pemetaan datar secara sinkron selama pengumpulan aliran.


Analog Java 8:

List<AppliedRule> finalList = orders.parallelStream().map(order ->
        rules.stream().map(rule -> applyRule(order, rule)).collect(Collectors.toList())
).collect(Collectors.toList())
        .stream()
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
Pisang
sumber
Terima kasih atas jawabannya. Saya mencoba pendekatan Anda & saya mendapatkan java.util.ConcurrentModificationException: null
mayank bisht
finalList = orders.parallelStream () .map (order -> rules.stream () .map (rule -> beanMapper.getBean (rule) .applyRule (createTemplate.apply (getMetaData.apply (rule), command), order)) .collect (Collectors.toList ())). collect (Collectors.toList ()). stream (). flatMap (Collection :: stream) .collect (Collectors.toList ());
mayank bisht
@mayankbisht, ini berarti itu beanMapper.getBean(rule) .applyRule(createTemplate.apply(getMetaData.apply(rule), command), order)bukan fungsi murni, jadi tidak bisa digunakan secara paralel. Cobalah untuk menghilangkan semua efek samping darinya; ConcurrentModificationExceptionstack trace dapat membantu menemukannya.
Bananon
0

Akankah ini berhasil?

final int rulesSize = rules.size();
AtomicInteger atomicInteger = new AtomicInteger(0);
orders.stream().parallel().forEach(order -> {
    IntStream.range(0, rulesSize).parallel().forEach( i -> {
        synchronized (atomicInteger) {
            System.out.println(" Order : " + order + " rule :" + rules.get(atomicInteger.getAndIncrement() % rulesSize));
        }
    });
});

Keluaran

 Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3
 Order : order3 rule :rule1
 Order : order3 rule :rule2
 Order : order3 rule :rule3
 Order : order2 rule :rule1
 Order : order2 rule :rule2
 Order : order2 rule :rule3
 Order : order1 rule :rule1
 Order : order1 rule :rule2
 Order : order4 rule :rule3
 Order : order1 rule :rule1
 Order : order4 rule :rule2
 Order : order1 rule :rule3
BHAWANI SINGH
sumber
0

Urutan pesanan bisa apa saja, tetapi urutan aturan tidak boleh berubah. Juga untuk aturan pesanan tertentu harus datang dalam grup.

Jika itu masalahnya, tidak ada ruang untuk paralelisme yang sebenarnya.

Kapan

order1-rule1
order1-rule2
order2-rule1
order2-rule2

dan

order2-rule1
order2-rule2
order1-rule1
order1-rule2

hanya menjalankan yang valid untuk 2 pesanan dan 2 aturan,
dan

order1-rule1
order2-rule1
order1-rule2
order2-rule2

dianggap tidak valid, itu bukan paralelisme, hanya pengacakan orders, mungkin tanpa keuntungan. Jika Anda "bosan" dengan order1datang pertama kali, Anda dapat mengacak daftar, tetapi hanya itu:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    Collections.shuffle(orders);
    orders.forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

Bahkan streaming tidak diperlukan, hanya dua loop bersarang. Uji: https://ideone.com/qI3dqd

order2-rule1
order2-rule2
order2-rule3
order4-rule1
order4-rule2
order4-rule3
order1-rule1
order1-rule2
order1-rule3
order3-rule1
order3-rule2
order3-rule3


Jawaban asli

Tapi itu mengubah pesanan rules.forEach (rule -> {}}.

Tidak. The orders mungkin tumpang tindih, tetapi urutan rules untuk setiap pesanan disimpan. Mengapa non-paralel forEachmelakukan hal lain?

Kode contoh:

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            System.out.println(order+"-"+rule);
        });
    });
}

Uji: https://ideone.com/95Cybg
Contoh keluaran:

order2-rule1
order2-rule2
order2-rule3
order1-rule1
order1-rule2
order1-rule3
order4-rule1
order4-rule2
order4-rule3
order3-rule1
order3-rule2
order3-rule3

Urutan orders dicampur, tetapi ruleselalu 1-2-3. Saya pikir output Anda hanya menyembunyikan pasangan (sebenarnya Anda tidak menunjukkan bagaimana itu dihasilkan).

Tentu saja dapat diperpanjang dengan beberapa penundaan, jadi pemrosesan orders sebenarnya akan tumpang tindih:

public static void delay(){
    try{
        Thread.sleep(ThreadLocalRandom.current().nextInt(100,300));
    }catch(Exception ex){}
}

public static void main (String[] args) throws java.lang.Exception
{
    List<String> orders = Arrays.asList("order1", "order2", "order3", "order4");
    List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
    orders.stream().parallel().forEach(order->{
        rules.forEach(rule->{
            delay();
            System.out.println(order+"-"+rule);
        });
    });
}

Uji: https://ideone.com/cSFaqS
Contoh keluaran:

order3-rule1
order2-rule1
order2-rule2
order3-rule2
order3-rule3
order2-rule3
order1-rule1
order4-rule1
order1-rule2
order4-rule2
order4-rule3
order1-rule3

Ini mungkin sesuatu yang Anda lihat, hanya tanpa orderxbagian. Dengan orders yang terlihat, dapat dilacak yang ruleterus menjadi 1-2-3, perorder . Juga, daftar contoh Anda berisi order1dua kali yang pasti tidak membantu dalam melihat apa yang terjadi.

tevemadar
sumber
Terima kasih atas jawabannya. Output di atas mungkin benar untuk jumlah pesanan yang lebih sedikit. Tetapi jika Anda meningkatkan pesanan, maka Anda akan mendapatkan output yang berbeda. Untuk mis. (Order4-rule1 order4-rule2 order4-rule1) (order1-rule1 order1-rule2) (order3-rule1 order3-rule2) (order4-rule1 order4-rule2 order4-rule1 order4-rule2).
mayank bisht
Urutan pesanan bisa apa saja, tetapi urutan aturan tidak boleh berubah. Juga untuk aturan pesanan tertentu harus datang dalam grup. Misalnya (order1- rule 1 order1-rule2 order1-rule3) dan bukan (order1-rule1 order2-rule1 order1-rule2 order1-rule3).)
mayank bisht
@ mayankbisht Saya pikir pembatasan ini tidak memungkinkan pemrosesan paralel. Lihat jawaban yang diperbarui (saya menulis bagian baru di awal).
tevemadar
Yup, saya mengerti itu, dan itulah mengapa saya memposting pertanyaan ini di sini. Saya pikir mungkin ada cara lain untuk melakukan ini, atau mungkin kita dapat mengubah algo
mayank bisht
@mayankbisht Anda bisa menjelaskan mengapa orders tidak bisa tumpang tindih (apakah ruleitu stateful mungkin, dan ada dalam jumlah terbatas, mungkin hanya satu?). Tetapi secara umum tidak ada paralelisme tanpa hal-hal yang berjalan paralel, itulah inti dari paralelisme.
tevemadar
0

Jika Anda tidak keberatan untuk mencoba perpustakaan pihak ketiga. Berikut adalah contoh dengan perpustakaan saya: sempoa-util

StreamEx.of(orders).parallelStream().forEach(order -> {}}

Dan Anda bahkan dapat menentukan nomor utas:

StreamEx.of(orders).parallelStream(maxThreadNum).forEach(order -> {}}

Urutan ruleakan disimpan.

By the way, karena itu dalam aliran paralel, potongan kode ...finalList.add(...kemungkinan besar tidak akan berfungsi. Saya pikir lebih baik mengumpulkan hasilnya ke daftar:

StreamEx.of(orders).parallelStream().map/flatMap(order -> {...}}.toList()

itu juga bisa dilakukan bahkan jika Anda ingin menjaga urutan orderuntuk beberapa alasan nanti:

StreamEx.of(orders).indexed().parallelStream()
      .map/flatMap(order -> {...}}.sortedBy(...index).toList()
user_3380739
sumber