Efficient Circular Buffer

efficient circular buffer?

I would use collections.deque with a maxlen arg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in range(20):
...     d.append(i)
...
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

There is a recipe in the docs for deque that is similar to what you want. My assertion that it's the most efficient rests entirely on the fact that it's implemented in C by an incredibly skilled crew that is in the habit of cranking out top notch code.
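
As a small illustration of what maxlen buys you, here is a sketch of a rolling mean over the most recent samples. It is not the recipe from the docs; the name rolling_means and its parameters are made up for this example.

```python
from collections import deque

def rolling_means(samples, window):
    """Yield the mean of the most recent `window` samples after each new sample."""
    recent = deque(maxlen=window)  # once full, appending drops the oldest item
    for x in samples:
        recent.append(x)
        yield sum(recent) / len(recent)

means = list(rolling_means([1, 2, 3, 4, 5], window=3))
print(means)
```

Once the deque is full, recent[0] is the oldest retained sample and recent[-1] the newest, so no index bookkeeping is needed.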

improving C circular buffer efficiency

As "Oli Charlesworth" suggested, you can simplify things if your buffer size is a power of 2. I'll write out the read/write function bodies so that the intent is clearer.

#define BUFF_SIZE (4U)
#define BUFF_SIZE_MASK (BUFF_SIZE-1U)

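struct buffer {
    float buff[BUFF_SIZE];
    unsigned writeIndex;
};

/* Advance the index first, then store, so writeIndex always refers to the newest value. */
void write(struct buffer *buffer, float value) {
    buffer->buff[(++buffer->writeIndex) & BUFF_SIZE_MASK] = value;
}

/* Xn = 0 returns the newest value, Xn = 1 the one before it, and so on. */
float readn(struct buffer *buffer, unsigned Xn){
    return buffer->buff[(buffer->writeIndex - Xn) & BUFF_SIZE_MASK];
}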

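Some explanation: there is no branching (if) at all. Instead of checking the index against the array bounds, we AND it with the mask. Because BUFF_SIZE is a power of 2, this gives the same slot as index % BUFF_SIZE, and it stays correct when the unsigned writeIndex wraps around.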
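
To see why the mask only works for power-of-two sizes, here is a short Python sketch of the index arithmetic. Python integers do not wrap, so the 32-bit unsigned wraparound is emulated with an extra mask; the 32-bit width and the helper name slot are assumptions of this example.

```python
BUFF_SIZE = 4                    # must be a power of two
BUFF_SIZE_MASK = BUFF_SIZE - 1   # 0b11

def slot(index):
    """Map any non-negative counter value to a valid array slot."""
    return index & BUFF_SIZE_MASK

# For a power-of-two size, masking gives the same slot as the remainder.
mask_matches_modulo = all(slot(i) == i % BUFF_SIZE for i in range(1000))

# Emulate a 32-bit C unsigned (assumed width): 0 - 1 wraps to 0xFFFFFFFF.
UINT32_MAX = 0xFFFFFFFF
wrapped = (0 - 1) & UINT32_MAX
slot_before_zero = slot(wrapped)   # still lands on the last slot

# With a size that is not a power of two the trick breaks:
# 6 & 5 is 4, whereas 6 % 6 is 0.
broken_slot = 6 & (6 - 1)
```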

Efficient circular buffer in C++ which will be passed to C-style array function parameter

Thank you for the answer Werner. When I run this solution on Repl.it, I get:

it took an average of 21us and a max of 57382us

For comparison, my original idea with the same buffer size has the following result:

it took an average of 19us and a max of 54129us

This means that my initial approach indeed was naive :)

In the meantime, while waiting for the answer, I've come up with the following solution:

#include <iostream>
#include <array>
#include <algorithm>
#include <chrono>

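void foo(double* arr, int size)
{
    // int instead of the non-standard uint, which also avoids a signed/unsigned comparison
    for (int k = 0; k < size; k++)
        std::cout << arr[k] << ", ";

    std::cout << std::endl;
}

int main()
{
    const int buffer_size = 20;
    std::array<double, buffer_size*2> buffer{};
    int buffer_idx = buffer_size;

    for (double data = 0.0; data < 100.0; data += 1.0)
    {
        // Store each value twice, buffer_size elements apart
        buffer.at(buffer_idx - buffer_size) = data;
        buffer.at(buffer_idx++) = data;

        // The most recent buffer_size values are always contiguous
        foo(buffer.data() + buffer_idx - buffer_size, buffer_size);

        // Branchless wrap back to the middle once the end is reached
        buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
    }
}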

Since the size of the buffer is not a problem, I allocate twice the memory needed and insert data at two places, offset by the buffer size. When I reach the end, I just go back like the typewriter. The idea is that I fake the circular buffer by storing one more copy of data so it can read data as if it crossed full circle.
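
The same double-write idea can be sketched in Python, using an array and a memoryview so that the window is a contiguous, copy-free view. The class name MirroredBuffer and its push method are made up for this illustration.

```python
from array import array

class MirroredBuffer:
    """Keeps two copies of each value so the last `size` values are contiguous."""

    def __init__(self, size):
        self.size = size
        self.data = array('d', [0.0] * (2 * size))
        self.idx = size  # next write position in the upper half

    def push(self, value):
        # Write the value twice, `size` elements apart.
        self.data[self.idx - self.size] = value
        self.data[self.idx] = value
        self.idx += 1
        start = self.idx - self.size
        window = memoryview(self.data)[start:start + self.size]  # no copy
        if self.idx == 2 * self.size:
            self.idx = self.size  # go back to the middle, like the typewriter
        return window

buf = MirroredBuffer(3)
for v in range(5):
    latest = buf.push(float(v)).tolist()
print(latest)
```

The returned window is a live view of the storage, so it should be consumed (or copied, as tolist() does here) before the next push.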

For a buffer size of 50000, this gives me the following result, which is exactly what I wanted:

it took an average of 0us and a max of 23us

How would you code an efficient Circular Buffer in Java or C#?

I would use an array of T, a head and tail pointer, and add and get methods.

Like: (Bug hunting is left to the user)

// Hijack these for simplicity
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;

public class CircularBuffer<T> {

    private T[] buffer;

    private int tail;

    private int head;

    @SuppressWarnings("unchecked")
    public CircularBuffer(int n) {
        buffer = (T[]) new Object[n];
        tail = 0;
        head = 0;
    }

    public void add(T toAdd) {
        // One slot is always left empty so that head == tail means "empty".
        // The buffer is full when advancing head would make it reach tail,
        // so the usable capacity is n - 1.
        if ((head + 1) % buffer.length != tail) {
            buffer[head++] = toAdd;
        } else {
            throw new BufferOverflowException();
        }
        head = head % buffer.length;
    }

    public T get() {
        T t = null;
        int adjTail = tail > head ? tail - buffer.length : tail;
        if (adjTail < head) {
            t = buffer[tail++];
            tail = tail % buffer.length;
        } else {
            throw new BufferUnderflowException();
        }
        return t;
    }

    public String toString() {
        return "CircularBuffer(size=" + buffer.length + ", head=" + head + ", tail=" + tail + ")";
    }

    public static void main(String[] args) {
        CircularBuffer<String> b = new CircularBuffer<String>(3);
        for (int i = 0; i < 10; i++) {
            System.out.println("Start: " + b);
            b.add("One");
            System.out.println("One: " + b);
            b.add("Two");
            System.out.println("Two: " + b);
            System.out.println("Got '" + b.get() + "', now " + b);

            b.add("Three");
            System.out.println("Three: " + b);
            // Test Overflow
            // b.add("Four");
            // System.out.println("Four: " + b);

            System.out.println("Got '" + b.get() + "', now " + b);
            System.out.println("Got '" + b.get() + "', now " + b);
            // Test Underflow
            // System.out.println("Got '" + b.get() + "', now " + b);

            // Back to start, let's shift on one
            b.add("Foo");
            b.get();
        }
    }
}

memory efficient way of implementing a circular buffer using mmap

Your code had some bugs.

The second mmap call used sizeof(N_buff) [which is sizeof(int), typically 4, regardless of the value of N_buff] instead of: sizeof(*shared_data->queue) * N_buff

It is possible to do a single mmap for all the data [see below].


Here's the annotated and corrected code:

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/sem.h>
#include <sys/mman.h>

struct shared {
    sem_t P;
    sem_t C;
    sem_t M;
    int prod_status;
    char **queue;
    int buffer_size;
    int queue_start;
    // pointing to buffer index after the last element in the buffer
    int queue_after_last;
    int queue_count;
};

int
main(int argc, char *argv[])
{
    int N_buff = atoi(argv[1]);
    struct shared *shared_data;

    shared_data = mmap(NULL, sizeof(struct shared), PROT_READ | PROT_WRITE,
        MAP_SHARED | MAP_ANONYMOUS, -1, 0);

    // printf("hello\n");
    shared_data->buffer_size = N_buff;
    // NOTE/BUG: sizeof(N_buff) is sizeof(int) [typically 4], not the number of slots
#if 0
    shared_data->queue = mmap(NULL, sizeof(N_buff), PROT_READ | PROT_WRITE,
        MAP_SHARED | MAP_ANONYMOUS, -1, 0);
#else
    shared_data->queue = mmap(NULL, sizeof(*shared_data->queue) * N_buff,
        PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
#endif
    // NOTE: sizeof(char) is _always_ 1
    for (int i = 0; i < N_buff; i++) {
        shared_data->queue[i] = mmap(NULL, 70,
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    }

    shared_data->queue_start = 0;
    shared_data->queue_after_last = 0;
    shared_data->queue_count = 0;
    shared_data->prod_status = 1;

    sem_init(&shared_data->P, 1, N_buff);
    sem_init(&shared_data->C, 1, 0);
    sem_init(&shared_data->M, 1, 1);
}

Here's some cleaned up code for a single mmap [I've compiled but not tested it]:

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/sem.h>
#include <sys/mman.h>

#define PERQUEUE 70

struct shared {
    sem_t P;
    sem_t C;
    sem_t M;
    int prod_status;
    char **queue;
    int buffer_size;
    int queue_start;
    // pointing to buffer index after the last element in the buffer
    int queue_after_last;
    int queue_count;
};

int
main(int argc, char *argv[])
{
    int N_buff = atoi(argv[1]);
    struct shared *shared_data = NULL;
    size_t total_size = 0;

    // header + table of slot pointers + the slots themselves
    total_size += sizeof(struct shared);
    total_size += sizeof(*shared_data->queue) * N_buff;
    // NOTE: not sizeof(PERQUEUE * N_buff), which would just be sizeof(int)
    total_size += (size_t) PERQUEUE * N_buff;

    void *mapbase = mmap(NULL, total_size, PROT_READ | PROT_WRITE,
        MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (mapbase == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    // char * so that the pointer arithmetic below is standard C
    char *mapcur = mapbase;

    shared_data = (struct shared *) mapcur;
    mapcur += sizeof(struct shared);

    // printf("hello\n");
    shared_data->buffer_size = N_buff;

    shared_data->queue = (char **) mapcur;
    mapcur += sizeof(*shared_data->queue) * N_buff;

    for (int i = 0; i < N_buff; i++) {
        shared_data->queue[i] = mapcur;
        mapcur += PERQUEUE;
    }

    shared_data->queue_start = 0;
    shared_data->queue_after_last = 0;
    shared_data->queue_count = 0;
    shared_data->prod_status = 1;

    sem_init(&shared_data->P, 1, N_buff);
    sem_init(&shared_data->C, 1, 0);
    sem_init(&shared_data->M, 1, 1);

    // stuff ...

    munmap(mapbase,total_size);
}
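
For comparison, the "one mapping, carved into fixed-size slots" layout can be sketched with Python's mmap module. This only covers the slot area (no semaphores or header struct), and map_slots is a name invented for this example.

```python
import mmap

PERQUEUE = 70  # bytes per queue slot, as in the C code above

def map_slots(n_buff):
    """Create one anonymous mapping and carve it into n_buff fixed-size slots."""
    region = mmap.mmap(-1, PERQUEUE * n_buff)  # fd of -1 requests anonymous memory
    whole = memoryview(region)
    # Each slot is a zero-copy view at a fixed offset into the single mapping.
    slots = [whole[i * PERQUEUE:(i + 1) * PERQUEUE] for i in range(n_buff)]
    return region, slots

region, slots = map_slots(4)
slots[1][:5] = b"hello"  # writes only into the second slot
```

Addressing slots by offset rather than by stored pointer also keeps the layout valid if two processes happen to map the region at different addresses.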

How to solve this with a circular buffer?

int *p_dynamic = array[0];
int *p_start = array[0];
int *p_end = array[L_array - 1];

You have not told us what these variables are intended to do, so we do not know if you are using them in a sensible way for their intended purposes.

if (--p_dynamic < p_start)

When p_dynamic points to array[0], the behavior of --p_dynamic is not defined by the C standard. The standard defines pointer arithmetic only from element 0 of an array to just beyond the last element of the array.

int K = 3;
int i = some_int;
int output;

output = array[i] + array[i+1] + array[K];

Again, you have not told us the intent, so we do not know what you are trying to achieve here. Why is K 3? Is that a fixed constant in every use of your code or just an example? Does it have any relationship to i? Do you mean array[K] to mean the actual element array[3] will always be added, or do you mean that the element at offset 3 from the current start of the circular buffer will be added?

if (p_dynamic[K] > p_end &&
p_dynamic[i] < p_end &&
p_dynamic[i+1] < p_end)

This makes no sense because p_dynamic is a pointer to an int, so p_dynamic[K] is an int, but p_end is a pointer to an int, so p_dynamic[K] > p_end attempts to compare an int to a pointer. I suspect you are attempting to ask whether the element p_dynamic[K] would be beyond the end of the array. You might do that with p_dynamic + K > p_end except, as with --p_dynamic, if it is beyond the end of the array, the pointer arithmetic is not defined by the C standard.

Simplify your code by making a function to help you access array elements. Make a structure that describes the circular buffer. To start simply, I will assume you have a fixed buffer size of Size elements.

typedef struct
{
    int array[Size];
    ptrdiff_t current;
} CircularBuffer;

The member current will be the index (subscript) of the element that is currently oldest in the buffer, the current “start” of the circular buffer. (The ptrdiff_t type is defined in the header <stddef.h>. It may be used for array subscripts.)

Then make a function that gives you a pointer to the element with index i:

int *CBElement(CircularBuffer *cb, ptrdiff_t i)
{
    // i must be in the range 0 to Size-1
    ptrdiff_t index = i + cb->current;
    if (Size <= index) index -= Size;
    return &cb->array[index];
}

Then you can refer to array element K, for example, as *CBElement(&cb, K):

output = *CBElement(&cb, i) + *CBElement(&cb, i+1) + *CBElement(&cb, K);

You can also put new values in the array:

*CBElement(&cb, i) = NewValue;

To advance the buffer, update the current member in the obvious way and put a new value in the new last element (*CBElement(&cb, Size-1)).
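
Here is one way the advance step might look, sketched in Python rather than C to keep it short. The class and method names are hypothetical; the logic follows the description above: move current forward by one with wraparound, then store the new value in logical position Size-1.

```python
class CircularBuffer:
    def __init__(self, size):
        self.array = [0] * size
        self.current = 0  # physical index of the oldest element

    def _index(self, i):
        """Translate logical position i (0 = oldest) to a physical index."""
        index = i + self.current
        if len(self.array) <= index:
            index -= len(self.array)
        return index

    def get(self, i):
        return self.array[self._index(i)]

    def set(self, i, value):
        self.array[self._index(i)] = value

    def advance(self, new_value):
        """Drop the oldest element and append new_value as the newest."""
        self.current += 1
        if self.current == len(self.array):
            self.current = 0
        # The slot that held the oldest value is now logical position size-1.
        self.set(len(self.array) - 1, new_value)

cb = CircularBuffer(3)
for v in (1, 2, 3, 4):
    cb.advance(v)
contents = [cb.get(i) for i in range(3)]
print(contents)
```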

This should simplify the task of getting the code working. After it is working, you can consider optimizations and refinements of the buffer implementation.

Mongo as circular buffer

From https://en.wikipedia.org/wiki/Circular_buffer:

a circular buffer [...] is a data structure that uses a single, fixed-size buffer as if it were connected end-to-end.

I'm afraid the "Capped collections work in a way similar to circular buffers" you quoted uses precisely this definition of the circular buffer.

The capped collection is capped by size and/or number of documents. Old documents are not removed by a timer but by new documents: think of the new documents as overwriting the old ones.

Unfortunately, this design makes it impossible to delete documents from the collection (https://docs.mongodb.com/manual/core/capped-collections/#document-deletion), neither by TTL nor explicitly. And since there is no formal deletion, there is no deletion event in the change stream.

To put it simply, if you need to retrieve documents evicted from the buffer, you need to implement it yourself.

A TTL index may work for you, but it is time bound, not size bound. It will issue a deletion event to the change stream, but there are a few things to consider:

  • you will need to keep a change stream client running to ensure you catch all events.
  • the TTL index process comes at a cost. Every minute MongoDB runs the TTL Monitor thread to delete outdated documents, which consumes resources. Not as much as sqlite, but system performance may still degrade, and documents may not be deleted exactly after the specified amount of time if the server is busy with other operations.

It would be advisable to take control and select/delete documents yourself. I understand you already have some implementation that uses sqlite, so it's just a matter of adjusting it to use mongodb instead.

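db.collection.find({}).sort({_id: 1}).limit(1)

This returns the oldest document, assuming the default ObjectId _id values, which sort by creation time (sorting with _id: -1 would return the newest one instead). It uses the default _id index and should perform well.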

Simple circular buffer: Need to extend function to overwrite old data

If you just want to overwrite the oldest data just use the index modulo the size of the buffer:

#include <string.h>     // memcpy

#define BUFFER_SIZE 10  // size of myData

void writeDataToRingbuffer(RingBuffer *pBuf, MyData_Struct_t *Data)
{
    // write_idx is never reset; the modulo maps it onto a valid slot
    (void)memcpy(&pBuf->myData[pBuf->write_idx % BUFFER_SIZE], Data,
        sizeof(MyData_Struct_t)); //myData will be read later by another function

    pBuf->write_idx += 1;
    pBuf->NrOfMessagesinTheQue++;
}

With this method you can also combine write_idx and NrOfMessagesinTheQue, since you never have to reset the index to 0 now.
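
A possible shape for the combined counter, sketched in Python: a single write_idx that only ever increases, from which both the slot and the number of stored messages are derived. The class and method names are invented for this example.

```python
BUFFER_SIZE = 4

class RingBuffer:
    def __init__(self):
        self.slots = [None] * BUFFER_SIZE
        self.write_idx = 0  # total number of writes so far, never reset

    def write(self, item):
        # Once the buffer is full this overwrites the oldest entry.
        self.slots[self.write_idx % BUFFER_SIZE] = item
        self.write_idx += 1

    def count(self):
        """Number of valid entries, derived from the single counter."""
        return min(self.write_idx, BUFFER_SIZE)

    def snapshot(self):
        """Stored entries, oldest first."""
        first = self.write_idx - self.count()
        return [self.slots[i % BUFFER_SIZE] for i in range(first, self.write_idx)]

rb = RingBuffer()
for n in range(6):
    rb.write(n)
print(rb.count(), rb.snapshot())
```

Python integers never overflow. In C, a fixed-width counter eventually wraps, and with a size such as 10 that does not divide the counter's range, the slot sequence would jump at that point, so a power-of-two size or an explicit wrap may be preferable there.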


