...

We will introduce a new artifact, called the DataStreaming artifact, that is similar to the DataPipeline artifact. It will use the exact same configuration, except that it will support its own set of plugin types. This artifact will support the existing transform, sparkcompute, aggregator, and joiner plugin types. In addition, we will add the streamingsource, streamingsink, and streamingtransform plugin types, the last of which covers windowing.

StreamingSource

The streamingsource plugin type simply takes a JavaSparkContext and returns a JavaDStream:

...

Note: Investigate support for realtimesource plugin type using http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation to update SourceState

 

StreamingSink

The streamingsink plugin type simply takes a JavaDStream and a context and performs some operation on it:

...

Code Block
@Override
public void write(JavaDStream<StructuredRecord> stream, final JavaSparkExecutionContext jsec) throws Exception {
  final String tableName = conf.table;
  stream.mapToPair(new PairFunction<StructuredRecord, byte[], Put>() {
    @Override
    public Tuple2<byte[], Put> call(StructuredRecord record) throws Exception {
      Put put = recordPutTransformer.toPut(record);
      return new Tuple2<>(put.getRow(), put);
    }
  }).foreachRDD(new VoidFunction<JavaPairRDD<byte[], Put>>() {
    @Override
    public void call(JavaPairRDD<byte[], Put> putJavaPairRDD) throws Exception {
      jsec.saveAsDataset(putJavaPairRDD, tableName);
    }
  });
}

 

Unions

Multiple connections into a stage become a union of all DStreams coming into that stage.

Code Block
Iterator<JavaDStream<Object>> previousStreamsIter = previousStreams.iterator();
JavaDStream<Object> stageStream = previousStreamsIter.next();
while (previousStreamsIter.hasNext()) {
  stageStream = stageStream.union(previousStreamsIter.next());
}

 

Transforms

A Transform stage will become a flatMap() call on its input DStream.

Code Block
final Transform<Object, Object> transform = pluginContext.newPluginInstance(stageName);
transform.initialize(transformContext);
// flatMap() returns a new DStream, so keep the result for the next stage
stageStream = stageStream.flatMap(new FlatMapFunction<Object, Object>() {
  @Override
  public Iterable<Object> call(Object o) throws Exception {
    // a transform may emit zero or more records per input record
    DefaultEmitter<Object> emitter = new DefaultEmitter<>();
    transform.transform(o, emitter);
    return emitter.getEntries();
  }
});

...

SparkCompute

A SparkCompute stage will become a transform() call on its input DStream.

Code Block
  final SparkCompute sparkCompute = pluginContext.newPluginInstance(stageName);
  stageStream = stageStream.transform(new Function<JavaRDD<Object>, JavaRDD<Object>>() {
    @Override
    public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
      return sparkCompute.transform(computeContext, v1);
    }
  });

...

Join

A Join stage with n inputs will be implemented as n mapToPair() calls, one on each input into the stage, followed by n - 1 joins.
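The shape of that plan can be sketched without any Spark dependency. The model below is plain Java over in-memory lists, not the actual implementation: JoinSketch, keyBy, joinOne, and joinAll are hypothetical names, and inner-join semantics are an assumption, since this page does not say which join type is intended. keyBy stands in for mapToPair() and joinOne for a single join().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Spark-free model of a Join stage: key every input, then fold n - 1 pairwise joins.
final class JoinSketch {

  // Stand-in for mapToPair(): group the records of one input by join key.
  static <T> Map<String, List<T>> keyBy(List<T> input, Function<T, String> keyFn) {
    Map<String, List<T>> keyed = new LinkedHashMap<>();
    for (T record : input) {
      keyed.computeIfAbsent(keyFn.apply(record), k -> new ArrayList<>()).add(record);
    }
    return keyed;
  }

  // Stand-in for one join(): inner join of the rows joined so far with one more keyed input.
  static <T> Map<String, List<List<T>>> joinOne(Map<String, List<List<T>>> left,
                                                Map<String, List<T>> right) {
    Map<String, List<List<T>>> joined = new LinkedHashMap<>();
    for (Map.Entry<String, List<List<T>>> entry : left.entrySet()) {
      List<T> matches = right.get(entry.getKey());
      if (matches == null) {
        continue; // assumed inner join: keys missing on the right are dropped
      }
      List<List<T>> rows = new ArrayList<>();
      for (List<T> leftRow : entry.getValue()) {
        for (T match : matches) {
          List<T> row = new ArrayList<>(leftRow);
          row.add(match);
          rows.add(row);
        }
      }
      joined.put(entry.getKey(), rows);
    }
    return joined;
  }

  // n keying passes, then n - 1 joins.
  static <T> Map<String, List<List<T>>> joinAll(List<List<T>> inputs, Function<T, String> keyFn) {
    Map<String, List<List<T>>> result = new LinkedHashMap<>();
    if (inputs.isEmpty()) {
      return result;
    }
    List<Map<String, List<T>>> keyedInputs = new ArrayList<>();
    for (List<T> input : inputs) {
      keyedInputs.add(keyBy(input, keyFn));
    }
    // Seed with the first input: every record becomes a one-element row.
    for (Map.Entry<String, List<T>> entry : keyedInputs.get(0).entrySet()) {
      List<List<T>> rows = new ArrayList<>();
      for (T record : entry.getValue()) {
        List<T> row = new ArrayList<>();
        row.add(record);
        rows.add(row);
      }
      result.put(entry.getKey(), rows);
    }
    for (int i = 1; i < keyedInputs.size(); i++) {
      result = joinOne(result, keyedInputs.get(i));
    }
    return result;
  }
}
```

With three inputs, joinAll performs three keying passes and two joins. In the streaming case each join would apply per micro-batch, so only records that arrive in the same batch (or window) can match each other.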

Aggregator

An aggregator plugin will operate over windows in the stream, and can be implemented by a call to flatMapToPair(), then reduceByKey(). 
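As a rough illustration of that two-step shape, the sketch below models one window of records in plain Java, with no Spark dependency. AggregatorSketch, groupBy, and aggregate are hypothetical names, and the word-count logic is only a placeholder for whatever grouping and aggregation a real aggregator plugin defines. groupBy plays the role of flatMapToPair() and the Map.merge call plays the role of reduceByKey().

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Spark-free model of an aggregator over the records of a single window.
final class AggregatorSketch {

  // Stand-in for flatMapToPair(): one record emits zero or more (groupKey, value) pairs.
  static List<Map.Entry<String, Long>> groupBy(String record) {
    List<Map.Entry<String, Long>> pairs = new ArrayList<>();
    for (String word : record.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        pairs.add(new SimpleEntry<>(word, 1L));
      }
    }
    return pairs;
  }

  // Stand-in for reduceByKey(): merge values that share a key with an associative function.
  static Map<String, Long> aggregate(List<String> window) {
    Map<String, Long> totals = new LinkedHashMap<>();
    for (String record : window) {
      for (Map.Entry<String, Long> pair : groupBy(record)) {
        totals.merge(pair.getKey(), pair.getValue(), Long::sum);
      }
    }
    return totals;
  }
}
```

The reduce function here (Long::sum) is associative and commutative, which reduceByKey() requires because Spark combines partial results in no fixed order. An aggregator whose logic is not expressible that way would need a grouping step instead.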

StreamingTransform

Windowing can be implemented through a general StreamingTransform API:

...