Seluruh ide di belakang Parallel.ForEach()
adalah bahwa Anda memiliki satu set utas dan setiap utas memproses bagian dari koleksi. Seperti yang Anda perhatikan, ini tidak berfungsi async
- await
, di mana Anda ingin melepaskan utas selama panggilan async.
Anda bisa "memperbaikinya" dengan memblokir ForEach()
utas, tetapi itu mengalahkan seluruh titik async
- await
.
Apa yang Anda bisa lakukan adalah dengan menggunakan TPL Dataflow bukan Parallel.ForEach()
, yang mendukung asynchronous Task
dengan baik.
Secara khusus, kode Anda dapat ditulis menggunakan TransformBlock
yang mengubah setiap id menjadi Customer
menggunakan async
lambda. Blok ini dapat dikonfigurasi untuk dieksekusi secara paralel. Anda akan menautkan blok itu ke ActionBlock
yang menulis masing Customer
- masing ke konsol. Setelah Anda mengatur jaringan blokir, Anda dapat Post()
setiap id ke TransformBlock
.
Dalam kode:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Meskipun Anda mungkin ingin membatasi paralelisme TransformBlock
ke beberapa konstanta kecil. Selain itu, Anda dapat membatasi kapasitas TransformBlock
dan menambahkan item ke asynchronous menggunakan SendAsync()
, misalnya jika koleksi terlalu besar.
Sebagai manfaat tambahan bila dibandingkan dengan kode Anda (jika berhasil) adalah bahwa penulisan akan dimulai segera setelah satu item selesai, dan tidak menunggu sampai semua pemrosesan selesai.
Parallel.ForEach()
untukPost()
item secara paralel seharusnya tidak memiliki efek nyata.Jawaban svick adalah (seperti biasa) sangat baik.
Namun, saya menemukan Dataflow lebih bermanfaat ketika Anda benar-benar memiliki sejumlah besar data untuk ditransfer. Atau ketika Anda membutuhkan
async
antrian yang kompatibel.Dalam kasus Anda, solusi yang lebih sederhana adalah dengan menggunakan
async
paralelisme-gaya:sumber
Parallel.ForEach()
). Tapi saya pikir saat ini pilihan terbaik untuk melakukan hampir semuaasync
pekerjaan dengan koleksi.ParallelOptions
membantu? Ini hanya berlaku untukParallel.For/ForEach/Invoke
, yang mana OP didirikan tidak ada gunanya di sini.GetCustomer
metode ini mengembalikanTask<T>
, Haruskah orang menggunakanSelect(async i => { await repo.GetCustomer(i);});
?Parallel.ForEach
tidak mendukungasync
.Menggunakan DataFlow seperti yang disarankan svick mungkin berlebihan, dan jawaban Stephen tidak menyediakan sarana untuk mengontrol konkurensi operasi. Namun, itu dapat dicapai lebih sederhana:
The
ToArray()
panggilan dapat dioptimalkan dengan menggunakan sebuah array bukannya daftar dan menggantikan tugas selesai, tapi aku ragu itu akan membuat banyak perbedaan dalam skenario yang paling. Contoh penggunaan per pertanyaan OP:EDIT Fellow SO pengguna dan TPL ahli Eli Arbel menunjuk saya ke artikel terkait dari Stephen Toub . Seperti biasa, implementasinya elegan dan efisien:
sumber
Partitioner.Create
menggunakan partisi chunk, yang menyediakan elemen secara dinamis untuk tugas yang berbeda sehingga skenario yang Anda jelaskan tidak akan terjadi. Perhatikan juga bahwa partisi statis (ditentukan sebelumnya) mungkin lebih cepat dalam beberapa kasus karena overhead yang lebih sedikit (khususnya sinkronisasi). Untuk informasi lebih lanjut, lihat: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll
) akan berisi pengecualian (di dalam sebuahAggregateException
), dan akibatnya jika kata pemanggil digunakanawait
, pengecualian akan dilemparkan ke dalam situs panggilan. Namun,Task.WhenAll
masih akan menunggu semua tugas untuk diselesaikan, danGetPartitions
akan secara dinamis mengalokasikan elemen ketikapartition.MoveNext
dipanggil sampai tidak ada lagi elemen yang tersisa untuk diproses. Ini berarti bahwa kecuali Anda menambahkan mekanisme Anda sendiri untuk menghentikan pemrosesan (mis.CancellationToken
) Itu tidak akan terjadi dengan sendirinya.var current = partition.Current
sebelumnyaawait body
dan kemudian gunakancurrent
dalam lanjutan (ContinueWith(t => { ... }
).Anda dapat menghemat upaya dengan Paket NuGet AsyncEnumerator baru , yang tidak ada 4 tahun yang lalu ketika pertanyaan awalnya diposting. Ini memungkinkan Anda untuk mengontrol tingkat paralelisme:
Penafian: Saya penulis perpustakaan AsyncEnumerator, yang merupakan open source dan berlisensi di bawah MIT, dan saya memposting pesan ini hanya untuk membantu komunitas.
sumber
AsyncStreams
dan saya harus mengatakan itu sangat baik. Tidak cukup merekomendasikan perpustakaan ini.Bungkus
Parallel.Foreach
menjadiTask.Run()
dan bukannyaawait
penggunaan kata kunci[yourasyncmethod].Result
(Anda perlu melakukan hal Task.Run untuk tidak memblokir utas UI)
Sesuatu seperti ini:
sumber
Parallel.ForEach
lakukan pekerjaan paralel, yang memblokir sampai semua selesai, lalu dorong semuanya ke latar belakang untuk memiliki UI responsif. Ada masalah dengan itu? Mungkin itu terlalu banyak utas tidur, tapi ini kode pendek dan mudah dibaca.Task.Run
kapanTaskCompletionSource
lebih disukai.TaskCompletionSource
lebih disukai?await
dapat dipindahkan di depan untuk menyimpan nama variabel tambahan.Ini harusnya sangat efisien, dan lebih mudah daripada membuat seluruh TPL Dataflow berfungsi:
sumber
await
sepertivar customers = await ids.SelectAsync(async i => { ... });
:?Saya sedikit terlambat ke pesta tetapi Anda mungkin ingin mempertimbangkan untuk menggunakan GetAwaiter.GetResult () untuk menjalankan kode async Anda dalam konteks sinkronisasi tetapi sejajar seperti di bawah ini;
sumber
Metode ekstensi untuk ini yang memanfaatkan SemaphoreSlim dan juga memungkinkan untuk mengatur tingkat paralelisme maksimum
Penggunaan sampel:
sumber
Setelah memperkenalkan banyak metode pembantu, Anda akan dapat menjalankan kueri paralel dengan sintaks sederhana ini:
Apa yang terjadi di sini adalah: kami membagi kumpulan sumber menjadi 10 chunks (
.Split(DegreeOfParallelism)
), kemudian menjalankan 10 tugas masing-masing memproses itemnya satu per satu (.SelectManyAsync(...)
) dan menggabungkannya kembali ke dalam satu daftar.Layak disebutkan ada pendekatan yang lebih sederhana:
Tetapi perlu tindakan pencegahan : jika Anda memiliki koleksi sumber yang terlalu besar, itu akan menjadwalkan
Task
untuk setiap item segera, yang dapat menyebabkan hit kinerja yang signifikan.Metode ekstensi yang digunakan dalam contoh di atas terlihat sebagai berikut:
sumber