Перейти к основному содержимому

Искра DataFrame

· 10 мин. чтения

1. Обзор

Apache Spark — это распределенная система аналитики и обработки данных с открытым исходным кодом, которая позволяет масштабировать инженерию данных и науку о данных. Он упрощает разработку приложений, ориентированных на аналитику, предлагая унифицированный API для передачи данных, массовых преобразований и распространения.

DataFrame является важным и важным компонентом Spark API. В этом руководстве мы рассмотрим некоторые API-интерфейсы Spark DataFrame на простом примере с данными о клиентах.

2. DataFrame в Spark

Логически DataFrame представляет собой неизменяемый набор записей, организованных в именованные столбцы `` . Он имеет сходство с таблицей в RDBMS или ResultSet в Java.

В качестве API DataFrame обеспечивает унифицированный доступ к нескольким библиотекам Spark, включая Spark SQL, Spark Streaming, MLib и GraphX .

В Java мы используем Dataset<Row> для представления DataFrame .

По сути, Row использует эффективное хранилище под названием Tungsten , которое значительно оптимизирует операции Spark по сравнению с его предшественниками .

3. Зависимости Maven

Начнем с добавления зависимостей spark-core и spark-sql в наш pom.xml :

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.8</version>
</dependency>

4. DataFrame и схема

По сути, DataFrame — это RDD со схемой. Схема может быть либо выведена, либо определена как StructType .

StructType — это встроенный тип данных в Spark SQL, который мы используем для представления коллекции объектов StructField .

Давайте определим образец схемы Customer StructType :

public static StructType minimumCustomerDataSchema() {
return DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("gender", DataTypes.StringType, true),
DataTypes.createStructField("transaction_amount", DataTypes.IntegerType, true) }
);
}

Здесь у каждого StructField есть имя, которое представляет имя столбца DataFrame , тип и логическое значение, указывающее, может ли оно принимать значение NULL.

5. Создание фреймов данных

Первой операцией для каждого приложения Spark является получение SparkSession через master.

Он предоставляет нам точку входа для доступа к DataFrames . Начнем с создания SparkSession :

public static SparkSession getSparkSession() {
return SparkSession.builder()
.appName("Customer Aggregation pipeline")
.master("local")
.getOrCreate();
}

Обратите внимание, что мы подключаемся к Spark с помощью локального мастера. Если бы нам нужно было подключиться к кластеру, мы бы вместо этого дали адрес кластера.

Когда у нас есть SparkSession , мы можем создать DataFrame , используя различные методы. Кратко рассмотрим некоторые из них.

5.1. DataFrame из списка<POJO>

Давайте сначала создадим List<Customer> : ``

List<Customer> customers = Arrays.asList(
aCustomerWith("01", "jo", "Female", 2000),
aCustomerWith("02", "jack", "Male", 1200)
);

Далее создадим DataFrame из List<Customer> с помощью createDataFrame :

Dataset<Row> df = SPARK_SESSION
.createDataFrame(customerList, Customer.class);

5.2. DataFrame из набора данных

Если у нас есть набор данных , мы можем легко преобразовать его в DataFrame , вызвав toDF для набора данных .

Давайте сначала создадим Dataset<Customer> , используя createDataset , который принимает org.apache.spark.sql.Encoders :

Dataset<Customer> customerPOJODataSet = SPARK_SESSION
.createDataset(CUSTOMERS, Encoders.bean(Customer.class));

Далее конвертируем в DataFrame :

Dataset<Row> df = customerPOJODataSet.toDF();

5.3. Строка из POJO с использованием RowFactory

Поскольку DataFrame по сути является Dataset<Row> , давайте посмотрим, как мы можем создать строку из POJO клиента .

По сути, реализуя MapFunction<Customer, Row> и переопределяя метод call , мы можем сопоставить каждого Customer с Row, используя RowFactory.create :

public class CustomerToRowMapper implements MapFunction<Customer, Row> {

@Override
public Row call(Customer customer) throws Exception {
Row row = RowFactory.create(
customer.getId(),
customer.getName().toUpperCase(),
StringUtils.substring(customer.getGender(),0, 1),
customer.getTransaction_amount()
);
return row;
}
}

Следует отметить, что здесь мы можем манипулировать данными Customer перед преобразованием их в Row .

5.4. Кадр данных из списка<строка>

Мы также можем создать DataFrame из списка объектов Row :

List<Row> rows = customer.stream()
.map(c -> new CustomerToRowMapper().call(c))
.collect(Collectors.toList());

Теперь давайте передадим этот List<Row> в SparkSession вместе со схемой StructType :

Dataset<Row> df = SparkDriver.getSparkSession()
.createDataFrame(rows, SchemaFactory.minimumCustomerDataSchema());

Обратите внимание, что List<Row> будет преобразован в DataFrame на основе определения схемы . Любое поле, отсутствующее в схеме, не будет частью DataFrame.

