Apa cara terbaik untuk membatasi konkurensi saat menggunakan Promise.all () ES6?

99

Saya memiliki beberapa kode yang mengulang daftar yang ditanyakan dari database dan membuat permintaan HTTP untuk setiap elemen dalam daftar itu. Daftar itu terkadang bisa menjadi jumlah yang cukup besar (dalam ribuan), dan saya ingin memastikan saya tidak masuk ke server web dengan ribuan permintaan HTTP bersamaan.

Versi singkat dari kode ini sekarang terlihat seperti ini ...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

Kode ini berjalan di Node 4.3.2. Untuk mengulangi, dapatkah Promise.alldikelola sehingga hanya sejumlah Janji yang sedang diproses pada waktu tertentu?

Chris
sumber
1
kemungkinan duplikat Batas konkurensi janji sedang dijalankan
Bergi
3
Jangan lupa bahwa hal Promise.allitu mengatur perkembangan janji - janji melakukannya sendiri, Promise.alltunggu saja.
Bergi
1
Hindari Promiseantipattern konstruktor !
Bergi

Jawaban:

51

Perhatikan itu Promise.all()tidak memicu janji untuk memulai pekerjaan mereka, membuat janji itu sendiri tidak.

Dengan pemikiran tersebut, salah satu solusinya adalah memeriksa kapan pun sebuah janji diputuskan apakah janji baru harus dimulai atau apakah Anda sudah mencapai batasnya.

Namun, sebenarnya tidak perlu menemukan kembali roda di sini. Salah satu perpustakaan yang bisa Anda gunakan untuk tujuan ini adalahes6-promise-pool . Dari contoh mereka:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
Timo
sumber
25
Sangat disayangkan bahwa es6-promise-pool memperbarui Promise alih-alih menggunakannya. Saya menyarankan solusi ringkas ini sebagai gantinya (jika Anda sudah menggunakan ES6 atau ES7) github.com/rxaviers/async-pool
Rafael Xavier
3
Melihat keduanya, async-pool terlihat jauh lebih baik! Lebih lurus ke depan dan lebih ringan.
Tak berujung
2
Saya juga menemukan p-limit sebagai implementasi yang paling sederhana. Lihat contoh saya di bawah ini. stackoverflow.com/a/52262024/8177355
Matthew Rideout
2
Saya pikir tiny-asyc-pool adalah solusi yang jauh lebih baik, tidak mengganggu, dan agak alami untuk membatasi konkurensi janji.
Cerah Tambi
74

Batas-P

Saya telah membandingkan batasan konkurensi promise dengan skrip kustom, bluebird, es6-promise-pool, dan p-limit. Saya percaya bahwa p-limit memiliki implementasi yang paling sederhana dan dipreteli untuk kebutuhan ini. Lihat dokumentasi mereka .

Persyaratan

Agar kompatibel dengan async misalnya

Contoh Saya

Dalam contoh ini, kita perlu menjalankan fungsi untuk setiap URL dalam array (seperti, mungkin permintaan API). Ini namanya fetchData(). Jika kami memiliki larik ribuan item untuk diproses, konkurensi pasti akan berguna untuk menghemat CPU dan sumber daya memori.

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

Hasil log konsol adalah larik data respons promise Anda yang telah diselesaikan.

Matthew Rideout
sumber
4
Terima kasih untuk yang ini! Yang ini jauh lebih sederhana
Yohanes
3
Sejauh ini perpustakaan terbaik yang pernah saya lihat untuk membatasi permintaan secara bersamaan. Dan contoh yang bagus, terima kasih!
Chris Livdahl
2
Terima kasih telah melakukan perbandingan. Pernahkah Anda membandingkan dengan github.com/rxaviers/async-pool ?
ahong
1
Mudah digunakan, pilihan bagus.
drmrbrewer
22

Menggunakan Array.prototype.splice

