Source Code Repository: Source code (and other resources) for this guide are available at the CDAP Guides GitHub repository.
The Workflow system in Cask Data Application Platform (CDAP) allows specifying, executing, scheduling, and monitoring complex series of jobs and tasks. In this guide, you will learn how it can be used to execute MapReduce programs in parallel based on the evaluation of a condition.
What You Will Build
This guide will take you through building a CDAP application that uses ingested raw purchase events (of the form '<name> bought <n> <item>s for $<price>', which are parsed using a primitive parser for sentences) to compute, in parallel, the total purchases made by each customer along with the total purchases made for each product.
You will:
- Build PurchaseEventParser, a MapReduce program to parse the raw purchase events and create Purchase objects from them;
- Build PurchaseCounterByCustomer, a MapReduce program to count the purchases made per customer;
- Build PurchaseCounterByProduct, a MapReduce program to count the purchases made per product;
- Build PurchaseWorkflow, a Workflow which will first execute the MapReduce program PurchaseEventParser. If the predicate PurchaseEventVerifier, which uses the MapReduce counters emitted by PurchaseEventParser to determine data quality, evaluates to true, the Workflow will execute the MapReduce programs PurchaseCounterByCustomer and PurchaseCounterByProduct in parallel; otherwise, it will execute the action ProblemLogger;
- Use Datasets to persist the results of the MapReduce programs; and
- Build a Service to serve the results via HTTP.
What You Will Need
Let’s Build It!
The following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the Build and Run Application section.
Application Design
The application assumes that the purchase events are ingested into a Stream. The events can be ingested into the Stream continuously in real time or in batches; either way, this does not affect the ability of the MapReduce programs to consume them.
The PurchaseWorkflow encapsulates the set of MapReduce programs that extract the required information from the raw purchase events and compute the total purchases made by each customer and the total purchases made for each product in a specific time range. The results of the computation are persisted in Datasets.
Finally, the application contains a Service that exposes an HTTP endpoint to access the data stored in the Datasets.
Implementation
The first step is to construct our application structure. We will use a standard Maven project structure for all of the source code files:
./pom.xml
./src/main/java/co/cdap/guides/workflow/ProblemLogger.java
./src/main/java/co/cdap/guides/workflow/Purchase.java
./src/main/java/co/cdap/guides/workflow/PurchaseCounterByProduct.java
./src/main/java/co/cdap/guides/workflow/PurchaseCounterByCustomer.java
./src/main/java/co/cdap/guides/workflow/PurchaseEventParser.java
./src/main/java/co/cdap/guides/workflow/PurchaseEventVerifier.java
./src/main/java/co/cdap/guides/workflow/PurchaseResultService.java
./src/main/java/co/cdap/guides/workflow/PurchaseWorkflow.java
./src/main/java/co/cdap/guides/workflow/PurchaseWorkflowApp.java
The CDAP application is identified by the PurchaseWorkflowApp class. This class extends AbstractApplication and overrides the configure method to define all of the application components:
```java
public class PurchaseWorkflowApp extends AbstractApplication {

  @Override
  public void configure() {
    setName("PurchaseWorkflowApp");
    setDescription("Application describing the Workflow");
    // Stream into which the raw purchase events are ingested
    addStream(new Stream("purchaseEvents"));
    addMapReduce(new PurchaseEventParser());
    addMapReduce(new PurchaseCounterByCustomer());
    addMapReduce(new PurchaseCounterByProduct());
    addWorkflow(new PurchaseWorkflow());
    // Run PurchaseWorkflow at minute 0 of every hour
    scheduleWorkflow(Schedules.builder("HourlySchedule")
                       .setDescription("Schedule execution every 1 hour")
                       .createTimeSchedule("0 * * * *"),
                     "PurchaseWorkflow");
    addService(new PurchaseResultService());
    // Datasets for the parsed events and for the two sets of totals
    createDataset("purchaseRecords", KeyValueTable.class);
    createDataset("customerPurchases", KeyValueTable.class);
    createDataset("productPurchases", KeyValueTable.class);
  }
}
```
The PurchaseWorkflowApp application defines a new Stream into which purchase events are ingested. Once ingested, the events can be processed in real time or in batch. In our application, we process the events in batch using the PurchaseWorkflow program and compute the total purchases made by each customer and the total purchases made for each product in a specific time range. We use three MapReduce programs, PurchaseEventParser, PurchaseCounterByCustomer, and PurchaseCounterByProduct, to apply different processing to the purchase events, and the Workflow PurchaseWorkflow to connect these MapReduce programs.
The results of the Workflow execution are persisted into Datasets; the application uses the createDataset method to define them. We use three Datasets: purchaseRecords to store the valid parsed purchase events; customerPurchases to store the total purchases made by each customer; and productPurchases to store the total purchases made for each product. The Purchase class defines the type used to store the parsed purchase events.
The application also includes a custom Workflow action, ProblemLogger. When a Workflow executes a custom action, it invokes the run method of that action. In ProblemLogger, we only add a log statement; however, it could be customized to, for example, send emails to the concerned parties.
The PurchaseWorkflow is scheduled to execute every hour.
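The time schedule in PurchaseWorkflowApp uses the cron expression "0 * * * *", which fires at minute 0 of every hour. As a rough, framework-free illustration of what that expression means (the HourlySchedule class and its nextRun method are hypothetical and not part of CDAP), the next trigger time after a given moment can be computed with java.time:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not a CDAP API: models when the cron expression
// "0 * * * *" (minute 0 of every hour) would next fire after a given time.
final class HourlySchedule {

  private HourlySchedule() {}

  // Truncate to the start of the current hour, then move one hour ahead,
  // so the result is always strictly after 'now'.
  static LocalDateTime nextRun(LocalDateTime now) {
    return now.truncatedTo(ChronoUnit.HOURS).plusHours(1);
  }

  public static void main(String[] args) {
    System.out.println(nextRun(LocalDateTime.of(2015, 5, 7, 10, 15)));
  }
}
```

This sketch ignores time zones and daylight-saving transitions, which a real scheduler must handle; it only models the minute and hour fields of the expression.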
Finally, the application adds a Service for querying the results from the Datasets.
Let's take a closer look at the Workflow.
The PurchaseWorkflow extends the AbstractWorkflow class and overrides the configure method:
```java
public class PurchaseWorkflow extends AbstractWorkflow {

  @Override
  protected void configure() {
    setName("PurchaseWorkflow");
    setDescription("Workflow to parse the purchase events and count the revenue per customer and per product");
    // Always runs first
    addMapReduce("PurchaseEventParser");
    // Branch on data quality: run both counters in parallel, or log the problem
    condition(new PurchaseEventVerifier())
      .fork()
        .addMapReduce("PurchaseCounterByCustomer")
      .also()
        .addMapReduce("PurchaseCounterByProduct")
      .join()
    .otherwise()
      .addAction(new ProblemLogger())
    .end();
  }
}
```
In the configure method, we specify the topology connecting the programs that will run as part of the Workflow execution. As the first action in the PurchaseWorkflow, we add the MapReduce program PurchaseEventParser. This program parses raw purchase events (using a primitive sentence parser) and creates Purchase objects from them.
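The parser itself is not listed in this section. As a minimal sketch, assuming the sentence format '<name> bought <n> <item>s for $<price>' described earlier and a naive "strip the trailing s" rule for product names, a regular-expression parser could look like this. SentenceParser and ParsedPurchase are hypothetical names; the field names are borrowed from the sample JSON output later in this guide:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the parsing idea; not the guide's PurchaseEventParser.
final class SentenceParser {

  // Minimal stand-in for the guide's Purchase class; field names follow the sample output.
  static final class ParsedPurchase {
    final String customer;
    final String product;
    final int quantity;
    final int price;

    ParsedPurchase(String customer, String product, int quantity, int price) {
      this.customer = customer;
      this.product = product;
      this.quantity = quantity;
      this.price = price;
    }
  }

  // "<name> bought <n> <item>s for $<price>", e.g. "bob bought 3 apples for $30"
  private static final Pattern SENTENCE =
      Pattern.compile("(\\S+) bought (\\d+) (\\S+) for \\$(\\d+)");

  private SentenceParser() {}

  static Optional<ParsedPurchase> parse(String sentence) {
    Matcher m = SENTENCE.matcher(sentence.trim());
    if (!m.matches()) {
      return Optional.empty(); // malformed sentence: no record is emitted
    }
    String item = m.group(3);
    // Naive singularization: "apples" -> "apple", while "apple" is left unchanged.
    String product = item.endsWith("s") ? item.substring(0, item.length() - 1) : item;
    try {
      return Optional.of(new ParsedPurchase(m.group(1), product,
          Integer.parseInt(m.group(2)), Integer.parseInt(m.group(4))));
    } catch (NumberFormatException e) {
      return Optional.empty(); // numbers too large for an int
    }
  }

  public static void main(String[] args) {
    parse("bob bought 3 apples for $30").ifPresent(p ->
        System.out.println(p.customer + " " + p.product + " " + p.quantity + " " + p.price));
  }
}
```

Returning an empty Optional for unparseable input keeps the sketch close to the behavior the Workflow relies on: a malformed event produces no output record, which is exactly what the mapper counters later measure.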
After that, we add a condition to the Workflow, which takes the predicate PurchaseEventVerifier. If the predicate evaluates to true, we fork the execution of the Workflow into two parallel branches. One branch executes the PurchaseCounterByCustomer MapReduce program, while the other executes the PurchaseCounterByProduct MapReduce program. The join call then waits for both branches to complete.
If the predicate evaluates to false, the actions in the otherwise section are executed. We have added a single custom action, ProblemLogger, to the otherwise section as an example of what is possible.
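To make the control flow of condition, fork, also, join, and otherwise concrete, here is a framework-free model in plain Java. It is only an analogy and assumes nothing about how CDAP schedules the branches internally; WorkflowTopologySketch is a hypothetical class that records which nodes run, using CompletableFuture for the two parallel branches:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Hypothetical, framework-free model of the PurchaseWorkflow topology. It is an analogy
// only and does not reflect how CDAP actually executes Workflow nodes.
final class WorkflowTopologySketch {

  private WorkflowTopologySketch() {}

  static List<String> run(BooleanSupplier predicate) {
    // Synchronized because the two forked branches append concurrently.
    List<String> executed = Collections.synchronizedList(new ArrayList<>());
    executed.add("PurchaseEventParser");                    // always runs first
    if (predicate.getAsBoolean()) {                         // condition(...)
      CompletableFuture<Void> byCustomer = CompletableFuture.runAsync(
          () -> executed.add("PurchaseCounterByCustomer")); // fork(): first branch
      CompletableFuture<Void> byProduct = CompletableFuture.runAsync(
          () -> executed.add("PurchaseCounterByProduct"));  // also(): second branch
      CompletableFuture.allOf(byCustomer, byProduct).join(); // join(): wait for both
    } else {
      executed.add("ProblemLogger");                        // otherwise(): custom action
    }
    return new ArrayList<>(executed);                       // end(): condition finished
  }

  public static void main(String[] args) {
    System.out.println(run(() -> true));
    System.out.println(run(() -> false));
  }
}
```

Because the two branches run concurrently, their relative order in the returned list is not fixed; the join only guarantees that both have finished before the method returns.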
Let's take a closer look at the predicate PurchaseEventVerifier:
```java
public class PurchaseEventVerifier implements Predicate<WorkflowContext> {

  private static final String TASK_COUNTER_GROUP_NAME = "org.apache.hadoop.mapreduce.TaskCounter";
  private static final String MAP_INPUT_RECORDS_COUNTER_NAME = "MAP_INPUT_RECORDS";
  private static final String MAP_OUTPUT_RECORDS_COUNTER_NAME = "MAP_OUTPUT_RECORDS";

  @Override
  public boolean apply(WorkflowContext workflowContext) {
    if (workflowContext == null) {
      return false;
    }

    WorkflowToken token = workflowContext.getToken();
    if (token == null) {
      return false;
    }

    // Hadoop counters of the previous MapReduce program, read from the SYSTEM scope
    Value mapInputRecords = token.get(TASK_COUNTER_GROUP_NAME + "." + MAP_INPUT_RECORDS_COUNTER_NAME,
                                      WorkflowToken.Scope.SYSTEM);
    Value mapOutputRecords = token.get(TASK_COUNTER_GROUP_NAME + "." + MAP_OUTPUT_RECORDS_COUNTER_NAME,
                                       WorkflowToken.Scope.SYSTEM);
    if (mapInputRecords != null && mapOutputRecords != null) {
      // Return true if at least 80% of the records were successfully parsed and emitted
      // by the previous map job
      return (mapOutputRecords.getAsLong() >= (mapInputRecords.getAsLong() * 80 / 100));
    }
    return false;
  }
}
```
PurchaseEventVerifier needs to be a public class that implements the interface Predicate<WorkflowContext>. The apply method of the predicate takes a WorkflowContext as its parameter. The Hadoop counters emitted by the previous MapReduce program (in our case, PurchaseEventParser) can be retrieved in this method from the WorkflowToken obtained through the workflowContext object, using the SYSTEM scope. We query for the number of input records to the mappers (MAP_INPUT_RECORDS) and the number of records emitted by the mappers (MAP_OUTPUT_RECORDS). If at least 80% of the records were successfully parsed and emitted as Purchase objects by the mappers, the method returns true and the fork in the Workflow is executed. If the method returns false (which also happens when the context, the token, or either counter is unavailable), the otherwise section of the condition is executed, which contains the ProblemLogger custom action.
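The threshold check relies on integer arithmetic, so it is worth working through a few values. The following hypothetical ParseRateCheck class restates the comparison from apply with plain Long values in place of the WorkflowToken lookups (null stands in for a missing counter):

```java
// Hypothetical, dependency-free restatement of the check in PurchaseEventVerifier.apply.
final class ParseRateCheck {

  private ParseRateCheck() {}

  static boolean passes(Long mapInputRecords, Long mapOutputRecords) {
    if (mapInputRecords == null || mapOutputRecords == null) {
      return false; // mirrors the verifier: missing counters route to otherwise()
    }
    // Same integer arithmetic as the verifier: multiply first, then divide,
    // so the threshold is floor(input * 80 / 100).
    return mapOutputRecords >= mapInputRecords * 80 / 100;
  }

  public static void main(String[] args) {
    System.out.println(passes(5L, 4L));
    System.out.println(passes(7L, 5L));
  }
}
```

With 5 input records, the threshold is 5 * 80 / 100 = 4, so 4 parsed records pass and 3 do not. Because the division truncates, 7 input records give a threshold of 560 / 100 = 5, so 5 parsed records (about 71%) still pass. An empty input also passes, since 0 >= 0.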
Build and Run Application
The PurchaseWorkflowApp can be built and packaged using the Apache Maven command:
$ mvn clean package
Note that the remaining commands assume that the cdap-cli.sh script is available on your PATH. If this is not the case, please add it:
$ export PATH=$PATH:<CDAP home>/bin
If you haven't already started a standalone CDAP installation, start it with the command:
$ cdap.sh start
We can then deploy the application to the standalone CDAP installation:
$ cdap-cli.sh load artifact target/cdap-workflow-guide-<version>.jar
$ cdap-cli.sh create app PurchaseWorkflowApp cdap-workflow-guide <version> user
Next, we will send some sample purchase events into the stream for processing:
$ cdap-cli.sh send stream purchaseEvents '"bob bought 3 apples for $30"'
$ cdap-cli.sh send stream purchaseEvents '"joe bought 1 apple for $100"'
$ cdap-cli.sh send stream purchaseEvents '"joe bought 10 pineapples for $20"'
$ cdap-cli.sh send stream purchaseEvents '"cat bought 3 bottles for $12"'
$ cdap-cli.sh send stream purchaseEvents '"cat bought 2 pops for $14"'
We can now start the Workflow to process the events that were ingested:
$ cdap-cli.sh start workflow PurchaseWorkflowApp.PurchaseWorkflow
The Workflow will take a couple of minutes to execute.
We can then start the PurchaseResultService and query the processed results:
$ cdap-cli.sh start service PurchaseWorkflowApp.PurchaseResultService
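The queries that follow use curl. If you would rather call the service from Java, a minimal sketch using the java.net.http client (Java 11+) is shown here; PurchaseResultClient is a hypothetical helper, and it assumes the same localhost:10000 router address and method paths used in the curl commands:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client sketch; assumes a standalone CDAP router on localhost:10000.
final class PurchaseResultClient {

  private static final String BASE =
      "http://localhost:10000/v3/namespaces/default/apps/PurchaseWorkflowApp"
          + "/services/PurchaseResultService/methods/";

  private PurchaseResultClient() {}

  // Builds the full URL for a service method path such as "purchases/customers/joe".
  static URI endpoint(String methodPath) {
    return URI.create(BASE + methodPath);
  }

  // Issues a GET request and returns the response body, failing on non-200 status codes.
  static String get(String methodPath) throws IOException, InterruptedException {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(endpoint(methodPath)).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() != 200) {
      throw new IOException("Unexpected HTTP status " + response.statusCode());
    }
    return response.body();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(get("purchases/customers/joe"));
  }
}
```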
Retrieve the purchase records for customer joe:
$ curl http://localhost:10000/v3/namespaces/default/apps/PurchaseWorkflowApp/services/PurchaseResultService/methods/purchaserecords/joe
Example output:
[{"customer":"joe","product":"pineapple","quantity":10,"price":20,"purchaseTime":1430962917227},{"customer":"joe","product":"apple","quantity":1,"price":100,"purchaseTime":1430962917227}]
Retrieve the total purchases made by customer joe:
$ curl http://localhost:10000/v3/namespaces/default/apps/PurchaseWorkflowApp/services/PurchaseResultService/methods/purchases/customers/joe
Example output:
120
Retrieve the total purchases made for product apple:
$ curl http://localhost:10000/v3/namespaces/default/apps/PurchaseWorkflowApp/services/PurchaseResultService/methods/purchases/products/apple
Example output:
130
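The example totals can be checked by hand: joe spent $100 + $20 = $120, and apple was bought for $30 (bob) + $100 (joe) = $130. The hypothetical PurchaseTotals class below reproduces that arithmetic in memory for the five sample events. It is a sketch of what the two counting programs compute, not their MapReduce implementation, and it assumes product names have already been singularized as in the sample output:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory sketch of what PurchaseCounterByCustomer and
// PurchaseCounterByProduct compute; not their MapReduce implementation.
final class PurchaseTotals {
  static final int CUSTOMER = 0;
  static final int PRODUCT = 1;
  static final int PRICE = 2;

  // The five sample events sent to the purchaseEvents stream, as {customer, product, price}.
  static final String[][] EVENTS = {
      {"bob", "apple", "30"},
      {"joe", "apple", "100"},
      {"joe", "pineapple", "20"},
      {"cat", "bottle", "12"},
      {"cat", "pop", "14"},
  };

  private PurchaseTotals() {}

  // Sums the price of every event, grouped by the given column (CUSTOMER or PRODUCT).
  static Map<String, Long> totalBy(int keyColumn) {
    Map<String, Long> totals = new LinkedHashMap<>();
    for (String[] event : EVENTS) {
      totals.merge(event[keyColumn], Long.parseLong(event[PRICE]), Long::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    System.out.println(totalBy(CUSTOMER)); // {bob=30, joe=120, cat=26}
    System.out.println(totalBy(PRODUCT));  // {apple=130, pineapple=20, bottle=12, pop=14}
  }
}
```

Note that the totals sum the stated price of each event as-is; treating the price as a per-unit amount would give a different total for apple (3 * 30 + 100), which does not match the service output.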
You have now seen how to write a Workflow to connect different MapReduce programs and run them in parallel based on a condition.
Related Topics
CDAP MapReduce Guide: tutorial for MapReduce
Wise: Web Analytics tutorial, part of CDAP
Share and Discuss!
Have a question? Discuss at the CDAP User Mailing List.
License
Copyright © 2015 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.