...
Code Block

class SimpleSpark extends SparkProgram {
  override def run(implicit sec: SparkExecutionContext) {
    sec.execute(new TxRunnable {
      override def run(dsContext: DatasetContext) {
        val rdd: RDD[(String, Int)] = sc.fromDataset(...)
          .map(...)

        // First action
        rdd.saveToDataset(...)

        // Second action
        rdd.collectAsList()

        // Some action on a dataset directly
        val table: Table = dsContext.getDataset("table")
        table.put(...)
      }
    })
  }
}
Transaction in Spark Streaming
In Spark Streaming, the actions performed on each RDD provided by a DStream are actually submitted and executed as a regular Spark job, so the transaction model described above still applies to each individual micro-batch RDD.
With explicit transaction support, it is straightforward to construct a Spark Streaming program that consumes from Kafka with exactly-once semantics.
Code Block
class SimpleSpark extends SparkProgram {
  override def run(implicit sec: SparkExecutionContext) {
    // Create a DStream with the direct Kafka API in Spark. Copied from the Kafka example in Spark.
    val directKafkaStream = KafkaUtils.createDirectStream[
      [key class], [value class], [key decoder class], [value decoder class] ](
      streamingContext, [map of Kafka parameters], [set of topics to consume])

    // Hold a reference to the current offset ranges so that they can be used downstream
    var offsetRanges = Array[OffsetRange]()

    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map {
      ...
    }.foreachRDD { rdd =>
      sec.execute(new TxRunnable {
        override def run(dsContext: DatasetContext) {
          // Operate on the RDD and save the results to a dataset
          rdd.map(...).saveAsDataset(...)

          // This happens in the driver: save the Kafka offsets in the same transaction
          for (o <- offsetRanges) {
            dsContext.getDataset("kafkaOffsets").save(o)
          }
        }
      })
    }
  }
}
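On restart, the offsets saved in the "kafkaOffsets" dataset above would be read back and used as the starting point of the direct stream, so that records already written to the dataset are not re-consumed. The following is a minimal sketch of that resume step using the same Spark direct Kafka API; the readOffsets helper is hypothetical, since this design does not define how offsets are stored in or queried from the dataset.

Code Block

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: reads the per-partition offsets previously saved to the
// "kafkaOffsets" dataset inside the transaction shown above.
def readOffsets(): Map[TopicAndPartition, Long] = ???

def resumeStream(ssc: StreamingContext,
                 kafkaParams: Map[String, String]): InputDStream[(String, String)] = {
  val fromOffsets: Map[TopicAndPartition, Long] = readOffsets()

  // Resume the direct stream from the saved offsets instead of the latest/earliest
  // offsets, so each record is processed exactly once across restarts.
  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}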
Runtime
Spark Classes Isolation
...