1. Введение
Kinesis — это инструмент для сбора, обработки и анализа потоков данных в режиме реального времени, разработанный в Amazon. Одним из его основных преимуществ является то, что он помогает в разработке приложений, управляемых событиями.
В этом руководстве мы рассмотрим несколько библиотек, которые позволяют нашему приложению Spring создавать и использовать записи из Kinesis Stream . В примерах кода показаны основные функциональные возможности, но не представлен готовый к использованию код.
2. Обязательное условие
Прежде чем двигаться дальше, нам нужно сделать две вещи.
Первый — создать проект Spring , так как здесь цель — взаимодействовать с Kinesis из проекта Spring.
Второй — создать Kinesis Data Stream. Мы можем сделать это из веб-браузера в нашей учетной записи AWS. Одной из альтернатив для поклонников AWS CLI среди нас является использование командной строки . Поскольку мы будем взаимодействовать с ним из кода, у нас также должны быть учетные данные AWS IAM , ключ доступа и секретный ключ, а также регион.
Все наши производители будут создавать фиктивные записи IP-адресов, а потребители будут читать эти значения и перечислять их в консоли приложения.
3. SDK AWS для Java
Самая первая библиотека, которую мы будем использовать, — это AWS SDK для Java. Его преимущество в том, что он позволяет нам управлять многими частями работы с Kinesis Data Streams. Мы можем считывать данные, создавать данные, создавать потоки данных и повторно разделять потоки данных . Недостатком является то, что для того, чтобы иметь готовый к производству код, нам придется кодировать такие аспекты, как повторное разбиение, обработка ошибок или демон, чтобы поддерживать жизнь потребителя.
3.1. Зависимость от Maven
Зависимость Maven от amazon-kinesis-client предоставит все необходимое для работы примеров. Теперь мы добавим его в наш файл pom.xml
:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.11.2</version>
</dependency>
3.2. Настройка весны
Давайте повторно используем объект AmazonKinesis
, необходимый для взаимодействия с нашим Kinesis Stream. Мы создадим его как @Bean
внутри нашего класса @SpringBootApplication
:
@Bean
public AmazonKinesis buildAmazonKinesis() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonKinesisClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(Regions.EU_CENTRAL_1)
.build();
}
Далее давайте определим aws.access.key
и aws.secret.key
, необходимые для локальной машины, в application.properties
:
aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here
И прочитаем их с помощью аннотации @Value
:
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
Для простоты мы будем полагаться на методы @Scheduled
для создания и использования записей.
3.3. Потребитель
AWS SDK Kinesis Consumer использует модель извлечения , то есть наш код будет извлекать записи из осколков потока данных Kinesis:
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);
GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
recordsResult.getRecords().stream()
.map(record -> new String(record.getData().array()))
.forEach(System.out::println);
recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
recordsResult = kinesis.getRecords(recordsRequest);
}
Объект GetRecordsRequest
формирует запрос на потоковые данные . В нашем примере мы определили ограничение в 25 записей на запрос и продолжаем чтение до тех пор, пока не останется ничего.
Мы также можем заметить, что для нашей итерации мы использовали объект GetShardIteratorResult
. Мы создали этот объект внутри метода @PostConstruct
, чтобы сразу начать отслеживать записи:
private GetShardIteratorResult shardIterator;
@PostConstruct
private void buildShardIterator() {
GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
readShardsRequest.setStreamName(IPS_STREAM);
readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
readShardsRequest.setShardId(IPS_SHARD_ID);
this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}
3.4. Режиссер
Давайте теперь посмотрим, как обрабатывать создание записей для нашего потока данных Kinesis .
Мы вставляем данные с помощью объекта PutRecordsRequest
. Для этого нового объекта мы добавляем список, содержащий несколько объектов PutRecordsRequestEntry
:
List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
entry.setPartitionKey(IPS_PARTITION_KEY);
return entry;
}).collect(Collectors.toList());
PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);
kinesis.putRecords(createRecordsRequest);
Мы создали базового потребителя и производителя смоделированных IP-записей. Все, что осталось сделать сейчас, это запустить наш проект Spring и увидеть IP-адреса, перечисленные в консоли нашего приложения.
4. ККЛ и КПЛ
Клиентская библиотека Kinesis (KCL) — это библиотека, которая упрощает использование записей . Это также уровень абстракции API-интерфейсов Java SDK AWS для Kinesis Data Streams. За кулисами библиотека выполняет балансировку нагрузки между многими экземплярами, реагируя на сбои экземпляров, устанавливая контрольные точки обработанных записей и реагируя на повторное разбиение.
Kinesis Producer Library (KPL) — это библиотека, полезная для записи в поток данных Kinesis . Он также обеспечивает уровень абстракции, который находится над API Java SDK AWS для Kinesis Data Streams. Для повышения производительности библиотека автоматически обрабатывает пакетную обработку, многопоточность и логику повторных попыток.
Главное преимущество KCL и KPL состоит в том, что они просты в использовании, поэтому мы можем сосредоточиться на создании и использовании записей.
4.1. Зависимости Maven
При необходимости две библиотеки можно добавить в наш проект отдельно. Чтобы включить KPL и KCL в наш проект Maven, нам нужно обновить наш файл pom.xml:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.11.2</version>
</dependency>
4.2. Настройка весны
Единственная подготовка к Spring, которая нам нужна, — это убедиться, что у нас есть учетные данные IAM. Значения для aws.access.key
и aws.secret.key
определены в нашем файле application.properties
, поэтому при необходимости мы можем прочитать их с помощью @Value
.
4.3. Потребитель
Во- первых, мы создадим класс, реализующий интерфейс IRecordProcessor
и определяющий нашу логику обработки записей потока данных Kinesis , то есть вывод их на консоль:
public class IpProcessor implements IRecordProcessor {
@Override
public void initialize(InitializationInput initializationInput) { }
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.getRecords()
.forEach(record -> System.out.println(new String(record.getData().array())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) { }
}
Следующим шагом является определение фабричного класса, реализующего интерфейс IRecordProcessorFactory
и возвращающего ранее созданный объект IpProcessor
:
public class IpProcessorFactory implements IRecordProcessorFactory {
@Override
public IRecordProcessor createProcessor() {
return new IpProcessor();
}
}
И теперь, для последнего шага, мы будем использовать объект Worker
для определения нашего потребительского конвейера . Нам нужен объект KinesisClientLibConfiguration
, который при необходимости будет определять учетные данные IAM и регион AWS.
Мы передадим KinesisClientLibConfiguration
и наш объект IpProcessorFactory
нашему Worker
, а затем запустим его в отдельном потоке. Мы всегда поддерживаем эту логику использования записей с помощью класса Worker
, поэтому теперь мы постоянно читаем новые записи:
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
APP_NAME,
IPS_STREAM,
new AWSStaticCredentialsProvider(awsCredentials),
IPS_WORKER)
.withRegionName(Regions.EU_CENTRAL_1.getName());
final Worker worker = new Worker.Builder()
.recordProcessorFactory(new IpProcessorFactory())
.config(consumerConfig)
.build();
CompletableFuture.runAsync(worker.run());
4.4. Режиссер
Давайте теперь определим объект KinesisProducerConfiguration
, добавив учетные данные IAM и регион AWS:
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
.setVerifyCertificate(false)
.setRegion(Regions.EU_CENTRAL_1.getName());
this.kinesisProducer = new KinesisProducer(producerConfig);
Мы включим ранее созданный объект kinesisProducer в задание
@Scheduled
и будем непрерывно создавать записи для нашего потока данных Kinesis:
IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
.forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));
5. Spring Cloud Stream Binder Kinesis
Мы уже видели две библиотеки, обе созданы вне экосистемы Spring. Теперь мы увидим, как Spring Cloud Stream Binder Kinesis может еще больше упростить нашу жизнь при построении поверх Spring Cloud Stream .
5.1. Зависимость от Maven
Зависимость Maven, которую нам нужно определить в нашем приложении для Spring Cloud Stream Binder Kinesis :
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
5.2. Настройка весны
При работе на EC2 необходимые свойства AWS обнаруживаются автоматически, поэтому их не нужно определять. Поскольку мы запускаем наши примеры на локальном компьютере, нам нужно определить наш ключ доступа к IAM, секретный ключ и регион для нашей учетной записи AWS. Мы также отключили автоматическое определение имени стека CloudFormation для приложения:
cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false
Spring Cloud Stream поставляется с тремя интерфейсами, которые мы можем использовать в нашей привязке потока:
- Sink
предназначен
для приема данных - Источник
используется
для публикации записей - Процессор
представляет
собой комбинацию обоих
Мы также можем определить наши собственные интерфейсы, если нам это нужно.
5.3. Потребитель
Определение потребителя состоит из двух частей. Во-первых, мы определим в application.properties
поток данных, из которого мы будем потреблять:
spring.cloud.stream.bindings.input.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain
А затем давайте определим класс Spring @Component
. Аннотация @EnableBinding(Sink.class)
позволит нам читать из потока Kinesis с помощью метода, аннотированного с помощью @StreamListener(Sink.INPUT)
:
@EnableBinding(Sink.class)
public class IpConsumer {
@StreamListener(Sink.INPUT)
public void consume(String ip) {
System.out.println(ip);
}
}
5.4. Режиссер
Производитель также может быть разделен на две части. Во-первых, мы должны определить свойства нашего потока внутри application.properties
:
spring.cloud.stream.bindings.output.destination=live-ips
spring.cloud.stream.bindings.output.content-type=text/plain
Затем мы добавляем @EnableBinding(Source.class)
в Spring @Component
и каждые несколько секунд создаем новые тестовые сообщения :
@Component
@EnableBinding(Source.class)
public class IpProducer {
@Autowired
private Source source;
@Scheduled(fixedDelay = 3000L)
private void produce() {
IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
.forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
}
}
Это все, что нам нужно для работы Spring Cloud Stream Binder Kinesis. Мы можем просто запустить приложение прямо сейчас.
6. Заключение
В этой статье мы увидели, как интегрировать наш проект Spring с двумя библиотеками AWS для взаимодействия с Kinesis Data Stream. Мы также увидели, как использовать библиотеку Spring Cloud Stream Binder Kinesis, чтобы сделать реализацию еще проще.
Исходный код этой статьи можно найти на Github .