...
We will introduce a new artifact similar to the DataPipeline artifact, called the DataStreaming artifact. This artifact will support the transform, sparkcompute, and joiner plugin types. In addition, we will add streamingsource, streamingsink, and window plugin types.
The streamingsource plugin type simply takes a JavaSparkContext and returns a JavaDStream:
```java
public interface StreamingSource<T> {
  JavaDStream<T> getStream(JavaSparkContext jsc);
}
```
Implementations for the basic sources (http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources) should mostly involve defining what is configurable and translating the relevant objects into StructuredRecords:
```java
public class TextFileWatcherSource implements StreamingSource<StructuredRecord> {
  private final Conf conf;

  public TextFileWatcherSource(Conf conf) {
    this.conf = conf;
  }

  @Override
  public JavaDStream<StructuredRecord> getStream(JavaSparkContext jsc) {
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(conf.batchSize));
    return jssc.fileStream(conf.path, LongWritable.class, Text.class, TextInputFormat.class)
      .map(new TextToStructuredRecordFunction());
  }
}
```
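The mapping function above is not part of the interface. A minimal sketch of what it could look like, assuming a single-field string schema (the `body` field name and `textRecord` schema name are illustrative, not part of this design), given that `fileStream` with `TextInputFormat` yields (offset, line) pairs:

```java
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// Hypothetical sketch: turns each (offset, line) pair from fileStream into a
// StructuredRecord with a single "body" string field. Names are assumptions.
public class TextToStructuredRecordFunction
    implements Function<Tuple2<LongWritable, Text>, StructuredRecord> {

  private static final Schema SCHEMA = Schema.recordOf(
    "textRecord",
    Schema.Field.of("body", Schema.of(Schema.Type.STRING)));

  @Override
  public StructuredRecord call(Tuple2<LongWritable, Text> input) throws Exception {
    return StructuredRecord.builder(SCHEMA)
      .set("body", input._2().toString())
      .build();
  }
}
```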
Users can also plug in their own existing custom receivers (http://spark.apache.org/docs/latest/streaming-custom-receivers.html):
```java
public class MyStreamingSource implements StreamingSource<StructuredRecord> {
  private final Conf conf;

  @Override
  public JavaDStream<StructuredRecord> getStream(JavaSparkContext jsc) {
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(conf.batchSize));
    return jssc.receiverStream(new MyCustomReceiver())
      .map(new MyObjectToStructuredRecordFunction());
  }
}
```
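`MyCustomReceiver` above would extend Spark's `Receiver` API. A minimal sketch, where the event source (a loop that calls `store()` from a worker thread) is purely illustrative:

```java
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Hypothetical sketch of a custom receiver producing String events.
// How events are actually obtained is up to the plugin author.
public class MyCustomReceiver extends Receiver<String> {

  public MyCustomReceiver() {
    super(StorageLevel.MEMORY_AND_DISK_2());
  }

  @Override
  public void onStart() {
    // Start a thread that fetches data and pushes each event into Spark via store().
    new Thread(new Runnable() {
      @Override
      public void run() {
        while (!isStopped()) {
          store("some event"); // placeholder for a real fetch-and-store loop
        }
      }
    }).start();
  }

  @Override
  public void onStop() {
    // The thread above checks isStopped(), so nothing extra to clean up here.
  }
}
```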
Note to self: investigate supporting the realtimesource plugin type by using the updateStateByKey operation (http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation) to update SourceState.
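For reference, updateStateByKey maintains per-key state across batches, which is what makes it a candidate for tracking SourceState. A hedged sketch, assuming events arrive keyed by source name and that SourceState is the state type (in Spark 1.x, `Optional` here is `com.google.common.base.Optional`):

```java
import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Hypothetical sketch: keep the most recent SourceState per source key across batches.
// "events" is assumed to be a JavaPairDStream<String, SourceState>; SourceState is a stand-in.
JavaPairDStream<String, SourceState> stateStream = events.updateStateByKey(
  new Function2<List<SourceState>, Optional<SourceState>, Optional<SourceState>>() {
    @Override
    public Optional<SourceState> call(List<SourceState> newStates, Optional<SourceState> current) {
      // Take the latest state seen in this batch; otherwise carry the old state forward.
      return newStates.isEmpty() ? current : Optional.of(newStates.get(newStates.size() - 1));
    }
  });
```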