Перейти к основному содержимому

Apache Spark: различия между фреймами данных, наборами данных и RDD

· 6 мин. чтения

1. Обзор

Apache Spark — это быстрая распределенная система обработки данных. Он выполняет обработку данных в памяти и использует кэширование в памяти и оптимизированное выполнение, что приводит к высокой производительности. Он предоставляет API высокого уровня для популярных языков программирования, таких как Scala, Python, Java и R.

В этом кратком руководстве мы рассмотрим три основных понятия Spark: кадры данных, наборы данных и RDD.

2. Датафрейм

Spark SQL представил абстракцию табличных данных, называемую DataFrame, начиная с Spark 1.3. С тех пор это стало одной из самых важных функций в Spark. Этот API полезен, когда мы хотим обрабатывать структурированные и полуструктурированные распределенные данные.

В разделе 3 мы обсудим устойчивые распределенные наборы данных (RDD). DataFrames хранят данные более эффективно, чем RDD, потому что они используют неизменяемые, находящиеся в памяти, отказоустойчивые, распределенные и параллельные возможности RDD, но они также применяют схему к данным. DataFrames также переводят код SQL в оптимизированные низкоуровневые операции RDD.

Мы можем создавать DataFrames тремя способами:

  • Преобразование существующих RDD
  • Выполнение SQL-запросов
  • Загрузка внешних данных

Команда Spark представила SparkSession в версии 2.0, она объединяет все различные контексты, гарантируя, что разработчикам не придется беспокоиться о создании разных контекстов:

SparkSession session = SparkSession.builder()
.appName("TouristDataFrameExample")
.master("local[*]")
.getOrCreate();

DataFrameReader dataFrameReader = session.read();

Мы будем анализировать файл Tourist.csv :

Dataset<Row> data = dataFrameReader.option("header", "true")
.csv("data/Tourist.csv");

Поскольку Spark 2.0 DataFrame стал набором данных типа Row , мы можем использовать DataFrame в качестве псевдонима для Dataset<Row> .

Мы можем выбрать конкретные столбцы, которые нас интересуют. Мы также можем фильтровать и группировать по заданному столбцу:

data.select(col("country"), col("year"), col("value"))
.show();

data.filter(col("country").equalTo("Mexico"))
.show();

data.groupBy(col("country"))
.count()
.show();

3. Наборы данных

Набор данных — это набор строго типизированных структурированных данных . Они обеспечивают знакомый объектно-ориентированный стиль программирования, а также преимущества безопасности типов, поскольку наборы данных могут проверять синтаксис и выявлять ошибки во время компиляции.

Набор данных является расширением DataFrame, поэтому мы можем рассматривать DataFrame как нетипизированное представление набора данных.

Команда Spark выпустила API набора данных в Spark 1.6 и, как они упомянули: «Цель наборов данных Spark — предоставить API, который позволяет пользователям легко выражать преобразования в объектных доменах, а также обеспечивает преимущества производительности и надежности выполнения Spark SQL. двигатель".

Во-первых, нам нужно создать класс типа TouristData :

public class TouristData {
private String region;
private String country;
private String year;
private String series;
private Double value;
private String footnotes;
private String source;
// ... getters and setters
}

Чтобы сопоставить каждую из наших записей с указанным типом, нам нужно будет использовать кодировщик. Кодировщики переводят объекты Java во внутренний двоичный формат Spark :

// SparkSession initialization and data load
Dataset<Row> responseWithSelectedColumns = data.select(col("region"),
col("country"), col("year"), col("series"), col("value").cast("double"),
col("footnotes"), col("source"));

Dataset<TouristData> typedDataset = responseWithSelectedColumns
.as(Encoders.bean(TouristData.class));

Как и в случае с DataFrame, мы можем фильтровать и группировать по определенным столбцам:

typedDataset.filter((FilterFunction) record -> record.getCountry()
.equals("Norway"))
.show();

typedDataset.groupBy(typedDataset.col("country"))
.count()
.show();

Мы также можем выполнять такие операции, как фильтрация по столбцу, соответствующему определенному диапазону, или вычисление суммы определенного столбца, чтобы получить его общее значение:

typedDataset.filter((FilterFunction) record -> record.getYear() != null 
&& (Long.valueOf(record.getYear()) > 2010
&& Long.valueOf(record.getYear()) < 2017)).show();

typedDataset.filter((FilterFunction) record -> record.getValue() != null
&& record.getSeries()
.contains("expenditure"))
.groupBy("country")
.agg(sum("value"))
.show();

4. СДР

Resilient Distributed Dataset или RDD — это основная программная абстракция Spark. Он представляет собой набор неизменяемых, устойчивых и распределенных элементов .

RDD инкапсулирует большой набор данных, Spark автоматически распределяет данные, содержащиеся в RDD, по нашему кластеру и распараллеливает операции, которые мы над ними выполняем .

Мы можем создавать RDD только посредством операций с данными в стабильном хранилище или операций с другими RDD.

Отказоустойчивость необходима, когда мы имеем дело с большими наборами данных, а данные распределяются по кластерным компьютерам. RDD устойчивы благодаря встроенной в Spark механике восстановления после сбоев. Spark полагается на тот факт, что RDD запоминают, как они были созданы, поэтому мы можем легко проследить происхождение для восстановления раздела .

Есть два типа операций, которые мы можем выполнять с RDD: преобразования и действия .

4.1. Преобразования

Мы можем применять преобразования к RDD для управления его данными. После выполнения этой манипуляции мы получим совершенно новый RDD, поскольку RDD являются неизменяемыми объектами .

Мы проверим, как реализовать Map и Filter, два наиболее распространенных преобразования.

Во- первых, нам нужно создать JavaSparkContext и загрузить данные в виде RDD из файла Tourist.csv :

SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");

Далее давайте применим функцию карты, чтобы получить название страны из каждой записи и преобразовать название в верхний регистр. Мы можем сохранить этот вновь сгенерированный набор данных в виде текстового файла на диске:

JavaRDD<String> upperCaseCountries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1].toUpperCase();
}).distinct();

upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

Если мы хотим выбрать только конкретную страну, мы можем применить функцию фильтра к нашему исходному RDD для туристов:

JavaRDD<String> touristsInMexico = tourists
.filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));

touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. Действия

Действия вернут окончательное значение или сохранят результаты на диск после выполнения некоторых вычислений с данными.

Двумя из часто используемых действий в Spark являются Count и Reduce.

Давайте подсчитаем общее количество стран в нашем CSV-файле:

// Spark Context initialization and data load
JavaRDD<String> countries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1];
}).distinct();

Long numberOfCountries = countries.count();

Теперь посчитаем общие расходы по странам. Нам нужно будет отфильтровать записи, содержащие расходы в их описании.

Вместо использования JavaRDD мы будем использовать JavaPairRDD . Пара RDD — это тип RDD, который может хранить пары ключ-значение . Давайте проверим это дальше:

JavaRDD<String> touristsExpenditure = tourists
.filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));

JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
.mapToPair(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});

List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
.reduceByKey((x, y) -> x + y)
.collect();

5. Вывод

Подводя итог, мы должны использовать DataFrames или Datasets, когда нам нужны специфичные для предметной области API, нам нужны высокоуровневые выражения, такие как агрегация, сумма или SQL-запросы. Или когда нам нужна безопасность типов во время компиляции.

С другой стороны, мы должны использовать RDD, когда данные неструктурированы и нам не нужно реализовывать определенную схему или когда нам нужны низкоуровневые преобразования и действия.

Как всегда, все примеры кода доступны на GitHub .