Bisakah Anda membagi aliran menjadi dua aliran?

146

Saya memiliki kumpulan data yang diwakili oleh aliran Java 8:

Stream<T> stream = ...;

Saya dapat melihat cara memfilternya untuk mendapatkan subset acak - misalnya

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

Saya juga bisa melihat bagaimana saya bisa mengurangi aliran ini untuk mendapatkan, misalnya, dua daftar yang mewakili dua bagian acak dari kumpulan data, dan kemudian mengubahnya kembali menjadi aliran. Tapi, adakah cara langsung untuk menghasilkan dua aliran dari yang pertama? Sesuatu seperti

(heads, tails) = stream.[some kind of split based on filter]

Terima kasih atas wawasannya.

pengguna1148758
sumber
Jawaban Markus jauh lebih membantu daripada jawaban Louis tetapi saya harus mengatakan bahwa jawaban Louis lebih terkait dengan pertanyaan aslinya. Pertanyaannya adalah bukan difokuskan pada kemungkinan untuk mengkonversi Streamke beberapa Streams tanpa konversi menengah , meskipun saya pikir orang-orang yang mencapai pertanyaan ini benar-benar mencari jalan untuk mencapai sehingga terlepas dari kendala tersebut, yang merupakan jawaban Markus. Ini mungkin karena fakta bahwa pertanyaan dalam judul tidak sama dengan yang ada dalam deskripsi .
devildelta

Jawaban:

9

Tidak persis. Anda tidak bisa mendapatkan dua Streamdari satu; ini tidak masuk akal - bagaimana Anda beralih satu tanpa perlu menghasilkan yang lain pada saat yang sama? Streaming hanya dapat dioperasikan sekali.

Namun, jika Anda ingin membuangnya ke daftar atau sesuatu, Anda bisa melakukannya

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
Louis Wasserman
sumber
65
Mengapa itu tidak masuk akal? Karena aliran adalah pipa, tidak ada alasan tidak bisa membuat dua produsen aliran asli, saya bisa melihat ini ditangani oleh seorang kolektor yang menyediakan dua aliran.
Brett Ryan
36
Tidak aman utas. Saran buruk mencoba menambahkan langsung ke koleksi, itulah sebabnya kami memiliki stream.collect(...)for - for-safe dengan standar Collectors, yang berfungsi dengan baik bahkan pada Koleksi non-thread-safe (tanpa pertikaian kunci yang disinkronkan). Jawaban terbaik oleh @MarkJeronimus.
YoYo
1
@ JOOK Ini aman dari benang jika kepala dan ekor aman dari benang. Selain itu, dengan asumsi penggunaan aliran non-paralel, hanya pesanan tidak dijamin, sehingga aman digunakan. Terserah programmer untuk memperbaiki masalah konkurensi, jadi jawaban ini sangat cocok jika koleksi aman.
Nicolas
1
@Nixon tidak cocok dengan adanya solusi yang lebih baik, yang kami miliki di sini. Memiliki kode seperti itu dapat menyebabkan preseden buruk, menyebabkan orang lain menggunakannya dengan cara yang salah. Bahkan jika tidak ada aliran paralel yang digunakan, itu hanya satu langkah lagi. Praktik pengkodean yang baik mengharuskan kami untuk tidak mempertahankan status selama operasi streaming. Hal berikutnya yang kami lakukan adalah mengkode dalam kerangka kerja seperti percikan Apache, dan praktik yang sama akan benar-benar menghasilkan hasil yang tidak terduga. Itu solusi kreatif, saya berikan itu, yang mungkin sudah saya tulis sendiri belum lama ini.
YoYo
1
@ JOOK Ini bukan solusi yang lebih baik, ini sebenarnya lebih tidak efisien. Garis pemikiran itu akhirnya berakhir dengan kesimpulan bahwa semua Koleksi harus di-thread aman secara default untuk mencegah konsekuensi yang tidak diinginkan, yang memang salah.
Nicolas
301

Seorang kolektor dapat digunakan untuk ini.

  • Untuk dua kategori, gunakan Collectors.partitioningBy()pabrik.

Ini akan membuat Mapdari Booleanke List, dan meletakkan item dalam satu atau daftar lain berdasarkan a Predicate.

