Batasi aliran dengan predikat

187

Apakah ada operasi streaming Java 8 yang membatasi a (berpotensi tak terbatas) Streamhingga elemen pertama gagal mencocokkan predikat?

Di Java 9 kita dapat menggunakan takeWhileseperti pada contoh di bawah ini untuk mencetak semua angka kurang dari 10.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

Karena tidak ada operasi seperti itu di Java 8, apa cara terbaik untuk mengimplementasikannya secara umum?

MForster
sumber
1
Informasi yang mungkin berguna di: stackoverflow.com/q/19803058/248082
nobeh
Saya bertanya-tanya bagaimana para arsitek bisa melewati "untuk apa kita sebenarnya menggunakan ini" tanpa berlari ke dalam usecase ini. Pada Java 8 Streaming hanya benar-benar bermanfaat untuk struktur data yang ada: - /
Thorbjørn Ravn Andersen
Dengan Java 9, akan lebih mudah untuk menulisIntStream.iterate(1, n->n<10, n->n+1).forEach(System.out::print);
Marc Dzaebel

Jawaban:

81

Operasi semacam itu seharusnya dimungkinkan dengan Java 8 Stream, tetapi itu tidak dapat dilakukan dengan efisien - misalnya, Anda tidak dapat selalu memparalelkan operasi seperti itu, karena Anda harus melihat elemen-elemen secara berurutan.

API tidak menyediakan cara mudah untuk melakukannya, tetapi apa yang mungkin cara paling sederhana adalah dengan mengambil Stream.iterator(), membungkusnya dengan Iteratorimplementasi "take-while", dan kemudian kembali ke a Spliteratordan kemudian a Stream. Atau - mungkin - bungkus Spliterator, meskipun itu tidak dapat dipecah lagi dalam implementasi ini.

Berikut ini implementasi yang belum teruji takeWhilepada Spliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}
Louis Wasserman
sumber
8
Secara teori, memparalelkan takeWhile dengan predikat stateless itu mudah. Mengevaluasi kondisi dalam batch paralel (dengan asumsi predikat tidak melempar atau memiliki efek samping jika dijalankan beberapa kali tambahan). Masalahnya adalah melakukannya dalam konteks dekomposisi rekursif (fork / join framework) yang digunakan stream. Sungguh, itu Streaming yang sangat tidak efisien.
Aleksandr Dubinsky
91
Streaming akan jauh lebih baik jika mereka tidak begitu sibuk dengan paralelisme otomatis. Paralelisme hanya diperlukan di sebagian kecil tempat di mana Streaming dapat digunakan. Selain itu, jika Oracle sangat peduli pada perfoma, mereka bisa membuat JVM JIT melakukan otomatisasi, dan mendapatkan peningkatan kinerja yang jauh lebih besar, tanpa mengganggu pengembang. Sekarang paralelisme otomatis dilakukan dengan benar.
Aleksandr Dubinsky
Anda harus memperbarui jawaban ini sekarang setelah Java 9 dirilis.
Radiodef
4
Tidak, @Radiodef. Pertanyaannya menanyakan secara khusus untuk solusi Java 8.
Renato Kembali
146

Operasi takeWhiledan dropWhiletelah ditambahkan ke JDK 9. Kode contoh Anda

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

akan berperilaku tepat seperti yang Anda harapkan ketika dikompilasi dan dijalankan di bawah JDK 9.

JDK 9 telah dirilis. Ini tersedia untuk diunduh di sini: http://jdk.java.net/9/

Stuart Marks
sumber
3
Tautan langsung ke dokumen pratinjau untuk JDK9 Stream, dengan takeWhile/ dropWhile: download.java.net/jdk9/docs/api/java/util/stream/Stream.html
Miles
1
Apakah ada alasan mengapa mereka dipanggil takeWhiledan dropWhilebukannya limitWhiledan skipWhile, untuk konsistensi dengan API yang ada?
Lukas Eder
10
@LukasEder takeWhiledan dropWhilecukup luas, terjadi di Scala, Python, Groovy, Ruby, Haskell, dan Clojure. Asimetri dengan skipdan limittidak menguntungkan. Mungkin skipdan limitseharusnya dipanggil dropdan take, tetapi itu tidak seintuitif kecuali Anda sudah terbiasa dengan Haskell.
Stuart Marks
3
@StuartMarks: Saya mengerti dropXXXdan takeXXXistilah yang lebih populer tapi saya pribadi bisa hidup dengan lebih banyak SQL-esque limitXXXdan skipXXX. Saya menemukan asimetri baru ini jauh lebih membingungkan daripada pilihan istilah individu ... :) (btw: Scala juga punya drop(int)dan take(int))
Lukas Eder
1
ya izinkan saya upgrade ke Jdk 9 dalam produksi. Banyak pengembang masih di Jdk8, fitur seperti itu seharusnya sudah disertakan dengan Streams sejak awal.
wilmol
50

