Bagaimana cara membatasi jumlah operasi asinkron I / O serentak?

115
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Inilah masalahnya, ini memulai 1000+ permintaan web secara bersamaan. Adakah cara mudah untuk membatasi jumlah permintaan http asinkron ini secara bersamaan? Sehingga tidak lebih dari 20 halaman web yang diunduh pada waktu tertentu. Bagaimana melakukannya dengan cara yang paling efisien?

Coder Duka
sumber
2
Apa bedanya dengan pertanyaan Anda sebelumnya ?
svick
1
stackoverflow.com/questions/9290498/… Dengan parameter ParallelOptions.
Chris Disley
4
@ChrisDisley, ini hanya akan memparalelkan peluncuran permintaan.
pembelanjaan
@svick benar, apa bedanya? btw, saya suka jawabannya di sana stackoverflow.com/a/10802883/66372
eglasius
3
Selain HttpClientitu IDisposable, dan Anda harus membuangnya, terutama ketika Anda akan menggunakan 1000+ dari mereka. HttpClientbisa digunakan sebagai tunggal untuk beberapa permintaan.
Shimmy Weitzhandler

Jawaban:

161

Anda pasti dapat melakukan ini di versi terbaru asinkron untuk .NET, menggunakan .NET 4.5 Beta. Posting sebelumnya dari 'usr' menunjuk ke artikel bagus yang ditulis oleh Stephen Toub, tetapi berita yang kurang diumumkan adalah async semaphore benar-benar berhasil masuk ke rilis Beta .NET 4.5

Jika Anda melihat SemaphoreSlimkelas kami yang kami cintai (yang seharusnya Anda gunakan karena lebih berkinerja daripada aslinya Semaphore), kelas ini sekarang menawarkan WaitAsync(...)serangkaian kelebihan beban, dengan semua argumen yang diharapkan - interval waktu tunggu, token pembatalan, semua teman penjadwalan Anda yang biasa: )

Stephen juga menulis entri blog yang lebih baru tentang barang .NET 4.5 baru yang keluar dengan beta, lihat Yang Baru untuk Paralelisme di .NET 4.5 Beta .

Terakhir, berikut ini beberapa contoh kode tentang cara menggunakan SemaphoreSlim untuk pelambatan metode asinkron:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Terakhir, tetapi mungkin yang layak disebutkan adalah solusi yang menggunakan penjadwalan berbasis TPL. Anda dapat membuat tugas terikat delegasi di TPL yang belum dimulai, dan mengizinkan penjadwal tugas kustom untuk membatasi konkurensi. Faktanya, ada contoh MSDN untuk itu di sini:

Lihat juga TaskScheduler .

Theo Yaung
sumber
3
Bukankah sebuah paralelisme. Fase dengan derajat paralelisme yang terbatas merupakan pendekatan yang lebih baik? msdn.microsoft.com/en-us/library/…
GreyCloud
2
Mengapa Anda tidak membuang AndaHttpClient
Shimmy Weitzhandler
4
@GreyCloud: Parallel.ForEachbekerja dengan kode sinkron. Ini memungkinkan Anda memanggil kode asynchronous.
Josh Noe
2
@Theonarch Anda salah . Selain itu selalu kebiasaan yang baik untuk membungkus semua IDisposabledalam usingatau try-finallypernyataan, dan menjamin mereka miliki.
Shimmy Weitzhandler
29
Mengingat seberapa populer jawaban ini, ada baiknya menunjukkan bahwa HttpClient dapat dan harus menjadi satu contoh umum daripada satu contoh per permintaan.
Rupert Rawnsley
15

Jika Anda memiliki IEnumerable (mis. String URL) dan Anda ingin melakukan operasi terikat I / O dengan masing-masing ini (mis. Membuat permintaan http asinkron) secara bersamaan DAN secara opsional Anda juga ingin menyetel jumlah maksimum konkuren Permintaan I / O secara real time, berikut cara melakukannya. Dengan cara ini Anda tidak menggunakan thread pool dkk, metode ini menggunakan semaphoreslim untuk mengontrol permintaan I / O bersamaan maksimum yang mirip dengan pola jendela geser satu permintaan selesai, meninggalkan semaphore dan yang berikutnya masuk.

