...
- User stories documented (Albert)
- User stories reviewed (Nitin)
- Design documented (Albert)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
...
The streamingsource plugin type simply takes a JavaStreamingContext and returns a JavaDStream:
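```java
import co.cask.cdap.etl.api.PipelineConfigurable;
import java.io.Serializable;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {
  // Builds the DStream this source emits, using the pipeline's streaming context.
  public abstract JavaDStream<T> getStream(JavaStreamingContext jssc);
}
```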
...
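```java
import co.cask.cdap.api.data.format.StructuredRecord;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TextFileWatcherSource extends StreamingSource<StructuredRecord> {
  // conf (with a path property) and TextToStructuredRecordFunction are not shown here.
  @Override
  public JavaDStream<StructuredRecord> getStream(JavaStreamingContext jssc) {
    // Watch conf.path for new files; each line arrives as a (byte offset, line) pair.
    return jssc.fileStream(conf.path, LongWritable.class, Text.class, TextInputFormat.class)
      // Convert each (offset, line) pair into a StructuredRecord.
      .map(new TextToStructuredRecordFunction());
  }
}
```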
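TextToStructuredRecordFunction is not defined in this design. As a minimal sketch, assuming it turns each (byte offset, line) pair produced by TextInputFormat into a record with an `offset` field and a `body` field (both field names are hypothetical), the per-element logic could look like this. A plain Map stands in for StructuredRecord so the sketch runs without CDAP or Spark on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for TextToStructuredRecordFunction: converts the
// (offset, line) pair emitted by TextInputFormat into a record-like map.
// A plain Map replaces StructuredRecord so there are no CDAP/Spark dependencies.
final class TextToRecordSketch {
  static Map<String, Object> toRecord(long offset, String line) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("offset", offset); // byte offset of the line within its file
    record.put("body", line);     // raw text of the line
    return record;
  }

  public static void main(String[] args) {
    System.out.println(toRecord(0L, "GET /index.html 200 12"));
  }
}
```

In the real source, the same logic would live inside a Spark `Function<Tuple2<LongWritable, Text>, StructuredRecord>` and build the record against a schema.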
...
The streamingsink plugin type simply takes a JavaDStream and a context and performs some operation on it:
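```java
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.etl.api.PipelineConfigurable;
import java.io.Serializable;
import org.apache.spark.streaming.api.java.JavaDStream;

public abstract class StreamingSink<T> implements PipelineConfigurable, Serializable {
  // Performs some operation on the stream, such as writing each batch to a dataset.
  public abstract void write(JavaDStream<T> stream, JavaSparkExecutionContext jsec);
}
```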
...
A reduce function plugin will reduce all the records in a window into a single record.
```java
import java.io.Serializable;

public abstract class ReduceFunction<T> implements Serializable {
  // Combines two records into one; applied pairwise until one record remains.
  public abstract T reduce(T record1, T record2);
}
```
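To make the reduce contract concrete, the sketch below folds the records of one window into a single record with a reduce function that keeps the slower of two requests. Spark's `JavaDStream.reduce` applies a function pairwise within each batch and expects it to be associative and commutative, since partitions are reduced independently. The `Request` type, `SlowestRequest`, and `reduceWindow` are hypothetical, and a local `ReduceFunction` stand-in replaces the plugin class so the example runs without Spark or CDAP.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Local stand-in for the ReduceFunction plugin class described above.
abstract class ReduceFunction<T> implements Serializable {
  public abstract T reduce(T record1, T record2);
}

// Hypothetical record type: one parsed request from the access log.
final class Request implements Serializable {
  final String url;
  final long responseTime;

  Request(String url, long responseTime) {
    this.url = url;
    this.responseTime = responseTime;
  }
}

// Keeps the slower of the two requests (the first one on an exact tie).
final class SlowestRequest extends ReduceFunction<Request> {
  @Override
  public Request reduce(Request a, Request b) {
    return a.responseTime >= b.responseTime ? a : b;
  }
}

final class ReduceSketch {
  // Folds all records of one window into a single record, pairwise.
  static <T> T reduceWindow(List<T> window, ReduceFunction<T> fn) {
    if (window.isEmpty()) {
      throw new IllegalArgumentException("cannot reduce an empty window");
    }
    T acc = window.get(0);
    for (int i = 1; i < window.size(); i++) {
      acc = fn.reduce(acc, window.get(i));
    }
    return acc;
  }

  public static void main(String[] args) {
    List<Request> window = Arrays.asList(
        new Request("/index.html", 12),
        new Request("/search", 340),
        new Request("/index.html", 25));
    Request slowest = reduceWindow(window, new SlowestRequest());
    System.out.println(slowest.url + " " + slowest.responseTime); // /search 340
  }
}
```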
...
Windowing can be implemented through a general streaming transform API, StreamTransform:
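```java
import co.cask.cdap.etl.api.PipelineConfigurable;
import java.io.Serializable;
import org.apache.spark.streaming.api.java.JavaDStream;

public abstract class StreamTransform<IN, OUT> implements PipelineConfigurable, Serializable {
  // Turns one DStream into another, for example by windowing it.
  public abstract JavaDStream<OUT> transform(JavaDStream<IN> stream) throws Exception;
}
```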
...
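```java
import co.cask.cdap.api.data.format.StructuredRecord;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;

public class Window extends StreamTransform<StructuredRecord, StructuredRecord> {
  // conf (with a durationMS property) is not shown here.
  @Override
  public JavaDStream<StructuredRecord> transform(JavaDStream<StructuredRecord> stream) {
    // Each output batch holds the records of the last conf.durationMS milliseconds;
    // with no slide duration given, the window advances once per batch interval.
    return stream.window(Durations.millis(conf.durationMS));
  }
}
```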
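Spark only accepts window and slide durations that are multiples of the stream's batch interval. With the settings in the pipeline for use case 3 (batchInterval 5s, window size 5m, interval 10s), each window would span 60 batches and a new window would be produced every 2 batches. The sketch checks that arithmetic in plain Java; `WindowMath` and its methods are hypothetical. Note that the single-argument `window(windowDuration)` used above slides once per batch; honoring an explicit interval would presumably use the two-argument `window(windowDuration, slideDuration)` overload.

```java
// Hypothetical helper mirroring the rule Spark enforces for windowed streams:
// window and slide durations must be multiples of the batch interval.
final class WindowMath {
  static long batchesPerWindow(long batchMs, long windowMs) {
    requireMultiple(windowMs, batchMs, "window size");
    return windowMs / batchMs;
  }

  static long batchesPerSlide(long batchMs, long slideMs) {
    requireMultiple(slideMs, batchMs, "slide interval");
    return slideMs / batchMs;
  }

  private static void requireMultiple(long durationMs, long batchMs, String what) {
    if (batchMs <= 0 || durationMs <= 0 || durationMs % batchMs != 0) {
      throw new IllegalArgumentException(what + " (" + durationMs
          + " ms) must be a positive multiple of the batch interval (" + batchMs + " ms)");
    }
  }

  public static void main(String[] args) {
    long batchMs = 5_000;    // "batchInterval": "5s"
    long windowMs = 300_000; // "size": "5m"
    long slideMs = 10_000;   // "interval": "10s"
    System.out.println(batchesPerWindow(batchMs, windowMs)); // 60
    System.out.println(batchesPerSlide(batchMs, slideMs));   // 2
  }
}
```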
Use case 3 (window aggregates):
The Apache log use case could then be implemented with a pipeline like:
```json
{
  "artifact": {
    "scope": "SYSTEM",
    "name": "cdap-data-streams",
    "version": "3.5.0"
  },
  "config": {
    "batchInterval": "5s",
    "stages": [
      {
        "name": "logSource",
        "plugin": {
          "name": "FileWatcher",
          "type": "streamingsource",
          "properties": {
            "path": "/path/to/files"
          }
        }
      },
      {
        "name": "parser",
        "plugin": {
          "name": "LogParser",
          "type": "transform",
          "properties": { ... }
        }
      },
      {
        "name": "windower",
        "plugin": {
          "name": "Window",
          "type": "streamingtransform",
          "properties": {
            "size": "5m",
            "interval": "10s"
          }
        }
      },
      {
        "name": "urlAggregates",
        "plugin": {
          "name": "GroupByAggregate",
          "type": "aggregator",
          "properties": {
            "groupByFields": "url",
            "aggregates": "fastest:min(responseTime), slowest:max(responseTime), mean:avg(responseTime), numRequests:count(*)"
          }
        }
      },
      {
        "name": "flattener",
        "plugin": {
          "name": "Javascript",
          "type": "transform",
          "properties": {
            "script": "function transform(input, emitter, context) { emitter.emit({ 'metric': input.url + '.fastest', 'value': input.fastest }); emitter.emit({ 'metric': input.url + '.slowest', 'value': input.slowest }); emitter.emit({ 'metric': input.url + '.mean', 'value': input.mean }); emitter.emit({ 'metric': input.url + '.numRequests', 'value': input.numRequests }); }"
          }
        }
      },
      {
        "name": "urlTimeseries",
        "plugin": {
          "name": "TimeseriesTable",
          "type": "streamingsink",
          "properties": {
            "name": "urlStats",
            "keyField": "metric",
            "valueField": "value"
          }
        }
      }
    ],
    "connections": [
      { "from": "logSource", "to": "parser" },
      { "from": "parser", "to": "windower" },
      { "from": "windower", "to": "urlAggregates" },
      { "from": "urlAggregates", "to": "flattener" },
      { "from": "flattener", "to": "urlTimeseries" }
    ]
  }
}
```
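To show what the `urlAggregates` and `flattener` stages compute for a single window, the sketch below groups parsed requests by URL, computes the four aggregates named in the `aggregates` property, and flattens each group into `metric`/`value` pairs the way the JavaScript transform does. It is plain Java over an in-memory list, not the GroupByAggregate or TimeseriesTable plugins; `UrlStatsSketch`, `Request`, `Metric`, and `urlStats` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UrlStatsSketch {
  // Hypothetical parsed log record: the output of the LogParser stage.
  static final class Request {
    final String url;
    final double responseTime;

    Request(String url, double responseTime) {
      this.url = url;
      this.responseTime = responseTime;
    }
  }

  // One output row of the flattener stage.
  static final class Metric {
    final String metric;
    final double value;

    Metric(String metric, double value) {
      this.metric = metric;
      this.value = value;
    }

    @Override
    public String toString() {
      return metric + "=" + value;
    }
  }

  // urlAggregates + flattener for one window: group by url, compute
  // min/max/avg/count of responseTime, then emit one metric per aggregate.
  static List<Metric> urlStats(List<Request> window) {
    Map<String, List<Double>> byUrl = new LinkedHashMap<>();
    for (Request r : window) {
      byUrl.computeIfAbsent(r.url, k -> new ArrayList<>()).add(r.responseTime);
    }
    List<Metric> out = new ArrayList<>();
    for (Map.Entry<String, List<Double>> e : byUrl.entrySet()) {
      List<Double> times = e.getValue();
      double min = Double.POSITIVE_INFINITY;
      double max = Double.NEGATIVE_INFINITY;
      double sum = 0;
      for (double t : times) {
        min = Math.min(min, t);
        max = Math.max(max, t);
        sum += t;
      }
      String url = e.getKey();
      out.add(new Metric(url + ".fastest", min));
      out.add(new Metric(url + ".slowest", max));
      out.add(new Metric(url + ".mean", sum / times.size()));
      out.add(new Metric(url + ".numRequests", times.size()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Request> window = Arrays.asList(
        new Request("/index.html", 10),
        new Request("/index.html", 30),
        new Request("/search", 200));
    urlStats(window).forEach(System.out::println);
  }
}
```

Each `Metric` corresponds to one row written by the `urlTimeseries` sink, keyed by `metric` with `value` as the data point, per the sink's `keyField` and `valueField` properties.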