Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public class Window extends StreamTransform<StructuredRecord, StructuredRecord> {
 
  public JavaDStream<StructuredRecord> transform(JavaDStream<StructuredRecord> stream) {
    return stream.window(Durations.millis(conf.durationMS));
  }
 
}

 Use case 3 (window aggregates)

Code Block
{
  "artifact": {
    "scope": "SYSTEM",
    "name": "cdap-data-streams",
    "version": "3.5.0"
  },
  "config": {
    "batchInterval": "5s",
    "stages": [
      {
        "name": "logSource",
        "plugin": {
          "name": "FileWatcher",
          "type": "streamingsource",
          "properties": {
            "path": "/path/to/files"
          }
        }
      },
      {
        "name": "parser",
        "plugin": {
          "name": "LogParser",
          "type": "transform",
          "properties": { ... }
        }
      },
      {
        "name": "windower",
        "plugin": {
          "name": "Window",
          "type": "streamingtransform",
          "properties": {
            "size": "5m",
            "interval": "10s"
          }
        }
      },
      {
        "name": "urlAggregates",
        "plugin": {
          "name": "GroupByAggregate",
          "type": "aggregator",
          "properties": {
            "groupByFields": "url",
            "aggregates": "fastest:min(responseTime), slowest:max(responseTime), mean:avg(responseTime), numRequests:count(*)"
          }
        }
      },
      {
        "name": "flattener",
        "plugin": {
          "name": "Javascript",
          "type": "transform",
          "properties": {
            "script": "function transform(input, emitter, context) {
                emitter.emit({
                               'metric': input.url + ".fastest",
                               'value': input.fastest
                             });
                emitter.emit({
                               'metric': input.url + ".slowest",
                               'value': input.slowest
                             });
                emitter.emit({
                               'metric': input.url + ".mean",
                               'value': input.mean
                             });
                emitter.emit({
                               'metric': input.url + ".numRequests",
                               'value': input.numRequests
                             });
              }
            " 
          }
        }
      },
      {
        "name": "urlTimeseries",
        "plugin": {
          "name": "TimeseriesTable",
          "type": "streamingsink",
          "properties": {
            "name": "urlStats",
            "keyField": "metric",
            "valueField": "value"
          }
        }
      }
    ],
    "connections": [
      { "from": "logSource", "to": "parser" },
      { "from": "parser", "to": "windower" },
      { "from": "windower", "to": "urlAggregates" },
      { "from": "urlAggregates", "to": "flattener" },
      { "from": "flattener", "to": "urlTimeseries" }
    ]
  }
}