melewati aliran Akka ke layanan hulu untuk diisi

9

Saya perlu memanggil layanan upstream (Azure Blob Service) untuk mendorong data ke OutputStream, yang kemudian saya perlu berbalik dan mendorongnya kembali ke klien, melalui akka. Tanpa akka (dan hanya kode servlet), saya hanya akan mendapatkan ServletOutputStream dan meneruskannya ke metode layanan azure.

Yang paling dekat yang bisa saya coba temukan, dan jelas ini salah, adalah sesuatu seperti ini

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

Idenya adalah saya memanggil layanan upstream untuk mendapatkan outputstream yang dihuni dengan memanggil blobClient.download (os);

Sepertinya fungsi lambda dipanggil dan kembali, tetapi kemudian gagal, karena tidak ada data atau sesuatu. Seolah-olah saya tidak seharusnya memiliki fungsi lambda melakukan pekerjaan, tetapi mungkin mengembalikan beberapa objek yang berfungsi? Tidak yakin.

Bagaimana caranya?

MeBigFatGuy
sumber
Apa perilaku download? Apakah ini mengalirkan data ke osdan hanya mengembalikan setelah data selesai ditulis?
Alec

Jawaban:

2

Masalah sebenarnya di sini adalah bahwa Azure API tidak dirancang untuk tekanan balik. Tidak ada cara untuk aliran output untuk memberi sinyal kembali ke Azure bahwa itu tidak siap untuk lebih banyak data. Dengan kata lain: jika Azure mendorong data lebih cepat dari yang dapat Anda konsumsi, harus ada beberapa kegagalan buffer overflow yang buruk di suatu tempat.

Menerima kenyataan ini, hal terbaik berikutnya yang dapat kita lakukan adalah:

  • Gunakan Source.lazySourceuntuk hanya mulai mengunduh data ketika ada permintaan hilir (alias. Sumber sedang dijalankan dan data sedang diminta).
  • Letakkan downloadpanggilan di beberapa utas lainnya agar terus dijalankan tanpa memblokir sumber agar tidak dikembalikan. Setelah cara untuk melakukan ini adalah dengan Future(Saya tidak yakin apa praktik terbaik Java, tetapi harus bekerja dengan baik cara baik). Meskipun awalnya tidak masalah, Anda mungkin perlu memilih konteks eksekusi selain system.dispatcher- itu semua tergantung pada apakah downloadmemblokir atau tidak.

Saya minta maaf sebelumnya jika kode Java ini salah bentuk - Saya menggunakan Akka dengan Scala, jadi ini semua dari melihat API Java Akka dan referensi sintaksis Java.

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // Wait until there is downstream demand to intialize the source...
  Source.lazySource(() -> {
    // Pre-materialize the outputstream before the source starts running
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
Alec
sumber
Fantastis. terimakasih banyak. Hasil edit kecil untuk contoh Anda adalah: Futures.future (() -> {blobClient.download (pair.first ()); return pair.first ();}, system.getDispatcher ());
MeBigFatGuy
@ MeBigFatGuy Benar, terima kasih!
Alec
1

Dalam OutputStreamhal ini adalah "nilai terwujud" dari Sourcedan itu hanya akan dibuat setelah aliran dijalankan (atau "terwujud" menjadi aliran berjalan). Menjalankannya di luar kendali Anda karena Anda menyerahkan Sourceke Akka HTTP dan nantinya akan benar-benar menjalankan sumber Anda.

.mapMaterializedValue(matval -> ...)biasanya digunakan untuk mengubah nilai terwujud tetapi karena dipanggil sebagai bagian dari materialisasi Anda dapat menggunakannya untuk melakukan efek samping seperti mengirim matval dalam pesan, seperti yang sudah Anda ketahui, tidak perlu ada yang salah dengan bahkan jika itu terlihat funky. Penting untuk dipahami bahwa aliran tidak akan menyelesaikan perwujudannya dan menjadi berjalan sampai lambda itu selesai. Ini berarti masalah jika download()memblokir daripada menghentikan beberapa pekerjaan pada utas yang berbeda dan segera kembali.

Namun ada solusi lain:, Source.preMaterialize()itu mewujudkan sumber dan memberi Anda Pairnilai terwujud dan yang baru Sourceyang dapat digunakan untuk mengkonsumsi sumber yang sudah mulai:

Pair<OutputStream, Source<ByteString, NotUsed>> pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();

Perhatikan bahwa ada beberapa hal tambahan untuk dipikirkan dalam kode Anda, yang paling penting jika blobClient.download(os)panggilan memblokir sampai selesai dan Anda memanggilnya dari aktor, dalam hal ini Anda harus memastikan bahwa aktor Anda tidak membuat kelaparan operator dan berhenti aktor lain dalam aplikasi Anda dari mengeksekusi (lihat Akka docs: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).

johanandren
sumber
1
Terima kasih atas tanggapannya. Saya tidak melihat bagaimana ini bisa berhasil? kemana byte pergi ketika blobClient.download (os) dipanggil (jika saya memanggilnya sendiri)? Bayangkan ada terabyte data yang menunggu untuk ditulis. Sepertinya saya bahwa panggilan blobClient.download harus dipanggil dari panggilan sender.tell sehingga ini pada dasarnya adalah operasi mirip IOUtils.copy .. Menggunakan preMaterialize saya tidak bisa melihat bagaimana itu terjadi?
MeBigFatGuy
OutputStream memiliki buffer internal, ia akan mulai menerima penulisan sampai buffer itu terisi, jika downstream async belum mulai mengkonsumsi elemen pada saat itu akan memblokir utas penulisan (itulah sebabnya saya menyebutkan bahwa penting untuk menangani pemblokiran).
johanandren
1
Tetapi jika saya preMaterialize, dan mendapatkan OutputStream, maka itu adalah kode saya yang melakukan blobClient.download (os); benar? Itu berarti harus selesai sebelum saya bisa melanjutkan, yang tidak mungkin.
MeBigFatGuy
Jika unduhan (os) tidak memotong utas, Anda harus menanganinya agar diblokir dan pastikan itu tidak menghentikan operasi lain. Salah satu caranya adalah dengan memotong utas untuk melakukan pekerjaan, yang lain akan menanggapi dari aktor terlebih dahulu dan kemudian melakukan pekerjaan pemblokiran di sana, dalam hal ini Anda harus memastikan aktor tidak kelaparan aktor lain, lihat tautan di akhir Jawabanku.
johanandren
pada titik ini saya hanya mencoba membuatnya bekerja sama sekali. Bahkan tidak dapat memproses file 10 byte.
MeBigFatGuy