
...

  •  User stories documented (Albert)
  •  User stories reviewed (Nitin)
  •  Design documented (Albert)
  •  Design reviewed (Terence/Andreas)
  •  Feature merged ()
  •  Examples and guides ()
  •  Integration tests () 
  •  Documentation for feature ()
  •  Blog post

...

The streamingsource plugin type simply takes a JavaStreamingContext and returns a JavaDStream:

Code Block
public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {
   public abstract JavaDStream<T> getStream(JavaStreamingContext jssc);
}

...

Code Block
public class TextFileWatcherSource extends StreamingSource<StructuredRecord> {
  @Override
  public JavaDStream<StructuredRecord> getStream(JavaStreamingContext jssc) {
    return jssc.fileStream(conf.path, LongWritable.class, Text.class, TextInputFormat.class)
      .map(new TextToStructuredRecordFunction());
  }
}

...

The streamingsink plugin type simply takes a JavaDStream and a context and performs some operation on it:

Code Block
public abstract class StreamingSink<T> implements PipelineConfigurable, Serializable {
  public abstract void write(JavaDStream<T> stream, JavaSparkExecutionContext jsec);
}

...

An aggregator plugin will operate over windows in the stream, and can be implemented by a call to flatMapToPair(), then reduceByKey().
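
The two steps can be modeled without Spark. The following is a minimal plain-Java sketch, not the plugin implementation: it maps each record in a window to a (key, partial aggregate) pair and then merges pairs that share a key, which is the role flatMapToPair() and reduceByKey() would play on a JavaDStream. The class names Request, Stats and WindowAggregateSketch are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WindowAggregateSketch {
  // Hypothetical parsed log record (not part of the design).
  static final class Request {
    final String url;
    final long responseTime;

    Request(String url, long responseTime) {
      this.url = url;
      this.responseTime = responseTime;
    }
  }

  // Partial aggregate for one key. merge() is associative and commutative,
  // so partial results can be combined in any order.
  static final class Stats {
    final long min;
    final long max;
    final long sum;
    final long count;

    Stats(long min, long max, long sum, long count) {
      this.min = min;
      this.max = max;
      this.sum = sum;
      this.count = count;
    }

    static Stats of(long responseTime) {
      return new Stats(responseTime, responseTime, responseTime, 1);
    }

    Stats merge(Stats other) {
      return new Stats(Math.min(min, other.min), Math.max(max, other.max),
                       sum + other.sum, count + other.count);
    }

    double mean() {
      return (double) sum / count;
    }
  }

  // Aggregates all records of one window, grouped by url.
  static Map<String, Stats> aggregate(List<Request> window) {
    Map<String, Stats> byUrl = new LinkedHashMap<>();
    for (Request request : window) {
      // "flatMapToPair" step: the record becomes a (url, Stats) pair.
      // "reduceByKey" step: pairs with the same url are merged.
      byUrl.merge(request.url, Stats.of(request.responseTime), Stats::merge);
    }
    return byUrl;
  }
}
```

Keeping sum and count in the partial aggregate, rather than a running mean, is what lets the merge step stay order-independent.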

...

ReduceFunction

A reduce function plugin will reduce all the records in a window into a single record.

Code Block
public abstract class ReduceFunction<T> implements Serializable {
  public abstract T reduce(T record1, T record2);
}
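
As an illustration of how such a function collapses a window, here is a minimal plain-Java sketch that applies a reduce function pairwise over a list standing in for one window. It does not use Spark. ReduceFunction mirrors the plugin type described above, while MaxResponseTime and WindowReducer are hypothetical names introduced only for this example.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Same shape as the ReduceFunction plugin type described above.
abstract class ReduceFunction<T> implements Serializable {
  public abstract T reduce(T record1, T record2);
}

// Hypothetical plugin: keeps the larger of two response times.
class MaxResponseTime extends ReduceFunction<Long> {
  @Override
  public Long reduce(Long record1, Long record2) {
    return Math.max(record1, record2);
  }
}

final class WindowReducer {
  // Folds a non-empty window into a single record by applying the
  // function to the running result and the next record.
  static <T> T reduceWindow(List<T> window, ReduceFunction<T> function) {
    Iterator<T> records = window.iterator();
    if (!records.hasNext()) {
      throw new IllegalArgumentException("window must not be empty");
    }
    T result = records.next();
    while (records.hasNext()) {
      result = function.reduce(result, records.next());
    }
    return result;
  }

  public static void main(String[] args) {
    List<Long> window = Arrays.asList(120L, 45L, 300L, 80L);
    System.out.println(reduceWindow(window, new MaxResponseTime())); // prints 300
  }
}
```

The sketch reduces sequentially. A distributed reduce may combine records in a different order, so a function used this way should be associative and commutative.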

...

Windowing can be implemented through a general StreamingTransform API:

Code Block
public abstract class StreamTransform<IN, OUT> implements PipelineConfigurable, Serializable {
 
  public abstract JavaDStream<OUT> transform(JavaDStream<IN> stream) throws Exception;
 
}

...

Code Block
public class Window extends StreamTransform<StructuredRecord, StructuredRecord> {
 
  public JavaDStream<StructuredRecord> transform(JavaDStream<StructuredRecord> stream) {
    return stream.window(Durations.millis(conf.durationMS));
  }
 
}

Use case 3 (window aggregates):

The Apache log use case could then be implemented with a pipeline like:

Code Block
{
  "artifact": {
    "scope": "SYSTEM",
    "name": "cdap-data-streams",
    "version": "3.5.0"
  },
  "config": {
    "batchInterval": "5s",
    "stages": [
      {
        "name": "logSource",
        "plugin": {
          "name": "FileWatcher",
          "type": "streamingsource",
          "properties": {
            "path": "/path/to/files"
          }
        }
      },
      {
        "name": "parser",
        "plugin": {
          "name": "LogParser",
          "type": "transform",
          "properties": { ... }
        }
      },
      {
        "name": "windower",
        "plugin": {
          "name": "Window",
          "type": "streamingtransform",
          "properties": {
            "size": "5m",
            "interval": "10s"
          }
        }
      },
      {
        "name": "urlAggregates",
        "plugin": {
          "name": "GroupByAggregate",
          "type": "aggregator",
          "properties": {
            "groupByFields": "url",
            "aggregates": "fastest:min(responseTime), slowest:max(responseTime), mean:avg(responseTime), numRequests:count(*)"
          }
        }
      },
      {
        "name": "flattener",
        "plugin": {
          "name": "Javascript",
          "type": "transform",
          "properties": {
            "script": "function transform(input, emitter, context) {
                emitter.emit({
                               'metric': input.url + '.fastest',
                               'value': input.fastest
                             });
                emitter.emit({
                               'metric': input.url + '.slowest',
                               'value': input.slowest
                             });
                emitter.emit({
                               'metric': input.url + '.mean',
                               'value': input.mean
                             });
                emitter.emit({
                               'metric': input.url + '.numRequests',
                               'value': input.numRequests
                             });
              }
            "
          }
        }
      },
      {
        "name": "urlTimeseries",
        "plugin": {
          "name": "TimeseriesTable",
          "type": "streamingsink",
          "properties": {
            "name": "urlStats",
            "keyField": "metric",
            "valueField": "value"
          }
        }
      }
    ],
    "connections": [
      { "from": "logSource", "to": "parser" },
      { "from": "parser", "to": "windower" },
      { "from": "windower", "to": "urlAggregates" },
      { "from": "urlAggregates", "to": "flattener" },
      { "from": "flattener", "to": "urlTimeseries" }
    ]
  }
}
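
The pipeline above gives durations as strings ("batchInterval": "5s", "size": "5m", "interval": "10s"). The following is a minimal sketch of how such values might be converted to milliseconds and sanity-checked before building the window. The string format (an integer followed by s, m or h) and the DurationSketch class are assumptions made for illustration, not part of the design. The check reflects the Spark Streaming requirement that a window's length and slide interval be multiples of the batch interval.

```java
final class DurationSketch {
  // Parses strings such as "5s", "10s" or "5m" into milliseconds.
  // Assumed format: an integer amount followed by a one-letter unit.
  static long toMillis(String text) {
    String trimmed = text.trim();
    if (trimmed.length() < 2) {
      throw new IllegalArgumentException("invalid duration: " + text);
    }
    long amount = Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
    char unit = trimmed.charAt(trimmed.length() - 1);
    switch (unit) {
      case 's': return amount * 1000L;
      case 'm': return amount * 60_000L;
      case 'h': return amount * 3_600_000L;
      default: throw new IllegalArgumentException("unknown unit: " + unit);
    }
  }

  // True when both the window size and the slide interval are
  // whole multiples of the batch interval.
  static boolean isValidWindow(String batchInterval, String size, String interval) {
    long batch = toMillis(batchInterval);
    return toMillis(size) % batch == 0 && toMillis(interval) % batch == 0;
  }
}
```

With the values from the pipeline above, isValidWindow("5s", "5m", "10s") returns true, since 300000 ms and 10000 ms are both multiples of 5000 ms.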