...
```java
public class MyStreamingSource {

  JavaDStream<StructuredRecord> getStream(JavaSparkContext jsc) {
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(conf.batchSize));
    return jssc.receiverStream(new MyCustomReceiver())
      .map(new MyObjectToStructuredRecordFunction());
  }
}
```
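For reference, a custom receiver like MyCustomReceiver above would extend Spark's Receiver class. The sketch below is a minimal illustration of that API, not part of this design; readNextRecord() is a hypothetical helper.

```java
public class MyCustomReceiver extends Receiver<StructuredRecord> {

  public MyCustomReceiver() {
    super(StorageLevel.MEMORY_AND_DISK());
  }

  @Override
  public void onStart() {
    // start a thread that reads from the external system and hands records to Spark
    new Thread(new Runnable() {
      @Override
      public void run() {
        while (!isStopped()) {
          store(readNextRecord());  // readNextRecord() is a hypothetical helper
        }
      }
    }).start();
  }

  @Override
  public void onStop() {
    // nothing to do; the receive thread checks isStopped() and exits on its own
  }
}
```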
Note to self: Investigate support for the realtimesource plugin type by using the updateStateByKey operation (http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation) to update SourceState.
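If that pans out, the state update might look roughly like the sketch below, where sourceStates is a hypothetical JavaPairDStream<String, SourceState> keyed by source name; the keying and merge policy here are assumptions, not part of the design. Note that updateStateByKey requires checkpointing to be enabled on the streaming context.

```java
// rough sketch only: fold each batch's SourceState into the saved one per key
JavaPairDStream<String, SourceState> savedStates = sourceStates.updateStateByKey(
  new Function2<List<SourceState>, Optional<SourceState>, Optional<SourceState>>() {
    @Override
    public Optional<SourceState> call(List<SourceState> newStates,
                                      Optional<SourceState> current) throws Exception {
      // assumed merge policy: the most recent state seen in the batch wins
      if (newStates.isEmpty()) {
        return current;
      }
      return Optional.of(newStates.get(newStates.size() - 1));
    }
  });
```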
The streamingsink plugin type simply takes a JavaDStream and performs some operation on it:
```java
public interface StreamingSink<T> {
  void write(JavaDStream<T> stream);
}
```
See http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams for information on how to write the contents of a Spark DStream.
Note: Investigate support for the realtimesink plugin type using the foreachRDD() method.
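For example, a StreamingSink implementation could use foreachRDD() to run the sink's write logic once per batch. In this sketch, writeRecord() is a hypothetical helper standing in for a realtime sink's write logic:

```java
public class MyStreamingSink implements StreamingSink<StructuredRecord> {

  @Override
  public void write(JavaDStream<StructuredRecord> stream) {
    // foreachRDD() is an output operation that runs once per batch
    stream.foreachRDD(new VoidFunction<JavaRDD<StructuredRecord>>() {
      @Override
      public void call(JavaRDD<StructuredRecord> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<StructuredRecord>>() {
          @Override
          public void call(Iterator<StructuredRecord> records) throws Exception {
            while (records.hasNext()) {
              writeRecord(records.next());  // hypothetical helper wrapping a realtime sink's write
            }
          }
        });
      }
    });
  }
}
```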
Multiple connections into a stage become a union of all DStreams coming into that stage.
```java
Iterator<JavaDStream<Object>> previousStreamsIter = previousStreams.iterator();
JavaDStream<Object> stageStream = previousStreamsIter.next();
while (previousStreamsIter.hasNext()) {
  stageStream = stageStream.union(previousStreamsIter.next());
}
```
A Transform stage will become a flatMap() call on its input DStream, since a transform can emit any number of output records for each input record.
```java
final Transform<Object, Object> transform = pluginContext.newPluginInstance(stageName);
transform.initialize(transformContext);
stageStream = stageStream.flatMap(new FlatMapFunction<Object, Object>() {
  @Override
  public Iterable<Object> call(Object o) throws Exception {
    DefaultEmitter<Object> emitter = new DefaultEmitter<>();
    transform.transform(o, emitter);
    return emitter.getEntries();
  }
});
```
A SparkCompute stage will become a transform() call on its input DStream.
```java
final SparkCompute<Object, Object> sparkCompute = pluginContext.newPluginInstance(stageName);
stageStream = stageStream.transform(new Function<JavaRDD<Object>, JavaRDD<Object>>() {
  @Override
  public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
    return sparkCompute.transform(computeContext, v1);
  }
});
```
A Join stage with n inputs will be implemented as n mapToPair() calls, one on each input into the stage, followed by n - 1 join() calls.
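As a rough two-input sketch of that shape (leftStream, rightStream, and getJoinKey() are hypothetical stand-ins):

```java
PairFunction<Object, Object, Object> keyByJoinField = new PairFunction<Object, Object, Object>() {
  @Override
  public Tuple2<Object, Object> call(Object record) throws Exception {
    return new Tuple2<>(getJoinKey(record), record);  // getJoinKey() is a hypothetical key extractor
  }
};
JavaPairDStream<Object, Object> keyedLeft = leftStream.mapToPair(keyByJoinField);
JavaPairDStream<Object, Object> keyedRight = rightStream.mapToPair(keyByJoinField);
// with n inputs, n - 1 pairwise joins like this one are chained together
JavaPairDStream<Object, Tuple2<Object, Object>> joined = keyedLeft.join(keyedRight);
```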