...
- User stories documented (Albert)
- User stories reviewed (Nitin)
- Design documented (Albert)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
...
The streamingsource plugin type simply takes a JavaStreamingContext and returns a JavaDStream:
Code Block |
---|
public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {
  // Creates the stream of records that feeds the rest of the pipeline.
  public abstract JavaDStream<T> getStream(JavaStreamingContext jssc);
} |
...
Code Block |
---|
public class TextFileWatcherSource extends StreamingSource<StructuredRecord> {
  @Override
  public JavaDStream<StructuredRecord> getStream(JavaStreamingContext jssc) {
    // Watch conf.path for new files and convert each line of text into a StructuredRecord.
    return jssc.fileStream(conf.path, LongWritable.class, Text.class, TextInputFormat.class)
      .map(new TextToStructuredRecordFunction());
  }
} |
...
The streamingsink plugin type simply takes a JavaDStream and a context and performs some operation on it:
Code Block |
---|
public abstract class StreamingSink<T> implements PipelineConfigurable, Serializable {
  // Writes the stream to its destination; jsec gives access to the execution context.
  public abstract void write(JavaDStream<T> stream, JavaSparkExecutionContext jsec);
} |
...
An aggregator plugin will operate over windows in the stream, and can be implemented by a call to flatMapToPair(), then reduceByKey().
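To illustrate the flatMapToPair() then reduceByKey() shape without a Spark dependency, here is a minimal sketch in plain Java in which a List stands in for the records of one window. The class name WindowAggregateSketch, the Request type and its fields are assumptions made for illustration; they are not part of the proposed API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for one window of a DStream; no Spark classes are used.
class WindowAggregateSketch {

  // Hypothetical parsed log record: a URL and its response time in milliseconds.
  static final class Request {
    final String url;
    final long responseTime;

    Request(String url, long responseTime) {
      this.url = url;
      this.responseTime = responseTime;
    }
  }

  // Step 1 (the flatMapToPair analogue): emit one (url, responseTime) pair per record.
  static List<Map.Entry<String, Long>> toPairs(List<Request> window) {
    List<Map.Entry<String, Long>> pairs = new ArrayList<>();
    for (Request r : window) {
      pairs.add(new SimpleEntry<String, Long>(r.url, r.responseTime));
    }
    return pairs;
  }

  // Step 2 (the reduceByKey analogue): merge values that share a key, here keeping the maximum.
  static Map<String, Long> maxByKey(List<Map.Entry<String, Long>> pairs) {
    Map<String, Long> result = new LinkedHashMap<>();
    for (Map.Entry<String, Long> p : pairs) {
      result.merge(p.getKey(), p.getValue(), Long::max);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Request> window = Arrays.asList(
        new Request("/index", 120), new Request("/login", 300), new Request("/index", 80));
    System.out.println(maxByKey(toPairs(window))); // prints {/index=120, /login=300}
  }
}
```

In a real plugin the two steps would run on the windowed stream itself, so the merge function must be associative and commutative, as max is here.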
ReduceFunction
A reduce function plugin will reduce all the records in a window into a single record.
Code Block |
---|
public abstract class ReduceFunction<T> implements Serializable {
  // Combines two records from the same window into one.
  public abstract T reduce(T record1, T record2);
} |
This can be used to calculate values such as the min, max, average, or count of the records in a window.
This will translate directly into a reduce(Function2<T, T, T>) call on a JavaDStream<T>.
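As a rough illustration of how such a plugin might look, the sketch below redeclares ReduceFunction so that it compiles on its own and folds a plain List in place of a window. MinResponseTime and ReduceSketch are hypothetical names, and reduceWindow is only a stand-in for the reduce call on a JavaDStream.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Mirrors the ReduceFunction plugin type described above (redeclared so the sketch compiles alone).
abstract class ReduceFunction<T> implements Serializable {
  public abstract T reduce(T record1, T record2);
}

// Hypothetical plugin: keeps the smaller of two response times.
class MinResponseTime extends ReduceFunction<Long> {
  @Override
  public Long reduce(Long record1, Long record2) {
    return Math.min(record1, record2);
  }
}

class ReduceSketch {
  // Stand-in for reducing one non-empty window: fold the records pairwise, left to right.
  static <T> T reduceWindow(List<T> window, ReduceFunction<T> fn) {
    T acc = window.get(0);
    for (int i = 1; i < window.size(); i++) {
      acc = fn.reduce(acc, window.get(i));
    }
    return acc;
  }

  public static void main(String[] args) {
    System.out.println(reduceWindow(Arrays.asList(120L, 80L, 300L), new MinResponseTime())); // prints 80
  }
}
```

Min, max and count fit this pairwise shape directly. An average does not unless each record carries a running sum and count, because the output of reduce must have the same type as its inputs.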
StreamingTransform
Windowing can be implemented through a general StreamingTransform API:
Code Block |
---|
public abstract class StreamTransform<IN, OUT> implements PipelineConfigurable, Serializable {
  // Maps one stream to another; windowing is one possible implementation.
  public abstract JavaDStream<OUT> transform(JavaDStream<IN> stream) throws Exception;
} |
...
Code Block |
---|
public class Window extends StreamTransform<StructuredRecord, StructuredRecord> {
  @Override
  public JavaDStream<StructuredRecord> transform(JavaDStream<StructuredRecord> stream) {
    // Each output batch contains every record seen in the last conf.durationMS milliseconds.
    return stream.window(Durations.millis(conf.durationMS));
  }
} |
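The effect of the Window transform can be sketched in plain Java by treating the stream as a list of micro-batches. This assumes the window slides by one batch interval, which is the behaviour of JavaDStream.window when only a window duration is given; the class name WindowSketch and the windowBatches parameter (window length expressed as a number of batches) are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WindowSketch {
  // For each micro-batch, return the union of that batch and the (windowBatches - 1) batches before it.
  // The first few windows are shorter because fewer batches have arrived.
  static <T> List<List<T>> window(List<List<T>> batches, int windowBatches) {
    List<List<T>> windows = new ArrayList<>();
    for (int end = 0; end < batches.size(); end++) {
      int start = Math.max(0, end - windowBatches + 1);
      List<T> current = new ArrayList<>();
      for (int i = start; i <= end; i++) {
        current.addAll(batches.get(i));
      }
      windows.add(current);
    }
    return windows;
  }

  public static void main(String[] args) {
    List<List<String>> batches = Arrays.asList(
        Arrays.asList("a"), Arrays.asList("b", "c"), Arrays.asList("d"));
    System.out.println(window(batches, 2)); // prints [[a], [a, b, c], [b, c, d]]
  }
}
```

Records appear in more than one window, so any aggregate placed after the windower is recomputed over overlapping data on every batch.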
Use case 3 (window aggregates):
The apache log use case could then be implemented with a pipeline like:
Code Block |
---|
{
"artifact": {
"scope": "SYSTEM",
"name": "cdap-data-streams",
"version": "3.5.0"
},
"config": {
"batchInterval": "5s",
"stages": [
{
"name": "logSource",
"plugin": {
"name": "FileWatcher",
"type": "streamingsource",
"properties": {
"path": "/path/to/files"
}
}
},
{
"name": "parser",
"plugin": {
"name": "LogParser",
"type": "transform",
"properties": { ... }
}
},
{
"name": "windower",
"plugin": {
"name": "Window",
"type": "streamingtransform",
"properties": {
"size": "5m",
"interval": "10s"
}
}
},
{
"name": "urlAggregates",
"plugin": {
"name": "GroupByAggregate",
"type": "aggregator",
"properties": {
"groupByFields": "url",
"aggregates": "fastest:min(responseTime), slowest:max(responseTime), mean:avg(responseTime), numRequests:count(*)"
}
}
},
{
"name": "flattener",
"plugin": {
"name": "Javascript",
"type": "transform",
"properties": {
"script": "function transform(input, emitter, context) {
emitter.emit({
'metric': input.url + '.fastest',
'value': input.fastest
});
emitter.emit({
'metric': input.url + '.slowest',
'value': input.slowest
});
emitter.emit({
'metric': input.url + '.mean',
'value': input.mean
});
emitter.emit({
'metric': input.url + '.numRequests',
'value': input.numRequests
});
}
"
}
}
},
{
"name": "urlTimeseries",
"plugin": {
"name": "TimeseriesTable",
"type": "streamingsink",
"properties": {
"name": "urlStats",
"keyField": "metric",
"valueField": "value"
}
}
}
],
"connections": [
{ "from": "logSource", "to": "parser" },
{ "from": "parser", "to": "windower" },
{ "from": "windower", "to": "urlAggregates" },
{ "from": "urlAggregates", "to": "flattener" },
{ "from": "flattener", "to": "urlTimeseries" }
]
}
} |
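For readers less familiar with the Javascript transform, the flattener stage in this pipeline can be approximated in plain Java as below. It is only a sketch of the record reshaping: FlattenerSketch is a hypothetical name, and a Map stands in for the record type, which is an assumption rather than the actual plugin API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlattenerSketch {
  // Turns one aggregate record into one {metric, value} record per aggregate field,
  // mirroring the four emitter.emit calls in the pipeline's Javascript transform.
  static List<Map<String, Object>> flatten(Map<String, Object> input) {
    String[] fields = {"fastest", "slowest", "mean", "numRequests"};
    List<Map<String, Object>> out = new ArrayList<>();
    for (String field : fields) {
      Map<String, Object> record = new LinkedHashMap<>();
      record.put("metric", input.get("url") + "." + field);
      record.put("value", input.get(field));
      out.add(record);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> aggregate = new LinkedHashMap<>();
    aggregate.put("url", "/index");
    aggregate.put("fastest", 80L);
    aggregate.put("slowest", 300L);
    aggregate.put("mean", 190.0);
    aggregate.put("numRequests", 2L);
    for (Map<String, Object> record : flatten(aggregate)) {
      System.out.println(record);
    }
  }
}
```

Each URL therefore produces four rows in the urlStats table per window, keyed by metric names such as /index.fastest.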