1. Обзор
В этом руководстве мы узнаем, как реализовать кольцевой буфер в Java.
2. Кольцевой буфер
Кольцевой буфер (или Circular Buffer) — это ограниченная круговая структура данных, которая используется для буферизации данных между двумя или более потоками . Поскольку мы продолжаем писать в кольцевой буфер, он циклически повторяется по мере достижения конца.
2.1. Как это работает
Кольцевой буфер реализован с использованием массива фиксированного размера, который охватывает границы .
Помимо массива, он отслеживает три вещи:
- следующий доступный слот в буфере для вставки элемента,
- следующий непрочитанный элемент в буфере,
- и конец массива — точка, в которой буфер переходит к началу массива
Механика того, как кольцевой буфер обрабатывает эти требования, зависит от реализации. Например, запись в Википедии по этому вопросу показывает метод, использующий четыре указателя.
Мы позаимствуем подход из реализации Disruptor кольцевого буфера с использованием последовательностей.
Первое, что нам нужно знать, это емкость — фиксированный максимальный размер буфера. Далее мы будем использовать две монотонно возрастающие последовательности :
- Последовательность записи: начиная с -1, увеличивается на 1 при вставке элемента
- Последовательность чтения: начиная с 0, увеличивается на 1 по мере использования элемента
Мы можем сопоставить последовательность с индексом в массиве, используя операцию mod:
arrayIndex = sequence % capacity
Операция mod оборачивает последовательность вокруг границ, чтобы получить слот в буфере :
Давайте посмотрим, как мы будем вставлять элемент:
buffer[++writeSequence % capacity] = element
Мы предварительно увеличиваем последовательность перед вставкой элемента.
Чтобы использовать элемент, мы делаем постинкремент:
element = buffer[readSequence++ % capacity]
В этом случае мы выполняем постинкремент последовательности. Использование элемента не удаляет его из буфера — он просто остается в массиве до тех пор, пока не будет перезаписан .
2.2. Пустые и полные буферы
По мере того, как мы обходим массив, мы начинаем перезаписывать данные в буфере. Если буфер заполнен, мы можем либо перезаписать самые старые данные независимо от того, использовал ли их считыватель, либо предотвратить перезапись данных, которые не были прочитаны .
Если читатель может позволить себе пропустить промежуточные или старые значения (например, бегущую строку курса акций), мы можем перезаписать данные, не дожидаясь их использования. С другой стороны, если считыватель должен использовать все значения (как в случае с транзакциями электронной коммерции), мы должны ждать (ожидание блокировки/ожидания занятости), пока в буфере не появится свободный слот.
Буфер заполнен, если размер буфера равен его емкости , где его размер равен количеству непрочитанных элементов:
size = (writeSequence - readSequence) + 1
isFull = (size == capacity)
Если последовательность записи отстает от последовательности чтения, буфер пуст :
isEmpty = writeSequence < readSequence
Буфер возвращает нулевое
значение, если он пуст.
2.2. Преимущества и недостатки
Кольцевой буфер — это эффективный буфер FIFO. Он использует массив фиксированного размера, который может быть предварительно выделен заранее, и обеспечивает эффективный шаблон доступа к памяти. Все операции с буфером выполняются за постоянное время O(1)
, включая потребление элемента, так как это не требует смещения элементов.
С другой стороны, определение правильного размера кольцевого буфера имеет решающее значение. Например, операции записи могут блокироваться на долгое время, если размер буфера недостаточен, а операции чтения выполняются медленно. Мы можем использовать динамическое изменение размера, но это потребует перемещения данных, и мы упустим большинство преимуществ, описанных выше.
3. Реализация на Java
Теперь, когда мы поняли, как работает кольцевой буфер, давайте приступим к его реализации на Java.
3.1. Инициализация
Во-первых, давайте определим конструктор, который инициализирует буфер с предопределенной емкостью:
public CircularBuffer(int capacity) {
this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity;
this.data = (E[]) new Object[this.capacity];
this.readSequence = 0;
this.writeSequence = -1;
}
Это создаст пустой буфер и инициализирует поля последовательности, как обсуждалось в предыдущем разделе.
3.2. Предложение
Далее мы реализуем операцию предложения
, которая вставляет элемент в буфер в следующем доступном слоте и возвращает значение true
в случае успеха. Возвращает false
, если буфер не может найти пустой слот, то есть мы не можем перезаписать непрочитанные значения .
Давайте реализуем метод offer
на Java:
public boolean offer(E element) {
boolean isFull = (writeSequence - readSequence) + 1 == capacity;
if (!isFull) {
int nextWriteSeq = writeSequence + 1;
data[nextWriteSeq % capacity] = element;
writeSequence++;
return true;
}
return false;
}
Итак, мы увеличиваем последовательность записи и вычисляем индекс в массиве для следующего доступного слота. Затем мы записываем данные в буфер и сохраняем обновленную последовательность записи.
Давайте попробуем:
@Test
public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() {
CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);
assertTrue(buffer.offer("Square"));
assertEquals(1, buffer.size());
}
3.3. Опрос
Наконец, мы реализуем операцию опроса
, которая извлекает и удаляет следующий непрочитанный элемент. Операция опроса
не удаляет элемент, а увеличивает последовательность чтения .
Давайте реализуем это:
public E poll() {
boolean isEmpty = writeSequence < readSequence;
if (!isEmpty) {
E nextValue = data[readSequence % capacity];
readSequence++;
return nextValue;
}
return null;
}
Здесь мы читаем данные в текущей последовательности чтения, вычисляя индекс в массиве. Затем мы увеличиваем последовательность и возвращаем значение, если буфер не пуст.
Давайте проверим это:
@Test
public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() {
CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);
buffer.offer("Triangle");
String shape = buffer.poll();
assertEquals("Triangle", shape);
}
4. Проблема производитель-потребитель
Мы говорили об использовании кольцевого буфера для обмена данными между двумя или более потоками, что является примером проблемы синхронизации, называемой проблемой производителя-потребителя . В Java мы можем решить проблему производитель-потребитель различными способами, используя семафоры , ограниченные очереди , кольцевые буферы и т.д.
Давайте реализуем решение на основе кольцевого буфера.
4.1. изменчивые
поля последовательности
Наша реализация кольцевого буфера не является потокобезопасной. Давайте сделаем его потокобезопасным для простого случая с одним производителем и одним потребителем.
Производитель записывает данные в буфер и увеличивает writeSequence
, в то время как потребитель только читает из буфера и увеличивает readSequence
. Таким образом, резервный массив не конфликтует, и мы можем обойтись без синхронизации.
Но нам все еще нужно убедиться, что потребитель может видеть последнее значение поля writeSequence
( видимость ) и что writeSequence
не обновляется до того, как данные будут фактически доступны в буфере ( упорядочение ).
В этом случае мы можем сделать кольцевой буфер параллельным и свободным от блокировок, сделав поля последовательности volatile
:
private volatile int writeSequence = -1, readSequence = 0;
В методе предложения
запись в изменчивое
поле writeSequence
гарантирует, что запись в буфер произойдет до обновления последовательности. В то же время гарантия изменчивой
видимости гарантирует, что потребитель всегда будет видеть самое последнее значение writeSequence
.
4.2. Режиссер
Давайте реализуем простого производителя Runnable
, который записывает в кольцевой буфер:
public void run() {
for (int i = 0; i < items.length;) {
if (buffer.offer(items[i])) {
System.out.println("Produced: " + items[i]);
i++;
}
}
}
Поток-производитель будет ожидать появления пустого слота в цикле (ожидание занятости).
4.3. Потребитель
Мы реализуем Callable
потребителя , который читает из буфера:
public T[] call() {
T[] items = (T[]) new Object[expectedCount];
for (int i = 0; i < items.length;) {
T item = buffer.poll();
if (item != null) {
items[i++] = item;
System.out.println("Consumed: " + item);
}
}
return items;
}
Поток-потребитель продолжает работу без печати, если он получает нулевое
значение из буфера.
Напишем код нашего драйвера:
executorService.submit(new Thread(new Producer<String>(buffer)));
executorService.submit(new Thread(new Consumer<String>(buffer)));
Выполнение нашей программы производителя-потребителя приводит к выводу, как показано ниже:
Produced: Circle
Produced: Triangle
Consumed: Circle
Produced: Rectangle
Consumed: Triangle
Consumed: Rectangle
Produced: Square
Produced: Rhombus
Consumed: Square
Produced: Trapezoid
Consumed: Rhombus
Consumed: Trapezoid
Produced: Pentagon
Produced: Pentagram
Produced: Hexagon
Consumed: Pentagon
Consumed: Pentagram
Produced: Hexagram
Consumed: Hexagon
Consumed: Hexagram
5. Заключение
В этом руководстве мы узнали, как реализовать кольцевой буфер, и рассмотрели, как его можно использовать для решения проблемы производитель-потребитель.
Как обычно, исходный код всех примеров доступен на GitHub .