Nesting menunggu dalam Paralel. FOREach

183

Dalam aplikasi metro, saya perlu melakukan sejumlah panggilan WCF. Ada sejumlah besar panggilan yang harus dibuat, jadi saya harus melakukannya secara paralel. Masalahnya adalah bahwa loop paralel keluar sebelum panggilan WCF semuanya selesai.

Bagaimana Anda menolak ini agar berfungsi seperti yang diharapkan?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
Darthg8r
sumber

Jawaban:

172

Seluruh ide di belakang Parallel.ForEach()adalah bahwa Anda memiliki satu set utas dan setiap utas memproses bagian dari koleksi. Seperti yang Anda perhatikan, ini tidak berfungsi async- await, di mana Anda ingin melepaskan utas selama panggilan async.

Anda bisa "memperbaikinya" dengan memblokir ForEach()utas, tetapi itu mengalahkan seluruh titik async- await.

Apa yang Anda bisa lakukan adalah dengan menggunakan TPL Dataflow bukan Parallel.ForEach(), yang mendukung asynchronous Taskdengan baik.

Secara khusus, kode Anda dapat ditulis menggunakan TransformBlockyang mengubah setiap id menjadi Customermenggunakan asynclambda. Blok ini dapat dikonfigurasi untuk dieksekusi secara paralel. Anda akan menautkan blok itu ke ActionBlockyang menulis masing Customer- masing ke konsol. Setelah Anda mengatur jaringan blokir, Anda dapat Post()setiap id ke TransformBlock.

Dalam kode:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Meskipun Anda mungkin ingin membatasi paralelisme TransformBlockke beberapa konstanta kecil. Selain itu, Anda dapat membatasi kapasitas TransformBlockdan menambahkan item ke asynchronous menggunakan SendAsync(), misalnya jika koleksi terlalu besar.

Sebagai manfaat tambahan bila dibandingkan dengan kode Anda (jika berhasil) adalah bahwa penulisan akan dimulai segera setelah satu item selesai, dan tidak menunggu sampai semua pemrosesan selesai.

svick
sumber
2
Tinjauan singkat async, ekstensi reaktif, TPL dan TPL DataFlow - vantsuyoshi.wordpress.com/2012/01/05/... bagi mereka seperti saya yang mungkin perlu kejelasan.
Norman H
1
Saya cukup yakin jawaban ini TIDAK memparalelkan pemrosesan. Saya yakin Anda perlu melakukan Paralel. Demi mencapai id dan mempostingnya ke getCustomerBlock. Setidaknya itulah yang saya temukan ketika saya menguji saran ini.
JasonLind
4
@JasonLind Benar-benar melakukannya. Menggunakan Parallel.ForEach()untuk Post()item secara paralel seharusnya tidak memiliki efek nyata.
svick
1
@svick Ok saya menemukannya, The ActionBlock juga harus paralel. Saya melakukannya sedikit berbeda, saya tidak memerlukan transformasi jadi saya hanya menggunakan bufferblock dan melakukan pekerjaan saya di ActionBlock. Saya bingung dari jawaban lain pada jalinan.
JasonLind
2
Maksud saya menentukan MaxDegreeOfParallelism pada ActionBlock seperti yang Anda lakukan pada TransformBlock dalam contoh Anda
JasonLind
125

Jawaban svick adalah (seperti biasa) sangat baik.

Namun, saya menemukan Dataflow lebih bermanfaat ketika Anda benar-benar memiliki sejumlah besar data untuk ditransfer. Atau ketika Anda membutuhkan asyncantrian yang kompatibel.

Dalam kasus Anda, solusi yang lebih sederhana adalah dengan menggunakan asyncparalelisme-gaya:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Stephen Cleary
sumber
13
Jika Anda ingin membatasi paralelisme secara manual (yang kemungkinan besar Anda lakukan dalam kasus ini), melakukannya dengan cara ini akan lebih rumit.
svick
1
Tapi Anda benar bahwa Dataflow bisa sangat rumit (misalnya bila dibandingkan dengan Parallel.ForEach()). Tapi saya pikir saat ini pilihan terbaik untuk melakukan hampir semua asyncpekerjaan dengan koleksi.
svick
1
@ JamesManning bagaimana cara ParallelOptionsmembantu? Ini hanya berlaku untuk Parallel.For/ForEach/Invoke, yang mana OP didirikan tidak ada gunanya di sini.
Ohad Schneider
1
@StephenCleary Jika GetCustomermetode ini mengembalikan Task<T>, Haruskah orang menggunakan Select(async i => { await repo.GetCustomer(i);});?
Shyju
5
@ Batmaci: Parallel.ForEachtidak mendukung async.
Stephen Cleary
81

