1. Обзор
Большинству распределенных приложений требуется, чтобы какой-либо компонент с отслеживанием состояния был непротиворечивым и отказоустойчивым. Atomix — это встраиваемая библиотека, помогающая добиться отказоустойчивости и согласованности для распределенных ресурсов.
Он предоставляет богатый набор API для управления своими ресурсами, такими как коллекции, группы и инструменты для параллелизма.
Для начала нам нужно добавить следующую зависимость Maven в наш pom:
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-all</artifactId>
<version>1.0.8</version>
</dependency>
Эта зависимость обеспечивает транспорт на основе Netty, необходимый узлам для связи друг с другом.
2. Начальная загрузка кластера
Чтобы начать работу с Atomix, нам нужно сначала загрузить кластер.
Atomix состоит из набора реплик, которые используются для создания распределенных ресурсов с отслеживанием состояния. Каждая реплика поддерживает копию состояния каждого ресурса, существующего в кластере.
Реплики в кластере бывают двух типов: активные и пассивные.
Изменения состояния распределенных ресурсов распространяются через активные реплики, в то время как пассивные реплики синхронизируются для обеспечения отказоустойчивости.
2.1. Начальная загрузка встроенного кластера
Чтобы загрузить кластер с одним узлом, нам нужно сначала создать экземпляр AtomixReplica
:
AtomixReplica replica = AtomixReplica.builder(
new Address("localhost", 8700))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
Здесь реплика настроена с Storage
and Transport
. Фрагмент кода для объявления хранилища:
Storage storage = Storage.builder()
.withDirectory(new File("logs"))
.withStorageLevel(StorageLevel.DISK)
.build();
Как только реплика объявлена и настроена с хранилищем и транспортом, мы можем загрузить ее, просто вызвав bootstrap()
, которая возвращает CompletableFuture
, который можно использовать для блокировки до тех пор, пока сервер не будет загружен, вызвав связанный блокирующий метод join() :
CompletableFuture<AtomixReplica> future = replica.bootstrap();
future.join();
До сих пор мы построили кластер с одним узлом. Теперь мы можем добавить к нему больше узлов.
Для этого нам нужно создать другие реплики и соединить их с существующим кластером; нам нужно создать новый поток для вызова метода join(Address) :
AtomixReplica replica2 = AtomixReplica.builder(
new Address("localhost", 8701))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
replica2
.join(new Address("localhost", 8700))
.join();
AtomixReplica replica3 = AtomixReplica.builder(
new Address("localhost", 8702))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();
replica3.join(
new Address("localhost", 8700),
new Address("localhost", 8701))
.join();
Теперь у нас есть загруженный кластер из трех узлов. В качестве альтернативы мы можем загрузить кластер, передав список
адресов в метод начальной загрузки (List<Address>)
:
List<Address> cluster = Arrays.asList(
new Address("localhost", 8700),
new Address("localhost", 8701),
new Address("localhsot", 8702));
AtomixReplica replica1 = AtomixReplica
.builder(cluster.get(0))
.build();
replica1.bootstrap(cluster).join();
AtomixReplica replica2 = AtomixReplica
.builder(cluster.get(1))
.build();
replica2.bootstrap(cluster).join();
AtomixReplica replica3 = AtomixReplica
.builder(cluster.get(2))
.build();
replica3.bootstrap(cluster).join();
Нам нужно создать новый поток для каждой реплики.
2.2. Начальная загрузка автономного кластера
Сервер Atomix можно запустить как автономный сервер, который можно загрузить с Maven Central. Проще говоря — это Java-архив, который можно запустить через терминал, предоставив
Проще говоря — это Java-архив, который можно запустить через терминал, указав параметр host: port
во флаге адреса и используя флаг -bootstrap
.
Вот команда для начальной загрузки кластера:
java -jar atomix-standalone-server.jar
-address 127.0.0.1:8700 -bootstrap -config atomix.properties
Здесь atomix.properties
— это файл конфигурации для настройки хранилища и транспорта. Чтобы создать многоузловой кластер, мы можем добавить узлы в существующий кластер, используя флаг -join
.
Формат для него:
java -jar atomix-standalone-server.jar
-address 127.0.0.1:8701 -join 127.0.0.1:8700
3. Работа с клиентом
Atomix поддерживает создание клиента для удаленного доступа к своему кластеру через API AtomixClient
.
Поскольку клиенты не должны сохранять состояние, AtomixClient
не имеет никакого хранилища. Нам просто нужно настроить транспорт при создании клиента, так как транспорт будет использоваться для связи с кластером.
Создадим клиент с транспортом:
AtomixClient client = AtomixClient.builder()
.withTransport(new NettyTransport())
.build();
Теперь нам нужно подключить клиента к кластеру.
Мы можем объявить список
адресов
и передать список
в качестве аргумента методу connect()
клиента:
client.connect(cluster)
.thenRun(() -> {
System.out.println("Client is connected to the cluster!");
});
4. Работа с ресурсами
Истинная сила Atomix заключается в мощном наборе API для создания распределенных ресурсов и управления ими. Ресурсы реплицируются и сохраняются в кластере, а также поддерживаются реплицированным конечным автоматом , который управляется базовой реализацией Raft Consensus Protocol.
Распределенные ресурсы могут создаваться и управляться одним из методов get()
. Мы можем создать экземпляр распределенного ресурса из AtomixReplica
.
Учитывая , что реплика
является экземпляром AtomixReplica
, фрагмент кода для создания ресурса распределенной карты и установки для него значения:
replica.getMap("map")
.thenCompose(m -> m.put("bar", "Hello world!"))
.thenRun(() -> System.out.println("Value is set in Distributed Map"))
.join();
Здесь метод join()
будет блокировать программу до тех пор, пока ресурс не будет создан и ему не будет присвоено значение. Мы можем получить тот же объект с помощью AtomixClient
и получить значение с помощью метода get("bar")
.
Мы можем использовать метод get()
в конце, чтобы дождаться результата:
String value = client.getMap("map"))
.thenCompose(m -> m.get("bar"))
.thenApply(a -> (String) a)
.get();
5. Согласованность и отказоустойчивость
Atomix используется для критически важных небольших наборов данных, для которых непротиворечивость является гораздо большей проблемой, чем доступность.
Он обеспечивает строгую настраиваемую согласованность за счет линеаризации как для чтения, так и для записи . В линеаризуемости после фиксации записи все клиенты гарантированно будут осведомлены о результирующем состоянии.
Согласованность в кластере Atomix гарантируется базовым алгоритмом консенсуса Raft, где избранный лидер будет иметь все записи, которые ранее были успешными.
Все новые записи будут проходить через лидера кластера и синхронно реплицироваться на большую часть сервера перед завершением.
Для обеспечения отказоустойчивости большинство серверов кластера должны быть в рабочем состоянии . Если выйдет из строя меньшинство узлов, узлы будут помечены как неактивные и будут заменены пассивными узлами или резервными узлами.
В случае сбоя лидера оставшиеся серверы в кластере начнут новые выборы лидера. При этом кластер будет недоступен.
В случае раздела, если лидер находится на стороне разделения, не имеющей кворума, он уходит в отставку, и новый лидер избирается на стороне с кворумом.
И, если лидер на стороне большинства, так и будет продолжаться без изменений. Когда разделение будет разрешено, узлы на стороне, не входящей в кворум, присоединятся к кворуму и соответствующим образом обновят свой журнал.
6. Заключение
Как и ZooKeeper, Atomix предоставляет надежный набор библиотек для решения проблем распределенных вычислений.
И, как всегда, полный исходный код для этой задачи доступен на GitHub .