allMatch()adalah fungsi hubungan arus pendek, sehingga Anda dapat menggunakannya untuk menghentikan pemrosesan. Kerugian utama adalah Anda harus melakukan tes dua kali: sekali untuk melihat apakah Anda harus memprosesnya, dan sekali lagi untuk melihat apakah akan terus berjalan.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);
Michael Rowley
sumber
5
Ini tampaknya tidak intuitif bagi saya pada awalnya (diberi nama metode), tetapi dokumen mengkonfirmasi bahwa itu Stream.allMatch()adalah operasi hubungan arus pendek . Jadi ini akan selesai bahkan pada aliran infinite seperti IntStream.iterate(). Tentu saja, dalam retrospeksi, ini adalah optimasi yang masuk akal.
Bailey Parker
3
Ini rapi, tetapi saya tidak berpikir itu berkomunikasi dengan baik bahwa maksudnya adalah tubuh peek. Jika saya bertemu bulan depan, saya akan bertanya-tanya mengapa programmer sebelum saya memeriksa apakah allMatchdan kemudian mengabaikan jawabannya.
Joshua Goldberg
10
Kerugian dari solusi ini adalah ia mengembalikan boolean sehingga Anda tidak dapat mengumpulkan hasil streaming seperti biasa.
neXus
35

Sebagai tindak lanjut dari jawaban @StuartMarks . Pustaka StreamEx saya memiliki takeWhileoperasi yang kompatibel dengan implementasi JDK-9 saat ini. Ketika berjalan di bawah JDK-9 itu hanya akan mendelegasikan ke implementasi JDK (melalui MethodHandle.invokeExactyang sangat cepat). Saat berjalan di bawah JDK-8, implementasi "polyfill" akan digunakan. Jadi, menggunakan perpustakaan saya masalahnya bisa diselesaikan seperti ini:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);
Tagir Valeev
sumber
Mengapa Anda belum menerapkannya untuk kelas StreamEx?
Someguy
@Segeguy saya mengimplementasikannya.
Tagir Valeev
14

takeWhileadalah salah satu fungsi yang disediakan oleh perpustakaan protonpack .

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));
Dominic Fox
sumber
11

Pembaruan: Java 9 Streamsekarang hadir dengan metode takeWhile .

Tidak perlu peretasan atau solusi lain. Gunakan saja itu!


Saya yakin ini bisa sangat ditingkatkan: (seseorang mungkin bisa membuat thread-safe mungkin)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

Peretasan pasti ... Tidak elegan - tetapi berhasil ~: D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}
Koordinator
sumber
8

Anda dapat menggunakan java8 + rxjava .

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));
frhack
sumber
6

Sebenarnya ada 2 cara untuk melakukannya di Java 8 tanpa perpustakaan tambahan atau menggunakan Java 9.

Jika Anda ingin mencetak angka dari 2 hingga 20 pada konsol Anda dapat melakukan ini:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

atau

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

Output dalam kedua kasus:

2
4
6
8
10
12
14
16
18
20

Tidak ada yang disebutkan anyMatch belum. Ini adalah alasan untuk posting ini.

gil.fernandes
sumber
5

Ini adalah sumber yang disalin dari JDK 9 java.util.stream.Stream.takeWhile (Predikat). Sedikit perbedaan untuk bekerja dengan JDK 8.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}
martian
sumber
4

Ini adalah versi yang dilakukan pada int - seperti yang ditanyakan dalam pertanyaan

Pemakaian:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Berikut kode untuk StreamUtil:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil
{
    public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
    {
        return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
    }

    private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
    {
        private final PrimitiveIterator.OfInt iterator;
        private final IntPredicate predicate;

        public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
        {
            super(Long.MAX_VALUE, IMMUTABLE);
            this.iterator = stream.iterator();
            this.predicate = predicate;
        }

        @Override
        public boolean tryAdvance(IntConsumer action)
        {
            if (iterator.hasNext()) {
                int value = iterator.nextInt();
                if (predicate.test(value)) {
                    action.accept(value);
                    return true;
                }
            }

            return false;
        }
    }
}
Chris Greenaway
sumber
2

Pergi untuk mendapatkan perpustakaan AbacusUtil . Ini menyediakan API persis yang Anda inginkan dan banyak lagi:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Deklarasi : Saya pengembang AbacusUtil.

user_3380739
sumber
0

Anda tidak dapat membatalkan aliran kecuali dengan operasi terminal hubungan pendek, yang akan membuat beberapa nilai aliran tidak diproses terlepas dari nilainya. Tetapi jika Anda hanya ingin menghindari operasi pada aliran Anda dapat menambahkan transformasi dan filter ke aliran:

import java.util.Objects;

class ThingProcessor
{
    static Thing returnNullOnCondition(Thing thing)
    {    return( (*** is condition met ***)? null : thing);    }

    void processThings(Collection<Thing> thingsCollection)
    {
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    }
} // class ThingProcessor

Itu mengubah aliran hal-hal menjadi nol ketika hal-hal memenuhi beberapa kondisi, lalu menyaring nol. Jika Anda ingin menikmati efek samping, Anda dapat mengatur nilai kondisi menjadi true setelah beberapa hal ditemukan, sehingga semua hal berikutnya disaring terlepas dari nilainya. Tetapi bahkan jika tidak, Anda dapat menyimpan banyak pemrosesan (jika tidak semuanya) dengan memfilter nilai dari aliran yang tidak ingin Anda proses.

