Antrian ukuran tetap yang secara otomatis memindahkan nilai lama ke enque baru

121

Saya menggunakan ConcurrentQueueuntuk struktur data bersama yang tujuannya adalah menahan objek N terakhir yang diteruskan ke sana (jenis riwayat).

Asumsikan kita memiliki browser dan ingin memiliki 100 URL terakhir yang diramban. Saya ingin antrian yang secara otomatis menjatuhkan (dequeue) entri terlama (pertama) pada penyisipan entri baru (enqueue) ketika kapasitas penuh (100 alamat dalam riwayat).

Bagaimana saya bisa melakukannya dengan menggunakan System.Collections?

Xaqron
sumber
Itu tidak dimaksudkan khusus untuk Anda, tetapi untuk siapa saja yang menemukan pertanyaan ini dan mungkin merasa berguna. btw, itu berbicara tentang C # juga. Apakah Anda berhasil membaca semua jawaban (dalam 2 menit) dan menemukan bahwa tidak ada kode C # di sana? Bagaimanapun, saya sendiri tidak yakin, dan karenanya ini adalah komentar ...
Anda bisa membungkus metode dalam kunci. Mengingat bahwa mereka cepat, Anda cukup mengunci seluruh larik. Ini mungkin penipuan. Mencari implementasi buffer melingkar dengan kode C # mungkin menemukan sesuatu untuk Anda. Bagaimanapun, semoga berhasil.

Jawaban:

111

Saya akan menulis kelas pembungkus yang di Enqueue akan memeriksa Hitungan dan kemudian Dequeue ketika hitungan melebihi batas.

 public class FixedSizedQueue<T>
 {
     ConcurrentQueue<T> q = new ConcurrentQueue<T>();
     private object lockObject = new object();

     public int Limit { get; set; }
     public void Enqueue(T obj)
     {
        q.Enqueue(obj);
        lock (lockObject)
        {
           T overflow;
           while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
     }
 }
Richard Schneider
sumber
4
qbersifat pribadi untuk objek, sehingga lockakan mencegah utas lain mengakses secara bersamaan.
Richard Schneider
14
Mengunci bukanlah ide yang baik. Seluruh tujuan dari koleksi bersamaan BCL adalah untuk menyediakan konkurensi bebas kunci untuk alasan kinerja. Penguncian kode Anda membahayakan manfaat itu. Sebenarnya saya tidak melihat alasan Anda perlu mengunci deq.
KFL
2
@KFL, perlu dikunci karena Countdan TryDequeuemerupakan dua operasi independen yang tidak disinkronkan oleh BCL Bersamaan.
Richard Schneider
9
@RichardSchneider Jika Anda perlu menangani masalah konkurensi sendiri, sebaiknya menukar ConcurrentQueue<T>objek dengan Queue<T>objek yang lebih ringan.
0b101010
6
Jangan tentukan antrian Anda sendiri, cukup gunakan yang diwariskan. Jika Anda melakukan apa yang Anda lakukan, Anda sebenarnya tidak dapat melakukan apa pun dengan nilai antrian, semua fungsi lain tetapi yang baru Enqueuemasih akan memanggil antrian asli. Dengan kata lain, meskipun jawaban ini ditandai sebagai diterima, namun benar-benar rusak.
Gábor
104

Saya akan memilih sedikit varian ... memperpanjang ConcurrentQueue agar dapat menggunakan ekstensi Linq di FixedSizeQueue

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
Dave Lawrence
sumber
1
apa yang terjadi jika seseorang secara statis mengetahui instance sebagai ConcurrentQueue <T>, mereka baru saja menghindari kata kunci 'baru' Anda.
mhand
6
@mhand Jika 'seseorang' ingin melakukan itu; maka mereka akan memilih untuk menggunakan objek ConcurrentQueue <T> untuk memulai ... Ini adalah kelas penyimpanan khusus. Tidak ada yang mencari ini untuk dikirimkan ke kerangka .NET. Anda telah berusaha menciptakan masalah demi masalah itu.
Dave Lawrence
9
maksud saya adalah alih-alih subclassing mungkin Anda hanya harus membungkus antrian ... ini memberlakukan perilaku yang diinginkan dalam semua kasus. Juga, karena ini adalah kelas penyimpanan khusus, mari kita membuatnya sepenuhnya khusus, hanya mengekspos operasi yang kita butuhkan, subclassing adalah alat yang salah di sini IMHO.
mhand
3
@ tangan Ya, saya mengerti apa yang Anda katakan .. Saya bisa membungkus antrian dan mengekspos pencacah antrian untuk menggunakan ekstensi Linq.
Dave Lawrence
1
saya setuju dengan @mhand Anda tidak boleh mewarisi ConcurrentQueue karena metode Enqueue tidak virtual. Anda harus membuat proxy antrian dan mengimplementasikan seluruh antarmuka jika diinginkan.
Chris Marisic
29

Bagi siapa saja yang merasa berguna, berikut adalah beberapa kode kerja berdasarkan jawaban Richard Schneider di atas:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}
Tod Thomson
sumber
1
Memberi suara untuk alasan yang disebutkan (mengunci saat menggunakan ConcurrentQueue itu buruk) selain tidak mengimplementasikan antarmuka yang diperlukan untuk ini menjadi koleksi yang sebenarnya.
Josh
11

