...

Code Block
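import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public interface StreamingSource<T> {

  // The caller supplies the streaming context, and with it the batch interval; the source only defines a stream on it.
  JavaDStream<T> getStream(JavaStreamingContext jssc);
}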
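Passing a JavaStreamingContext into getStream, rather than a JavaSparkContext, moves ownership of the streaming context out of the plugin. This matters because Spark allows only one active StreamingContext per JVM, and a context's batch interval is fixed when it is constructed. The plain-Java sketch below models that call pattern under the assumption that the pipeline creates one context and hands it to every source. StreamingContext, Source and runPipeline are hypothetical stand-ins, not Spark or CDAP types.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedContextSketch {

  // Hypothetical stand-in for JavaStreamingContext: the batch interval is fixed at construction.
  public static final class StreamingContext {
    public final long batchSeconds;
    public final List<String> streams = new ArrayList<>();

    public StreamingContext(long batchSeconds) {
      this.batchSeconds = batchSeconds;
    }
  }

  // Hypothetical stand-in for StreamingSource<T>: a source only describes a stream on the given context.
  public interface Source {
    String getStream(StreamingContext ctx);
  }

  // The pipeline, not the plugin, creates the single context and shares it across all sources.
  public static StreamingContext runPipeline(long batchSeconds, List<Source> sources) {
    StreamingContext ctx = new StreamingContext(batchSeconds);
    for (Source source : sources) {
      ctx.streams.add(source.getStream(ctx));
    }
    return ctx;
  }

  public static void main(String[] args) {
    List<Source> sources = new ArrayList<>();
    sources.add(ctx -> "files every " + ctx.batchSeconds + "s");
    sources.add(ctx -> "receiver every " + ctx.batchSeconds + "s");
    // prints [files every 10s, receiver every 10s]
    System.out.println(runPipeline(10, sources).streams);
  }
}
```

Both streams report the same 10-second interval because they share one context; under the earlier signature, each source chose its own interval via conf.batchSize.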

Implementations for basic sources (http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources) should mostly consist of defining what is configurable and translating the relevant objects into StructuredRecords:

Code Block
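import co.cask.cdap.api.data.format.StructuredRecord;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TextFileWatcherSource implements StreamingSource<StructuredRecord> {
  private final Conf conf;
  public TextFileWatcherSource(Conf conf) { this.conf = conf; }

  @Override
  public JavaDStream<StructuredRecord> getStream(JavaStreamingContext jssc) {
    // Watch conf.path for new files; each line arrives as a (byte offset, line) pair.
    return jssc.fileStream(conf.path, LongWritable.class, Text.class, TextInputFormat.class)
      .map(new TextToStructuredRecordFunction());
  }
}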
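TextToStructuredRecordFunction is not defined here. With TextInputFormat, each element of the file stream is a (LongWritable, Text) pair holding the byte offset where a line starts and the line itself, so the conversion can be as simple as copying both into a record. The sketch below shows only that mapping logic in plain Java; a LinkedHashMap stands in for StructuredRecord, and the field names offset and body are assumptions, not a CDAP schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TextToRecordSketch {

  // Hypothetical mapping for one (offset, line) pair; a Map stands in for StructuredRecord.
  public static Map<String, Object> toRecord(long offset, String line) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("offset", offset);
    record.put("body", line);
    return record;
  }

  // Roughly mimics how TextInputFormat splits a file: one record per line, keyed by
  // the byte offset where that line starts. Only '\n' line endings are handled here.
  public static List<Map<String, Object>> toRecords(String fileContents) {
    List<Map<String, Object>> records = new ArrayList<>();
    if (fileContents.isEmpty()) {
      return records; // an empty file yields no lines
    }
    long offset = 0;
    for (String line : fileContents.split("\n")) {
      records.add(toRecord(offset, line));
      offset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the '\n'
    }
    return records;
  }

  public static void main(String[] args) {
    // prints {offset=0, body=first line} then {offset=11, body=second line}
    for (Map<String, Object> record : toRecords("first line\nsecond line\n")) {
      System.out.println(record);
    }
  }
}
```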

...

Code Block
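import co.cask.cdap.api.data.format.StructuredRecord;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class MyStreamingSource implements StreamingSource<StructuredRecord> {

  @Override
  public JavaDStream<StructuredRecord> getStream(JavaStreamingContext jssc) {
    // MyCustomReceiver extends Spark's Receiver and hands each received object to Spark via store().
    return jssc.receiverStream(new MyCustomReceiver()).map(new MyObjectToStructuredRecordFunction());
  }
}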
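MyCustomReceiver is left open in the design. For reference, Spark's custom receiver guide expects Receiver.onStart() to return quickly after starting its own thread, which receives data and hands each item to Spark with store(), while checking isStopped() so it exits after onStop(). The plain-Java model below illustrates only that threading contract; MiniReceiver and awaitAndDrain are hypothetical, and a queue stands in for the storage Spark would build batches from.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReceiverSketch {

  // Hypothetical stand-in for a Spark Receiver<String>; not the Spark API itself.
  public static class MiniReceiver {
    private final BlockingQueue<String> stored = new LinkedBlockingQueue<>();
    private final List<String> input;
    private volatile boolean stopped = false;
    private Thread worker;

    public MiniReceiver(List<String> input) {
      this.input = input;
    }

    // Like Receiver.onStart(): start a receiving thread and return immediately.
    public void onStart() {
      worker = new Thread(() -> {
        for (String item : input) {
          if (isStopped()) {
            return; // exit promptly once onStop() has been called
          }
          store(item);
        }
      });
      worker.start();
    }

    // Like Receiver.onStop(): signal the receiving thread to finish.
    public void onStop() {
      stopped = true;
    }

    public boolean isStopped() {
      return stopped;
    }

    // Like Receiver.store(): hand one item to the framework (here, a queue).
    private void store(String item) {
      stored.add(item);
    }

    // Test helper: wait for the receiving thread, then drain everything stored as one "batch".
    public List<String> awaitAndDrain() {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      List<String> batch = new ArrayList<>();
      stored.drainTo(batch);
      return batch;
    }
  }

  public static void main(String[] args) {
    MiniReceiver receiver = new MiniReceiver(Arrays.asList("a", "b", "c"));
    receiver.onStart();
    System.out.println(receiver.awaitAndDrain()); // prints [a, b, c]
    receiver.onStop();
  }
}
```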

...