Java 8 Stream dengan pemrosesan batch

101

Saya memiliki file besar yang berisi daftar item.

Saya ingin membuat sekumpulan item, membuat permintaan HTTP dengan batch ini (semua item diperlukan sebagai parameter dalam permintaan HTTP). Saya dapat melakukannya dengan sangat mudah dengan forloop, tetapi sebagai pecinta Java 8, saya ingin mencoba menulis ini dengan framework Stream Java 8 (dan menuai manfaat dari pemrosesan yang lambat).

Contoh:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

Saya ingin melakukan sesuatu yang antrean panjang lazyFileStream.group(500).map(processBatch).collect(toList())

apa jalan terbaik melakukan ini?

Andy Dang
sumber
Saya tidak tahu bagaimana melakukan pengelompokan, maaf, tetapi Files # lines akan dengan malas membaca konten file.
Toby
1
jadi pada dasarnya Anda memerlukan kebalikan dari flatMap(+ flatMap tambahan untuk menciutkan aliran lagi)? Saya tidak berpikir sesuatu seperti itu ada sebagai metode yang nyaman di perpustakaan standar. Entah Anda harus mencari lib pihak ketiga atau menulis lib Anda sendiri berdasarkan pemisah dan / atau kolektor yang memancarkan aliran sungai
the8472
3
Mungkin Anda dapat menggabungkan Stream.generatedengan reader::readLinedan limit, tetapi masalahnya adalah streaming tidak cocok dengan Pengecualian. Juga, ini mungkin tidak bisa diparalelkan dengan baik. Saya pikir forloop masih merupakan opsi terbaik.
tobias_k
Saya baru saja menambahkan kode contoh. Saya tidak berpikir flatMap adalah cara yang tepat. Mencurigai bahwa saya mungkin harus menulis Spliterator khusus
Andy Dang
1
Saya menciptakan istilah "Penyalahgunaan aliran" untuk pertanyaan seperti ini.
kervin

Jawaban:

13

Catatan! Solusi ini membaca seluruh file sebelum menjalankan forEach.

Anda dapat melakukannya dengan jOOλ , pustaka yang memperluas aliran Java 8 untuk kasus penggunaan aliran beralur tunggal dan sekuensial:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

Di balik layar, zipWithIndex()hanya:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... sedangkan groupBy()kenyamanan API untuk:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Penafian: Saya bekerja untuk perusahaan di belakang jOOλ)

Lukas Eder
sumber
Wow. Inilah PERSIS yang saya cari. Sistem kami biasanya memproses aliran data secara berurutan sehingga ini akan cocok untuk dipindahkan ke Java 8.
Andy Dang
16
Perhatikan bahwa solusi ini tidak perlu menyimpan seluruh aliran input ke perantara Map(tidak seperti, misalnya, solusi Ben Manes)
Tagir Valeev
129

Untuk kelengkapannya berikut solusinya Jambu Biji .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

Dalam pertanyaan, koleksi tersedia sehingga aliran tidak diperlukan dan dapat ditulis sebagai,

Iterables.partition(data, batchSize).forEach(this::process);
Ben Manes
sumber
11
Lists.partitionadalah variasi lain yang seharusnya saya sebutkan.
Ben Manes
2
ini malas, kan? itu tidak akan memanggil keseluruhan Streamke memori sebelum memproses batch yang relevan
orirab
1
@orirab ya. Ini malas antar batch, karena di dalamnya akan mengkonsumsi batchSizeelemen per iterasi.
Ben Manes
Bisakah Anda melihat stackoverflow.com/questions/58666190/…
gstackoverflow
62

Implementasi Pure Java-8 juga dimungkinkan:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Perhatikan bahwa tidak seperti JOOl, ini dapat bekerja dengan baik secara paralel (asalkan Anda dataadalah daftar akses acak).

Tagir Valeev
sumber
1
bagaimana jika data Anda sebenarnya adalah aliran? (katakanlah baris dalam file, atau bahkan dari jaringan).
Omry Yadan
7
@OmryYadan, pertanyaannya adalah tentang mendapatkan masukan dari List(lihat data.size(), data.get()di pertanyaan). Saya menjawab pertanyaan yang diajukan. Jika Anda memiliki pertanyaan lain, tanyakan saja (meskipun saya pikir pertanyaan aliran juga sudah ditanyakan).
Tagir Valeev
1
Bagaimana cara memproses batch secara paralel?
soup_boy
38

Solusi Java 8 murni :

Kita dapat membuat kolektor khusus untuk melakukan ini dengan elegan, yang memerlukan a batch sizedan a Consumeruntuk memproses setiap kelompok:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

Secara opsional, buat kelas utilitas pembantu:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

Contoh penggunaan:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

