This is a sub-task (single user story) of Cask Hydrator++.
User Story:
A user should be able to run a Spark job on the output of a transform stage in an ETL batch scenario.
API Design:
Below is the SparkSink class that users can implement to define their SparkSink plugin.
Because it extends BatchConfigurable, users will be able to override:
- configurePipeline - in order to add datasets and streams needed
- prepareRun - configure the job before starting the run
- onRunFinish - perform any end of run logic
The SparkSink also exposes a run method, which gives access to an RDD representing data from previous stages of the pipeline.
- run - perform the core of the computation. The user is responsible for persisting the output as they see fit (saving a model to the path of a FileSet, for instance); a sketch of such a plugin follows the SparkSink class below.
SparkSink
/**
 * SparkSink composes a final, optional stage of a Batch ETL Pipeline. In addition to configuring the Batch run, it
 * can also perform RDD operations on the key value pairs provided by the Batch run.
 *
 * {@link SparkSink#run} method is called inside the Batch Run while {@link SparkSink#prepareRun} and
 * {@link SparkSink#onRunFinish} methods are called on the client side, which launches the Batch run, before the
 * Batch run starts and after it finishes respectively.
 *
 * @param <IN> The type of input record to the SparkSink.
 */
@Beta
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> implements Serializable {

  public static final String PLUGIN_TYPE = "sparksink";

  private static final long serialVersionUID = -8600555200583639593L;

  /**
   * User Spark job which will be executed
   *
   * @param context {@link SparkPluginContext} for this job
   * @param input the input from previous stages of the Batch run.
   */
  public abstract void run(SparkPluginContext context, JavaRDD<IN> input) throws Exception;
}
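For illustration, below is a minimal sketch of a user-implemented SparkSink plugin. The plugin name (LineCountSink), the String input type, and the output path are assumptions made for this example only and are not part of the proposed API; imports of the SparkSink, SparkPluginContext, and PipelineConfigurer classes shown on this page are omitted since their final package may change.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Sketch only: plugin name, input type, and output path are hypothetical.
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("LineCountSink")
public class LineCountSink extends SparkSink<String> {
  private static final long serialVersionUID = 1L;

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Add any datasets or streams the plugin needs here.
  }

  @Override
  public void run(SparkPluginContext context, JavaRDD<String> input) throws Exception {
    // Core computation on the RDD handed over from the previous pipeline stages:
    // count how many times each distinct line occurs.
    JavaPairRDD<String, Integer> counts = input
        .mapToPair(line -> new Tuple2<>(line, 1))
        .reduceByKey((a, b) -> a + b);

    // Persisting the output is the plugin's responsibility; a real plugin would typically
    // save a model or result set to a FileSet location rather than a hard-coded path.
    counts.saveAsTextFile("/tmp/line-counts");
  }
}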
Users will have access to a SparkPluginContext, which exposes the functionality that would be available to a regular Spark program via SparkContext, with the exception of the following methods:
- getMetrics
- getWorkflowToken
- getTaskLocalizationContext
- getSpecification
- getServiceDiscoverer
- setExecutorResources
SparkPluginContext
/**
 * Context passed to spark plugins.
 */
public interface SparkPluginContext extends BatchContext {

  /**
   * Returns the logical start time of this Spark job. Logical start time is the time when this Spark
   * job is supposed to start if this job is started by the scheduler. Otherwise it would be the current time when the
   * job runs.
   *
   * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
   */
  long getLogicalStartTime();

  /**
   * Create a Spark RDD that uses {@link Dataset} as input source
   *
   * @param datasetName the name of the {@link Dataset} to be read as an RDD
   * @param kClass the key class
   * @param vClass the value class
   * @param <T> type of RDD
   * @return the RDD created from Dataset
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass);

  /**
   * Create a Spark RDD that uses {@link Dataset} instantiated using the provided arguments as an input source.
   *
   * @param datasetName the name of the {@link Dataset} to be read as an RDD
   * @param kClass the key class
   * @param vClass the value class
   * @param datasetArgs arguments for the dataset
   * @param <T> type of RDD
   * @return the RDD created from Dataset
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> T readFromDataset(String datasetName, Class<?> kClass, Class<?> vClass, Map<String, String> datasetArgs);

  /**
   * Writes a Spark RDD to {@link Dataset}
   *
   * @param rdd the rdd to be stored
   * @param datasetName the name of the {@link Dataset} where the RDD should be stored
   * @param kClass the key class
   * @param vClass the value class
   * @param <T> type of RDD
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass);

  /**
   * Writes a Spark RDD to {@link Dataset} instantiated using the provided arguments.
   *
   * @param rdd the rdd to be stored
   * @param datasetName the name of the {@link Dataset} where the RDD should be stored
   * @param kClass the key class
   * @param vClass the value class
   * @param datasetArgs arguments for the dataset
   * @param <T> type of RDD
   * @throws UnsupportedOperationException if the SparkContext is not yet initialized
   */
  <T> void writeToDataset(T rdd, String datasetName, Class<?> kClass, Class<?> vClass,
                          Map<String, String> datasetArgs);

  /**
   * Create a Spark RDD that uses complete {@link Stream} as input source
   *
   * @param streamName the name of the {@link Stream} to be read as an RDD
   * @param vClass the value class
   * @param <T> type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source
   *
   * @param streamName the name of the {@link Stream} to be read as an RDD
   * @param vClass the value class
   * @param startTime the starting time of the stream to be read in milliseconds. To read from the starting of the
   *                  stream set this to 0
   * @param endTime the ending time of the streams to be read in milliseconds. To read up to the end of the stream
   *                set this to Long.MAX_VALUE
   * @param <T> type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source according to the given {@link StreamEventDecoder}
   *
   * @param streamName the name of the {@link Stream} to be read as an RDD
   * @param vClass the value class
   * @param startTime the starting time of the stream to be read in milliseconds. To read from the starting of the
   *                  stream set this to 0
   * @param endTime the ending time of the streams to be read in milliseconds. To read up to the end of the stream
   *                set this to Long.MAX_VALUE
   * @param decoderType the decoder to use while reading streams
   * @param <T> type of RDD
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(String streamName, Class<?> vClass, long startTime, long endTime,
                       Class<? extends StreamEventDecoder> decoderType);

  /**
   * Create a Spark RDD that uses {@link Stream} as input source according to the given {@link StreamBatchReadable}.
   *
   * @param stream a {@link StreamBatchReadable} containing information on the stream to read from
   * @param vClass the value class
   * @return the RDD created from {@link Stream}
   */
  <T> T readFromStream(StreamBatchReadable stream, Class<?> vClass);

  /**
   * Returns
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.api.java.JavaSparkContext">
   * JavaSparkContext</a> or
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext">SparkContext</a>
   * depending on user's job type.
   *
   * @param <T> the type of Spark Context
   * @return the Spark Context
   */
  <T> T getOriginalSparkContext();

  /**
   * Returns a {@link Serializable} {@link PluginContext} which can be used to request for plugins instances. The
   * instance returned can also be used in Spark program's closures.
   *
   * @return A {@link Serializable} {@link PluginContext}.
   */
  PluginContext getPluginContext();

  /**
   * Sets a
   * <a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf">SparkConf</a>
   * to be used for the Spark execution. Only configurations set inside the
   * {@link SparkSink#prepareRun} call will affect the Spark execution.
   *
   * @param <T> the SparkConf type
   */
  <T> void setSparkConf(T sparkConf);
}
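To make the context methods concrete, here is a hedged sketch of a run implementation (for a SparkSink<String> plugin like the LineCountSink sketch above) that reads an existing dataset through readFromDataset and writes key/value output through writeToDataset. The dataset names ("lookupTable", "lineCounts") and the byte[] key/value classes are assumptions for illustration; the datasets would have to be created elsewhere (for example in configurePipeline). Imports are the same as in the earlier sketch plus java.nio.charset.StandardCharsets.

@Override
public void run(SparkPluginContext context, JavaRDD<String> input) throws Exception {
  // Read a dataset written by an earlier run or another program (hypothetical name and types).
  JavaPairRDD<byte[], byte[]> lookup =
      context.readFromDataset("lookupTable", byte[].class, byte[].class);

  // ... the lookup RDD could be joined against 'input' here ...

  // Turn the input into key/value pairs and persist them through the context,
  // e.g. into a KeyValueTable named "lineCounts" (hypothetical).
  JavaPairRDD<byte[], byte[]> output = input
      .mapToPair(line -> new Tuple2<>(line, 1))
      .reduceByKey((a, b) -> a + b)
      .mapToPair(e -> new Tuple2<>(
          e._1().getBytes(StandardCharsets.UTF_8),
          String.valueOf(e._2()).getBytes(StandardCharsets.UTF_8)));

  context.writeToDataset(output, "lineCounts", byte[].class, byte[].class);
}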
Implementation Summary:
- SmartWorkflow will check whether there are any plugins of type SparkSink. If there are, it will execute an ETLSpark instead of an ETLMapReduce for that phase (see the sketch after this list).
- Because SparkSink extends BatchConfigurable, its prepareRun and onRunFinish will automatically be handled by ETLSpark, though we will need to special-case the type of context passed in.
- ETLSparkProgram will special-case the SparkSink plugin type in order to transform the RDD output by the TransformExecutor and call the user plugin's run method.
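The selection logic in SmartWorkflow could look roughly like the following. This is an illustrative sketch only, not the actual SmartWorkflow code: 'phase' and its accessor are hypothetical, and the program names are placeholders; the point is simply that the presence of a "sparksink" plugin selects the Spark execution path.

// Sketch only: 'phase' and getPluginTypes() are hypothetical.
if (phase.getPluginTypes().contains(SparkSink.PLUGIN_TYPE)) {
  addSpark("ETLSpark");          // phase contains a SparkSink -> execute it with Spark
} else {
  addMapReduce("ETLMapReduce");  // otherwise keep the existing MapReduce execution
}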