...

  1. ETL - A development team has some realtime Hydrator pipelines that use a CDAP Worker. They want to run their ETL pipelines using Spark Streaming because their company is standardizing on Spark.

  2. Data enrichment - Every time a purchase is made on an online store, an event with purchase information is pushed to Kafka. The event contains a timestamp, purchase id, customer id, item id, and price. A pipeline developer wants to create a realtime pipeline that reads events from Kafka and joins customer information (email, age, gender, etc) to each event, then writes the events to a CDAP Table.

  3. Window and Reduce - Customer locations are being fed into Kafka. Each event contains the customer id and their location. A pipeline developer wants to create a pipeline that reads these events and transforms them into records containing the customer id, the speed of the customer, the distance of the customer from their home, a flag indicating whether or not the customer just left their home, and demographic information about the customer (age, gender, etc).
  4. Machine Learning - An email client is set up to push an event to a Kafka topic whenever somebody uses the client to send an email. The client is also set up to push an event to another topic whenever an email is marked as spam. A pipeline developer wants to create a realtime pipeline that reads events from the spam topic and trains a spam classification model in realtime using streaming linear regression (http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression). The pipeline developer also wants to create another pipeline that reads from the email topic and adds an 'isSpam' field to each record based on the model trained by the other pipeline. (A sketch of the streaming regression API follows this list.)
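A minimal sketch of the MLlib streaming regression API referenced in the machine learning use case. The class and stream names here are illustrative assumptions, and how features would be extracted from the Kafka events is not shown:

Code Block
// Hypothetical sketch (assumed names): train on labeled spam examples and score outgoing emails.
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.api.java.JavaDStream;

public class SpamModelSketch {
  public static JavaDStream<Double> trainAndScore(JavaDStream<LabeledPoint> spamExamples,
                                                  JavaDStream<Vector> emailFeatures,
                                                  int numFeatures) {
    StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.zeros(numFeatures));
    model.trainOn(spamExamples);           // model is updated as new spam events arrive
    return model.predictOn(emailFeatures); // each outgoing email is scored against the latest model
  }
}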

User Stories

  1. As a pipeline developer, I want to create realtime ETL pipelines that run using Spark Streaming.

  2. As a pipeline developer, I want to enrich streaming events by joining to other datasets.

  3. As a pipeline developer, I want to be able to group events into time windows in my streaming pipeline.

  4. As a pipeline developer, I want to train machine learning models in my streaming pipeline.
  5. As a plugin developer, I want my transform, aggregate, and join plugins to work in both Spark Streaming and Data Pipelines.

  6. As a plugin developer, I want to be able to use features available in Spark Streaming like MLLib to write plugins.

...

The streamingsink plugin type simply takes a JavaDStream and a context and performs some operation on it:

Code Block
public interface StreamingSink<T> {
  // Performs an output operation on the stream, such as writing it out to a dataset.
  void write(JavaDStream<T> stream, JavaSparkExecutionContext jsec) throws Exception;
}

See http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams for information on how to write out the contents of a Spark DStream.

Note: It looks like it will be difficult to write to CDAP datasets. We may have to add saveToDataset(String datasetName, JavaPairDStream stream) methods to the JavaSparkExecutionContext.
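A rough sketch of what that hypothetical addition might look like; this method does not exist today, the signature below just spells out the note's proposal:

Code Block
// Hypothetical method on JavaSparkExecutionContext, per the note above (not an existing API):
<K, V> void saveToDataset(String datasetName, JavaPairDStream<K, V> stream);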

Note: Investigate support for the realtimesink plugin type using the foreachRDD() method. For example, writing to a Table would look something like:

Code Block
@Override
public void write(JavaDStream<StructuredRecord> stream, final JavaSparkExecutionContext jsec) throws Exception {
  // 'conf' and 'recordPutTransformer' are assumed to be fields of the enclosing sink plugin class.
  final String tableName = conf.table;
  // Convert each record into a (row key, Put) pair that the Table dataset can consume.
  stream.mapToPair(new PairFunction<StructuredRecord, byte[], Put>() {
    @Override
    public Tuple2<byte[], Put> call(StructuredRecord record) throws Exception {
      Put put = recordPutTransformer.toPut(record);
      return new Tuple2<>(put.getRow(), put);
    }
  }).foreachRDD(new VoidFunction<JavaPairRDD<byte[], Put>>() {
    @Override
    public void call(JavaPairRDD<byte[], Put> putJavaPairRDD) throws Exception {
      // Each micro-batch RDD is written to the dataset through the existing saveAsDataset API.
      jsec.saveAsDataset(putJavaPairRDD, tableName);
    }
  });
}

 

Multiple connections into a stage become a union of all DStreams coming into that stage.
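
A minimal sketch of how that union might be produced when building the input stream for a stage (the variable names are illustrative, not the actual planner code):

Code Block
// Hypothetical sketch: merge every DStream flowing into a stage into a single input stream.
// 'inputs' is assumed to hold the output DStreams of all connected upstream stages.
JavaDStream<StructuredRecord> combined = inputs.get(0);
for (JavaDStream<StructuredRecord> input : inputs.subList(1, inputs.size())) {
  combined = combined.union(input);
}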

...