5.5. DataFrame из структурированных файлов и базы данных

DataFrames может хранить столбцовую информацию, например файл CSV, а также вложенные поля и массивы, например файл JSON.

API DataFrame остается неизменным независимо от того, работаем ли мы с файлами CSV, JSON или другими форматами, а также с базами данных.

Давайте создадим DataFrame из многострочных данных JSON:

Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/minCustomerData.json");

Аналогично, в случае чтения из базы данных, мы будем иметь:

Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.option("url", "jdbc:postgresql://localhost:5432/customerdb")
.option("dbtable", "customer")
.option("user", "user")
.option("password", "password")
.option("serverTimezone", "EST")
.format("jdbc")
.load();

6. Преобразование DataFrame в набор данных

Теперь давайте посмотрим, как мы можем преобразовать наш DataFrame в набор данных . Это преобразование полезно, если мы хотим манипулировать нашими существующими POJO и расширенным API, которые применяются только к DataFrame .

Мы продолжим работу с DataFrame, созданным из JSON в предыдущем разделе.

Давайте вызовем функцию сопоставления, которая берет каждую строку набора данных <Row> и преобразует ее в объект Customer :

Dataset<Customer> ds = df.map(
new CustomerMapper(),
Encoders.bean(Customer.class)
);

Здесь CustomerMapper реализует MapFunction<Row, Customer> :

public class CustomerMapper implements MapFunction<Row, Customer> {

@Override
public Customer call(Row row) {
Customer customer = new Customer();
customer.setId(row.getAs("id"));
customer.setName(row.getAs("name"));
customer.setGender(row.getAs("gender"));
customer.setTransaction_amount(Math.toIntExact(row.getAs("transaction_amount")));
return customer;
}
}

Следует отметить, что экземпляр MapFunction<Row, Customer> создается только один раз, независимо от количества обрабатываемых записей .

7. Операции и преобразования DataFrame

Теперь давайте создадим простой конвейер, используя пример данных о клиентах. Мы хотим принимать данные клиентов в виде фреймов данных из двух разных файловых источников, нормализовать их, а затем выполнять некоторые преобразования данных.

Наконец, мы запишем преобразованные данные в базу данных.

Цель этих преобразований — выяснить ежегодные расходы, упорядоченные по полу и источнику.

7.1. Получение данных

Во-первых, давайте получим данные из нескольких источников, используя метод чтения SparkSession , начиная с данных JSON: ``

Dataset<Row> jsonDataToDF = SPARK_SESSION.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/customerData.json");

Теперь давайте сделаем то же самое с нашим источником CSV:

Dataset<Row> csvDataToDF = SPARK_SESSION.read()
.format("csv")
.option("header", "true")
.schema(SchemaFactory.customerSchema())
.option("dateFormat", "m/d/YYYY")
.load("data/customerData.csv");

csvDataToDF.show();
csvDataToDF.printSchema();
return csvData;

Важно отметить, что для чтения этих данных CSV мы предоставляем схему StructType , которая определяет типы данных столбцов.

После того, как мы получили данные, мы можем проверить содержимое DataFrame, используя метод show .

Кроме того, мы также можем ограничить количество строк, указав размер в методе show . И мы можем использовать printSchema для проверки схем вновь созданных DataFrames.

Мы заметим, что две схемы имеют некоторые различия. Поэтому нам нужно нормализовать схему, прежде чем мы сможем выполнять какие-либо преобразования.

7.2. Нормализация фреймов данных

Далее мы нормализуем необработанные кадры данных, представляющие данные CSV и JSON.

Здесь давайте посмотрим на некоторые из выполненных преобразований:

private Dataset<Row> normalizeCustomerDataFromEbay(Dataset<Row> rawDataset) {
Dataset<Row> transformedDF = rawDataset
.withColumn("id", concat(rawDataset.col("zoneId"),lit("-"), rawDataset.col("customerId")))
.drop(column("customerId"))
.withColumn("source", lit("ebay"))
.withColumn("city", rawDataset.col("contact.customer_city"))
.drop(column("contact"))
.drop(column("zoneId"))
.withColumn("year", functions.year(col("transaction_date")))
.drop("transaction_date")
.withColumn("firstName", functions.split(column("name"), " ")
.getItem(0))
.withColumn("lastName", functions.split(column("name"), " ")
.getItem(1))
.drop(column("name"));

return transformedDF;
}

Некоторые важные операции с DataFrame в приведенном выше примере:

  • c oncat для объединения данных из нескольких столбцов и литералов для создания нового столбца идентификаторов
  • lit статическая функция возвращает столбец с литеральным значением
  • функции. year , чтобы извлечь год из transactionDate
  • function.split для разделения имени на столбцы имени и фамилии
  • метод drop удаляет столбец во фрейме данных
  • col возвращает столбец набора данных на основе его имени
  • withColumnRenamed возвращает столбец с переименованным значением

