Перейти к основному содержимому

Хвостовые курсоры Spring Data MongoDB

· 5 мин. чтения

1. Введение

В этом руководстве мы собираемся обсудить, как использовать MongoDB в качестве бесконечного потока данных, используя хвостовые курсоры с Spring Data MongoDB .

2. Хвостовые курсоры

Когда мы выполняем запрос, драйвер базы данных открывает курсор для предоставления соответствующих документов. По умолчанию MongoDB автоматически закрывает курсор, когда клиент читает все результаты. Следовательно, поворот приводит к конечному потоку данных.

Однако мы можем использовать ограниченные коллекции с хвостовым курсором, который остается открытым даже после того, как клиент использует все первоначально возвращенные данные, создавая бесконечный поток данных. Этот подход удобен для приложений, работающих с потоками событий, такими как сообщения чата или биржевые обновления.

Проект Spring Data MongoDB помогает нам использовать возможности реактивной базы данных, включая хвостовые курсоры.

3. Настройка

Чтобы продемонстрировать упомянутые функции, мы реализуем простое приложение счетчика журналов. Предположим, что есть какой-то агрегатор журналов, который собирает и сохраняет все журналы в центральном месте — нашей закрытой коллекции MongoDB.

Во-первых, мы будем использовать простой объект Log :

@Document
public class Log {
private @Id String id;
private String service;
private LogLevel level;
private String message;
}

Во-вторых, мы будем хранить журналы в нашей закрытой коллекции MongoDB. Ограниченные коллекции — это коллекции фиксированного размера, которые вставляют и извлекают документы в соответствии с порядком вставки. Мы можем создать их с помощью MongoOperations.createCollection :

db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
.capped(true)
.sizeInBytes(1024)
.maxDocuments(5));

Для ограниченных коллекций мы должны определить свойство sizeInBytes . Более того, maxDocuments указывает максимальное количество документов, которое может иметь коллекция. После достижения более старые документы будут удалены из коллекции.

В-третьих, мы будем использовать соответствующую стартовую зависимость Spring Boot :

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
<versionId>2.2.2.RELEASE</versionId>
</dependency>

4. Реактивные хвостовые курсоры

Мы можем использовать хвостовые курсоры как с императивным , так и с реактивным API MongoDB. Настоятельно рекомендуется использовать реактивный вариант .

Давайте реализуем счетчик журналов уровня WARN , используя реактивный подход. Мы можем создавать бесконечные потоковые запросы с помощью метода ReactiveMongoOperations.tail .

Хвостовой курсор остается открытым и выдает данные — поток сущностей — по мере того, как новые документы поступают в ограниченную коллекцию и соответствуют запросу фильтра :

private Disposable subscription;

public WarnLogsCounter(ReactiveMongoOperations template) {
Flux<Log> stream = template.tail(
query(where("level").is(LogLevel.WARN)),
Log.class);
subscription = stream.subscribe(logEntity ->
counter.incrementAndGet()
);
}

Как только новый документ с уровнем журнала WARN будет сохранен в коллекции, подписчик (лямбда-выражение) увеличит счетчик.

Наконец, мы должны избавиться от подписки, чтобы закрыть поток:

public void close() {
this.subscription.dispose();
}

Кроме того, обратите внимание, что хвостовые курсоры могут стать мертвыми или недействительными, если запрос изначально не возвращает совпадений. Другими словами, даже если новые сохраняемые документы соответствуют запросу фильтра, подписчик не сможет их получить. Это известное ограничение хвостовых курсоров MongoDB. Мы должны убедиться, что в закрытой коллекции есть соответствующие документы, прежде чем создавать хвостовой курсор.

5. Хвостовые курсоры с реактивным репозиторием

Проекты Spring Data предлагают абстракцию репозитория для различных хранилищ данных, включая реактивные версии.

MongoDB не является исключением. Пожалуйста, ознакомьтесь со статьей Spring Data Reactive Repositories with MongoDB для получения более подробной информации.

Более того, реактивные репозитории MongoDB поддерживают бесконечные потоки, аннотируя метод запроса с помощью @Tailable . Мы можем аннотировать любой метод репозитория, возвращающий Flux или другие реактивные типы, способные генерировать несколько элементов:

public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
@Tailable
Flux<Log> findByLevel(LogLevel level);
}

Давайте подсчитаем журналы INFO , используя этот метод хвостового репозитория:

private Disposable subscription;

public InfoLogsCounter(LogsRepository repository) {
Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
this.subscription = stream.subscribe(logEntity ->
counter.incrementAndGet()
);
}

Точно так же, как и для WarnLogsCounter , мы должны избавиться от подписки, чтобы закрыть поток:

public void close() {
this.subscription.dispose();
}

6. Хвостовые курсоры с MessageListener

Тем не менее, если мы не можем использовать реактивный API, мы можем использовать концепцию обмена сообщениями Spring.

Во-первых, нам нужно создать MessageListenerContainer , который будет обрабатывать отправленные объекты SubscriptionRequest . Синхронный драйвер MongoDB создает длительную блокирующую задачу, которая прослушивает новые документы в закрытой коллекции.

Spring Data MongoDB поставляется с реализацией по умолчанию, способной создавать и выполнять экземпляры Task для TailableCursorRequest:

private String collectionName;
private MessageListenerContainer container;
private AtomicInteger counter = new AtomicInteger();

public ErrorLogsCounter(MongoTemplate mongoTemplate,
String collectionName) {
this.collectionName = collectionName;
this.container = new DefaultMessageListenerContainer(mongoTemplate);

container.start();
TailableCursorRequest<Log> request = getTailableCursorRequest();
container.register(request, Log.class);
}

private TailableCursorRequest<Log> getTailableCursorRequest() {
MessageListener<Document, Log> listener = message ->
counter.incrementAndGet();

return TailableCursorRequest.builder()
.collection(collectionName)
.filter(query(where("level").is(LogLevel.ERROR)))
.publishTo(listener)
.build();
}

TailableCursorRequest создает запрос, фильтрующий только журналы уровня ERROR . Каждый соответствующий документ будет опубликован в MessageListener , который увеличит счетчик.

Обратите внимание, что нам все еще нужно убедиться, что первоначальный запрос возвращает некоторые результаты. В противном случае хвостовой курсор будет немедленно закрыт.

Кроме того, мы не должны забывать останавливать контейнер, когда он нам больше не нужен:

public void close() {
container.stop();
}

7. Заключение

Ограниченные коллекции MongoDB с хвостовыми курсорами помогают нам получать информацию из базы данных непрерывным образом. Мы можем запустить запрос, который будет давать результаты до тех пор, пока не будет явно закрыт. Spring Data MongoDB предлагает нам как блокирующий, так и реактивный способ использования хвостовых курсоров.

Исходный код полного примера доступен на GitHub .