Saya perlu memanggil layanan upstream (Azure Blob Service) untuk mendorong data ke OutputStream, yang kemudian saya perlu berbalik dan mendorongnya kembali ke klien, melalui akka. Tanpa akka (dan hanya kode servlet), saya hanya akan mendapatkan ServletOutputStream dan meneruskannya ke metode layanan azure.
Yang paling dekat yang bisa saya coba temukan, dan jelas ini salah, adalah sesuatu seperti ini
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
Idenya adalah saya memanggil layanan upstream untuk mendapatkan outputstream yang dihuni dengan memanggil blobClient.download (os);
Sepertinya fungsi lambda dipanggil dan kembali, tetapi kemudian gagal, karena tidak ada data atau sesuatu. Seolah-olah saya tidak seharusnya memiliki fungsi lambda melakukan pekerjaan, tetapi mungkin mengembalikan beberapa objek yang berfungsi? Tidak yakin.
Bagaimana caranya?
sumber
download
? Apakah ini mengalirkan data keos
dan hanya mengembalikan setelah data selesai ditulis?Jawaban:
Masalah sebenarnya di sini adalah bahwa Azure API tidak dirancang untuk tekanan balik. Tidak ada cara untuk aliran output untuk memberi sinyal kembali ke Azure bahwa itu tidak siap untuk lebih banyak data. Dengan kata lain: jika Azure mendorong data lebih cepat dari yang dapat Anda konsumsi, harus ada beberapa kegagalan buffer overflow yang buruk di suatu tempat.
Menerima kenyataan ini, hal terbaik berikutnya yang dapat kita lakukan adalah:
Source.lazySource
untuk hanya mulai mengunduh data ketika ada permintaan hilir (alias. Sumber sedang dijalankan dan data sedang diminta).download
panggilan di beberapa utas lainnya agar terus dijalankan tanpa memblokir sumber agar tidak dikembalikan. Setelah cara untuk melakukan ini adalah denganFuture
(Saya tidak yakin apa praktik terbaik Java, tetapi harus bekerja dengan baik cara baik). Meskipun awalnya tidak masalah, Anda mungkin perlu memilih konteks eksekusi selainsystem.dispatcher
- itu semua tergantung pada apakahdownload
memblokir atau tidak.Saya minta maaf sebelumnya jika kode Java ini salah bentuk - Saya menggunakan Akka dengan Scala, jadi ini semua dari melihat API Java Akka dan referensi sintaksis Java.
sumber
Dalam
OutputStream
hal ini adalah "nilai terwujud" dariSource
dan itu hanya akan dibuat setelah aliran dijalankan (atau "terwujud" menjadi aliran berjalan). Menjalankannya di luar kendali Anda karena Anda menyerahkanSource
ke Akka HTTP dan nantinya akan benar-benar menjalankan sumber Anda..mapMaterializedValue(matval -> ...)
biasanya digunakan untuk mengubah nilai terwujud tetapi karena dipanggil sebagai bagian dari materialisasi Anda dapat menggunakannya untuk melakukan efek samping seperti mengirim matval dalam pesan, seperti yang sudah Anda ketahui, tidak perlu ada yang salah dengan bahkan jika itu terlihat funky. Penting untuk dipahami bahwa aliran tidak akan menyelesaikan perwujudannya dan menjadi berjalan sampai lambda itu selesai. Ini berarti masalah jikadownload()
memblokir daripada menghentikan beberapa pekerjaan pada utas yang berbeda dan segera kembali.Namun ada solusi lain:,
Source.preMaterialize()
itu mewujudkan sumber dan memberi AndaPair
nilai terwujud dan yang baruSource
yang dapat digunakan untuk mengkonsumsi sumber yang sudah mulai:Perhatikan bahwa ada beberapa hal tambahan untuk dipikirkan dalam kode Anda, yang paling penting jika
blobClient.download(os)
panggilan memblokir sampai selesai dan Anda memanggilnya dari aktor, dalam hal ini Anda harus memastikan bahwa aktor Anda tidak membuat kelaparan operator dan berhenti aktor lain dalam aplikasi Anda dari mengeksekusi (lihat Akka docs: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).sumber