1. Введение
ksqlDB можно описать как базу данных потоковой передачи событий в реальном времени, построенную поверх Apache Kafka и Kafka Streams . Он сочетает мощную потоковую обработку с моделью реляционной базы данных с использованием синтаксиса SQL.
В этом руководстве мы рассмотрим основные концепции ksqlDB и создадим пример приложения, чтобы продемонстрировать практический вариант использования.
2. Обзор
Поскольку ksqlDB — это база данных потоковой передачи событий, потоки и таблицы являются ее основными абстракциями. По сути, это наборы данных, которые можно преобразовывать и обрабатывать в режиме реального времени.
Потоковая обработка позволяет выполнять непрерывные вычисления над этими неограниченными потоками событий. Мы можем преобразовывать, фильтровать, агрегировать и объединять коллекции для получения новых коллекций или материализованных представлений с помощью SQL . Кроме того, новые события постоянно обновляют эти коллекции и представления для предоставления данных в реальном времени.
Наконец, запросы публикуют результаты различных операций потоковой обработки. Запросы ksqlDB поддерживают как асинхронные потоки приложений в реальном времени, так и синхронные потоки запросов/ответов, аналогичные традиционной базе данных .
3. Настройка
Чтобы увидеть ksqlDB в действии, мы создадим управляемое событиями приложение Java. Это позволит агрегировать и запрашивать неограниченный поток показаний из различных источников датчиков.
Основной вариант использования — обнаружение ситуаций, когда среднее значение показаний превышает заданный порог в течение определенного периода времени. Кроме того, ключевым требованием является то, что приложение должно предоставлять информацию в режиме реального времени, которую можно было бы, например, использовать при создании информационной панели или системы предупреждений.
Мы будем использовать Java-клиент ksqlDB для взаимодействия с сервером, чтобы создавать таблицы, агрегировать запросы и выполнять различные запросы.
3.1. Докер
Поскольку ksqlDB работает поверх Kafka, мы будем использовать Docker Compose для запуска компонентов Kafka, сервера ksqlDB и клиента CLI ksqlDB:
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
hostname: zookeeper
...
broker:
image: confluentinc/cp-kafka:6.2.0
hostname: broker
...
ksqldb-server:
image: confluentinc/ksqldb-server:0.19.0
hostname: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
healthcheck:
test: curl -f http://ksqldb-server:8088/ || exit 1
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.19.0
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
Кроме того, мы также будем использовать этот файл docker-compose.yml
в нашем приложении Java, чтобы раскрутить среду для наших интеграционных тестов с использованием среды Testcontainers .
Во-первых, давайте поднимем стек, запустив:
docker-compose up
Далее давайте подключимся к интерактивному интерфейсу командной строки, как только все службы запустятся. Это полезно для тестирования и взаимодействия с сервером:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Мы также скажем ksqlDB начинать все запросы с самой ранней точки в каждой теме:
ksql> SET 'auto.offset.reset' = 'earliest';
3.2. Зависимости
В этом проекте мы в основном будем использовать клиент Java для взаимодействия с ksqlDB. В частности, мы будем использовать ksqlDB для Confluent Platform (CP), поэтому нам нужно добавить репозиторий CP Maven в наш файл POM:
<repository>
<id>confluent</id>
<name>confluent-repo</name>
<url>http://packages.confluent.io/maven/</url>
</repository>
Теперь добавим зависимость для клиента:
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>6.2.0</version>
</dependency>
4. Агрегация данных в реальном времени
В этом разделе мы увидим, как создать материализованное представление, представляющее агрегацию в реальном времени, требуемую нашим приложением.
4.1. Создание потока
В Kafka топик хранит коллекцию событий. Точно так же в ksqkDB поток представляет события, поддерживаемые темой Kafka .
Давайте начнем с создания нашего потока для хранения входящих данных датчика:
CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)
WITH (KAFKA_TOPIC = 'readings',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp',
TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',
PARTITIONS = 1);
Здесь ksqlDB создает тему чтения
для хранения данных потока в формате JSON. Поскольку события представляют собой временные данные, важно, чтобы каждое показание содержало отметку времени, указывающую время события. Поле метки времени
хранит эти данные в указанном формате. Это гарантирует, что ksqlDB применяет семантику времени события для операций, связанных со временем, и событий не по порядку.
Далее мы создадим экземпляр клиента
с данными подключения к серверу ksqlDB и используем его для выполнения нашего оператора SQL:
ClientOptions options = ClientOptions.create()
.setHost(KSQLDB_SERVER_HOST)
.setPort(KSQLDB_SERVER_PORT);
Client client = Client.create(options);
Map<String, Object> properties = Collections.singletonMap(
"auto.offset.reset", "earliest"
);
CompletableFuture<ExecuteStatementResult> result =
client.executeStatement(CREATE_READINGS_STREAM, properties);
Как и ранее с CLI, мы устанавливаем для свойства auto.offset.reset значение «
самый ранний
». Это гарантирует, что при отсутствии смещения Kafka запрос считывает соответствующую тему с самого раннего смещения.
Метод executeStatement
является частью асинхронного API, предоставляемого клиентом. Он немедленно возвращает CompletableFuture
перед отправкой каких-либо запросов на сервер. Затем вызывающий код может решить заблокировать и дождаться завершения (путем вызова метода get
или join
) или выполнить другие неблокирующие операции.
4.2. Создание материализованного представления
Теперь, когда у нас есть базовый поток событий, мы можем получить новую таблицу предупреждений
из потока показаний .
Этот постоянный запрос (или материализованное представление) выполняется на сервере неопределенно долго и обрабатывает события из исходного потока или таблицы .
В нашем случае он должен вызывать предупреждение, когда среднее значение для каждого датчика превышает значение 25 за 30-минутный период:
CREATE TABLE alerts AS
SELECT
sensor_id,
TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC')
AS start_period,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC')
AS end_period,
AVG(reading) AS average_reading
FROM readings
WINDOW TUMBLING (SIZE 30 MINUTES)
GROUP BY id
HAVING AVG(reading) > 25
EMIT CHANGES;
В этом запросе мы собираем новые входящие события в 30- минутном окне для каждого датчика. Мы также использовали функцию TIMESTAMPTOSTRING
, чтобы преобразовать метку времени UNIX во что-то более удобочитаемое.
Важно отметить, что материализованное представление обновляется данными только тогда, когда новое событие успешно интегрируется с функцией агрегирования .
Как и ранее, давайте воспользуемся клиентом для асинхронного выполнения этого оператора и создадим наше материализованное представление:
CompletableFuture<ExecuteStatementResult> result =
client.executeStatement(CREATE_ALERTS_TABLE, properties)
После создания такие представления обновляются поэтапно. Это ключ к эффективным и высокопроизводительным запросам для обновлений в реальном времени.
4.3. Вставка демонстрационных данных
Прежде чем мы сможем выполнять запросы, давайте создадим несколько примеров событий, представляющих различные показания с 10-минутными интервалами.
Давайте предоставим сопоставления ключ/значение для столбцов потока, используя KsqlObject
:
List<KsqlObject> rows = Arrays.asList(
new KsqlObject().put("sensor_id", "sensor-1")
.put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
new KsqlObject().put("sensor_id", "sensor-1")
.put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
new KsqlObject().put("sensor_id", "sensor-2")
.put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
// additional rows
);
CompletableFuture<Void> result = CompletableFuture.allOf(
rows.stream()
.map(row -> client.insertInto(READINGS_TABLE, row))
.toArray(CompletableFuture[]::new)
);
Здесь мы объединяем все отдельные операции вставки в одно будущее
для удобства. Это завершается после успешного завершения всех базовых экземпляров CompletableFuture .
5. Запрос данных
Запросы позволяют вызывающим объектам передавать данные материализованного представления в приложение. Их можно разделить на два типа.
5.1. Push-запрос
Этот тип запроса отправляет клиенту непрерывный поток обновлений. Эти запросы особенно подходят для асинхронных потоков приложений , поскольку они позволяют клиентам реагировать на новую информацию в режиме реального времени.
Однако, в отличие от постоянных запросов, сервер не сохраняет результаты таких запросов в топике Kafka. Поэтому мы должны сделать эти запросы как можно более простыми, перенося всю тяжелую работу на постоянные запросы .
Давайте создадим простой push-запрос, чтобы подписаться на результаты нашего материализованного представления предупреждений
, созданного ранее:
SELECT * FROM alerts EMIT CHANGES;
Здесь важно отметить предложение EMIT
, которое передает все изменения клиенту. Поскольку запрос не содержит ограничений, он будет продолжать передавать все результаты до тех пор, пока не будет завершен.
Далее подписываемся на результаты запроса, чтобы получать потоковые данные:
public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
return client.streamQuery(ALERTS_QUERY, PROPERTIES)
.thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Alerts push query failed", ex);
}
});
}
Здесь мы вызвали метод streamQuery
, который возвращает StreamedQueryResult
для получения потоковых данных. Это расширяет интерфейс Publisher от
Reactive Streams . Поэтому мы можем асинхронно потреблять результаты с помощью реактивного подписчика
. На самом деле подписчик представляет собой простую реализацию Reactive Streams , которая получает строки ksqlDB в виде JSON и преобразует их в Alert
POJO.
Теперь мы можем проверить это, используя наш файл Compose и контейнер DockerComposeContainer
из Testcontainers :
@Testcontainers
class KsqlDBApplicationLiveTest {
@Container
public static DockerComposeContainer dockerComposeContainer =
new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
.withServices("zookeeper", "broker", "ksqldb-server")
.withExposedService("ksqldb-server", 8088,
Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
.withLocalCompose(true);
// setup and teardown
@Test
void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
createAlertsMaterializedView();
// Reactive Streams Subscriber impl for receiving streaming data
RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);
ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
insertSampleData();
await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
assertThat(alertSubscriber.consumedItems)
.containsOnly(
expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
)
);
}
}
Здесь мы развернули полную среду ksqlDB для интеграционных тестов. Тест вставляет образцы строк в поток, а ksqlDB выполняет оконную агрегацию. Наконец, мы утверждаем, что наш подписчик получает последние оповещения, как и ожидалось.
5.2. Вытяните запрос
В отличие от push-запросов, pull-запросы извлекают данные, которые не обновляются динамически, как в традиционной СУБД. Такие запросы немедленно возвращаются с конечным набором результатов. Таким образом, запросы на вытягивание хорошо подходят для синхронных потоков приложений типа запрос/ответ .
В качестве простого примера давайте создадим запрос для получения всех предупреждений, инициированных для определенного идентификатора датчика:
String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";
List<Row> rows = client.executeQuery(pullQuery, PROPERTIES).get()
В отличие от push-запроса, этот запрос возвращает все доступные данные из материализованного представления во время выполнения. Это полезно для запроса текущего состояния материализованного представления.
5.3. Разные операции
Документы API для клиента предоставляют дополнительную информацию о других операциях, таких как описание источников; листинг потоков, таблиц, тем; завершение запросов и многое другое.
6. Заключение
В этой статье мы рассмотрели основные концепции потоков, таблиц и запросов, которые поддерживают ksqlDB как эффективную базу данных потоковой передачи событий.
Попутно мы создали простое реактивное приложение, используя краткие и компонуемые конструкции SQL. Мы также увидели, как использовать Java-клиент для создания потоков и таблиц, выполнения запросов к материализованным представлениям и извлечения данных в реальном времени.
Как всегда, полный исходный код доступен на GitHub .