Table of Contents |
---|
Current problems
API
- Our current Spark API is designed for batch processing only
- Always run with long transaction
- Not able to access to dataset from closure (partly due to long transaction, partly due to dataset buffering design)
- Definitely don't want long transaction when running spark streaming
- Not compatible with PySpark and is difficult to adapt to it
- Mainly due to the SparkContext is created in CDAP instead of from user program
- Not following common Spark idioms
- Spark program do a new SparkContext() and pass that context to other high level contexts, e.g. StreamingContext, HBaseContext
- Not integrated with dataset schema
- Burden on developer to convert dataset/stream RDD into DataFrame
- PySpark (Python Spark) is not supported
Runtime
- Cannot embed Spark program in Service
- Cannot use Spark as a service, which can leverage the RDD caching ability for adhoc query.
- Not able to run concurrent Spark program (CDAP-349) in SDK
- Not able to run fork Spark program in workflow (CDAP-3008)
...
Code Block | ||||
---|---|---|---|---|
| ||||
class SimpleSpark extends SparkProgram { override def run(implicit sec: SparkExecutionContext) { sec.execute(new TxRunnable { override def run(dsContext: DatasetContext) { val rdd[(String, Int)] = sc.fromDataset(...) .map(...) // First action rdd.saveToDataset(...); // Second action rdd.collectAsList(); // Some action on dataset directly val table: Table = dsContext.getDataset("table"); table.put(...); } }); } } |
Transaction in Spark Streaming
In Spark Streaming, actions performed on each RDD provided by DStream
is actually submitted and executed as a regular Spark job, the transaction model described would still applies on each individual micro-batch of RDD.
Runtime
Spark Classes Isolation
We need to have spark classes (and it's dependencies) isolated from the system classloader, meaning it cannot be in the system classpath, whether it's the SDK, master or container. It is already true for the master process in cluster mode. It is needed for the following reasons:
- User can easily switch to use a different version of Spark than the one ship with Cask (in SDK).
- Provide a proper fix for CDAP-4547 so that we don't pollute the container classpath
- Allow us to rewrite the
SparkContext
easily (see below) - Be able to support concurrent run of Spark programs (in SDK).
SparkContext Class Rewrite
As described above, we need to insert code to SparkContext
constructor to capture the SparkContext
instance created from the user code. It can be done by using a custom classloader, which we needed anyway for the class isolation purpose. What we need is to modify the SparkContext
constructor bytecode so that before the constructor returns (intercept through ASM AdviceAdapter
), we store the SparkContext
reference in a global concurrent map (runId -> context), so that we can always get the SparkContext
based on the runId.
Implicit Transaction
To support implicit transaction, we need to start a new long transaction when a job start and commit/abort the transaction when the job end. Since Spark allows submitting multiple jobs (i.e. perform action on RDD) concurrently from multiple threads, the start/stop transaction needs to be tied to the job ID.
We can use the SparkListener
to get callback when a job start/end. However, there are couple complications
- Calls to
SparkListener
is asynchronous, meaning the task may already be submitted or even running before the callback gets called. - The transaction need to be started/committed from the driver process, but there is no easy way to ship the
Transaction
object to the executor node. - Inside the executor node (hence the task), the job ID is not available.
In order to overcome those complications, we need to rely on the fact that stage ID is globally unique within a spark program (see http://spark.apache.org/docs/latest/monitoring.html#rest-api) and have the driver setup a HTTP service for the executor to acquire transaction information based on the stage ID. This is the sequence of events when a new job is submitted by Spark.
- When
SparkListener.onJobStart
is called, we add all stage IDs under that job (accessible throughSparkListenerJobStart.stageInfos
) to a global map. - Inside the executor, whenever
DatasetContext.getDataset
is called, it gets the stage ID from theTaskContext
from Spark and make a call to the driver with the stageID to get theTransaction
information. The transaction will be used to setup the transaction of the dataset. - In the driver HTTP service, when it received a call from the executor, it will:
- If there is already a
Transaction
for the given stage, it will just respond - If the stage is known (based on the map set by step 1) but without transaction, it will start a new transaction, associate the transaction with the job ID and respond.
- If the stage if unknown, it can be
- The listener in step 1 hasn't be triggered yet. In this case, it will block until the listener is triggered. After unblock, it will rerun the logic as described in a and b.
- The listener in step 1 has already been triggered. This shouldn't happen and is an error. It will respond with an error.
- If there is already a
- When
SparkListener.onJobEnd
is called, if there was transaction started for the job, based on the job completion status, it will either commit or abort the transaction associated with the job.
Explicit Transaction
To support explicit transaction, CDAP will start a new transaction when Transactional.execute
is invoked and set the transaction to the SparkContext.properties
, which is a thread local properties map. The properties map will be available to the job event in the SparkListener
callback methods. The Http service and the SparkListener
as mentioned in the Implicit Transaction section can be modified so that it will respond with the transaction in the job properties if there is one instead of starting a new oneWith the explicit transaction support, it is easy to construct a Spark Streaming program to consume from Kafka with exactly once semantics.
Code Block | ||||
---|---|---|---|---|
| ||||
class SimpleSpark extends SparkProgram {
override def run(implicit sec: SparkExecutionContext) {
// Create a DStream with the direct Kafka API in Spark. Copied from the Kafka example in Spark
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
sec.execute(new TxRunnable {
override def run(dsContext: DatasetContext) {
// Operate on the RDD.
rdd.map(...).saveAsDataset(...);
// This happen in the driver
for (o <- offsetRanges) {
context.getDataset("kafkaOffsets").save(offsetRanges)
}
}
})
}
} |
Runtime
Spark Classes Isolation
We need to have spark classes (and it's dependencies) isolated from the system classloader, meaning it cannot be in the system classpath, whether it's the SDK, master or container. It is already true for the master process in cluster mode. It is needed for the following reasons:
- User can easily switch to use a different version of Spark than the one ship with Cask (in SDK).
- Provide a proper fix for CDAP-4547 so that we don't pollute the container classpath
- Allow us to rewrite the
SparkContext
easily (see below) - Be able to support concurrent run of Spark programs (in SDK).
SparkContext Class Rewrite
As described above, we need to insert code to SparkContext
constructor to capture the SparkContext
instance created from the user code. It can be done by using a custom classloader, which we needed anyway for the class isolation purpose. What we need is to modify the SparkContext
constructor bytecode so that before the constructor returns (intercept through ASM AdviceAdapter
), we store the SparkContext
reference in a global concurrent map (runId -> context), so that we can always get the SparkContext
based on the runId.
Implicit Transaction
To support implicit transaction, we need to start a new long transaction when a job start and commit/abort the transaction when the job end. Since Spark allows submitting multiple jobs (i.e. perform action on RDD) concurrently from multiple threads, the start/stop transaction needs to be tied to the job ID.
We can use the SparkListener
to get callback when a job start/end. However, there are couple complications
- Calls to
SparkListener
is asynchronous, meaning the task may already be submitted or even running before the callback gets called. - The transaction need to be started/committed from the driver process, but there is no easy way to ship the
Transaction
object to the executor node. - Inside the executor node (hence the task), the job ID is not available.
In order to overcome those complications, we need to rely on the fact that stage ID is globally unique within a spark program (see http://spark.apache.org/docs/latest/monitoring.html#rest-api) and have the driver setup a HTTP service for the executor to acquire transaction information based on the stage ID. This is the sequence of events when a new job is submitted by Spark.
- When
SparkListener.onJobStart
is called, we add all stage IDs under that job (accessible throughSparkListenerJobStart.stageInfos
) to a global map. - Inside the executor, whenever
DatasetContext.getDataset
is called, it gets the stage ID from theTaskContext
from Spark and make a call to the driver with the stageID to get theTransaction
information. The transaction will be used to setup the transaction of the dataset. - In the driver HTTP service, when it received a call from the executor, it will:
- If there is already a
Transaction
for the given stage, it will just respond - If the stage is known (based on the map set by step 1) but without transaction, it will start a new transaction, associate the transaction with the job ID and respond.
- If the stage if unknown, it can be
- The listener in step 1 hasn't be triggered yet. In this case, it will block until the listener is triggered. After unblock, it will rerun the logic as described in a and b.
- The listener in step 1 has already been triggered. This shouldn't happen and is an error. It will respond with an error.
- If there is already a
- When
SparkListener.onJobEnd
is called, if there was transaction started for the job, based on the job completion status, it will either commit or abort the transaction associated with the job.
Explicit Transaction
To support explicit transaction, CDAP will start a new transaction when Transactional.execute
is invoked and set the transaction to the SparkContext.properties
, which is a thread local properties map. The properties map will be available to the job event in the SparkListener
callback methods. The Http service and the SparkListener
as mentioned in the Implicit Transaction section can be modified so that it will respond with the transaction in the job properties if there is one instead of starting a new one.
Dataset access from Closure
With the transaction support mentioned about, we can have the SparkExecutionContext
returns a DatasetContext
that is serializable so that it can be used inside closure function. The only challenge left is when to flush the dataset (mainly Table dataset). We need to modify the Table
implementation hierarchy to have the effect of BufferingTable
optional. The effect of turning off buffering will impact direct writer through dataset (not the one that done by saving of RDD to dataset), however, it should be acceptable, because it's never a good idea to write to dataset from a Spark function, as function as expected to have no side effect and most of those writes can be done by saving RDD to dataset.