Untuk apa nilainya, berikut adalah buffer melingkar ringan dengan beberapa metode yang ditandai untuk penggunaan yang aman dan tidak aman.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

Saya suka menggunakan Foo()/SafeFoo()/UnsafeFoo()konvensi:

  • Foometode panggilan UnsafeFoosebagai default.
  • UnsafeFoo metode mengubah status secara bebas tanpa kunci, mereka hanya memanggil metode tidak aman lainnya.
  • SafeFoometode memanggil UnsafeFoometode di dalam kunci.

Ini sedikit bertele-tele, tetapi itu membuat kesalahan yang jelas, seperti memanggil metode yang tidak aman di luar kunci dalam metode yang seharusnya aman untuk thread, lebih jelas.

Juliet
sumber
5

Inilah pendapat saya tentang Antrian ukuran tetap

Ini menggunakan Queue biasa, untuk menghindari overhead sinkronisasi saat Countproperti digunakan ConcurrentQueue. Ini juga mengimplementasikan IReadOnlyCollectionsehingga metode LINQ dapat digunakan. Sisanya sangat mirip dengan jawaban lain di sini.

[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
           throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
Ali Zahid
sumber
3

Hanya untuk bersenang-senang, berikut adalah implementasi lain yang saya yakin dapat menjawab sebagian besar kekhawatiran para pemberi komentar. Secara khusus, keamanan thread dicapai tanpa penguncian dan implementasinya disembunyikan oleh kelas pembungkus.

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
  private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  private int _count;

  public int Limit { get; private set; }

  public FixedSizeQueue(int limit)
  {
    this.Limit = limit;
  }

  public void Enqueue(T obj)
  {
    _queue.Enqueue(obj);
    Interlocked.Increment(ref _count);

    // Calculate the number of items to be removed by this thread in a thread safe manner
    int currentCount;
    int finalCount;
    do
    {
      currentCount = _count;
      finalCount = Math.Min(currentCount, this.Limit);
    } while (currentCount != 
      Interlocked.CompareExchange(ref _count, finalCount, currentCount));

    T overflow;
    while (currentCount > finalCount && _queue.TryDequeue(out overflow))
      currentCount--;
  }

  public int Count
  {
    get { return _count; }
  }

  public IEnumerator<T> GetEnumerator()
  {
    return _queue.GetEnumerator();
  }

  System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  {
    return _queue.GetEnumerator();
  }
}
erdomke
sumber
1
Ini rusak jika digunakan secara bersamaan - bagaimana jika utas didahului setelah memanggil _queue.Enqueue(obj)tetapi sebelumnya Interlocked.Increment(ref _count), dan utas lainnya memanggil .Count? Itu akan salah menghitung. Saya belum memeriksa masalah lainnya.
KFL
3

Versi saya hanyalah subclass dari yang normal Queue.. tidak ada yang istimewa kecuali melihat semua orang berpartisipasi dan masih sesuai dengan judul topik saya sebaiknya taruh di sini. Itu juga mengembalikan yang dequeue untuk berjaga-jaga.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}
5argon
sumber
2

Mari tambahkan satu jawaban lagi. Mengapa ini terjadi pada orang lain?

1) Kesederhanaan. Mencoba menjamin ukuran itu baik dan bagus tetapi mengarah pada kerumitan yang tidak diperlukan yang dapat menunjukkan masalahnya sendiri.

2) Menerapkan IReadOnlyCollection, artinya Anda dapat menggunakan Linq di atasnya dan menyebarkannya ke berbagai hal yang diharapkan IEnumerable.

3) Tidak ada penguncian. Banyak solusi di atas menggunakan kunci, yang salah pada koleksi tanpa kunci.

4) Mengimplementasikan kumpulan metode, properti, dan antarmuka yang sama dengan yang dilakukan ConcurrentQueue, termasuk IProducerConsumerCollection, yang penting jika Anda ingin menggunakan koleksi dengan BlockingCollection.

Implementasi ini berpotensi berakhir dengan lebih banyak entri daripada yang diharapkan jika TryDequeue gagal, tetapi frekuensi kemunculannya tampaknya tidak sebanding dengan kode khusus yang pasti akan menghambat kinerja dan menyebabkan masalah tak terduga sendiri.

