1. Введение
Современным приложениям иногда требуется реплика базы данных, поисковый индекс для выполнения операции поиска, кэш-хранилище для ускорения чтения данных и хранилище данных для сложной аналитики данных.
Необходимость поддержки различных моделей данных и шаблонов доступа к данным представляет собой распространенную проблему, которую приходится решать большинству веб-разработчиков программного обеспечения, и именно здесь на помощь приходит система сбора измененных данных (CDC)!
В этой статье мы начнем с краткого обзора CDC и сосредоточимся на Debezium, платформе, обычно используемой для CDC .
2. Что такое CDC?
В этом разделе мы увидим, что такое CDC, основные преимущества его использования и некоторые распространенные варианты использования.
2.1. Изменить захват данных
Сбор измененных данных (CDC) — это метод и шаблон проектирования. Мы часто используем его для репликации данных между базами данных в режиме реального времени.
Мы также можем отслеживать изменения данных, записываемых в исходную базу данных, и автоматически синхронизировать целевые базы данных. CDC обеспечивает добавочную загрузку и устраняет необходимость обновления массовой загрузки .
2.2. Преимущества CDC
Сегодня большинство компаний по-прежнему используют пакетную обработку для синхронизации данных между своими системами. Использование пакетной обработки:
- Данные не синхронизируются сразу
- Больше выделенных ресурсов используется для синхронизации баз данных
- Репликация данных происходит только в указанные периоды пакетной обработки.
Однако сбор измененных данных дает некоторые преимущества:
- Постоянно отслеживает изменения в исходной базе данных
- Мгновенно обновляет целевую базу данных
- Использует потоковую обработку, чтобы гарантировать мгновенные изменения
Благодаря CDC различные базы данных постоянно синхронизируются , и массовый выбор остался в прошлом. Более того, стоимость передачи данных снижается, поскольку CDC передает только добавочные изменения.
2.3. Распространенные варианты использования CDC
CDC может помочь нам решить различные варианты использования, такие как репликация данных путем синхронизации различных источников данных, обновление или аннулирование кеша, обновление поисковых индексов, синхронизация данных в микросервисах и многое другое.
Теперь, когда мы немного знаем о том, что может сделать CDC, давайте посмотрим, как это реализовано в одном из известных инструментов с открытым исходным кодом.
3. Платформа Дебезиум
В этом разделе мы познакомим вас с Debezium , подробно рассмотрим его архитектуру и рассмотрим различные способы его развертывания.
3.1. Что такое дебезиум?
Debezium — это платформа с открытым исходным кодом для CDC, построенная поверх Apache Kafka . Его основное назначение — запись всех изменений на уровне строк, зафиксированных в каждой исходной таблице базы данных, в журнале транзакций. Каждое приложение, прослушивающее эти события, может выполнять необходимые действия на основе добавочных изменений данных.
Debezium предоставляет библиотеку коннекторов, поддерживающую несколько баз данных, таких как MySQL, MongoDB, PostgreSQL и другие.
Эти соединители могут отслеживать и записывать изменения в базе данных и публиковать их в службе потоковой передачи, такой как Kafka.
Более того, Debezium отслеживает, даже если наши приложения не работают . После перезапуска он начнет потреблять события с того места, где остановился, поэтому ничего не пропускает.
3.2. Дебезиум Архитектура
Развертывание Debezium зависит от имеющейся у нас инфраструктуры, но чаще всего мы используем Apache Kafka Connect.
Kafka Connect — это платформа, которая работает как отдельная служба вместе с брокером Kafka. Мы использовали его для потоковой передачи данных между Apache Kafka и другими системами.
Мы также можем определить соединители для передачи данных в Kafka и из него.
На приведенной ниже диаграмме показаны различные части конвейера отслеживания измененных данных на основе Debezium:
Во-первых, слева у нас есть исходная база данных MySQL, данные которой мы хотим скопировать и использовать в целевой базе данных, такой как PostgreSQL или любой аналитической базе данных.
Во-вторых, соединитель Kafka Connect анализирует и интерпретирует журнал транзакций и записывает его в тему Kafka.
Затем Kafka действует как брокер сообщений для надежной передачи набора изменений в целевые системы.
Затем справа у нас есть соединители Kafka, опрашивающие Kafka и отправляющие изменения в целевые базы данных.
Debezium использует Kafka в своей архитектуре , но также предлагает другие методы развертывания для удовлетворения потребностей нашей инфраструктуры.
Мы можем использовать его как автономный сервер с сервером Debezium, или мы можем встроить его в код нашего приложения в виде библиотеки.
Мы увидим эти методы в следующих разделах.
3.3. Дебезиум Сервер
Debezium предоставляет автономный сервер для регистрации изменений исходной базы данных. Он настроен на использование одного из исходных соединителей Debezium.
Кроме того, эти соединители отправляют события изменений в различные инфраструктуры обмена сообщениями, такие как Amazon Kinesis или Google Cloud Pub/Sub.
3.4. Встроенный дебезиум
Kafka Connect обеспечивает отказоустойчивость и масштабируемость при использовании для развертывания Debezium. Однако иногда нашим приложениям не требуется такой уровень надежности, и мы хотим минимизировать стоимость нашей инфраструктуры.
К счастью, мы можем сделать это, встроив движок Debezium в наше приложение . После этого мы должны настроить коннекторы.
4. Настройка
В этом разделе мы начнем сначала с архитектуры нашего приложения. Затем мы увидим, как настроить нашу среду, и выполним некоторые основные шаги для интеграции Debezium.
Давайте начнем с представления нашего приложения.
4.1. Пример архитектуры приложения
Чтобы сделать наше приложение простым, мы создадим приложение Spring Boot для управления клиентами.
В нашей модели клиента есть поля ID
, fullname
и email .
Для уровня доступа к данным мы будем использовать Spring Data JPA .
Прежде всего, наше приложение будет работать со встроенной версией Debezium. Давайте визуализируем архитектуру этого приложения:
Во-первых, Debezium Engine будет отслеживать журналы транзакций таблицы клиентов
в исходной базе данных MySQL (из другой системы или приложения).
Во-вторых, всякий раз, когда мы выполняем операцию с базой данных, такую как вставка/обновление/удаление в таблице клиентов
, коннектор Debezium будет вызывать метод службы.
Наконец, на основе этих событий этот метод синхронизирует данные таблицы клиентов
с целевой базой данных MySQL (основной базой данных нашего приложения).
4.2. Зависимости Maven
Давайте начнем с добавления необходимых зависимостей в наш pom.xml
:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>1.4.2.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>1.4.2.Final</version>
</dependency>
Точно так же мы добавляем зависимости для каждого из коннекторов Debezium, которые будет использовать наше приложение.
В нашем случае мы будем использовать коннектор MySQL :
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>1.4.2.Final</version>
</dependency>
4.3. Установка баз данных
Мы можем установить и настроить наши базы данных вручную. Однако, чтобы ускорить процесс, мы будем использовать файл docker-compose :
version: "3.9"
services:
# Install Source MySQL DB and setup the Customer database
mysql-1:
container_name: source-database
image: mysql
ports:
- 3305:3306
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: user
MYSQL_PASSWORD: password
MYSQL_DATABASE: customerdb
# Install Target MySQL DB and setup the Customer database
mysql-2:
container_name: target-database
image: mysql
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_USER: user
MYSQL_PASSWORD: password
MYSQL_DATABASE: customerdb
Этот файл будет запускать два экземпляра базы данных на разных портах.
Мы можем запустить этот файл с помощью команды docker-compose up -d
.
Теперь давайте создадим таблицу клиентов
, запустив скрипт SQL:
CREATE TABLE customer
(
id integer NOT NULL,
fullname character varying(255),
email character varying(255),
CONSTRAINT customer_pkey PRIMARY KEY (id)
);
5. Конфигурация
В этом разделе мы настроим Debezium MySQL Connector и посмотрим, как запустить Embedded Debezium Engine.
5.1. Настройка соединителя Debezium
Чтобы настроить наш Debezium MySQL Connector, мы создадим bean-компонент конфигурации Debezium:
@Bean
public io.debezium.config.Configuration customerConnector() {
return io.debezium.config.Configuration.create()
.with("name", "customer-mysql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/tmp/offsets.dat")
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", customerDbHost)
.with("database.port", customerDbPort)
.with("database.user", customerDbUsername)
.with("database.password", customerDbPassword)
.with("database.dbname", customerDbName)
.with("database.include.list", customerDbName)
.with("include.schema.changes", "false")
.with("database.server.id", "10181")
.with("database.server.name", "customer-mysql-db-server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/tmp/dbhistory.dat")
.build();
}
Рассмотрим эту конфигурацию более подробно.
Метод create
этого bean -компонента использует построитель для создания объекта Properties
.
Этот построитель задает несколько свойств, необходимых движку, независимо от предпочтительного соединителя. Для отслеживания исходной базы данных MySQL мы используем класс MySqlConnector
.
Когда этот коннектор запускается, он начинает отслеживать изменения из источника и записывает «смещения», чтобы определить , сколько данных он обработал из журнала транзакций .
Есть несколько способов сохранить эти смещения, но в этом примере мы будем использовать класс FileOffsetBackingStore
для хранения смещений в нашей локальной файловой системе.
Последние несколько параметров соединителя — это свойства базы данных MySQL.
Теперь, когда у нас есть конфигурация, мы можем создать наш движок.
5.2. Запуск двигателя Debezium
DebeziumEngine служит оболочкой для нашего коннектора MySQL .
Давайте создадим движок, используя конфигурацию коннектора:
private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(customerConnectorConfiguration.asProperties())
.notifying(this::handleEvent)
.build();
this.customerService = customerService;
}
Более того, движок будет вызывать метод для каждого изменения данных — в нашем примере это handleChangeEvent
.
В этом методе сначала мы будем анализировать каждое событие на основе формата, указанного при вызове create().
Затем мы находим, какая операция у нас была, и вызываем CustomerService
для выполнения функций Create/Update/Delete в нашей целевой базе данных:
private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
if(operation != Operation.READ) {
String record = operation == Operation.DELETE ? BEFORE : AFTER;
Struct struct = (Struct) sourceRecordChangeValue.get(record);
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
this.customerService.replicateData(payload, operation);
}
}
}
Теперь, когда мы настроили объект DebeziumEngine
, давайте запустим его асинхронно с помощью исполнителя службы:
private final Executor executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void start() {
this.executor.execute(debeziumEngine);
}
@PreDestroy
private void stop() throws IOException {
if (this.debeziumEngine != null) {
this.debeziumEngine.close();
}
}
6. Дебезиум в действии
Чтобы увидеть наш код в действии, давайте внесем некоторые изменения в данные таблицы клиентов исходной базы данных.
6.1. Вставка записи
Чтобы добавить новую запись в таблицу клиентов
, мы перейдем в оболочку MySQL и запустим:
INSERT INTO customerdb.customer (id, fullname, email) VALUES (1, 'John Doe', 'jd@example.com')
После выполнения этого запроса мы увидим соответствующий вывод нашего приложения:
23:57:57.897 [pool-1-thread-1] INFO c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=John Doe,email=jd@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746277000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=703,row=0,thread=19},op=c,ts_ms=1617746277422}'
Hibernate: insert into customer (email, fullname, id) values (?, ?, ?)
23:57:58.095 [pool-1-thread-1] INFO c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=jd@example.com} with Operation: CREATE
Наконец, мы проверяем, что новая запись была вставлена в нашу целевую базу данных:
id fullname email
1 John Doe jd@example.com
6.2. Обновление записи
Теперь давайте попробуем обновить нашего последнего вставленного клиента и проверим, что произойдет:
UPDATE customerdb.customer t SET t.email = 'john.doe@example.com' WHERE t.id = 1
После этого мы получим тот же вывод, что и при вставке, за исключением того, что тип операции изменится на «ОБНОВЛЕНИЕ», и, конечно же, запрос, который использует Hibernate, является запросом «обновления»:
00:08:57.893 [pool-1-thread-1] INFO c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,email=jd@example.com},after=Struct{id=1,fullname=John Doe,email=john.doe@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746937000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1040,row=0,thread=19},op=u,ts_ms=1617746937703}'
Hibernate: update customer set email=?, fullname=? where id=?
00:08:57.938 [pool-1-thread-1] INFO c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=john.doe@example.com} with Operation: UPDATE
Мы можем убедиться, что электронная почта Джона была изменена в нашей целевой базе данных:
id fullname email
1 John Doe john.doe@example.com
6.3. Удаление записи
Теперь мы можем удалить запись в таблице клиентов
, выполнив:
DELETE FROM customerdb.customer WHERE id = 1
Аналогично, здесь у нас есть изменение в операции и запрос снова:
00:12:16.892 [pool-1-thread-1] INFO c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,email=john.doe@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617747136000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1406,row=0,thread=19},op=d,ts_ms=1617747136640}'
Hibernate: delete from customer where id=?
00:12:16.951 [pool-1-thread-1] INFO c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=john.doe@example.com} with Operation: DELETE
Мы можем убедиться, что данные были удалены в нашей целевой базе данных:
select * from customerdb.customer where id= 1
0 rows retrieved
7. Заключение
В этой статье мы увидели преимущества CDC и проблемы, которые он может решить. Мы также узнали, что без этого у нас остается массовая загрузка данных, которая отнимает много времени и средств.
Мы также видели Debezium, отличную платформу с открытым исходным кодом, которая может помочь нам с легкостью решать задачи CDC.
Как всегда, полный исходный код статьи доступен на GitHub .