Foreach paralel dengan lambda asinkron

138

Saya ingin menangani koleksi secara paralel, tetapi saya kesulitan mengimplementasikannya dan oleh karena itu saya berharap bantuan.

Masalah muncul jika saya ingin memanggil metode bertanda async dalam C #, dalam lambda dari loop paralel. Sebagai contoh:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Masalah terjadi dengan jumlah menjadi 0, karena semua utas dibuat secara efektif hanya utas latar belakang dan Parallel.ForEachpanggilan tidak menunggu penyelesaian. Jika saya menghapus kata kunci async, metodenya terlihat seperti ini:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Ini bekerja, tetapi itu benar-benar menonaktifkan kepintaran menunggu dan saya harus melakukan beberapa penanganan pengecualian manual .. (Dihapus untuk singkatnya).

Bagaimana saya bisa menerapkan Parallel.ForEachperulangan, yang menggunakan kata kunci tunggu dalam lambda? Apa itu mungkin?

Prototipe metode Parallel.ForEach mengambil Action<T>parameter sebagai, tapi saya ingin menunggu lambda asinkron saya.

clausndk
sumber
1
Saya berasumsi Anda bermaksud menghapus awaitdari await GetData(item)dalam blok kode kedua Anda karena akan menghasilkan kesalahan kompilasi apa adanya.
Josh M.
2
Kemungkinan rangkap dari Nesting menunggu dalam Paralel.
FOREach

Jawaban:

186

Jika Anda hanya menginginkan paralelisme sederhana, Anda dapat melakukan ini:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

Jika Anda membutuhkan sesuatu yang lebih kompleks, lihat posting Stephen ToubForEachAsync .

Stephen Cleary
sumber
46
Mungkin diperlukan mekanisme pelambatan. Ini akan segera membuat banyak tugas karena ada item yang mungkin berakhir pada permintaan jaringan 10k dan semacamnya.
usr
10
@ usr Contoh terakhir dalam artikel Stephen Toub membahas itu.
svick
@vick Saya bingung tentang sampel terakhir itu. Tampaknya bagi saya itu hanya batch banyak tugas untuk membuat lebih banyak tugas bagi saya, tetapi mereka semua memulai secara massal.
Luke Puplett
2
@LukePuplett Ini membuat doptugas dan masing-masing dari mereka kemudian memproses beberapa bagian dari kumpulan input secara seri.
svick
4
@Afshin_Zavvar: Jika Anda menelepon Task.Runtanpa awaithasil, maka itu hanya melempar api-dan-lupa bekerja ke kolam utas. Itu hampir selalu merupakan kesalahan.
Stephen Cleary
74

Anda dapat menggunakan ParallelForEachAsyncmetode ekstensi dari Paket NuGet AsyncEnumerator :

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Serge Semenov
sumber
1
Ini paket Anda? Saya telah melihat Anda memposting ini di beberapa tempat sekarang? : D Oh, tunggu .. namamu ada di paket: D +1
Piotr Kula
17
@pumkin, ya, itu milikku. Saya telah melihat masalah ini berulang-ulang, jadi saya memutuskan untuk menyelesaikannya dengan cara sesederhana mungkin dan membebaskan orang lain dari kesulitan juga :)
Serge Semenov
Terima kasih .. itu pasti masuk akal dan membantu saya keluar waktu besar!
Piotr Kula
2
Anda memiliki salah ketik: maxDegreeOfParallelism>maxDegreeOfParalellism
Shiran Dror
3
Ejaan yang benar memang maxDegreeOfParallelism, namun ada sesuatu dalam komentar @ ShiranDror - dalam paket Anda Anda menyebut variabel maxDegreeOfParalellism karena kesalahan (dan karena itu kode kutip Anda tidak akan dikompilasi sampai Anda mengubahnya ..)
BornToCode
17

Dengan SemaphoreSlimAnda dapat mencapai kontrol paralelisme.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  try
  {
     await throttler.WaitAsync();
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;
Felipe l
sumber
3

Implementasi ringan saya dari ParallelForEach async.

Fitur:

  1. Throttling (tingkat paralelisme maksimum).
  2. Penanganan pengecualian (pengecualian agregasi akan dilakukan saat selesai).
  3. Memori efisien (tidak perlu menyimpan daftar tugas).

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.SetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Contoh penggunaan:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
nicolay.anykienko
sumber
2

Saya telah membuat metode ekstensi untuk ini yang memanfaatkan SemaphoreSlim dan juga memungkinkan untuk mengatur tingkat paralelisme maksimum

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Penggunaan sampel:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
sumber
'menggunakan' tidak akan membantu. foreach loop akan menunggu semaphone tanpa batas. Coba saja kode sederhana ini yang mereproduksi masalah: tunggu Enumerable.Range (1, 4) .ForEachAsyncConcurrent (async (i) => {Console.WriteLine (i); melempar Pengecualian baru ("pengecualian tes");}, maxDegreeOfParallelism: 2);
nicolay.anykienko
@ nicolay.anykienko Anda benar tentang # 2. Masalah memori itu dapat diatasi dengan menambahkan taskWithThrottler.RemoveAll (x => x.IsCompleted);
askids
1
Saya sudah mencobanya dalam kode saya dan jika saya maxDegreeOfParallelism bukan null kode kebuntuan. Di sini Anda dapat melihat semua kode untuk direproduksi: stackoverflow.com/questions/58793118/...
Massimo Savazzi