Saya telah memposting kode saya di GitHub juga, jika ada yang ingin melihatnya:

Tautan ke Github

rohitvats
sumber
1
Ini adalah solusi yang baik, kecuali Anda tidak dapat memasukkan semua elemen dari aliran Anda ke dalam memori. Juga tidak akan bekerja pada aliran tanpa akhir - metode kumpulkan adalah terminal, yang berarti bahwa daripada menghasilkan aliran kumpulan, ia akan menunggu sampai aliran selesai, dan kemudian memproses hasilnya dalam kumpulan.
Alex Ackerman
2
@AlexAckerman, aliran tak terbatas berarti finisher tidak pernah dipanggil, tetapi akumulator akan tetap dipanggil sehingga item masih akan diproses. Selain itu, ini hanya membutuhkan ukuran kumpulan item untuk berada di memori pada satu waktu.
Solubris
@Solubris, kamu benar! Saya buruk, terima kasih telah menunjukkan hal ini - saya tidak akan menghapus komentar untuk referensi, jika seseorang memiliki gagasan yang sama tentang cara kerja metode kumpulkan.
Alex Ackerman
Daftar yang dikirim ke konsumen harus disalin agar modifikasi aman, misalnya: batchProcessor.accept (copyOf (ts))
Solubris
19

Saya menulis Spliterator khusus untuk skenario seperti ini. Ini akan mengisi daftar ukuran tertentu dari Arus input. Keuntungan dari pendekatan ini adalah ia akan melakukan pemrosesan yang lambat, dan akan bekerja dengan fungsi aliran lainnya.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}
Bruce Hamilton
sumber
sangat membantu. Jika seseorang ingin melakukan batch pada beberapa kriteria kustom (misalnya ukuran koleksi dalam byte), maka Anda dapat mendelegasikan predikat kustom Anda dan menggunakannya dalam for-loop sebagai syarat (imho while loop akan lebih mudah dibaca)
tolong
Saya tidak yakin penerapannya benar. Misalnya, jika aliran dasar adalah SUBSIZEDpemisahan yang dikembalikan dari trySplitdapat memiliki lebih banyak item daripada sebelum pemisahan (jika pemisahan terjadi di tengah kelompok).
Malt
@ Garam jika pemahaman saya Spliteratorsbenar, maka trySplitharus selalu mempartisi data menjadi dua bagian yang kira-kira sama sehingga hasilnya tidak boleh lebih besar dari aslinya?
Bruce Hamilton
@BruceHamilton Sayangnya, menurut dokumen, bagian-bagiannya kurang lebih sama. Mereka harus setara:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Malt
Ya, itu sesuai dengan pemahaman saya tentang pemisahan Spliterator. Namun, saya kesulitan memahami bagaimana "pemisahan yang dikembalikan dari trySplit dapat memiliki lebih banyak item daripada sebelum pemisahan", dapatkah Anda menjelaskan apa yang Anda maksud di sana?
Bruce Hamilton
14

Kami memiliki masalah serupa untuk dipecahkan. Kami ingin mengambil aliran yang lebih besar dari memori sistem (melakukan iterasi melalui semua objek dalam database) dan mengacak urutannya sebaik mungkin - kami pikir tidak masalah untuk menyangga 10.000 item dan mengacaknya.

Targetnya adalah fungsi yang mengalir.

Dari solusi yang diusulkan di sini, tampaknya ada serangkaian opsi:

  • Gunakan berbagai pustaka tambahan non-java 8
  • Mulailah dengan sesuatu yang bukan aliran - misalnya daftar akses acak
  • Memiliki aliran yang dapat dipisahkan dengan mudah dalam spliterator

Naluri kami awalnya menggunakan kolektor khusus, tetapi ini berarti berhenti streaming. Solusi kolektor khusus di atas sangat bagus dan kami hampir menggunakannya.

Berikut adalah solusi yang menipu dengan menggunakan fakta bahwa Streams dapat memberi Anda Iteratoryang dapat Anda gunakan sebagai jalan keluar untuk membiarkan Anda melakukan sesuatu yang ekstra yang tidak didukung oleh aliran. Itu Iteratordiubah kembali ke aliran menggunakan sedikit StreamSupportsihir Java 8 lainnya .

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

Contoh sederhana penggunaan ini akan terlihat seperti ini:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

Cetakan di atas

[A, B, C]
[D, E, F]

Untuk kasus penggunaan kami, kami ingin mengocok kumpulan dan kemudian menyimpannya sebagai aliran - terlihat seperti ini:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

Ini menghasilkan sesuatu seperti (ini diacak, sangat berbeda setiap saat)

A
C
B
E
D
F