Menggunakan DataFlow seperti yang disarankan svick mungkin berlebihan, dan jawaban Stephen tidak menyediakan sarana untuk mengontrol konkurensi operasi. Namun, itu dapat dicapai lebih sederhana:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

The ToArray()panggilan dapat dioptimalkan dengan menggunakan sebuah array bukannya daftar dan menggantikan tugas selesai, tapi aku ragu itu akan membuat banyak perbedaan dalam skenario yang paling. Contoh penggunaan per pertanyaan OP:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

EDIT Fellow SO pengguna dan TPL ahli Eli Arbel menunjuk saya ke artikel terkait dari Stephen Toub . Seperti biasa, implementasinya elegan dan efisien:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });

        })); 
}
Ohad Schneider
sumber
1
@RichardPierre sebenarnya kelebihan ini Partitioner.Createmenggunakan partisi chunk, yang menyediakan elemen secara dinamis untuk tugas yang berbeda sehingga skenario yang Anda jelaskan tidak akan terjadi. Perhatikan juga bahwa partisi statis (ditentukan sebelumnya) mungkin lebih cepat dalam beberapa kasus karena overhead yang lebih sedikit (khususnya sinkronisasi). Untuk informasi lebih lanjut, lihat: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .
Ohad Schneider
1
@OhadSchneider Dalam // amati pengecualian, jika itu melempar pengecualian, apakah akan muncul ke pemanggil? Sebagai contoh, jika saya ingin seluruh enumerable untuk berhenti memproses / gagal jika ada bagian yang gagal?
Terry
3
@Terry itu akan muncul ke pemanggil dalam arti bahwa tugas paling atas (dibuat oleh Task.WhenAll) akan berisi pengecualian (di dalam sebuah AggregateException), dan akibatnya jika kata pemanggil digunakan await, pengecualian akan dilemparkan ke dalam situs panggilan. Namun, Task.WhenAllmasih akan menunggu semua tugas untuk diselesaikan, dan GetPartitionsakan secara dinamis mengalokasikan elemen ketika partition.MoveNextdipanggil sampai tidak ada lagi elemen yang tersisa untuk diproses. Ini berarti bahwa kecuali Anda menambahkan mekanisme Anda sendiri untuk menghentikan pemrosesan (mis. CancellationToken) Itu tidak akan terjadi dengan sendirinya.
Ohad Schneider
1
@ gibbocool Saya masih tidak yakin saya mengikuti. Misalkan Anda memiliki total 7 tugas, dengan parameter yang Anda tentukan dalam komentar Anda. Lebih lanjut anggap bahwa batch pertama mengambil tugas sesekali 5 detik, dan tiga tugas 1 detik. Setelah sekitar satu detik, tugas 5 detik masih akan dieksekusi sedangkan tiga tugas 1 detik akan selesai. Pada titik ini, sisa tiga tugas 1 detik akan mulai dijalankan (tugas tersebut akan diberikan oleh partisi ke tiga utas "bebas").
Ohad Schneider
2
@MichaelFreidgeim Anda dapat melakukan sesuatu seperti var current = partition.Currentsebelumnya await bodydan kemudian gunakan currentdalam lanjutan ( ContinueWith(t => { ... }).
Ohad Schneider
43

Anda dapat menghemat upaya dengan Paket NuGet AsyncEnumerator baru , yang tidak ada 4 tahun yang lalu ketika pertanyaan awalnya diposting. Ini memungkinkan Anda untuk mengontrol tingkat paralelisme:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Penafian: Saya penulis perpustakaan AsyncEnumerator, yang merupakan open source dan berlisensi di bawah MIT, dan saya memposting pesan ini hanya untuk membantu komunitas.

Serge Semenov
sumber
11
Sergey, Anda harus mengungkapkan bahwa Anda adalah seorang penulis perpustakaan
Michael Freidgeim
5
ok, tambah disclaimer. Saya tidak mencari keuntungan dari mengiklankannya, hanya ingin membantu orang;)
Serge Semenov
Perpustakaan Anda tidak kompatibel dengan .NET Core.
Corniel Nobel
2
@CornielNobel, ini kompatibel dengan .NET Core - kode sumber pada GitHub memiliki cakupan uji untuk .NET Framework dan .NET Core.
Serge Semenov
1
@SergeSemenov Saya telah menggunakan banyak perpustakaan Anda untuk itu AsyncStreamsdan saya harus mengatakan itu sangat baik. Tidak cukup merekomendasikan perpustakaan ini.
WBuck
16

