1. Обзор
В этой статье мы рассмотрим платформу Mantis, разработанную Netflix.
Мы рассмотрим основные концепции Mantis, создав, запустив и исследуя задание потоковой обработки.
2. Что такое богомол?
** Mantis — это платформа для создания приложений (задания) для потоковой обработки . Он обеспечивает простой способ управления развертыванием и жизненным циклом заданий. Кроме того, это облегчает распределение ресурсов, обнаружение и связь между этими заданиями.**
Таким образом, разработчики могут сосредоточиться на реальной бизнес-логике, имея при этом поддержку надежной и масштабируемой платформы для запуска своих неблокирующих приложений с большими объемами и низкой задержкой.
Работа Mantis состоит из трех отдельных частей:
- источник , отвечающий за получение данных из внешнего
источника
- один или несколько
этапов
, отвечающих за обработку входящих потоков событий - и
приемник
, который собирает обработанные данные
Давайте теперь исследуем каждый из них.
3. Установка и зависимости
Начнем с добавления зависимостей mantis-runtime
и jackson-databind
:
<dependency>
<groupId>io.mantisrx</groupId>
<artifactId>mantis-runtime</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
Теперь, для настройки источника данных нашего задания, давайте реализуем интерфейс Mantis Source :
public class RandomLogSource implements Source<String> {
@Override
public Observable<Observable<String>> call(Context context, Index index) {
return Observable.just(
Observable
.interval(250, TimeUnit.MILLISECONDS)
.map(this::createRandomLogEvent));
}
private String createRandomLogEvent(Long tick) {
// generate a random log entry string
...
}
}
Как мы видим, он просто генерирует случайные записи журнала несколько раз в секунду.
4. Наша первая работа
Давайте теперь создадим задание Mantis, которое просто собирает события журнала из нашего RandomLogSource
. Позже мы добавим групповые и агрегатные преобразования для более сложного и интересного результата.
Для начала создадим сущность LogEvent :
public class LogEvent implements JsonType {
private Long index;
private String level;
private String message;
// ...
}
Затем добавим наш TransformLogStage.
Это простой этап, который реализует интерфейс ScalarComputation и разбивает запись журнала для создания LogEvent
. Кроме того, он отфильтровывает все неправильно отформатированные строки:
public class TransformLogStage implements ScalarComputation<String, LogEvent> {
@Override
public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
return logEntry
.map(log -> log.split("#"))
.filter(parts -> parts.length == 3)
.map(LogEvent::new);
}
}
4.1. Выполнение задания
На данный момент у нас достаточно строительных блоков для сборки нашей работы Mantis:
public class LogCollectingJob extends MantisJobProvider<LogEvent> {
@Override
public Job<LogEvent> getJobInstance() {
return MantisJob
.source(new RandomLogSource())
.stage(new TransformLogStage(), new ScalarToScalar.Config<>())
.sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
.metadata(new Metadata.Builder().build())
.create();
}
}
Давайте поближе познакомимся с нашей работой.
Как мы видим, он расширяет MantisJobProvider.
Сначала он извлекает данные из нашего RandomLogSource
и применяет TransformLogStage
к полученным данным. Наконец, он отправляет обработанные данные во встроенный приемник, который охотно подписывается и доставляет данные через SSE .
Теперь давайте настроим наше задание для локального выполнения при запуске:
@SpringBootApplication
public class MantisApplication implements CommandLineRunner {
// ...
@Override
public void run(String... args) {
LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
}
}
Запустим приложение. Мы увидим сообщение журнала, например:
...
Serving modern HTTP SSE server sink on port: 86XX
Давайте теперь подключимся к приемнику с помощью curl
:
$ curl localhost:86XX
data: {"index":86,"level":"WARN","message":"login attempt"}
data: {"index":87,"level":"ERROR","message":"user created"}
data: {"index":88,"level":"INFO","message":"user created"}
data: {"index":89,"level":"INFO","message":"login attempt"}
data: {"index":90,"level":"INFO","message":"user created"}
data: {"index":91,"level":"ERROR","message":"user created"}
data: {"index":92,"level":"WARN","message":"login attempt"}
data: {"index":93,"level":"INFO","message":"user created"}
...
4.2. Настройка приемника
До сих пор мы использовали встроенный приемник для сбора обработанных данных. Давайте посмотрим, сможем ли мы добавить больше гибкости нашему сценарию, предоставив собственный приемник.
Что, если, например, мы хотим отфильтровать журналы по сообщению
?
Давайте создадим LogSink
, реализующий интерфейс Sink<LogEvent>
:
public class LogSink implements Sink<LogEvent> {
@Override
public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
.withEncoder(LogEvent::toJsonString)
.withPredicate(filterByLogMessage())
.build();
logEventObservable.subscribe();
sink.call(context, portRequest, logEventObservable);
}
private Predicate<LogEvent> filterByLogMessage() {
return new Predicate<>("filter by message",
parameters -> {
if (parameters != null && parameters.containsKey("filter")) {
return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
}
return logEvent -> true;
});
}
}
В этой реализации приемника мы настроили предикат, который использует параметр фильтра
для извлечения только тех журналов, которые содержат текст, заданный в параметре фильтра
:
$ curl localhost:8874?filter=login
data: {"index":93,"level":"ERROR","message":"login attempt"}
data: {"index":95,"level":"INFO","message":"login attempt"}
data: {"index":97,"level":"ERROR","message":"login attempt"}
...
Примечание. Mantis также предлагает мощный язык запросов MQL , который можно использовать для запроса, преобразования и анализа потоковых данных в стиле SQL.
5. Цепочка этапов
Давайте теперь предположим, что нам интересно узнать, сколько записей журнала ERROR
, WARN
или INFO
у нас есть в данный интервал времени. Для этого мы добавим в нашу работу еще два этапа и свяжем их вместе.
5.1. Группировка
Во-первых, давайте создадим GroupLogStage.
Этот этап представляет собой реализацию ToGroupComputation
, которая получает данные потока LogEvent из существующего
TransformLogStage
. После этого он группирует записи по уровню логирования и отправляет их на следующий этап:
public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {
@Override
public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
}
public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){
return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
.description("Group event data by level")
.codec(JacksonCodecs.pojo(LogEvent.class))
.concurrentInput();
}
}
Мы также создали пользовательскую конфигурацию этапа, предоставив описание, кодек, который будет использоваться для сериализации вывода, и разрешили параллельное выполнение метода вызова этого этапа с помощью concurrentInput().
Следует отметить, что этот этап является горизонтально масштабируемым. Это означает, что мы можем запускать столько экземпляров этого этапа, сколько необходимо. Также стоит упомянуть, что при развертывании в кластере Mantis этот этап отправляет данные на следующий этап, чтобы все события, принадлежащие определенной группе, попадали на один и тот же рабочий процесс следующего этапа.
5.2. Агрегирование
Прежде чем мы продолжим и создадим следующий этап, давайте сначала добавим объект LogAggregate
:
public class LogAggregate implements JsonType {
private final Integer count;
private final String level;
}
Теперь давайте создадим последний этап в цепочке.
На этом этапе реализуется GroupToScalarComputation
и преобразуется поток групп журналов в скалярный LogAggregate
. Он делает это, подсчитывая, сколько раз каждый тип журнала появляется в потоке. Кроме того, у него также есть параметр LogAggregationDuration
, который можно использовать для управления размером окна агрегации:
public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {
private int duration;
@Override
public void init(Context context) {
duration = (int)context.getParameters().get("LogAggregationDuration", 1000);
}
@Override
public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
return mantisGroup
.window(duration, TimeUnit.MILLISECONDS)
.flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
.flatMap(group -> group.reduce(0, (count, value) -> count = count + 1)
.map((count) -> new LogAggregate(count, group.getKey()))
));
}
public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){
return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
.description("sum events for a log level")
.codec(JacksonCodecs.pojo(LogAggregate.class))
.withParameters(getParameters());
}
public static List<ParameterDefinition<?>> getParameters() {
List<ParameterDefinition<?>> params = new ArrayList<>();
params.add(new IntParameter()
.name("LogAggregationDuration")
.description("window size for aggregation in milliseconds")
.validator(Validators.range(100, 10000))
.defaultValue(5000)
.build());
return params;
}
}
5.3. Настройте и запустите задание
Осталось только настроить нашу работу:
public class LogAggregationJob extends MantisJobProvider<LogAggregate> {
@Override
public Job<LogAggregate> getJobInstance() {
return MantisJob
.source(new RandomLogSource())
.stage(new TransformLogStage(), TransformLogStage.stageConfig())
.stage(new GroupLogStage(), GroupLogStage.config())
.stage(new CountLogStage(), CountLogStage.config())
.sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
.metadata(new Metadata.Builder().build())
.create();
}
}
Как только мы запустим приложение и выполним наше новое задание, мы увидим, что количество журналов извлекается каждые несколько секунд:
$ curl localhost:8133
data: {"count":3,"level":"ERROR"}
data: {"count":13,"level":"INFO"}
data: {"count":4,"level":"WARN"}
data: {"count":8,"level":"ERROR"}
data: {"count":5,"level":"INFO"}
data: {"count":7,"level":"WARN"}
...
6. Заключение
Подводя итог, в этой статье мы увидели, что такое Netflix Mantis и для чего его можно использовать. Кроме того, мы рассмотрели основные концепции, использовали их для создания заданий и изучили настраиваемые конфигурации для различных сценариев.
Как всегда, полный код доступен на GitHub .