Iterative Data Processing with Apache Spark

Source Code Repository: Source code (and other resources) for this guide is available at the CDAP Guides GitHub repository.

Apache Spark is a popular engine for in-memory cluster computing on Apache Hadoop. In this guide, you will learn how to run Apache Spark programs with CDAP.

What You Will Build

You will build a CDAP application that exposes a RESTful API to take in web pages’ backlink information and serve out the PageRank of known web pages. You will:

  • Use a Stream as the source of backlinks data;

  • Build a CDAP Spark program that reads directly from the Stream and computes the PageRank of the web pages;

  • Use a Dataset to store the output of the Spark program; and

  • Build a Service to serve the PageRank computation results over HTTP.

What You Will Need
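
To build and run this guide’s application, you will need:

  • JDK 7 or higher

  • Apache Maven 3.0+

  • The CDAP SDK, for running a standalone CDAP installation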

Let’s Build It!

The following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the Build and Run Application section.

Application Design

Backlinks data is sent to the backlinkURLStream over HTTP (e.g. by a web crawler as it processes web pages). The PageRank for known pages is computed periodically by a PageRankProgram. The program uses the backlinkURLStream as an input and persists the results in the pageRanks dataset.

The PageRankService then uses the pageRanks dataset to serve the PageRank for a given URL over HTTP.

In this guide we assume that the backlinks data will be sent to a CDAP application.

Implementation

The first step is to construct the application structure. We will use a standard Maven project structure for all of the source code files:

./pom.xml
./src/main/java/co/cask/cdap/guides/PageRankApp.java
./src/main/java/co/cask/cdap/guides/PageRankSpark.java
./src/main/java/co/cask/cdap/guides/PageRankHandler.java
./src/main/scala/co/cask/cdap/guides/PageRankProgram.scala

The application is identified by the PageRankApp class. This class extends AbstractApplication, and overrides the configure() method to define all of the application components:

public class PageRankApp extends AbstractApplication {

  public static final String PAGE_RANK_RANKS_SERVICE = "PageRankService";
  public static final String PAGE_RANK_BACKLINK_STREAM = "backlinkURLStream";
  public static final String PAGE_RANK_RANKS_DATASET = "pageRanks";

  @Override
  public void configure() {
    addSpark(new PageRankSpark());
    addStream(new Stream(PAGE_RANK_BACKLINK_STREAM));
    addService(PAGE_RANK_RANKS_SERVICE, new PageRankHandler());
    try {
      ObjectStores.createObjectStore(getConfigurer(), PAGE_RANK_RANKS_DATASET, Double.class);
    } catch (UnsupportedTypeException e) {
      throw new RuntimeException("Will never happen: all classes above are supported", e);
    }
  }
}

In this example, we use a Stream to supply backlink data; the Spark program that computes the PageRank of the web pages reads directly from the Stream. The backlinkURLStream receives backlink information in the form of two URLs separated by whitespace:

http://example.com/page1 http://example.com/page10

We’ll use Scala to write the Spark program (for an example of using Java, refer to the CDAP SparkPageRank example). You’ll need to add the Scala library as a dependency and the maven-scala-plugin as a build plugin in your Maven pom.xml.
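
For example, the following pom.xml fragments add the Scala library (under the dependencies section) and the maven-scala-plugin (under build/plugins); the version numbers are illustrative and should match your Spark and CDAP setup:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>

<plugin>
  <groupId>org.scala-tools</groupId>
  <artifactId>maven-scala-plugin</artifactId>
  <version>2.15.2</version>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>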

The code below configures the Spark program in CDAP. The PageRankSpark class extends AbstractSpark and overrides the configure() method to define the program; the setMainClassName method sets the Spark program class that CDAP will run.
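
A minimal sketch of PageRankSpark.java might look like this (the exact AbstractSpark configuration API varies with the CDAP version):

package co.cask.cdap.guides;

import co.cask.cdap.api.spark.AbstractSpark;

public class PageRankSpark extends AbstractSpark {

  @Override
  public void configure() {
    setDescription("Computes the PageRank of known web pages");
    // Point CDAP at the Scala class that contains the actual Spark program
    setMainClassName("co.cask.cdap.guides.PageRankProgram");
  }
}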

The PageRankProgram Spark program does the actual PageRank computation. The core logic is taken from Apache Spark's PageRank example; the program stores the computed PageRank in an ObjectStore Dataset, where the key is the URL and the value is the computed PageRank.
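
The CDAP-specific calls that read from the Stream and write to the ObjectStore are omitted below, since their exact signatures depend on the CDAP version. The sketch shows only the core computation, adapted from Apache Spark's PageRank example, in a hypothetical helper object (PageRankCompute) that the real PageRankProgram would invoke with an RDD of "sourceURL targetURL" lines:

package co.cask.cdap.guides

import org.apache.spark.rdd.RDD

// Core PageRank computation. In the full program, `lines` comes from the
// backlinkURLStream and the resulting (url, rank) pairs are written to the
// pageRanks ObjectStore (URL as key, Double rank as value).
object PageRankCompute {

  def compute(lines: RDD[String], iterations: Int): RDD[(String, Double)] = {
    // Pair each page with the distinct set of pages it links to
    val links = lines.map { line =>
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()

    // Every known page starts with a rank of 1.0
    var ranks = links.mapValues(_ => 1.0)

    for (_ <- 1 to iterations) {
      // Each page distributes its current rank evenly over its outgoing links
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      // Recompute ranks with the standard 0.85 damping factor
      ranks = contribs.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
    }
    ranks
  }
}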

To serve the results over HTTP, add a PageRankHandler, which reads the PageRank for a given URL from the pageRanks dataset.
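
A sketch of PageRankHandler.java is shown below; the endpoint name (pagerank) and the request format (the URL sent as the plain-text POST body) are choices made for this sketch rather than fixed CDAP conventions:

package co.cask.cdap.guides;

import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;

import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

public class PageRankHandler extends AbstractHttpServiceHandler {

  @UseDataSet(PageRankApp.PAGE_RANK_RANKS_DATASET)
  private ObjectStore<Double> pageRanks;

  @Path("pagerank")
  @POST
  public void getPageRank(HttpServiceRequest request, HttpServiceResponder responder) {
    // The POST body carries the URL whose PageRank is requested
    String url = StandardCharsets.UTF_8.decode(request.getContent()).toString().trim();

    Double rank = pageRanks.read(url.getBytes(StandardCharsets.UTF_8));
    if (rank == null) {
      responder.sendString(HttpURLConnection.HTTP_NO_CONTENT,
                           "No PageRank found for URL: " + url, StandardCharsets.UTF_8);
      return;
    }
    responder.sendString(rank.toString());
  }
}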

Build and Run Application

The PageRankApp application can be built and packaged using the Apache Maven command:
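
mvn clean package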

Note that the remaining commands assume that the cdap-cli.sh script is available on your PATH. If this is not the case, please add it:
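
export PATH=$PATH:<CDAP SDK home>/bin

The CLI commands below follow the CDAP 3.x command syntax; if your CDAP version differs, adjust them accordingly.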

If you haven't already started a standalone CDAP installation, start it with the command:
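
cdap.sh start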

You can then deploy the application to a standalone CDAP installation:
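
cdap-cli.sh deploy app target/cdap-spark-guide-<version>.jar

Substitute the JAR name and version produced by your Maven build, as defined in your pom.xml.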

Start the Service:
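
cdap-cli.sh start service PageRankApp.PageRankService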

Send some Data to the Stream:
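
cdap-cli.sh send stream backlinkURLStream "http://example.com/page1 http://example.com/page10"
cdap-cli.sh send stream backlinkURLStream "http://example.com/page10 http://example.com/page1"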

Run the Spark Program:
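
cdap-cli.sh start spark PageRankApp.PageRankSpark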

The Spark program can take time to complete. You can check whether it has completed using:
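
cdap-cli.sh get spark status PageRankApp.PageRankSpark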

Query for the PageRank results:
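
cdap-cli.sh call service PageRankApp.PageRankService POST 'pagerank' body 'http://example.com/page1'

The endpoint and body format here follow the handler sketch shown earlier.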

The service responds with the computed PageRank value for the requested URL.

Congratulations! You have now learned how to incorporate Spark programs into your CDAP applications. Please continue to experiment and extend this sample application.

Share and Discuss!

Have a question? Discuss at the CDAP User Mailing List.

License

Copyright © 2014-2016 Cask Data, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.