...
Introduce a new interface, `SparkHttpServiceContext`, which provides access to the `SparkContext` instance created in the Spark program:

```java
public interface SparkHttpServiceContext extends HttpServiceContext {
  SparkContext getSparkContext();
}
```
- User can add multiple `HttpServiceHandler` instances to the Spark program in the `Spark.configure` method through the `SparkConfigurer`.
- CDAP will call the `HttpServiceHandler.initialize` method with a `SparkHttpServiceContext` instance.
- CDAP will provide an abstract class, `AbstractSparkHttpServiceHandler`, to deal with the casting in the `initialize` method:

```java
public abstract class AbstractSparkHttpServiceHandler extends AbstractHttpServiceHandler {

  private SparkContext sparkContext;

  @Override
  public void initialize(HttpServiceContext context) throws Exception {
    super.initialize(context);

    // Shouldn't happen. The CDAP framework guarantees it.
    if (!(context instanceof SparkHttpServiceContext)) {
      throw new IllegalArgumentException("The context type should be SparkHttpServiceContext");
    }
    this.sparkContext = ((SparkHttpServiceContext) context).getSparkContext();
  }

  protected final SparkContext getSparkContext() {
    return sparkContext;
  }
}
```
- Because CDAP needs to provide the `SparkContext` to the HTTP handler, the HTTP service and the initialization of the `HttpServiceHandler` will only happen after the user Spark program has instantiated the `SparkContext` (see option b. above).
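The lifecycle described above can be sketched as a standalone program: the framework wraps the `SparkContext` (created after the user program starts it) in a `SparkHttpServiceContext`, passes it to `initialize`, and the abstract handler performs the cast once and exposes the context to subclasses. All types below are minimal inline stand-ins for the CDAP and Spark classes so the sketch is self-contained; they are not the real APIs.

```java
// Standalone sketch of the initialize/cast flow. Every type here is a
// simplified stand-in defined inline, not the actual CDAP or Spark class.
public class LifecycleSketch {

  // Stand-in for org.apache.spark.SparkContext.
  static class SparkContext { }

  // Stand-ins for the CDAP service context interfaces.
  interface HttpServiceContext { }

  interface SparkHttpServiceContext extends HttpServiceContext {
    SparkContext getSparkContext();
  }

  // Mirrors AbstractSparkHttpServiceHandler: cast once in initialize,
  // then hand the SparkContext to subclasses through a final getter.
  abstract static class AbstractSparkHttpServiceHandler {
    private SparkContext sparkContext;

    public void initialize(HttpServiceContext context) throws Exception {
      // Shouldn't happen. The framework guarantees the context type.
      if (!(context instanceof SparkHttpServiceContext)) {
        throw new IllegalArgumentException("The context type should be SparkHttpServiceContext");
      }
      this.sparkContext = ((SparkHttpServiceContext) context).getSparkContext();
    }

    protected final SparkContext getSparkContext() {
      return sparkContext;
    }
  }

  public static void main(String[] args) throws Exception {
    // The framework creates the SparkContext only after the user program
    // has started it, then hands it to the handler via the context.
    final SparkContext sc = new SparkContext();
    SparkHttpServiceContext context = () -> sc;

    AbstractSparkHttpServiceHandler handler = new AbstractSparkHttpServiceHandler() { };
    handler.initialize(context);

    // The handler now sees the same SparkContext the program created.
    System.out.println(handler.getSparkContext() == sc);
  }
}
```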
With the CDAP Spark Service support, for example, someone can build a service handler that executes arbitrary Spark SQL against the `SparkContext`.
```scala
class SimpleSparkHandler extends AbstractSparkHttpServiceHandler {

  @Path("/query")
  @GET
  def query(request: HttpServiceRequest, responder: HttpServiceResponder) {
    val sqlContext = SQLContext.getOrCreate(getSparkContext)
    val df = sqlContext.sql(Charsets.UTF_8.decode(request.getContent).toString)

    val builder = new StringBuilder
    df.collect().foreach(row => {
      builder.append(...)
    })
    responder.sendString(builder.toString)
  }
}
```
API for DataFrame/SparkSQL
...