Batch Data Processing with CDAP
Source Code Repository: Source code (and other resources) for this guide are available at the CDAP Guides GitHub repository.
MapReduce is one of the most popular paradigms for processing large amounts of data in a reliable and fault-tolerant manner. In this guide, you will learn how to batch process data using MapReduce in the Cask Data Application Platform (CDAP).
What You Will Build
This guide will take you through building a CDAP application that uses ingested Apache access log events to compute the top 10 client IPs in a specific time-range and query the results. You will:
Build a MapReduce program to process Apache access log events;
Use a Dataset to persist results of the MapReduce program; and
Build a Service to serve the results via HTTP.
What You Will Need
Let’s Build It!
The following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the “Build and Run Application” section.
Application Design
The application will assume that the Apache access logs are ingested into a Stream. The log events can be ingested into a Stream continuously in real-time or in batches; whichever way, it doesn’t affect the ability of the MapReduce program to consume them.
The MapReduce program extracts the required information from the raw logs and computes the top 10 Client IPs by traffic in a specific time range. The results of the computation are persisted in a Dataset.
Finally, the application contains a Service that exposes an HTTP endpoint to access the data stored in the Dataset.
Implementation
The first step is to construct our application structure. We will use a standard Maven project structure for all of the source code files:
./pom.xml
./src/main/java/co/cdap/guides/ClientCount.java
./src/main/java/co/cdap/guides/CountsCombiner.java
./src/main/java/co/cdap/guides/IPMapper.java
./src/main/java/co/cdap/guides/LogAnalyticsApp.java
./src/main/java/co/cdap/guides/TopClientsMapReduce.java
./src/main/java/co/cdap/guides/TopClientsService.java
./src/main/java/co/cdap/guides/TopNClientsReducer.java
The CDAP application is identified by the LogAnalyticsApp class. This class extends AbstractApplication and overrides the configure() method to define all of the application components:
public class LogAnalyticsApp extends AbstractApplication {

  public static final String DATASET_NAME = "topClients";

  @Override
  public void configure() {
    setName("LogAnalyticsApp");
    setDescription("An application that computes the top 10 Client IPs from Apache access log data");
    // Stream into which the Apache access log events are ingested.
    addStream(new Stream("logEvents"));
    addMapReduce(new TopClientsMapReduce());
    try {
      // Dataset that stores the MapReduce results as a list of ClientCount objects.
      DatasetProperties props = ObjectStores.objectStoreProperties(Types.listOf(ClientCount.class),
                                                                   DatasetProperties.EMPTY);
      createDataset(DATASET_NAME, ObjectStore.class, props);
    } catch (UnsupportedTypeException e) {
      throw Throwables.propagate(e);
    }
    addService(new TopClientsService());
  }
}
The LogAnalyticsApp application defines a new Stream, logEvents, into which the Apache access logs are ingested. Once the data is ingested, the events can be processed in real time or in batch. In our application, we process the events in batch using the TopClientsMapReduce program and compute the top 10 Client IPs in a specific time range.
The results of the MapReduce program are persisted into a Dataset; the application uses the createDataset method to define the Dataset. The ClientCount class defines the types used in the Dataset.
Finally, the application adds a service for querying the results from the Dataset.
Let's take a closer look at the MapReduce program.
The TopClientsMapReduce class extends AbstractMapReduce and overrides the configure() and initialize() methods:
The configure() method configures the MapReduce program, setting its name and description.
The initialize() method is invoked at runtime, before the MapReduce is executed. Here you can access the Hadoop job configuration through the MapReduceContext returned by getContext(). The Mapper, Reducer, and Combiner classes, the intermediate data format, and the input Stream and output Dataset are set in this method.
public class TopClientsMapReduce extends AbstractMapReduce {

  @Override
  public void configure() {
    setName("TopClientsMapReduce");
    setDescription("MapReduce program that computes top 10 clients in the last 1 hour");
  }

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    // Get the Hadoop job context, set Mapper, Reducer and Combiner.
    Job job = (Job) context.getHadoopJob();
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setMapperClass(IPMapper.class);
    job.setCombinerClass(CountsCombiner.class);
    // The number of reducers is set to 1 to compute the top N in a single reducer.
    job.setNumReduceTasks(1);
    job.setReducerClass(TopNClientsReducer.class);
    // Read events from the last 60 minutes as input to the mapper.
    final long endTime = context.getLogicalStartTime();
    final long startTime = endTime - TimeUnit.MINUTES.toMillis(60);
    context.addInput(Input.ofStream("logEvents", startTime, endTime));
    // Write the results to the Dataset created in LogAnalyticsApp.
    context.addOutput(Output.ofDataset(LogAnalyticsApp.DATASET_NAME));
  }
}
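The time-window arithmetic used in initialize() can be tried in isolation. The following is a minimal sketch in plain Java, with no CDAP dependency; the class and method names (TimeWindowSketch, lastMinutes) are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of CDAP): reproduces the window arithmetic
// from initialize() so it can be run without a CDAP installation.
final class TimeWindowSketch {

  /**
   * Returns {startTime, endTime} in epoch milliseconds for a window of the
   * given length that ends at logicalStartTime.
   */
  static long[] lastMinutes(long logicalStartTime, long minutes) {
    long endTime = logicalStartTime;
    long startTime = endTime - TimeUnit.MINUTES.toMillis(minutes);
    return new long[] {startTime, endTime};
  }

  public static void main(String[] args) {
    // Use the current wall-clock time as a stand-in for the logical start time.
    long[] window = lastMinutes(System.currentTimeMillis(), 60);
    System.out.println("start=" + window[0] + " end=" + window[1]);
  }
}
```

With a logical start time of 7,200,000 ms and a 60-minute window, the start time works out to 7,200,000 - 3,600,000 = 3,600,000 ms.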
In this example, Mapper and Reducer classes are built by implementing the Hadoop APIs.
In the application, the Mapper class reads the Apache access log event from the Stream and produces the Client IP and count as the intermediate map output key and value:
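As a rough illustration of that mapping step, here is a minimal sketch in plain Java that does not use the Hadoop or CDAP APIs. It assumes the client IP is the first whitespace-delimited field of an access log line (as in the Apache Common Log Format); the class name IpMapperSketch and the sample log line are made up for this example and are not the guide's IPMapper.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

// Hypothetical, Hadoop-free stand-in for the mapping logic: one log line in,
// one (clientIp, 1) pair out.
final class IpMapperSketch {

  /**
   * Extracts the client IP from a log line and pairs it with a count of 1.
   * Assumption: the IP is the first whitespace-delimited token of the line.
   * Blank or null lines produce no output.
   */
  static Optional<Map.Entry<String, Integer>> map(String logLine) {
    if (logLine == null) {
      return Optional.empty();
    }
    String trimmed = logLine.trim();
    if (trimmed.isEmpty()) {
      return Optional.empty();
    }
    int firstSpace = trimmed.indexOf(' ');
    String clientIp = firstSpace < 0 ? trimmed : trimmed.substring(0, firstSpace);
    Map.Entry<String, Integer> pair = new SimpleImmutableEntry<>(clientIp, 1);
    return Optional.of(pair);
  }

  public static void main(String[] args) {
    // Made-up sample line using a documentation-range IP address.
    String line = "192.0.2.1 - - [08/Feb/2015:04:36:40 +0000] \"GET /index.html HTTP/1.1\" 200 508";
    map(line).ifPresent(p -> System.out.println(p.getKey() + " -> " + p.getValue()));
  }
}
```

In a real Hadoop Mapper, the pair would be written to the task context as a Text key and an IntWritable value, matching the map output classes set in initialize().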
The Reducer class receives the Client IP and counts from the MapReduce job, aggregates the count for each Client IP, and stores it in a priority queue. The number of reducers is set to 1, so that all results go to the same reducer to compute the top 10 results. The top 10 results are written to the MapReduce context in the cleanup method of the Reducer, which is called once at the end of the task. Writing the results to the context automatically writes them to the output Dataset specified in the initialize() method of the MapReduce program.
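The aggregate-then-keep-the-top-N idea can be sketched in plain Java without the Hadoop APIs. This is an illustration under stated assumptions, not the guide's TopNClientsReducer: the nested ClientCount class is a hypothetical stand-in for the guide's type, and the grouped input map plays the role of the per-key value lists that the framework hands to a reducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, Hadoop-free sketch of a single-reducer top-N computation.
final class TopNSketch {

  /** Stand-in for the guide's ClientCount: a client IP and its total count. */
  static final class ClientCount {
    final String clientIp;
    final int count;

    ClientCount(String clientIp, int count) {
      this.clientIp = clientIp;
      this.count = count;
    }
  }

  /** Sums the partial counts per client IP and returns the n largest totals, highest first. */
  static List<ClientCount> topN(Map<String, List<Integer>> groupedCounts, int n) {
    // Min-heap ordered by count: the head is the smallest of the current top n.
    PriorityQueue<ClientCount> heap =
        new PriorityQueue<>(Comparator.comparingInt((ClientCount c) -> c.count));
    for (Map.Entry<String, List<Integer>> entry : groupedCounts.entrySet()) {
      int total = 0;
      for (int partial : entry.getValue()) {
        total += partial;
      }
      heap.add(new ClientCount(entry.getKey(), total));
      if (heap.size() > n) {
        heap.poll(); // Evict the smallest so at most n entries are retained.
      }
    }
    // Corresponds to the cleanup step: emit what is left, sorted by descending count.
    List<ClientCount> result = new ArrayList<>(heap);
    result.sort(Comparator.comparingInt((ClientCount c) -> c.count).reversed());
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> grouped = new HashMap<>();
    grouped.put("192.0.2.1", Arrays.asList(1, 1, 1));
    grouped.put("192.0.2.2", Arrays.asList(5));
    grouped.put("192.0.2.3", Arrays.asList(2, 2));
    for (ClientCount c : topN(grouped, 2)) {
      System.out.println(c.clientIp + " " + c.count);
    }
  }
}
```

Keeping only n entries in the heap bounds memory use by n rather than by the number of distinct client IPs. The order among clients with equal counts is not defined in this sketch.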
Now that we have set up the data ingestion and processing components, the next step is to create a Service to query the processed data.
The TopClientsService defines a simple HTTP RESTful endpoint to perform this query and return a response:
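The request-handling logic of such an endpoint can be sketched in plain Java, independent of the CDAP Service API. Everything named here is an assumption made for illustration: the /results path, the 404 response when no results exist yet, the clientIP and count JSON field names, and the ResultsEndpointSketch class itself. The map passed in stands in for a read from the results Dataset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the query endpoint's logic; it does not use the CDAP Service API.
final class ResultsEndpointSketch {

  /** Minimal HTTP-like response: a status code and a body. */
  static final class Response {
    final int status;
    final String body;

    Response(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  /**
   * Handles a GET request. topClients stands in for the stored results
   * (ordered by rank); null means the MapReduce program has not written any yet.
   */
  static Response handleGet(String path, Map<String, Integer> topClients) {
    if (!"/results".equals(path)) {
      return new Response(404, "Not found");
    }
    if (topClients == null) {
      return new Response(404, "Result not found");
    }
    // Hand-built JSON is adequate here because IP addresses need no escaping.
    StringBuilder json = new StringBuilder("[");
    for (Map.Entry<String, Integer> e : topClients.entrySet()) {
      if (json.length() > 1) {
        json.append(',');
      }
      json.append("{\"clientIP\":\"").append(e.getKey())
          .append("\",\"count\":").append(e.getValue()).append('}');
    }
    return new Response(200, json.append(']').toString());
  }

  public static void main(String[] args) {
    Map<String, Integer> top = new LinkedHashMap<>();
    top.put("192.0.2.2", 5);
    top.put("192.0.2.3", 4);
    Response response = handleGet("/results", top);
    System.out.println(response.status + " " + response.body);
  }
}
```

In the CDAP application itself, the handler would read the list of ClientCount objects from the Dataset and let the platform serialize the response.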
Build and Run Application
The LogAnalyticsApp can be built and packaged using the Apache Maven command:
Note that the remaining commands assume that the cdap-cli.sh script is available on your PATH. If this is not the case, please add it:
If you haven't already started a standalone CDAP installation, start it with the command:
We can then deploy the application to the standalone CDAP installation:
Next, we will send some sample Apache access log events into the Stream for processing:
We can now start the MapReduce program to process the events that were ingested:
The MapReduce program will take a couple of moments to process.
We can then start the TopClientsService and query the processed results:
Example output:
You have now learned how to write a MapReduce program to process events from a Stream, write the results to a Dataset and query the results using a Service.
Related Topics
Wise: Web Analytics tutorial, part of CDAP
Extend This Example
Now that you have the basics of MapReduce programs down, you can extend this example by:
Writing a workflow to schedule this MapReduce every hour and process the previous hour's data; and
Storing the results in a Timeseries Dataset to analyze trends.
Share and Discuss!
Have a question? Discuss at the CDAP User Mailing List.
License
Copyright © 2014-2015 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.