1. Обзор
Apache Spark — это распределенная система аналитики и обработки данных с открытым исходным кодом, которая позволяет масштабировать инженерию данных и науку о данных. Он упрощает разработку приложений, ориентированных на аналитику, предлагая унифицированный API для передачи данных, массовых преобразований и распространения.
DataFrame является
важным и важным компонентом Spark API. В этом руководстве мы рассмотрим некоторые API-интерфейсы Spark DataFrame
на простом примере с данными о клиентах.
2. DataFrame
в Spark
Логически DataFrame
представляет собой неизменяемый набор записей, организованных в именованные столбцы `` . Он имеет сходство с таблицей в RDBMS или ResultSet
в Java.
В качестве API DataFrame
обеспечивает унифицированный доступ к нескольким библиотекам Spark, включая Spark SQL, Spark Streaming, MLib и GraphX .
В Java мы используем Dataset<Row>
для представления DataFrame
.
По сути, Row
использует эффективное хранилище под названием Tungsten
, которое значительно оптимизирует операции Spark по сравнению с его предшественниками .
3. Зависимости Maven
Начнем с добавления зависимостей spark-core
и spark-sql
в наш pom.xml
:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.8</version>
</dependency>
4. DataFrame
и схема
По сути, DataFrame
— это RDD со схемой. Схема может быть либо выведена, либо определена как StructType
.
StructType
— это встроенный тип данных в Spark SQL, который мы используем для представления коллекции объектов StructField
.
Давайте определим образец схемы Customer
StructType
:
public static StructType minimumCustomerDataSchema() {
return DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("gender", DataTypes.StringType, true),
DataTypes.createStructField("transaction_amount", DataTypes.IntegerType, true) }
);
}
Здесь у каждого StructField
есть имя, которое представляет имя столбца DataFrame
, тип и логическое
значение, указывающее, может ли оно принимать значение NULL.
5. Создание фреймов данных
Первой операцией для каждого приложения Spark является получение SparkSession
через master.
Он предоставляет нам точку входа для доступа к DataFrames
. Начнем с создания SparkSession
:
public static SparkSession getSparkSession() {
return SparkSession.builder()
.appName("Customer Aggregation pipeline")
.master("local")
.getOrCreate();
}
Обратите внимание, что мы подключаемся к Spark с помощью локального мастера. Если бы нам нужно было подключиться к кластеру, мы бы вместо этого дали адрес кластера.
Когда у нас есть SparkSession
, мы можем создать DataFrame
, используя различные методы. Кратко рассмотрим некоторые из них.
5.1. DataFrame
из списка<POJO>
Давайте сначала создадим List<Customer>
: ``
List<Customer> customers = Arrays.asList(
aCustomerWith("01", "jo", "Female", 2000),
aCustomerWith("02", "jack", "Male", 1200)
);
Далее создадим DataFrame
из List<Customer>
с помощью createDataFrame
:
Dataset<Row> df = SPARK_SESSION
.createDataFrame(customerList, Customer.class);
5.2. DataFrame
из набора данных
Если у нас есть набор данных
, мы можем легко преобразовать его в DataFrame
, вызвав toDF
для набора данных
.
Давайте сначала создадим Dataset<Customer>
, используя createDataset
, который принимает org.apache.spark.sql.Encoders
:
Dataset<Customer> customerPOJODataSet = SPARK_SESSION
.createDataset(CUSTOMERS, Encoders.bean(Customer.class));
Далее конвертируем в DataFrame
:
Dataset<Row> df = customerPOJODataSet.toDF();
5.3. Строка
из POJO с использованием RowFactory
Поскольку DataFrame
по сути является Dataset<Row>
, давайте посмотрим, как мы можем создать строку
из POJO клиента .
По сути, реализуя MapFunction<Customer, Row>
и переопределяя метод call
, мы можем сопоставить каждого Customer
с Row,
используя RowFactory.create
:
public class CustomerToRowMapper implements MapFunction<Customer, Row> {
@Override
public Row call(Customer customer) throws Exception {
Row row = RowFactory.create(
customer.getId(),
customer.getName().toUpperCase(),
StringUtils.substring(customer.getGender(),0, 1),
customer.getTransaction_amount()
);
return row;
}
}
Следует отметить, что здесь мы можем манипулировать данными Customer
перед преобразованием их в Row
.
5.4. Кадр данных
из списка<строка>
Мы также можем создать DataFrame
из списка объектов Row
:
List<Row> rows = customer.stream()
.map(c -> new CustomerToRowMapper().call(c))
.collect(Collectors.toList());
Теперь давайте передадим этот List<Row>
в SparkSession
вместе со схемой StructType
:
Dataset<Row> df = SparkDriver.getSparkSession()
.createDataFrame(rows, SchemaFactory.minimumCustomerDataSchema());
Обратите внимание, что List<Row>
будет преобразован в DataFrame
на основе определения схемы . Любое поле, отсутствующее в схеме, не будет частью DataFrame.
5.5. DataFrame
из структурированных файлов и базы данных
DataFrames
может хранить столбцовую информацию, например файл CSV, а также вложенные поля и массивы, например файл JSON.
API DataFrame
остается неизменным независимо от того, работаем ли мы с файлами CSV, JSON или другими форматами, а также с базами данных.
Давайте создадим DataFrame
из многострочных данных JSON:
Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/minCustomerData.json");
Аналогично, в случае чтения из базы данных, мы будем иметь:
Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.option("url", "jdbc:postgresql://localhost:5432/customerdb")
.option("dbtable", "customer")
.option("user", "user")
.option("password", "password")
.option("serverTimezone", "EST")
.format("jdbc")
.load();
6. Преобразование DataFrame
в набор данных
Теперь давайте посмотрим, как мы можем преобразовать наш DataFrame
в набор данных
. Это преобразование полезно, если мы хотим манипулировать нашими существующими POJO и расширенным API, которые применяются только к DataFrame
.
Мы продолжим работу с DataFrame,
созданным из JSON в предыдущем разделе.
Давайте вызовем функцию сопоставления, которая берет каждую строку набора данных <Row>
и преобразует ее в объект Customer
:
Dataset<Customer> ds = df.map(
new CustomerMapper(),
Encoders.bean(Customer.class)
);
Здесь CustomerMapper
реализует MapFunction<Row, Customer>
:
public class CustomerMapper implements MapFunction<Row, Customer> {
@Override
public Customer call(Row row) {
Customer customer = new Customer();
customer.setId(row.getAs("id"));
customer.setName(row.getAs("name"));
customer.setGender(row.getAs("gender"));
customer.setTransaction_amount(Math.toIntExact(row.getAs("transaction_amount")));
return customer;
}
}
Следует отметить, что экземпляр MapFunction<Row, Customer> создается
только один раз, независимо от количества обрабатываемых записей .
7. Операции и преобразования DataFrame
Теперь давайте создадим простой конвейер, используя пример данных о клиентах. Мы хотим принимать данные клиентов в виде фреймов данных
из двух разных файловых источников, нормализовать их, а затем выполнять некоторые преобразования данных.
Наконец, мы запишем преобразованные данные в базу данных.
Цель этих преобразований — выяснить ежегодные расходы, упорядоченные по полу и источнику.
7.1. Получение данных
Во-первых, давайте получим данные из нескольких источников, используя метод чтения
SparkSession
, начиная с данных JSON: ``
Dataset<Row> jsonDataToDF = SPARK_SESSION.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/customerData.json");
Теперь давайте сделаем то же самое с нашим источником CSV:
Dataset<Row> csvDataToDF = SPARK_SESSION.read()
.format("csv")
.option("header", "true")
.schema(SchemaFactory.customerSchema())
.option("dateFormat", "m/d/YYYY")
.load("data/customerData.csv");
csvDataToDF.show();
csvDataToDF.printSchema();
return csvData;
Важно отметить, что для чтения этих данных CSV мы предоставляем схему StructType
, которая определяет типы данных столбцов.
После того, как мы получили данные, мы можем проверить содержимое DataFrame,
используя метод show
.
Кроме того, мы также можем ограничить количество строк, указав размер в методе show
.
И мы можем использовать printSchema
для проверки схем вновь созданных DataFrames.
Мы заметим, что две схемы имеют некоторые различия. Поэтому нам нужно нормализовать схему, прежде чем мы сможем выполнять какие-либо преобразования.
7.2. Нормализация фреймов данных
Далее мы нормализуем необработанные кадры данных,
представляющие данные CSV и JSON.
Здесь давайте посмотрим на некоторые из выполненных преобразований:
private Dataset<Row> normalizeCustomerDataFromEbay(Dataset<Row> rawDataset) {
Dataset<Row> transformedDF = rawDataset
.withColumn("id", concat(rawDataset.col("zoneId"),lit("-"), rawDataset.col("customerId")))
.drop(column("customerId"))
.withColumn("source", lit("ebay"))
.withColumn("city", rawDataset.col("contact.customer_city"))
.drop(column("contact"))
.drop(column("zoneId"))
.withColumn("year", functions.year(col("transaction_date")))
.drop("transaction_date")
.withColumn("firstName", functions.split(column("name"), " ")
.getItem(0))
.withColumn("lastName", functions.split(column("name"), " ")
.getItem(1))
.drop(column("name"));
return transformedDF;
}
Некоторые важные операции с DataFrame
в приведенном выше примере:
- c
oncat
для объединения данных из нескольких столбцов и литералов для создания нового столбцаидентификаторов
lit
статическая функция возвращает столбец с литеральным значениемфункции. year
, чтобы извлечь год изtransactionDate
function.split
для разделенияимени
настолбцы
имени ифамилии
метод drop
удаляет столбец во фрейме данныхcol
возвращает столбец набора данных на основе его имениwithColumnRenamed
возвращает столбец с переименованным значением
Важно отметить, что мы видим, что DataFrame
неизменяем. Следовательно, всякий раз, когда что-то нужно изменить, мы должны создать новый DataFrame
.
В конце концов, оба фрейма данных нормализуются к одной и той же схеме, как показано ниже:
root
|-- gender: string (nullable = true)
|-- transaction_amount: long (nullable = true)
|-- id: string (nullable = true)
|-- source: string (nullable = false)
|-- city: string (nullable = true)
|-- year: integer (nullable = true)
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
7.3. Объединение фреймов данных
Давайте объединим нормализованные кадры данных
следующим образом:
Dataset<Row> combineDataframes(Dataset<Row> df1, Dataset<Row> df2) {
return df1.unionByName(df2);
}
Важно отметить, что:
- Если мы заботимся об именах столбцов при объединении двух
DataFrames
, мы должны использоватьunionByName
- Если нам не нужны имена столбцов при объединении двух
DataFrames
, мы должны использоватьunion
7.4. Агрегирование фреймов данных
Затем давайте сгруппируем объединенные кадры данных
, чтобы узнать годовые расходы по годам, источникам и полу.
Затем мы отсортируем агрегированные данные по столбцам года
по возрастанию и за год
в порядке убывания:
Dataset<Row> aggDF = dataset
.groupBy(column("year"), column("source"), column("gender"))
.sum("transactionAmount")
.withColumnRenamed("sum(transaction_amount)", "yearly spent")
.orderBy(col("year").asc(), col("yearly spent").desc());
Некоторые важные операции с DataFrame
в приведенном выше примере:
groupBy
используется для организации идентичных данных в группы вDataFrame,
а затем для выполнения агрегатных функций, аналогичных предложениюSQL «GROUP BY»
.sum
применяет функцию агрегации к столбцуtransactionAmount
после группировкиorderBy
сортируетDataFrame
по одному или нескольким столбцам- `
Функции
ascи
desc из класса
Наконец, давайте воспользуемся методом show
, чтобы увидеть, как выглядит фрейм данных после преобразования:
+----+------+------+---------------+
|year|source|gender|annual_spending|
+----+------+------+---------------+
|2018|amazon| Male| 10600|
|2018|amazon|Female| 6200|
|2018| ebay| Male| 5500|
|2021| ebay|Female| 16000|
|2021| ebay| Male| 13500|
|2021|amazon| Male| 4000|
|2021|amazon|Female| 2000|
+----+------+------+---------------+
Следовательно, схема после окончательного преобразования должна быть:
root
|-- source: string (nullable = false)
|-- gender: string (nullable = true)
|-- year: integer (nullable = true)
|-- yearly spent: long (nullable = true)
7.5. Запись из DataFrame
в реляционную базу данных
Наконец, давайте закончим записью преобразованного DataFrame
в виде таблицы в реляционной базе данных:
Properties dbProps = new Properties();
dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");
Далее мы можем использовать сеанс Spark для записи в базу данных:
String connectionURL = dbProperties.getProperty("connectionURL");
dataset.write()
.mode(SaveMode.Overwrite)
.jdbc(connectionURL, "customer", dbProperties);
8. Тестирование
Теперь мы можем протестировать конвейер от начала до конца, используя два источника приема, с образами Docker postgres
и pgAdmin
:
@Test
void givenCSVAndJSON_whenRun_thenStoresAggregatedDataFrameInDB() throws Exception {
Properties dbProps = new Properties();
dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");
pipeline = new CustomerDataAggregationPipeline(dbProps);
pipeline.run();
String allCustomersSql = "Select count(*) from customer";
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(allCustomersSql);
resultSet.next();
int count = resultSet.getInt(1);
assertEquals(7, count);
}
После запуска мы можем убедиться, что существует таблица со столбцами и строками, соответствующими DataFrame
. Наконец, мы также можем наблюдать этот вывод через клиент pgAdmin4
:
Здесь следует отметить пару важных моментов:
- Таблица
клиентов
создается автоматически в результате операциизаписи .
- Используется режим
SaveMode.Overwrite.
Следовательно, это перезапишет все, что уже существует в таблице. Другими доступными параметрами являютсяAppend
,Ignore
иErrorIfExists
.
Кроме того, мы также можем использовать запись
для экспорта данных DataFrame в форматах
CSV
, JSON
или parquet
среди других форматов.
9. Заключение
В этом руководстве мы рассмотрели, как использовать DataFrames
для обработки и агрегирования данных в Apache Spark.
Во-первых, мы создали DataFrames
из различных источников ввода. Затем мы использовали некоторые методы API для нормализации, объединения и агрегирования данных.
Наконец, мы экспортировали DataFrame
как таблицу в реляционную базу данных.
Как всегда, полный исходный код доступен на GitHub .