Перейти к основному содержимому

Введение в Apache Storm

· 8 мин. чтения

1. Обзор

Это руководство будет введением в Apache Storm , распределенную систему вычислений в реальном времени.

Мы сосредоточимся и рассмотрим:

  • Что такое Apache Storm и какие проблемы он решает
  • Его архитектура и
  • Как использовать в проекте

2. Что такое Apache Storm?

Apache Storm — бесплатная распределенная система с открытым исходным кодом для вычислений в реальном времени.

Он обеспечивает отказоустойчивость, масштабируемость, гарантирует обработку данных и особенно хорош при обработке неограниченных потоков данных.

Некоторыми хорошими вариантами использования Storm могут быть обработка операций с кредитными картами для обнаружения мошенничества или обработка данных из умных домов для обнаружения неисправных датчиков.

Storm позволяет интегрироваться с различными базами данных и системами очередей, доступными на рынке.

3. Зависимость от Maven

Прежде чем использовать Apache Storm, нам нужно включить в наш проект зависимость storm-core :

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>

Мы должны использовать предоставленную область только в том случае, если мы собираемся запускать наше приложение в кластере Storm.

Чтобы запустить приложение локально, мы можем использовать так называемый локальный режим, который будет имитировать кластер Storm в локальном процессе, в таком случае мы должны удалить предоставленный.

4. Модель данных

Модель данных Apache Storm состоит из двух элементов: кортежей и потоков.

4.1. Кортеж

Кортеж — это упорядоченный список именованных полей с динамическими типами. Это означает, что нам не нужно явно объявлять типы полей.

Storm должен знать, как сериализовать все значения, используемые в кортеже. По умолчанию он уже может сериализовать примитивные типы, строки и массивы байтов .

А поскольку Storm использует сериализацию Kryo, нам нужно зарегистрировать сериализатор с помощью Config , чтобы использовать пользовательские типы. Мы можем сделать это одним из двух способов:

Во-первых, мы можем зарегистрировать класс для сериализации, используя его полное имя:

Config config = new Config();
config.registerSerialization(User.class);

В таком случае Kryo сериализует класс с помощью FieldSerializer . По умолчанию это сериализует все непереходные поля класса, как частные, так и общедоступные.

Или вместо этого мы можем предоставить как класс для сериализации, так и сериализатор, который мы хотим, чтобы Storm использовал для этого класса:

Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

Чтобы создать пользовательский сериализатор, нам нужно расширить универсальный класс Serializer , который имеет два метода записи и чтения.

4.2. Ручей

Поток — это основная абстракция в экосистеме Storm. Поток — это неограниченная последовательность кортежей.

Storms позволяет обрабатывать несколько потоков параллельно.

Каждый поток имеет идентификатор, который предоставляется и назначается во время объявления.

5. Топология

Логика приложения Storm реального времени упакована в топологию. Топология состоит из носиков и болтов .

5.1. Носик

Носики являются источниками ручьев. Они испускают кортежи в топологию.

Кортежи можно читать из различных внешних систем, таких как Kafka, Kestrel или ActiveMQ.

Носики могут быть надежными или ненадежными . Надежный означает, что источник может ответить, что кортеж не может быть обработан Storm. Ненадежный означает, что носик не отвечает, так как он собирается использовать механизм «выстрелил-забыл» для отправки кортежей.

Чтобы создать собственный носик, нам нужно реализовать интерфейс IRichSpout или расширить любой класс, который уже реализует интерфейс, например, абстрактный класс BaseRichSpout .

Создадим ненадежный носик:

public class RandomIntSpout extends BaseRichSpout {

private Random random;
private SpoutOutputCollector outputCollector;

@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
outputCollector = spoutOutputCollector;
}

@Override
public void nextTuple() {
Utils.sleep(1000);
outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
}
}

Наш пользовательский RandomIntSpout будет генерировать случайное целое число и метку времени каждую секунду.

5.2. Болт

Bolts обрабатывают кортежи в потоке. Они могут выполнять различные операции, такие как фильтрация, агрегирование или пользовательские функции.

Некоторые операции требуют нескольких шагов, поэтому в таких случаях нам нужно будет использовать несколько болтов.

Чтобы создать собственный Bolt , нам нужно реализовать интерфейс IRichBolt или для более простых операций IBasicBolt .

Существует также несколько вспомогательных классов для реализации Bolt. В этом случае мы будем использовать BaseBasicBolt :

public class PrintingBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}
}

Этот пользовательский PrintingBolt будет просто печатать все кортежи на консоли.

6. Создание простой топологии

Давайте объединим эти идеи в простую топологию. Наша топология будет иметь один носик и три болта.

6.1. RandomNumberSpout

В начале создадим ненадежный носик. Он будет генерировать случайные целые числа из диапазона (0,100) каждую секунду:

public class RandomNumberSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector collector;

@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
collector = spoutOutputCollector;
}

@Override
public void nextTuple() {
Utils.sleep(1000);
int operation = random.nextInt(101);
long timestamp = System.currentTimeMillis();

Values values = new Values(operation, timestamp);
collector.emit(values);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}