Matius
sumber
Ini payah bahwa beberapa penilai anonim merendahkan jawaban saya tanpa mengatakan alasannya. Jadi saya dan pembaca lain tidak tahu apa yang salah dengan jawaban saya. Dengan tidak adanya pembenaran mereka, saya akan menganggap kritik mereka tidak valid, dan jawaban saya sebagai benar.
Matius
Jawaban Anda tidak menyelesaikan masalah OP, yang berhadapan dengan aliran tak terbatas. Ini juga tampaknya mempersulit hal-hal yang tidak perlu karena Anda dapat menulis kondisi di filter () memanggil dirinya sendiri, tanpa perlu peta (). Pertanyaan sudah memiliki kode contoh, coba terapkan jawaban Anda untuk kode itu dan Anda akan melihat program akan berulang selamanya.
SenoCtar
0

Bahkan saya memiliki persyaratan yang sama - meminta layanan web, jika gagal, coba lagi 3 kali. Jika gagal bahkan setelah banyak percobaan ini, kirim pemberitahuan email. Setelah banyak googling, anyMatch()datang sebagai penyelamat. Kode sampel saya sebagai berikut. Dalam contoh berikut, jika metode webServiceCall mengembalikan true di iterasi pertama itu sendiri, streaming tidak beralih lebih jauh seperti yang kita sebut anyMatch(). Saya percaya, inilah yang Anda cari.

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch {

public static void main(String[] args) {        
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
         //Code for sending email notifications
    }
}

public static boolean webServiceCall(int i){
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;
}
Chinmay Phadke
sumber
0

Jika Anda tahu persis jumlah repitisi yang akan dilakukan, Anda bisa melakukannya

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);
Dilip Tharoor
sumber
1
Meskipun ini mungkin menjawab pertanyaan penulis, tidak ada beberapa kata yang menjelaskan dan tautan ke dokumentasi. Cuplikan kode mentah tidak sangat membantu tanpa frasa di sekitarnya. Anda juga dapat menemukan cara menulis jawaban yang baik sangat membantu. Harap edit jawaban Anda.
berteriak
0
    IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) //it will be executed 9 times
    .filter(n->n>=9)
    .findAny();

alih-alih puncak, Anda dapat menggunakan mapToObj untuk mengembalikan objek atau pesan akhir

    IntStream.iterate(1, n -> n + 1)
    .mapToObj(n->{   //it will be executed 9 times
            if(n<9)
                return "";
            return "Loop repeats " + n + " times";});
    .filter(message->!message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);
Oleksandr Potomkin
sumber
-2

Jika Anda memiliki masalah yang berbeda, solusi yang berbeda mungkin diperlukan tetapi untuk masalah Anda saat ini, saya hanya akan pergi dengan:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);
krmanish007
sumber
-2

Mungkin sedikit keluar dari topik tetapi ini yang kita miliki List<T>alih - alihStream<T> .

Pertama, Anda perlu memiliki takemetode util. Metode ini mengambil nelemen pertama :

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

itu hanya berfungsi seperti scala.List.take

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

sekarang akan cukup mudah untuk menulis takeWhilemetode berdasarkantake

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

kerjanya seperti ini:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

implementasi ini mengulangi daftar sebagian untuk beberapa kali tetapi tidak menambah O(n^2)operasi. Harapan itu bisa diterima.

Maks
sumber
-3

Saya punya solusi cepat lain dengan menerapkan ini (yang sebenarnya tidak bersih, tetapi Anda tahu):

public static void main(String[] args) {
    System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
            .map(o -> o.toString()).collect(Collectors.joining(", ")));
}

static interface TerminatedStream<T> {
    Stream<T> terminateOn(T e);
}

static class StreamUtil {
    static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
        return new TerminatedStream<T>() {
            public Stream<T> terminateOn(T e) {
                Builder<T> builder = Stream.<T> builder().add(seed);
                T current = seed;
                while (!current.equals(e)) {
                    current = op.apply(current);
                    builder.add(current);
                }
                return builder.build();
            }
        };
    }
}
pengguna2504380
sumber
2
Anda mengevaluasi seluruh aliran di muka! Dan jika currenttidak .equals(e), Anda akan mendapatkan loop tanpa akhir. Keduanya bahkan jika Anda kemudian menerapkan mis .limit(1). Itu jauh lebih buruk daripada 'najis' .
charlie
-3

Ini adalah upaya saya hanya menggunakan perpustakaan Java Stream.

        IntStream.iterate(0, i -> i + 1)
        .filter(n -> {
                if (n < 10) {
                    System.out.println(n);
                    return false;
                } else {
                    return true;
                }
            })
        .findAny();
climbing_bum
sumber
The filterpredikat seharusnya tanpa kewarganegaraan. System.out.printlnadalah efek samping.
Radiodef