1. Обзор
В этой статье мы рассмотрим бесконфликтные реплицированные типы данных (CRDT) и способы работы с ними в Java. Для наших примеров мы будем использовать реализации из библиотеки wurmloch-crdt
.
Когда у нас есть кластер из N
узлов-реплик в распределенной системе, мы можем столкнуться с сетевым разделом — некоторые узлы временно не могут общаться друг с другом . Эта ситуация называется разделенным мозгом.
Когда у нас в системе работает split-brain, некоторые запросы на запись — даже для одного и того же пользователя — могут уходить на разные реплики, не связанные друг с другом . Когда возникает такая ситуация, наша система по-прежнему доступна, но не является согласованной .
Нам нужно решить, что делать с несогласованными записями и данными, когда сеть между двумя разделенными кластерами снова начнет работать.
2. Бесконфликтные реплицированные типы данных спешат на помощь
Давайте рассмотрим два узла, A
и B
, которые отключились из-за разделения мозга.
Допустим, пользователь меняет свой логин и запрос идет к узлу А.
Затем он снова решает изменить его, но на этот раз запрос идет к узлу B.
Из-за разделения мозга два узла не связаны. Нам нужно решить, как должен выглядеть логин этого пользователя, когда сеть снова заработает.
Мы можем использовать несколько стратегий: мы можем предоставить пользователю возможность разрешать конфликты (как это делается в Google Docs), или мы можем использовать CRDT для слияния данных из разрозненных реплик для нас.
3. Зависимость от Maven
Во-первых, давайте добавим в библиотеку зависимость, которая предоставляет набор полезных CRDT:
<dependency>
<groupId>com.netopyr.wurmloch</groupId>
<artifactId>wurmloch-crdt</artifactId>
<version>0.1.0</version>
</dependency>
Последнюю версию можно найти на Maven Central .
4. Набор только для выращивания
Самый простой CRDT — это набор только для роста. Элементы можно только добавлять в GSet
и никогда не удалять. Когда GSet
расходится, его можно легко объединить, вычислив объединение двух наборов.
Во-первых, давайте создадим две реплики для имитации распределенной структуры данных и соединим эти две реплики с помощью метода connect()
:
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
Получив две реплики в нашем кластере, мы можем создать GSet
на первой реплике и сослаться на нее на второй реплике:
GSet<String> replica1 = crdtStore1.createGSet("ID_1");
GSet<String> replica2 = crdtStore2.<String>findGSet("ID_1").get();
На данный момент наш кластер работает как положено, и есть активное соединение между двумя репликами. Мы можем добавить в набор два элемента из двух разных реплик и утверждать, что набор содержит одни и те же элементы на обеих репликах:
replica1.add("apple");
replica2.add("banana");
assertThat(replica1).contains("apple", "banana");
assertThat(replica2).contains("apple", "banana");
Допустим, вдруг у нас есть сетевой раздел и нет связи между первой и второй репликами. Мы можем смоделировать сетевой раздел, используя метод disconnect()
:
crdtStore1.disconnect(crdtStore2);
Затем, когда мы добавляем элементы в набор данных из обеих реплик, эти изменения не видны глобально, потому что между ними нет связи:
replica1.add("strawberry");
replica2.add("pear");
assertThat(replica1).contains("apple", "banana", "strawberry");
assertThat(replica2).contains("apple", "banana", "pear");
Как только соединение между обоими элементами кластера снова установлено, GSet
внутренне объединяется с использованием объединения обоих наборов, и обе реплики снова становятся согласованными:
crdtStore1.connect(crdtStore2);
assertThat(replica1)
.contains("apple", "banana", "strawberry", "pear");
assertThat(replica2)
.contains("apple", "banana", "strawberry", "pear");
5. Инкрементный счетчик
Счетчик только приращений — это CRDT, который агрегирует все приращения локально на каждом узле.
Когда реплики синхронизируются, после сетевого раздела результирующее значение вычисляется путем суммирования всех приращений на всех узлах — это похоже на LongAdder
из java.concurrent,
но на более высоком уровне абстракции.
Давайте создадим счетчик только для приращения с помощью GCounter
и будем увеличивать его из обеих реплик. Мы видим, что сумма рассчитана правильно:
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
GCounter replica1 = crdtStore1.createGCounter("ID_1");
GCounter replica2 = crdtStore2.findGCounter("ID_1").get();
replica1.increment();
replica2.increment(2L);
assertThat(replica1.get()).isEqualTo(3L);
assertThat(replica2.get()).isEqualTo(3L);
Когда мы отключаем оба члена кластера и выполняем операции локального увеличения, мы видим, что значения несовместимы:
crdtStore1.disconnect(crdtStore2);
replica1.increment(3L);
replica2.increment(5L);
assertThat(replica1.get()).isEqualTo(6L);
assertThat(replica2.get()).isEqualTo(8L);
Но как только кластер снова станет работоспособным, приращения будут объединены, что даст правильное значение:
crdtStore1.connect(crdtStore2);
assertThat(replica1.get())
.isEqualTo(11L);
assertThat(replica2.get())
.isEqualTo(11L);
6. Счетчик PN
Используя аналогичное правило для счетчика только приращения, мы можем создать счетчик, который может как увеличиваться, так и уменьшаться. PNCounter хранит
все приращения и уменьшения отдельно.
При синхронизации реплик результирующее значение будет равно сумме всех приращений минус сумма всех декрементов :
@Test
public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() {
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();
replica1.increment();
replica2.decrement(2L);
assertThat(replica1.get()).isEqualTo(-1L);
assertThat(replica2.get()).isEqualTo(-1L);
crdtStore1.disconnect(crdtStore2);
replica1.decrement(3L);
replica2.increment(5L);
assertThat(replica1.get()).isEqualTo(-4L);
assertThat(replica2.get()).isEqualTo(4L);
crdtStore1.connect(crdtStore2);
assertThat(replica1.get()).isEqualTo(1L);
assertThat(replica2.get()).isEqualTo(1L);
}
7. Регистрация побед последнего автора
Иногда у нас есть более сложные бизнес-правила, и работы с наборами или счетчиками недостаточно. Мы можем использовать регистр Last-Writer-Wins, который сохраняет только последнее обновленное значение при слиянии разнородных наборов данных . Кассандра использует эту стратегию для разрешения конфликтов.
Мы должны быть очень осторожны при использовании этой стратегии, потому что она отбрасывает изменения, которые произошли за это время .
Создадим кластер из двух реплик и экземпляров класса LWWRegister
:
LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2");
crdtStore1.connect(crdtStore2);
LWWRegister<String> replica1 = crdtStore1.createLWWRegister("ID_1");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("ID_1").get();
replica1.set("apple");
replica2.set("banana");
assertThat(replica1.get()).isEqualTo("banana");
assertThat(replica2.get()).isEqualTo("banana");
Когда первая реплика устанавливает значение на яблоко
, а вторая — на банан,
LWWRegister сохраняет
только последнее значение.
Давайте посмотрим, что произойдет, если кластер отключится:
crdtStore1.disconnect(crdtStore2);
replica1.set("strawberry");
replica2.set("pear");
assertThat(replica1.get()).isEqualTo("strawberry");
assertThat(replica2.get()).isEqualTo("pear");
Каждая реплика хранит свою локальную копию несогласованных данных. Когда мы вызываем метод set()
, LWWRegister
внутренне присваивает специальное значение версии, которое идентифицирует конкретное обновление для каждого использования алгоритма VectorClock
.
Когда кластер синхронизируется, он берет значение с самой высокой версией и отбрасывает все предыдущие обновления :
crdtStore1.connect(crdtStore2);
assertThat(replica1.get()).isEqualTo("pear");
assertThat(replica2.get()).isEqualTo("pear");
8. Заключение
В этой статье мы показали проблему согласованности распределенных систем при сохранении доступности.
В случае сетевых разделов нам необходимо объединить разрозненные данные при синхронизации кластера. Мы увидели, как использовать CRDT для объединения разнородных данных.
Все эти примеры и фрагменты кода можно найти в проекте GitHub — это проект Maven, поэтому его легко импортировать и запускать как есть.