...
We will introduce a new artifact, called the DataStreaming artifact, that is similar to the DataPipeline artifact. It will use the exact same configuration, except that it will support its own set of plugin types. This artifact will support the existing transform, sparkcompute, aggregator, and joiner plugin types. In addition, we will add the streamingsource, streamingsink, and streamingtransform plugin types, with streamingtransform used for windowing.
StreamingSource
The streamingsource plugin type simply takes a JavaSparkContext and returns a JavaDStream:
...
Note: Investigate support for realtimesource plugin type using http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation to update SourceState
StreamingSink
The streamingsink plugin type simply takes a JavaDStream and a context and performs some operation on it:
...
```java
@Override
public void write(JavaDStream<StructuredRecord> stream, final JavaSparkExecutionContext jsec) throws Exception {
  final String tableName = conf.table;
  stream.mapToPair(new PairFunction<StructuredRecord, byte[], Put>() {
    @Override
    public Tuple2<byte[], Put> call(StructuredRecord record) throws Exception {
      Put put = recordPutTransformer.toPut(record);
      return new Tuple2<>(put.getRow(), put);
    }
  }).foreachRDD(new VoidFunction<JavaPairRDD<byte[], Put>>() {
    @Override
    public void call(JavaPairRDD<byte[], Put> putJavaPairRDD) throws Exception {
      jsec.saveAsDataset(putJavaPairRDD, tableName);
    }
  });
}
```
Unions
Multiple connections into a stage become a union of all DStreams coming into that stage.
```java
Iterator<JavaDStream<Object>> previousStreamsIter = previousStreams.iterator();
JavaDStream<Object> stageStream = previousStreamsIter.next();
while (previousStreamsIter.hasNext()) {
  stageStream = stageStream.union(previousStreamsIter.next());
}
```
Transforms
A Transform stage will become a flatMap() call on its input DStream.
```java
final Transform<Object, Object> transform = pluginContext.newPluginInstance(stageName);
transform.initialize(transformContext);
// Keep the result: DStream operations return a new stream rather than modifying the input.
stageStream = stageStream.flatMap(new FlatMapFunction<Object, Object>() {
  @Override
  public Iterable<Object> call(Object o) throws Exception {
    // Collect everything the transform emits for this one input record.
    DefaultEmitter<Object> emitter = new DefaultEmitter<>();
    transform.transform(o, emitter);
    return emitter.getEntries();
  }
});
```
...
SparkCompute
A SparkCompute stage will become a transform() call on its input DStream.
```java
final SparkCompute sparkCompute = pluginContext.newPluginInstance(stageName);
stageStream = stageStream.transform(new Function<JavaRDD<Object>, JavaRDD<Object>>() {
  @Override
  public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
    return sparkCompute.transform(computeContext, v1);
  }
});
```
...
Join
A Join stage with n inputs will be implemented as n mapToPair() calls, one on each input into the stage, followed by n - 1 joins.
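The shape of this plan can be modeled without Spark. The following is a minimal plain-Java sketch, not the planned implementation: in-memory lists stand in for one micro-batch of each input, `keyBy()` plays the role of mapToPair(), and the loop performs the pairwise joins. The class name `JoinSketch`, its method names, the use of a single key function for all inputs, and the choice of inner-join semantics are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of the join plan; lists stand in for one micro-batch per input. */
final class JoinSketch {

  /** Analogue of mapToPair(): groups the records of one input by their join key. */
  static <K, V> Map<K, List<V>> keyBy(List<V> input, Function<V, K> keyFn) {
    Map<K, List<V>> keyed = new HashMap<>();
    for (V record : input) {
      keyed.computeIfAbsent(keyFn.apply(record), k -> new ArrayList<>()).add(record);
    }
    return keyed;
  }

  /** Inner-joins all inputs: one keyBy() per input, then (number of inputs - 1) joins. */
  static <K, V> Map<K, List<List<V>>> joinAll(List<List<V>> inputs, Function<V, K> keyFn) {
    // Seed the result with the first input; each record becomes a one-element row.
    Map<K, List<List<V>>> joined = new HashMap<>();
    for (Map.Entry<K, List<V>> entry : keyBy(inputs.get(0), keyFn).entrySet()) {
      List<List<V>> rows = new ArrayList<>();
      for (V record : entry.getValue()) {
        List<V> row = new ArrayList<>();
        row.add(record);
        rows.add(row);
      }
      joined.put(entry.getKey(), rows);
    }
    // Join each remaining input onto the running result.
    for (int i = 1; i < inputs.size(); i++) {
      Map<K, List<V>> right = keyBy(inputs.get(i), keyFn);
      Map<K, List<List<V>>> next = new HashMap<>();
      for (Map.Entry<K, List<List<V>>> entry : joined.entrySet()) {
        List<V> matches = right.get(entry.getKey());
        if (matches == null) {
          continue; // inner join (an assumption): keys missing from this input are dropped
        }
        List<List<V>> rows = new ArrayList<>();
        for (List<V> row : entry.getValue()) {
          for (V match : matches) {
            List<V> extended = new ArrayList<>(row);
            extended.add(match);
            rows.add(extended);
          }
        }
        next.put(entry.getKey(), rows);
      }
      joined = next;
    }
    return joined;
  }
}
```

With three inputs, `joinAll` calls `keyBy()` three times and runs the join loop twice, which matches the n and n - 1 counts above. A real Join stage would likely need a separate key function per input and support for outer joins.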
Aggregator
An aggregator plugin will operate over windows in the stream, and can be implemented by a call to flatMapToPair(), then reduceByKey().
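To illustrate the two steps, here is a minimal plain-Java sketch in which a list stands in for the records of a single window. It is a model of the flatMapToPair() then reduceByKey() pattern under those assumptions, not the aggregator plugin API: the class `AggregatorSketch`, its method names, and the word-count example are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical model of an aggregation over one window of records. */
final class AggregatorSketch {

  /**
   * Step 1 mirrors flatMapToPair(): each record emits zero or more (key, value) pairs.
   * Step 2 mirrors reduceByKey(): values that share a key are folded with reduceFn.
   */
  static <T, K, V> Map<K, V> aggregate(List<T> window,
                                       Function<T, List<Map.Entry<K, V>>> toPairs,
                                       BinaryOperator<V> reduceFn) {
    Map<K, V> result = new LinkedHashMap<>();
    for (T record : window) {
      for (Map.Entry<K, V> pair : toPairs.apply(record)) {
        // merge() stores the value for a new key, or combines it with the existing one.
        result.merge(pair.getKey(), pair.getValue(), reduceFn);
      }
    }
    return result;
  }

  /** Example pair function: emits (word, 1) for every non-empty word in a line. */
  static List<Map.Entry<String, Integer>> wordPairs(String line) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String word : line.split("\\s+")) {
      if (!word.isEmpty()) {
        pairs.add(new AbstractMap.SimpleImmutableEntry<>(word, 1));
      }
    }
    return pairs;
  }
}
```

For example, `AggregatorSketch.aggregate(window, AggregatorSketch::wordPairs, Integer::sum)` counts the words across all lines in the window. The reduce function must be associative for the result to be independent of the order in which records are combined.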
StreamingTransform
Windowing can be implemented through a general StreamingTransform API:
...
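Independently of the plugin API, the effect of a sliding window on a stream of micro-batches can be sketched in plain Java. This is a simplified model with hypothetical names (`WindowSketch`, `window`), in which the window length and slide interval are expressed as a number of batches. It assumes that the first windows, emitted before a full window length of data has arrived, contain only the batches seen so far.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of sliding windows over a sequence of micro-batches. */
final class WindowSketch {

  /**
   * @param batches micro-batches in arrival order
   * @param length  number of most recent batches covered by each window
   * @param slide   number of batches between consecutive window emissions
   * @return one combined list of records per emitted window
   */
  static <T> List<List<T>> window(List<List<T>> batches, int length, int slide) {
    if (length <= 0 || slide <= 0) {
      throw new IllegalArgumentException("length and slide must be positive");
    }
    List<List<T>> windows = new ArrayList<>();
    // A window is emitted every `slide` batches; `end` is exclusive.
    for (int end = slide; end <= batches.size(); end += slide) {
      int start = Math.max(0, end - length); // early windows may be partial (assumption)
      List<T> combined = new ArrayList<>();
      for (List<T> batch : batches.subList(start, end)) {
        combined.addAll(batch);
      }
      windows.add(combined);
    }
    return windows;
  }
}
```

With a length of 3 and a slide of 2, consecutive windows overlap by one batch, so a record can contribute to more than one window.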