Users should be able to run a Spark ML job on the output of a transform stage in an ETL batch scenario.


API Design:

Below is the SparkSink class that users can implement to define their SparkSink plugin.
Because it extends BatchConfigurable, users will be able to override:

  • configurePipeline - in order to add datasets and streams needed
  • prepareRun - configure the job before starting the run
  • onRunFinish - perform any end of run logic

The SparkSink also exposes a run method, which gives access to an RDD representing data from previous stages of the pipeline.

  • run - perform the core of the computation. The user is responsible for persisting the output as desired (for instance, saving the model to the path of a FileSet).

Code Block
languagejava
titleSparkSink
/**
 * SparkSink composes a final, optional stage of a Batch ETL Pipeline. In addition to configuring the Batch run, it
 * can also perform RDD operations on the key value pairs provided by the Batch run.
 *
 * The {@link SparkSink#run} method is called inside the Batch run, while the {@link SparkSink#prepareRun} and
 * {@link SparkSink#onRunFinish} methods are called on the client side, which launches the Batch run, before the
 * Batch run starts and after it finishes respectively.
 *
 * @param <IN> The type of input record to the SparkSink.
 */
@Beta
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> implements Serializable {

  public static final String PLUGIN_TYPE = "sparksink";

  private static final long serialVersionUID = -8600555200583639593L;

  /**
   * User Spark job which will be executed
   *
   * @param context {@link SparkPluginContext} for this job
   * @param input the input from previous stages of the Batch run.
   */
  public abstract void run(SparkPluginContext context, JavaRDD<IN> input) throws Exception;

}
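
As a rough illustration of how such a plugin and its lifecycle fit together, the sketch below implements a small counting sink against simplified stand-in types. `SketchContext`, `SketchSparkSink`, `LabelCountSink`, and `SparkSinkLifecycleDemo` are hypothetical names, a `List` stands in for `JavaRDD` so the example runs without Spark or CDAP on the classpath, and the `onRunFinish(boolean, context)` signature is an assumption. The real signatures are the ones in the class above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SparkPluginContext: records lifecycle events and holds persisted output.
class SketchContext {
  final List<String> events = new ArrayList<>();
  final Map<String, Object> savedOutputs = new HashMap<>();
}

// Hypothetical stand-in for SparkSink<IN>; a List replaces JavaRDD<IN> so no Spark dependency is needed.
abstract class SketchSparkSink<IN> {
  void prepareRun(SketchContext context) throws Exception { }

  abstract void run(SketchContext context, List<IN> input) throws Exception;

  // Assumed signature: the source only says this hook performs end-of-run logic.
  void onRunFinish(boolean succeeded, SketchContext context) { }
}

// Example plugin: counts records per label and persists the result itself inside run().
class LabelCountSink extends SketchSparkSink<String> {
  @Override
  void prepareRun(SketchContext context) {
    context.events.add("prepareRun");
  }

  @Override
  void run(SketchContext context, List<String> input) {
    context.events.add("run");
    Map<String, Integer> counts = new HashMap<>();
    for (String label : input) {
      counts.merge(label, 1, Integer::sum);
    }
    // The plugin, not the framework, decides where the output goes.
    context.savedOutputs.put("labelCounts", counts);
  }

  @Override
  void onRunFinish(boolean succeeded, SketchContext context) {
    context.events.add("onRunFinish:" + succeeded);
  }
}

// Drives the hooks in the order described above: prepareRun, run, then onRunFinish.
class SparkSinkLifecycleDemo {
  static SketchContext execute(SketchSparkSink<String> sink, List<String> input) {
    SketchContext context = new SketchContext();
    boolean succeeded = false;
    try {
      sink.prepareRun(context);
      sink.run(context, input);
      succeeded = true;
    } catch (Exception e) {
      context.events.add("failed:" + e.getMessage());
    } finally {
      sink.onRunFinish(succeeded, context);
    }
    return context;
  }

  public static void main(String[] args) {
    SketchContext context = execute(new LabelCountSink(), Arrays.asList("spam", "ham", "spam"));
    System.out.println(context.events);
    System.out.println(context.savedOutputs.get("labelCounts"));
  }
}
```

In the actual design, prepareRun and onRunFinish run on the client side while run executes inside the Batch run; the single-process driver here only shows the ordering.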


Users will have access to a SparkPluginContext, which exposes the functionality that would be available to a regular Spark program via SparkContext, except for the following methods:

  • getMetrics
  • getWorkflowToken
  • getTaskLocalizationContext
  • getSpecification
  • getServiceDiscoverer
  • setExecutorResources

 

Code Block
languagejava
titleSparkPluginContext
/**
 * Context passed to Spark plugins.
 */
public interface SparkPluginContext extends BatchContext {

  /**
   * Returns the logical start time of this Spark job. Logical start time is the time when this Spark
   * job is supposed to start if this job is started by the scheduler. Otherwise it would be the current time when the
   * job runs.
   *
   * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
   */
  long getLogicalStartTime();

  /**
   * Create a Spark RDD that uses {@link Dataset} as input source
   *
   * @param datasetName the name of the {@link Dataset} to be read as an RDD
   * @param kClass      the key class
   * @param vClass      the value class
   * @param <T>         type of RDD
   * @return the RDD created from Dataset
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass);

  /**
   * Create a Spark RDD that uses {@link Dataset} instantiated using the provided arguments as an input source.
   *
   * @param datasetName the name of the {@link Dataset} to be read as an RDD
   * @param kClass      the key class
   * @param vClass      the value class
   * @param datasetArgs arguments for the dataset
   * @param <T>         type of RDD
   * @return the RDD created from Dataset
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass, Map<String, String> datasetArgs);

  /**
   * Writes a Spark RDD to {@link Dataset}
   *
   * @param rdd         the rdd to be stored
   * @param datasetName the name of the {@link Dataset} where the RDD should be stored
   * @param kClass      the key class
   * @param vClass      the value class
   * @param <T>         type of RDD
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass);

  /**
   * Writes a Spark RDD to {@link Dataset} instantiated using the provided arguments.
   *
   * @param rdd         the rdd to be stored
   * @param datasetName the name of the {@link Dataset} where the RDD should be stored
   * @param kClass      the key class
   * @param vClass      the value class
   * @param datasetArgs arguments for the dataset
   * @param <T>         type of RDD
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass, Map<String, String> datasetArgs);

  /**
   * Create a Spark RDD that uses complete {@link Stream} as input source
   *
   * @param streamName the name of the {@link Stream} to be read as an RDD
   * @param vClass     the value class
   * @param <T>        type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source
   *
   * @param streamName the name of the {@link Stream} to be read as an RDD
   * @param vClass     the value class
   * @param startTime  the start time of the stream range to be read, in milliseconds. To read from the start of the
   *                   stream, set this to 0
   * @param endTime    the end time of the stream range to be read, in milliseconds. To read up to the end of the
   *                   stream, set this to Long.MAX_VALUE
   * @param <T>        type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source according to the given {@link StreamEventDecoder}
   *
   * @param streamName  the name of the {@link Stream} to be read as an RDD
   * @param vClass      the value class
   * @param startTime   the start time of the stream range to be read, in milliseconds. To read from the start of the
   *                    stream, set this to 0
   * @param endTime     the end time of the stream range to be read, in milliseconds. To read up to the end of the
   *                    stream, set this to Long.MAX_VALUE
   * @param decoderType the decoder to use while reading streams
   * @param <T>         type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime,
                       Class<? extends StreamEventDecoder> decoderType);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source according to the given {@link StreamBatchReadable}.
   *
   * @param stream a {@link StreamBatchReadable} containing information on the stream to read from
   * @param vClass the value class
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(StreamBatchReadable stream, Class<?> vClass);

  /**
   * Returns
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.api.java.JavaSparkContext">
   * JavaSparkContext</a> or
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext">SparkContext</a>
   * depending on user's job type.
   *
   * @param <T> the type of Spark Context
   * @return the Spark Context
   */
  <T> T getOriginalSparkContext();

  /**
   * Returns a {@link Serializable} {@link PluginContext} which can be used to request for plugins instances. The
   * instance returned can also be used in Spark program's closures.
   *
   * @return A {@link Serializable} {@link PluginContext}.
   */
  PluginContext getPluginContext();

  /**
   * Sets a
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf">SparkConf</a>
   * to be used for the Spark execution. Only configurations set inside the
   * {@link SparkSink#prepareRun} call will affect the Spark execution.
   *
   * @param <T> the SparkConf type
   */
  <T> void setSparkConf(T sparkConf);

}
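
The time-bounded `readFromStream` overloads take epoch milliseconds, the same unit that `getLogicalStartTime` returns, so a plugin could derive its read range from the logical start time. The sketch below computes such a look-back window. `StreamWindow`, `lookBack`, and `entireStream` are hypothetical helpers for illustration, not part of the proposed API, and the stream name in the comment is made up.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper holding the startTime/endTime pair (epoch milliseconds) passed to readFromStream.
final class StreamWindow {
  final long startTime;
  final long endTime;

  StreamWindow(long startTime, long endTime) {
    if (startTime < 0 || endTime < startTime) {
      throw new IllegalArgumentException("invalid window: " + startTime + " to " + endTime);
    }
    this.startTime = startTime;
    this.endTime = endTime;
  }

  // The whole stream, using the sentinel values named in the javadoc: 0 and Long.MAX_VALUE.
  static StreamWindow entireStream() {
    return new StreamWindow(0L, Long.MAX_VALUE);
  }

  // A window of the given duration ending at the logical start time, clamped so it never starts before 0.
  static StreamWindow lookBack(long logicalStartTime, long duration, TimeUnit unit) {
    long start = Math.max(0L, logicalStartTime - unit.toMillis(duration));
    return new StreamWindow(start, logicalStartTime);
  }
}

class StreamWindowDemo {
  public static void main(String[] args) {
    // Stand-in for context.getLogicalStartTime(); a real plugin would take the value from its context.
    long logicalStartTime = System.currentTimeMillis();
    StreamWindow lastHour = StreamWindow.lookBack(logicalStartTime, 1, TimeUnit.HOURS);
    // A plugin might then call something like:
    //   context.readFromStream("events", String.class, lastHour.startTime, lastHour.endTime);
    // where "events" is a made-up stream name.
    System.out.println(lastHour.startTime + " to " + lastHour.endTime);
  }
}
```

Anchoring the window to the logical start time rather than the wall clock keeps a scheduled run reading the same range even if it starts late.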

 


Implementation Summary:

  1. SmartWorkflow will check if there are any plugins of type SparkSink. If there are, it will choose to execute an ETLSpark instead of an ETLMapReduce for that phase.
  2. Because SparkSink extends BatchConfigurable, its prepareRun and onRunFinish will automatically be handled by ETLSpark, though we will need to special-case the type of context passed in.
  3. There will be special casing for the SparkSink plugin type in ETLSparkProgram to transform the RDD output by the TransformExecutor and call the user plugin's run function.
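
Step 1 can be pictured as a scan over the plugin types present in a phase. The following is only a sketch under assumed names: `Engine`, `PhaseEngineSelector`, and the representation of a phase as a list of plugin type strings are hypothetical, and the actual SmartWorkflow logic may be structured quite differently. The only value taken from the design above is the `sparksink` plugin type; the other type strings are illustrative.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical enum naming the two program types a phase could run as.
enum Engine { ETL_MAPREDUCE, ETL_SPARK }

// Hypothetical selector: picks Spark for a phase as soon as any plugin in it is a SparkSink.
final class PhaseEngineSelector {
  // Mirrors SparkSink.PLUGIN_TYPE from the class definition above.
  static final String SPARK_SINK_PLUGIN_TYPE = "sparksink";

  static Engine select(List<String> pluginTypesInPhase) {
    for (String type : pluginTypesInPhase) {
      if (SPARK_SINK_PLUGIN_TYPE.equals(type)) {
        return Engine.ETL_SPARK;
      }
    }
    // No SparkSink in the phase, so the existing MapReduce path is kept.
    return Engine.ETL_MAPREDUCE;
  }
}

class PhaseEngineSelectorDemo {
  public static void main(String[] args) {
    // Plugin type strings other than "sparksink" are illustrative placeholders.
    List<String> mlPhase = Arrays.asList("batchsource", "transform", "sparksink");
    List<String> plainPhase = Arrays.asList("batchsource", "transform", "batchsink");
    System.out.println(PhaseEngineSelector.select(mlPhase));
    System.out.println(PhaseEngineSelector.select(plainPhase));
  }
}
```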