/
Pipeline Branches

Pipeline Branches

 

Overview

Pipelines present a logical view of your business logic. When executed using Spark, a single pipeline will break down into one or more Spark jobs. Each job reads data, processes it, and then writes out the data. One common misconception is that each pipeline stage will get processed completely before moving on to the next one. In reality, multiple pipeline stages will get grouped together into jobs based on the structure of the pipeline. More specifically, shuffles and sinks determine how the pipeline will get broken down. Most plugins in the Analytics section are shuffles (Joiner, Group By, Distinct, Deduplicate). There are several configuration knobs you can adjust to achieve better performance when a pipeline requires multiple Spark jobs.

Branches

Pipeline stages are grouped into Spark jobs by starting from a sink and tracing backwards to a source or a shuffle stage. This means that a pipeline with multiple branches will result in multiple Spark jobs. For example, consider the following pipeline with two branches.

The first Spark job consists of the DataGenerator, Wrangler, and Trash stages. The second Spark job consists of the DataGenerator, Wrangler2, and Trash2 stages. The first job will run to completion before the next job begins. You can tell this is the case because the metrics for the first stages will update and complete before the next stages start. 

In this example, the metrics for DataGenerator, Wrangler, and Trash grow to 1 million output records while Wrangler2 and Trash2 stay at zero. Once they complete, Wrangler2 and Trash2 are processed.



Caching

In the example above, even though the DataGenerator source stage is part of two Spark jobs, the number of output records stays at 1 million instead of 2 million. This means the source is not processed twice. This is achieved using Spark caching. As the first job runs, it caches the output of the DataGenerator stage so that it can be used during the second job. This is important in most pipelines because it prevents multiple reads from the source. This is especially true if it is possible to read different data from the source at different times. For example, suppose a pipeline is reading from a database table that is constantly being updated. If both Spark jobs were to read from the table, they would both read different data because they would be launched at different times. Caching ensures that only a single read is done so that all branches see the same data. Additionally, it is desirable for certain sources like the BigQuery source, where there is a monetary cost associated with each read.

Note that Spark performs caching as part of a Spark job. In the example above, it is performed as part of the first job and then used in the second job. There is not an initial job that reads the data and stops at the cache point. Cached data is kept for the lifetime of a single pipeline run. It is not persisted across runs.

In CDAP 6.1.3 and earlier, data is cached aggressively to try and prevent any type of reprocessing in the pipeline. Starting from CDAP 6.1.4, data is cached at fewer points, only to prevent sources from being read multiple times. This change was done because caching is often more expensive than re-computing a transformation.

Cache Levels

Spark supports several different persistence levels when caching data. You can set the cache level for a pipeline by setting spark.cdap.pipeline.caching.storage.level in the Engine Config section of the pipeline.

 

The full list of possible cache levels can be found at https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence, but we will focus on MEMORY_ONLY, MEMORY_AND_DISK, and DISK_ONLY.  

The MEMORY_ONLY cache level is the fastest because it avoids I/O. However, this requires the most knowledge about your dataset, as you will need to set executor memory high enough to hold all of the data in memory. If Spark runs out of memory, it will kick some part of the data out of memory, which will result in Spark re-computing data. As discussed above, this is often undesirable as it can cause sources to be read multiple times, potentially resulting in different data. In practice, we have found that Spark will sometimes fail to kick out enough cached data and will instead fail with out of memory exceptions.

The MEMORY_AND_DISK level tells Spark to try to cache data in memory first. If it is running low on memory, it will spill cached data to disk and read it from there. In theory, this gives all the benefits of in-memory caching and will also avoid the re-computation problem. In practice, we have found that large Spark jobs will often run out of memory and fail when using this cache level.

The DISK_ONLY level tells Spark to cache data directly to disk instead of trying to keep anything in memory. This is desirable when you know the cached data is much larger than what can fit in memory and will need to be written to disk. We have also found this to be the most reliable cache level, as it will never cause out of memory exceptions.

