Pemrograman jaringan Async menggunakan Ekstensi Reaktif

25

Setelah melakukan beberapa socketpemrograman async "level rendah" (lebih-atau-kurang) beberapa tahun yang lalu (dalam mode Asynchronous Pattern (EAP) berbasis Kejadian ) dan baru-baru ini memindahkan "naik" ke TcpListener(Asynchronous Programming Model (APM) ) dan kemudian mencoba untuk pindah ke async/await(Pola Asynchronous Berbasis Tugas (TAP) ) Saya sudah cukup banyak dengan terus harus repot dengan semua ini 'pipa tingkat rendah'. Jadi saya membayangkan; mengapa tidak mencobanya RX( Ekstensi Reaktif ) karena mungkin lebih pas untuk domain masalah saya.

Banyak kode yang saya tulis harus dengan banyak klien yang menghubungkan melalui Tcp ke aplikasi saya yang kemudian memulai komunikasi dua arah (async). Klien atau server mungkin pada suatu saat memutuskan pesan perlu dikirim dan melakukannya, jadi ini bukan request/responsepengaturan klasik Anda tetapi lebih dari "jalur" real-time, dua arah, terbuka untuk kedua pihak untuk mengirim apa pun yang mereka inginkan , kapan pun mereka mau. (Jika ada yang memiliki nama yang layak untuk menggambarkan ini, saya akan senang mendengarnya!).

"Protokol" berbeda per aplikasi (dan tidak benar-benar relevan dengan pertanyaan saya). Namun saya punya pertanyaan awal:

  1. Mengingat bahwa hanya satu "server" yang berjalan, tetapi harus melacak banyak (biasanya ribuan) koneksi (misalnya klien) yang masing-masing memiliki (karena tidak ada deskripsi yang lebih baik) "mesin negara" mereka sendiri untuk melacak internal mereka menyatakan dll, pendekatan mana yang Anda inginkan? EAP / TAP / APM? Apakah RX bahkan dianggap sebagai opsi? Jika tidak, mengapa?

Jadi, saya harus bekerja dengan Async karena a) ini bukan protokol permintaan / respons jadi saya tidak dapat memiliki utas / klien dalam panggilan "menunggu pesan" atau "mengirim pesan" - memblokir panggilan (namun, jika pengiriman dilakukan memblokir untuk klien itu hanya saya yang bisa hidup dengannya) dan b) Saya perlu menangani banyak koneksi bersamaan. Saya melihat tidak ada cara melakukan ini (andal) menggunakan panggilan memblokir.

Sebagian besar aplikasi saya terkait VoiP; baik itu pesan SIP dari klien SIP atau pesan PBX (terkait) dari aplikasi seperti FreeSwitch / OpenSIPS dll. tetapi Anda dapat, dalam bentuk yang paling sederhana, mencoba membayangkan server "obrolan" mencoba menangani banyak "obrolan" klien. Sebagian besar protokol berbasis teks (ASCII).

Jadi, setelah menerapkan banyak permutasi yang berbeda dari teknik-teknik yang disebutkan di atas, saya ingin menyederhanakan pekerjaan saya dengan membuat objek yang bisa saya instantiate, beri tahu IPEndpointuntuk mendengarkan dan memberitahukannya kapan saja sesuatu yang menarik terjadi (yang biasanya saya lakukan gunakan event untuk, jadi beberapa EAP biasanya dicampur dengan dua teknik lainnya). Kelas tidak perlu repot untuk 'memahami' protokol; itu hanya harus menangani string masuk / keluar. Dan dengan demikian, dengan mata saya pada RX berharap bahwa (pada akhirnya) akan menyederhanakan pekerjaan, saya menciptakan "biola" baru dari awal:

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
        f.Start();
        Console.ReadKey();
        f.Stop();
        Console.ReadKey();
    }
}

public class FiddleServer
{
    private TcpListener _listener;
    private ConcurrentDictionary<ulong, FiddleClient> _clients;
    private ulong _currentid = 0;

    public IPEndPoint LocalEP { get; private set; }

    public FiddleServer(IPEndPoint localEP)
    {
        this.LocalEP = localEP;
        _clients = new ConcurrentDictionary<ulong, FiddleClient>();
    }

    public void Start()
    {
        _listener = new TcpListener(this.LocalEP);
        _listener.Start();
        Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
            //OnNext
            tcpclient =>
            {
                //Create new FSClient with unique ID
                var fsclient = new FiddleClient(_currentid++, tcpclient);
                //Keep track of clients
                _clients.TryAdd(fsclient.ClientId, fsclient);
                //Initialize connection
                fsclient.Send("connect\n\n");

                Console.WriteLine("Client {0} accepted", fsclient.ClientId);
            },
            //OnError
            ex =>
            {

            },
            //OnComplete
            () =>
            {
                Console.WriteLine("Client connection initialized");
                //Accept new connections
                _listener.AcceptTcpClientAsync();
            }
        );
        Console.WriteLine("Started");
    }

    public void Stop()
    {
        _listener.Stop();
        Console.WriteLine("Stopped");
    }

    public void Send(ulong clientid, string rawmessage)
    {
        FiddleClient fsclient;
        if (_clients.TryGetValue(clientid, out fsclient))
        {
            fsclient.Send(rawmessage);
        }
    }
}

public class FiddleClient
{
    private TcpClient _tcpclient;

    public ulong ClientId { get; private set; }

    public FiddleClient(ulong id, TcpClient tcpclient)
    {
        this.ClientId = id;
        _tcpclient = tcpclient;
    }

    public void Send(string rawmessage)
    {
        Console.WriteLine("Sending {0}", rawmessage);
        var data = Encoding.ASCII.GetBytes(rawmessage);
        _tcpclient.GetStream().WriteAsync(data, 0, data.Length);    //Write vs WriteAsync?
    }
}

