...
- User stories documented (Albert)
- User stories reviewed (Nitin)
- Design documented (Albert)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
...
Code Block |
---|
/**
 * Streaming transform that regroups the incoming stream into sliding windows.
 *
 * <p>Each RDD of the returned stream holds every record seen during the last window duration,
 * and a new window is emitted once per slide interval. Spark Streaming requires both durations
 * to be multiples of the pipeline's batch interval.
 */
public class Window extends StreamTransform<StructuredRecord, StructuredRecord> {

  public JavaDStream<StructuredRecord> transform(JavaDStream<StructuredRecord> stream) {
    // Spark's factory is Durations.milliseconds(long); Durations.millis(...) does not exist.
    // The slide interval is passed explicitly so the configured "interval" is honored instead of
    // silently defaulting to the batch interval.
    // NOTE(review): assumes the plugin config exposes the window size ("size") as durationMS and
    // the slide ("interval") as intervalMS, both in milliseconds; confirm against the config class.
    return stream.window(Durations.milliseconds(conf.durationMS),
                         Durations.milliseconds(conf.intervalMS));
  }
} |
Use case 3 (window aggregates):
The Apache log use case could then be implemented with a pipeline like the following (the LogParser properties are elided as `{ ... }`):
Code Block |
---|
{
"artifact": {
"scope": "SYSTEM",
"name": "cdap-data-streams",
"version": "3.5.0"
},
"config": {
"batchInterval": "5s",
"stages": [
{
"name": "logSource",
"plugin": {
"name": "FileWatcher",
"type": "streamingsource",
"properties": {
"path": "/path/to/files"
}
}
},
{
"name": "parser",
"plugin": {
"name": "LogParser",
"type": "transform",
"properties": { ... }
}
},
{
"name": "windower",
"plugin": {
"name": "Window",
"type": "streamingtransform",
"properties": {
"size": "5m",
"interval": "10s"
}
}
},
{
"name": "urlAggregates",
"plugin": {
"name": "GroupByAggregate",
"type": "aggregator",
"properties": {
"groupByFields": "url",
"aggregates": "fastest:min(responseTime), slowest:max(responseTime), mean:avg(responseTime), numRequests:count(*)"
}
}
},
{
"name": "flattener",
"plugin": {
"name": "Javascript",
"type": "transform",
"properties": {
"script": "function transform(input, emitter, context) {\n  emitter.emit({ 'metric': input.url + '.fastest', 'value': input.fastest });\n  emitter.emit({ 'metric': input.url + '.slowest', 'value': input.slowest });\n  emitter.emit({ 'metric': input.url + '.mean', 'value': input.mean });\n  emitter.emit({ 'metric': input.url + '.numRequests', 'value': input.numRequests });\n}"
}
}
},
{
"name": "urlTimeseries",
"plugin": {
"name": "TimeseriesTable",
"type": "streamingsink",
"properties": {
"name": "urlStats",
"keyField": "metric",
"valueField": "value"
}
}
}
],
"connections": [
{ "from": "logSource", "to": "parser" },
{ "from": "parser", "to": "windower" },
{ "from": "windower", "to": "urlAggregates" },
{ "from": "urlAggregates", "to": "flattener" },
{ "from": "flattener", "to": "urlTimeseries" }
]
}
} |