1. Обзор
В этом уроке мы кратко рассмотрим Big Queue , Java-реализацию постоянной очереди .
Мы немного поговорим о его архитектуре, а затем научимся использовать его на быстрых и практических примерах.
2. Использование
Нам нужно добавить зависимость bigqueue
в наш проект:
<dependency>
<groupId>com.leansoft</groupId>
<artifactId>bigqueue</artifactId>
<version>0.7.0</version>
</dependency>
Нам также нужно добавить его репозиторий:
<repository>
<id>github.release.repo</id>
<url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>
Если мы привыкли работать с базовыми очередями, адаптироваться к Big Queue не составит труда, поскольку его API очень похож.
2.1. Инициализация
Мы можем инициализировать нашу очередь, просто вызвав ее конструктор:
@Before
public void setup() {
String queueDir = System.getProperty("user.home");
String queueName = "foreach-queue";
bigQueue = new BigQueueImpl(queueDir, queueName);
}
Первый аргумент — домашний каталог для нашей очереди.
Второй аргумент представляет имя нашей очереди. Он создаст папку в домашнем каталоге нашей очереди, где мы сможем сохранять данные.
Мы должны не забыть закрыть нашу очередь, когда закончим, чтобы предотвратить утечку памяти:
bigQueue.close();
2.2. Вставка
Мы можем добавить элементы в хвост, просто вызвав метод enqueue
:
@Test
public void whenAddingRecords_ThenTheSizeIsCorrect() {
for (int i = 1; i <= 100; i++) {
bigQueue.enqueue(String.valueOf(i).getBytes());
}
assertEquals(100, bigQueue.size());
}
Следует отметить, что Big Queue поддерживает только тип данных byte[]
, поэтому мы несем ответственность за сериализацию наших записей при вставке.
2.3. Чтение
Как и следовало ожидать, чтение данных с помощью метода dequeue
так же просто :
@Test
public void whenAddingRecords_ThenTheyCanBeRetrieved() {
bigQueue.enqueue(String.valueOf("new_record").getBytes());
String record = new String(bigQueue.dequeue());
assertEquals("new_record", record);
}
Мы также должны быть осторожны, чтобы правильно десериализовать наши данные при чтении.
Чтение из пустой очереди вызывает NullPointerException
.
Мы должны проверить, что в нашей очереди есть значения, используя метод isEmpty
:
if(!bigQueue.isEmpty()){
// read
}
Чтобы очистить нашу очередь без необходимости просматривать каждую запись, мы можем использовать метод removeAll
:
bigQueue.removeAll();
2.4. Подглядывание
При просмотре мы просто читаем запись, не потребляя ее:
@Test
public void whenPeekingRecords_ThenSizeDoesntChange() {
for (int i = 1; i <= 100; i++) {
bigQueue.enqueue(String.valueOf(i).getBytes());
}
String firstRecord = new String(bigQueue.peek());
assertEquals("1", firstRecord);
assertEquals(100, bigQueue.size());
}
2.5. Удаление использованных записей
Когда мы вызываем метод dequeue
, записи удаляются из нашей очереди, но остаются на диске.
Это потенциально может заполнить наш диск ненужными данными.
К счастью, мы можем удалить использованные записи с помощью метода gc
:
bigQueue.gc();
Точно так же, как сборщик мусора в Java очищает объекты, на которые нет ссылок, из кучи, gc
очищает использованные записи с нашего диска.
3. Архитектура и функции
Что интересно в Big Queue, так это то, что его кодовая база чрезвычайно мала — всего 12 исходных файлов, занимающих около 20 КБ дискового пространства.
На высоком уровне это просто постоянная очередь, которая отлично справляется с обработкой больших объемов данных.
3.1. Обработка больших объемов данных
Размер очереди ограничен только общим доступным дисковым пространством. Каждая запись в нашей очереди сохраняется на диске, чтобы быть устойчивой к сбоям.
Нашим узким местом будет дисковый ввод-вывод, а это означает, что SSD значительно улучшит среднюю пропускную способность по сравнению с жестким диском.
3.2. Чрезвычайно быстрый доступ к данным
Если мы взглянем на его исходный код, то заметим, что очередь поддерживается файлом с отображением памяти. Доступная часть нашей очереди (голова) хранится в оперативной памяти, поэтому доступ к записям будет чрезвычайно быстрым.
Даже если наша очередь станет чрезвычайно большой и займет терабайты дискового пространства, мы все равно сможем считывать данные с временной сложностью O(1).
Если нам нужно читать много сообщений, а скорость является критической проблемой, мы должны рассмотреть возможность использования SSD вместо жесткого диска, так как перемещение данных с диска в память будет намного быстрее.
3.3. Преимущества
Большим преимуществом является его способность вырастать очень большими в размерах. Мы можем масштабировать его до теоретической бесконечности, просто добавив больше памяти, отсюда и его название «Большой».
В параллельной среде Big Queue может производить и потреблять около 166 МБ/с данных на обычном компьютере.
Если наш средний размер сообщения составляет 1 КБ, он может обрабатывать 166 тыс. сообщений в секунду.
Он может обрабатывать до 333 тысяч сообщений в секунду в однопоточной среде — впечатляет!
3.4. Недостатки
Наши сообщения остаются на диске даже после того, как мы их израсходовали, поэтому нам нужно позаботиться о сборе мусора, когда он нам больше не нужен.
Мы также несем ответственность за сериализацию и десериализацию наших сообщений.
4. Вывод
В этом кратком руководстве мы узнали о большой очереди и о том, как мы можем использовать ее в качестве масштабируемой и постоянной очереди.
Как всегда, код доступен на Github .