Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagescala
linenumberstrue
class SimpleSpark extends SparkProgram {
  override def run(implicit sec: SparkExecutionContext) {
    sec.execute(new TxRunnable {
      override def run(dsContext: DatasetContext) {
        val rdd[(String, Int)] = 
          sc.fromDataset(...)
            .map(...)
 
        // First action
        rdd.saveToDataset(...);
 
        // Second action
        rdd.collectAsList();
 
        // Some action on dataset directly
        val table: Table = dsContext.getDataset("table");
        table.put(...);
      }
    });
  }
}

Transaction in Spark Streaming

In Spark Streaming, actions performed on each RDD provided by DStream is actually submitted and executed as a regular Spark job, the transaction model described would still applies on each individual micro-batch of RDD.

With the explicit transaction support, it is easy to construct a Spark Streaming program to consume from Kafka with exactly once semantics.

Code Block
languagescala
linenumberstrue
class SimpleSpark extends SparkProgram {
  override def run(implicit sec: SparkExecutionContext) {

  // Create a DStream with the direct Kafka API in Spark. Copied from the Kafka example in Spark
  val directKafkaStream = KafkaUtils.createDirectStream[
    [key class], [value class], [key decoder class], [value decoder class] ](
    streamingContext, [map of Kafka parameters], [set of topics to consume])
 
  // Hold a reference to the current offset ranges, so it can be used downstream
  var offsetRanges = Array[OffsetRange]()
	
  directKafkaStream.transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }.map {
    ...
  }.foreachRDD { rdd =>
    sec.execute(new TxRunnable {
      override def run(dsContext: DatasetContext) {
        // Operate on the RDD.
        rdd.map(...).saveAsDataset(...);
        // This happen in the driver
        for (o <- offsetRanges) {
          context.getDataset("kafkaOffsets").save(offsetRanges)
        }
      }
    })
  }
}

 

Runtime

Spark Classes Isolation

...