Перейти к основному содержимому

Введение в Атомикс

· 5 мин. чтения

1. Обзор

Большинству распределенных приложений требуется, чтобы какой-либо компонент с отслеживанием состояния был непротиворечивым и отказоустойчивым. Atomix — это встраиваемая библиотека, помогающая добиться отказоустойчивости и согласованности для распределенных ресурсов.

Он предоставляет богатый набор API для управления своими ресурсами, такими как коллекции, группы и инструменты для параллелизма.

Для начала нам нужно добавить следующую зависимость Maven в наш pom:

<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-all</artifactId>
<version>1.0.8</version>
</dependency>

Эта зависимость обеспечивает транспорт на основе Netty, необходимый узлам для связи друг с другом.

2. Начальная загрузка кластера

Чтобы начать работу с Atomix, нам нужно сначала загрузить кластер.

Atomix состоит из набора реплик, которые используются для создания распределенных ресурсов с отслеживанием состояния. Каждая реплика поддерживает копию состояния каждого ресурса, существующего в кластере.

Реплики в кластере бывают двух типов: активные и пассивные.

Изменения состояния распределенных ресурсов распространяются через активные реплики, в то время как пассивные реплики синхронизируются для обеспечения отказоустойчивости.

2.1. Начальная загрузка встроенного кластера

Чтобы загрузить кластер с одним узлом, нам нужно сначала создать экземпляр AtomixReplica :

AtomixReplica replica = AtomixReplica.builder(
new Address("localhost", 8700))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();

Здесь реплика настроена с Storage and Transport . Фрагмент кода для объявления хранилища:

Storage storage = Storage.builder()
.withDirectory(new File("logs"))
.withStorageLevel(StorageLevel.DISK)
.build();

Как только реплика объявлена и настроена с хранилищем и транспортом, мы можем загрузить ее, просто вызвав bootstrap() , которая возвращает CompletableFuture , который можно использовать для блокировки до тех пор, пока сервер не будет загружен, вызвав связанный блокирующий метод join() :

CompletableFuture<AtomixReplica> future = replica.bootstrap();
future.join();

До сих пор мы построили кластер с одним узлом. Теперь мы можем добавить к нему больше узлов.

Для этого нам нужно создать другие реплики и соединить их с существующим кластером; нам нужно создать новый поток для вызова метода join(Address) :

AtomixReplica replica2 = AtomixReplica.builder(
new Address("localhost", 8701))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();

replica2
.join(new Address("localhost", 8700))
.join();

AtomixReplica replica3 = AtomixReplica.builder(
new Address("localhost", 8702))
.withStorage(storage)
.withTransport(new NettyTransport())
.build();

replica3.join(
new Address("localhost", 8700),
new Address("localhost", 8701))
.join();

Теперь у нас есть загруженный кластер из трех узлов. В качестве альтернативы мы можем загрузить кластер, передав список адресов в метод начальной загрузки (List<Address>) :

List<Address> cluster = Arrays.asList(
new Address("localhost", 8700),
new Address("localhost", 8701),
new Address("localhsot", 8702));

AtomixReplica replica1 = AtomixReplica
.builder(cluster.get(0))
.build();
replica1.bootstrap(cluster).join();

AtomixReplica replica2 = AtomixReplica
.builder(cluster.get(1))
.build();

replica2.bootstrap(cluster).join();

AtomixReplica replica3 = AtomixReplica
.builder(cluster.get(2))
.build();

replica3.bootstrap(cluster).join();

Нам нужно создать новый поток для каждой реплики.

2.2. Начальная загрузка автономного кластера

Сервер Atomix можно запустить как автономный сервер, который можно загрузить с Maven Central. Проще говоря — это Java-архив, который можно запустить через терминал, предоставив

Проще говоря — это Java-архив, который можно запустить через терминал, указав параметр host: port во флаге адреса и используя флаг -bootstrap .

Вот команда для начальной загрузки кластера:

