Perpustakaan Akka Streams sudah dilengkapi dengan banyak dokumentasi . Namun, masalah utama bagi saya adalah menyediakan terlalu banyak materi - saya merasa sangat kewalahan dengan jumlah konsep yang harus saya pelajari. Banyak contoh yang ditampilkan di sana terasa sangat berat dan tidak mudah diterjemahkan ke kasus penggunaan dunia nyata dan karenanya cukup esoteris. Saya pikir itu memberikan terlalu banyak detail tanpa menjelaskan bagaimana membangun semua blok bangunan bersama dan bagaimana sebenarnya membantu menyelesaikan masalah tertentu.
Ada sumber, sink, aliran, tahapan grafik, grafik parsial, materialisasi, grafik DSL dan banyak lagi dan saya tidak tahu harus mulai dari mana. The panduan cepat ini dimaksudkan untuk menjadi tempat awal tapi saya tidak mengerti. Itu hanya melempar konsep yang disebutkan di atas tanpa menjelaskannya. Selain itu, contoh kode tidak dapat dieksekusi - ada bagian yang hilang yang membuat saya hampir tidak mungkin mengikuti teks.
Adakah yang bisa menjelaskan sumber konsep, tenggelam, mengalir, tahapan grafik, grafik parsial, materialisasi dan mungkin beberapa hal lain yang saya lewatkan dengan kata-kata sederhana dan dengan contoh mudah yang tidak menjelaskan setiap detail tunggal (dan yang mungkin tidak diperlukan pula di awal mula)?
sumber
Jawaban:
Jawaban ini didasarkan pada
akka-stream
versi2.4.2
. API dapat sedikit berbeda di versi lain. Ketergantungan dapat dikonsumsi oleh sbt :Baiklah, mari kita mulai. API Akka Streams terdiri dari tiga jenis utama. Berbeda dengan Aliran Reaktif , tipe ini jauh lebih kuat dan karenanya lebih kompleks. Diasumsikan bahwa untuk semua contoh kode definisi berikut sudah ada:
The
import
pernyataan yang diperlukan untuk deklarasi tipe.system
mewakili sistem aktor Akka danmaterializer
mewakili konteks evaluasi aliran. Dalam kasus kami, kami menggunakan aActorMaterializer
, yang berarti aliran dievaluasi di atas aktor. Kedua nilai ditandai sebagaiimplicit
, yang memberikan kompilator Scala kemungkinan untuk menyuntikkan kedua dependensi ini secara otomatis kapan pun mereka dibutuhkan. Kami juga mengimporsystem.dispatcher
, yang merupakan konteks eksekusi untukFutures
.API Baru
Akka Streams memiliki properti-properti utama ini:
Materializer
.Source
,Sink
danFlow
. Blok penyusun membentuk grafik yang evaluasinya didasarkan padaMaterializer
dan perlu dipicu secara eksplisit.Berikut ini pengantar yang lebih dalam tentang bagaimana menggunakan tiga jenis utama akan diberikan.
Sumber
A
Source
adalah pencipta data, berfungsi sebagai sumber input ke aliran. Masing-masingSource
memiliki saluran output tunggal dan tidak ada saluran input. Semua data mengalir melalui saluran output ke apa pun yang terhubung ke InternetSource
.Gambar diambil dari boldradius.com .
A
Source
dapat dibuat dengan berbagai cara:Dalam kasus di atas kami mengumpankan
Source
dengan data hingga, yang berarti mereka akan berakhir pada akhirnya. Orang tidak boleh lupa, bahwa Streaming Reaktif malas dan asinkron secara default. Ini berarti seseorang harus secara eksplisit meminta evaluasi aliran. Dalam Akka Streams ini dapat dilakukan melaluirun*
metode. TidakrunForeach
ada bedanya denganforeach
fungsi terkenal - melaluirun
penambahan itu membuat eksplisit bahwa kami meminta evaluasi aliran. Karena data terbatas membosankan, kami melanjutkan dengan data tak terbatas:Dengan
take
metode ini kita dapat membuat titik berhenti buatan yang mencegah kita mengevaluasi tanpa batas. Karena dukungan aktor sudah ada di dalamnya, kami juga dapat dengan mudah memberi makan aliran dengan pesan yang dikirim ke aktor:Kita dapat melihat bahwa
Futures
dieksekusi secara asinkron pada berbagai utas, yang menjelaskan hasilnya. Dalam contoh di atas tidak diperlukan buffer untuk elemen yang masuk dan oleh karena ituOverflowStrategy.fail
kita dapat mengonfigurasi bahwa aliran harus gagal pada buffer overflow. Terutama melalui antarmuka aktor ini, kami dapat memberi makan aliran melalui sumber data apa pun. Tidak masalah jika data dibuat oleh utas yang sama, oleh yang berbeda, oleh proses lain atau jika mereka berasal dari sistem jarak jauh melalui Internet.Wastafel
A
Sink
pada dasarnya kebalikan dari aSource
. Ini adalah titik akhir dari sebuah aliran dan karenanya mengkonsumsi data. ASink
memiliki saluran input tunggal dan tidak ada saluran output.Sinks
sangat diperlukan ketika kita ingin menentukan perilaku pengumpul data dengan cara yang dapat digunakan kembali dan tanpa mengevaluasi aliran. Metode yang sudah dikenalrun*
tidak mengizinkan properti ini bagi kami, oleh karena itu lebih disukai untuk menggunakannyaSink
.Gambar diambil dari boldradius.com .
Contoh singkat
Sink
aksi dalam:Menghubungkan a
Source
ke aSink
dapat dilakukan denganto
metode ini. Ia mengembalikan apa yang disebutRunnableFlow
, yang nantinya akan kita lihat bentuk khusus dari aFlow
- aliran yang dapat dieksekusi hanya dengan memanggilrun()
metodenya.Gambar diambil dari boldradius.com .
Tentu saja mungkin untuk meneruskan semua nilai yang diterima oleh aktor:
Mengalir
Sumber data dan sink sangat bagus jika Anda memerlukan koneksi antara aliran Akka dan sistem yang sudah ada tetapi orang tidak dapat melakukan apa pun dengan mereka. Flows adalah bagian terakhir yang hilang dalam abstraksi dasar Akka Streams. Mereka bertindak sebagai penghubung antara aliran yang berbeda dan dapat digunakan untuk mengubah elemen-elemennya.
Gambar diambil dari boldradius.com .
Jika a
Flow
terhubung keSource
yang baruSource
adalah hasilnya. Demikian juga,Flow
terhubung keSink
menciptakan yang baruSink
. DanFlow
terhubung dengan aSource
danSink
hasil dalam aRunnableFlow
. Oleh karena itu, mereka duduk di antara input dan saluran output tetapi dengan sendirinya tidak sesuai dengan salah satu rasa selama mereka tidak terhubung dengan salah satuSource
atau aSink
.Gambar diambil dari boldradius.com .
Untuk mendapatkan pemahaman yang lebih baik
Flows
, kita akan melihat beberapa contoh:Melalui
via
metode ini kita dapat menghubungkan aSource
dengan aFlow
. Kita perlu menentukan tipe input karena kompiler tidak dapat menyimpulkannya untuk kita. Seperti yang sudah kita lihat dalam contoh sederhana ini, arusinvert
dandouble
sepenuhnya independen dari produsen dan konsumen data. Mereka hanya mengubah data dan meneruskannya ke saluran keluaran. Ini berarti bahwa kami dapat menggunakan kembali aliran di antara banyak aliran:s1
dans2
mewakili aliran yang sama sekali baru - mereka tidak membagikan data apa pun melalui blok bangunan mereka.Streaming Data Tidak Terbatas
Sebelum melanjutkan, pertama-tama kita harus meninjau kembali beberapa aspek kunci dari Aliran Reaktif. Sejumlah elemen tanpa batas dapat tiba di titik mana pun dan dapat mengalirkan aliran di berbagai negara. Selain dari aliran yang dapat dijalankan, yang merupakan keadaan biasa, aliran dapat dihentikan baik melalui kesalahan atau melalui sinyal yang menunjukkan bahwa tidak ada data lebih lanjut akan datang. Aliran dapat dimodelkan dengan cara grafis dengan menandai peristiwa pada timeline seperti halnya di sini:
Gambar diambil dari Pengantar Pemrograman Reaktif yang Anda lewatkan .
Kita telah melihat aliran yang bisa dijalankan dalam contoh-contoh bagian sebelumnya. Kami mendapatkan
RunnableGraph
setiap kali aliran benar-benar dapat terwujud, yang berarti bahwaSink
terhubung ke aSource
. Sejauh ini kami selalu terwujud dengan nilaiUnit
, yang dapat dilihat pada jenis:Untuk
Source
danSink
tipe tipe kedua dan untukFlow
tipe tipe ketiga menunjukkan nilai material. Sepanjang jawaban ini, makna penuh dari materialisasi tidak akan dijelaskan. Namun, perincian lebih lanjut tentang materialisasi dapat ditemukan di dokumentasi resmi . Untuk saat ini satu-satunya hal yang perlu kita ketahui adalah bahwa nilai yang terwujud adalah apa yang kita dapatkan ketika kita menjalankan stream. Karena kami hanya tertarik pada efek samping sejauh ini, kami mendapatUnit
nilai terwujudnya. Pengecualian untuk ini adalah materialisasi dari wastafel, yang menghasilkan aFuture
. Itu memberi kami kembaliFuture
, karena nilai ini dapat menunjukkan ketika aliran yang terhubung ke wastafel telah berakhir. Sejauh ini, contoh kode sebelumnya bagus untuk menjelaskan konsep tetapi mereka juga membosankan karena kita hanya berurusan dengan aliran terbatas atau dengan yang tak terbatas sangat sederhana. Untuk membuatnya lebih menarik, berikut ini aliran asinkron dan tidak terikat penuh akan dijelaskan.Contoh ClickStream
Sebagai contoh, kami ingin memiliki aliran yang menangkap peristiwa klik. Untuk membuatnya lebih menantang, katakanlah kita juga ingin mengelompokkan acara klik yang terjadi dalam waktu singkat setelah satu sama lain. Dengan cara ini kami dapat dengan mudah menemukan klik dua kali lipat, tiga kali lipat, atau sepuluh kali lipat. Selain itu, kami ingin memfilter semua klik tunggal. Ambil napas dalam-dalam dan bayangkan bagaimana Anda akan memecahkan masalah itu secara imperatif. Saya yakin tidak ada yang bisa menerapkan solusi yang berfungsi dengan benar pada percobaan pertama. Secara reaktif masalah ini sepele untuk dipecahkan. Sebenarnya, solusinya sangat sederhana dan mudah untuk diterapkan sehingga kita bahkan dapat mengekspresikannya dalam diagram yang secara langsung menggambarkan perilaku kode:
Gambar diambil dari Pengantar Pemrograman Reaktif yang Anda lewatkan .
Kotak abu-abu adalah fungsi yang menggambarkan bagaimana satu aliran diubah menjadi aliran lain. Dengan
throttle
fungsi yang kami kumpulkan klik dalam 250 milidetik,map
danfilter
fungsi harus jelas. Bola warna mewakili suatu peristiwa dan panah menggambarkan bagaimana mereka mengalir melalui fungsi kita. Kemudian dalam langkah-langkah pemrosesan, kami mendapatkan semakin sedikit elemen yang mengalir melalui aliran kami, karena kami mengelompokkannya dan menyaringnya. Kode untuk gambar ini akan terlihat seperti ini:Seluruh logika hanya dapat direpresentasikan dalam empat baris kode! Di Scala, kita bisa menulisnya lebih pendek:
Definisi
clickStream
sedikit lebih kompleks tetapi ini hanya terjadi karena contoh program berjalan pada JVM, di mana menangkap peristiwa klik tidak mudah dilakukan. Komplikasi lain adalah bahwa Akka secara default tidak menyediakanthrottle
fungsi. Sebaliknya kami harus menulisnya sendiri. Karena fungsi ini (seperti halnya untukmap
ataufilter
fungsi) dapat digunakan kembali di kasus penggunaan yang berbeda saya tidak menghitung garis ini untuk jumlah baris yang kami butuhkan untuk melaksanakan logika. Namun dalam bahasa imperatif, adalah normal bahwa logika tidak dapat digunakan kembali dengan mudah dan bahwa langkah-langkah logis yang berbeda terjadi semuanya di satu tempat alih-alih diterapkan secara berurutan, yang berarti bahwa kita mungkin akan salah mengubah kode kita dengan logika pelambatan. Contoh kode lengkap tersedia sebagaiinti dan tidak akan dibahas di sini lebih jauh.Contoh SimpleWebServer
Yang seharusnya dibahas adalah contoh lain. Meskipun aliran klik adalah contoh yang bagus untuk membiarkan Akka Streams menangani contoh dunia nyata, ia tidak memiliki kekuatan untuk menunjukkan eksekusi paralel dalam aksi. Contoh berikutnya harus mewakili server web kecil yang dapat menangani beberapa permintaan secara paralel. Server web harus dapat menerima koneksi masuk dan menerima urutan byte dari mereka yang mewakili tanda ASCII yang dapat dicetak. Urutan atau string byte ini harus dipisah di semua karakter-baris baru menjadi bagian-bagian yang lebih kecil. Setelah itu, server akan menanggapi klien dengan masing-masing garis yang dipisah. Atau, itu bisa melakukan sesuatu yang lain dengan garis dan memberikan token jawaban khusus, tetapi kami ingin tetap sederhana dalam contoh ini dan karena itu tidak memperkenalkan fitur mewah. Ingat, server harus dapat menangani beberapa permintaan secara bersamaan, yang pada dasarnya berarti bahwa tidak ada permintaan yang diizinkan untuk memblokir permintaan lain dari eksekusi lebih lanjut. Memecahkan semua persyaratan ini bisa sulit dengan cara yang sangat penting - dengan Akka Streams, kita seharusnya tidak perlu lebih dari beberapa baris untuk menyelesaikan semua ini. Pertama, mari kita lihat ikhtisar server itu sendiri:
Pada dasarnya, hanya ada tiga blok bangunan utama. Yang pertama harus menerima koneksi masuk. Yang kedua perlu menangani permintaan yang masuk dan yang ketiga perlu mengirim respons. Menerapkan ketiga blok bangunan ini hanya sedikit lebih rumit daripada menerapkan aliran klik:
Fungsi ini
mkServer
mengambil (selain dari alamat dan port server) juga sistem aktor dan materializer sebagai parameter implisit. Aliran kontrol server diwakili olehbinding
, yang mengambil sumber koneksi masuk dan meneruskannya ke wastafel koneksi masuk. Di dalamconnectionHandler
, yang merupakan wastafel kami, kami menangani setiap koneksi dengan aliranserverLogic
, yang akan dijelaskan nanti.binding
mengembalikan aFuture
, yang selesai ketika server sudah mulai atau gagal, yang bisa jadi kasus ketika port sudah diambil oleh proses lain. Namun kode tersebut tidak sepenuhnya mencerminkan grafik karena kami tidak dapat melihat blok penyusun yang menangani respons. Alasan untuk ini adalah bahwa koneksi sudah menyediakan logika ini dengan sendirinya. Ini adalah aliran dua arah dan bukan hanya aliran satu arah seperti aliran yang telah kita lihat dalam contoh sebelumnya. Seperti halnya untuk materialisasi, aliran kompleks seperti itu tidak akan dijelaskan di sini. The dokumentasi resmi memiliki banyak bahan untuk menutupi grafik aliran yang lebih kompleks. Untuk saat ini sudah cukup untuk mengetahui yangTcp.IncomingConnection
mewakili koneksi yang tahu bagaimana menerima permintaan dan bagaimana mengirim tanggapan. Bagian yang masih hilang adalahserverLogic
blok bangunan. Ini bisa terlihat seperti ini:Sekali lagi, kita dapat membagi logika menjadi beberapa blok bangunan sederhana yang semuanya membentuk alur program kita. Pertama kita ingin membagi urutan byte dalam garis, yang harus kita lakukan setiap kali kita menemukan karakter baris baru. Setelah itu, byte dari setiap baris perlu dikonversi ke string karena bekerja dengan byte mentah adalah rumit. Secara keseluruhan kami dapat menerima aliran biner dari protokol yang rumit, yang akan membuat bekerja dengan data mentah yang masuk sangat menantang. Setelah kami memiliki string yang dapat dibaca, kami dapat membuat jawaban. Untuk alasan kesederhanaan, jawabannya bisa apa saja dalam kasus kami. Pada akhirnya, kita harus mengonversi kembali jawaban kita ke urutan byte yang dapat dikirim melalui kabel. Kode untuk seluruh logika mungkin terlihat seperti ini:
Kita sudah tahu bahwa itu
serverLogic
adalah aliran yang membutuhkanByteString
dan harus menghasilkan aByteString
. Dengandelimiter
kita dapat membaginya menjadiByteString
bagian-bagian yang lebih kecil - dalam kasus kami itu perlu terjadi setiap kali karakter baris baru terjadi.receiver
adalah aliran yang mengambil semua urutan byte split dan mengubahnya menjadi string. Ini tentu saja konversi yang berbahaya, karena hanya karakter ASCII yang dapat dicetak yang harus dikonversi ke string tetapi untuk kebutuhan kita cukup bagus.responder
adalah komponen terakhir dan bertanggung jawab untuk membuat jawaban dan mengubah jawaban kembali ke urutan byte. Berbeda dengan gambar, kami tidak membagi komponen terakhir ini menjadi dua, karena logikanya sepele. Pada akhirnya, kami menghubungkan semua aliran melaluivia
fungsi. Pada titik ini orang mungkin bertanya apakah kami merawat properti multi-pengguna yang disebutkan di awal. Dan memang kami melakukannya meskipun itu mungkin tidak segera jelas. Dengan melihat grafik ini, seharusnya menjadi lebih jelas:The
serverLogic
komponen tidak lain adalah sebuah aliran yang berisi arus yang lebih kecil. Komponen ini mengambil input, yang merupakan permintaan, dan menghasilkan output, yang merupakan respons. Karena aliran dapat dibangun beberapa kali dan semuanya bekerja secara independen satu sama lain, kami mencapai melalui properti properti multi-pengguna kami. Setiap permintaan ditangani dalam permintaannya sendiri dan oleh karena itu permintaan jangka pendek dapat mengalahkan permintaan jangka panjang yang telah dimulai sebelumnya. Jika Anda bertanya-tanya, definisiserverLogic
yang ditunjukkan sebelumnya tentu saja dapat ditulis jauh lebih pendek dengan menguraikan sebagian besar definisi dalamnya:Tes server web mungkin terlihat seperti ini:
Agar contoh kode di atas berfungsi dengan benar, pertama-tama kita perlu menjalankan server, yang digambarkan oleh
startServer
skrip:Contoh kode lengkap dari server TCP sederhana ini dapat ditemukan di sini . Kami tidak hanya dapat menulis server dengan Akka Streams tetapi juga klien. Ini mungkin terlihat seperti ini:
Kode lengkap klien TCP dapat ditemukan di sini . Kode ini terlihat sangat mirip tetapi berbeda dengan server kami tidak perlu lagi mengelola koneksi yang masuk.
Grafik Kompleks
Pada bagian sebelumnya kita telah melihat bagaimana kita dapat membangun program sederhana dari aliran. Namun, pada kenyataannya seringkali tidak cukup hanya mengandalkan fungsi yang sudah terpasang untuk membangun aliran yang lebih kompleks. Jika kami ingin dapat menggunakan Akka Streams untuk program yang sewenang-wenang, kami perlu tahu bagaimana membangun struktur kontrol kustom kami sendiri dan aliran yang dapat digabung yang memungkinkan kami menangani kompleksitas aplikasi kami. Berita baiknya adalah Akka Streams dirancang untuk disesuaikan dengan kebutuhan pengguna dan untuk memberi Anda pengantar singkat ke bagian-bagian yang lebih kompleks dari Akka Streams, kami menambahkan beberapa fitur lagi ke contoh klien / server kami.
Satu hal yang belum bisa kami lakukan adalah menutup koneksi. Pada titik ini mulai menjadi sedikit lebih rumit karena API aliran yang telah kita lihat sejauh ini tidak memungkinkan kita untuk menghentikan aliran pada titik arbitrer. Namun, ada
GraphStage
abstraksi, yang dapat digunakan untuk membuat tahapan pemrosesan grafik sembarang dengan sejumlah port input atau output. Pertama-tama mari kita lihat sisi server, tempat kami memperkenalkan komponen baru, yang disebutcloseConnection
:API ini terlihat jauh lebih rumit daripada aliran API. Tidak heran, kita harus melakukan banyak langkah penting di sini. Sebagai gantinya, kami memiliki kontrol lebih besar atas perilaku aliran kami. Dalam contoh di atas, kami hanya menentukan satu input dan satu port output dan membuatnya tersedia untuk sistem dengan menimpa
shape
nilainya. Selanjutnya kami mendefinisikan apa yang disebutInHandler
dan aOutHandler
, yang dalam urutan ini bertanggung jawab untuk menerima dan memancarkan elemen. Jika Anda melihat dekat dengan contoh aliran klik penuh, Anda harus sudah mengenali komponen-komponen ini. DiInHandler
kami ambil elemen dan jika itu adalah string dengan karakter tunggal'q'
, kami ingin menutup aliran. Untuk memberi klien kesempatan untuk mengetahui bahwa aliran akan segera ditutup, kami memancarkan string"BYE"
dan kemudian kita segera menutup panggung sesudahnya. ThecloseConnection
komponen dapat digabungkan dengan aliran melaluivia
metode, yang diperkenalkan di bagian tentang arus.Selain bisa menutup koneksi, alangkah baiknya jika kita bisa menampilkan pesan selamat datang ke koneksi yang baru dibuat. Untuk melakukan ini kita sekali lagi harus melangkah lebih jauh:
Fungsi
serverLogic
sekarang mengambil koneksi yang masuk sebagai parameter. Di dalam tubuhnya kami menggunakan DSL yang memungkinkan kami untuk menggambarkan perilaku aliran yang kompleks. Denganwelcome
kami membuat aliran yang hanya bisa memancarkan satu elemen - pesan selamat datang.logic
adalah apa yang dijelaskanserverLogic
pada bagian sebelumnya. Satu-satunya perbedaan penting adalah bahwa kami menambahkannyacloseConnection
. Sekarang sebenarnya datang bagian yang menarik dari DSL. TheGraphDSL.create
Fungsi membuat pembangunb
yang tersedia, yang digunakan untuk mengekspresikan aliran sebagai grafik. Dengan~>
fungsi ini dimungkinkan untuk menghubungkan port input dan output satu sama lain. TheConcat
komponen yang digunakan dalam contoh dapat menggabungkan unsur-unsur dan di sini digunakan untuk tambahkan pesan selamat datang di depan unsur-unsur lain yang keluar dariinternalLogic
. Pada baris terakhir, kami hanya membuat port input dari logika server dan port output dari aliran gabungan karena semua port lain akan tetap menjadi detail implementasiserverLogic
komponen. Untuk pengantar mendalam ke grafik DSL Akka Streams, kunjungi bagian terkait di dokumentasi resmi . Contoh kode lengkap dari server TCP kompleks dan klien yang dapat berkomunikasi dengannya dapat ditemukan di sini . Setiap kali Anda membuka koneksi baru dari klien, Anda akan melihat pesan sambutan dan dengan mengetik"q"
pada klien Anda akan melihat pesan yang memberitahu Anda bahwa koneksi telah dibatalkan.Masih ada beberapa topik yang tidak dicakup oleh jawaban ini. Khususnya materialisasi dapat menakuti satu pembaca atau yang lain, tetapi saya yakin dengan materi yang dibahas di sini setiap orang harus dapat melakukan langkah selanjutnya sendiri. Seperti yang sudah dikatakan, dokumentasi resmi adalah tempat yang bagus untuk terus belajar tentang Akka Streams.
sumber