
Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

The purpose of this feature is to introduce control flow in CDAP data pipelines. This allows users to terminate a data pipeline early based on the input arguments. It also allows users to execute only certain stages based on the input arguments, thereby saving resources.

Goals

The goal of this feature is to support use cases that control the execution of the data pipeline based on the input arguments, or based on the number of output/error records produced by previous stages. Note that this does not cover the use case where decisions are taken based on the field values of individual records.

User Stories 

  • As a data scientist creating a pipeline to generate an ML model at a large healthcare organization

    I would like the capability to have my pipeline use distinct branches for generating the model based on the chosen algorithm

    so that

    1. I can maintain a single pipeline that can run any algorithm passed as a runtime argument to generate models
    2. I can compare the results of all these models and potentially use them to make an informed decision on which model best suits my data
    3. The comparison is on identical items and is hence reliable, since the only thing that changes is the algorithm that's used to generate the model

  • As a data engineer focusing on quality of data,

    I would like to have the capability to terminate my pipeline and optionally clean up any partial output generated so far, if a certain condition in the pipeline determines that the quality of the data being generated by the pipeline does not meet the required standards

    so that

    1. I can guarantee to downstream processes that the output of my pipeline always meets certain standards
    2. By terminating my pipeline, I do not waste any processing resources in my own pipeline for bad data
    3. I also save resources of the downstream processes

  • Checked with Bhooshan and this use case is not for 4.3: As a data engineer developing a pipeline to process healthcare records at a large insurance company,

    I would like to have the capability to define custom conditions in my pipelines that can be written using Java, Scala, Python, or JavaScript code

    so that

    1. I can use complex healthcare processing logic in the form of conditions that determine which downstream branch of my pipeline should be executed
    2. I do not have to only rely upon simple conditions based on runtime arguments and data statistics

Design

We will introduce a new plugin type, "condition", in the pipeline.

Consider the following pipeline for design purposes:

                                                                 TRUE
File (Source) -> CSV Parser (Transform) -> Filter (Transform) -> Condition1 -------> Logistic Regression (Sink)
                                                                     |
                                                               FALSE |         TRUE
                                                                     +-----> Condition2 -------> Random Forest (Sink)
                                                                                  |
                                                                            FALSE |         TRUE
                                                                                  +-----> Condition3 -------> Decision Tree (Sink)

In the above pipeline, we want to execute the classification algorithm selected by the runtime argument 'input.algorithm'. We also do not want to run the expensive model generation process if the Filter transform did not produce enough records to proceed further.

The pipeline is configured with 3 condition stages:

  1. Condition1: output.filter Greater Than 1000 AND input.algorithm Equals 'Logistic Regression'
  2. Condition2: output.filter Greater Than 1000 AND input.algorithm Equals 'Random Forest'
  3. Condition3: output.filter Greater Than 1000 AND input.algorithm Equals 'Decision Tree'

Representation of the Condition in the Pipeline config

Since conditions are individual stages in the pipeline, they will appear in the connections and stages sections of the pipeline config JSON.

{  
   "connections":[  
      {  
         "from":"File",
         "to":"CSV Parser"
      },
	  ....	
      {  
         "from":"Condition1",
         "to":"Logistic Regression",
         "condition":true
      },
      {  
         "from":"Condition1",
         "to":"Condition2",
         "condition":false
      },
      {  
         "from":"Condition2",
         "to":"Random Forest",
         "condition":true
      },
      {  
         "from":"Condition2",
         "to":"Condition3",
         "condition":false
      },
      {  
         "from":"Condition3",
         "to":"Decision Tree",
         "condition":true
      }
   ],
   "stages":[  
	  { "name":"File", ...},
	  ...	
      {  
         "name":"Condition1",
         "plugin":{  
            "name":"Condition",
            "type":"condition",
            "label":"Condition1",
            "artifact":{  
               "name":"condition-plugins",
               "version":"1.7.0",
               "scope":"SYSTEM"
            },
            "properties":{
			   "condition" : "output.filter > 1000 && input.algorithm == 'Logistic Regression'"
            }
         }
      }
   ]
}

 

Java API for conditions

The Condition plugin interface will be as follows:

/**
 * Represents condition to be executed in pipeline.
 */
public abstract class Condition implements PipelineConfigurable {
  public static final String PLUGIN_TYPE = "condition";

  /**
   * Implement this method to execute code as part of the condition's execution.
   * If this method returns {@code true}, the true branch will be executed; otherwise
   * the false branch will be executed.
   * @param context the condition context, containing information about the pipeline run
   * @throws Exception when there is a failure in method execution
   */
  public abstract boolean apply(ConditionContext context) throws Exception;
 
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    //no-op
  }
}
 
/**
 * Represents the context available to the condition plugin during runtime.
 */
public interface ConditionContext extends StageContext, Transactional, SecureStore, SecureStoreManager {

  /**
   * Return the arguments which can be updated.
   */
  SettableArguments getArguments();
 
  /**
   * Return the statistics (such as number of input records, output records, error records etc.) associated with the stages in the pipeline.
   * Note that these will be predefined statistics.	
   */
  StageStatistics getStageStatistics(String stageName);
}
 
public interface StageStatistics {
  long getValue(String property);
  long getInputRecordsCount();
  long getOutputRecordsCount();
  long getErrorRecordsCount();
}
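To illustrate the API, a condition equivalent to Condition1 above could look roughly like the following. This is a sketch, not the actual condition-plugins implementation: the real plugin would extend the abstract Condition class and use the actual ConditionContext and SettableArguments, so the stand-in interfaces and the getArgument accessor below are purely illustrative.

```java
// Minimal stand-ins for the interfaces defined above, so this sketch is
// self-contained. The real plugin would use the actual ConditionContext;
// getArgument here is a stand-in for reading a runtime argument via
// getArguments().
interface StageStatistics {
  long getOutputRecordsCount();
}

interface ConditionContext {
  String getArgument(String name);                       // illustrative accessor
  StageStatistics getStageStatistics(String stageName);
}

// Sketch of Condition1: take the true branch only when the Filter stage
// produced more than 1000 records AND the 'input.algorithm' runtime argument
// selects Logistic Regression.
class AlgorithmCondition {
  public boolean apply(ConditionContext context) throws Exception {
    long filtered = context.getStageStatistics("Filter").getOutputRecordsCount();
    String algorithm = context.getArgument("input.algorithm");
    return filtered > 1000 && "Logistic Regression".equals(algorithm);
  }
}
```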

 

Condition evaluation:

Conditions can be applied to either the runtime arguments or the data statistics, such as the number of errors from a stage or the number of records output by a stage.

Following are a few approaches for obtaining the data statistics:

  1. Get the statistics from the metrics system, since these statistics are already emitted as metrics. However, metrics are non-transactional and there might be a delay in updating them, which is why we cannot rely on them.
  2. Emit these statistics as MapReduce counters (when running the job with MapReduce as the engine) or Spark accumulators (when running the job with Spark as the engine).
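The idea behind the second approach can be sketched without the MapReduce or Spark APIs as follows: each stage increments named counters as it processes records, and the condition later reads the aggregated totals. In a real run these would be MapReduce counters or Spark accumulators, which the framework aggregates across tasks (unlike metrics, which may lag); the class and counter names below are illustrative.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Self-contained sketch of the counter idea: stages increment named counters,
// and a condition reads the totals, e.g. the Filter stage's output count.
class StageCounters {
  private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

  // Increment a counter, keyed as "<stage>.<counter>", e.g. "Filter.records.out".
  void increment(String stageName, String counterName) {
    counters.computeIfAbsent(stageName + "." + counterName, k -> new AtomicLong())
            .incrementAndGet();
  }

  // Read a counter total; a counter that was never incremented reads as 0.
  long get(String stageName, String counterName) {
    AtomicLong c = counters.get(stageName + "." + counterName);
    return c == null ? 0L : c.get();
  }
}
```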

Pipeline planner changes:

Adding condition nodes to the pipeline involves changes in the pipeline planner. However, adding conditions might result in a DAG that cannot be configured into a Workflow.

For example, consider the following pipeline:

 
                         F
                         +----------> Test (Sink)
                         |
S1 (Source) -> Condition1
                         |
                         +----------> Rule (Transform) -> Parser (Transform) -> Avro (Sink)
                         T                                       ^
                                                                 |
S2 (Source) ----+                                                |
                +------ Joiner ----------------------------------+
S3 (Source) ----+
 
 
This would result in the following Workflow, which we currently do not allow.
 
									
                                              F
(S1 - C1 Connector MapReduce) -> Condition ------> (C1 Connector - Test MapReduce)
                                     |
                                   T |
                                     v
(S2/S3 - C2 Connector MapReduce) ----------------> (C2 Connector - Rule-Parser-Avro MapReduce)
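The shape that makes this Workflow invalid is a node inside a condition's branches (Parser) that also receives an edge from outside those branches (Joiner). A planner check for that shape could be sketched as below; the names and structure are illustrative, not the actual planner code.

```java
import java.util.*;

// Sketch of the DAG check implied above: a node reachable through a condition's
// branches must not also have an incoming edge from outside those branches,
// since the resulting Workflow cannot express such a join.
class DagCheck {
  static boolean hasCrossBranchJoin(Map<String, List<String>> edges, String conditionNode) {
    // Collect every node reachable from the condition, i.e. inside its branches.
    Set<String> inside = new HashSet<>();
    Deque<String> stack = new ArrayDeque<>(edges.getOrDefault(conditionNode, List.of()));
    while (!stack.isEmpty()) {
      String n = stack.pop();
      if (inside.add(n)) {
        stack.addAll(edges.getOrDefault(n, List.of()));
      }
    }
    // The join is invalid if some node inside the branches has a predecessor
    // that is neither the condition itself nor inside the branches.
    for (Map.Entry<String, List<String>> e : edges.entrySet()) {
      String from = e.getKey();
      if (from.equals(conditionNode) || inside.contains(from)) {
        continue;
      }
      for (String to : e.getValue()) {
        if (inside.contains(to)) {
          return true;
        }
      }
    }
    return false;
  }
}
```

Running this on the example pipeline above flags the Joiner -> Parser edge, since Parser sits on Condition1's true branch.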

 

Approach

Approach #1

Approach #2

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Deprecated Programmatic APIs

New REST APIs

Path                 Method   Description                                            Response Code                              Response
/v3/apps/<app-id>    GET      Returns the application spec for a given application   200 - On success
                                                                                     404 - When application is not available
                                                                                     500 - Any internal errors

Deprecated REST API

Path                 Method   Description
/v3/apps/<app-id>    GET      Returns the application spec for a given application

CLI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

System behavior (if applicable - document impact on downstream [ YARN, HBase etc ] component failures) and how does the design take care of these aspect

Test Scenarios

Test ID    Test Description    Expected Results

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work