while (funcs.length) {
  // 100 at at time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
kaviar yang melambat
sumber
2
Ini adalah solusi yang diremehkan. Cintai kesederhanaan.
Brannon
8
Ini menjalankan fungsi dalam batch, bukan pool, di mana satu fungsi langsung dipanggil saat fungsi lainnya selesai.
cltsang
Suka solusi ini!
prasun
butuh beberapa saat untuk memahami apa yang dilakukannya dengan kurangnya konteks di sekitarnya, seperti itu menjadi batch daripada kumpulan misalnya. Anda menyusun ulang array setiap kali Anda menyambungkan dari awal atau tengah. (browser harus mengindeks ulang semua item) kinerja teoritis alternatif yang lebih baik adalah mengambil barang dari akhir alih-alih arr.splice(-100)jika pesanan dosis tidak mather, mungkin Anda dapat membalikkan susunan: P
Endless
Sangat berguna untuk dijalankan secara berkelompok. Catatan: kelompok berikutnya tidak akan dimulai sampai kelompok saat ini 100% selesai.
Casey Dwayne
21

Jika Anda mengetahui cara kerja iterator dan cara penggunaannya, Anda tidak memerlukan pustaka tambahan, karena akan sangat mudah untuk membuat konkurensi Anda sendiri. Izinkan saya menunjukkan:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

Kami dapat menggunakan iterator yang sama dan membagikannya ke seluruh pekerja.

Jika Anda telah menggunakan .entries()alih-alih .values()Anda akan mendapatkan array 2D [[index, value]]yang akan saya tunjukkan di bawah ini dengan konkurensi 2

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

Manfaatnya adalah Anda dapat memiliki fungsi generator alih-alih menyiapkan semuanya sekaligus.


Catatan: perbedaan dari ini dibandingkan dengan example async-pool adalah ia memunculkan dua pekerja, jadi jika satu pekerja melontarkan kesalahan karena suatu alasan di katakan indeks 5, itu tidak akan menghentikan pekerja lain untuk melakukan sisanya. Jadi Anda beralih dari melakukan 2 konkurensi menjadi 1. (jadi tidak akan berhenti di situ) Jadi saran saya adalah Anda menangkap semua kesalahan di dalam doWorkfungsi

Tak ada habisnya
sumber
Ini luar biasa! Terima kasih Endless!
pengguna3413723
Ini jelas merupakan pendekatan yang keren! Pastikan saja konkurensi Anda tidak melebihi panjang daftar tugas Anda (jika Anda tetap peduli dengan hasil) karena Anda mungkin akan mendapatkan tambahan!
Kris Oye
Sesuatu yang mungkin lebih keren nanti adalah ketika Streams mendapatkan Readable.f dari dukungan (iterator) . Chrome telah membuat aliran dapat ditransfer . sehingga Anda dapat membuat aliran yang dapat dibaca dan mengirimkannya ke pekerja web, dan semuanya akan menggunakan iterator dasar yang sama.
Tak berujung
16

Bluebird's Promise.map dapat menggunakan opsi konkurensi untuk mengontrol berapa banyak promise yang harus dijalankan secara paralel. Terkadang lebih mudah daripada .allkarena Anda tidak perlu membuat array promise.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}
Jingshao Chen
sumber
bluebird adalah parut jika Anda membutuhkan janji yang lebih cepat dan ~ 18kb sampah ekstra jika Anda hanya menggunakannya untuk satu hal;)
Endless
1
Semua tergantung seberapa penting satu hal itu bagi Anda dan apakah ada cara lain yang lebih cepat / mudah yang lebih baik. Sebuah trade off yang khas. Saya akan memilih kemudahan penggunaan dan fungsi di atas beberapa kb, tetapi YMMV.
Jingshao Chen
11

Alih-alih menggunakan promise untuk membatasi permintaan http, gunakan http.Agent.maxSockets bawaan node . Ini menghilangkan persyaratan untuk menggunakan pustaka atau menulis kode penggabungan Anda sendiri, dan memiliki keuntungan tambahan lebih banyak kontrol atas apa yang Anda batasi.

agent.maxSockets

Secara default diatur ke Infinity. Menentukan berapa banyak soket bersamaan yang dapat dibuka agen per asal. Origin bisa berupa kombinasi 'host: port' atau 'host: port: localAddress'.

Sebagai contoh:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

Jika membuat beberapa permintaan ke asal yang sama, mungkin juga menguntungkan Anda untuk menyetel keepAliveke true (lihat dokumen di atas untuk info lebih lanjut).

tcooc
sumber
11
Tetap saja, membuat ribuan penutupan dengan segera dan soket penyatuan tampaknya tidak terlalu efisien?
Bergi
3

Saya menyarankan perpustakaan async-pool: https://github.com/rxaviers/async-pool

npm install tiny-async-pool

Deskripsi:

Jalankan beberapa fungsi memunculkan janji & asinkron dengan konkurensi terbatas menggunakan ES6 / ES7 asli

asyncPool menjalankan beberapa fungsi yang mengembalikan janji & async dalam kumpulan konkurensi terbatas. Ia segera menolak begitu salah satu janji ditolak. Itu terselesaikan ketika semua janji selesai. Ini memanggil fungsi iterator sesegera mungkin (di bawah batas konkurensi).

Pemakaian:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
Venryx
sumber
1
Bekerja untuk saya. Terima kasih. Ini perpustakaan yang bagus.
Cerah Tambi
2

Ini dapat diatasi menggunakan rekursi.

