
...

To allow users to use the Hydrator drag-and-drop UI to easily create pipelines that run on Spark Streaming, leveraging built-in capabilities like windowing and machine learning.

Checklist

  •  User stories documented (Albert)
  •  User stories reviewed (Nitin)
  •  Design documented (Albert)
  •  Design reviewed (Terence/Andreas)
  •  Feature merged ()
  •  Examples and guides ()
  •  Integration tests () 
  •  Documentation for feature ()
  •  Blog post

...

  1. ETL - A development team has some realtime Hydrator pipelines that use a CDAP Worker. They want to run their ETL pipelines using Spark Streaming because their company is standardizing on Spark.

  2. Data enrichment - Every time a purchase is made on an online store, an event with purchase information is pushed to Kafka. The event contains a timestamp, purchase id, customer id, item id, and price. A pipeline developer wants to create a realtime pipeline that reads events from Kafka and joins customer information (email, age, gender, etc) to each event, then writes the events to a CDAP Table.

  3. Window and Reduce - Apache log data is being fed into Kafka. Each event contains the response code, response time, url, and other fields. A pipeline developer wants to create a pipeline that writes a set of output records every 10 seconds, where each record contains a url, and the min, max, and average response times for that url over the last five minutes.  
  4. Machine Learning - An email client is set up to push an event to a Kafka topic whenever somebody uses the client to send an email. The client is also set up to push an event to another topic whenever an email is marked as spam. A pipeline developer wants to create a realtime pipeline that reads events from the spam topic and trains a spam classification model in realtime using streaming linear regression (http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression). The pipeline developer also wants to create another pipeline that reads from the email topic and adds an 'isSpam' field to each record based on the model trained by the other pipeline.

  5. Windowing - Stock trade events are being pushed to Kafka. A pipeline developer wants to create a realtime pipeline that examines all trades made in various time windows and looks for trades in each window that were made for a significantly different amount of money than other trades in the window. The pipeline makes an HTTP call to some external system for each such trade event.

  6. Ordered Event Processing - Customer locations are being fed into Kafka. Each event contains the customer id and their location. A pipeline developer wants to create a realtime pipeline that reads these events and transforms them into records containing the customer id, the speed of the customer, the distance of the customer from their home, a flag indicating whether or not the customer just left their home, and demographic information about the customer (age, gender, etc). Note: this use case doesn't seem possible without doing some weird things in Spark Streaming.

User Stories

  1. As a pipeline developer, I want to create realtime ETL pipelines that run using Spark Streaming. (3.5)

  2. As a pipeline developer, I want to be able to group events into time windows in my streaming pipeline. (3.5)

  3. As a pipeline developer, I want to perform aggregations over windows of events in my pipeline. (3.5)

  4. As a pipeline developer, I want to enrich streaming events by joining to other datasets. (3.5)

  5. As a pipeline developer, I want to be able to join data streams in my pipeline. (3.5)

  6. As a pipeline developer, I want to train machine learning models in my streaming pipeline. (4.0)
  7. As a plugin developer, I want to be able to create new streaming source and sink plugins. (3.5)

  8. As a plugin developer, I want my transform, aggregator, joiner, and sparkcompute plugins to work in both Spark Streaming and Data Pipelines. (3.5)

  9. As a plugin developer, I want to be able to write plugins that train ML models using MLLib. (4.0)

Design

We will introduce a new artifact similar to the DataPipeline artifact, called the DataStreaming artifact.  It will use the exact same configuration, except it will support its own set of plugin types. This artifact will support the transform, sparkcompute, aggregator, and joiner plugin types.  In addition, we will add streamingsource, streamingsink, and streamingtransform plugin types. 

Each pipeline will contain a config setting called 'batchInterval', which controls how much data is contained in each RDD of the discretized stream at the source(s) of the pipeline.
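For illustration, a minimal sketch of how the app would presumably translate this setting into the Spark batch duration (the config accessor and the parseDuration helper shown here are hypothetical):

Code Block
// Sketch: turn the pipeline's 'batchInterval' setting (e.g. "5s") into milliseconds
// and use it as the batch duration of the streaming context.
long batchIntervalMillis = parseDuration(pipelineConfig.getBatchInterval());
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(batchIntervalMillis));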

StreamingSource

The streamingsource plugin type simply takes a JavaStreamingContext and returns a JavaDStream:

Code Block
public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {
  public abstract JavaDStream<T> getStream(JavaStreamingContext jssc);
}

Implementations for basic sources (http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources) should mostly involve defining what is configurable and translating the relevant objects into StructuredRecords:

Code Block
public class TextFileWatcherSource extends StreamingSource<StructuredRecord> {
  @Override
  public JavaDStream<StructuredRecord> getStream(JavaStreamingContext jssc) {
    // Watch the configured directory for new text files and convert each line to a StructuredRecord.
    return jssc.fileStream(conf.path, LongWritable.class, Text.class, TextInputFormat.class)
      .map(new TextToStructuredRecordFunction());
  }
}

People can also use their own existing custom receivers (http://spark.apache.org/docs/latest/streaming-custom-receivers.html).

Code Block
public class MyStreamingSource extends StreamingSource<StructuredRecord> {

  @Override
  public JavaDStream<StructuredRecord> getStream(JavaStreamingContext jssc) {
    // Wrap an existing custom Receiver and convert its objects into StructuredRecords.
    return jssc.receiverStream(new MyCustomReceiver()).map(new MyObjectToStructuredRecordFunction());
  }
}

Note: Investigate support for the realtimesource plugin type using http://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation to update SourceState.

StreamingSink

The streamingsink plugin type simply takes a JavaDStream and a context and performs some operation on it:

Code Block
public abstract class StreamingSink<T> implements PipelineConfigurable, Serializable {
  public abstract void write(JavaDStream<T> stream, JavaSparkExecutionContext jsec);
}

See http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams for information on how to write out the contents of a Spark DStream.

For example, writing to a Table would look something like:

Code Block
@Override
public void write(JavaDStream<StructuredRecord> stream, final JavaSparkExecutionContext jsec) throws Exception {
  final String tableName = conf.table;
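  // Convert each record into a (row key, Put) pair, then write each RDD of pairs to the
  // Table dataset. The 'recordPutTransformer' used below is assumed to be a field on the
  // plugin that converts StructuredRecords into Puts.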
  stream.mapToPair(new PairFunction<StructuredRecord, byte[], Put>() {
    @Override
    public Tuple2<byte[], Put> call(StructuredRecord record) throws Exception {
      Put put = recordPutTransformer.toPut(record);
      return new Tuple2<>(put.getRow(), put);
    }
  }).foreachRDD(new VoidFunction<JavaPairRDD<byte[], Put>>() {
    @Override
    public void call(JavaPairRDD<byte[], Put> putJavaPairRDD) throws Exception {
      jsec.saveAsDataset(putJavaPairRDD, tableName);
    }
  });
}

Unions

Multiple connections into a stage become a union of all DStreams coming into that stage.

Code Block
Iterator<JavaDStream<Object>> previousStreamsIter = previousStreams.iterator();
JavaDStream<Object> stageStream = previousStreamsIter.next();
while (previousStreamsIter.hasNext()) {
  stageStream = stageStream.union(previousStreamsIter.next());
}

Transforms

A Transform stage will become a flatMap() call on its input DStream.

Code Block
  final Transform<Object, Object> transform = pluginContext.newPluginInstance(stageName);
  transform.initialize(transformContext);
  stageStream = stageStream.flatMap(new FlatMapFunction<Object, Object>() {
    @Override
    public Iterable<Object> call(Object o) throws Exception {
      DefaultEmitter<Object> emitter = new DefaultEmitter<>();
      transform.transform(o, emitter);
      return emitter.getEntries();
    }
  });

SparkCompute

A SparkCompute stage will become a transform() call on its input DStream.

Code Block
  final SparkCompute sparkCompute = pluginContext.newPluginInstance(stageName);
  stageStream = stageStream.transform(new Function<JavaRDD<Object>, JavaRDD<Object>>() {
    @Override
    public JavaRDD<Object> call(JavaRDD<Object> v1) throws Exception {
      return sparkCompute.transform(computeContext, v1);
    }
  });

Join

A Join stage with n inputs will be implemented as n mapToPair() calls, one for each input into the stage, followed by n - 1 join() calls, as sketched below.
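
For example, a rough sketch for a stage with two inputs (the key-extraction function here is hypothetical and not part of the current code):

Code Block
// Sketch: key each input DStream by the join key (e.g. customer id), then join them.
// KeyByCustomerId is a hypothetical PairFunction that extracts the join key from a record.
JavaPairDStream<Object, Object> left = leftStream.mapToPair(new KeyByCustomerId());
JavaPairDStream<Object, Object> right = rightStream.mapToPair(new KeyByCustomerId());

// Inner join on the key; each additional input would add another join() call.
JavaPairDStream<Object, Tuple2<Object, Object>> joined = left.join(right);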

Aggregator

An aggregator plugin will operate over windows in the stream, and can be implemented by a call to flatMapToPair(), then reduceByKey().
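
As a rough sketch of that shape (the key-extraction and combine functions here are hypothetical, and assume an aggregation that can be expressed as a pairwise combine):

Code Block
// Sketch: emit (groupKey, record) pairs for each record, then combine the records per key.
// ExtractGroupKeys and CombineRecords stand in for the aggregator plugin's groupBy and
// aggregate logic; the window itself is assumed to be applied by an upstream Window stage.
JavaPairDStream<Object, Object> keyed = stageStream.flatMapToPair(new ExtractGroupKeys());
JavaPairDStream<Object, Object> aggregated = keyed.reduceByKey(new CombineRecords());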

ReduceFunction

A reduce function plugin will reduce all the records in a window into a single record.

Code Block
public abstract class ReduceFunction<T> implements Serializable {
  public abstract T reduce(T record1, T record2);
}

This can be used to do things like calculate the min, max, average, count, etc of records in a window.

This will translate directly into a reduce(Function2<T, T, T>) call on a JavaDStream<T>.
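
For instance, a minimal sketch of how the app might wire a ReduceFunction plugin into the stream (the windowing is again assumed to be applied by an upstream Window stage):

Code Block
// Sketch: wrap the ReduceFunction plugin in a Spark Function2 and reduce each windowed
// batch of records down to a single record.
final ReduceFunction<StructuredRecord> reduceFunction = pluginContext.newPluginInstance(stageName);
JavaDStream<StructuredRecord> reduced = stageStream.reduce(new Function2<StructuredRecord, StructuredRecord, StructuredRecord>() {
  @Override
  public StructuredRecord call(StructuredRecord record1, StructuredRecord record2) throws Exception {
    return reduceFunction.reduce(record1, record2);
  }
});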

StreamingTransform

Windowing can be implemented through a general StreamingTransform API:

Code Block
public abstract class StreamTransform<IN, OUT> implements PipelineConfigurable, Serializable {
 
  public abstract JavaDStream<OUT> transform(JavaDStream<IN> stream) throws Exception;
 
}

For example, a window plugin would be implemented like:

Code Block
public class Window extends StreamTransform<StructuredRecord, StructuredRecord> {

  @Override
  public JavaDStream<StructuredRecord> transform(JavaDStream<StructuredRecord> stream) {
    return stream.window(Durations.milliseconds(conf.durationMS));
  }

}

Use case 3 (window aggregates):

The Apache log use case could then be implemented with a pipeline like:

Code Block
{
  "artifact": {
    "scope": "SYSTEM",
    "name": "cdap-data-streams",
    "version": "3.5.0"
  },
  "config": {
    "batchInterval": "5s",
    "stages": [
      {
        "name": "logSource",
        "plugin": {
          "name": "FileWatcher",
          "type": "streamingsource",
          "properties": {
            "path": "/path/to/files"
          }
        }
      },
      {
        "name": "parser",
        "plugin": {
          "name": "LogParser",
          "type": "transform",
          "properties": { ... }
        }
      },
      {
        "name": "windower",
        "plugin": {
          "name": "Window",
          "type": "streamingtransform",
          "properties": {
            "size": "5m",
            "interval": "10s"
          }
        }
      },
      {
        "name": "urlAggregates",
        "plugin": {
          "name": "GroupByAggregate",
          "type": "aggregator",
          "properties": {
            "groupByFields": "url",
            "aggregates": "fastest:min(responseTime), slowest:max(responseTime), mean:avg(responseTime), numRequests:count(*)"
          }
        }
      },
      {
        "name": "flattener",
        "plugin": {
          "name": "Javascript",
          "type": "transform",
          "properties": {
            "script": "function transform(input, emitter, context) {
                emitter.emit({
                               'metric': input.url + ".fastest",
                               'value': input.fastest
                             });
                emitter.emit({
                               'metric': input.url + ".slowest",
                               'value': input.slowest
                             });
                emitter.emit({
                               'metric': input.url + ".mean",
                               'value': input.mean
                             });
                emitter.emit({
                               'metric': input.url + ".numRequests",
                               'value': input.numRequests
                             });
              }
            " 
          }
        }
      },
      {
        "name": "urlTimeseries",
        "plugin": {
          "name": "TimeseriesTable",
          "type": "streamingsink",
          "properties": {
            "name": "urlStats",
            "keyField": "metric",
            "valueField": "value"
          }
        }
      }
    ],
    "connections": [
      { "from": "logSource", "to": "parser" },
      { "from": "parser", "to": "windower" },
      { "from": "windower", "to": "urlAggregates" },
      { "from": "urlAggregates", "to": "flattener" },
      { "from": "flattener", "to": "urlTimeseries" }
    ]
  }
}