Важно отметить, что мы видим, что DataFrame неизменяем. Следовательно, всякий раз, когда что-то нужно изменить, мы должны создать новый DataFrame .

В конце концов, оба фрейма данных нормализуются к одной и той же схеме, как показано ниже:

root
|-- gender: string (nullable = true)
|-- transaction_amount: long (nullable = true)
|-- id: string (nullable = true)
|-- source: string (nullable = false)
|-- city: string (nullable = true)
|-- year: integer (nullable = true)
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)

7.3. Объединение фреймов данных

Давайте объединим нормализованные кадры данных следующим образом:

Dataset<Row> combineDataframes(Dataset<Row> df1, Dataset<Row> df2) {
return df1.unionByName(df2);
}

Важно отметить, что:

  • Если мы заботимся об именах столбцов при объединении двух DataFrames , мы должны использовать unionByName
  • Если нам не нужны имена столбцов при объединении двух DataFrames , мы должны использовать union

7.4. Агрегирование фреймов данных

Затем давайте сгруппируем объединенные кадры данных , чтобы узнать годовые расходы по годам, источникам и полу.

Затем мы отсортируем агрегированные данные по столбцам года по возрастанию и за год в порядке убывания:

Dataset<Row> aggDF = dataset
.groupBy(column("year"), column("source"), column("gender"))
.sum("transactionAmount")
.withColumnRenamed("sum(transaction_amount)", "yearly spent")
.orderBy(col("year").asc(), col("yearly spent").desc());

Некоторые важные операции с DataFrame в приведенном выше примере:

  • groupBy используется для организации идентичных данных в группы в DataFrame, а затем для выполнения агрегатных функций, аналогичных предложению SQL «GROUP BY» .
  • sum применяет функцию агрегации к столбцу transactionAmount после группировки
  • orderBy сортирует DataFrame по одному или нескольким столбцам
  • `Функции asc и desc из класса Column` можно использовать для указания порядка сортировки.

Наконец, давайте воспользуемся методом show , чтобы увидеть, как выглядит фрейм данных после преобразования:

+----+------+------+---------------+
|year|source|gender|annual_spending|
+----+------+------+---------------+
|2018|amazon| Male| 10600|
|2018|amazon|Female| 6200|
|2018| ebay| Male| 5500|
|2021| ebay|Female| 16000|
|2021| ebay| Male| 13500|
|2021|amazon| Male| 4000|
|2021|amazon|Female| 2000|
+----+------+------+---------------+

Следовательно, схема после окончательного преобразования должна быть:

root
|-- source: string (nullable = false)
|-- gender: string (nullable = true)
|-- year: integer (nullable = true)
|-- yearly spent: long (nullable = true)

7.5. Запись из DataFrame в реляционную базу данных

Наконец, давайте закончим записью преобразованного DataFrame в виде таблицы в реляционной базе данных:

Properties dbProps = new Properties();

dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");

Далее мы можем использовать сеанс Spark для записи в базу данных:

String connectionURL = dbProperties.getProperty("connectionURL");

dataset.write()
.mode(SaveMode.Overwrite)
.jdbc(connectionURL, "customer", dbProperties);

8. Тестирование

Теперь мы можем протестировать конвейер от начала до конца, используя два источника приема, с образами Docker postgres и pgAdmin :

@Test
void givenCSVAndJSON_whenRun_thenStoresAggregatedDataFrameInDB() throws Exception {
Properties dbProps = new Properties();
dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");

pipeline = new CustomerDataAggregationPipeline(dbProps);
pipeline.run();

String allCustomersSql = "Select count(*) from customer";

Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(allCustomersSql);
resultSet.next();
int count = resultSet.getInt(1);
assertEquals(7, count);
}

После запуска мы можем убедиться, что существует таблица со столбцами и строками, соответствующими DataFrame . Наконец, мы также можем наблюдать этот вывод через клиент pgAdmin4 :

./84507618517daca87a568316515ef9f5.png

Здесь следует отметить пару важных моментов:

  • Таблица клиентов создается автоматически в результате операции записи .
  • Используется режим SaveMode.Overwrite. Следовательно, это перезапишет все, что уже существует в таблице. Другими доступными параметрами являются Append , Ignore и ErrorIfExists .

Кроме того, мы также можем использовать запись для экспорта данных DataFrame в форматах CSV , JSON или parquet среди других форматов.

9. Заключение

В этом руководстве мы рассмотрели, как использовать DataFrames для обработки и агрегирования данных в Apache Spark.

Во-первых, мы создали DataFrames из различных источников ввода. Затем мы использовали некоторые методы API для нормализации, объединения и агрегирования данных.

Наконец, мы экспортировали DataFrame как таблицу в реляционную базу данных.

Как всегда, полный исходный код доступен на GitHub .