Buffer circular

Un anillo mostrando, conceptualmente, un buffer circular. El esquema muestra claramente que el buffer no tiene final, y se puede mover alrededor de todo el buffer. Sin embargo, como la memoria nunca puede ser creada físicamente como un anillo, se usa generalmente una representación lineal.

Un buffer circular, buffer cíclico o buffer de anillo es una estructura de datos que utiliza un buffer único o array ordinario y que adopta su nombre por la forma en que se ponen o sacan sus elementos. Estos buffers son de tamaño fijo, internamente es como si estuviera conectado de extremo a extremo.

Usos

Un ejemplo de uso de un buffer circular de escritura puede ser en multimedia. Si este tipo de buffer es usado como un buffer delimitador en el problema del productor-consumidor, es probablemente deseado por el productor (por ejemplo, un generador de audio) sobrescribir los datos antiguos si el consumidor (como ser la tarjeta de audio) no está momentáneamente disponible para un mantenimiento. Otro ejemplo es el método de síntesis de guías de ondas digitales, el cual usa buffers circulares para simular de forma eficiente el sonido de la vibración de los instrumentos de cuerda o viento. El atributo preciado de los buffers circulares es que no se tiene la necesidad de mover los elementos por la cola en el momento de que uno de ellos es consumido. Por otra parte, si se usara un buffer no circular sería necesario modificar todos los elementos cuando uno sea consumido. En otras palabras, el buffer circular es bien visto como un buffer FIFO (primero en entrar es el primero en salir); mientras que un buffer no circular representaría un buffer LIFO (último en entrar es el primero en salir). Una buena estrategia de implementación para una cola con tamaño máximo es mediante el uso de buffers circulares; en este caso todas las operaciones de la cola se realizan en tiempo constante. Sin embargo, expandir un buffer circular requiere cambio de memoria, lo cual es costoso.

¿Cómo funciona?

Un buffer circular trabaja básicamente con dos índices para acceder a los elementos del buffer, que aquí llamaremos Inpointer y Outpointer. Ambos índices tienen avance incremental y cíclico, es decir, se incrementan de uno en uno y luego de apuntar al último elemento del buffer vuelven a apuntar al primero.

Al inicio los dos índices apuntan al primer elemento del buffer. Veamos cómo y cuándo se incrementan:

  • Cada nuevo dato a guardar en el buffer será depositado en la casilla actualmente apuntada por Inpointer. A continuación Inpointer se incrementa en uno.
  • Por otro lado, cada dato que salga del buffer será el de la casilla actualmente apuntada por Outpointer. A continuación Outpointer se incremente en uno.

Estos buffers tienen un comportamiento FIFO ("First In - First Out", "Primero en entrar - primero en salir").

Una consecuencia de la memoria intermedia circular es que cuando está lleno y se realiza la posterior escritura, entonces comienza a sobrescribir los datos más antiguos.

Para saber si en el buffer hay espacio para meter más datos o si hay al menos un dato para sacar, se debe usar la diferencia entre las posiciones de los punteros. Otra posible opción es utilizar una variable adicional que se incremente con cada dato ingresado y se decremente con cada dato extraído.

Un buffer circular comienza vacío y con un tamaño predefinido. Por ejemplo, este es un buffer de 7 elementos:

Asuma que un 1 es escrito en el medio del buffer (la locación exacta no importa en un buffer circular):

Luego, asuma que dos elementos más son añadidos — 2 & 3 — los cuales quedan añadidos después del 1:

Si dos elementos son eliminados del buffer, los valores más viejos dentro del buffer son borrados. Los dos elementos borrados, en este caso, son 1 & 2; quedando el buffer con solamente un 3:

Si el buffer tiene 7 elementos se encuentra lleno:

Una consecuencia de los buffers circulares es que cuando se encuentra lleno y se realiza una nueva escritura, se comienza a sobrescribir los datos antiguos. En este caso, se agregan dos elementos más — A & B — y sobrescriben el 3 & 4:

