We will introduce a new artifact similar to the DataPipeline artifact, called the DataStreaming artifact. It will use the same configuration format as DataPipeline but will support its own set of plugin types. This artifact will support the transform, sparkcompute, aggregator, and joiner plugin types. In addition, we will add the streamingsource, streamingsink, and streamingtransform plugin types.
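As a rough illustration of the plugin-type split described above, the sketch below checks a stage's plugin type against the set a DataStreaming pipeline would accept. The class and method names are hypothetical, and this is not CDAP's actual validation logic.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical check: which plugin types a DataStreaming pipeline stage may use.
class DataStreamingPluginTypes {
  // Types shared with the DataPipeline artifact, per this design.
  static final Set<String> SHARED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
      "transform", "sparkcompute", "aggregator", "joiner")));

  // Types introduced by the DataStreaming artifact.
  static final Set<String> STREAMING_ONLY = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
      "streamingsource", "streamingsink", "streamingtransform")));

  static boolean isSupported(String pluginType) {
    return SHARED.contains(pluginType) || STREAMING_ONLY.contains(pluginType);
  }

  public static void main(String[] args) {
    System.out.println(isSupported("joiner"));      // true
    System.out.println(isSupported("batchsource")); // false
  }
}
```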


The streamingsource plugin type simply takes a JavaStreamingContext and returns a JavaDStream:


Note: Investigate support for realtimesource plugin type using to update SourceState



The streamingsink plugin type takes a JavaDStream and a JavaSparkExecutionContext and performs some operation on the stream, for example writing each micro-batch to a dataset:


Code Block
public void write(JavaDStream<StructuredRecord> stream, final JavaSparkExecutionContext jsec) throws Exception {
  final String tableName = conf.table;
  stream.mapToPair(new PairFunction<StructuredRecord, byte[], Put>() {
    @Override
    public Tuple2<byte[], Put> call(StructuredRecord record) throws Exception {
      // convert each record into a Put keyed by its row key
      Put put = recordPutTransformer.toPut(record);
      return new Tuple2<>(put.getRow(), put);
    }
  }).foreachRDD(new VoidFunction<JavaPairRDD<byte[], Put>>() {
    @Override
    public void call(JavaPairRDD<byte[], Put> putJavaPairRDD) throws Exception {
      // called once per micro-batch: write that batch's Puts to the dataset
      jsec.saveAsDataset(putJavaPairRDD, tableName);
    }
  });
}



Multiple connections into a stage become a union of all DStreams coming into that stage.

Code Block
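Iterator<JavaDStream<Object>> previousStreamsIter = previousStreams.iterator();
// start from the first input stream, then union in every remaining one
JavaDStream<Object> stageStream = previousStreamsIter.next();
while (previousStreamsIter.hasNext()) {
  stageStream = stageStream.union(previousStreamsIter.next());
}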
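To make the union semantics concrete without a Spark cluster, the sketch below models a DStream as a plain `List` of micro-batches, one entry per batch interval. `UnionModel`, `union`, and `unionAll` are hypothetical names. The point it illustrates is that a DStream union merges the records arriving in the same batch interval rather than appending one stream after the other; padding streams of unequal length is only a convenience of the model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not Spark): a "stream" is a list of micro-batches, one per
// batch interval. union() concatenates the batches that share an interval.
class UnionModel {
  static <T> List<List<T>> union(List<List<T>> a, List<List<T>> b) {
    List<List<T>> out = new ArrayList<>();
    for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
      List<T> batch = new ArrayList<>();
      if (i < a.size()) batch.addAll(a.get(i));
      if (i < b.size()) batch.addAll(b.get(i));
      out.add(batch);
    }
    return out;
  }

  // Fold all incoming streams into one, mirroring the iterator loop above.
  static <T> List<List<T>> unionAll(List<List<List<T>>> streams) {
    List<List<T>> result = streams.get(0);
    for (int i = 1; i < streams.size(); i++) {
      result = union(result, streams.get(i));
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<String>> s1 = Arrays.asList(Arrays.asList("a1"), Arrays.asList("a2"));
    List<List<String>> s2 = Arrays.asList(Arrays.asList("b1"), Arrays.asList("b2", "b3"));
    System.out.println(unionAll(Arrays.asList(s1, s2))); // [[a1, b1], [a2, b2, b3]]
  }
}
```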



A Transform stage will become a flatMap() call on its input DStream.

Code Block
  final Transform<Object, Object> transform = pluginContext.newPluginInstance(stageName);
  stageStream = stageStream.flatMap(new FlatMapFunction<Object, Object>() {
    @Override
    public Iterable<Object> call(Object o) throws Exception {
      // collect everything the plugin emits for this record (zero or more outputs)
      DefaultEmitter<Object> emitter = new DefaultEmitter<>();
      transform.transform(o, emitter);
      return emitter.getEntries();
    }
  });
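The emitter pattern is what lets a single Transform call produce zero, one, or many records, which is why flatMap() rather than map() is the matching DStream operation. The plain-Java sketch below models this on one micro-batch; `Emitter`, `Transform`, and `ListEmitter` here are simplified stand-ins, not the CDAP interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a Transform stage run through flatMap(). The interfaces
// below are simplified, hypothetical stand-ins for the real plugin API.
class TransformModel {
  interface Emitter<T> { void emit(T value); }

  interface Transform<IN, OUT> { void transform(IN input, Emitter<OUT> emitter); }

  // Collects everything emitted for one input record, like DefaultEmitter.
  static final class ListEmitter<T> implements Emitter<T> {
    final List<T> entries = new ArrayList<>();
    public void emit(T value) { entries.add(value); }
  }

  // flatMap over one micro-batch: a fresh emitter per record, outputs concatenated.
  static <IN, OUT> List<OUT> flatMap(List<IN> batch, Transform<IN, OUT> transform) {
    List<OUT> out = new ArrayList<>();
    for (IN record : batch) {
      ListEmitter<OUT> emitter = new ListEmitter<>();
      transform.transform(record, emitter);
      out.addAll(emitter.entries);
    }
    return out;
  }

  public static void main(String[] args) {
    // Splits a line into words; an empty line emits nothing.
    Transform<String, String> splitter = (line, emitter) -> {
      for (String word : line.split(" ")) {
        if (!word.isEmpty()) emitter.emit(word);
      }
    };
    System.out.println(flatMap(Arrays.asList("a b", "", "c"), splitter)); // [a, b, c]
  }
}
```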



A SparkCompute stage will become a transform() call on its input DStream, so the plugin is invoked once per micro-batch RDD.

Code Block
  final SparkCompute sparkCompute = pluginContext.newPluginInstance(stageName);
  stageStream = stageStream.transform(new Function<JavaRDD<Object>, JavaRDD<Object>>() {
    @Override
    public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
      // run the plugin on the whole micro-batch RDD
      return sparkCompute.transform(computeContext, v1);
    }
  });
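The difference from a Transform stage is granularity: transform() hands the function a whole micro-batch per interval, so a SparkCompute plugin can do batch-wide work. The sketch below models this with plain lists; `ComputeModel` and the sorted de-duplication step are hypothetical examples. Any batch-wide operation in this scheme only sees one batch at a time, so duplicates across batches survive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Function;

// Plain-Java model of DStream.transform(): the function sees an entire
// micro-batch at once, so it can do work a per-record flatMap cannot.
class ComputeModel {
  static <T, R> List<List<R>> transform(List<List<T>> stream, Function<List<T>, List<R>> compute) {
    List<List<R>> out = new ArrayList<>();
    for (List<T> batch : stream) {
      out.add(compute.apply(batch)); // one call per batch interval
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Integer>> stream = Arrays.asList(Arrays.asList(3, 1, 3), Arrays.asList(2, 2));
    // Hypothetical compute step: sorted distinct values within each batch.
    Function<List<Integer>, List<Integer>> distinctSorted = b -> new ArrayList<>(new TreeSet<>(b));
    System.out.println(transform(stream, distinctSorted)); // [[1, 3], [2]]
  }
}
```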



A Join stage with n inputs will be implemented as n mapToPair() calls, one on each input into the stage, followed by n - 1 join() calls.
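The join plan above can be modeled for a single micro-batch as follows. This is a minimal sketch under stated assumptions: every join is an inner join, each input has at most one record per key, and records are `key:value` strings. `JoinModel` and its methods are hypothetical names. With three inputs it keys each input once and performs two joins, matching the n - 1 count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the join plan for one micro-batch: key every input
// (mapToPair), then chain n - 1 inner joins. Assumes one record per key per input.
class JoinModel {
  static <T> Map<String, T> mapToPair(List<T> input, Function<T, String> keyOf) {
    Map<String, T> keyed = new LinkedHashMap<>();
    for (T record : input) keyed.put(keyOf.apply(record), record);
    return keyed;
  }

  static Map<String, List<String>> joinAll(List<List<String>> inputs, Function<String, String> keyOf) {
    // Start from the first keyed input; each value collects the joined records.
    Map<String, List<String>> joined = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : mapToPair(inputs.get(0), keyOf).entrySet()) {
      joined.put(e.getKey(), new ArrayList<>(Arrays.asList(e.getValue())));
    }
    // n - 1 inner joins: keep only keys present on both sides.
    for (int i = 1; i < inputs.size(); i++) {
      Map<String, String> right = mapToPair(inputs.get(i), keyOf);
      Map<String, List<String>> next = new LinkedHashMap<>();
      for (Map.Entry<String, List<String>> e : joined.entrySet()) {
        String match = right.get(e.getKey());
        if (match != null) {
          List<String> row = new ArrayList<>(e.getValue());
          row.add(match);
          next.put(e.getKey(), row);
        }
      }
      joined = next;
    }
    return joined;
  }

  public static void main(String[] args) {
    // The key is the text before the colon.
    Function<String, String> keyOf = s -> s.substring(0, s.indexOf(':'));
    List<List<String>> inputs = Arrays.asList(
        Arrays.asList("1:alice", "2:bob"),
        Arrays.asList("1:NY", "2:SF"),
        Arrays.asList("1:gold"));
    System.out.println(joinAll(inputs, keyOf)); // {1=[1:alice, 1:NY, 1:gold]}
  }
}
```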


An aggregator plugin will operate over windows of the stream and can be implemented as a flatMapToPair() call followed by reduceByKey().
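A minimal plain-Java model of the flatMapToPair()-then-reduceByKey() plan, applied to the records of one window, is sketched below. `AggregateModel` and the `user,amount` record format are assumptions for illustration. As with Spark's reduceByKey(), the reduce function should be associative and commutative, since the order in which values are combined is not fixed in a distributed run.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of the aggregator plan over one window: flatMapToPair emits
// (group key, value) pairs per record, then reduceByKey merges values per key.
class AggregateModel {
  static <T, V> Map<String, V> aggregate(List<T> window,
                                         Function<T, List<Map.Entry<String, V>>> flatMapToPair,
                                         BinaryOperator<V> reduce) {
    Map<String, V> result = new TreeMap<>(); // sorted keys for readable output
    for (T record : window) {
      for (Map.Entry<String, V> pair : flatMapToPair.apply(record)) {
        result.merge(pair.getKey(), pair.getValue(), reduce); // reduceByKey step
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical records "user,amount"; each record produces two pairs.
    List<String> window = Arrays.asList("alice,5", "bob,3", "alice,2");
    Function<String, List<Map.Entry<String, Integer>>> toPairs = line -> {
      String[] f = line.split(",");
      List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
      pairs.add(new SimpleEntry<>(f[0], Integer.parseInt(f[1]))); // per-user total
      pairs.add(new SimpleEntry<>("*", Integer.parseInt(f[1])));  // grand total
      return pairs;
    };
    System.out.println(aggregate(window, toPairs, Integer::sum)); // {*=10, alice=7, bob=3}
  }
}
```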


Windowing can be implemented through a general StreamingTransform API:
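The StreamingTransform API itself is not spelled out here, so the following is only a guess at its shape: a transform that receives the whole stream and returns a new one. `StreamingTransformModel` and `WindowModel` are hypothetical, and the stream is again modeled as a list of micro-batches. `WindowModel` emits a window every `slide` batches covering the last `length` batches, roughly mirroring JavaDStream.window(windowDuration, slideDuration) with durations counted in batch intervals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical shape of a general StreamingTransform, modeled on plain lists:
// it receives the whole stream (a list of micro-batches) and returns a new one.
interface StreamingTransformModel<IN, OUT> {
  List<List<OUT>> transform(List<List<IN>> stream);
}

// A windowing transform built on that shape. length and slide are counted in
// batch intervals.
class WindowModel<T> implements StreamingTransformModel<T, T> {
  private final int length;
  private final int slide;

  WindowModel(int length, int slide) {
    this.length = length;
    this.slide = slide;
  }

  @Override
  public List<List<T>> transform(List<List<T>> batches) {
    List<List<T>> windows = new ArrayList<>();
    // Emit a window after every `slide` batches, covering the last `length` batches.
    for (int end = slide - 1; end < batches.size(); end += slide) {
      List<T> window = new ArrayList<>();
      for (int i = Math.max(0, end - length + 1); i <= end; i++) {
        window.addAll(batches.get(i));
      }
      windows.add(window);
    }
    return windows;
  }

  public static void main(String[] args) {
    List<List<Integer>> batches = Arrays.asList(
        Arrays.asList(1), Arrays.asList(2), Arrays.asList(3), Arrays.asList(4));
    System.out.println(new WindowModel<Integer>(3, 2).transform(batches)); // [[1, 2], [2, 3, 4]]
  }
}
```

With a length of 3 and a slide of 2 over four one-record batches, the first window is partial because only two batches exist when it is emitted; an aggregator like the one described above would then run over each emitted window.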