Jika Anda benar-benar ingin menjamin suatu ukuran, menerapkan Prune () atau metode serupa sepertinya merupakan ide terbaik. Anda dapat menggunakan kunci baca ReaderWriterLockSlim dengan metode lain (termasuk TryDequeue) dan mengambil kunci tulis hanya saat memangkas.

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}
Josh
sumber
2

Hanya karena belum ada yang mengatakannya .. Anda dapat menggunakan LinkedList<T>dan menambahkan pengaman utas:

public class Buffer<T> : LinkedList<T>
{
    private int capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;   
    }

    public void Enqueue(T item)
    {
        // todo: add synchronization mechanism
        if (Count == capacity) RemoveLast();
        AddFirst(item);
    }

    public T Dequeue()
    {
        // todo: add synchronization mechanism
        var last = Last.Value;
        RemoveLast();
        return last;
    }
}

Satu hal yang perlu diperhatikan adalah urutan enumerasi default adalah LIFO dalam contoh ini. Tapi itu bisa diganti jika perlu.

Brandon
sumber
1

Untuk kesenangan coding Anda, saya kirimkan kepada Anda ' ConcurrentDeck'

public class ConcurrentDeck<T>
{
   private readonly int _size;
   private readonly T[] _buffer;
   private int _position = 0;

   public ConcurrentDeck(int size)
   {
       _size = size;
       _buffer = new T[size];
   }

   public void Push(T item)
   {
       lock (this)
       {
           _buffer[_position] = item;
           _position++;
           if (_position == _size) _position = 0;
       }
   }

   public T[] ReadDeck()
   {
       lock (this)
       {
           return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
       }
   }
}

Contoh Penggunaan:

void Main()
{
    var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
    var handle = new ManualResetEventSlim();
    var task1 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task2 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task3 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
    handle.Set();
    var outputtime = DateTime.Now;
    deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}
Chris Hayes
sumber
1
Saya suka implementasi ini tetapi perhatikan bahwa ketika tidak ada yang ditambahkan, ia mengembalikan default (T)
Daniel Leach
Jika Anda menggunakan kunci dengan cara ini, Anda harus menggunakan ReaderWriterLockSlim untuk memprioritaskan pembaca Anda.
Josh
1

Itu tergantung pada penggunaan. Saya perhatikan bahwa beberapa solusi di atas dapat melebihi ukuran bila digunakan dalam lingkungan multip-threaded. Pokoknya kasus penggunaan saya adalah untuk menampilkan 5 kejadian terakhir dan ada beberapa utas yang menulis kejadian ke dalam antrian dan satu utas lainnya membaca darinya dan menampilkannya dalam Kontrol Winform. Jadi inilah solusi saya.

EDIT: Karena kami sudah menggunakan penguncian dalam implementasi kami, kami tidak benar-benar membutuhkan ConcurrentQueue itu dapat meningkatkan kinerja.

class FixedSizedConcurrentQueue<T> 
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

EDIT: Kami tidak terlalu membutuhkan syncObjectcontoh di atas dan kami lebih suka menggunakan queueobjek karena kami tidak menginisialisasi ulang queuedalam fungsi apa pun dan itu ditandai sebagai readonly.

Mubashar
sumber
0

Jawaban yang diterima akan memiliki efek samping yang dapat dihindari.

Mekanisme Penguncian Butir Halus dan Bebas Kunci

Link di bawah ini adalah referensi yang saya gunakan ketika saya menulis contoh saya di bawah ini.

Meskipun dokumentasi dari Microsoft agak menyesatkan karena mereka menggunakan kunci, namun mereka mengunci kelas segmen. Kelas segmen itu sendiri menggunakan Interlocked.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Lib.Core
{
    // Sources: 
    // https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
    // https://docs.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=netcore-3.1
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

    /// <summary>
    /// Concurrent safe circular buffer that will used a fixed capacity specified and resuse slots as it goes.
    /// </summary>
    /// <typeparam name="TObject">The object that you want to go into the slots.</typeparam>
    public class ConcurrentCircularBuffer<TObject>
    {
        private readonly ConcurrentQueue<TObject> _queue;

        public int Capacity { get; private set; }

        public ConcurrentCircularBuffer(int capacity)
        {
            if(capacity <= 0)
            {
                throw new ArgumentException($"The capacity specified '{capacity}' is not valid.", nameof(capacity));
            }

            // Setup the queue to the initial capacity using List's underlying implementation.
            _queue = new ConcurrentQueue<TObject>(new List<TObject>(capacity));

            Capacity = capacity;
        }

        public void Enqueue(TObject @object)
        {
            // Enforce the capacity first so the head can be used instead of the entire segment (slow).
            while (_queue.Count + 1 > Capacity)
            {
                if (!_queue.TryDequeue(out _))
                {
                    // Handle error condition however you want to ie throw, return validation object, etc.
                    var ex = new Exception("Concurrent Dequeue operation failed.");
                    ex.Data.Add("EnqueueObject", @object);
                    throw ex;
                }
            }

            // Place the item into the queue
            _queue.Enqueue(@object);
        }

        public TObject Dequeue()
        {
            if(_queue.TryDequeue(out var result))
            {
                return result;
            }

            return default;
        }
    }
}
jjhayter
sumber
0