Idenya adalah pada awalnya Anda mengirim jumlah permintaan maksimum yang diizinkan dan setiap permintaan ini harus terus dikirim secara rekursif setelah diselesaikan.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++]).then(r => {
                documents.push(r.text());
                if (documents.length === urls.length) {
                    resolve(documents);
                } else {
                    recursiveFetch();
                }
            });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});
Anton Fil
sumber
2

Berikut adalah solusi ES7 saya untuk copy-paste ramah dan fitur lengkap Promise.all()/ map()alternatif, dengan batas konkurensi.

Mirip dengan Promise.all()itu mempertahankan urutan pengembalian serta fallback untuk nilai-nilai pengembalian yang tidak menjanjikan.

Saya juga menyertakan perbandingan implementasi yang berbeda karena ini menggambarkan beberapa aspek yang terlewatkan oleh beberapa solusi lain.

Pemakaian

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

Penerapan

async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  }
  return outs;
}

async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
        resolve(outs);
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
            pollNext();
          };
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);
          }
        }
      }
    };
    pollNext();
  });
}

Perbandingan

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
  console.log(delay);
  resolve(delay);
}, delay));

// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];

// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.

// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms

// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms

// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms

console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3

// Conclusion: Execution order and performance is different,
// but return order is still identical

Kesimpulan

asyncPool() harus menjadi solusi terbaik karena memungkinkan permintaan baru untuk dimulai segera setelah permintaan sebelumnya selesai.

asyncBatch() disertakan sebagai perbandingan karena implementasinya lebih mudah dipahami, tetapi performanya harus lebih lambat karena semua permintaan dalam batch yang sama harus diselesaikan untuk memulai batch berikutnya.

Dalam contoh yang dibuat-buat ini, vanilla non-limited Promise.all()tentu saja yang tercepat, sementara yang lain bisa tampil lebih diinginkan dalam skenario kemacetan dunia nyata.

Memperbarui

Pustaka async-pool yang telah disarankan orang lain mungkin merupakan alternatif yang lebih baik untuk implementasi saya karena bekerja hampir sama dan memiliki implementasi yang lebih ringkas dengan penggunaan Promise.race () yang cerdas: https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js

Semoga jawaban saya tetap bisa menjadi nilai pendidikan.

Adelost
sumber
1

Ini dia contoh dasar untuk streaming dan 'p-limit'. Ini mengalirkan http read stream ke mongo db.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;


const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

async yrAsyncStreamingFunction(readStream) => {
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => {
                let someDataPromise = limit(() => yr_async_call_to_somewhere())

                    someDataPromise.then(
                        function handleResolve(someData) {

                            data.someData = someData;    
                            done(null, data);
                        },
                        function handleError(error) {
                            done(error)
                        }
                    );
                })

            await pipeline(
                readStream,
                JSONStream.parse('*'),
                mapperStream,
                mongoWriteStream
            );
        }
gosuer1921
sumber
0

Jadi saya mencoba membuat beberapa contoh yang ditampilkan berfungsi untuk kode saya, tetapi karena ini hanya untuk skrip impor dan bukan kode produksi, menggunakan paket npm batch- promise merupakan cara termudah bagi saya.

CATATAN: Memerlukan waktu proses untuk mendukung Promise atau menjadi polyfill.

Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) Promise: Iteratee akan dipanggil setelah setiap batch.

Menggunakan:

batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

Agusti Fernandez Pardo
sumber
0

Rekursi adalah jawabannya jika Anda tidak ingin menggunakan pustaka eksternal

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function(){
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      }
    });
  }

  return tracker(0); 
}
Juan
sumber
0

Inilah yang saya gunakan Promise.race, di dalam kode saya di sini

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
}

Jika Anda ingin melihat contoh: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

Alex
sumber
2
Saya tidak akan menyebut konkurensi itu ... Itu lebih seperti eksekusi batch ... Anda melakukan 4 tugas, menunggu semua selesai dan kemudian melakukan 4. berikutnya jika salah satu diselesaikan lebih awal, Anda masih menunggu 3 lainnya selesai , yang harus Anda gunakan adalahPromise.race
Endless
0
  • @tokopedia lumayan keren. Tidak tahu tentang itu dan akan memanfaatkannya di masa depan.
  • Saya juga menikmati jawaban @MatthewRideout , tetapi menggunakan pustaka eksternal !!

Jika memungkinkan, saya mencoba mengembangkan hal-hal semacam ini sendiri, daripada pergi ke perpustakaan. Anda akhirnya mempelajari banyak konsep yang sebelumnya tampak menakutkan.

