Consuming Data from Kafka
Source Code Repository: Source code (and other resources) for this guide are available at the CDAP Guides GitHub repository.
Consuming data from a Kafka topic and processing the messages received in real time is a common part of many big data applications. In this guide, you will learn how to accomplish it with the Cask Data Application Platform (CDAP).
What You Will Build
You will build a CDAP application that consumes data from a Kafka cluster v0.8.x on a specific topic and computes the average size of the messages received. You will:
Build a real-time Flow that subscribes to a Kafka topic;
Use the cdap-kafka-pack library to build a Flowlet to consume from Kafka;
Use a Dataset to persist the results of the analysis;
Build a Service to retrieve the analysis results via an HTTP RESTful endpoint; and
Start a Kafka Server v0.8.x and publish messages to the topic which the CDAP Application is subscribed.
What You Will Need
Let’s Build It!
Following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the Build and Run Application section.
Application Implementation
Real-time processing capability within CDAP is supported by Flows. The application we are building in this guide uses a Flow for processing the messages received on a Kafka topic. The count and total size of these messages are persisted in a Dataset and made available via an HTTP RESTful endpoint using a Service.
The Flow consists of two processing nodes called Flowlets:
A subscriber Flowlet that subscribes to a specific topic on a Kafka cluster and emits the messages received to the next Flowlet.
A counter Flowlet that consumes the message emitted by the Kafka subscriber Flowlet to update the basic statistics of Kafka messages: total message size and count.
Application Implementation
The recommended way to build a CDAP application from scratch is to use a Maven project. Use the following directory structure (you’ll find contents of these files described below):
./pom.xml
./src/main/java/co/cask/cdap/guides/kafka/KafkaConsumerFlowlet.java
./src/main/java/co/cask/cdap/guides/kafka/KafkaIngestionApp.java
./src/main/java/co/cask/cdap/guides/kafka/KafkaIngestionFlow.java
./src/main/java/co/cask/cdap/guides/kafka/KafkaMessageCounterFlowlet.java
./src/main/java/co/cask/cdap/guides/kafka/KafkaStatsHandler.java
The application will use the cdap-kafka-pack
library which includes an implementation of the Kafka08ConsumerFlowlet
, which is designed to work with a 0.8.x Kakfa Cluster. If you want to use the application with a 0.7.x Kakfa Cluster, please refer to the documentation of c`dap-kafka-pack`.
You'll need to add the correct cdap-kafka-pack
library, based on your Kafka cluster version (cdap-flow-compat-0.8
for this guide) as a dependency to your project's pom.xml:
...
<dependencies>
...
<dependency>
<groupId>co.cask.cdap</groupId>
<artifactId>cdap-kafka-flow-compat-0.8</artifactId>
<version>0.1.0</version>
</dependency>
</dependencies>
Create the KafkaIngestionApp
class which declares that the application has a Flow, a Service, and creates two Datasets:
public class KafkaIngestionApp extends AbstractApplication {
@Override
public void configure() {
setName(Constants.APP_NAME);
createDataset(Constants.OFFSET_TABLE_NAME, KeyValueTable.class);
createDataset(Constants.STATS_TABLE_NAME, KeyValueTable.class);
addFlow(new KafkaIngestionFlow());
addService(Constants.SERVICE_NAME, new KafkaStatsHandler());
}
}
The KafkaIngestionFlow
connects the KafkaConsumerFlowlet
to the KafkaMessageCounterFlowlet
:
The KafkaConsumerFlowlet
subclasses from the Kafka08ConsumerFlowlet
available in the cdap-kafka-pack
library:
Messages received by the KafkaConsumerFlowlet
are consumed by the KafkaMessageCounterFlowlet
that updates the total number of messages and their total size in the kafkaCounter
Dataset:
In a real-world scenario, the Flowlet could perform more sophisticated processing on the messages received from Kafka.
Finally, the KafkaStatsHandler
uses the kafkaCounter
Dataset to compute the average message size and serve it over HTTP:
Configuring the KafkaConsumerFlowlet
In order to utilize the KafkaConsumerFlowlet
, a Kafka ZooKeeper connection string along with a Kafka topic must be provided as runtime arguments. You can provide these to the KafkaConsumerFlowlet
as runtime arguments of the KafkaIngestionFlow
. (See the Build and Run Application section for information on how to pass the arguments to the program at the start.) The keys of these runtime arguments are:
Build and Run Application
The KafkaIngestionApp
application can be built and packaged using the Apache Maven command:
Note that the remaining commands assume that the cdap-cli.sh
script is available on your PATH. If this is not the case, please add it:
If you haven't already started a standalone CDAP installation, start it with the command:
We can then deploy the application to a standalone CDAP installation:
We can then start its components (note the runtime arguments, as described above in Configuring the KafkaConsumerFlowlet):
You can also use the CDAP CLI to start the Flow and Service:
Once the Flow is started, Kafka messages are processed as they are published. Now, let's send data to the Kafka topic.
Publish Messages to a Kakfa topic
If you don't have Kafka v0.8.x, you can download the binary at Kafka 0.8.x Download. Be sure you download v0.8.x (we recommend Kafka v0.8.0), as this guide is designed to work specifically with that version.
Follow the instructions on Kafka v0.8.x Quickstart to publish messages to MyTopic
. The instructions are repeated below for your convenience and assume you have downloaded the binary distribution:
Once the kafka-console-producer.sh
script is invoked, you can type messages on the console and every line is published as a message to MyTopic
. Go ahead and publish a few messages, such as:
Query Results
You can query for the average size of the Kafka messages:
Example output:
Share and Discuss!
Have a question? Discuss at the CDAP User Mailing List.
License
Copyright © 2014-2015 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.