Como una alternativa, se puede hace que las rutinas que administran el buffer no permitan que los datos se sobrescriban y retornen un error o una excepción.

Finalmente, si dos elementos son borrados ahora, se va a retornar 5 & 6 y no 3 & 4, dado que A & B sobrescribió el 3 & 4:

Mecánica del buffer circular

Punteros de Inicio/Fin

Generalmente, un buffer circular requiere tres punteros:

  • uno para el buffer que está actualmente en memoria
  • uno para apuntar el comienzo de los datos válidos
  • y otro que apunte al final de los datos válidos.

Como alternativa para lenguajes que no soportan punteros, se puede utilizar un buffer de tamaño fijo, con dos enteros que contengan los índices de comienzo y fin de los datos válidos.

Existen varias maneras de etiquetar los punteros y la semántica puede variar; una manera de hacerlo es la siguiente:

Esta imagen muestra un buffer parcialmente lleno:

Esta imagen muestra un buffer completo con dos elementos que han sido sobrescritos:

Una nota sobre la segunda imagen es que después de que cada elemento es sobrescrito el puntero de comienzo es incrementado también.

¿Qué dificultades presenta?

Un buffer circular puede presentar ciertos problemas que hay que saber controlar. En concreto las situaciones problemáticas si no son controladas son en el caso de que el buffer se encuentre totalmente vacío o totalmente lleno, en estos casos el índice de escritura coincidirá con el de lectura, lo que provocará fallos tales como que se lea información incorrecta del buffer o que se escriba información destruyendo información útil del buffer, provocando que ésta no pueda ser leída.

Posibles métodos para controlar estos problemas:

  • Dejar una posición como mínimo entre los índices de escritura y lectura.
  • Utilizar un contador de llenado del buffer.
  • Utilizar dos contadores uno de lecturas y otro de escrituras.

Dejar una posición como mínimo entre los índices de escritura y lectura.

Este método consiste en permitir realizar escrituras en el buffer hasta que el índice de escritura llegue a la posición anterior del índice de lectura, lo mismo ocurre para la operación de lectura.

Las Ventajas son:

  • Simple y robusto.
  • Solo necesitas los dos punteros.

Las desventajas son:

  • Nunca se puede usar el buffer entero.
  • Solo está permitido acceder a un elemento por vez, dado que no se puede saber fácilmente cuantos elementos hay al lado de cada uno en memoria.

Un ejemplo de implementación en C: (Mantiene una ranura abierta)

#include <stdio.h>
#include <string.h>
#include <malloc.h>

/*!
 * Circular Buffer Example (Keep one slot open)
 * Compile: gcc cbuf.c -o cbuf.exe
 */

/**< Tamaño de buffer */
#define BUFFER_SIZE 10
#define NUM_OF_ELEMS (BUFFER_SIZE-1)

/**< Tipos de buffer circular */
typedef unsigned char INT8U;
typedef INT8U KeyType;
typedef struct
{
    INT8U writePointer; /**< puntero de escritura */
    INT8U readPointer;  /**< puntero de lectura */
    INT8U size;         /**< tamaño del buffer circular */
    KeyType keys[0];    /**< elemento del buffer circular */
} CircularBuffer;

/**< Inicialización del buffer circular */
CircularBuffer* CircularBufferInit(CircularBuffer** pQue, int size)
{
    int sz = size*sizeof(KeyType)+sizeof(CircularBuffer);
    *pQue = (CircularBuffer*) malloc(sz);
    if(*pQue)
    {
        printf("Init CircularBuffer: keys[%d] (%d)\n", size, sz);
        (*pQue)->size=size;
        (*pQue)->writePointer = 0;
        (*pQue)->readPointer  = 0;
    }
    return *pQue;
}

inline int CircularBufferIsFull(CircularBuffer* que)
{
    return (((que->writePointer + 1) % que->size) == que->readPointer);
}

inline int CircularBufferIsEmpty(CircularBuffer* que)
{
    return (que->readPointer == que->writePointer);
}

