...
```scala
class SimpleSpark extends SparkProgram {

  override def run(implicit sec: SparkExecutionContext) {
    // Create a DStream with the direct Kafka API in Spark.
    // Copied from the Kafka example in Spark.
    val directKafkaStream = KafkaUtils.createDirectStream[
      [key class], [value class], [key decoder class], [value decoder class]](
      streamingContext, [map of Kafka parameters], [set of topics to consume])

    // Hold a reference to the current offset ranges, so it can be used downstream
    var offsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map {
      ...
    }.foreachRDD { rdd =>
      sec.execute(new TxRunnable {
        override def run(dsContext: DatasetContext) {
          // Operate on the RDD.
          rdd.map(...).saveAsDataset(...)

          // This happens in the driver
          for (o <- offsetRanges) {
            dsContext.getDataset("kafkaOffsets").save(o)
          }
        }
      })
    }
  }
}
```
Runtime
Spark Classes Isolation
We need to have the Spark classes (and their dependencies) isolated from the system classloader, meaning they cannot be on the system classpath, whether in the SDK, the master, or a container process. This is already the case for the master process in cluster mode. Isolation is needed for the following reasons (a minimal sketch of such an isolating classloader follows the list below):
...
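To illustrate what this isolation could look like at the JVM level, the sketch below builds a classloader whose only parent is the bootstrap classloader, so Spark classes (and their dependencies) can be resolved solely from an explicitly supplied set of jars and never from the system classpath. This is a minimal sketch, not the actual implementation; the `SparkClassLoaderFactory` name, the `sparkLibDir` parameter, and the `/opt/spark/jars` path are hypothetical.

```scala
import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Hypothetical sketch: build a classloader that exposes Spark classes only
// from a given directory of jars, never from the system classpath.
object SparkClassLoaderFactory {

  def create(sparkLibDir: String): ClassLoader = {
    // Collect all jars under the Spark library directory.
    val jars: Array[URL] = Files.list(Paths.get(sparkLibDir))
      .iterator().asScala
      .filter(_.toString.endsWith(".jar"))
      .map(_.toUri.toURL)
      .toArray

    // A null parent delegates only to the bootstrap classloader (java.* etc.),
    // so classes on the system classpath can neither shadow nor leak into the
    // Spark classes loaded here.
    new URLClassLoader(jars, null: ClassLoader)
  }
}

// Hypothetical usage: Spark classes are reachable only through the isolated
// classloader, e.g. via reflection from the framework side.
// val sparkClassLoader = SparkClassLoaderFactory.create("/opt/spark/jars")
// val sparkConfClass = sparkClassLoader.loadClass("org.apache.spark.SparkConf")
```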