6.2. ФильтрингБолт

Далее мы создадим болт, который будет отфильтровывать все элементы с операцией , равной 0:

public class FilteringBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
int operation = tuple.getIntegerByField("operation");
if (operation > 0) {
basicOutputCollector.emit(tuple.getValues());
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}

6.3. Агрегирующий Болт

Далее создадим более сложный Bolt , который будет агрегировать все положительные операции за каждый день.

Для этой цели мы будем использовать специальный класс, созданный специально для реализации болтов, которые работают с окнами, а не с отдельными кортежами: BaseWindowedBolt .

Окна — важная концепция потоковой обработки, разбивающая бесконечные потоки на конечные фрагменты. Затем мы можем применить вычисления к каждому фрагменту. Обычно окна бывают двух видов:

Временные окна используются для группировки элементов из заданного периода времени с использованием временных меток . Временные окна могут иметь разное количество элементов.

Счетные окна используются для создания окон определенного размера . В таком случае все окна будут иметь одинаковый размер, и окно не будет выведено, если элементов меньше определенного размера.

Наш AggregatingBolt будет генерировать сумму всех положительных операций из временного окна вместе с его временными метками начала и конца:

public class AggregatingBolt extends BaseWindowedBolt {
private OutputCollector outputCollector;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.outputCollector = collector;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
}

@Override
public void execute(TupleWindow tupleWindow) {
List<Tuple> tuples = tupleWindow.get();
tuples.sort(Comparator.comparing(this::getTimestamp));

int sumOfOperations = tuples.stream()
.mapToInt(tuple -> tuple.getIntegerByField("operation"))
.sum();
Long beginningTimestamp = getTimestamp(tuples.get(0));
Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
outputCollector.emit(values);
}

private Long getTimestamp(Tuple tuple) {
return tuple.getLongByField("timestamp");
}
}

Обратите внимание, что в этом случае безопасно получить первый элемент списка напрямую. Это связано с тем, что каждое окно вычисляется с использованием поля временной метки кортежа, поэтому в каждом окне должен быть хотя бы один элемент.

6.4. ФайлПисьмоБолт

Наконец, мы создадим болт, который возьмет все элементы с суммой операций больше 2000, сериализует их и запишет в файл:

public class FileWritingBolt extends BaseRichBolt {
public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
private BufferedWriter writer;
private String filePath;
private ObjectMapper objectMapper;

@Override
public void cleanup() {
try {
writer.close();
} catch (IOException e) {
logger.error("Failed to close writer!");
}
}

@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

try {
writer = new BufferedWriter(new FileWriter(filePath));
} catch (IOException e) {
logger.error("Failed to open a file for writing.", e);
}
}

@Override
public void execute(Tuple tuple) {
int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
long endTimestamp = tuple.getLongByField("endTimestamp");

if (sumOfOperations > 2000) {
AggregatedWindow aggregatedWindow = new AggregatedWindow(
sumOfOperations, beginningTimestamp, endTimestamp);
try {
writer.write(objectMapper.writeValueAsString(aggregatedWindow));
writer.newLine();
writer.flush();
} catch (IOException e) {
logger.error("Failed to write data to file.", e);
}
}
}

// public constructor and other methods
}

Обратите внимание, что нам не нужно объявлять выход, так как это будет последний болт в нашей топологии.

6.5. Запуск топологии

Наконец, мы можем собрать все вместе и запустить нашу топологию:

public static void runTopology() {
TopologyBuilder builder = new TopologyBuilder();

Spout random = new RandomNumberSpout();
builder.setSpout("randomNumberSpout");

Bolt filtering = new FilteringBolt();
builder.setBolt("filteringBolt", filtering)
.shuffleGrouping("randomNumberSpout");

Bolt aggregating = new AggregatingBolt()
.withTimestampField("timestamp")
.withLag(BaseWindowedBolt.Duration.seconds(1))
.withWindow(BaseWindowedBolt.Duration.seconds(5));
builder.setBolt("aggregatingBolt", aggregating)
.shuffleGrouping("filteringBolt"); 

String filePath = "./src/main/resources/data.txt";
Bolt file = new FileWritingBolt(filePath);
builder.setBolt("fileBolt", file)
.shuffleGrouping("aggregatingBolt");

Config config = new Config();
config.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Test", config, builder.createTopology());
}

Чтобы данные проходили через каждую часть топологии, нам нужно указать, как их соединить. shuffleGroup позволяет нам указать, что данные для filteringBolt будут поступать из randomNumberSpout .

Для каждого болта нам нужно добавить shuffleGroup , который определяет источник элементов для этого болта. Источником элементов может быть Носик или другой Болт. И если мы установим один и тот же источник для более чем одного болта , источник будет излучать все элементы для каждого из них.

В этом случае наша топология будет использовать LocalCluster для локального запуска задания.

7. Заключение

В этом руководстве мы представили Apache Storm, распределенную систему вычислений в реальном времени. Мы создали носик, несколько болтов и стянули их вместе в полную топологию.

И, как всегда, все примеры кода можно найти на GitHub .