inline int CircularBufferEnque(CircularBuffer* que, KeyType k)
{
    int isFull = CircularBufferIsFull(que);
    if(!isFull)
    {
        que->keys[que->writePointer] = k;
        que->writePointer++;
        que->writePointer %= que->size;
    }
    return isFull;
}

inline int CircularBufferDeque(CircularBuffer* que, KeyType* pK)
{
    int isEmpty =  CircularBufferIsEmpty(que);
    if(!isEmpty)
    {
        *pK = que->keys[que->readPointer];
        que->readPointer++;
        que->readPointer %= que->size;
    }
    return(isEmpty);
}

inline int CircularBufferPrint(CircularBuffer* que)
{
    int i=0;
    int isEmpty =  CircularBufferIsEmpty(que);
    int isFull  =  CircularBufferIsFull(que);
    printf("\n==Q: w:%d r:%d f:%d e:%d\n",
           que->writePointer, que->readPointer,  isFull, isEmpty);
    for(i=0; i< que->size; i++)
    {
        printf("%d ", que->keys[i]);
    }
    printf("\n");
    return(isEmpty);
}

int main(int argc, char *argv[])
{
    CircularBuffer* que;
    KeyType a = 101;
    int isEmpty, i;

    CircularBufferInit(&que, BUFFER_SIZE);
    CircularBufferPrint(que);

    for(i=1; i<=3; i++)
    {
        a=10*i;
        printf("\n\n===\nTest: Insert %d-%d\n", a, a+NUM_OF_ELEMS-1);
        while(! CircularBufferEnque(que, a++));

        //CircularBufferPrint(que);
        printf("\nRX%d:", i);
        a=0;
        isEmpty = CircularBufferDeque(que, &a);
        while (!isEmpty)
        {
            printf("%02d ", a);
            a=0;
            isEmpty = CircularBufferDeque(que, &a);
        }
        //CircularBufferPrint(que);
    }

    free(que);
    return 0;
}

Un ejemplo de implementación en C: (Usando todas las ranuras)

#include <stdio.h>
#include <string.h>
#include <malloc.h>

/*!
 * Circular Buffer Example
 * Compile: gcc cbuf.c -o cbuf.exe
 */

/**< Buffer Size */
#define BUFFER_SIZE 16

/**< Tipos de buffer circular */
typedef unsigned char INT8U;
typedef INT8U KeyType;
typedef struct
{
   INT8U writePointer; /**< Puntero escritura*/
   INT8U readPointer;  /**< Puntero lectura */
   INT8U size;         /**< tamaño del buffer circular */
   KeyType keys[0];    /**< Elemento del buffer circular */
} CircularBuffer;

/**< Inicialización del buffer circular */
CircularBuffer* CircularBufferInit(CircularBuffer** pQue, int size)
{
	int sz = size*sizeof(KeyType)+sizeof(CircularBuffer);
        *pQue = (CircularBuffer*) malloc(sz);
        if(*pQue)
        {
            printf("Init CircularBuffer: keys[%d] (%d)\n", size, sz);
            (*pQue)->size=size;
            (*pQue)->writePointer = 0;
            (*pQue)->readPointer  = 0;
        }
        return *pQue;
}

inline int CircularBufferIsFull(CircularBuffer* que)
{
     return ((que->writePointer + 1) % que->size == que->readPointer);
}