Catatan: Karena aliran perlu dikonsumsi keseluruhan, ini tidak dapat bekerja pada aliran yang tak terbatas. Dan karena aliran dikonsumsi pula, metode ini hanya menempatkan mereka di Daftar daripada membuat aliran-dengan-memori baru. Anda selalu dapat melakukan streaming daftar tersebut jika Anda membutuhkan streaming sebagai output.

Juga, tidak perlu untuk iterator, bahkan tidak dalam contoh head-only yang Anda berikan.

  • Pemisahan biner terlihat seperti ini:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • Untuk kategori lainnya, gunakan Collectors.groupingBy()pabrik.
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

Jika aliran tidak Stream, tetapi salah satu aliran primitif suka IntStream, maka .collect(Collectors)metode ini tidak tersedia. Anda harus melakukannya dengan cara manual tanpa pabrik pengumpul. Implementasinya terlihat seperti ini:

[Contoh 2.0 sejak 2020-04-16]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

Dalam contoh ini saya menginisialisasi ArrayLists dengan ukuran penuh dari koleksi awal (jika ini diketahui sama sekali). Ini mencegah mengubah ukuran peristiwa bahkan dalam skenario terburuk, tetapi berpotensi dapat melahap ruang 2 * N * T (N = jumlah elemen awal, T = jumlah utas). Untuk menukar ruang untuk kecepatan, Anda dapat meninggalkannya atau menggunakan tebakan terbaik Anda, seperti jumlah elemen tertinggi yang diharapkan dalam satu partisi (biasanya lebih dari N / 2 untuk pemisahan seimbang).

Saya harap saya tidak menyinggung siapa pun dengan menggunakan metode Java 9. Untuk versi Java 8, lihat pada edit history.

Mark Jeronimus
sumber
2
Cantik. Namun, solusi terakhir untuk IntStream tidak akan aman dalam hal aliran paralel. Solusinya jauh lebih sederhana daripada yang Anda pikirkan ... stream.boxed().collect(...);! Ini akan melakukan seperti yang diiklankan: konversi primitif IntStreamke Stream<Integer>versi kotak .
YoYo
32
Ini harus menjadi jawaban yang diterima karena langsung memecahkan pertanyaan OP.
ejel
27
Saya berharap Stack Overflow akan memungkinkan komunitas untuk menimpa jawaban yang dipilih jika yang lebih baik ditemukan.
GuiSim
Saya tidak yakin ini menjawab pertanyaan. Pertanyaan meminta pemisahan aliran menjadi aliran - bukan Daftar.
AlikElzin-kilaka
1
Fungsi akumulator tidak perlu verbose. Alih-alih (map, x) -> { boolean partition = p.test(x); List<Integer> list = map.get(partition); list.add(x); }Anda cukup menggunakan (map, x) -> map.get(p.test(x)).add(x). Lebih jauh, saya tidak melihat alasan mengapa collectoperasi ini tidak aman. Ia bekerja persis seperti yang seharusnya bekerja dan sangat erat dengan cara Collectors.partitioningBy(p)kerjanya. Tapi saya akan menggunakan IntPredicatebukan Predicate<Integer>saat tidak menggunakan boxed(), untuk menghindari tinju dua kali.
Holger
21

Saya menemukan pertanyaan ini pada diri saya dan saya merasa bahwa aliran bercabang memiliki beberapa kasus penggunaan yang dapat membuktikan valid. Saya menulis kode di bawah ini sebagai konsumen sehingga tidak melakukan apa pun tetapi Anda dapat menerapkannya pada fungsi dan hal lain yang mungkin Anda temui.

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Sekarang implementasi kode Anda bisa seperti ini:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
Ludger
sumber
20

Sayangnya, apa yang Anda minta langsung disukai di JavaDoc of Stream :

Suatu aliran harus dioperasikan pada (menjalankan operasi aliran antara atau terminal) hanya sekali. Ini mengesampingkan, misalnya, aliran "bercabang", di mana sumber yang sama memberi makan dua atau lebih saluran pipa, atau beberapa lintasan lintas dari aliran yang sama.

