1. Введение
Apache Pulsar — это распределенная система обмена сообщениями с открытым исходным кодом, основанная на публикации/подписке, разработанная в Yahoo .
Он был создан для поддержки критически важных приложений Yahoo, таких как Yahoo Mail, Yahoo Finance, Yahoo Sports и т. д. Затем, в 2016 году, он был открыт в рамках Apache Software Foundation.
2. Архитектура
Pulsar — это многопользовательское высокопроизводительное решение для обмена сообщениями между серверами . Он состоит из набора брокеров и букмекеров, а также встроенного Apache ZooKeeper
для настройки и управления. Букмекеры от Apache BookKeeper
, которые обеспечивают хранение сообщений до тех пор, пока они не будут использованы.
В кластере у нас будет:
- Несколько кластерных брокеров для обработки входящих сообщений от производителей и отправки сообщений потребителям.
- Apache BookKeeper для поддержки сохранения сообщений
- Apache ZooKeeper для хранения конфигурации кластера
Чтобы лучше понять это, давайте взглянем на архитектурную схему из документации :
3. Основные характеристики
Давайте начнем с краткого обзора некоторых ключевых функций:
- Встроенная поддержка нескольких кластеров
- Поддержка георепликации сообщений между несколькими кластерами.
- Несколько режимов подписки
- Масштабируемость до миллионов тем
- Использует Apache BookKeeper для гарантии доставки сообщений.
- Низкая задержка
Теперь давайте подробно обсудим некоторые ключевые особенности.
3.1. Модель обмена сообщениями
Платформа обеспечивает гибкую модель обмена сообщениями. В общих архитектурах обмена сообщениями есть две модели обмена сообщениями, т. е. организация очереди и издатель/подписчик. Издатель/подписчик — это широковещательная система обмена сообщениями, в которой сообщение отправляется всем потребителям. С другой стороны, очередь — это двухточечная связь.
Pulsar сочетает обе концепции в одном обобщенном API . Издатель публикует сообщения в разные темы. Затем эти сообщения транслируются на все подписки.
Потребители подписываются, чтобы получать сообщения. Библиотека позволяет потребителям выбирать различные способы использования сообщений в одной и той же подписке, включая эксклюзивный, общий и отказоустойчивый. Мы подробно обсудим эти типы подписки в последующих разделах.
3.2. Режимы развертывания
Pulsar имеет встроенную поддержку развертывания в различных средах . Это означает, что мы можем использовать его на стандартных локальных компьютерах или развернуть в кластере Kubernetes, Google или AWS Cloud.
Он может быть выполнен как единый узел для целей разработки и тестирования. В этом случае все компоненты (брокер, BookKeeper и ZooKeeper) выполняются в одном процессе.
3.3. Георепликация
Библиотека обеспечивает готовую поддержку георепликации данных. Мы можем включить репликацию сообщений между несколькими кластерами, настроив разные географические регионы.
Данные сообщений реплицируются почти в реальном времени. В случае сбоя сети в кластерах данные всегда в безопасности и сохраняются в BookKeeper. Система репликации продолжает повторять попытки до тех пор, пока репликация не завершится успешно.
Функция георепликации также позволяет организации развертывать Pulsar у разных облачных провайдеров и реплицировать данные . Это помогает им избежать использования проприетарных API-интерфейсов облачных провайдеров.
3.4. Постоянство
После того, как Pulsar прочитает и подтвердит данные, это гарантирует отсутствие потери данных . Долговечность данных связана с количеством дисков, настроенных для хранения данных.
Pulsar обеспечивает надежность, используя букмекерские конторы (экземпляр Apache BookKeeper), работающие в узлах хранения. Всякий раз, когда букмекер получает сообщение, он сохраняет копию в памяти, а также записывает данные в WAL (Write Ahead Log). Этот журнал работает так же, как база данных WAL. Букмекеры работают по принципу транзакций базы данных и гарантируют, что данные не будут потеряны даже в случае сбоя машины.
Помимо вышеперечисленного, Pulsar также может выдерживать множественные сбои узлов. Библиотека реплицирует данные нескольким букмекерам, а затем отправляет сообщение с подтверждением производителю. Этот механизм гарантирует нулевую потерю данных даже в случае множественных отказов оборудования.
4. Настройка одного узла
Теперь давайте посмотрим, как настроить кластер Apache Pulsar с одним узлом.
Apache также предоставляет простой клиентский API с привязками для Java, Python и C++ . Позже мы создадим простой пример производителя и подписки Java.
4.1. Монтаж
Apache Pulsar доступен в виде бинарного дистрибутива. Начнем с его загрузки:
wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-2.1.1-incubating-bin.tar.gz
Когда загрузка будет завершена, мы можем разархивировать zip-файл. Разархивированная раздача будет содержать папку bin, conf, example, licenses
и lib
.
После этого нам нужно скачать встроенные коннекторы. Теперь они поставляются отдельным пакетом:
wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz
Разархивируем коннекторы и скопируем папку Connectors
в папку Pulsar.
4.2. Запуск экземпляра
Чтобы запустить автономный экземпляр, мы можем выполнить:
bin/pulsar standalone
5. Java-клиент
Теперь мы создадим проект Java для создания и использования сообщений. Мы также создадим примеры для разных типов подписки.
5.1. Настройка проекта
Начнем с добавления в наш проект зависимости pulsar-client :
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.1.1-incubating</version>
</dependency>
5.2. Режиссер
Давайте продолжим, создав пример Producer .
Здесь мы создадим тему и производителя.
Во-первых, нам нужно создать PulsarClient
, который будет подключаться к службе Pulsar на определенном хосте и порту, используя собственный протокол. Многие производители и потребители могут совместно использовать один клиентский объект.
Теперь мы создадим производителя
с конкретным названием темы:
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
Producer<byte[]> producer = client.newProducer()
.topic(TOPIC_NAME)
.compressionType(CompressionType.LZ4)
.create();
Производитель отправит 5 сообщений:
IntStream.range(1, 5).forEach(i -> {
String content = String.format("hi-pulsar-%d", i);
Message<byte[]> msg = MessageBuilder.create()
.setContent(content.getBytes())
.build();
MessageId msgId = producer.send(msg);
});
5.3. Потребитель
Далее мы создадим потребителя, чтобы получать сообщения, созданные производителем. Потребителю также требуется тот же PulsarClient
для подключения к нашему серверу:
Consumer<byte[]> consumer = client.newConsumer()
.topic(TOPIC_NAME)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(SUBSCRIPTION_NAME)
.subscribe();
Здесь мы создали клиент с типом подписки Shared
.
Это позволяет нескольким потребителям подключаться к одной и той же подписке и получать сообщения.
5.4. Типы подписки для потребителей
В приведенном выше примере потребителя мы создали подписку с общим
типом. Мы также можем создавать эксклюзивные
и отказоустойчивые
подписки.
Эксклюзивная
подписка позволяет подписаться только одному потребителю .
С другой стороны, отказоустойчивая подписка
позволяет пользователю определить резервного потребителя на случай, если один потребитель выйдет из строя, как показано на этой диаграмме Apache:
6. Заключение
В этой статье мы выделили особенности системы обмена сообщениями Pulsar, такие как модель обмена сообщениями, георепликация и надежные гарантии надежности.
Мы также узнали, как настроить отдельный узел и как использовать Java-клиент.
Как всегда, полную реализацию этого туториала можно найти на Github .