Перейти к основному содержимому

Введение в большую очередь

· 4 мин. чтения

1. Обзор

В этом уроке мы кратко рассмотрим Big Queue , Java-реализацию постоянной очереди .

Мы немного поговорим о его архитектуре, а затем научимся использовать его на быстрых и практических примерах.

2. Использование

Нам нужно добавить зависимость bigqueue в наш проект:

<dependency>
<groupId>com.leansoft</groupId>
<artifactId>bigqueue</artifactId>
<version>0.7.0</version>
</dependency>

Нам также нужно добавить его репозиторий:

<repository>
<id>github.release.repo</id>
<url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>

Если мы привыкли работать с базовыми очередями, адаптироваться к Big Queue не составит труда, поскольку его API очень похож.

2.1. Инициализация

Мы можем инициализировать нашу очередь, просто вызвав ее конструктор:

@Before
public void setup() {
String queueDir = System.getProperty("user.home");
String queueName = "foreach-queue";
bigQueue = new BigQueueImpl(queueDir, queueName);
}

Первый аргумент — домашний каталог для нашей очереди.

Второй аргумент представляет имя нашей очереди. Он создаст папку в домашнем каталоге нашей очереди, где мы сможем сохранять данные.

Мы должны не забыть закрыть нашу очередь, когда закончим, чтобы предотвратить утечку памяти:

bigQueue.close();

2.2. Вставка

Мы можем добавить элементы в хвост, просто вызвав метод enqueue :

@Test
public void whenAddingRecords_ThenTheSizeIsCorrect() {
for (int i = 1; i <= 100; i++) {
bigQueue.enqueue(String.valueOf(i).getBytes());
}

assertEquals(100, bigQueue.size());
}

Следует отметить, что Big Queue поддерживает только тип данных byte[] , поэтому мы несем ответственность за сериализацию наших записей при вставке.

2.3. Чтение

Как и следовало ожидать, чтение данных с помощью метода dequeue так же просто :

@Test
public void whenAddingRecords_ThenTheyCanBeRetrieved() {
bigQueue.enqueue(String.valueOf("new_record").getBytes());

String record = new String(bigQueue.dequeue());

assertEquals("new_record", record);
}

Мы также должны быть осторожны, чтобы правильно десериализовать наши данные при чтении.

Чтение из пустой очереди вызывает NullPointerException .

Мы должны проверить, что в нашей очереди есть значения, используя метод isEmpty :

if(!bigQueue.isEmpty()){
// read
}

Чтобы очистить нашу очередь без необходимости просматривать каждую запись, мы можем использовать метод removeAll :

bigQueue.removeAll();

2.4. Подглядывание

При просмотре мы просто читаем запись, не потребляя ее:

@Test
public void whenPeekingRecords_ThenSizeDoesntChange() {
for (int i = 1; i <= 100; i++) {
bigQueue.enqueue(String.valueOf(i).getBytes());
}

String firstRecord = new String(bigQueue.peek());

assertEquals("1", firstRecord);
assertEquals(100, bigQueue.size());
}

2.5. Удаление использованных записей

Когда мы вызываем метод dequeue , записи удаляются из нашей очереди, но остаются на диске.

Это потенциально может заполнить наш диск ненужными данными.

К счастью, мы можем удалить использованные записи с помощью метода gc :

bigQueue.gc();

Точно так же, как сборщик мусора в Java очищает объекты, на которые нет ссылок, из кучи, gc очищает использованные записи с нашего диска.

3. Архитектура и функции

Что интересно в Big Queue, так это то, что его кодовая база чрезвычайно мала — всего 12 исходных файлов, занимающих около 20 КБ дискового пространства.

На высоком уровне это просто постоянная очередь, которая отлично справляется с обработкой больших объемов данных.

3.1. Обработка больших объемов данных

Размер очереди ограничен только общим доступным дисковым пространством. Каждая запись в нашей очереди сохраняется на диске, чтобы быть устойчивой к сбоям.

Нашим узким местом будет дисковый ввод-вывод, а это означает, что SSD значительно улучшит среднюю пропускную способность по сравнению с жестким диском.

3.2. Чрезвычайно быстрый доступ к данным

Если мы взглянем на его исходный код, то заметим, что очередь поддерживается файлом с отображением памяти. Доступная часть нашей очереди (голова) хранится в оперативной памяти, поэтому доступ к записям будет чрезвычайно быстрым.

Даже если наша очередь станет чрезвычайно большой и займет терабайты дискового пространства, мы все равно сможем считывать данные с временной сложностью O(1).

Если нам нужно читать много сообщений, а скорость является критической проблемой, мы должны рассмотреть возможность использования SSD вместо жесткого диска, так как перемещение данных с диска в память будет намного быстрее.

3.3. Преимущества

Большим преимуществом является его способность вырастать очень большими в размерах. Мы можем масштабировать его до теоретической бесконечности, просто добавив больше памяти, отсюда и его название «Большой».

В параллельной среде Big Queue может производить и потреблять около 166 МБ/с данных на обычном компьютере.

Если наш средний размер сообщения составляет 1 КБ, он может обрабатывать 166 тыс. сообщений в секунду.

Он может обрабатывать до 333 тысяч сообщений в секунду в однопоточной среде — впечатляет!

3.4. Недостатки

Наши сообщения остаются на диске даже после того, как мы их израсходовали, поэтому нам нужно позаботиться о сборе мусора, когда он нам больше не нужен.

Мы также несем ответственность за сериализацию и десериализацию наших сообщений.

4. Вывод

В этом кратком руководстве мы узнали о большой очереди и о том, как мы можем использовать ее в качестве масштабируемой и постоянной очереди.

Как всегда, код доступен на Github .