Fixed Size Queue Which Automatically Dequeues Old Values Upon New Enqueues

Fixed size queue which automatically dequeues old values upon new enqueues

I would write a wrapper class that checks the Count on Enqueue and then Dequeues when the count exceeds the limit.

public class FixedSizedQueue<T>
{
    private readonly ConcurrentQueue<T> q = new ConcurrentQueue<T>();
    private readonly object lockObject = new object();

    public int Limit { get; set; }

    public void Enqueue(T obj)
    {
        q.Enqueue(obj);
        lock (lockObject)
        {
            T overflow;
            while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
    }
}

Limit size of Queue T in .NET?

I've knocked up a basic version of what I'm looking for, it's not perfect but it'll do the job until something better comes along.

public class LimitedQueue<T> : Queue<T>
{
    public int Limit { get; set; }

    public LimitedQueue(int limit) : base(limit)
    {
        Limit = limit;
    }

    public new void Enqueue(T item)
    {
        while (Count >= Limit)
        {
            Dequeue();
        }
        base.Enqueue(item);
    }
}

Thread-safe fixed-size circular buffer with sequence ids

Based on the metrics that you provided in the question, you have plenty of options. The anticipated usage of the CircularBuffer<T> is not really that heavy. Wrapping a lock-protected Queue<T> should work pretty well. The cost of copying the contents of the queue into an array on each enumeration (copying 10,000 elements a few times per second) is unlikely to be noticeable. Modern machines can do such things in the blink of an eye. You'd have to enumerate the collection thousands of times per second for this to start (slightly) becoming an issue.
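The lock-protected-queue idea is language-agnostic, so here is a minimal sketch of it in Python (a hypothetical LockedCircularBuffer, not the C# class discussed in the question): a deque guarded by a lock, where enumeration copies a snapshot under the lock so readers never observe a mutation in progress.

```python
import threading
from collections import deque


class LockedCircularBuffer:
    """Minimal lock-protected circular buffer (Python sketch)."""

    def __init__(self, capacity):
        self._lock = threading.Lock()
        # maxlen makes the deque drop the oldest item automatically
        self._items = deque(maxlen=capacity)

    def enqueue(self, item):
        with self._lock:
            self._items.append(item)

    def __iter__(self):
        # Copy under the lock, then iterate the snapshot freely.
        with self._lock:
            snapshot = list(self._items)
        return iter(snapshot)


buf = LockedCircularBuffer(3)
for i in range(5):
    buf.enqueue(i)
print(list(buf))  # the three most recent items: [2, 3, 4]
```

Snapshot-on-enumerate is exactly the "copy the contents into an array on each enumeration" cost discussed above: cheap for a few thousand elements a few times per second.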

For the sake of variety I'll propose a different structure as internal storage: the ImmutableQueue<T> class. Its big plus is that it can be enumerated freely by multiple threads concurrently. You don't have to worry about concurrent mutations, because this collection is immutable. Nobody can change it after it has been created, ever.

The way you update this collection is by creating a new collection and discarding the previous one. It has Enqueue and Dequeue methods that don't mutate the existing collection; instead they return a new collection with the desired mutation applied. This sounds extremely inefficient, but it actually isn't: the new collection reuses most of the internal parts of the existing one. Of course it's much more expensive than mutating a Queue<T>, probably around 10 times more expensive, but the hope is that you'll get even more back in return from how cheap and contention-free it is to enumerate.

public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private ImmutableQueue<T> _queue = ImmutableQueue<T>.Empty;
    private int _count = 0;
    private long _lastId = 0;

    public ConcurrentCircularBuffer(int capacity) => _capacity = capacity;

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            item.Id = ++_lastId;
            _queue = _queue.Enqueue(item);
            if (_count < _capacity)
                _count++;
            else
                _queue = _queue.Dequeue();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        var enumerator = Volatile.Read(ref _queue).GetEnumerator();
        while (enumerator.MoveNext())
            yield return enumerator.Current;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

The class that implements the IQueueItem interface should be implemented like this:

public class QueueItem : IQueueItem
{
    private long _id;

    public long Id
    {
        get => Volatile.Read(ref _id);
        set => Volatile.Write(ref _id, value);
    }
}

Otherwise it might be possible for a thread to see an IQueueItem instance with an uninitialized Id. For an explanation you can read this article by Igor Ostrovsky. I am not 100% sure that it's possible, but nor can I guarantee that it's impossible. Even with the Volatile in place, it still looks fragile to me to delegate the responsibility of initializing the Id to an external component.

Is there a fixed size Deque which removes old elements in Java?

I can use something like this (I'd also need to rewrite the other insert methods, like push, pushLast, ...), but I would like to hear other available solutions (if they exist).

public class ConcurrentFixedSizeLinkedDeque<T> extends ConcurrentLinkedDeque<T> {

    private int sizeLimit = Integer.MAX_VALUE;

    public ConcurrentFixedSizeLinkedDeque() {
    }

    public ConcurrentFixedSizeLinkedDeque(Collection<? extends T> c) {
        super(c);
    }

    public ConcurrentFixedSizeLinkedDeque(int sizeLimit) {
        if (sizeLimit < 0) sizeLimit = 0;
        this.sizeLimit = sizeLimit;
    }

    public ConcurrentFixedSizeLinkedDeque(Collection<? extends T> c, int sizeLimit) {
        super(c);
        if (sizeLimit < 0) sizeLimit = 0;
        this.sizeLimit = sizeLimit;
    }

    public int getSizeLimit() {
        return sizeLimit;
    }

    public void setSizeLimit(int sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    @Override
    public void addFirst(T e) {
        while (size() >= this.sizeLimit) {
            pollLast();
        }
        super.addFirst(e);
    }

    @Override
    public void addLast(T e) {
        while (size() >= this.sizeLimit) {
            pollFirst();
        }
        super.addLast(e);
    }
}

Fixed-length Queue which removes first element when an element is appended at end (FIFO)

Answering my own question:

I have tried both collections.deque and queue.Queue, and a deque with a maxlen is exactly such an implementation:

from collections import deque

d = deque(maxlen=5)
d.extend([1, 2, 3, 4, 5])
print(d)
# deque([1, 2, 3, 4, 5], maxlen=5)
d.append(10)
print(d)
# deque([2, 3, 4, 5, 10], maxlen=5)

C# Concurrent, Fixed-Size Queue with Ability to Reference Individual Item

Use a queue implementation based on a plain old System.Array. Your points:

1. An array has a fixed length.

2. Individual elements can be accessed in O(1) by definition.

3. It will be a queue, so it is FIFO.

4. The array can be made concurrency-safe by simply taking a lock in the Enqueue and Dequeue methods.

5. All of this is easy to implement.
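The points above can be sketched in Python (a hypothetical ArrayQueue, standing in for the proposed System.Array-backed class): a fixed-length backing array, a head index and count for FIFO order, O(1) access by index, and a lock around each operation.

```python
import threading


class ArrayQueue:
    """Fixed-capacity FIFO queue over a plain fixed-length array."""

    def __init__(self, capacity):
        self._buf = [None] * capacity   # point 1: fixed-length backing array
        self._capacity = capacity
        self._head = 0                  # index of the oldest element
        self._count = 0
        self._lock = threading.Lock()   # point 4: lock per operation

    def enqueue(self, item):
        with self._lock:
            if self._count == self._capacity:
                raise OverflowError("queue is full")
            tail = (self._head + self._count) % self._capacity
            self._buf[tail] = item
            self._count += 1

    def dequeue(self):
        # point 3: removes the oldest element first (FIFO)
        with self._lock:
            if self._count == 0:
                raise IndexError("queue is empty")
            item = self._buf[self._head]
            self._head = (self._head + 1) % self._capacity
            self._count -= 1
            return item

    def __getitem__(self, i):
        # point 2: O(1) access to the i-th element from the front
        with self._lock:
            if not 0 <= i < self._count:
                raise IndexError(i)
            return self._buf[(self._head + i) % self._capacity]


q = ArrayQueue(3)
q.enqueue("a")
q.enqueue("b")
q.enqueue("c")
print(q[0], q[2])   # a c
print(q.dequeue())  # a
```

The modulo arithmetic wraps the head and tail around the fixed array, which is what makes the fixed length and O(1) indexing compatible with FIFO order.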

Fixed Size Persisted to Disk Queue using queuelib

Looking through the source code of queuelib, it appears that when you add and remove records from the persistent disk storage, it simply keeps track of an internal offset, adjusting it while the queue remains open.

This means that every record you add to the queue is written to the file and stays in the file until the queue is closed, at which point any data you previously popped is discarded.

So one possible solution would be to close and then reopen the queue once in a while.

For example,

from queuelib import FifoDiskQueue
import numpy as np

path = "C:\\temp"
data = FifoDiskQueue(path)

counter = 0
while True:
    frame = np.random.rand(2, 3).tobytes()
    data.push(frame)
    if len(data) >= 10:
        data.pop()
    counter += 1
    if counter % 1000 == 0:
        data.close()
        data = FifoDiskQueue(path)


Update

There is also the chunksize argument, which, when set to a low value, essentially does the same thing as the previous solution. The default is 100,000. Setting it to a lower value is the better solution. chunksize is the number of records kept in storage before queuelib closes the file internally, so a lower number limits how large a file can grow before it is closed and the popped data is discarded.

For example:

from queuelib import FifoDiskQueue
import numpy as np

path = "C:\\temp"
data = FifoDiskQueue(path, chunksize=1000)
while True:
    frame = np.random.rand(2, 3).tobytes()
    data.push(frame)
    if len(data) >= 10:
        data.pop()

C++ Create fixed size queue

You could inherit from std::queue and reimplement the push method. Here is a basic example.

#include <queue>
#include <deque>
#include <iostream>

template <typename T, int MaxLen, typename Container = std::deque<T>>
class FixedQueue : public std::queue<T, Container> {
public:
    void push(const T& value) {
        if (this->size() == MaxLen) {
            this->c.pop_front();  // drop the oldest element
        }
        std::queue<T, Container>::push(value);
    }
};

int main() {
    FixedQueue<int, 3> q;
    q.push(1);
    q.push(2);
    q.push(3);
    q.push(4);
    q.push(5);
    q.push(6);
    q.push(7);

    while (q.size() > 0)
    {
        std::cout << q.front() << std::endl;
        q.pop();
    }
}

This will print

$ g++ fixedqueue.cpp -std=c++17 -o fixedqueue && ./fixedqueue
5
6
7

