Misalkan saya memiliki beberapa masa depan dan perlu menunggu sampai salah satu dari mereka gagal atau semuanya berhasil.
Sebagai contoh: Misalkan ada 3 futures: f1
, f2
, f3
.
Jika
f1
berhasil danf2
gagal saya tidak menungguf3
(dan mengembalikan kegagalan ke klien).Jika
f2
gagal saatf1
danf3
masih berjalan saya tidak menunggu mereka (dan mengembalikan kegagalan )Jika
f1
berhasil kemudianf2
berhasil saya terus menungguf3
.
Bagaimana Anda akan menerapkannya?
scala
concurrency
future
Michael
sumber
sumber
Jawaban:
Anda bisa menggunakan pemahaman-untuk sebagai berikut:
val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)
Dalam contoh ini, kontrak berjangka 1, 2, dan 3 dimulai secara paralel. Kemudian, dalam pemahaman kita menunggu sampai hasil 1 dan kemudian 2 dan kemudian 3 tersedia. Jika 1 atau 2 gagal, kami tidak akan menunggu 3 lagi. Jika ketiganya berhasil, maka
aggFut
val akan menahan tupel dengan 3 slot, sesuai dengan hasil dari 3 futures.Sekarang jika Anda membutuhkan perilaku di mana Anda ingin berhenti menunggu jika mengatakan fut2 gagal terlebih dahulu, semuanya menjadi sedikit lebih rumit. Dalam contoh di atas, Anda harus menunggu fut1 selesai sebelum menyadari fut2 gagal. Untuk mengatasinya, Anda bisa mencoba sesuatu seperti ini:
val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }
Sekarang ini berfungsi dengan benar, tetapi masalahnya berasal dari mengetahui mana yang
Future
harus dihapus dariMap
ketika seseorang telah berhasil diselesaikan. Selama Anda memiliki cara untuk mengkorelasikan hasil dengan Masa Depan yang melahirkan hasil tersebut, maka sesuatu seperti ini berhasil. Itu hanya secara rekursif terus menghapus Futures yang sudah selesai dari Peta dan kemudian memanggilFuture.firstCompletedOf
yang tersisaFutures
sampai tidak ada yang tersisa, mengumpulkan hasil di sepanjang jalan. Ini tidak bagus, tetapi jika Anda benar-benar membutuhkan perilaku yang Anda bicarakan, maka ini, atau sesuatu yang serupa bisa berhasil.sumber
fut2
sebelumnya gagalfut1
? Akankah kita masih menunggufut1
dalam kasus itu? Jika kita mau, itu bukan yang saya inginkan.onFailure
handler untukfut2
gagal cepat, danonSuccess
padaaggFut
ke pegangan sukses. Sukses diaggFut
implisitfut2
berhasil diselesaikan, jadi Anda hanya memiliki satu penangan yang dipanggil.Anda dapat menggunakan promise, dan mengirimkannya ke kegagalan pertama, atau kesuksesan agregat akhir yang diselesaikan:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }
Kemudian Anda dapat
Await
pada hasil ituFuture
jika Anda ingin memblokir, atau hanyamap
menjadi sesuatu yang lain.Perbedaan dengan for pemahaman adalah bahwa di sini Anda mendapatkan kesalahan dari yang pertama gagal, sedangkan dengan untuk pemahaman Anda mendapatkan kesalahan pertama dalam urutan traversal dari kumpulan masukan (bahkan jika yang lain gagal terlebih dahulu). Sebagai contoh:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order
Dan:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the 'actual' first to fail (usually...) // and it returns early (it does not wait 1 sec)
sumber
Berikut adalah solusi tanpa menggunakan aktor.
import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }
sumber
Anda bisa melakukan ini dengan masa depan saja. Inilah salah satu implementasinya. Perhatikan bahwa itu tidak akan menghentikan eksekusi lebih awal! Dalam hal ini, Anda perlu melakukan sesuatu yang lebih canggih (dan mungkin menerapkan sendiri interupsi tersebut). Tetapi jika Anda tidak ingin terus menunggu sesuatu yang tidak akan berhasil, kuncinya adalah tetap menunggu hal pertama selesai, dan berhenti ketika tidak ada yang tersisa atau Anda mencapai pengecualian:
import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn't happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }
Berikut ini contohnya dalam tindakan ketika semuanya berfungsi dengan baik:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Tetapi ketika terjadi kesalahan:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!
sumber
Untuk tujuan ini saya akan menggunakan aktor Akka. Berbeda dengan pemahaman-untuk, itu gagal segera setelah masa depan gagal, jadi ini sedikit lebih efisien dalam pengertian itu.
class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result
Kemudian, buat aktornya, kirim pesan kepadanya (sehingga ia akan tahu ke mana harus mengirim balasannya) dan menunggu balasannya.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }
sumber
Pertanyaan ini telah dijawab tetapi saya memposting solusi kelas nilai saya (kelas nilai ditambahkan di 2.10) karena tidak ada satu pun di sini. Silakan mengkritik.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }
ConcurrentFuture adalah pembungkus Future tanpa overhead yang mengubah peta / flatMap Future default dari do-this-then-that menjadi menggabungkan-semua-dan-gagal-jika-ada-gagal. Pemakaian:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }
Dalam contoh di atas, f1, f2 dan f3 akan berjalan secara bersamaan dan jika ada yang gagal dalam urutan apa pun, tupel di masa depan akan segera gagal.
sumber
Anda mungkin ingin memeriksa API Masa Depan Twitter. Terutama metode Future.collect. Itu persis seperti yang Anda inginkan: https://twitter.github.io/scala_school/finagle.html
Kode sumber Future.scala tersedia di sini: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
sumber
Anda dapat menggunakan ini:
val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }
sumber