Saya memiliki file besar yang berisi daftar item.
Saya ingin membuat sekumpulan item, membuat permintaan HTTP dengan batch ini (semua item diperlukan sebagai parameter dalam permintaan HTTP). Saya dapat melakukannya dengan sangat mudah dengan for
loop, tetapi sebagai pecinta Java 8, saya ingin mencoba menulis ini dengan framework Stream Java 8 (dan menuai manfaat dari pemrosesan yang lambat).
Contoh:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Saya ingin melakukan sesuatu yang antrean panjang
lazyFileStream.group(500).map(processBatch).collect(toList())
apa jalan terbaik melakukan ini?
java
java-8
batch-processing
java-stream
Andy Dang
sumber
sumber
flatMap
(+ flatMap tambahan untuk menciutkan aliran lagi)? Saya tidak berpikir sesuatu seperti itu ada sebagai metode yang nyaman di perpustakaan standar. Entah Anda harus mencari lib pihak ketiga atau menulis lib Anda sendiri berdasarkan pemisah dan / atau kolektor yang memancarkan aliran sungaiStream.generate
denganreader::readLine
danlimit
, tetapi masalahnya adalah streaming tidak cocok dengan Pengecualian. Juga, ini mungkin tidak bisa diparalelkan dengan baik. Saya pikirfor
loop masih merupakan opsi terbaik.Jawaban:
Catatan! Solusi ini membaca seluruh file sebelum menjalankan forEach.
Anda dapat melakukannya dengan jOOλ , pustaka yang memperluas aliran Java 8 untuk kasus penggunaan aliran beralur tunggal dan sekuensial:
Seq.seq(lazyFileStream) // Seq<String> .zipWithIndex() // Seq<Tuple2<String, Long>> .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>> .forEach((index, batch) -> { process(batch); });
Di balik layar,
zipWithIndex()
hanya:static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { final Iterator<T> it = stream.iterator(); class ZipWithIndex implements Iterator<Tuple2<T, Long>> { long index; @Override public boolean hasNext() { return it.hasNext(); } @Override public Tuple2<T, Long> next() { return tuple(it.next(), index++); } } return seq(new ZipWithIndex()); }
... sedangkan
groupBy()
kenyamanan API untuk:default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { return collect(Collectors.groupingBy(classifier)); }
(Penafian: Saya bekerja untuk perusahaan di belakang jOOλ)
sumber
Map
(tidak seperti, misalnya, solusi Ben Manes)Untuk kelengkapannya berikut solusinya Jambu Biji .
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
Dalam pertanyaan, koleksi tersedia sehingga aliran tidak diperlukan dan dapat ditulis sebagai,
Iterables.partition(data, batchSize).forEach(this::process);
sumber
Lists.partition
adalah variasi lain yang seharusnya saya sebutkan.Stream
ke memori sebelum memproses batch yang relevanbatchSize
elemen per iterasi.Implementasi Pure Java-8 juga dimungkinkan:
int BATCH = 500; IntStream.range(0, (data.size()+BATCH-1)/BATCH) .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) .forEach(batch -> process(batch));
Perhatikan bahwa tidak seperti JOOl, ini dapat bekerja dengan baik secara paralel (asalkan Anda
data
adalah daftar akses acak).sumber
List
(lihatdata.size()
,data.get()
di pertanyaan). Saya menjawab pertanyaan yang diajukan. Jika Anda memiliki pertanyaan lain, tanyakan saja (meskipun saya pikir pertanyaan aliran juga sudah ditanyakan).Solusi Java 8 murni :
Kita dapat membuat kolektor khusus untuk melakukan ini dengan elegan, yang memerlukan a
batch size
dan aConsumer
untuk memproses setiap kelompok:import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.*; import java.util.stream.Collector; import static java.util.Objects.requireNonNull; /** * Collects elements in the stream and calls the supplied batch processor * after the configured batch size is reached. * * In case of a parallel stream, the batch processor may be called with * elements less than the batch size. * * The elements are not kept in memory, and the final result will be an * empty list. * * @param <T> Type of the elements being collected */ class BatchCollector<T> implements Collector<T, List<T>, List<T>> { private final int batchSize; private final Consumer<List<T>> batchProcessor; /** * Constructs the batch collector * * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process */ BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { batchProcessor = requireNonNull(batchProcessor); this.batchSize = batchSize; this.batchProcessor = batchProcessor; } public Supplier<List<T>> supplier() { return ArrayList::new; } public BiConsumer<List<T>, T> accumulator() { return (ts, t) -> { ts.add(t); if (ts.size() >= batchSize) { batchProcessor.accept(ts); ts.clear(); } }; } public BinaryOperator<List<T>> combiner() { return (ts, ots) -> { // process each parallel list without checking for batch size // avoids adding all elements of one to another // can be modified if a strict batching mode is required batchProcessor.accept(ts); batchProcessor.accept(ots); return Collections.emptyList(); }; } public Function<List<T>, List<T>> finisher() { return ts -> { batchProcessor.accept(ts); return Collections.emptyList(); }; } public Set<Characteristics> characteristics() { return Collections.emptySet(); } }
Secara opsional, buat kelas utilitas pembantu:
import java.util.List; import java.util.function.Consumer; import java.util.stream.Collector; public class StreamUtils { /** * Creates a new batch collector * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process * @param <T> the type of elements being processed * @return a batch collector instance */ public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { return new BatchCollector<T>(batchSize, batchProcessor); } }
Contoh penggunaan:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> output = new ArrayList<>(); int batchSize = 3; Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); input.stream() .collect(StreamUtils.batchCollector(batchSize, batchProcessor));
Saya telah memposting kode saya di GitHub juga, jika ada yang ingin melihatnya:
Tautan ke Github
sumber
Saya menulis Spliterator khusus untuk skenario seperti ini. Ini akan mengisi daftar ukuran tertentu dari Arus input. Keuntungan dari pendekatan ini adalah ia akan melakukan pemrosesan yang lambat, dan akan bekerja dengan fungsi aliran lainnya.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { return batchSize <= 0 ? Stream.of(stream.collect(Collectors.toList())) : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); } private static class BatchSpliterator<E> implements Spliterator<List<E>> { private final Spliterator<E> base; private final int batchSize; public BatchSpliterator(Spliterator<E> base, int batchSize) { this.base = base; this.batchSize = batchSize; } @Override public boolean tryAdvance(Consumer<? super List<E>> action) { final List<E> batch = new ArrayList<>(batchSize); for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) ; if (batch.isEmpty()) return false; action.accept(batch); return true; } @Override public Spliterator<List<E>> trySplit() { if (base.estimateSize() <= batchSize) return null; final Spliterator<E> splitBase = this.base.trySplit(); return splitBase == null ? null : new BatchSpliterator<>(splitBase, batchSize); } @Override public long estimateSize() { final double baseSize = base.estimateSize(); return baseSize == 0 ? 0 : (long) Math.ceil(baseSize / (double) batchSize); } @Override public int characteristics() { return base.characteristics(); } }
sumber
SUBSIZED
pemisahan yang dikembalikan daritrySplit
dapat memiliki lebih banyak item daripada sebelum pemisahan (jika pemisahan terjadi di tengah kelompok).Spliterators
benar, makatrySplit
harus selalu mempartisi data menjadi dua bagian yang kira-kira sama sehingga hasilnya tidak boleh lebih besar dari aslinya?if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Kami memiliki masalah serupa untuk dipecahkan. Kami ingin mengambil aliran yang lebih besar dari memori sistem (melakukan iterasi melalui semua objek dalam database) dan mengacak urutannya sebaik mungkin - kami pikir tidak masalah untuk menyangga 10.000 item dan mengacaknya.
Targetnya adalah fungsi yang mengalir.
Dari solusi yang diusulkan di sini, tampaknya ada serangkaian opsi:
Naluri kami awalnya menggunakan kolektor khusus, tetapi ini berarti berhenti streaming. Solusi kolektor khusus di atas sangat bagus dan kami hampir menggunakannya.
Berikut adalah solusi yang menipu dengan menggunakan fakta bahwa
Stream
s dapat memberi AndaIterator
yang dapat Anda gunakan sebagai jalan keluar untuk membiarkan Anda melakukan sesuatu yang ekstra yang tidak didukung oleh aliran. ItuIterator
diubah kembali ke aliran menggunakan sedikitStreamSupport
sihir Java 8 lainnya ./** * An iterator which returns batches of items taken from another iterator */ public class BatchingIterator<T> implements Iterator<List<T>> { /** * Given a stream, convert it to a stream of batches no greater than the * batchSize. * @param originalStream to convert * @param batchSize maximum size of a batch * @param <T> type of items in the stream * @return a stream of batches taken sequentially from the original stream */ public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); } private static <T> Stream<T> asStream(Iterator<T> iterator) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator,ORDERED), false); } private int batchSize; private List<T> currentBatch; private Iterator<T> sourceIterator; public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { this.batchSize = batchSize; this.sourceIterator = sourceIterator; } @Override public boolean hasNext() { prepareNextBatch(); return currentBatch!=null && !currentBatch.isEmpty(); } @Override public List<T> next() { return currentBatch; } private void prepareNextBatch() { currentBatch = new ArrayList<>(batchSize); while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { currentBatch.add(sourceIterator.next()); } } }
Contoh sederhana penggunaan ini akan terlihat seperti ini:
@Test public void getsBatches() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) .forEach(System.out::println); }
Cetakan di atas
Untuk kasus penggunaan kami, kami ingin mengocok kumpulan dan kemudian menyimpannya sebagai aliran - terlihat seperti ini:
@Test public void howScramblingCouldBeDone() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one .map(list -> { Collections.shuffle(list); return list; }) .flatMap(List::stream) .forEach(System.out::println); }
Ini menghasilkan sesuatu seperti (ini diacak, sangat berbeda setiap saat)
Saus rahasianya di sini adalah selalu ada aliran, sehingga Anda dapat mengoperasikan aliran kumpulan, atau melakukan sesuatu untuk setiap kumpulan dan kemudian
flatMap
kembali ke aliran. Lebih baik lagi, semua hal di atas hanya berjalan sebagai ekspresi akhirforEach
ataucollect
atau ekspresi penghentian lainnya PULL data melalui aliran.Ternyata itu
iterator
adalah jenis operasi pengakhiran khusus pada aliran dan tidak menyebabkan seluruh aliran berjalan dan masuk ke memori! Terima kasih kepada orang-orang Java 8 untuk desain yang brilian!sumber
List
—Anda tidak dapat menunda iterasi elemen dalam-batch karena konsumen mungkin ingin melewati seluruh batch, dan jika Anda tidak menggunakan elemen maka mereka tidak akan melompat terlalu jauh. (Saya telah menerapkan salah satunya di C #, meskipun itu jauh lebih mudah.)Anda juga dapat menggunakan RxJava :
atau
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
atau
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
sumber
Anda juga bisa melihat cyclops-react , saya adalah penulis perpustakaan ini. Ini mengimplementasikan antarmuka jOOλ (dan dengan ekstensi JDK 8 Streams), tetapi tidak seperti Streaming Paralel JDK 8, ia memiliki fokus pada operasi Asynchronous (seperti berpotensi memblokir panggilan Async I / O). JDK Parallel Streams, sebaliknya berfokus pada paralelisme data untuk operasi terikat CPU. Ia bekerja dengan mengelola kumpulan tugas berbasis Masa Depan di bawah tenda, tetapi menyajikan API Streaming standar yang diperluas untuk pengguna akhir.
Kode contoh ini dapat membantu Anda memulai
LazyFutureStream.parallelCommonBuilder() .react(data) .grouped(BATCH_SIZE) .map(this::process) .run();
Ada tutorial tentang pengelompokan di sini
Dan Tutorial yang lebih umum di sini
Untuk menggunakan Thread Pool Anda sendiri (yang mungkin lebih sesuai untuk memblokir I / O), Anda dapat mulai memproses dengan
LazyReact reactor = new LazyReact(40); reactor.react(data) .grouped(BATCH_SIZE) .map(this::process) .run();
sumber
Contoh Pure Java 8 yang bekerja dengan aliran paralel juga.
Cara Penggunaan:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed(); CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
Deklarasi metode dan implementasi:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor) { List<ElementType> newBatch = new ArrayList<>(batchSize); stream.forEach(element -> { List<ElementType> fullBatch; synchronized (newBatch) { if (newBatch.size() < batchSize) { newBatch.add(element); return; } else { fullBatch = new ArrayList<>(newBatch); newBatch.clear(); newBatch.add(element); } } batchProcessor.accept(fullBatch); }); if (newBatch.size() > 0) batchProcessor.accept(new ArrayList<>(newBatch)); }
sumber
Dalam semua keadilan, lihat solusi Vavr yang elegan :
Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
sumber
Contoh sederhana menggunakan Spliterator
// read file into stream, try-with-resources try (Stream<String> stream = Files.lines(Paths.get(fileName))) { //skip header Spliterator<String> split = stream.skip(1).spliterator(); Chunker<String> chunker = new Chunker<String>(); while(true) { boolean more = split.tryAdvance(chunker::doSomething); if (!more) { break; } } } catch (IOException e) { e.printStackTrace(); } } static class Chunker<T> { int ct = 0; public void doSomething(T line) { System.out.println(ct++ + " " + line.toString()); if (ct % 100 == 0) { System.out.println("====================chunk====================="); } } }
Jawaban Bruce lebih komprehensif, tapi saya sedang mencari sesuatu yang cepat dan kotor untuk memproses banyak file.
sumber
ini adalah solusi java murni yang dievaluasi dengan malas.
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){ List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable currentBatch.add(new ArrayList<T>(batchSize)); return Stream.concat(stream .sequential() .map(new Function<T, List<T>>(){ public List<T> apply(T t){ currentBatch.get(0).add(t); return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null; } }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0)) .limit(1) ).filter(Objects::nonNull); }
sumber
Anda dapat menggunakan apache.commons:
ListUtils.partition(ListOfLines, 500).stream() .map(partition -> processBatch(partition) .collect(Collectors.toList());
Bagian pemartisian dilakukan dengan malas tetapi setelah daftar dipartisi, Anda mendapatkan keuntungan dari bekerja dengan aliran (misalnya, menggunakan aliran paralel, menambahkan filter, dll.). Jawaban lain menyarankan solusi yang lebih rumit tetapi terkadang keterbacaan dan pemeliharaan lebih penting (dan terkadang tidak :-))
sumber
Ini dapat dengan mudah dilakukan dengan menggunakan Reaktor :
sumber
Dengan
Java 8
dancom.google.common.collect.Lists
, Anda dapat melakukan sesuatu seperti:public class BatchProcessingUtil { public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) { List<List<T>> batches = Lists.partition(data, batchSize); return batches.stream() .map(processFunction) // Send each batch to the process function .flatMap(Collection::stream) // flat results to gather them in 1 stream .collect(Collectors.toList()); } }
Di sini
T
adalah jenis item dalam daftar input danU
jenis item dalam daftar outputDan Anda bisa menggunakannya seperti ini:
List<String> userKeys = [... list of user keys] List<Users> users = BatchProcessingUtil.process( userKeys, 10, // Batch Size partialKeys -> service.getUsers(partialKeys) );
sumber