Перейти к основному содержимому

Введение в бесконфликтные реплицированные типы данных

· 5 мин. чтения

1. Обзор

В этой статье мы рассмотрим бесконфликтные реплицированные типы данных (CRDT) и способы работы с ними в Java. Для наших примеров мы будем использовать реализации из библиотеки wurmloch-crdt .

Когда у нас есть кластер из N узлов-реплик в распределенной системе, мы можем столкнуться с сетевым разделом — некоторые узлы временно не могут общаться друг с другом . Эта ситуация называется разделенным мозгом.

Когда у нас в системе работает split-brain, некоторые запросы на запись — даже для одного и того же пользователя — могут уходить на разные реплики, не связанные друг с другом . Когда возникает такая ситуация, наша система по-прежнему доступна, но не является согласованной .

Нам нужно решить, что делать с несогласованными записями и данными, когда сеть между двумя разделенными кластерами снова начнет работать.

2. Бесконфликтные реплицированные типы данных спешат на помощь

Давайте рассмотрим два узла, A и B , которые отключились из-за разделения мозга.

Допустим, пользователь меняет свой логин и запрос идет к узлу А. Затем он снова решает изменить его, но на этот раз запрос идет к узлу B.

Из-за разделения мозга два узла не связаны. Нам нужно решить, как должен выглядеть логин этого пользователя, когда сеть снова заработает.

Мы можем использовать несколько стратегий: мы можем предоставить пользователю возможность разрешать конфликты (как это делается в Google Docs), или мы можем использовать CRDT для слияния данных из разрозненных реплик для нас.

3. Зависимость от Maven

Во-первых, давайте добавим в библиотеку зависимость, которая предоставляет набор полезных CRDT:

<dependency>
<groupId>com.netopyr.wurmloch</groupId>
<artifactId>wurmloch-crdt</artifactId>
<version>0.1.0</version>
</dependency>

Последнюю версию можно найти на Maven Central .

4. Набор только для выращивания

Самый простой CRDT — это набор только для роста. Элементы можно только добавлять в GSet и никогда не удалять. Когда GSet расходится, его можно легко объединить, вычислив объединение двух наборов.

Во-первых, давайте создадим две реплики для имитации распределенной структуры данных и соединим эти две реплики с помощью метода connect() :

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

Получив две реплики в нашем кластере, мы можем создать GSet на первой реплике и сослаться на нее на второй реплике:

GSet<String> replica1 = crdtStore1.createGSet("ID_1");
GSet<String> replica2 = crdtStore2.<String>findGSet("ID_1").get();

На данный момент наш кластер работает как положено, и есть активное соединение между двумя репликами. Мы можем добавить в набор два элемента из двух разных реплик и утверждать, что набор содержит одни и те же элементы на обеих репликах:

replica1.add("apple");
replica2.add("banana");

assertThat(replica1).contains("apple", "banana");
assertThat(replica2).contains("apple", "banana");

Допустим, вдруг у нас есть сетевой раздел и нет связи между первой и второй репликами. Мы можем смоделировать сетевой раздел, используя метод disconnect() :

crdtStore1.disconnect(crdtStore2);

Затем, когда мы добавляем элементы в набор данных из обеих реплик, эти изменения не видны глобально, потому что между ними нет связи:

replica1.add("strawberry");
replica2.add("pear");

assertThat(replica1).contains("apple", "banana", "strawberry");
assertThat(replica2).contains("apple", "banana", "pear");

Как только соединение между обоими элементами кластера снова установлено, GSet внутренне объединяется с использованием объединения обоих наборов, и обе реплики снова становятся согласованными:

crdtStore1.connect(crdtStore2);

assertThat(replica1)
.contains("apple", "banana", "strawberry", "pear");
assertThat(replica2)
.contains("apple", "banana", "strawberry", "pear");

5. Инкрементный счетчик

Счетчик только приращений — это CRDT, который агрегирует все приращения локально на каждом узле.

Когда реплики синхронизируются, после сетевого раздела результирующее значение вычисляется путем суммирования всех приращений на всех узлах — это похоже на LongAdder из java.concurrent, но на более высоком уровне абстракции.

