Amazon Kinesis Spark Streaming Source

Description

A Spark Streaming source that ingests real-time data from Amazon Kinesis into Hydrator pipelines.

User Story

A user has a continuous stream of click-stream data that various nodes write into Amazon Kinesis streams. The user runs live dashboards powered by Hydrator pipelines and wants to feed this incoming data into them. The Kinesis Spark Streaming source lets the user connect the incoming stream directly to a Hydrator pipeline.

The source also lets users bring in data from AWS services that Hydrator does not currently support or has no plugins for. Because many AWS services can deliver data into Kinesis, Kinesis can act as a connector between those services and Hydrator pipelines.

Properties

Kinesis App Name: The application name that is used to checkpoint the Kinesis sequence numbers in a DynamoDB table by the Kinesis Spark streaming library. The application name must be unique for a given account and region.

Kinesis Stream name: The Kinesis stream that this streaming application will pull data from.

Endpoint URL: A valid Kinesis endpoint URL, such as https://kinesis.us-east-1.amazonaws.com.

Region name: A valid AWS region name for the Kinesis stream, such as us-east-1 (open question: should more than one region be supported?).

Checkpoint interval: The interval at which the Kinesis Client Library saves its position in the stream (for example, Duration(2000) = 2 seconds). As a starting point, set it to the batch interval of the streaming application.

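Initial position: Where to start reading when no checkpoint exists yet for the application name. Either InitialPositionInStream.TRIM_HORIZON (the oldest record still retained in the stream) or InitialPositionInStream.LATEST (the most recent records). See the Kinesis Checkpointing section of the Amazon Kinesis API documentation for details.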

AWS access key ID: The API access key ID for Amazon Web Services.

AWS access secret: The API secret access key for Amazon Web Services.
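The properties above map naturally onto a small configuration object that validates values before any data is read. The sketch below is a hypothetical plain-Java version, not the plugin's actual config class: the class name KinesisSourceConfig, its field names, and the local InitialPosition enum (which mirrors the two InitialPositionInStream values this property accepts) are assumptions, and the two AWS credential properties are left out for brevity. It accepts the initial position either as LATEST or in the qualified form InitialPositionInStream.LATEST.

```java
import java.util.Locale;

/**
 * Hypothetical holder for the Kinesis source properties.
 * Class and field names are illustrative assumptions; AWS credentials are omitted.
 */
final class KinesisSourceConfig {
  /** Local mirror of the two KCL InitialPositionInStream values the property allows. */
  enum InitialPosition { TRIM_HORIZON, LATEST }

  private static final String QUALIFIED_PREFIX = "InitialPositionInStream.";

  final String appName;
  final String streamName;
  final String endpointUrl;
  final String regionName;
  final long checkpointIntervalMs;
  final InitialPosition initialPosition;

  KinesisSourceConfig(String appName, String streamName, String endpointUrl,
                      String regionName, long checkpointIntervalMs, String initialPosition) {
    this.appName = requireNonEmpty(appName, "Kinesis App Name");
    this.streamName = requireNonEmpty(streamName, "Kinesis Stream name");
    this.endpointUrl = requireNonEmpty(endpointUrl, "Endpoint URL");
    this.regionName = requireNonEmpty(regionName, "Region name");
    if (checkpointIntervalMs <= 0) {
      throw new IllegalArgumentException(
          "Checkpoint interval must be positive, got " + checkpointIntervalMs);
    }
    this.checkpointIntervalMs = checkpointIntervalMs;
    this.initialPosition = parsePosition(initialPosition);
  }

  /** Accepts "LATEST", "latest", or the qualified form "InitialPositionInStream.LATEST". */
  static InitialPosition parsePosition(String value) {
    String name = requireNonEmpty(value, "Initial position").trim();
    if (name.startsWith(QUALIFIED_PREFIX)) {
      name = name.substring(QUALIFIED_PREFIX.length());
    }
    try {
      return InitialPosition.valueOf(name.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Initial position must be TRIM_HORIZON or LATEST, got " + value, e);
    }
  }

  /** Rejects null or blank values, naming the offending property in the error. */
  private static String requireNonEmpty(String value, String property) {
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException(property + " is required");
    }
    return value;
  }
}
```

Failing fast on a bad value can surface configuration mistakes before any Kinesis receiver starts, instead of as a runtime error inside Spark.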

Design

 

Implementation

 

getStream
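public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws Exception {
  registerUsage(streamingContext);
  JavaStreamingContext javaStreamingContext = streamingContext.getSparkStreamingContext();
  // createStream has no (context, config) overload; the config field names below are illustrative.
  JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(javaStreamingContext,
      config.appName, config.streamName, config.endpointUrl, config.regionName,
      InitialPositionInStream.valueOf(config.initialPosition), new Duration(config.checkpointIntervalMs),
      StorageLevel.MEMORY_AND_DISK_2(), config.awsAccessKeyId, config.awsAccessSecret);
  return kinesisStream.map(new MyObjectToStructuredRecordFunction());
}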
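The conversion from raw Kinesis payloads to records is left to MyObjectToStructuredRecordFunction, which the design does not spell out. The sketch below is one hypothetical shape for it, assuming producers write UTF-8 text (for example, JSON click events) and that a single string field named body is an acceptable output schema. So that it runs without CDAP or Spark on the classpath, a Map stands in for StructuredRecord and java.util.function.Function stands in for Spark's function type. In the actual plugin the class would instead implement Spark's org.apache.spark.api.java.function.Function<byte[], StructuredRecord> (which is Serializable, since Spark ships it to executors) and build the record with CDAP's StructuredRecord.builder(schema).

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical stand-in for MyObjectToStructuredRecordFunction.
 * Decodes each Kinesis payload as UTF-8 and wraps it in a one-field record;
 * a Map stands in for CDAP's StructuredRecord.
 */
final class BytesToRecordFunction implements Function<byte[], Map<String, Object>> {
  /** Assumed name of the single output field. */
  static final String BODY_FIELD = "body";

  @Override
  public Map<String, Object> apply(byte[] payload) {
    Map<String, Object> record = new LinkedHashMap<>();
    // Assumes producers write UTF-8 text; binary payloads would need a different decoder.
    record.put(BODY_FIELD, new String(payload, StandardCharsets.UTF_8));
    return Collections.unmodifiableMap(record);
  }
}
```

Keeping the schema to one raw string field defers parsing to downstream Hydrator stages, which keeps the source independent of the producers' payload format.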

 