Anda dapat mengatasi ini menggunakan peekatau metode lain jika Anda benar-benar menginginkan jenis perilaku itu. Dalam hal ini, yang harus Anda lakukan adalah alih-alih mencoba untuk mundur dua aliran dari sumber Stream asli yang sama dengan filter forking, Anda akan menduplikasi aliran Anda dan memfilter masing-masing duplikat dengan tepat.

Namun, Anda mungkin ingin mempertimbangkan kembali jika a Streamadalah struktur yang sesuai untuk kasus penggunaan Anda.

Trevor Freeman
sumber
6
Kata-kata javadoc tidak mengecualikan mempartisi menjadi beberapa aliran selama item aliran tunggal hanya berjalan di salah satu dari ini
Thorbjørn Ravn Andersen
2
@ ThorbjørnRavnAndersen Saya tidak yakin menduplikasi item aliran adalah hambatan utama untuk aliran bercabang dua. Masalah utama adalah bahwa operasi forking pada dasarnya adalah operasi terminal, jadi ketika Anda memutuskan untuk melakukan fork, Anda pada dasarnya membuat koleksi sejenis. Misalnya saya dapat menulis sebuah metode List<Stream> forkStream(Stream s)tetapi aliran yang dihasilkan saya setidaknya akan sebagian didukung oleh koleksi dan tidak secara langsung oleh aliran yang mendasarinya, sebagai lawan untuk mengatakan filteryang bukan operasi aliran terminal.
Trevor Freeman
7
Ini adalah salah satu alasan saya merasa aliran Java agak setengah-setengah dibandingkan dengan github.com/ReactiveX/RxJava/wiki karena titik alirannya adalah untuk menerapkan operasi pada set elemen yang berpotensi tak terbatas dan operasi dunia nyata sering membutuhkan pemisahan. , menggandakan dan menggabungkan aliran.
Usman Ismail
8

Ini bertentangan dengan mekanisme umum Stream. Katakanlah Anda dapat membagi Stream S0 ke Sa dan Sb seperti yang Anda inginkan. Melakukan operasi terminal apa pun, katakanlah count(), pada Sa akan selalu "mengkonsumsi" semua elemen di S0. Karena itu Sb kehilangan sumber datanya.

Sebelumnya, Stream punya tee()metode, saya pikir, yang menduplikasi aliran menjadi dua. Sudah dihapus sekarang.

Aliran memiliki metode mengintip (), Anda mungkin dapat menggunakannya untuk mencapai kebutuhan Anda.

ZhongYu
sumber
1
peekpersis seperti dulu tee.
Louis Wasserman
5

tidak persis, tetapi Anda mungkin dapat mencapai apa yang Anda butuhkan dengan memohon Collectors.groupingBy(). Anda membuat Koleksi baru, dan kemudian dapat instantiate stream pada koleksi baru itu.

aepurniet
sumber
2

Ini adalah jawaban yang paling buruk yang bisa kudapat.

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

Ini mengambil aliran bilangan bulat dan membaginya menjadi 5. Untuk yang lebih besar dari 5 itu hanya menyaring bilangan genap dan menempatkannya dalam daftar. Selebihnya bergabung dengan mereka dengan |.

output:

 ([6, 8],0|1|2|3|4|5)

Tidak ideal karena mengumpulkan segala sesuatu ke dalam koleksi perantara yang memecah aliran (dan memiliki terlalu banyak argumen!)

Ian Jones
sumber
1

Saya menemukan pertanyaan ini sambil mencari cara untuk memfilter elemen tertentu dari aliran dan mencatatnya sebagai kesalahan. Jadi saya tidak benar-benar perlu untuk membagi aliran sebanyak melampirkan tindakan penghentian prematur untuk predikat dengan sintaksis yang tidak mengganggu. Inilah yang saya pikirkan:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}
Sebastian Hans
sumber
0

Versi lebih pendek yang menggunakan Lombok

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.RequiredArgsConstructor;

/**
 * Forks a Stream using a Predicate into postive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}
OneCricketeer
sumber
-3

Bagaimana tentang:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));
Matius
sumber
1
Karena pemasok dipanggil dua kali, Anda akan mendapatkan dua koleksi acak yang berbeda. Saya pikir itu adalah pikiran OP untuk memisahkan peluang dari yang sama dalam urutan yang sama
usr-local-ΕΨΗΕΛΩΝ