Saya memiliki beberapa kode yang mengulang daftar yang ditanyakan dari database dan membuat permintaan HTTP untuk setiap elemen dalam daftar itu. Daftar itu terkadang bisa menjadi jumlah yang cukup besar (dalam ribuan), dan saya ingin memastikan saya tidak masuk ke server web dengan ribuan permintaan HTTP bersamaan.
Versi singkat dari kode ini sekarang terlihat seperti ini ...
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
Kode ini berjalan di Node 4.3.2. Untuk mengulangi, dapatkah Promise.all
dikelola sehingga hanya sejumlah Janji yang sedang diproses pada waktu tertentu?
javascript
node.js
es6-promise
Chris
sumber
sumber
Promise.all
itu mengatur perkembangan janji - janji melakukannya sendiri,Promise.all
tunggu saja.Promise
antipattern konstruktor !Jawaban:
Perhatikan itu
Promise.all()
tidak memicu janji untuk memulai pekerjaan mereka, membuat janji itu sendiri tidak.Dengan pemikiran tersebut, salah satu solusinya adalah memeriksa kapan pun sebuah janji diputuskan apakah janji baru harus dimulai atau apakah Anda sudah mencapai batasnya.
Namun, sebenarnya tidak perlu menemukan kembali roda di sini. Salah satu perpustakaan yang bisa Anda gunakan untuk tujuan ini adalah
es6-promise-pool
. Dari contoh mereka:// On the Web, leave out this line and use the script tag above instead. var PromisePool = require('es6-promise-pool') var promiseProducer = function () { // Your code goes here. // If there is work left to be done, return the next work item as a promise. // Otherwise, return null to indicate that all promises have been created. // Scroll down for an example. } // The number of promises to process simultaneously. var concurrency = 3 // Create a pool. var pool = new PromisePool(promiseProducer, concurrency) // Start the pool. var poolPromise = pool.start() // Wait for the pool to settle. poolPromise.then(function () { console.log('All promises fulfilled') }, function (error) { console.log('Some promise rejected: ' + error.message) })
sumber
Batas-P
Saya telah membandingkan batasan konkurensi promise dengan skrip kustom, bluebird, es6-promise-pool, dan p-limit. Saya percaya bahwa p-limit memiliki implementasi yang paling sederhana dan dipreteli untuk kebutuhan ini. Lihat dokumentasi mereka .
Persyaratan
Agar kompatibel dengan async misalnya
Contoh Saya
Dalam contoh ini, kita perlu menjalankan fungsi untuk setiap URL dalam array (seperti, mungkin permintaan API). Ini namanya
fetchData()
. Jika kami memiliki larik ribuan item untuk diproses, konkurensi pasti akan berguna untuk menghemat CPU dan sumber daya memori.const pLimit = require('p-limit'); // Example Concurrency of 3 promise at once const limit = pLimit(3); let urls = [ "http://www.exampleone.com/", "http://www.exampletwo.com/", "http://www.examplethree.com/", "http://www.examplefour.com/", ] // Create an array of our promises using map (fetchData() returns a promise) let promises = urls.map(url => { // wrap the function we are calling in the limit function we defined above return limit(() => fetchData(url)); }); (async () => { // Only three promises are run at once (as defined above) const result = await Promise.all(promises); console.log(result); })();
Hasil log konsol adalah larik data respons promise Anda yang telah diselesaikan.
sumber
Menggunakan
Array.prototype.splice
while (funcs.length) { // 100 at at time await Promise.all( funcs.splice(0, 100).map(f => f()) ) }
sumber
arr.splice(-100)
jika pesanan dosis tidak mather, mungkin Anda dapat membalikkan susunan: PJika Anda mengetahui cara kerja iterator dan cara penggunaannya, Anda tidak memerlukan pustaka tambahan, karena akan sangat mudah untuk membuat konkurensi Anda sendiri. Izinkan saya menunjukkan:
/* [Symbol.iterator]() is equivalent to .values() const iterator = [1,2,3][Symbol.iterator]() */ const iterator = [1,2,3].values() // loop over all items with for..of for (const x of iterator) { console.log('x:', x) // notices how this loop continues the same iterator // and consumes the rest of the iterator, making the // outer loop not logging any more x's for (const y of iterator) { console.log('y:', y) } }
Kami dapat menggunakan iterator yang sama dan membagikannya ke seluruh pekerja.
Jika Anda telah menggunakan
.entries()
alih-alih.values()
Anda akan mendapatkan array 2D[[index, value]]
yang akan saya tunjukkan di bawah ini dengan konkurensi 2const sleep = t => new Promise(rs => setTimeout(rs, t)) async function doWork(iterator) { for (let [index, item] of iterator) { await sleep(1000) console.log(index + ': ' + item) } } const iterator = Array.from('abcdefghij').entries() const workers = new Array(2).fill(iterator).map(doWork) // ^--- starts two workers sharing the same iterator Promise.allSettled(workers).then(() => console.log('done'))
Manfaatnya adalah Anda dapat memiliki fungsi generator alih-alih menyiapkan semuanya sekaligus.
Catatan: perbedaan dari ini dibandingkan dengan example async-pool adalah ia memunculkan dua pekerja, jadi jika satu pekerja melontarkan kesalahan karena suatu alasan di katakan indeks 5, itu tidak akan menghentikan pekerja lain untuk melakukan sisanya. Jadi Anda beralih dari melakukan 2 konkurensi menjadi 1. (jadi tidak akan berhenti di situ) Jadi saran saya adalah Anda menangkap semua kesalahan di dalam
doWork
fungsisumber
Bluebird's Promise.map dapat menggunakan opsi konkurensi untuk mengontrol berapa banyak promise yang harus dijalankan secara paralel. Terkadang lebih mudah daripada
.all
karena Anda tidak perlu membuat array promise.const Promise = require('bluebird') function getCounts() { return Promise.map(users, user => { return new Promise(resolve => { remoteServer.getCount(user) // makes an HTTP request .then(() => { /* snip */ resolve(); }); }); }, {concurrency: 10}); // <---- at most 10 http requests at a time }
sumber
Alih-alih menggunakan promise untuk membatasi permintaan http, gunakan http.Agent.maxSockets bawaan node . Ini menghilangkan persyaratan untuk menggunakan pustaka atau menulis kode penggabungan Anda sendiri, dan memiliki keuntungan tambahan lebih banyak kontrol atas apa yang Anda batasi.
Sebagai contoh:
var http = require('http'); var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin var request = http.request({..., agent: agent}, ...);
Jika membuat beberapa permintaan ke asal yang sama, mungkin juga menguntungkan Anda untuk menyetel
keepAlive
ke true (lihat dokumen di atas untuk info lebih lanjut).sumber
Saya menyarankan perpustakaan async-pool: https://github.com/rxaviers/async-pool
npm install tiny-async-pool
Deskripsi:
Pemakaian:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
sumber
Ini dapat diatasi menggunakan rekursi.
Idenya adalah pada awalnya Anda mengirim jumlah permintaan maksimum yang diizinkan dan setiap permintaan ini harus terus dikirim secara rekursif setelah diselesaikan.
function batchFetch(urls, concurrentRequestsLimit) { return new Promise(resolve => { var documents = []; var index = 0; function recursiveFetch() { if (index === urls.length) { return; } fetch(urls[index++]).then(r => { documents.push(r.text()); if (documents.length === urls.length) { resolve(documents); } else { recursiveFetch(); } }); } for (var i = 0; i < concurrentRequestsLimit; i++) { recursiveFetch(); } }); } var sources = [ 'http://www.example_1.com/', 'http://www.example_2.com/', 'http://www.example_3.com/', ... 'http://www.example_100.com/' ]; batchFetch(sources, 5).then(documents => { console.log(documents); });
sumber
Berikut adalah solusi ES7 saya untuk copy-paste ramah dan fitur lengkap
Promise.all()
/map()
alternatif, dengan batas konkurensi.Mirip dengan
Promise.all()
itu mempertahankan urutan pengembalian serta fallback untuk nilai-nilai pengembalian yang tidak menjanjikan.Saya juga menyertakan perbandingan implementasi yang berbeda karena ini menggambarkan beberapa aspek yang terlewatkan oleh beberapa solusi lain.
Pemakaian
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay)); const args = [30, 20, 15, 10]; await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
Penerapan
async function asyncBatch(args, fn, limit = 8) { // Copy arguments to avoid side effects args = [...args]; const outs = []; while (args.length) { const batch = args.splice(0, limit); const out = await Promise.all(batch.map(fn)); outs.push(...out); } return outs; } async function asyncPool(args, fn, limit = 8) { return new Promise((resolve) => { // Copy arguments to avoid side effect, reverse queue as // pop is faster than shift const argQueue = [...args].reverse(); let count = 0; const outs = []; const pollNext = () => { if (argQueue.length === 0 && count === 0) { resolve(outs); } else { while (count < limit && argQueue.length) { const index = args.length - argQueue.length; const arg = argQueue.pop(); count += 1; const out = fn(arg); const processOut = (out, index) => { outs[index] = out; count -= 1; pollNext(); }; if (typeof out === 'object' && out.then) { out.then(out => processOut(out, index)); } else { processOut(out, index); } } } }; pollNext(); }); }
Perbandingan
// A simple async function that returns after the given delay // and prints its value to allow us to determine the response order const asyncFn = delay => new Promise(resolve => setTimeout(() => { console.log(delay); resolve(delay); }, delay)); // List of arguments to the asyncFn function const args = [30, 20, 15, 10]; // As a comparison of the different implementations, a low concurrency // limit of 2 is used in order to highlight the performance differences. // If a limit greater than or equal to args.length is used the results // would be identical. // Vanilla Promise.all/map combo const out1 = await Promise.all(args.map(arg => asyncFn(arg))); // prints: 10, 15, 20, 30 // total time: 30ms // Pooled implementation const out2 = await asyncPool(args, arg => asyncFn(arg), 2); // prints: 20, 30, 15, 10 // total time: 40ms // Batched implementation const out3 = await asyncBatch(args, arg => asyncFn(arg), 2); // prints: 20, 30, 20, 30 // total time: 45ms console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3 // Conclusion: Execution order and performance is different, // but return order is still identical
Kesimpulan
asyncPool()
harus menjadi solusi terbaik karena memungkinkan permintaan baru untuk dimulai segera setelah permintaan sebelumnya selesai.asyncBatch()
disertakan sebagai perbandingan karena implementasinya lebih mudah dipahami, tetapi performanya harus lebih lambat karena semua permintaan dalam batch yang sama harus diselesaikan untuk memulai batch berikutnya.Dalam contoh yang dibuat-buat ini, vanilla non-limited
Promise.all()
tentu saja yang tercepat, sementara yang lain bisa tampil lebih diinginkan dalam skenario kemacetan dunia nyata.Memperbarui
Pustaka async-pool yang telah disarankan orang lain mungkin merupakan alternatif yang lebih baik untuk implementasi saya karena bekerja hampir sama dan memiliki implementasi yang lebih ringkas dengan penggunaan Promise.race () yang cerdas: https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js
Semoga jawaban saya tetap bisa menjadi nilai pendidikan.
sumber
Ini dia contoh dasar untuk streaming dan 'p-limit'. Ini mengalirkan http read stream ke mongo db.
const stream = require('stream'); const util = require('util'); const pLimit = require('p-limit'); const es = require('event-stream'); const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; const pipeline = util.promisify(stream.pipeline) const outputDBConfig = { dbURL: 'yr-db-url', collection: 'some-collection' }; const limit = pLimit(3); async yrAsyncStreamingFunction(readStream) => { const mongoWriteStream = streamToMongoDB(outputDBConfig); const mapperStream = es.map((data, done) => { let someDataPromise = limit(() => yr_async_call_to_somewhere()) someDataPromise.then( function handleResolve(someData) { data.someData = someData; done(null, data); }, function handleError(error) { done(error) } ); }) await pipeline( readStream, JSONStream.parse('*'), mapperStream, mongoWriteStream ); }
sumber
Jadi saya mencoba membuat beberapa contoh yang ditampilkan berfungsi untuk kode saya, tetapi karena ini hanya untuk skrip impor dan bukan kode produksi, menggunakan paket npm batch- promise merupakan cara termudah bagi saya.
CATATAN: Memerlukan waktu proses untuk mendukung Promise atau menjadi polyfill.
Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) Promise: Iteratee akan dipanggil setelah setiap batch.
Menggunakan:
batch-promises Easily batch promises NOTE: Requires runtime to support Promise or to be polyfilled. Api batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) The Promise: Iteratee will be called after each batch. Use: import batchPromises from 'batch-promises'; batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { // The iteratee will fire after each batch resulting in the following behaviour: // @ 100ms resolve items 1 and 2 (first batch of 2) // @ 200ms resolve items 3 and 4 (second batch of 2) // @ 300ms resolve remaining item 5 (last remaining batch) setTimeout(() => { resolve(i); }, 100); })) .then(results => { console.log(results); // [1,2,3,4,5] });
sumber
Rekursi adalah jawabannya jika Anda tidak ingin menggunakan pustaka eksternal
downloadAll(someArrayWithData){ var self = this; var tracker = function(next){ return self.someExpensiveRequest(someArrayWithData[next]) .then(function(){ next++;//This updates the next in the tracker function parameter if(next < someArrayWithData.length){//Did I finish processing all my data? return tracker(next);//Go to the next promise } }); } return tracker(0); }
sumber
Inilah yang saya gunakan
Promise.race
, di dalam kode saya di siniconst identifyTransactions = async function() { let promises = [] let concurrency = 0 for (let tx of this.transactions) { if (concurrency > 4) await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) promises.push(tx.identifyTransaction()) concurrency++ } if (promises.length > 0) await Promise.race(promises) //resolve the rest }
Jika Anda ingin melihat contoh: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
sumber
Promise.race
Jika memungkinkan, saya mencoba mengembangkan hal-hal semacam ini sendiri, daripada pergi ke perpustakaan. Anda akhirnya mempelajari banyak konsep yang sebelumnya tampak menakutkan.
Apa pendapat kalian tentang upaya ini:
(Saya telah banyak memikirkannya dan menurut saya ini berhasil, tetapi tunjukkan jika tidak atau ada sesuatu yang secara fundamental salah)
class Pool{ constructor(maxAsync) { this.maxAsync = maxAsync; this.asyncOperationsQueue = []; this.currentAsyncOperations = 0 } runAnother() { if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) { this.currentAsyncOperations += 1; this.asyncOperationsQueue.pop()() .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() }) } } add(f){ // the argument f is a function of signature () => Promise this.runAnother(); return new Promise((resolve, reject) => { this.asyncOperationsQueue.push( () => f().then(resolve).catch(reject) ) }) } } //####################################################### // TESTS //####################################################### function dbCall(id, timeout, fail) { return new Promise((resolve, reject) => { setTimeout(() => { if (fail) { reject(`Error for id ${id}`); } else { resolve(id); } }, timeout) } ) } const dbQuery1 = () => dbCall(1, 5000, false); const dbQuery2 = () => dbCall(2, 5000, false); const dbQuery3 = () => dbCall(3, 5000, false); const dbQuery4 = () => dbCall(4, 5000, true); const dbQuery5 = () => dbCall(5, 5000, false); const cappedPool = new Pool(2); const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))
Pendekatan ini menyediakan API yang bagus, mirip dengan kumpulan thread di scala / java.
Setelah membuat satu contoh kumpulan dengan
const cappedPool = new Pool(2)
, Anda memberikan janji padanya dengan sederhanacappedPool.add(() => myPromise)
.Jelas kita harus memastikan bahwa promise tidak segera dimulai dan itulah mengapa kita harus "menyediakannya dengan malas" dengan bantuan fungsi.
Yang terpenting, perhatikan bahwa hasil dari metode tersebut
add
adalah Promise yang akan diselesaikan / diselesaikan dengan nilai promise asli Anda ! Ini membuat penggunaan yang sangat intuitif.const resultPromise = cappedPool.add( () => dbCall(...)) resultPromise .then( actualResult => { // Do something with the result form the DB } )
sumber
Sayangnya tidak ada cara untuk melakukannya dengan asli Promise.all, jadi Anda harus kreatif.
Ini adalah cara tercepat tercepat yang dapat saya temukan tanpa menggunakan pustaka luar.
Itu menggunakan fitur javascript yang lebih baru yang disebut iterator. Iterator pada dasarnya melacak item apa yang telah diproses dan apa yang belum.
Untuk menggunakannya dalam kode, Anda membuat larik fungsi asinkron. Setiap fungsi async meminta iterator yang sama untuk item berikutnya yang perlu diproses. Setiap fungsi memproses itemnya sendiri secara asynchronous, dan setelah selesai meminta iterator untuk yang baru. Setelah iterator kehabisan item, semua fungsi selesai.
Terima kasih kepada @Endless untuk inspirasinya.
var items = [ "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", ]; var concurrency = 5 Array(concurrency).fill(items.entries()).map(async (cursor) => { for(let [index, url] of cursor){ console.log("getting url is ", index, url); // run your async task instead of this next line var text = await fetch(url).then(res => res.text()); console.log("text is", text.slice(0,20)); } })
sumber
Begitu banyak solusi bagus. Saya memulai dengan solusi elegan yang diposting oleh @Endless dan berakhir dengan metode ekstensi kecil ini yang tidak menggunakan pustaka eksternal apa pun juga tidak berjalan dalam batch (meskipun mengasumsikan Anda memiliki fitur seperti async, dll):
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; };
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; }; const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => { let n = (i + 1) * 5; setTimeout(() => { console.log(`Did nothing for ${n} seconds`); resolve(n); }, n * 1000); })); var results = Promise.allWithLimit(demoTasks);
sumber