Давайте создадим счетчик только для приращения с помощью GCounter и будем увеличивать его из обеих реплик. Мы видим, что сумма рассчитана правильно:

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

GCounter replica1 = crdtStore1.createGCounter("ID_1");
GCounter replica2 = crdtStore2.findGCounter("ID_1").get();

replica1.increment();
replica2.increment(2L);

assertThat(replica1.get()).isEqualTo(3L);
assertThat(replica2.get()).isEqualTo(3L);

Когда мы отключаем оба члена кластера и выполняем операции локального увеличения, мы видим, что значения несовместимы:

crdtStore1.disconnect(crdtStore2);

replica1.increment(3L);
replica2.increment(5L);

assertThat(replica1.get()).isEqualTo(6L);
assertThat(replica2.get()).isEqualTo(8L);

Но как только кластер снова станет работоспособным, приращения будут объединены, что даст правильное значение:

crdtStore1.connect(crdtStore2);

assertThat(replica1.get())
.isEqualTo(11L);
assertThat(replica2.get())
.isEqualTo(11L);

6. Счетчик PN

Используя аналогичное правило для счетчика только приращения, мы можем создать счетчик, который может как увеличиваться, так и уменьшаться. PNCounter хранит все приращения и уменьшения отдельно.

При синхронизации реплик результирующее значение будет равно сумме всех приращений минус сумма всех декрементов :

@Test
public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() {
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();

replica1.increment();
replica2.decrement(2L);

assertThat(replica1.get()).isEqualTo(-1L);
assertThat(replica2.get()).isEqualTo(-1L);

crdtStore1.disconnect(crdtStore2);

replica1.decrement(3L);
replica2.increment(5L);

assertThat(replica1.get()).isEqualTo(-4L);
assertThat(replica2.get()).isEqualTo(4L);

crdtStore1.connect(crdtStore2);

assertThat(replica1.get()).isEqualTo(1L);
assertThat(replica2.get()).isEqualTo(1L);
}

7. Регистрация побед последнего автора

Иногда у нас есть более сложные бизнес-правила, и работы с наборами или счетчиками недостаточно. Мы можем использовать регистр Last-Writer-Wins, который сохраняет только последнее обновленное значение при слиянии разнородных наборов данных . Кассандра использует эту стратегию для разрешения конфликтов.

Мы должны быть очень осторожны при использовании этой стратегии, потому что она отбрасывает изменения, которые произошли за это время .

Создадим кластер из двух реплик и экземпляров класса LWWRegister :

LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2");
crdtStore1.connect(crdtStore2);

LWWRegister<String> replica1 = crdtStore1.createLWWRegister("ID_1");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("ID_1").get();

replica1.set("apple");
replica2.set("banana");

assertThat(replica1.get()).isEqualTo("banana");
assertThat(replica2.get()).isEqualTo("banana");

Когда первая реплика устанавливает значение на яблоко , а вторая — на банан, LWWRegister сохраняет только последнее значение.

Давайте посмотрим, что произойдет, если кластер отключится:

crdtStore1.disconnect(crdtStore2);

replica1.set("strawberry");
replica2.set("pear");

assertThat(replica1.get()).isEqualTo("strawberry");
assertThat(replica2.get()).isEqualTo("pear");

Каждая реплика хранит свою локальную копию несогласованных данных. Когда мы вызываем метод set() , LWWRegister внутренне присваивает специальное значение версии, которое идентифицирует конкретное обновление для каждого использования алгоритма VectorClock .

Когда кластер синхронизируется, он берет значение с самой высокой версией и отбрасывает все предыдущие обновления :

crdtStore1.connect(crdtStore2);

assertThat(replica1.get()).isEqualTo("pear");
assertThat(replica2.get()).isEqualTo("pear");

8. Заключение

В этой статье мы показали проблему согласованности распределенных систем при сохранении доступности.

В случае сетевых разделов нам необходимо объединить разрозненные данные при синхронизации кластера. Мы увидели, как использовать CRDT для объединения разнородных данных.

Все эти примеры и фрагменты кода можно найти в проекте GitHub — это проект Maven, поэтому его легко импортировать и запускать как есть.