penggunaan: await ForEachAsync (urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }
Dogu Arslan
sumber
tidak, Anda tidak perlu secara eksplisit membuang SemaphoreSlim dalam implementasi dan penggunaan ini karena digunakan secara internal di dalam metode dan metode tersebut tidak mengakses properti AvailableWaitHandle dalam hal ini kami perlu membuang atau membungkusnya dalam blok using.
Dogu Arslan
1
Hanya memikirkan praktik dan pelajaran terbaik yang kami ajarkan kepada orang lain. A usingakan bagus.
AgentFire
nah contoh ini dapat saya ikuti, tetapi mencoba mencari tahu apa cara terbaik untuk melakukan ini, pada dasarnya memiliki throttler tetapi Func saya akan mengembalikan daftar, yang pada akhirnya saya inginkan dalam daftar akhir semua selesai ketika selesai ... yang mungkin perlu dikunci dalam daftar, apakah Anda punya saran.
Seabizkit
Anda dapat sedikit memperbarui metode ini sehingga mengembalikan daftar tugas sebenarnya dan Anda menunggu Task.WhenAll dari dalam kode panggilan Anda. Setelah Task.WhenAll selesai, Anda dapat menghitung setiap tugas dalam daftar dan menambahkan daftarnya ke daftar akhir. Ubah tanda tangan metode menjadi 'public static IEnumerable <Task <TOut>> ForEachAsync <TIn, TOut> (IEnumerable <TIn> inputEnumerable, Func <TIn, Task <TOut>> asyncProcessor, int? MaxDegreeOfParallelism = null)'
Dogu Arslan
7

Sayangnya, .NET Framework kehilangan kombinator terpenting untuk mengatur tugas-tugas asinkron paralel. Tidak ada hal bawaan seperti itu.

Lihatlah kelas AsyncSemaphore yang dibangun oleh Stephen Toub yang paling terhormat. Yang Anda inginkan disebut semafor, dan Anda memerlukan versi asinkronnya.

usr
sumber
12
Perhatikan bahwa "Sayangnya, .NET Framework kehilangan kombinator paling penting untuk mengatur tugas asinkron paralel. Tidak ada hal bawaan seperti itu." tidak lagi benar pada .NET 4.5 Beta. SemaphoreSlim sekarang menawarkan fungsionalitas WaitAsync (...) :)
Theo Yaung
Haruskah SemaphoreSlim (dengan metode asinkron barunya) lebih disukai daripada AsyncSemphore, atau apakah implementasi Toub masih memiliki beberapa keuntungan?
Todd Menier
Menurut pendapat saya, tipe built-in harus lebih disukai karena kemungkinan akan teruji dan dirancang dengan baik.
usr
4
Stephen menambahkan komentar sebagai tanggapan atas pertanyaan di entri blognya yang mengonfirmasi bahwa menggunakan SemaphoreSlim untuk .NET 4.5 pada umumnya adalah cara yang tepat.
jdasilva
7

Ada banyak jebakan dan penggunaan langsung dari semaphore bisa menjadi rumit dalam kasus kesalahan, jadi saya akan menyarankan untuk menggunakan AsyncEnumerator NuGet Package alih-alih menemukan kembali roda:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);
Serge Semenov
sumber
4

Contoh Theo Yaung memang bagus, tapi ada varian tanpa daftar tugas menunggu.

 class SomeChecker
 {
    private const int ThreadCount=20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount); 

        return Task.Run( // prevent UI thread lock
            async  () =>{
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    ProccessUrl(url); // NOT await
                }
                //instead of await Task.WhenAll(allTasks);
                _countdownEvent.Wait();
            });
    }

    private async Task ProccessUrl(string url)
    {
        try
        {
            var page = await new WebClient()
                       .DownloadStringTaskAsync(new Uri(url)); 
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProccessResult(string page){/*....*/}
}
vitidev.dll
sumber
4
Perhatikan, ada satu bahaya menggunakan pendekatan ini - setiap pengecualian yang terjadi di ProccessUrlatau subfungsinya akan benar-benar diabaikan. Mereka akan dimasukkan ke dalam Tasks, tetapi tidak diserap kembali ke penelepon asli Check(...). Secara pribadi, itulah mengapa saya masih menggunakan Tasks dan fungsi kombinatornya seperti WhenAlldan WhenAny- untuk mendapatkan propagasi kesalahan yang lebih baik. :)
Theo Yaung
3

SemaphoreSlim bisa sangat membantu di sini. Berikut metode ekstensi yang saya buat.

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Contoh Penggunaan:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
sumber
0

Pertanyaan lama, jawaban baru. @vitidev memiliki blok kode yang digunakan kembali hampir utuh dalam proyek yang saya ulas. Setelah berdiskusi dengan beberapa kolega, seseorang bertanya "Mengapa Anda tidak menggunakan metode TPL bawaan saja?" ActionBlock terlihat seperti pemenang di sana. https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx . Mungkin tidak akan berakhir dengan mengubah kode yang ada tetapi pasti akan melihat untuk mengadopsi nuget ini dan menggunakan kembali praktik terbaik Mr. Softy untuk paralelisme throttled.

Tidak Ada Pengembalian Dana Tidak Ada Pengembalian
sumber
0