Saya menyadari bahwa, dalam "biola" ini, ada sedikit detail implementasi spesifik; dalam hal ini saya bekerja dengan FreeSwitch ESL sehingga "connect\n\n"biola di harus, ketika refactoring ke pendekatan yang lebih umum, harus dihapus.

Saya juga sadar bahwa saya perlu memperbaiki metode anonim menjadi metode instance pribadi pada kelas Server; Saya tidak yakin apa konvensi (misalnya " OnSomething" misalnya) yang akan digunakan untuk nama-metode mereka?

Ini adalah basis / titik awal / fondasi saya (yang membutuhkan "penyesuaian"). Saya punya beberapa pertanyaan tentang ini:

  1. Lihat pertanyaan di atas "1"
  2. Apakah saya di jalur yang benar? Atau apakah keputusan "desain" saya tidak adil?
  3. Concurrency-wise: akankah ini mengatasi ribuan klien (mengesampingkan / menangani pesan yang sebenarnya)
  4. Pada pengecualian: Saya tidak yakin bagaimana cara mendapatkan pengecualian yang diangkat dalam klien "naik" ke server ("RX-wise"); apa yang akan menjadi cara yang baik?
  5. Saya sekarang bisa mendapatkan klien yang terhubung dari kelas server saya (menggunakannya ClientId), dengan asumsi saya mengekspos klien dalam satu atau lain cara, dan memanggil metode langsung pada mereka. Saya juga bisa memanggil metode melalui kelas Server (misalnya, Send(clientId, rawmessage)metode (sedangkan pendekatan yang terakhir akan menjadi metode "kenyamanan" untuk dengan cepat mendapatkan pesan ke sisi lain).
  6. Saya tidak yakin ke mana (dan bagaimana) dari sini:
    • a) Saya perlu menangani pesan yang masuk; bagaimana saya mengatur ini? Saya bisa mendapatkan aliran kursus, tetapi di mana saya akan menangani mengambil byte yang diterima? Saya pikir saya perlu semacam "ObservableStream" - sesuatu yang dapat saya ikuti? Apakah saya akan memasukkan ini ke dalam FiddleClientatau FiddleServer?
    • b) Dengan asumsi saya ingin menghindari penggunaan event sampai kelas FiddleClient/ FiddleServerini diimplementasikan lebih spesifik untuk menyesuaikan aplikasi mereka menangani protokol tertentu dll. menggunakan lebih spesifik FooClient/ FooServerkelas: bagaimana saya beralih dari mendapatkan data di kelas 'Fiddle' yang mendasari ke kelas mereka mitra yang lebih spesifik?

Artikel / tautan yang sudah saya baca / baca skim / digunakan untuk referensi:

RobIII
sumber
Lihatlah perpustakaan ReactiveSockets yang ada
Flagbug
2
Saya tidak mencari perpustakaan atau tautan (meskipun untuk referensi mereka dihargai) tetapi untuk masukan / saran / bantuan pada pertanyaan saya dan pengaturan umum. Saya ingin belajar memperbaiki kode saya sendiri dan menjadi lebih mampu menentukan arah untuk mengambil ini, menimbang pro dan kontra dll. Tidak merujuk ke beberapa perpustakaan, masukkan dan lanjutkan. Saya ingin belajar dari pengalaman ini dan lebih berpengalaman dengan pemrograman Rx / jaringan.
RobIII
Tentu, tetapi karena perpustakaan adalah open source, Anda dapat melihat penerapannya di sana
Flagbug
1
Tentu, tetapi melihat kode sumber tidak menjelaskan mengapa beberapa keputusan desain dibuat. Dan karena saya relatif baru untuk Rx dalam kombinasi dengan pemrograman Jaringan, saya tidak memiliki cukup pengalaman untuk mengatakan apakah perpustakaan ini bagus, jika desainnya masuk akal, jika keputusan yang tepat dibuat dan bahkan jika itu tepat untuk saya.
RobIII
Saya pikir akan baik untuk memikirkan kembali server sebagai anggota aktif dari jabat tangan, jadi, server yang memulai koneksi alih-alih mendengarkan koneksi. Misalnya, di sini: codeproject.com/Articles/20250/Reverse-Connection-Shell

Jawaban:

1

... Saya ingin menyederhanakan pekerjaan saya dengan membuat objek yang bisa saya instantiate, beri tahu IPEndpoint mana yang harus didengarkan dan beri tahu saya kapan saja sesuatu yang menarik terjadi ...

Setelah membaca pernyataan itu saya langsung berpikir "aktor". Aktor sangat mirip dengan objek kecuali mereka hanya memiliki satu input di mana Anda menyampaikannya pesan (bukan langsung memanggil metode objek) dan mereka beroperasi secara tidak sinkron. Dalam contoh yang sangat sederhana ... Anda akan membuat aktor dan mengirimkannya pesan dengan IPEndpoint dan alamat aktor untuk mengirim hasilnya. Itu berbunyi dan bekerja di latar belakang. Anda hanya mendengar kabar darinya ketika "sesuatu yang menarik" terjadi. Anda dapat instantiate aktor sebanyak yang Anda butuhkan untuk menangani beban.

Saya tidak terbiasa dengan perpustakaan aktor di. Net meskipun saya tahu ada beberapa. Saya kenal dengan pustaka Dataflow TPL (akan ada bagian yang membahasnya di buku saya http://DataflowBook.com ) dan seharusnya mudah untuk menerapkan model aktor sederhana dengan pustaka itu.

Matt Carkci
sumber
Itu terlihat menarik. Saya akan mengatur proyek mainan untuk melihat apakah itu cocok untuk saya. Terima kasih untuk sarannya.
RobIII