Bungkus Parallel.Foreachmenjadi Task.Run()dan bukannya awaitpenggunaan kata kunci[yourasyncmethod].Result

(Anda perlu melakukan hal Task.Run untuk tidak memblokir utas UI)

Sesuatu seperti ini:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
ofcoursedude
sumber
3
Apa masalahnya dengan ini? Saya akan melakukannya persis seperti ini. Mari Parallel.ForEachlakukan pekerjaan paralel, yang memblokir sampai semua selesai, lalu dorong semuanya ke latar belakang untuk memiliki UI responsif. Ada masalah dengan itu? Mungkin itu terlalu banyak utas tidur, tapi ini kode pendek dan mudah dibaca.
ygoe
@LonelyPixel Satu-satunya masalah saya adalah ia memanggil Task.Runkapan TaskCompletionSourcelebih disukai.
Gusdor
1
@ Goddor Penasaran - mengapa TaskCompletionSourcelebih disukai?
Seafish
@ Seafish Sebuah pertanyaan bagus yang saya harap bisa saya jawab. Pasti hari yang berat: D
Gusdor
Hanya pembaruan singkat. Saya sedang mencari persis ini sekarang, gulir ke bawah untuk menemukan solusi paling sederhana dan menemukan komentar saya sendiri lagi. Saya menggunakan persis kode ini dan berfungsi seperti yang diharapkan. Ini hanya mengasumsikan bahwa ada versi Sync dari panggilan Async asli dalam loop. awaitdapat dipindahkan di depan untuk menyimpan nama variabel tambahan.
ygoe
7

Ini harusnya sangat efisien, dan lebih mudah daripada membuat seluruh TPL Dataflow berfungsi:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
John Gietzen
sumber
Bukankah seharusnya contoh penggunaan menggunakan awaitseperti var customers = await ids.SelectAsync(async i => { ... });:?
Paccc
5

Saya sedikit terlambat ke pesta tetapi Anda mungkin ingin mempertimbangkan untuk menggunakan GetAwaiter.GetResult () untuk menjalankan kode async Anda dalam konteks sinkronisasi tetapi sejajar seperti di bawah ini;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
Teoman shipahi
sumber
5

Metode ekstensi untuk ini yang memanfaatkan SemaphoreSlim dan juga memungkinkan untuk mengatur tingkat paralelisme maksimum

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Penggunaan sampel:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
sumber
5

Setelah memperkenalkan banyak metode pembantu, Anda akan dapat menjalankan kueri paralel dengan sintaks sederhana ini:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

Apa yang terjadi di sini adalah: kami membagi kumpulan sumber menjadi 10 chunks ( .Split(DegreeOfParallelism)), kemudian menjalankan 10 tugas masing-masing memproses itemnya satu per satu ( .SelectManyAsync(...)) dan menggabungkannya kembali ke dalam satu daftar.

Layak disebutkan ada pendekatan yang lebih sederhana:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

Tetapi perlu tindakan pencegahan : jika Anda memiliki koleksi sumber yang terlalu besar, itu akan menjadwalkan Taskuntuk setiap item segera, yang dapat menyebabkan hit kinerja yang signifikan.

Metode ekstensi yang digunakan dalam contoh di atas terlihat sebagai berikut:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
Vitaliy Ulantikov
sumber