1. Введение
Spring Cloud Data Flow — это набор инструментов для построения конвейеров интеграции и обработки данных в реальном времени.
Конвейеры в данном случае — это приложения Spring Boot, созданные с использованием фреймворков Spring Cloud Stream или Spring Cloud Task .
В этом руководстве мы покажем, как использовать Spring Cloud Data Flow с Apache Spark .
2. Локальный сервер потока данных
Во- первых, нам нужно запустить Data Flow Server , чтобы иметь возможность развертывать наши задания.
Чтобы запустить Data Flow Server локально, нам нужно создать новый проект с зависимостью spring-cloud-starter-dataflow-server-
local :
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
После этого нам нужно аннотировать основной класс на сервере с помощью @EnableDataFlowServer
:
@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowServerApplication.class, args);
}
}
Как только мы запустим это приложение, у нас будет локальный сервер потока данных на порту 9393.
3. Создание проекта
Мы создадим задание Spark как отдельное локальное приложение, поэтому для его запуска нам не понадобится какой-либо кластер.
3.1. Зависимости
Во- первых, мы добавим зависимость Spark :
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.4.0</version>
</dependency>
3.2. Создание задания
И для нашей работы давайте аппроксимируем пи:
public class PiApproximation {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("ForEachPIApproximation");
JavaSparkContext context = new JavaSparkContext(conf);
int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;
List<Integer> xs = IntStream.rangeClosed(0, n)
.mapToObj(element -> Integer.valueOf(element))
.collect(Collectors.toList());
JavaRDD<Integer> dataSet = context.parallelize(xs, slices);
JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y ) < 1 ? 1: 0;
});
int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);
System.out.println("The pi was estimated as:" + count / n);
context.stop();
}
}
4. Оболочка потока данных
Data Flow Shell — это приложение, которое позволит нам взаимодействовать с сервером . Shell использует команды DSL для описания потоков данных.
Чтобы использовать Data Flow Shell , нам нужно создать проект, который позволит нам запустить его. Во- первых, нам нужна зависимость spring-cloud-dataflow-
shell :
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-shell</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
После добавления зависимости мы можем создать класс, который будет запускать нашу оболочку Data Flow:
@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataFlowShellApplication.class, args);
}
}
5. Развертывание проекта
Для развертывания нашего проекта мы воспользуемся так называемым средством запуска задач, доступным для Apache Spark в трех версиях: cluster
, yarn
и client
. Мы собираемся продолжить работу с локальной версией клиента .
Средство запуска задач — это то, что запускает наше задание Spark.
Для этого нам сначала нужно зарегистрировать нашу задачу с помощью Data Flow Shell :
app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
Задача позволяет нам указать несколько различных параметров, некоторые из которых являются необязательными, но некоторые параметры необходимы для правильного развертывания задания Spark:
spark.app-class
, основной класс нашего отправленного заданияspark.app-jar
, путь к толстой банке, содержащей нашу работуspark.app -
name
, имя, которое будет использоваться для нашей работы .spark.app-args
— аргументы, которые будут переданы заданию.
Мы можем использовать зарегистрированную задачу spark-client
для отправки нашей работы, не забыв указать необходимые параметры:
task create spark1 --definition "spark-client \
--spark.app-name=my-test-pi --spark.app-class=com.foreach.spring.cloud.PiApproximation \
--spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
Обратите внимание, что spark.app-jar
— это путь к fat-jar с нашей работой.
После успешного создания задачи мы можем приступить к ее запуску с помощью следующей команды:
task launch spark1
Это вызовет выполнение нашей задачи.
6. Резюме
В этом руководстве мы показали, как использовать платформу Spring Cloud Data Flow для обработки данных с помощью Apache Spark. Более подробную информацию о фреймворке Spring Cloud Data Flow можно найти в документации .
Все примеры кода можно найти на GitHub.