java -jar atomix-standalone-server.jar 
-address 127.0.0.1:8700 -bootstrap -config atomix.properties

Здесь atomix.properties — это файл конфигурации для настройки хранилища и транспорта. Чтобы создать многоузловой кластер, мы можем добавить узлы в существующий кластер, используя флаг -join .

Формат для него:

java -jar atomix-standalone-server.jar 
-address 127.0.0.1:8701 -join 127.0.0.1:8700

3. Работа с клиентом

Atomix поддерживает создание клиента для удаленного доступа к своему кластеру через API AtomixClient .

Поскольку клиенты не должны сохранять состояние, AtomixClient не имеет никакого хранилища. Нам просто нужно настроить транспорт при создании клиента, так как транспорт будет использоваться для связи с кластером.

Создадим клиент с транспортом:

AtomixClient client = AtomixClient.builder()
.withTransport(new NettyTransport())
.build();

Теперь нам нужно подключить клиента к кластеру.

Мы можем объявить список адресов и передать список в качестве аргумента методу connect() клиента:

client.connect(cluster)
.thenRun(() -> {
System.out.println("Client is connected to the cluster!");
});

4. Работа с ресурсами

Истинная сила Atomix заключается в мощном наборе API для создания распределенных ресурсов и управления ими. Ресурсы реплицируются и сохраняются в кластере, а также поддерживаются реплицированным конечным автоматом , который управляется базовой реализацией Raft Consensus Protocol.

Распределенные ресурсы могут создаваться и управляться одним из методов get() . Мы можем создать экземпляр распределенного ресурса из AtomixReplica .

Учитывая , что реплика является экземпляром AtomixReplica , фрагмент кода для создания ресурса распределенной карты и установки для него значения:

replica.getMap("map")
.thenCompose(m -> m.put("bar", "Hello world!"))
.thenRun(() -> System.out.println("Value is set in Distributed Map"))
.join();

Здесь метод join() будет блокировать программу до тех пор, пока ресурс не будет создан и ему не будет присвоено значение. Мы можем получить тот же объект с помощью AtomixClient и получить значение с помощью метода get("bar") .

Мы можем использовать метод get() в конце, чтобы дождаться результата:

String value = client.getMap("map"))
.thenCompose(m -> m.get("bar"))
.thenApply(a -> (String) a)
.get();

5. Согласованность и отказоустойчивость

Atomix используется для критически важных небольших наборов данных, для которых непротиворечивость является гораздо большей проблемой, чем доступность.

Он обеспечивает строгую настраиваемую согласованность за счет линеаризации как для чтения, так и для записи . В линеаризуемости после фиксации записи все клиенты гарантированно будут осведомлены о результирующем состоянии.

Согласованность в кластере Atomix гарантируется базовым алгоритмом консенсуса Raft, где избранный лидер будет иметь все записи, которые ранее были успешными.

Все новые записи будут проходить через лидера кластера и синхронно реплицироваться на большую часть сервера перед завершением.

Для обеспечения отказоустойчивости большинство серверов кластера должны быть в рабочем состоянии . Если выйдет из строя меньшинство узлов, узлы будут помечены как неактивные и будут заменены пассивными узлами или резервными узлами.

В случае сбоя лидера оставшиеся серверы в кластере начнут новые выборы лидера. При этом кластер будет недоступен.

В случае раздела, если лидер находится на стороне разделения, не имеющей кворума, он уходит в отставку, и новый лидер избирается на стороне с кворумом.

И, если лидер на стороне большинства, так и будет продолжаться без изменений. Когда разделение будет разрешено, узлы на стороне, не входящей в кворум, присоединятся к кворуму и соответствующим образом обновят свой журнал.

6. Заключение

Как и ZooKeeper, Atomix предоставляет надежный набор библиотек для решения проблем распределенных вычислений.

И, как всегда, полный исходный код для этой задачи доступен на GitHub .