Source Code Repository: Source code (and other resources) for this guide are available at the CDAP Guides GitHub repository.
MapReduce is the most popular paradigm for processing large amounts of data in a reliable and fault-tolerant manner. In this guide, you will learn how to batch process data using MapReduce in the Cask Data Application Platform (CDAP).
What You Will Build
This guide will take you through building a CDAP application that uses ingested Apache access log events to compute the top 10 client IPs in a specific time-range and query the results. You will:
Build a MapReduce program to process Apache access log events;
Use a Dataset to persist results of the MapReduce program; and
Build a Service to serve the results via HTTP.
What You Will Need
Let’s Build It!
The following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the Build and Run Application section.
Application Design
The application will assume that the Apache access logs are ingested into a Stream. The log events can be ingested into a Stream continuously in real-time or in batches; whichever way, it doesn’t affect the ability of the MapReduce program to consume them.
The MapReduce program extracts the required information from the raw logs and computes the top 10 Client IPs by traffic in a specific time range. The results of the computation are persisted in a Dataset.
Finally, the application contains a Service that exposes an HTTP endpoint to access the data stored in the Dataset.
Implementation
The first step is to construct our application structure. We will use a standard Maven project structure for all of the source code files:
./pom.xml ./src/main/java/co/cdap/guides/ClientCount.java ./src/main/java/co/cdap/guides/CountsCombiner.java ./src/main/java/co/cdap/guides/IPMapper.java ./src/main/java/co/cdap/guides/LogAnalyticsApp.java ./src/main/java/co/cdap/guides/TopClientsMapReduce.java ./src/main/java/co/cdap/guides/TopClientsService.java ./src/main/java/co/cdap/guides/TopNClientsReducer.java
The CDAP application is identified by the LogAnalyticsApp
class. This class extends an AbstractApplication, and overrides the configure()
method to define all of the application components:
public class LogAnalyticsApp extends AbstractApplication { public static final String DATASET_NAME = "topClients"; @Override public void configure() { setName("LogAnalyticsApp"); setDescription("An application that computes the top 10 Client IPs from Apache access log data"); addStream(new Stream("logEvents")); addMapReduce(new TopClientsMapReduce()); try { DatasetProperties props = ObjectStores.objectStoreProperties(Types.listOf(ClientCount.class), DatasetProperties.EMPTY); createDataset(DATASET_NAME, ObjectStore.class, props); } catch (UnsupportedTypeException e) { throw Throwables.propagate(e); } addService(new TopClientsService()); } }
The LogAnalytics
application defines a new Stream where Apache access logs are ingested.
The log events can be ingested into the CDAP stream. Once the data is ingested, the events can be processed in real-time or batch. In our application, we will process the events in batch using the TopClientsMapReduce
program and compute the top 10 Client IPs in a specific time-range.
The results of the MapReduce program is persisted into a Dataset; the application uses the createDataset
method to define the Dataset. The ClientCount
class defines the types used in the Dataset.
Finally, the application adds a service for querying the results from the Dataset.
Let's take a closer look at the MapReduce program.
The TopClientsMapReduce
extends an AbstractMapReduce class and overrides the configure()
and initialize()
methods:
configure()
method configures a MapReduce, setting the program name, description and output Dataset.initialize()
method is invoked at runtime, before the MapReduce is executed. Here you can access the Hadoop job configuration through theMapReduceContext
returned bygetContext()
. Mapper, Reducer, and Combiner classes—as well as the intermediate data format—are set in this method.
public class TopClientsMapReduce extends AbstractMapReduce { @Override public void configure() { setName("TopClientsMapReduce"); setDescription("MapReduce program that computes top 10 clients in the last 1 hour"); } @Override public void initialize() throws Exception { MapReduceContext context = getContext(); // Get the Hadoop job context, set Mapper, Reducer and Combiner. Job job = (Job) context.getHadoopJob(); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setMapperClass(IPMapper.class); job.setCombinerClass(CountsCombiner.class); // Number of reducer set to 1 to compute topN in a single reducer. job.setNumReduceTasks(1); job.setReducerClass(TopNClientsReducer.class); // Read events from last 60 minutes as input to the mapper. final long endTime = context.getLogicalStartTime(); final long startTime = endTime - TimeUnit.MINUTES.toMillis(60); context.addInput(Input.ofStream("logEvents", startTime, endTime)); context.addOutput(Output.ofDataset(LogAnalyticsApp.RESULTS_DATASET_NAME)); } }
In this example, Mapper and Reducer classes are built by implementing the Hadoop APIs.
In the application, the Mapper class reads the Apache access log event from the Stream and produces the Client IP and count as the intermediate map output key and value:
public class IPMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final IntWritable OUTPUT_VALUE = new IntWritable(1); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // The body of the stream event is contained in the Text value String streamBody = value.toString(); if (streamBody != null && !streamBody.isEmpty()) { String ip = streamBody.substring(0, streamBody.indexOf(" ")); // Map output Key: IP and Value: Count context.write(new Text(ip), OUTPUT_VALUE); } } }
The reducer class gets the Client IP and count from the MapReducer job and then aggregates the count for each Client IP and stores it in a priority queue. The number of reducers is set to 1, so that all results go into the same reducer to compute the top 10 results. The top 10 results are written to the MapReduce context in the cleanup method of the Reducer, which is called once during the end of the task. Writing the results in the context automatically writes the result to the output Dataset, specified in the configure()
method of the MapReduce program.
public class TopNClientsReducer extends Reducer<Text, IntWritable, byte[], List<ClientCount>> { private static final int COUNT = 10; private static final PriorityQueue<ClientCount> priorityQueue = new PriorityQueue<ClientCount>(COUNT); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // For each Key: IP, aggregate the Value: Count. int count = 0; for (IntWritable data : values) { count += data.get(); } // Store the Key and Value in a priority queue. priorityQueue.add(new ClientCount(key.toString(), count)); // Ensure the priority queue is always contains topN count. if (priorityQueue.size() > COUNT) { priorityQueue.poll(); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { // Write topN results in reduce output. Since the "topN" (ObjectStore) Dataset is used as // output the entries will be written to the Dataset without any additional effort. List<ClientCount> topNResults = Lists.newArrayList(); while (priorityQueue.size() != 0) { topNResults.add(priorityQueue.poll()); } context.write(TopClientsService.DATASET_RESULTS_KEY, topNResults); } }
Now that we have set the data ingestion and processing components, the next step is to create a service to query the processed data.
The TopClientsService
defines a simple HTTP RESTful endpoint to perform this query and return a response:
public class TopClientsService extends AbstractService { public static final byte [] DATASET_RESULTS_KEY = {'r'}; @Override protected void configure() { setName("TopClientsService"); addHandler(new ResultsHandler()); } public static class ResultsHandler extends AbstractHttpServiceHandler { @UseDataSet(LogAnalyticsApp.DATASET_NAME) private ObjectStore<List<ClientCount>> topN; @GET @Path("/results") public void getResults(HttpServiceRequest request, HttpServiceResponder responder) { List<ClientCount> result = topN.read(DATASET_RESULTS_KEY); if (result == null) { responder.sendError(404, "Result not found"); } else { responder.sendJson(200, result); } } } }
Build and Run Application
The LogAnalyticsApp
can be built and packaged using the Apache Maven command:
$ mvn clean package
Note that the remaining commands assume that the cdap-cli.sh
script is available on your PATH. If this is not the case, please add it:
$ export PATH=$PATH:<CDAP home>/bin
If you haven't already started a standalone CDAP installation, start it with the command:
$ cdap.sh start
We can then deploy the application to the standalone CDAP installation:
$ cdap-cli.sh load artifact target/cdap-mapreduce-guide-<version>.jar $ cdap-cli.sh create app LogAnalyticsApp cdap-mapreduce-guide <version> user
Next, we will send some sample Apache access log event into the stream for processing:
$ cdap-cli.sh send stream logEvents \'255.255.255.185 - - [23/Sep/2014:11:45:38 -0400] "GET /cdap.html HTTP/1.0" 200 190 "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)"\' $ cdap-cli.sh send stream logEvents \'255.255.255.185 - - [23/Sep/2014:11:45:38 -0400] "GET /tigon.html HTTP/1.0" 200 102 "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)"\' $ cdap-cli.sh send stream logEvents \'255.255.255.185 - - [23/Sep/2014:11:45:38 -0400] "GET /coopr.html HTTP/1.0" 200 121 "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)"\' $ cdap-cli.sh send stream logEvents \'255.255.255.182 - - [23/Sep/2014:11:45:38 -0400] "GET /tigon.html HTTP/1.0" 200 111 "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)"\' $ cdap-cli.sh send stream logEvents \'255.255.255.182 - - [23/Sep/2014:11:45:38 -0400] "GET /tigon.html HTTP/1.0" 200 145 "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)"\'
We can now start the MapReduce program to process the events that were ingested:
$ cdap-cli.sh start mapreduce LogAnalyticsApp.TopClientsMapReduce
The MapReduce program will take a couple of moments to process.
We can then start the TopClientsService
and then query the processed results:
$ cdap-cli.sh start service LogAnalyticsApp.TopClientsService $ curl -w'\n' http://localhost:10000/v3/namespaces/default/apps/LogAnalyticsApp/services/TopClientsService/methods/results
Example output:
[{"clientIP":"255.255.255.185","count":3},{"clientIP":"255.255.255.182","count":2}]
You have now learned how to write a MapReduce program to process events from a Stream, write the results to a Dataset and query the results using a Service.
Related Topics
Wise: Web Analytics tutorial, part of CDAP
Extend This Example
Now that you have the basics of MapReduce programs down, you can extend this example by:
Writing a workflow to schedule this MapReduce every hour and process the previous hour's data
Store the results in a Timeseries data to analyze trends
Share and Discuss!
Have a question? Discuss at the CDAP User Mailing List.
License
Copyright © 2014-2015 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.