Berikut adalah solusi yang memanfaatkan sifat malas LINQ. Ini secara fungsional setara dengan jawaban yang diterima ), tetapi menggunakan pekerja-tugas daripada a SemaphoreSlim, dengan cara ini mengurangi jejak memori dari seluruh operasi. Pada awalnya mari kita membuatnya bekerja tanpa pembatasan. Langkah pertama adalah mengubah url kami menjadi tugas yang tak terhitung banyaknya.

string[] urls =
{
    "https://stackoverflow.com",
    "https://superuser.com",
    "https://serverfault.com",
    "https://meta.stackexchange.com",
    // ...
};
var httpClient = new HttpClient();
var tasks = urls.Select(async (url) =>
{
    return (Url: url, Html: await httpClient.GetStringAsync(url));
});

Langkah kedua adalah untuk awaitsemua tugas secara bersamaan menggunakan Task.WhenAllmetode ini:

var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
    Console.WriteLine($"Url: {result.Url}, {result.Html.Length:#,0} chars");
}

Keluaran:

Url: https://stackoverflow.com , 105.574 karakter
Url: https://superuser.com , 126.953 karakter
Url: https://serverfault.com , 125.963 karakter
Url: https://meta.stackexchange.com , 185.276 karakter
...

Implementasi Microsoft dari Task.WhenAllterwujud langsung yang disediakan enumerable ke array, menyebabkan semua tugas untuk mulai sekaligus. Kami tidak menginginkannya, karena kami ingin membatasi jumlah operasi asinkron serentak. Jadi kita perlu menerapkan alternatif WhenAllyang akan menghitung jumlah kita dengan lembut dan perlahan. Kami akan melakukannya dengan membuat sejumlah tugas-pekerja (sama dengan tingkat konkurensi yang diinginkan), dan setiap tugas-pekerja akan menghitung satu tugas kami yang dapat dihitung dalam satu waktu, menggunakan kunci untuk memastikan bahwa setiap url-tugas akan diproses dengan hanya satu pekerja-tugas. Kemudian kami awaitmenyelesaikan semua pekerja-tugas, dan akhirnya kami mengembalikan hasilnya. Berikut implementasinya:

public static async Task<T[]> WhenAll<T>(IEnumerable<Task<T>> tasks,
    int concurrencyLevel)
{
    if (tasks is ICollection<Task<T>>) throw new ArgumentException(
        "The enumerable should not be materialized.", nameof(tasks));
    var locker = new object();
    var results = new List<T>();
    var failed = false;
    using (var enumerator = tasks.GetEnumerator())
    {
        var workerTasks = Enumerable.Range(0, concurrencyLevel)
        .Select(async _ =>
        {
            try
            {
                while (true)
                {
                    Task<T> task;
                    int index;
                    lock (locker)
                    {
                        if (failed) break;
                        if (!enumerator.MoveNext()) break;
                        task = enumerator.Current;
                        index = results.Count;
                        results.Add(default); // Reserve space in the list
                    }
                    var result = await task.ConfigureAwait(false);
                    lock (locker) results[index] = result;
                }
            }
            catch (Exception)
            {
                lock (locker) failed = true;
                throw;
            }
        }).ToArray();
        await Task.WhenAll(workerTasks).ConfigureAwait(false);
    }
    lock (locker) return results.ToArray();
}

... dan inilah yang harus kita ubah dalam kode awal kita, untuk mencapai pelambatan yang diinginkan:

var results = await WhenAll(tasks, concurrencyLevel: 2);

Ada perbedaan terkait penanganan pengecualian. Native Task.WhenAllmenunggu semua tugas selesai dan menggabungkan semua pengecualian. Implementasi di atas berakhir segera setelah menyelesaikan tugas yang salah pertama.

Theodor Zoulias
sumber
Implementasi AC # 8 yang mengembalikan IAsyncEnumerable<T>dapat ditemukan di sini .
Theodor Zoulias
-1

Meskipun 1000 tugas mungkin diantrekan dengan sangat cepat, library Parallel Tasks hanya dapat menangani tugas bersamaan yang sama dengan jumlah inti CPU di mesin. Artinya, jika Anda memiliki mesin empat inti, hanya 4 tugas yang akan dijalankan pada waktu tertentu (kecuali Anda menurunkan MaxDegreeOfParallelism).

scottm
sumber
8
Ya, tapi itu tidak terkait dengan operasi I / O asinkron. Kode di atas akan menjalankan 1000+ unduhan simultan meskipun berjalan pada satu utas.
Grief Coder
Tidak melihat awaitkata kunci di sana. Menghapus itu seharusnya menyelesaikan masalah, benar?
scottm
2
Pustaka tersebut tentu dapat menangani lebih banyak tugas yang berjalan (dengan Runningstatus) secara bersamaan daripada jumlah inti. Ini akan menjadi kasus khusus dengan Tugas terikat I / O.
svick
@vick: ya. Apakah Anda tahu cara mengontrol tugas TPL bersamaan secara efisien (bukan utas)?
Grief Coder
-1