Berikut adalah implementasi lain yang menggunakan ConcurrentQueue yang mendasari sebanyak mungkin sambil menyediakan antarmuka yang sama yang tersedia melalui ConcurrentQueue.

/// <summary>
/// This is a FIFO concurrent queue that will remove the oldest added items when a given limit is reached.
/// </summary>
/// <typeparam name="TValue"></typeparam>
public class FixedSizedConcurrentQueue<TValue> : IProducerConsumerCollection<TValue>, IReadOnlyCollection<TValue>
{
    private readonly ConcurrentQueue<TValue> _queue;

    private readonly object _syncObject = new object();

    public int LimitSize { get; }

    public FixedSizedConcurrentQueue(int limit)
    {
        _queue = new ConcurrentQueue<TValue>();
        LimitSize = limit;
    }

    public FixedSizedConcurrentQueue(int limit, System.Collections.Generic.IEnumerable<TValue> collection)
    {
        _queue = new ConcurrentQueue<TValue>(collection);
        LimitSize = limit;

    }

    public int Count => _queue.Count;

    bool ICollection.IsSynchronized => ((ICollection) _queue).IsSynchronized;

    object ICollection.SyncRoot => ((ICollection)_queue).SyncRoot; 

    public bool IsEmpty => _queue.IsEmpty;

    // Not supported until .NET Standard 2.1
    //public void Clear() => _queue.Clear();

    public void CopyTo(TValue[] array, int index) => _queue.CopyTo(array, index);

    void ICollection.CopyTo(Array array, int index) => ((ICollection)_queue).CopyTo(array, index);

    public void Enqueue(TValue obj)
    {
        _queue.Enqueue(obj);
        lock( _syncObject )
        {
            while( _queue.Count > LimitSize ) {
                _queue.TryDequeue(out _);
            }
        }
    }

    public IEnumerator<TValue> GetEnumerator() => _queue.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<TValue>)this).GetEnumerator();

    public TValue[] ToArray() => _queue.ToArray();

    public bool TryAdd(TValue item)
    {
        Enqueue(item);
        return true;
    }

    bool IProducerConsumerCollection<TValue>.TryTake(out TValue item) => TryDequeue(out item);

    public bool TryDequeue(out TValue result) => _queue.TryDequeue(out result);

    public bool TryPeek(out TValue result) => _queue.TryPeek(out result);

}
Tod Cunningham
sumber
-1

Ini adalah versi antrian saya:

public class FixedSizedQueue<T> {
  private object LOCK = new object();
  ConcurrentQueue<T> queue;

  public int MaxSize { get; set; }

  public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
     this.MaxSize = maxSize;
     if (items == null) {
        queue = new ConcurrentQueue<T>();
     }
     else {
        queue = new ConcurrentQueue<T>(items);
        EnsureLimitConstraint();
     }
  }

  public void Enqueue(T obj) {
     queue.Enqueue(obj);
     EnsureLimitConstraint();
  }

  private void EnsureLimitConstraint() {
     if (queue.Count > MaxSize) {
        lock (LOCK) {
           T overflow;
           while (queue.Count > MaxSize) {
              queue.TryDequeue(out overflow);
           }
        }
     }
  }


  /// <summary>
  /// returns the current snapshot of the queue
  /// </summary>
  /// <returns></returns>
  public T[] GetSnapshot() {
     return queue.ToArray();
  }
}

Saya merasa berguna untuk memiliki konstruktor yang dibangun di atas IEnumerable dan saya merasa berguna memiliki GetSnapshot untuk memiliki daftar aman multithread (array dalam kasus ini) dari item pada saat panggilan, yang tidak naik kesalahan jika koleksi yang mendasari berubah.

Pemeriksaan Hitung ganda untuk mencegah kunci dalam beberapa keadaan.

Tidak penting
sumber
1
Memberi suara untuk mengunci antrian. Jika Anda benar-benar ingin mengunci, ReaderWriterLockSlim adalah yang terbaik (dengan asumsi Anda berharap untuk mengambil kunci baca lebih sering daripada kunci tulis). GetSnapshot juga tidak diperlukan. Jika Anda mengimplementasikan IReadOnlyCollection <T> (yang seharusnya Anda lakukan untuk semantik IEnumerable), ToList () akan melayani fungsi yang sama.
Josh
ConcurrentQueue menangani kunci dalam implementasinya, lihat tautan di jawaban saya.
jjhayter