...
Code Block |
---|
public class Window extends StreamTransform<StructuredRecord, StructuredRecord> { public JavaDStream<StructuredRecord> transform(JavaDStream<StructuredRecord> stream) { return stream.window(Durations.millis(conf.durationMS)); } } |
...
Use case 3 (window aggregates):
The Apache log use case could then be implemented with a pipeline like the following:
Code Block |
---|
{ "artifact": { "scope": "SYSTEM", "name": "cdap-data-streams", "version": "3.5.0" }, "config": { "batchInterval": "5s", "stages": [ { "name": "logSource", "plugin": { "name": "FileWatcher", "type": "streamingsource", "properties": { "path": "/path/to/files" } } }, { "name": "parser", "plugin": { "name": "LogParser", "type": "transform", "properties": { ... } } }, { "name": "windower", "plugin": { "name": "Window", "type": "streamingtransform", "properties": { "size": "5m", "interval": "10s" } } }, { "name": "urlAggregates", "plugin": { "name": "GroupByAggregate", "type": "aggregator", "properties": { "groupByFields": "url", "aggregates": "fastest:min(responseTime), slowest:max(responseTime), mean:avg(responseTime), numRequests:count(*)" } } }, { "name": "flattener", "plugin": { "name": "Javascript", "type": "transform", "properties": { "script": "function transform(input, emitter, context) { emitter.emit({ 'metric': input.url + '.fastest', 'value': input.fastest }); emitter.emit({ 'metric': input.url + '.slowest', 'value': input.slowest }); emitter.emit({ 'metric': input.url + '.mean', 'value': input.mean }); emitter.emit({ 'metric': input.url + '.numRequests', 'value': input.numRequests }); } " } } }, { "name": "urlTimeseries", "plugin": { "name": "TimeseriesTable", "type": "streamingsink", "properties": { "name": "urlStats", "keyField": "metric", "valueField": "value" } } } ], "connections": [ { "from": "logSource", "to": "parser" }, { "from": "parser", "to": "windower" }, { "from": "windower", "to": "urlAggregates" }, { "from": "urlAggregates", "to": "flattener" }, { "from": "flattener", "to": "urlTimeseries" } ] } } |