Iterative Data Processing with Apache Spark
Source Code Repository: Source code (and other resources) for this guide are available at the CDAP Guides GitHub repository.
Apache Spark is a very popular engine to perform in-memory cluster computing for Apache Hadoop. In this guide, you will learn how to run Apache Spark programs with CDAP.
What You Will Build
You will build a CDAP application that exposes a RESTful API to take in web pages’ backlinks information and serve out the PageRank for the known web pages. You will:
Use a Stream as the source of backlinks data;
Build a CDAP Spark program that reads directly from the Stream and computes the PageRank of the web pages;
Use a Dataset to store the output of the Spark program; and
Build a Service to serve the PageRank computation results over HTTP.
What You Will Need
Let’s Build It!
The following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the Build and Run Application section.
Application Design
Backlinks data is sent to the backlinkURLStream over HTTP (e.g. by a web crawler as it processes web pages). The PageRank for known pages is computed periodically by a PageRankProgram. The program uses the backlinkURLStream as an input and persists the results in the pageRanks dataset.
The PageRankService then uses the pageRanks dataset to serve the PageRank for a given URL over HTTP.
In this guide we assume that the backlinks data will be sent to a CDAP application.
Implementation
The first step is to construct the application structure. We will use a standard Maven project structure for all of the source code files:
./pom.xml
./src/main/java/co/cask/cdap/guides/PageRankApp.java
./src/main/java/co/cask/cdap/guides/PageRankSpark.java
./src/main/java/co/cask/cdap/guides/PageRankHandler.java
./src/main/scala/co/cask/cdap/guides/PageRankProgram.scala
The application is identified by the PageRankApp
 class. This class extends AbstractApplication, and overrides the configure()
 method to define all of the application components:
public class PageRankApp extends AbstractApplication {
public static final String PAGE_RANK_RANKS_SERVICE = "PageRankService";
public static final String PAGE_RANK_BACKLINK_STREAM = "backlinkURLStream";
public static final String PAGE_RANK_RANKS_DATASET = "pageRanks";
@Override
public void configure() {
addSpark(new PageRankSpark());
addStream(new Stream(PAGE_RANK_BACKLINK_STREAM));
addService(PAGE_RANK_RANKS_SERVICE, new PageRankHandler());
try {
ObjectStores.createObjectStore(getConfigurer(), PAGE_RANK_RANKS_DATASET, Double.class);
} catch (UnsupportedTypeException e) {
throw new RuntimeException("Will never happen: all classes above are supported", e);
}
}
}
In this example, we use a Stream to supply backlinks data; the Spark program that computes the PageRank of the web pages reads directly from the Stream. backlinkURLStream
 receives backlinks information in the form of two URLs separated by whitespace:
http://example.com/page1 http://example.com/page10
We’ll use Scala to write the Spark program (for an example of using Java, refer to the CDAP SparkPageRank example). You’ll need to add scala
 and maven-scala-plugin
 as dependencies in your Maven pom.xml.
The code below configures Spark in CDAP. This class extends AbstractSpark and overrides the configure()
 method to define all of the components. The setMainClassName
 method sets the Spark Program class which CDAP will run:
The PageRankProgram
 Spark program does the actual page rank computation. This code is taken from the Apache Spark's PageRank example; the Spark program stores the computed PageRank in an ObjectStore Dataset where the key is the URL and the value is the computed PageRank:
To serve results out via HTTP, add a PageRankHandler
, which reads the PageRank for a given URL from the pageRanks
 dataset:
Build and Run Application
The PageRankApp
 application can be built and packaged using the Apache Maven command:
Note that the remaining commands assume that the cdap-cli.sh
 script is available on your PATH. If this is not the case, please add it:
If you haven't already started a standalone CDAP installation, start it with the command:
You can then deploy the application to a standalone CDAP installation:
Start the Service:
Send some Data to the Stream:
Run the Spark Program:
The Spark Program can take time to complete. You can check the status for completion using:
Query for the PageRank results:
Example output:
Congratulations! You have now learned how to incorporate Spark programs into your CDAP applications. Please continue to experiment and extend this sample application.
Share and Discuss!
Have a question? Discuss at the CDAP User Mailing List.
License
Copyright © 2014-2016 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.