...

  1. Introduce a new interface, SparkHttpServiceContext, which provides access to the SparkContext instance created in the Spark program

    Code Block
    languagejava
    linenumberstrue
    public interface SparkHttpServiceContext extends HttpServiceContext {
      SparkContext getSparkContext();
    }
  2. Users can add multiple HttpServiceHandler instances to the Spark program in the Spark.configure method through the SparkConfigurer.

  3. CDAP will call the HttpServiceHandler.initialize method with a SparkHttpServiceContext instance.
    1. CDAP will provide an abstract class, AbstractSparkHttpServiceHandler, to deal with the casting in the initialize method.

      Code Block
      languagejava
      linenumberstrue
      public abstract class AbstractSparkHttpServiceHandler extends AbstractHttpServiceHandler {
      
        private SparkContext sparkContext;
      
        @Override
        public void initialize(HttpServiceContext context) throws Exception {
          super.initialize(context);
      
          // Shouldn't happen. The CDAP framework guarantees it.
          if (!(context instanceof SparkHttpServiceContext)) {
            throw new IllegalArgumentException("The context type should be SparkHttpServiceContext");
          }
          this.sparkContext = ((SparkHttpServiceContext) context).getSparkContext();
        }
      
        protected final SparkContext getSparkContext() {
          return sparkContext;
        }
      }
  4. Because CDAP needs to provide the SparkContext to the HTTP handler, the HTTP service and the initialization of the HttpServiceHandler instances will only happen after the user's Spark program has instantiated the SparkContext (see option b. above).
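A sketch of what registering handlers through the SparkConfigurer (item 2 above) could look like. All types below are stubs standing in for the CDAP interfaces so the sketch is self-contained, and the method name addHandlers is an assumption based on the description above, not a confirmed API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConfigureSketch {

  // Stub types standing in for the CDAP interfaces; real code would import them.
  interface HttpServiceHandler { }

  interface SparkConfigurer {
    // addHandlers is an assumed method name for registering handler instances.
    void addHandlers(HttpServiceHandler... handlers);
  }

  interface Spark {
    void configure(SparkConfigurer configurer);
  }

  // Hypothetical handlers the Spark program exposes.
  static class QueryHandler implements HttpServiceHandler { }
  static class StatusHandler implements HttpServiceHandler { }

  // A Spark program registering multiple handlers in its configure method.
  static class SparkSqlServiceProgram implements Spark {
    @Override
    public void configure(SparkConfigurer configurer) {
      configurer.addHandlers(new QueryHandler(), new StatusHandler());
    }
  }

  public static void main(String[] args) {
    // Collect whatever the program registers, mimicking the framework side.
    List<HttpServiceHandler> registered = new ArrayList<>();
    new SparkSqlServiceProgram().configure(hs -> registered.addAll(Arrays.asList(hs)));
    System.out.println(registered.size());
  }
}
```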
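The ordering guarantee in step 4 is what makes the downcast in AbstractSparkHttpServiceHandler safe. The guard itself can be exercised in isolation; this sketch uses stub types in place of the real CDAP and Spark classes:

```java
public class InitializeGuardSketch {

  // Stubs for the real Spark / CDAP types referenced in the proposal.
  static class SparkContext { }

  interface HttpServiceContext { }

  interface SparkHttpServiceContext extends HttpServiceContext {
    SparkContext getSparkContext();
  }

  // Mirrors the AbstractSparkHttpServiceHandler shown above.
  static abstract class AbstractSparkHttpServiceHandler {

    private SparkContext sparkContext;

    public void initialize(HttpServiceContext context) throws Exception {
      // Shouldn't happen once CDAP guarantees a SparkHttpServiceContext,
      // since the handler is only initialized after the SparkContext exists.
      if (!(context instanceof SparkHttpServiceContext)) {
        throw new IllegalArgumentException("The context type should be SparkHttpServiceContext");
      }
      this.sparkContext = ((SparkHttpServiceContext) context).getSparkContext();
    }

    protected final SparkContext getSparkContext() {
      return sparkContext;
    }
  }

  static class MyHandler extends AbstractSparkHttpServiceHandler { }

  public static void main(String[] args) throws Exception {
    SparkContext sc = new SparkContext();
    MyHandler handler = new MyHandler();

    // A plain HttpServiceContext is rejected by the guard.
    try {
      handler.initialize(new HttpServiceContext() { });
      System.out.println("no exception");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected plain context");
    }

    // A SparkHttpServiceContext carrying the SparkContext is accepted.
    handler.initialize((SparkHttpServiceContext) () -> sc);
    System.out.println(handler.getSparkContext() == sc);
  }
}
```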

With the CDAP Spark Service support, one can, for example, build a service handler that executes arbitrary Spark SQL statements against the SparkContext.

Code Block
languagescala
class SimpleSparkHandler extends AbstractSparkHttpServiceHandler {

  @Path("/query")
  @GET
  def query(request: HttpServiceRequest, responder: HttpServiceResponder): Unit = {
    val sqlContext = SQLContext.getOrCreate(getSparkContext)
    val df = sqlContext.sql(Charsets.UTF_8.decode(request.getContent).toString)

    val builder = new StringBuilder
    df.collect().foreach(row => {
      builder.append(...)
    })
    responder.sendString(builder.toString)
  }
}

 

API for DataFrame/SparkSQL

...