Saus rahasianya di sini adalah selalu ada aliran, sehingga Anda dapat mengoperasikan aliran kumpulan, atau melakukan sesuatu untuk setiap kumpulan dan kemudian flatMapkembali ke aliran. Lebih baik lagi, semua hal di atas hanya berjalan sebagai ekspresi akhir forEachatau collectatau ekspresi penghentian lainnya PULL data melalui aliran.

Ternyata itu iteratoradalah jenis operasi pengakhiran khusus pada aliran dan tidak menyebabkan seluruh aliran berjalan dan masuk ke memori! Terima kasih kepada orang-orang Java 8 untuk desain yang brilian!

Ashley Frieze
sumber
Dan sangat baik jika Anda melakukan iterasi penuh pada setiap batch ketika dikumpulkan dan bertahan ke List—Anda tidak dapat menunda iterasi elemen dalam-batch karena konsumen mungkin ingin melewati seluruh batch, dan jika Anda tidak menggunakan elemen maka mereka tidak akan melompat terlalu jauh. (Saya telah menerapkan salah satunya di C #, meskipun itu jauh lebih mudah.)
ErikE
9

Anda juga dapat menggunakan RxJava :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

atau

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

atau

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
frhack
sumber
8

Anda juga bisa melihat cyclops-react , saya adalah penulis perpustakaan ini. Ini mengimplementasikan antarmuka jOOλ (dan dengan ekstensi JDK 8 Streams), tetapi tidak seperti Streaming Paralel JDK 8, ia memiliki fokus pada operasi Asynchronous (seperti berpotensi memblokir panggilan Async I / O). JDK Parallel Streams, sebaliknya berfokus pada paralelisme data untuk operasi terikat CPU. Ia bekerja dengan mengelola kumpulan tugas berbasis Masa Depan di bawah tenda, tetapi menyajikan API Streaming standar yang diperluas untuk pengguna akhir.

Kode contoh ini dapat membantu Anda memulai

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

Ada tutorial tentang pengelompokan di sini

Dan Tutorial yang lebih umum di sini

Untuk menggunakan Thread Pool Anda sendiri (yang mungkin lebih sesuai untuk memblokir I / O), Anda dapat mulai memproses dengan

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();
John McClean
sumber
3

Contoh Pure Java 8 yang bekerja dengan aliran paralel juga.

Cara Penggunaan:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

Deklarasi metode dan implementasi:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
Nicolas Lacombe
sumber
2

Dalam semua keadilan, lihat solusi Vavr yang elegan :

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
Nolequen
sumber
1

Contoh sederhana menggunakan Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

Jawaban Bruce lebih komprehensif, tapi saya sedang mencari sesuatu yang cepat dan kotor untuk memproses banyak file.

rhinmass
sumber
1

ini adalah solusi java murni yang dievaluasi dengan malas.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}
Hei
sumber
1

Anda dapat menggunakan apache.commons:

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

Bagian pemartisian dilakukan dengan malas tetapi setelah daftar dipartisi, Anda mendapatkan keuntungan dari bekerja dengan aliran (misalnya, menggunakan aliran paralel, menambahkan filter, dll.). Jawaban lain menyarankan solusi yang lebih rumit tetapi terkadang keterbacaan dan pemeliharaan lebih penting (dan terkadang tidak :-))

Tal Joffe
sumber
Tidak yakin siapa yang memberikan suara negatif tetapi alangkah baiknya untuk memahami mengapa .. Saya memberikan jawaban yang melengkapi jawaban lain untuk orang yang tidak dapat menggunakan Jambu biji
Tal Joffe
Anda sedang memproses daftar di sini, bukan aliran.
Drakemor
@Drakemor Saya sedang memproses aliran sub-daftar. perhatikan panggilan fungsi stream ()
Tal Joffe
Tapi pertama-tama Anda mengubahnya menjadi daftar sub-daftar, yang tidak akan berfungsi dengan benar untuk data streaming yang sebenarnya . Berikut adalah referensi untuk partisi: commons.apache.org/proper/commons-collections/apidocs/org/…
Drakemor
1
TBH Saya tidak sepenuhnya mendapatkan argumen Anda tetapi saya kira kita bisa setuju untuk tidak setuju. Saya telah mengedit jawaban saya untuk mencerminkan percakapan kita di sini. Terima kasih untuk diskusinya
Tal Joffe
1

Ini dapat dengan mudah dilakukan dengan menggunakan Reaktor :

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);
Alex
sumber
0

Dengan Java 8dan com.google.common.collect.Lists, Anda dapat melakukan sesuatu seperti:

public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

Di sini Tadalah jenis item dalam daftar input dan Ujenis item dalam daftar output

Dan Anda bisa menggunakannya seperti ini:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);
josebui
sumber