1. Обзор
Это руководство будет введением в Apache Storm , распределенную систему вычислений в реальном времени.
Мы сосредоточимся и рассмотрим:
- Что такое Apache Storm и какие проблемы он решает
- Его архитектура и
- Как использовать в проекте
2. Что такое Apache Storm?
Apache Storm — бесплатная распределенная система с открытым исходным кодом для вычислений в реальном времени.
Он обеспечивает отказоустойчивость, масштабируемость, гарантирует обработку данных и особенно хорош при обработке неограниченных потоков данных.
Некоторыми хорошими вариантами использования Storm могут быть обработка операций с кредитными картами для обнаружения мошенничества или обработка данных из умных домов для обнаружения неисправных датчиков.
Storm позволяет интегрироваться с различными базами данных и системами очередей, доступными на рынке.
3. Зависимость от Maven
Прежде чем использовать Apache Storm, нам нужно включить в наш проект зависимость storm-core :
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
Мы должны использовать предоставленную область
только в том случае, если мы собираемся запускать наше приложение в кластере Storm.
Чтобы запустить приложение локально, мы можем использовать так называемый локальный режим, который будет имитировать кластер Storm в локальном процессе, в таком случае мы должны удалить предоставленный.
4. Модель данных
Модель данных Apache Storm состоит из двух элементов: кортежей и потоков.
4.1. Кортеж
Кортеж — это
упорядоченный список именованных полей с динамическими типами. Это означает, что нам не нужно явно объявлять типы полей.
Storm должен знать, как сериализовать все значения, используемые в кортеже. По умолчанию он уже может сериализовать примитивные типы, строки
и массивы байтов .
А поскольку Storm использует сериализацию Kryo, нам нужно зарегистрировать сериализатор с помощью Config
, чтобы использовать пользовательские типы. Мы можем сделать это одним из двух способов:
Во-первых, мы можем зарегистрировать класс для сериализации, используя его полное имя:
Config config = new Config();
config.registerSerialization(User.class);
В таком случае Kryo сериализует класс с помощью FieldSerializer .
По умолчанию это сериализует все непереходные поля класса, как частные, так и общедоступные.
Или вместо этого мы можем предоставить как класс для сериализации, так и сериализатор, который мы хотим, чтобы Storm использовал для этого класса:
Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);
Чтобы создать пользовательский сериализатор, нам нужно расширить универсальный класс Serializer
, который имеет два метода записи
и чтения.
4.2. Ручей
Поток — это
основная абстракция в экосистеме Storm. Поток — это
неограниченная последовательность кортежей.
Storms позволяет обрабатывать несколько потоков параллельно.
Каждый поток имеет идентификатор, который предоставляется и назначается во время объявления.
5. Топология
Логика приложения Storm реального времени упакована в топологию. Топология состоит из носиков и болтов .
5.1. Носик
Носики являются источниками ручьев. Они испускают кортежи в топологию.
Кортежи можно читать из различных внешних систем, таких как Kafka, Kestrel или ActiveMQ.
Носики могут быть надежными
или ненадежными
. Надежный
означает, что источник может ответить, что кортеж не может быть обработан Storm. Ненадежный
означает, что носик не отвечает, так как он собирается использовать механизм «выстрелил-забыл» для отправки кортежей.
Чтобы создать собственный носик, нам нужно реализовать интерфейс IRichSpout
или расширить любой класс, который уже реализует интерфейс, например, абстрактный класс BaseRichSpout
.
Создадим ненадежный
носик:
public class RandomIntSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector outputCollector;
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
outputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
}
}
Наш пользовательский RandomIntSpout
будет генерировать случайное целое число и метку времени каждую секунду.
5.2. Болт
Bolts обрабатывают кортежи в потоке. Они могут выполнять различные операции, такие как фильтрация, агрегирование или пользовательские функции.
Некоторые операции требуют нескольких шагов, поэтому в таких случаях нам нужно будет использовать несколько болтов.
Чтобы создать собственный Bolt
, нам нужно реализовать интерфейс IRichBolt
или для более простых операций IBasicBolt
.
Существует также несколько вспомогательных классов для реализации Bolt.
В этом случае мы будем использовать BaseBasicBolt
:
public class PrintingBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
Этот пользовательский PrintingBolt
будет просто печатать все кортежи на консоли.
6. Создание простой топологии
Давайте объединим эти идеи в простую топологию. Наша топология будет иметь один носик и три болта.
6.1. RandomNumberSpout
В начале создадим ненадежный носик. Он будет генерировать случайные целые числа из диапазона (0,100) каждую секунду:
public class RandomNumberSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
int operation = random.nextInt(101);
long timestamp = System.currentTimeMillis();
Values values = new Values(operation, timestamp);
collector.emit(values);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}
6.2. ФильтрингБолт
Далее мы создадим болт, который будет отфильтровывать все элементы с операцией
, равной 0:
public class FilteringBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
int operation = tuple.getIntegerByField("operation");
if (operation > 0) {
basicOutputCollector.emit(tuple.getValues());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}
6.3. Агрегирующий Болт
Далее создадим более сложный Bolt
, который будет агрегировать все положительные операции за каждый день.
Для этой цели мы будем использовать специальный класс, созданный специально для реализации болтов, которые работают с окнами, а не с отдельными кортежами: BaseWindowedBolt
.
Окна
— важная концепция потоковой обработки, разбивающая бесконечные потоки на конечные фрагменты. Затем мы можем применить вычисления к каждому фрагменту. Обычно окна бывают двух видов:
Временные окна используются для группировки элементов из заданного периода времени с использованием временных меток . Временные окна могут иметь разное количество элементов.
Счетные окна используются для создания окон определенного размера . В таком случае все окна будут иметь одинаковый размер, и окно не будет выведено, если элементов меньше определенного размера.
Наш AggregatingBolt
будет генерировать сумму всех положительных операций из временного окна
вместе с его временными метками начала и конца:
public class AggregatingBolt extends BaseWindowedBolt {
private OutputCollector outputCollector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.outputCollector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
}
@Override
public void execute(TupleWindow tupleWindow) {
List<Tuple> tuples = tupleWindow.get();
tuples.sort(Comparator.comparing(this::getTimestamp));
int sumOfOperations = tuples.stream()
.mapToInt(tuple -> tuple.getIntegerByField("operation"))
.sum();
Long beginningTimestamp = getTimestamp(tuples.get(0));
Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));
Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
outputCollector.emit(values);
}
private Long getTimestamp(Tuple tuple) {
return tuple.getLongByField("timestamp");
}
}
Обратите внимание, что в этом случае безопасно получить первый элемент списка напрямую. Это связано с тем, что каждое окно вычисляется с использованием поля временной метки
кортежа,
поэтому в каждом окне должен быть хотя бы один элемент.
6.4. ФайлПисьмоБолт
Наконец, мы создадим болт, который возьмет все элементы с суммой операций
больше 2000, сериализует их и запишет в файл:
public class FileWritingBolt extends BaseRichBolt {
public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
private BufferedWriter writer;
private String filePath;
private ObjectMapper objectMapper;
@Override
public void cleanup() {
try {
writer.close();
} catch (IOException e) {
logger.error("Failed to close writer!");
}
}
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
try {
writer = new BufferedWriter(new FileWriter(filePath));
} catch (IOException e) {
logger.error("Failed to open a file for writing.", e);
}
}
@Override
public void execute(Tuple tuple) {
int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
long endTimestamp = tuple.getLongByField("endTimestamp");
if (sumOfOperations > 2000) {
AggregatedWindow aggregatedWindow = new AggregatedWindow(
sumOfOperations, beginningTimestamp, endTimestamp);
try {
writer.write(objectMapper.writeValueAsString(aggregatedWindow));
writer.newLine();
writer.flush();
} catch (IOException e) {
logger.error("Failed to write data to file.", e);
}
}
}
// public constructor and other methods
}
Обратите внимание, что нам не нужно объявлять выход, так как это будет последний болт в нашей топологии.
6.5. Запуск топологии
Наконец, мы можем собрать все вместе и запустить нашу топологию:
public static void runTopology() {
TopologyBuilder builder = new TopologyBuilder();
Spout random = new RandomNumberSpout();
builder.setSpout("randomNumberSpout");
Bolt filtering = new FilteringBolt();
builder.setBolt("filteringBolt", filtering)
.shuffleGrouping("randomNumberSpout");
Bolt aggregating = new AggregatingBolt()
.withTimestampField("timestamp")
.withLag(BaseWindowedBolt.Duration.seconds(1))
.withWindow(BaseWindowedBolt.Duration.seconds(5));
builder.setBolt("aggregatingBolt", aggregating)
.shuffleGrouping("filteringBolt");
String filePath = "./src/main/resources/data.txt";
Bolt file = new FileWritingBolt(filePath);
builder.setBolt("fileBolt", file)
.shuffleGrouping("aggregatingBolt");
Config config = new Config();
config.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Test", config, builder.createTopology());
}
Чтобы данные проходили через каждую часть топологии, нам нужно указать, как их соединить. shuffleGroup
позволяет нам указать, что данные для filteringBolt
будут поступать из randomNumberSpout
.
Для каждого болта
нам нужно добавить shuffleGroup
, который определяет источник элементов для этого болта. Источником элементов может быть Носик
или другой Болт.
И если мы установим один и тот же источник для более чем одного болта ,
источник будет излучать все элементы для каждого из них.
В этом случае наша топология будет использовать LocalCluster
для локального запуска задания.
7. Заключение
В этом руководстве мы представили Apache Storm, распределенную систему вычислений в реальном времени. Мы создали носик, несколько болтов и стянули их вместе в полную топологию.
И, как всегда, все примеры кода можно найти на GitHub .