Current problems
API
- Our current Spark API is designed for batch processing only
- Always runs with a long transaction
- Not able to access datasets from closures (partly due to the long transaction, partly due to the dataset buffering design)
- Definitely don't want a long transaction when running Spark Streaming
- Not compatible with PySpark and difficult to adapt to it
- Mainly because the SparkContext is created by CDAP instead of by the user program
- Not following common Spark idioms
- A Spark program does new SparkContext() and passes that context to other high-level contexts, e.g. StreamingContext, HBaseContext (see the sketch below)
- Not integrated with dataset schema
- Burden on the developer to convert a dataset/stream RDD into a DataFrame
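For reference, the common Spark idiom mentioned above looks roughly like the following in a plain (non-CDAP) Spark program. This is a minimal sketch; the application logic is illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // The user program creates the SparkContext itself...
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))

    // ...and passes it to higher-level contexts such as StreamingContext.
    val ssc = new StreamingContext(sc, Seconds(1))

    ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```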
...
In Spark Streaming, actions performed on each RDD provided by a DStream are actually submitted and executed as regular Spark jobs, so the transaction model described above still applies to each individual micro-batch RDD.
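As an illustration, the action triggered from foreachRDD in the sketch below is submitted as a regular Spark job for the current micro-batch, so the per-job transaction would wrap exactly one micro-batch. The processing logic is a made-up example, not CDAP code.

```scala
import org.apache.spark.streaming.dstream.DStream

object StreamingTxSketch {
  def process(events: DStream[String]): Unit = {
    events.foreachRDD { rdd =>
      // collect() is an action, so this runs as a normal Spark job over the
      // current micro-batch; the transaction would be started and committed
      // around this job, just as for a batch Spark job.
      val counts = rdd.map(event => (event, 1L)).reduceByKey(_ + _).collect()
      counts.foreach(println)
    }
  }
}
```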
Runtime
Spark Classes Isolation
We need to have the Spark classes (and their dependencies) isolated from the system classloader, meaning they cannot be in the system classpath, whether in the SDK, the master, or the container. This is already true for the master process in cluster mode. The isolation is needed for the following reasons:
...
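One way to picture the isolation is a classloader that loads Spark classes from its own set of jars instead of delegating to the system classloader. The sketch below is only illustrative; the class name and the package filter are assumptions, not the actual CDAP implementation.

```scala
import java.net.{URL, URLClassLoader}

// Loads Spark (and its dependencies') classes child-first from the given jars,
// so they never need to be on the system classpath of the SDK, master or container.
class SparkIsolatingClassLoader(sparkJars: Array[URL], parent: ClassLoader)
    extends URLClassLoader(sparkJars, parent) {

  // Package prefixes that must be resolved from the isolated jars.
  // The exact filter set here is an assumption for illustration only.
  private val isolatedPrefixes = Seq("org.apache.spark.")

  override protected def loadClass(name: String, resolve: Boolean): Class[_] = {
    if (isolatedPrefixes.exists(prefix => name.startsWith(prefix))) {
      // Child-first: look in the isolated jars before consulting the parent.
      val cls = Option(findLoadedClass(name)).getOrElse(findClass(name))
      if (resolve) resolveClass(cls)
      cls
    } else {
      super.loadClass(name, resolve)
    }
  }
}
```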
To support explicit transactions, CDAP will start a new transaction when Transactional.execute is invoked and set the transaction in the SparkContext properties, which is a thread-local properties map. The properties map is available on the job events delivered to the SparkListener callback methods. The HTTP service and the SparkListener mentioned in the Implicit Transaction section can be modified so that they respond with the transaction carried in the job properties, if there is one, instead of starting a new one.
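A rough sketch of these mechanics, assuming a hypothetical property key and using a transaction id as a stand-in for the actual transaction object:

```scala
import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

object ExplicitTxSketch {

  // Hypothetical property key used to carry the transaction to the job events.
  val TxProperty = "cdap.spark.transaction"

  // Roughly what Transactional.execute would do: start a transaction, publish it
  // through the thread-local SparkContext properties, run the user code, then
  // clear the property again.
  def executeWithTx(sc: SparkContext, txId: Long)(body: => Unit): Unit = {
    sc.setLocalProperty(TxProperty, txId.toString)
    try {
      body  // user code; Spark jobs submitted from this thread carry the property
    } finally {
      sc.setLocalProperty(TxProperty, null)  // null removes the thread-local property
    }
  }

  // The SparkListener sees the same properties on the job start event, so it can
  // hand back the existing transaction instead of starting a new one.
  class TxAwareListener extends SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      val props: Properties = jobStart.properties
      Option(props).flatMap(p => Option(p.getProperty(TxProperty))) match {
        case Some(txId) => println(s"Job ${jobStart.jobId} uses existing transaction $txId")
        case None       => println(s"Job ${jobStart.jobId} gets a new implicit transaction")
      }
    }
  }
}
```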
Dataset access from Closure
With the transaction support mentioned above, we can have the SparkExecutionContext return a DatasetContext that is serializable, so that it can be used inside closure functions. The only remaining challenge is when to flush the dataset (mainly the Table dataset). We can use a similar approach to the MapReduce case, where we do periodic flushes. Also, we need to add a TaskCompletionListener so that we can do a final flush when the task execution completes. The listener can be added when DatasetContext.getDataset is called from the closure function.
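The sketch below shows the shape of the per-task flushing, with a hypothetical dataset handle standing in for a Table dataset obtained from the serializable DatasetContext; the flush interval and names are illustrative only.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.TaskCompletionListener

// Hypothetical handle with buffered writes, standing in for a Table dataset
// obtained from the serializable DatasetContext inside the closure.
trait FlushableDataset extends Serializable {
  def write(key: String, value: String): Unit
  def flush(): Unit
}

object ClosureDatasetSketch {
  def writeFromClosure(rdd: RDD[(String, String)], getDataset: () => FlushableDataset): Unit = {
    rdd.foreachPartition { records =>
      val dataset = getDataset()

      // Final flush when the task completes; this mirrors the listener that
      // DatasetContext.getDataset would register on behalf of the user.
      TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
        override def onTaskCompletion(context: TaskContext): Unit = dataset.flush()
      })

      // Periodic flush, similar to the MapReduce case.
      var sinceLastFlush = 0
      records.foreach { case (key, value) =>
        dataset.write(key, value)
        sinceLastFlush += 1
        if (sinceLastFlush >= 1000) {  // illustrative flush interval
          dataset.flush()
          sinceLastFlush = 0
        }
      }
    }
  }
}
```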