Spark Sink

This is a sub-task (single user story) of Cask Hydrator++.

JIRA: 


User Story:

Users should be able to run a Spark job on the output of a transform stage in an ETL batch scenario.


API Design:

Below is the SparkSink class that users can implement to define their SparkSink plugin.
Because it extends BatchConfigurable, users will be able to override:

  • configurePipeline - to add any datasets and streams needed
  • prepareRun - to configure the job before the run starts
  • onRunFinish - to perform any end-of-run logic

The SparkSink also exposes a run method, which gives access to an RDD representing data from previous stages of the pipeline.

  • run - to perform the core of the computation. The user is responsible for persisting the output as desired (saving a model to the path of a FileSet, for instance); an example sketch follows the class listing below.

SparkSink
/**
 * SparkSink composes a final, optional stage of a Batch ETL Pipeline. In addition to configuring the Batch run, it
 * can also perform RDD operations on the key value pairs provided by the Batch run.
 *
 * {@link SparkSink#run} method is called inside the Batch Run while {@link SparkSink#prepareRun} and
 * {@link SparkSink#onRunFinish} methods are called on the client side, which launches the Batch run, before the
 * Batch run starts and after it finishes respectively.
 *
 * @param <IN> The type of input record to the SparkSink.
 */
@Beta
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> implements Serializable {

  public static final String PLUGIN_TYPE = "sparksink";

  private static final long serialVersionUID = -8600555200583639593L;

  /**
   * User Spark job which will be executed
   *
   * @param context {@link SparkPluginContext} for this job
   * @param input the input from previous stages of the Batch run.
   */
  public abstract void run(SparkPluginContext context, JavaRDD<IN> input) throws Exception;

}
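
For illustration only, a minimal SparkSink implementation might look like the sketch below. It assumes the standard @Plugin/@Name plugin annotations; the plugin name, the "word" field, and the "wordCounts" dataset are hypothetical, and imports are elided as in the other listings on this page.

WordCountSink (sketch)
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("WordCountSink")
public class WordCountSink extends SparkSink<StructuredRecord> {

  @Override
  public void run(SparkPluginContext context, JavaRDD<StructuredRecord> input) throws Exception {
    // Count occurrences of a (hypothetical) "word" field in the records from previous stages.
    JavaPairRDD<String, Integer> counts = input
        .mapToPair(record -> new Tuple2<>((String) record.get("word"), 1))
        .reduceByKey((a, b) -> a + b);

    // The user is responsible for persisting the output; here the pair RDD is written to a
    // dataset named "wordCounts", assumed to exist and to accept these key/value classes.
    context.writeToDataset(counts, "wordCounts", String.class, Integer.class);
  }
}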


Users will have access to a SparkPluginContext, which exposes the functionality that would be available to a regular Spark program via SparkContext, excluding the following methods (a usage sketch follows the interface listing below):

  • getMetrics
  • getWorkflowToken
  • getTaskLocalizationContext
  • getSpecification
  • getServiceDiscoverer
  • setExecutorResources

 

SparkPluginContext
/**
 * Context passed to spark plugins.
 */
public interface SparkPluginContext extends BatchContext {

  /**
   * Returns the logical start time of this Spark job. Logical start time is the time when this Spark
   * job is supposed to start if this job is started by the scheduler. Otherwise it would be the current time when the
   * job runs.
   *
   * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
   */
  long getLogicalStartTime();

  /**
   * Create a Spark RDD that uses {@link Dataset} as input source
   *
   * @param datasetName the name of the {@link Dataset} to be read as an RDD
   * @param kClass      the key class
   * @param vClass      the value class
   * @param <T>         type of RDD
   * @return the RDD created from Dataset
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass);

  /**
   * Create a Spark RDD that uses {@link Dataset} instantiated using the provided arguments as an input source.
   *
   * @param datasetName the name of the {@link Dataset} to be read as an RDD
   * @param kClass      the key class
   * @param vClass      the value class
   * @param datasetArgs arguments for the dataset
   * @param <T>         type of RDD
   * @return the RDD created from Dataset
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass, Map<String, String> datasetArgs);

  /**
   * Writes a Spark RDD to {@link Dataset}
   *
   * @param rdd         the rdd to be stored
   * @param datasetName the name of the {@link Dataset} where the RDD should be stored
   * @param kClass      the key class
   * @param vClass      the value class
   * @param <T>         type of RDD
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass);

  /**
   * Writes a Spark RDD to {@link Dataset} instantiated using the provided arguments.
   *
   * @param rdd         the rdd to be stored
   * @param datasetName the name of the {@link Dataset} where the RDD should be stored
   * @param kClass      the key class
   * @param vClass      the value class
   * @param datasetArgs arguments for the dataset
   * @param <T>         type of RDD
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass, Map<String, String> datasetArgs);

  /**
   * Create a Spark RDD that uses complete {@link Stream} as input source
   *
   * @param streamName the name of the {@link Stream} to be read as an RDD
   * @param vClass     the value class
   * @param <T>        type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source
   *
   * @param streamName the name of the {@link Stream} to be read as an RDD
   * @param vClass     the value class
   * @param startTime  the starting time of the stream to be read in milliseconds. To read from the beginning of the
   *                   stream, set this to 0
   * @param endTime    the ending time of the stream to be read in milliseconds. To read up to the end of the stream,
   *                   set this to Long.MAX_VALUE
   * @param <T>        type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source according to the given {@link StreamEventDecoder}
   *
   * @param streamName  the name of the {@link Stream} to be read as an RDD
   * @param vClass      the value class
   * @param startTime   the starting time of the stream to be read in milliseconds. To read from the beginning of the
   *                    stream, set this to 0
   * @param endTime     the ending time of the stream to be read in milliseconds. To read up to the end of the stream,
   *                    set this to Long.MAX_VALUE
   * @param decoderType the decoder to use while reading streams
   * @param <T>         type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime,
                       Class<? extends StreamEventDecoder> decoderType);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source according to the given {@link StreamBatchReadable}.
   *
   * @param stream a {@link StreamBatchReadable} containing information on the stream to read from
   * @param vClass the value class
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(StreamBatchReadable stream, Class<?> vClass);

  /**
   * Returns
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.api.java.JavaSparkContext">
   * JavaSparkContext</a> or
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext">SparkContext</a>
   * depending on the user's job type.
   *
   * @param <T> the type of Spark Context
   * @return the Spark Context
   */
  <T> T getOriginalSparkContext();

  /**
   * Returns a {@link Serializable} {@link PluginContext} which can be used to request plugin instances. The
   * instance returned can also be used in the Spark program's closures.
   *
   * @return A {@link Serializable} {@link PluginContext}.
   */
  PluginContext getPluginContext();

  /**
   * Sets a
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf">SparkConf</a>
   * to be used for the Spark execution. Only configurations set inside the
   * {@link SparkSink#prepareRun} call will affect the Spark execution.
   *
   * @param <T> the SparkConf type
   */
  <T> void setSparkConf(T sparkConf);

}
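
As a usage sketch only, the context methods above can be combined with ordinary RDD operations inside run. The plugin name, the dataset names, and the "countryCode" field below are hypothetical.

CountryEnricherSink (sketch)
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("CountryEnricherSink")
public class CountryEnricherSink extends SparkSink<StructuredRecord> {

  @Override
  public void run(SparkPluginContext context, JavaRDD<StructuredRecord> input) throws Exception {
    // The underlying JavaSparkContext is available if raw Spark operations are needed
    // (not used further in this sketch).
    JavaSparkContext jsc = context.getOriginalSparkContext();

    // Read a lookup dataset as a pair RDD; the dataset name and key/value classes are hypothetical.
    JavaPairRDD<String, String> countryNames =
        context.readFromDataset("countryCodes", String.class, String.class);

    // Key the incoming records by a (hypothetical) "countryCode" field and join with the lookup.
    JavaPairRDD<String, StructuredRecord> keyed =
        input.mapToPair(record -> new Tuple2<>((String) record.get("countryCode"), record));
    JavaPairRDD<String, Tuple2<StructuredRecord, String>> joined = keyed.join(countryNames);

    // Persist the joined result to another (hypothetical) output dataset.
    context.writeToDataset(joined, "recordsWithCountryNames", String.class, Tuple2.class);
  }
}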

 


Implementation Summary:

  1. SmartWorkflow will check whether there are any plugins of type SparkSink. If there are, it will execute an ETLSpark instead of an ETLMapReduce for that phase (a hypothetical sketch of this decision follows this list).
  2. Because SparkSink extends BatchConfigurable, its prepareRun and onRunFinish will automatically be handled by ETLSpark, though we will need to special-case the type of context passed in.
  3. There will be special casing for the SparkSink plugin type in ETLSparkProgram to transform the RDD output by the TransformExecutor and call the user plugin's run method.
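
Purely to illustrate item 1, the decision could look roughly like the sketch below; the helper method and its parameter are hypothetical and do not reflect the actual SmartWorkflow code.

// Hypothetical helper: given the plugin types present in a pipeline phase, pick the program to run.
private String choosePhaseProgram(Set<String> pluginTypesInPhase) {
  // A phase containing a "sparksink" plugin must run as ETLSpark, since only a Spark program can
  // hand the SparkSink an RDD; otherwise the phase runs as ETLMapReduce as before.
  return pluginTypesInPhase.contains(SparkSink.PLUGIN_TYPE) ? "ETLSpark" : "ETLMapReduce";
}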