In CDAP 6.1.3 and earlier, the default cache level is MEMORY_AND_DISK. In CDAP 6.1.4 and later, the default cache level is DISK_ONLY. This change was made because Spark pipelines can run out of memory and fail when using the MEMORY_AND_DISK level. 

Manual Caching

Pipelines will automatically determine which points in the pipeline need to be cached. In some more advanced use cases, it can be desirable to turn this off and manually set cache points. For example, you may be using a transform that makes an API call to an external service, with cost associated with each API call. You may want to manually cache the output of that stage in order to prevent the stage from being reprocessed and making additional API calls.  This can be done by disabling auto caching and manually placing the Data Cacher plugin at points in the pipeline. The Data Cacher plugin can be found in the Analytics section in CDAP 6.1.4 and later.

Automatic caching can be disabled by setting the spark.cdap.pipeline.autocache.enable property to false in the Engine Config.

Stage Consolidation

In certain circumstances, it is possible to consolidate several logical branches of a pipeline into a single physical branch. This reduces the number of Spark jobs and caches that are needed, resulting in better execution times. In our experiments, this has led to up to a 4x improvement in pipeline execution times. 

Consider a simple pipeline with two branches.

Without stage consolidation, this pipeline is executed using two Spark jobs, with data from the source cached to prevent multiple source reads. When stage consolidation is enabled, the Wrangler, Wrangler2, Trash, and Trash2 stages are consolidated into a single physical block, which allows the pipeline to be executed with just a single job.

Stage consolidation can only be done if there are no shuffles being performed on the branch. Most plugins in the Analytics section (Joiner, Group By, Distinct, Deduplicate) require a shuffle. If the pipeline is modified to contain a Distinct plugin after Wrangler2, the branches cannot be consolidated.

Stage consolidation is a performance boost in almost all situations. However, it does require a little more executor memory if a lot of branches are being consolidated. We have conducted experiments with 2 branch, 4 branch, 8 branch, and 16 branch pipelines. Each branch processes 100 million records, with each record around 1 KB in size. Without stage consolidation, the pipeline can be executed with 1 GB executors. With stage consolidation, the 4 and 8 branch pipelines needed 2 GB executors, and the 16 branch pipeline needed 3 GB executors. 

Stage consolidation was added as a Beta feature in CDAP 6.1.4. It can turned on by setting the spark.cdap.pipeline.consolidate.stages runtime argument to true. Like any runtime argument, this can be set at the pipeline, namespace, or instance level.

Parallel Jobs

By default, Spark jobs are executed serially. Starting from CDAP 6.1.0, jobs can be executed in parallel by setting the pipeline.spark.parallel.sinks.enabled runtime argument to true. Like any runtime argument, this can be set at the pipeline, namespace, or instance level. Parallel execution is conceptually similar to breaking a pipeline apart into multiple pipelines. You will need a larger cluster to take full advantage of the parallelism, and you will need to be ok with source data being read multiple times.

Parallel sinks are enabled in the example pipeline below. The pipeline is configured to read 1 million records at the source.

With parallel execution enabled, metrics show for both branches at the same time instead of one branch completing entirely before the next one starts.

Note that the source has output more than the 1 million records it is configured to output. Remember that Spark caches data during a job and not as a separate job. This means that caching cannot be relied upon to prevent multiple reads from a source. In most situations, it is not worth caching anything in a pipeline with parallel execution enabled. As such, parallel execution should only be used if multiple reads from the source is not a problem.

Also note that one branch has processed twice as many records as the other. Even though both Spark jobs are launched at the same time, they may not begin at the same time. Also, if your cluster does not have enough resources to run the entirety of the jobs at the same time, one job may receive more resources than the other, causing them to complete in different amounts of time. In this example, since there are two Spark jobs, your cluster will need to be twice as big as the cluster needed if parallel execution was off. Similarly, a pipeline with N jobs will need a cluster N times as large to take full advantage of the parallelism.