Apa pendapat kalian tentang upaya ini:
(Saya telah banyak memikirkannya dan menurut saya ini berhasil, tetapi tunjukkan jika tidak atau ada sesuatu yang secara fundamental salah)

 class Pool{
        constructor(maxAsync) {
            this.maxAsync = maxAsync;
            this.asyncOperationsQueue = [];
            this.currentAsyncOperations = 0
        }

        runAnother() {
            if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
                this.currentAsyncOperations += 1;
                this.asyncOperationsQueue.pop()()
                    .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
            }
        }

        add(f){  // the argument f is a function of signature () => Promise
            this.runAnother();
            return new Promise((resolve, reject) => {
                this.asyncOperationsQueue.push(
                    () => f().then(resolve).catch(reject)
                )
            })
        }
    }

//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (fail) {
               reject(`Error for id ${id}`);
            } else {
                resolve(id);
            }
        }, timeout)
    }
    )
}


const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);


const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

Pendekatan ini menyediakan API yang bagus, mirip dengan kumpulan thread di scala / java.
Setelah membuat satu contoh kumpulan dengan const cappedPool = new Pool(2), Anda memberikan janji padanya dengan sederhana cappedPool.add(() => myPromise).
Jelas kita harus memastikan bahwa promise tidak segera dimulai dan itulah mengapa kita harus "menyediakannya dengan malas" dengan bantuan fungsi.

Yang terpenting, perhatikan bahwa hasil dari metode tersebut add adalah Promise yang akan diselesaikan / diselesaikan dengan nilai promise asli Anda ! Ini membuat penggunaan yang sangat intuitif.

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
   // Do something with the result form the DB
  }
)
Carlos Teixeira
sumber
0

Sayangnya tidak ada cara untuk melakukannya dengan asli Promise.all, jadi Anda harus kreatif.

Ini adalah cara tercepat tercepat yang dapat saya temukan tanpa menggunakan pustaka luar.

Itu menggunakan fitur javascript yang lebih baru yang disebut iterator. Iterator pada dasarnya melacak item apa yang telah diproses dan apa yang belum.

Untuk menggunakannya dalam kode, Anda membuat larik fungsi asinkron. Setiap fungsi async meminta iterator yang sama untuk item berikutnya yang perlu diproses. Setiap fungsi memproses itemnya sendiri secara asynchronous, dan setelah selesai meminta iterator untuk yang baru. Setelah iterator kehabisan item, semua fungsi selesai.

Terima kasih kepada @Endless untuk inspirasinya.

var items = [
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
];

var concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => {
    for(let [index, url] of cursor){
        console.log("getting url is ", index, url);
        // run your async task instead of this next line
        var text = await fetch(url).then(res => res.text());
        console.log("text is", text.slice(0,20));
    }
})

pengguna3413723
sumber
Penasaran mengapa ini ditandai. Ini sangat mirip dengan apa yang saya pikirkan.
Kris Oye
0

Begitu banyak solusi bagus. Saya memulai dengan solusi elegan yang diposting oleh @Endless dan berakhir dengan metode ekstensi kecil ini yang tidak menggunakan pustaka eksternal apa pun juga tidak berjalan dalam batch (meskipun mengasumsikan Anda memiliki fitur seperti async, dll):

Promise.allWithLimit = async (taskList, limit = 5) => {
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => {
            try {
                let entry = iterator.next();
                while (!entry.done) {
                    let [index, promise] = entry.value;
                    try {
                        results[index] = await promise;
                        entry = iterator.next();
                    }
                    catch (err) {
                        results[index] = err;
                    }
                }
                // No more work to do
                resolve(true); 
            }
            catch (err) {
                // This worker is dead
                reject(err);
            }
        }));

    await Promise.all(workerThreads);
    return results;
};

    Promise.allWithLimit = async (taskList, limit = 5) => {
        const iterator = taskList.entries();
        let results = new Array(taskList.length);
        let workerThreads = new Array(limit).fill(0).map(() => 
            new Promise(async (resolve, reject) => {
                try {
                    let entry = iterator.next();
                    while (!entry.done) {
                        let [index, promise] = entry.value;
                        try {
                            results[index] = await promise;
                            entry = iterator.next();
                        }
                        catch (err) {
                            results[index] = err;
                        }
                    }
                    // No more work to do
                    resolve(true); 
                }
                catch (err) {
                    // This worker is dead
                    reject(err);
                }
            }));
    
        await Promise.all(workerThreads);
        return results;
    };

    const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
       let n = (i + 1) * 5;
       setTimeout(() => {
          console.log(`Did nothing for ${n} seconds`);
          resolve(n);
       }, n * 1000);
    }));

    var results = Promise.allWithLimit(demoTasks);

Kris Oye
sumber