Menggunakan SignalR dengan failover pesan Redis menggunakan BookSleeve's ConnectionUtils.Connect ()

112

Saya mencoba membuat skenario failover bus pesan Redis dengan aplikasi SignalR.

Pada awalnya, kami mencoba failover penyeimbang beban perangkat keras sederhana, yang hanya memantau dua server Redis. Aplikasi SignalR menunjuk ke titik akhir HLB tunggal. Saya kemudian gagal dalam satu server, tetapi tidak berhasil mendapatkan pesan apa pun di server Redis kedua tanpa mendaur ulang kumpulan aplikasi SignalR. Mungkin ini karena perlu mengeluarkan perintah pengaturan ke bus pesan Redis yang baru.

Mulai SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBusmenggunakan Booksleeve RedisConnection()untuk dihubungkan ke satu Redis untuk pub / sub.

Saya membuat kelas baru, RedisMessageBusCluster()yang menggunakan Booksleeve ConnectionUtils.Connect()untuk dihubungkan ke salah satu kelompok server Redis.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP:  Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;


            // Start the connection - TODO:  can remove this Open as the connection is already opened, but there's the _connectTask is used later on
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }


        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index we're going to use for this message
                int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                                   .Then((id, k) =>
                                   {
                                       var message = new RedisMessage(id, group.ToArray());

                                       return _connection.Publish(k, message.GetBytes());
                                   }, key)
                                   .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                                   .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }                
            }

            base.Dispose(disposing);
        }
    }
}

Booksleeve memiliki mekanismenya sendiri untuk menentukan master, dan secara otomatis akan beralih ke server lain, dan sekarang saya mengujinya dengan SignalR.Chat.

Di web.config, saya mengatur daftar server yang tersedia:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Kemudian di Application_Start():

        // Redis cluster server list
        string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

        List<string> eventKeys = new List<string>();
        eventKeys.Add("SignalR.Redis.FailoverTest");
        GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

Saya menambahkan dua metode tambahan untuk Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}

Sekarang masalahnya adalah ketika saya mengaktifkan beberapa breakpoint, hingga setelah nama pengguna ditambahkan, lalu nonaktifkan semua breakpoint, aplikasi berfungsi seperti yang diharapkan. Namun, dengan breakpoint dinonaktifkan sejak awal, tampaknya ada beberapa kondisi balapan yang mungkin gagal selama proses koneksi.

Jadi, di RedisMessageCluster():

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

Saya mencoba menambahkan a Task.Wait, dan bahkan tambahan Sleep()(tidak ditunjukkan di atas) - yang sedang menunggu / etc, tetapi masih mendapatkan kesalahan.

Kesalahan berulang tampaknya berada di Booksleeve.MessageQueue.cs~ ln 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---



public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }

Di mana pengecualian antrian tertutup dilemparkan.

Saya memperkirakan masalah lain: Karena koneksi Redis dibuat, Application_Start()mungkin ada beberapa masalah dalam "menyambungkan kembali" ke server lain. Namun, menurut saya ini berlaku saat menggunakan singular RedisConnection(), di mana hanya ada satu koneksi untuk dipilih. Namun, dengan intorduksi ConnectionUtils.Connect()saya ingin mendengar dari @dfowleratau orang SignalR lainnya tentang bagaimana skenario ini ditangani di SignalR.

ElHaix
sumber
Saya akan melihat, tetapi: hal pertama yang terjadi adalah Anda tidak perlu menelepon Openkarena koneksi yang Anda miliki seharusnya sudah terbuka. Saya tidak bisa langsung melihat, karena saya sedang bersiap-siap untuk penerbangan
Marc Gravell
Saya yakin ada dua masalah di sini. 1) bagaimana Booksleeve menangani kegagalan; 2) Bagaimana SignalR menggunakan kursor untuk melacak klien. Saat bus pesan baru diinisialisasi, semua kursor dari mb1 tidak keluar di mb2. Oleh karena itu, saat menyetel ulang kumpulan aplikasi SignalR, itu akan mulai berfungsi - bukan sebelumnya, yang jelas bukan opsi yang layak.
ElHaix
2
Tautan yang menjelaskan bagaimana SignalR menggunakan kursor: stackoverflow.com/questions/13054592/…
ElHaix
Coba gunakan versi terbaru bus pesan redis. Ini mendukung melewati pabrik koneksi dan menangani mencoba kembali untuk menghubungkan ketika server turun.
davidfowl
Apakah Anda memiliki link untuk catatan rilis? Terima kasih.
ElHaix

Jawaban:

13

Tim SignalR sekarang telah menerapkan dukungan untuk pabrik koneksi kustom dengan StackExchange.Redis , penerus BookSleeve, yang mendukung koneksi Redundan melalui ConnectionMultiplexer.

Masalah awal yang dihadapi adalah meskipun saya membuat metode ekstensi sendiri di BookSleeve untuk menerima kumpulan server, gagal-gagal tidak dimungkinkan.

Sekarang, dengan evolusi BookSleeve ke StackExchange.Redis, sekarang kita dapat mengonfigurasi kumpulan server / port langsung saat Connectinisialisasi.

Implementasi baru jauh lebih sederhana daripada jalan yang saya jalani, dalam membuat UseRedisClustermetode, dan back-end pluming sekarang mendukung fail-over yang sebenarnya:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis juga memungkinkan konfigurasi manual tambahan seperti yang diuraikan di Automatic and Manual Configurationbagian dokumentasi:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};

Intinya, kemampuan untuk menginisialisasi lingkungan skala-out SignalR kami dengan kumpulan server sekarang memecahkan masalah awal.

ElHaix
sumber
Haruskah saya menghargai jawaban Anda dengan 500 rep bounty? ;)
nicael
Nah, jika Anda yakin itu sekarang jawabannya :)
ElHaix
@ElHaix sejak Anda mengajukan pertanyaan, Anda mungkin paling memenuhi syarat untuk mengatakan apakah jawaban Anda meyakinkan atau apakah itu hanya sepotong teka-teki - Saya sarankan untuk menambahkan kalimat untuk menunjukkan apakah dan mungkin bagaimana itu memecahkan masalah Anda
Lars Höppner
Begitu? Hadiah penghargaan? Atau saya bisa menunggu sampai menarik lebih banyak perhatian.
nicael
Apakah saya melewatkan sesuatu atau ini hanya di cabang fitur, bukan di paket nuget utama (2.1)? Selain itu, sepertinya di cabang bug- stackexchange ( github.com/SignalR/SignalR/tree/bug-stackexchange/src/… ), belum ada cara di kelas RedisScaleoutConfiguration untuk menyediakan multiplekser Anda sendiri.
Steve