Spark Revamp

Current problems

API

  • Our current Spark API is designed for batch processing only
    • Always runs in a long transaction
    • Cannot access datasets from a closure (partly due to the long transaction, partly due to the dataset buffering design)
    • A long transaction is definitely unwanted when running Spark Streaming
  • Not compatible with PySpark and difficult to adapt to it
    • Mainly because the SparkContext is created by CDAP instead of by the user program
  • Does not follow common Spark idioms
    • A Spark program normally does a new SparkContext() and passes that context to other high-level contexts, e.g. StreamingContext, HBaseContext
  • Not integrated with dataset schema
    • The developer bears the burden of converting a dataset/stream RDD into a DataFrame

Runtime

  • Cannot embed a Spark program in a Service
    • Cannot use Spark as a service, which could leverage the RDD caching ability for ad hoc queries.
  • Cannot run Spark programs concurrently in the SDK (CDAP-349)
  • Cannot run Spark programs in a workflow fork (CDAP-3008)

Proposal

API

Core API

  • Create a new cdap-api-spark module in CDAP to hold the Spark library dependency

    • Spark programs have to depend on that module
  • Provide a native Scala API and leverage Scala's implicit support to provide a better experience

    class SimpleSpark extends SparkProgram {
      // The context provided by CDAP is renamed to SparkExecutionContext to avoid confusion.
      // With the implicit support mentioned below, users rarely need to interact with this class.
      override def run(implicit sec: SparkExecutionContext) {
    
        // The user is responsible for SparkContext creation, just like in a normal Spark program
        val sc = new SparkContext()
     
        // Extra methods are added to SparkContext through an implicit class.
        // We can inject all extra methods provided by CDAP, such as creating an RDD from a dataset/stream,
        // getMetrics, discoverService, etc.
        val rdd: RDD[(String, Int)] = sc.fromDataset("SourceDS")
        rdd
          .map(t => (t._2, t._1))
          // Extra methods are added to the RDD class for saving an RDD as a dataset
          .saveAsDataset("NewDS")
      }
    }
  • For the Java API, due to limitations in Java, users still need to perform CDAP-specific actions through the CDAP context.

    class SimpleSpark extends JavaSparkProgram {
      @Override
      public void run(JavaSparkExecutionContext sec) {
        JavaSparkContext sc = new JavaSparkContext();
        JavaPairRDD<String, Integer> rdd = sec.fromDataset(sc, "SourceDS");
        
        sec.saveAsDataset(sc, rdd, "NewDS");
      }
    }

    One big challenge is that if we allow the user to create the SparkContext or JavaSparkContext, CDAP won't have easy access to the SparkContext instance, and hence cannot stop the Spark program unless the user stops it (by returning from the run method or by calling SparkContext.stop() explicitly). Also, in the Java case, the user needs to pass the JavaSparkContext to every Spark-related call on the CDAP context (in Scala, we work around this with implicits). There are three possible ways to deal with this issue:

    1. Instead of having the user create the SparkContext, CDAP creates it and makes it accessible through the SparkExecutionContext. This is the approach taken by the current CDAP (v3.3.0). In Scala, it looks like this:

      class SimpleSpark extends SparkProgram {
        // Rename the context from the CDAP as SparkExecutionContext to avoid confusion
        // With the implicit support mentioned below, user rarely need to interact with this class.
        override def run(implicit sec: SparkExecutionContext) {
          val sc = sec.getSparkContext()
       
          // Use SparkContext like normal
        }
      }

      The major downside of this approach is that the user has no control over the creation of the SparkContext and therefore cannot customize it, for example by setting preferred locations for the executor containers to run on.

    2. The other way, which is harder, is to rewrite the SparkContext class constructors through a ClassLoader so that CDAP "captures" the SparkContext created by the user. CDAP can then use the SparkContext internally (both for stopping the program and for the Java API). The upside of this approach is that the user doesn't need to change their code much. In fact, it allows CDAP to run a Spark program without any modification, just by invoking its main() method, as long as the program doesn't use any CDAP functionality (dataset, metrics, discovery).
    3. Have the SparkProgram abstract class contain a constructor that requires the user to pass in the SparkContext. The main drawback is that the user needs to move the SparkContext creation into the default constructor, which involves slightly more code.

      // Provided by CDAP; all Spark programs must extend this class.
      abstract class SparkProgram(private val sc: SparkContext) {
        def run(implicit sec: SparkExecutionContext)
       
        final def getSparkContext = sc
      }
       
      // Private constructor that calls super (this is how it is done in Scala)
      class SimpleSpark private (val sc: SparkContext) extends SparkProgram(sc) {
       
        // Default constructor, called from CDAP. The SparkContext should be constructed here
        def this() {
          this(new SparkContext())
        }
      
        // The implicit parameter is the CDAP SparkExecutionContext, not the SparkContext
        override def run(implicit sec: SparkExecutionContext) {
          // Use the SparkContext
          sc.fromDataset(...)
        }
      }
    Option 2, although the hardest to implement, is the best from the developer's point of view. In fact, in order to resolve CDAP-349, the runtime needs to use a different ClassLoader for each invocation of a Spark job anyway, so adding a small routine for bytecode rewriting shouldn't be a big issue.

Spark Service

Given Spark's ability to cache datasets, a Spark program can first build up RDD caches and then expose a network service (e.g. an HTTP service) that allows those RDDs to be queried interactively. This gives users an easy way to build an interactive query service over a large dataset with relatively low latency.

Here is the proposed CDAP API for service integration in a Spark program:

  1. Introduce a new interface, SparkHttpServiceContext, which provides access to the SparkContext instance created in the Spark program

    public interface SparkHttpServiceContext extends HttpServiceContext {
      SparkContext getSparkContext();
    }
  2. Users can add multiple HttpServiceHandler instances to the Spark program in the Spark.configure method through the SparkConfigurer

  3. CDAP will call HttpServiceHandler.initialize method with a SparkHttpServiceContext instance.
    1. CDAP will provide an abstract class, AbstractSparkHttpServiceHandler, to deal with the casting in the initialize method.

      public abstract class AbstractSparkHttpServiceHandler extends AbstractHttpServiceHandler {
      
        private SparkContext sparkContext;
      
        @Override
        public void initialize(HttpServiceContext context) throws Exception {
          super.initialize(context);
      
          // Shouldn't happen. The CDAP framework guarantees it.
          if (!(context instanceof SparkHttpServiceContext)) {
            throw new IllegalArgumentException("The context type should be SparkHttpServiceContext");
          }
          this.sparkContext = ((SparkHttpServiceContext) context).getSparkContext();
        }
      
        protected final SparkContext getSparkContext() {
          return sparkContext;
        }
      }
  4. Because CDAP needs to provide the SparkContext to the HTTP handler, the HTTP service and the initialization of the HttpServiceHandler will only happen after the user's Spark program has instantiated the SparkContext (see option 2 above).

With the CDAP Spark Service support, one can, for example, build a service handler that executes arbitrary Spark SQL against the SparkContext.

class SimpleSparkHandler extends AbstractSparkHttpServiceHandler {

  @Path("/query")
  @GET
  def query(request: HttpServiceRequest, responder: HttpServiceResponder) {
    val sqlContext = SQLContext.getOrCreate(getSparkContext)
    val df = sqlContext.sql(Charsets.UTF_8.decode(request.getContent).toString);
    
    val builder = new StringBuilder
    df.collect().foreach(row => {
      builder.append(...)
    })
    responder.sendString(builder.toString)
  }
}

 

API for Dataframe/SparkSQL

TBD

Transaction

Most of the system/core datasets in CDAP are TransactionAware, hence it is important for Spark to be able to read from and write to those datasets transactionally. In the current version of CDAP (v3.3.0), a Spark program is always executed inside one long transaction, which is started before and committed after the Spark execution. However, this is far from ideal, because:

  1. It doesn't work for Spark Streaming, which is a long-running job that may never end, so the transaction would never be committed.
  2. A Spark program can contain a loop that periodically writes out a dataset which it wishes to make visible to other programs (e.g. when building an incremental model).
  3. A Spark program can contain a loop that wants to read the latest committed copy of a dataset.

Inside a Spark program, a transaction is needed only when a dataset is actually read or written. This happens when an RDD created from a dataset is materialized or when an RDD is saved to a dataset, both of which are triggered by an action (see https://spark.apache.org/docs/latest/programming-guide.html#actions) performed on an RDD. Internally, this is roughly what happens inside Spark when an action is performed on an RDD:

  1. Create a job. Based on the RDD lineage, Spark creates multiple stages and tasks to be executed on the executor nodes.
    1. A job contains a list of stages
      1. Stages are separated by shuffle boundaries
      2. Each stage contains multiple tasks.
        1. A task is the unit of work to be performed, e.g. reading from a source, applying a transformation, etc.
        2. A task operates on one partition (split) of the RDD
    2. The stage ID is globally unique inside the Spark program (see http://spark.apache.org/docs/latest/monitoring.html#rest-api)
  2. Schedule execution of tasks based on the DAG connecting the stages
    1. Tasks from the same stage are executed in parallel
  3. The job is completed when all stages in the DAG are completed

Implicit Transaction

We can support transactions by wrapping the execution of an action (job) inside a transaction, so that all reads and writes on datasets within that job are performed with the same transaction. The transaction is committed when the job ends successfully and aborted otherwise. Since the transaction support is implicit, there is no impact on the user code at all.
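The commit-or-abort rule can be illustrated with a minimal, JDK-only sketch. The names `RecordingTxClient` and `ImplicitTx` are hypothetical stand-ins invented for this illustration, not CDAP or Spark APIs, and the "job" is just a function call rather than a real Spark action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a transaction client; it only records what happened.
final class RecordingTxClient {
  final List<String> events = new ArrayList<>();
  private long nextId = 1;

  long start() {
    long id = nextId++;
    events.add("start:" + id);
    return id;
  }

  void commit(long txId) {
    events.add("commit:" + txId);
  }

  void abort(long txId) {
    events.add("abort:" + txId);
  }
}

final class ImplicitTx {
  // Runs one action (job) in its own transaction:
  // commit if the job returns normally, abort if it throws.
  static <T> T runJob(RecordingTxClient tx, Supplier<T> job) {
    long txId = tx.start();
    try {
      T result = job.get();
      tx.commit(txId);
      return result;
    } catch (RuntimeException e) {
      tx.abort(txId);
      throw e;
    }
  }
}
```

The user-supplied job never sees the transaction, which is the sense in which the support is implicit.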

Explicit Transaction

We can also expose the Transactional interface so that users can choose to execute multiple actions within the same transaction. E.g.

class SimpleSpark extends SparkProgram {
  override def run(implicit sec: SparkExecutionContext) {
    val sc = new SparkContext()
    sec.execute(new TxRunnable {
      override def run(dsContext: DatasetContext) {
        val rdd: RDD[(String, Int)] =
          sc.fromDataset(...)
            .map(...)

        // First action
        rdd.saveAsDataset(...)

        // Second action
        rdd.collect()

        // Some action on dataset directly
        val table: Table = dsContext.getDataset("table")
        table.put(...)
      }
    })
  }
}

Transaction in Spark Streaming

In Spark Streaming, the actions performed on each RDD provided by a DStream are submitted and executed as regular Spark jobs, so the transaction model described above still applies to each individual micro-batch RDD.

With the explicit transaction support, it is easy to construct a Spark Streaming program that consumes from Kafka with exactly-once semantics.

class SimpleSpark extends SparkProgram {
  override def run(implicit sec: SparkExecutionContext) {

    // Create a DStream with the direct Kafka API in Spark. Copied from the Kafka example in Spark
    val directKafkaStream = KafkaUtils.createDirectStream[
      [key class], [value class], [key decoder class], [value decoder class] ](
      streamingContext, [map of Kafka parameters], [set of topics to consume])

    // Hold a reference to the current offset ranges, so it can be used downstream
    var offsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map {
      ...
    }.foreachRDD { rdd =>
      sec.execute(new TxRunnable {
        override def run(dsContext: DatasetContext) {
          // Operate on the RDD.
          rdd.map(...).saveAsDataset(...)
          // This happens in the driver
          for (o <- offsetRanges) {
            dsContext.getDataset("kafkaOffsets").save(o)
          }
        }
      })
    }
  }
}

 

Runtime

Spark Classes Isolation

We need to have the Spark classes (and their dependencies) isolated from the system classloader, meaning they cannot be on the system classpath, whether in the SDK, the master or a container. This is already true for the master process in cluster mode. Isolation is needed for the following reasons:

  1. Users can easily switch to a different version of Spark than the one that Cask ships (in the SDK).
  2. Provide a proper fix for CDAP-4547 so that we don't pollute the container classpath
  3. Allow us to rewrite the SparkContext easily (see below)
  4. Support concurrent runs of Spark programs (in the SDK).

SparkContext Class Rewrite

As described above, we need to insert code into the SparkContext constructor to capture the SparkContext instance created by the user code. This can be done with a custom classloader, which we need anyway for class isolation. What we need is to modify the SparkContext constructor bytecode so that, before the constructor returns (intercepted through the ASM AdviceAdapter), the SparkContext reference is stored in a global concurrent map (runId -> context). That way we can always get the SparkContext based on the runId.
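As a rough illustration of the global map only (not of the bytecode rewriting itself), the registry that the rewritten constructor would call into might look like the sketch below. `ContextRegistry` and its methods are hypothetical names, the context is typed as `Object` so the sketch needs no Spark dependency, and how the runId reaches the constructor is left open here because the text above does not specify it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry backing the "runId -> context" map.
final class ContextRegistry {
  private static final ConcurrentMap<String, Object> CONTEXTS = new ConcurrentHashMap<>();

  private ContextRegistry() {
  }

  // Intended to be invoked by the injected code just before the constructor returns.
  // Rejects a second, different context for the same run.
  static void register(String runId, Object context) {
    Object existing = CONTEXTS.putIfAbsent(runId, context);
    if (existing != null && existing != context) {
      throw new IllegalStateException("A context is already registered for run " + runId);
    }
  }

  // Used by the runtime, e.g. to stop the program or to serve the Java API.
  static Object get(String runId) {
    return CONTEXTS.get(runId);
  }

  // Called when the run finishes so the context can be garbage collected.
  static Object remove(String runId) {
    return CONTEXTS.remove(runId);
  }
}
```

Rejecting a second context per run is a design choice of this sketch; the proposal itself does not say how multiple contexts in one run should be handled.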

Implicit Transaction

To support implicit transactions, we need to start a new long transaction when a job starts and commit or abort the transaction when the job ends. Since Spark allows multiple jobs to be submitted concurrently from multiple threads (i.e. by performing actions on RDDs), starting and ending the transaction needs to be tied to the job ID.

We can use the SparkListener to get a callback when a job starts or ends. However, there are a couple of complications:

  1. Calls to SparkListener are asynchronous, meaning a task may already be submitted or even running before the callback gets called.
  2. The transaction needs to be started/committed from the driver process, but there is no easy way to ship the Transaction object to the executor node.
  3. Inside the executor node (hence the task), the job ID is not available.

To overcome these complications, we rely on the fact that the stage ID is globally unique within a Spark program (see http://spark.apache.org/docs/latest/monitoring.html#rest-api) and have the driver set up an HTTP service through which the executors acquire transaction information based on the stage ID. This is the sequence of events when a new job is submitted by Spark:

  1. When SparkListener.onJobStart is called, we add all stage IDs under that job (accessible through SparkListenerJobStart.stageInfos) to a global map.
  2. Inside the executor, whenever DatasetContext.getDataset is called, it gets the stage ID from the Spark TaskContext and makes a call to the driver with the stage ID to get the Transaction information. That transaction is used to set up the transaction of the dataset.
  3. In the driver HTTP service, when it receives a call from the executor:
    1. If there is already a Transaction for the given stage, it simply responds with it.
    2. If the stage is known (based on the map set in step 1) but has no transaction, it starts a new transaction, associates the transaction with the job ID and responds.
    3. If the stage is unknown, there are two possibilities:
      1. The listener in step 1 hasn't been triggered yet. In this case, the call blocks until the listener is triggered. After unblocking, it reruns the logic described in sub-steps 1 and 2 above.
      2. The listener in step 1 has already been triggered. This shouldn't happen and is an error. The service responds with an error.
  4. When SparkListener.onJobEnd is called, if a transaction was started for the job, it is either committed or aborted based on the job completion status.
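The driver-side bookkeeping in the sequence above can be sketched with plain JDK classes. Everything here is hypothetical: `StageTransactions` and its method names are invented for illustration, a transaction is reduced to a `long` ID, and the "listener already triggered but stage unknown" error case is approximated by a timeout, which is a simplification the proposal does not prescribe.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical driver-side state: stage -> job and job -> transaction.
final class StageTransactions {
  private final Map<Integer, Integer> stageToJob = new HashMap<>();
  private final Map<Integer, Long> jobToTx = new HashMap<>();
  private long nextTxId = 1;

  // Step 1: called from the job-start listener callback.
  synchronized void onJobStart(int jobId, List<Integer> stageIds) {
    for (int stageId : stageIds) {
      stageToJob.put(stageId, jobId);
    }
    notifyAll(); // wake up executor requests that arrived before the callback
  }

  // Step 3: called by the driver service on behalf of an executor.
  synchronized long transactionFor(int stageId, long timeoutMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!stageToJob.containsKey(stageId)) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        throw new IllegalStateException("Unknown stage " + stageId);
      }
      try {
        wait(remaining); // block until the listener registers the stage
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for stage " + stageId, e);
      }
    }
    int jobId = stageToJob.get(stageId);
    // Reuse the job's transaction if one exists, otherwise start a new one.
    return jobToTx.computeIfAbsent(jobId, id -> nextTxId++);
  }

  // Step 4: called from the job-end listener callback; reports the decision taken.
  synchronized String onJobEnd(int jobId, boolean succeeded) {
    stageToJob.values().removeIf(j -> j == jobId);
    Long txId = jobToTx.remove(jobId);
    if (txId == null) {
      return "none"; // no executor ever asked, so no transaction was started
    }
    return (succeeded ? "commit:" : "abort:") + txId;
  }
}
```

Because the transaction is created lazily on the first executor request, a job that never touches a dataset ends without a transaction ever being started.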

Explicit Transaction

To support explicit transactions, CDAP will start a new transaction when Transactional.execute is invoked and store the transaction in the SparkContext local properties (settable through SparkContext.setLocalProperty), which form a thread-local properties map. The properties map is available in the job events passed to the SparkListener callback methods. The HTTP service and the SparkListener mentioned in the Implicit Transaction section can be modified so that they respond with the transaction found in the job properties, if there is one, instead of starting a new one.
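The "use the transaction from the job properties if present, otherwise start a new one" decision can be sketched as follows. This is a JDK-only model under stated assumptions: a `ThreadLocal<Properties>` stands in for Spark's per-thread job properties, `ExplicitTx` and the key `example.tx.id` are hypothetical names, the transaction is reduced to a `long` ID, and committing or aborting is left out.

```java
import java.util.Properties;
import java.util.function.LongSupplier;

final class ExplicitTx {
  // Hypothetical property key; not a real CDAP or Spark setting.
  static final String TX_KEY = "example.tx.id";

  // Stand-in for the thread-local properties attached to jobs submitted by a thread.
  private static final ThreadLocal<Properties> LOCAL = ThreadLocal.withInitial(Properties::new);

  static Properties localProperties() {
    return LOCAL.get();
  }

  // Models Transactional.execute: publish the transaction for every job submitted
  // by this thread while the body runs, then clear it again.
  static void execute(long txId, Runnable body) {
    Properties props = LOCAL.get();
    props.setProperty(TX_KEY, Long.toString(txId));
    try {
      body.run();
    } finally {
      props.remove(TX_KEY);
    }
  }

  // Models the listener / driver service decision for a job's properties.
  static long resolve(Properties jobProperties, LongSupplier startNewTx) {
    String explicit = jobProperties.getProperty(TX_KEY);
    return explicit != null ? Long.parseLong(explicit) : startNewTx.getAsLong();
  }
}
```

Clearing the key in a `finally` block keeps later jobs from the same thread from accidentally joining a finished explicit transaction.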

Dataset access from Closure

With the transaction support mentioned above, we can have the SparkExecutionContext return a DatasetContext that is serializable, so that it can be used inside a closure function. The only challenge left is when to flush the dataset (mainly the Table dataset). We need to modify the Table implementation hierarchy to make the effect of BufferingTable optional. Turning off buffering will affect direct writes through a dataset (not those done by saving an RDD to a dataset). However, this should be acceptable, because it's never a good idea to write to a dataset from a Spark function: functions are expected to have no side effects, and most of those writes can be done by saving an RDD to a dataset.