Perhitungan paralel harus digunakan untuk mempercepat operasi yang terikat dengan CPU. Di sini kita berbicara tentang operasi terikat I / O. Implementasi Anda harus murni asinkron , kecuali jika Anda membanjiri single core yang sibuk pada CPU multi-core Anda.

EDIT Saya suka saran yang dibuat oleh usr untuk menggunakan "semafor asinkron" di sini.

GregC
sumber
Poin bagus! Padahal setiap tugas di sini akan berisi kode asinkron dan sinkronisasi (halaman diunduh secara asinkron kemudian diproses secara sinkron). Saya mencoba mendistribusikan porsi sinkronisasi kode di seluruh CPU dan pada saat yang sama membatasi jumlah operasi I / O asinkron bersamaan.
Grief Coder
Mengapa? Karena meluncurkan 1000+ permintaan http secara bersamaan mungkin bukan tugas yang sesuai dengan kapasitas jaringan pengguna.
pembelanjaan
Ekstensi paralel juga dapat digunakan sebagai cara untuk menjalankan operasi I / O multipleks tanpa harus mengimplementasikan solusi asinkron murni secara manual. Yang saya setujui bisa dianggap ceroboh, tetapi selama Anda membatasi jumlah operasi bersamaan, hal itu mungkin tidak akan terlalu membebani threadpool.
Sean U
3
Saya rasa jawaban ini tidak memberikan jawaban. Menjadi murni asinkron saja tidak cukup di sini: Kami benar-benar ingin membatasi IO fisik dengan cara yang tidak memblokir.
usr
1
Hmm .. tidak yakin saya setuju ... ketika mengerjakan proyek besar, jika satu terlalu banyak pengembang mengambil pandangan ini, Anda akan kelaparan meskipun kontribusi masing-masing pengembang dalam isolasi tidak cukup untuk membalikkan keadaan. Mengingat bahwa hanya ada satu ThreadPool, meskipun Anda memperlakukannya dengan agak hormat ... jika semua orang melakukan hal yang sama, masalah dapat terjadi. Karena itu saya selalu menyarankan agar tidak menjalankan barang lama di ThreadPool.
pembelanjaan
-1

Gunakan MaxDegreeOfParallelism, yang merupakan opsi yang dapat Anda tentukan di Parallel.ForEach():

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            var html = client.GetStringAsync(url);
            // do stuff with html
        });
Sean U
sumber
4
Saya tidak berpikir ini berhasil. GetStringAsync(url)dimaksudkan untuk dipanggil dengan await. Jika Anda memeriksa jenisnya var html, itu adalah Task<string>, bukan hasilnya string.
Neal Ehardt
2
@NealEht benar. Parallel.ForEach(...)ditujukan untuk menjalankan blok kode sinkron secara paralel (misalnya pada utas yang berbeda).
Theo Yaung
-1

Pada dasarnya Anda akan ingin membuat Tindakan atau Tugas untuk setiap URL yang ingin Anda tekan, memasukkannya ke dalam Daftar, dan kemudian memproses daftar itu, membatasi jumlah yang dapat diproses secara paralel.

Entri blog saya menunjukkan cara melakukan ini dengan Tasks dan dengan Tindakan, dan memberikan contoh proyek yang dapat Anda unduh dan jalankan untuk melihat keduanya beraksi.

Dengan Actions

Jika menggunakan Actions, Anda dapat menggunakan fungsi .Net Parallel.Invoke bawaan. Di sini kami membatasinya untuk menjalankan maksimal 20 utas secara paralel.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

Dengan Tasks

Dengan Tasks tidak ada fungsi bawaan. Namun, Anda dapat menggunakan yang saya sediakan di blog saya.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

Dan kemudian membuat daftar Tugas Anda dan memanggil fungsi untuk menjalankannya, katakanlah maksimal 20 secara bersamaan, Anda dapat melakukan ini:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
anjing mematikan
sumber
Saya pikir Anda hanya menentukan initialCount untuk SemaphoreSlim dan Anda perlu menentukan parameter ke-2 yaitu maxCount di konstruktor SemaphoreSlim.
Jay Shah
Saya ingin setiap tanggapan dari setiap tugas diproses menjadi Daftar. Bagaimana saya bisa mendapatkan Hasil atau tanggapan kembali
venkat
-1

ini bukan praktik yang baik karena mengubah variabel global. ini juga bukan solusi umum untuk asinkron. tetapi mudah untuk semua contoh HttpClient, jika hanya itu yang Anda cari. Anda cukup mencoba:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;
symbiont
sumber