inline int CircularBufferIsEmpty(CircularBuffer* que)
{
     return ((que->readPointer == que->writePointer);
}

inline int CircularBufferEnque(CircularBuffer* que, KeyType k)
{
        int isFull = CircularBufferIsFull(que);
        que->keys[que->writePointer] = k;
        que->writePointer++;
        que->writePointer %= que->size;
        return isFull;
}

inline int CircularBufferDeque(CircularBuffer* que, KeyType* pK)
{
        int isEmpty =  CircularBufferIsEmpty(que);
        *pK = que->keys[que->readPointer];
        que->readPointer++;
        que->readPointer %= que->size;
        return(isEmpty);
}

int main(int argc, char *argv[])
{
        CircularBuffer* que;
        KeyType a = 0;
        int isEmpty;
        CircularBufferInit(&que, BUFFER_SIZE);

        while(! CircularBufferEnque(que, a++));

        do {
            isEmpty = CircularBufferDeque(que, &a);
            printf("%02d ", a);
        } while (!isEmpty);
        printf("\n");
        free(que);
        return 0;
}

Utilizar un contador de llenado del buffer.

Esta técnica consiste en mantener un contador de llenado que se incrementa con cada operación de escritura y se decrementa en cada operación de lectura, esto permite conocer en todo momento el estado de uso del buffer, dándonos la posibilidad de no permitir la lectura si el contador de llenado esta a cero y de no permitir la escritura si el contador de llenado esta en el valor de máxima capacidad del buffer.

Utilizar dos contadores uno de lecturas y otro de escrituras.

Este método consiste en utilizar dos contadores, uno para lectura y otro para escritura, dichos contadores se irán incrementando por cada operación correspondiente, de forma que cuando realizamos una escritura aumenta el contador de escritura y similar para el contador de lectura. Manteniendo este sistema, podemos comprobar el estado en el que se encuentra el buffer en cualquier momento.

POR EJEMPLO: el buffer podría estar en los siguientes estados:

  • vacío: el contador de escritura y lectura coinciden.
  • saturado: el contador de lectura es mayor que el de escritura o bien el contador de escritura es mayor que el de lectura más la capacidad máxima del buffer.
  • lleno: el contador de escritura es igual que el contador de lectura más la capacidad máxima del buffer.

Optimización

Una implementación del buffer circular puede ser optimizada asignando al buffer dos regiones contiguas de memoria virtual. (Naturalmente, el tamaño del buffer debe ser múltiplo del tamaño de página del sistema.) La lectura y escritura en el buffer circular debe realizarse con gran eficiencia en cuanto al acceso directo a memoria.

Ejemplo de implementación POSIX

#include <sys/mman.h>
#include <stdlib.h>
#include <unistd.h>

#define report_exceptional_condition() abort ()

struct ring_buffer
{
  void *address;

  unsigned long count_bytes;
  unsigned long write_offset_bytes;
  unsigned long read_offset_bytes;
};

//Warning order should be at least 12 for Linux
void
ring_buffer_create (struct ring_buffer *buffer, unsigned long order)
{
  char path[] = "/dev/shm/ring-buffer-XXXXXX";
  int file_descriptor;
  void *address;
  int status;

  file_descriptor = mkstemp (path);
  if (file_descriptor < 0)
    report_exceptional_condition ();

  status = unlink (path);
  if (status)
    report_exceptional_condition ();

  buffer->count_bytes = 1UL << order;
  buffer->write_offset_bytes = 0;
  buffer->read_offset_bytes = 0;

  status = ftruncate (file_descriptor, buffer->count_bytes);
  if (status)
    report_exceptional_condition ();

  buffer->address = mmap (NULL, buffer->count_bytes << 1, PROT_NONE,
                          MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);

  if (buffer->address == MAP_FAILED)
    report_exceptional_condition ();

  address =
    mmap (buffer->address, buffer->count_bytes, PROT_READ | PROT_WRITE,
          MAP_FIXED | MAP_SHARED, file_descriptor, 0);

  if (address != buffer->address)
    report_exceptional_condition ();

  address = mmap (buffer->address + buffer->count_bytes,
                  buffer->count_bytes, PROT_READ | PROT_WRITE,
                  MAP_FIXED | MAP_SHARED, file_descriptor, 0);

  if (address != buffer->address + buffer->count_bytes)
    report_exceptional_condition ();

  status = close (file_descriptor);
  if (status)
    report_exceptional_condition ();
}

void
ring_buffer_free (struct ring_buffer *buffer)
{
  int status;

  status = munmap (buffer->address, buffer->count_bytes << 1);
  if (status)
    report_exceptional_condition ();
}

void *
ring_buffer_write_address (struct ring_buffer *buffer)
{
  /*** void pointer arithmetic is a constraint violation. ***/
  return buffer->address + buffer->write_offset_bytes;
}

void
ring_buffer_write_advance (struct ring_buffer *buffer,
                           unsigned long count_bytes)
{
  buffer->write_offset_bytes += count_bytes;
}

void *
ring_buffer_read_address (struct ring_buffer *buffer)
{
  return buffer->address + buffer->read_offset_bytes;
}

void
ring_buffer_read_advance (struct ring_buffer *buffer,
                          unsigned long count_bytes)
{
  buffer->read_offset_bytes += count_bytes;

  if (buffer->read_offset_bytes >= buffer->count_bytes)
    {
      buffer->read_offset_bytes -= buffer->count_bytes;
      buffer->write_offset_bytes -= buffer->count_bytes;
    }
}

unsigned long
ring_buffer_count_bytes (struct ring_buffer *buffer)
{
  return buffer->write_offset_bytes - buffer->read_offset_bytes;
}

unsigned long
ring_buffer_count_free_bytes (struct ring_buffer *buffer)
{
  return buffer->count_bytes - ring_buffer_count_bytes (buffer);
}

void
ring_buffer_clear (struct ring_buffer *buffer)
{
  buffer->write_offset_bytes = 0;
  buffer->read_offset_bytes = 0;
}
//---------------------------------------------------------------------------
// template class Queue
//---------------------------------------------------------------------------
template <class T> class Queue {

        T *qbuf;   // buffer data
        int qsize; // 
        int head;  // index begin data
        int tail;  // index stop data

        inline void Free()
        {
                if (qbuf != 0)
                {
                        delete []qbuf;
                        qbuf= 0;
                }
                qsize= 1;
                head= tail= 0;
        }

public:
        Queue()
        {
                qsize= 32;
                qbuf= new T[qsize];
                head= tail= 0;
        }

        Queue(const int size): qsize(1), qbuf(0), head(0), tail(0)
        {
                if ((size <= 0) || (size & (size - 1)))
                {
                        throw "Value is not degree of two";
                }

                qsize= size;
                qbuf= new T[qsize];
                head= tail= 0;
        }

        ~Queue()
        {
                Free();
        }

        void Enqueue(const T &p)
        {
                if (IsFull()) 
                {
                        throw "Queue is full";
                }

                qbuf[tail]= p;
                tail= (tail + 1) & (qsize - 1);
        }

        // Retrieve the item from the queue
        void Dequeue(T &p)
        {
                if (IsEmpty())
                {
                        throw "Queue is empty";
                }

                p= qbuf[head];
                head= (head + 1) & (qsize - 1);
        }

        // Get i-element with not delete
        void Peek(const int i, T &p) const
        {
                int j= 0;
                int k= head;
                while (k != tail)
                {
                        if (j == i) break;
                        j++;

                        k= (k + 1) & (qsize - 1);
                }
                if (k == tail) throw "Out of range";
                p= qbuf[k];
        }

        // Size must by: 1, 2, 4, 8, 16, 32, 64, ..
        void Resize(const int size)
        {
                if ((size <= 0) || (size & (size - 1)))
                {
                        throw "Value is not degree of two";
                }

                Free();
                qsize= size;
                qbuf= new T[qsize];
                head= tail= 0;
        }

        inline void Clear(void) { head= tail= 0; }

        inline int  GetCapacity(void) const { return (qsize - 1); }

        // Count elements
        inline int  GetBusy(void) const   { return ((head > tail) ? qsize : 0) + tail - head; }

        // true - if queue if empty
        inline bool IsEmpty(void) const { return (head == tail); }

        // true - if queue if full
        inline bool IsFull(void) const  { return ( ((tail + 1) & (qsize - 1)) == head ); }

};
//---------------------------------------------------------------------------
// Use:
        Queue <int> Q;
        Q.Enqueue(5);
        Q.Enqueue(100);
        Q.Enqueue(13);
        int len= Q.GetBusy();
        int val;
        Q.Dequeue(val);

//---------------------------------------------------------------------------