1. Introduction
In this tutorial, we'll demonstrate Apache Crunch with an example data processing application. We'll run this application using the MapReduce framework.
We'll start by covering a few Apache Crunch concepts briefly. Then we'll jump into a sample app. In this app we'll do text processing:
- First of all, we'll read the lines from a text file
- Later, we'll split them into words and remove some common words
- Then, we'll group the remaining words to get a list of unique words and their counts
- Finally, we'll write this list to a text file
2. What Is Crunch?
MapReduce is a distributed, parallel programming framework for processing large amounts of data on a cluster of servers. Software frameworks such as Hadoop and Spark implement MapReduce.
Crunch provides a framework for writing, testing and running MapReduce pipelines in Java. Here, we don't write the MapReduce jobs directly. Rather, we define a data pipeline (i.e. the operations to perform input, processing, and output steps) using the Crunch APIs. The Crunch planner maps them to MapReduce jobs and executes them when needed.
Therefore, every Crunch data pipeline is coordinated by an instance of the Pipeline interface. This interface also defines methods for reading data into a pipeline via Source instances and writing data out from a pipeline to Target instances.
We have 3 interfaces for representing data:
- PCollection – an immutable, distributed collection of elements
- PTable<K, V> – an immutable, distributed, unordered multi-map of keys and values
- PGroupedTable<K, V> – a distributed, sorted map of keys of type K to an Iterable of values of type V that may be iterated over exactly once
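To build intuition, these three abstractions can be pictured with plain Java collections. The following sketch is an in-memory analogy only, not the Crunch API: a list stands in for a PCollection, a list of key-value pairs for a PTable, and a map from keys to value lists for a PGroupedTable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CrunchTypesAnalogy {

    // Groups key-value pairs by key, collecting each key's values into a list,
    // the way a PGroupedTable groups the entries of a PTable.
    public static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // A PCollection<String> resembles an immutable collection of elements.
        List<String> elements = List.of("hello", "world", "hello");

        // A PTable<String, Integer> resembles an unordered multi-map of key-value pairs.
        List<Map.Entry<String, Integer>> pairs = List.of(
            Map.entry("hello", 1), Map.entry("world", 1), Map.entry("hello", 1));

        // A PGroupedTable<String, Integer> resembles those pairs grouped by key.
        System.out.println(groupByKey(pairs)); // {hello=[1, 1], world=[1]}
    }
}
```

Unlike these in-memory stand-ins, the real Crunch types describe distributed datasets that may never fit on a single machine.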
DoFn is the base class for all data processing functions. It corresponds to the Mapper, Reducer and Combiner classes in MapReduce. We spend most of the development time writing and testing logical computations using it.
Now that we're more familiar with Crunch, let's use it to build the example application.
3. Setting up a Crunch Project
First of all, let's set up a Crunch Project with Maven. We can do so in two ways:
- Add the required dependencies in the pom.xml file of an existing project
- Use an archetype to generate a starter project
Let's have a quick look at both approaches.
3.1. Maven Dependencies
In order to add Crunch to an existing project, let's add the required dependencies in the pom.xml file.
First, let's add the crunch-core library:
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-core</artifactId>
<version>0.15.0</version>
</dependency>
Next, let's add the hadoop-client library to communicate with Hadoop. We use the version matching our Hadoop installation:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
We can check Maven Central for the latest versions of the crunch-core and hadoop-client libraries.
3.2. Maven Archetype
Another approach is to quickly generate a starter project using the Maven archetype provided by Crunch:
mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype
When prompted by the above command, we provide the Crunch version and the project artifact details.
4. Crunch Pipeline Setup
After setting up the project, we need to create a Pipeline object. Crunch has 3 Pipeline implementations:
- MRPipeline – executes within Hadoop MapReduce
- SparkPipeline – executes as a series of Spark pipelines
- MemPipeline – executes in-memory on the client and is useful for unit testing
Usually, we develop and test using an instance of MemPipeline. Later we use an instance of MRPipeline or SparkPipeline for actual execution.
If we needed an in-memory pipeline, we could use the static method getInstance to get the MemPipeline instance:
Pipeline pipeline = MemPipeline.getInstance();
But for now, let's create an instance of MRPipeline to execute the application with Hadoop:
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
5. Read Input Data
After creating the pipeline object, we want to read input data. The Pipeline interface provides a convenience method to read input from a text file, readTextFile(pathName).
Let's call this method to read the input text file:
PCollection<String> lines = pipeline.readTextFile(inputPath);
The above code reads the text file as a collection of Strings.
As the next step, let's write a test case for reading input:
@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
In this test, we verify that we get the expected number of lines when reading a text file.
6. Data Processing Steps
After reading the input data, we need to process it. The Crunch API contains a number of subclasses of DoFn to handle common data processing scenarios:
- FilterFn – filters members of a collection based on a boolean condition
- MapFn – maps each input record to exactly one output record
- CombineFn – combines a number of values into a single value
- JoinFn – performs joins such as inner join, left outer join, right outer join and full outer join
Let's implement the following data processing logic by using these classes:
- Split each line in the input file into words
- Remove the stop words
- Count the unique words
6.1. Split a Line of Text Into Words
First of all, let's create the Tokenizer class to split a line into words.
We'll extend the DoFn class. This class has an abstract method called process. This method processes the input records from a PCollection and sends the output to an Emitter.
We need to implement the splitting logic in this method:
public class Tokenizer extends DoFn<String, String> {
private static final Splitter SPLITTER = Splitter
.onPattern("\\s+")
.omitEmptyStrings();
@Override
public void process(String line, Emitter<String> emitter) {
for (String word : SPLITTER.split(line)) {
emitter.emit(word);
}
}
}
In the above implementation, we've used the Splitter class from the Guava library to extract words from a line.
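For readers who prefer to avoid the Guava dependency, the same splitting behavior can be sketched with plain Java regex splitting. This is an illustrative equivalent, not the code used above:

```java
import java.util.ArrayList;
import java.util.List;

public class WhitespaceTokenizer {

    // Splits on runs of whitespace and drops empty strings, mirroring
    // Guava's Splitter.onPattern("\\s+").omitEmptyStrings().
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }
}
```

For example, tokenize("  hello   world  ") yields the two words hello and world, with the surrounding and repeated whitespace discarded.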
Next, let's write a unit test for the Tokenizer class:
@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
@Mock
private Emitter<String> emitter;
@Test
public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
Tokenizer splitter = new Tokenizer();
splitter.process(" hello world ", emitter);
verify(emitter).emit("hello");
verify(emitter).emit("world");
verifyNoMoreInteractions(emitter);
}
}
The above test verifies that the correct words are returned.
Finally, let's split the lines read from the input text file using this class.
The parallelDo method of the PCollection interface applies the given DoFn to all the elements and returns a new PCollection.
Let's call this method on the lines collection and pass an instance of Tokenizer:
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
As a result, we get the list of words in the input text file. We'll remove the stop words in the next step.
6.2. Remove Stop Words
Similarly to the previous step, let's create a StopWordFilter class to filter out stop words.
However, we'll extend FilterFn instead of DoFn. FilterFn has an abstract method called accept. We need to implement the filtering logic in this method:
public class StopWordFilter extends FilterFn<String> {
// English stop words, borrowed from Lucene.
private static final Set<String> STOP_WORDS = ImmutableSet
.copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
"for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
"or", "s", "such", "t", "that", "the", "their", "then", "there",
"these", "they", "this", "to", "was", "will", "with" });
@Override
public boolean accept(String word) {
return !STOP_WORDS.contains(word);
}
}
Next, let's write a unit test for the StopWordFilter class:
public class StopWordFilterUnitTest {
@Test
public void givenFilter_whenStopWordPassed_thenFalseReturned() {
FilterFn<String> filter = new StopWordFilter();
assertFalse(filter.accept("the"));
assertFalse(filter.accept("a"));
}
@Test
public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
FilterFn<String> filter = new StopWordFilter();
assertTrue(filter.accept("Hello"));
assertTrue(filter.accept("World"));
}
@Test
public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
PCollection<String> words = MemPipeline
.collectionOf("This", "is", "a", "test", "sentence");
PCollection<String> noStopWords = words.filter(new StopWordFilter());
assertEquals(ImmutableList.of("This", "test", "sentence"),
Lists.newArrayList(noStopWords.materialize()));
}
}
This test verifies that the filtering logic is performed correctly.
Finally, let's use StopWordFilter to filter the list of words generated in the previous step. The filter method of the PCollection interface applies the given FilterFn to all the elements and returns a new PCollection.
Let's call this method on the words collection and pass an instance of StopWordFilter:
PCollection<String> noStopWords = words.filter(new StopWordFilter());
As a result, we get the filtered collection of words.
6.3. Count Unique Words
After getting the filtered collection of words, we want to count how often each word occurs. The PCollection interface has a number of methods to perform common aggregations:
- min – returns the minimum element of the collection
- max – returns the maximum element of the collection
- length – returns the number of elements in the collection
- count – returns a PTable that contains the count of each unique element of the collection
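The semantics of count can be pictured with plain Java streams. The following is an in-memory analogy of the result count produces, not the Crunch implementation, which runs the aggregation as distributed jobs:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CountAnalogy {

    // Mirrors what PCollection.count() produces: a mapping from each
    // unique element to the number of times it occurs.
    public static Map<String, Long> count(List<String> elements) {
        return elements.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }
}
```

For example, count(List.of("a", "b", "a")) yields a map of a to 2 and b to 1, just as count() on a PCollection yields a PTable of elements to their occurrence counts.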
Let's use the count method to get the unique words along with their counts:
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
7. Specify Output
As a result of the previous steps, we have a table of words and their counts. We want to write this result to a text file. The Pipeline interface provides convenience methods to write output:
void write(PCollection<?> collection, Target target);
void write(PCollection<?> collection, Target target,
Target.WriteMode writeMode);
<T> void writeTextFile(PCollection<T> collection, String pathName);
Therefore, let's call the writeTextFile method:
pipeline.writeTextFile(counts, outputPath);
8. Manage Pipeline Execution
All the steps so far have just defined the data pipeline. No input has been read or processed. This is because Crunch uses a lazy execution model.
It doesn't run the MapReduce jobs until a method that controls job planning and execution is invoked on the Pipeline interface:
- run – prepares an execution plan to create the required outputs and then executes it synchronously
- done – runs any remaining jobs required to generate outputs and then cleans up any intermediate data files created
- runAsync – similar to the run method, but executes in a non-blocking fashion
Therefore, let's call the done method to execute the pipeline as MapReduce jobs:
PipelineResult result = pipeline.done();
The above statement runs the MapReduce jobs to read the input, process it, and write the result to the output directory.
9. Putting the Pipeline Together
So far we have developed and unit tested the logic to read input data, process it and write to the output file.
Next, let's put them together to build the entire data pipeline:
public int run(String[] args) throws Exception {
String inputPath = args[0];
String outputPath = args[1];
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(inputPath);
// Define a function that splits each line in a PCollection of Strings into
// a PCollection made up of the individual words in the file.
// The second argument sets the serialization format.
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
// Take the collection of words and remove known stop words.
PCollection<String> noStopWords = words.filter(new StopWordFilter());
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, outputPath);
// Execute the pipeline as a MapReduce.
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
10. Hadoop Launch Configuration
The data pipeline is thus ready.
However, we need the code to launch it. Therefore, let's write the main method to launch the application:
public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    // The run method shown in the previous section goes here.
}
ToolRunner.run parses the Hadoop configuration from the command line and executes the MapReduce job.
11. Run Application
The complete application is now ready. Let's run the following command to build it:
mvn package
As a result of the above command, we get the packaged application and a special job jar in the target directory.
Let's use this job jar to execute the application on Hadoop:
hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>
The application reads the input file and writes the result to the output file. The output file contains unique words along with their counts similar to the following:
[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]
In addition to Hadoop, we can run the application within an IDE, as a stand-alone application or as unit tests.
12. Conclusion
In this tutorial, we've built a data processing application running on MapReduce. Apache Crunch makes it easy to write, test and run MapReduce pipelines in Java.
As usual, the full source code can be found over on GitHub.