...

Code Block
public class MyStreamingSource {

  private MyConfig conf;  // plugin config; batchSize is the batch interval in seconds

  JavaDStream<StructuredRecord> getStream(JavaSparkContext jsc) {
    // Wrap the SparkContext in a StreamingContext using the configured batch interval.
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(conf.batchSize));
    // Read from the custom receiver and map each raw object to a StructuredRecord.
    return jssc.receiverStream(new MyCustomReceiver()).map(new MyObjectToStructuredRecordFunction());
  }
}

 

Note to self: Investigate support for the realtimesource plugin type by using the updateStateByKey operation (http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation) to update SourceState. A minimal sketch of how that might look is below.
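A minimal sketch, assuming the source's records have already been keyed into a JavaPairDStream<String, SourceState>; the keying and merge logic here are illustrative, not the actual CDAP API:

Code Block
// Sketch only: keep the latest SourceState across batches using updateStateByKey().
// Note: Optional is com.google.common.base.Optional in the Spark 1.x Java API.
JavaPairDStream<String, SourceState> states = keyedStates.updateStateByKey(
  new Function2<List<SourceState>, Optional<SourceState>, Optional<SourceState>>() {
    @Override
    public Optional<SourceState> call(List<SourceState> batchStates, Optional<SourceState> current) {
      if (batchStates.isEmpty()) {
        // No new state in this batch; carry the previous state forward.
        return current;
      }
      // Otherwise keep the most recent state seen in this batch.
      return Optional.of(batchStates.get(batchStates.size() - 1));
    }
  });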

 

The streamingsink plugin type simply takes a JavaDStream and performs an output operation on it:

Code Block
public interface StreamingSink<T> {
  void write(JavaDStream<T> stream);
}

See http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams for information on how to write out the contents of a Spark DStream.

Note: Investigate support for the realtimesink plugin type using the foreachRDD() method.
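A minimal sketch of such a sink, with the per-record write left as a placeholder:

Code Block
// Sketch only: a sink that uses foreachRDD() to run an output action on every micro-batch.
public class MyStreamingSink implements StreamingSink<StructuredRecord> {
  @Override
  public void write(JavaDStream<StructuredRecord> stream) {
    stream.foreachRDD(new VoidFunction<JavaRDD<StructuredRecord>>() {
      @Override
      public void call(JavaRDD<StructuredRecord> rdd) throws Exception {
        rdd.foreach(new VoidFunction<StructuredRecord>() {
          @Override
          public void call(StructuredRecord record) throws Exception {
            // Placeholder write; a real sink would write to its storage system here.
            System.out.println(record);
          }
        });
      }
    });
  }
}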

 

Multiple connections into a stage become a union of all DStreams coming into that stage.

Code Block

// Union all DStreams coming from previous stages into a single input stream.
Iterator<JavaDStream<Object>> previousStreamsIter = previousStreams.iterator();
JavaDStream<Object> stageStream = previousStreamsIter.next();
while (previousStreamsIter.hasNext()) {
  stageStream = stageStream.union(previousStreamsIter.next());
}

 

A Transform stage will become a flatMap() call on its input DStream, since a transform can emit any number of records for each input record.

Code Block

  final Transform<Object, Object> transform = pluginContext.newPluginInstance(stageName);
  transform.initialize(transformContext);
  // Use flatMap() since a transform can emit zero or more records per input record.
  stageStream = stageStream.flatMap(new FlatMapFunction<Object, Object>() {
    @Override
    public Iterable<Object> call(Object o) throws Exception {
      DefaultEmitter<Object> emitter = new DefaultEmitter<>();
      transform.transform(o, emitter);
      return emitter.getEntries();
    }
  });

 

A SparkCompute stage will become a transform() call on its input DStream.

Code Block

  final SparkCompute<Object, Object> sparkCompute = pluginContext.newPluginInstance(stageName);
  stageStream = stageStream.transform(new Function<JavaRDD<Object>, JavaRDD<Object>>() {
    @Override
    public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
      // Hand the whole batch RDD to the plugin, which returns the computed RDD.
      return sparkCompute.transform(computeContext, v1);
    }
  });

 

A Join stage with n inputs will be implemented as n mapToPair() calls, one on each input into the stage, followed by n - 1 joins.
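A minimal sketch for two inputs; getJoinKey() is a placeholder for extracting the configured join field from a record:

Code Block
// Sketch only: key both inputs by the join field, then join them.
PairFunction<Object, Object, Object> keyByJoinField = new PairFunction<Object, Object, Object>() {
  @Override
  public Tuple2<Object, Object> call(Object record) throws Exception {
    return new Tuple2<>(getJoinKey(record), record);  // getJoinKey() is a placeholder
  }
};
JavaPairDStream<Object, Object> left = leftStream.mapToPair(keyByJoinField);
JavaPairDStream<Object, Object> right = rightStream.mapToPair(keyByJoinField);
// One join for two inputs; with n inputs, each join result feeds the next (n - 1 joins total).
JavaPairDStream<Object, Tuple2<Object, Object>> joined = left.join(right);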