    Table of Contents

    Checklist

    •  User Stories Documented
    •  User Stories Reviewed
    •  Design Reviewed
    •  APIs reviewed
    •  Release priorities assigned
    •  Test cases reviewed
    •  Blog post

    Introduction 

    The purpose of this feature is to introduce control flow in CDAP data pipelines. It allows users to terminate a pipeline early based on the input arguments, and to execute only certain stages based on the input arguments, thereby saving resources.

    Goals

    The goal of this feature is to support use cases for controlling the execution of a data pipeline based on the input arguments or on the number of output/error records produced by previous stages. Note that this does not cover the use case where decisions are made based on the field values of individual records.

    User Stories 

    • As a data scientist creating a pipeline to generate an ML model at a large healthcare organization,

      I would like the capability to have my pipeline use distinct branches for generating the model based on the chosen algorithm

      so that

      1. I can maintain a single pipeline that can run any algorithm passed as a runtime argument to generate models
      2. I can compare the results of all these models and potentially use them to make an informed decision on which model best suits my data
      3. The comparison is on identical items and is hence reliable, since the only thing that changes is the algorithm that's used to generate the model

    • As a data engineer focusing on quality of data,

      I would like to have the capability to terminate my pipeline and optionally clean up any partial output generated so far, if a certain condition in the pipeline determines that the quality of the data being generated by the pipeline does not meet the required standards

      so that

      1. I can guarantee to downstream processes that the output of my pipeline always meets certain standards
      2. By terminating my pipeline, I do not waste any processing resources in my own pipeline for bad data
      3. I also save resources of the downstream processes

    • (Checked with Bhooshan; this use case is not targeted for 4.3.) As a data engineer developing a pipeline to process healthcare records at a large insurance company,

      I would like to have the capability to define custom conditions in my pipelines that can be written using Java, Scala, Python, or JavaScript code

      so that

      1. I can use complex healthcare processing logic in the form of conditions that determine which downstream branch of my pipeline should be executed
      2. I do not have to only rely upon simple conditions based on runtime arguments and data statistics

    Design

    We will introduce a new plugin type, "condition", in the pipeline.

    Consider the following pipeline for design purposes:

    Code Block
                                                                          TRUE
    File (Source) -> CSV Parser (Transform) -> Filter (Transform) -> Condition1--------> Logistic Regression (Sink)
                                                                          |
                                                                    FALSE |              TRUE
                                                                          |-----> Condition2-------> Random Forest (Sink)
                                                                                       |
                                                                                 FALSE |             TRUE
                                                                                       |-----> Condition3--------> Decision Tree (Sink)

    In the above pipeline, we want to execute the classification algorithm chosen by the runtime argument 'input.algorithm'. We also do not want to run the expensive model generation process if the Filter transform did not produce enough records to proceed further.

    The pipeline is configured with 3 condition stages:

    1. Condition1: output.filter Greater Than 1000 AND input.algorithm Equals 'Logistic Regression'
    2. Condition2: output.filter Greater Than 1000 AND input.algorithm Equals 'Random Forest'
    3. Condition3: output.filter Greater Than 1000 AND input.algorithm Equals 'Decision Tree'

    Representation of the Condition in the Pipeline config

    Since conditions are individual stages in the pipeline, they will appear in the connections and stages sections of the pipeline configuration JSON.

    Code Block
    {  
       "connections":[  
          {  
             "from":"File",
             "to":"CSV Parser"
          },
    	  ....	
          {  
             "from":"Condition1",
             "to":"Logistic Regression",
             "condition":true
          },
          {  
             "from":"Condition1",
             "to":"Condition2",
             "condition":false
          },
          {  
             "from":"Condition2",
             "to":"Random Forest",
             "condition":true
          },
          {  
             "from":"Condition2",
             "to":"Condition3",
             "condition":false
          },
          {  
             "from":"Condition3",
             "to":"Decision Tree",
             "condition":true
          }
       ],
       "stages":[  
    	  { "name":"File", ...},
    	  ...	
          {  
             "name":"Condition1",
             "plugin":{  
                "name":"Condition",
                "type":"condition",
                "label":"Condition1",
                "artifact":{  
                   "name":"condition-plugins",
                   "version":"1.7.0",
                   "scope":"SYSTEM"
                },
                "properties":{
    			   "condition" : "output.filer > 1000 && input.algorithm == 'Logistic Regression'"	  
                }
             }
          }
       ]
    }
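
    For illustration only, the branch to run could then be selected at execution time by passing the 'input.algorithm' runtime argument when the pipeline is started. The example below assumes the pipeline runs as the standard data pipeline workflow (DataPipelineWorkflow) and uses a hypothetical application name:

    Code Block
    POST /v3/namespaces/default/apps/MLModelPipeline/workflows/DataPipelineWorkflow/start
    {
       "input.algorithm": "Random Forest"
    }

    With these arguments, Condition1 evaluates to false and Condition2 to true (assuming the Filter stage produced more than 1000 records), so only the Random Forest branch runs.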

     

    Java API for conditions

    The Condition plugin interface will be as follows:

    Code Block
    /**
     * Represents condition to be executed in pipeline.
     */
    public abstract class Condition implements PipelineConfigurable {
      public static final String PLUGIN_TYPE = "condition";
    
      /**
       * Implement this method to execute the code as a part of execution of the condition.
       * If this method returns {@code true}, true branch will get executed, otherwise false
       * branch will get executed.		
       * @param context the condition context, containing information about the pipeline run
       * @throws Exception when there is failure in method execution
       */
      public abstract boolean apply(ConditionContext context) throws Exception;
     
      @Override
      public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
        //no-op
      }
    }
     
    /**
     * Represents the context available to the condition plugin during runtime.
     */
    public interface ConditionContext extends StageContext, Transactional, SecureStore, SecureStoreManager {
    
      /**
       * Return the arguments which can be updated.
       */
      SettableArguments getArguments();
     
      /**
       * Return the statistics (such as number of input records, output records, error records etc.) associated with the stages in the pipeline.
       * Note that these will be predefined statistics.	
       */
      StageStatistics getStageStatistics(String stageName);
    }
     
    public interface StageStatistics {
        long getValue(String property);
    	long getInputRecordsCount();
    	long getOutputRecordsCount();
    	long getErrorRecordsCount();
    }
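
    As an illustration only (not part of the proposed API), a condition plugin written against these interfaces might look like the sketch below. The stage name "Filter", the runtime argument 'input.algorithm', and the assumption that SettableArguments exposes simple get/set accessors are all assumptions made for this example:

    Code Block
    /**
     * Illustrative sketch: run the true branch only if the Filter stage produced
     * enough records and the expected algorithm was requested via runtime arguments.
     */
    public class AlgorithmCondition extends Condition {

      @Override
      public boolean apply(ConditionContext context) throws Exception {
        SettableArguments arguments = context.getArguments();
        StageStatistics filterStats = context.getStageStatistics("Filter");

        // Execute the true branch only when the Filter transform produced more than
        // 1000 output records and the requested algorithm matches this branch.
        boolean enoughRecords = filterStats.getOutputRecordsCount() > 1000;
        boolean algorithmMatches = "Logistic Regression".equals(arguments.get("input.algorithm"));

        // Arguments are settable, so a condition can also pass information to
        // downstream stages, for example which branch was taken.
        arguments.set("condition1.result", String.valueOf(enoughRecords && algorithmMatches));

        return enoughRecords && algorithmMatches;
      }
    }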

     

    Condition evaluation:

    Conditions can be applied to either the runtime arguments or data statistics, such as the number of error records or output records produced by a stage.

    Following are a few approaches to obtaining the data statistics:

    1. Get the statistics from the metrics system, since these statistics are already emitted as metrics. However, metrics are non-transactional and there might be a delay in updating them, which is why we cannot rely on them.
    2. Emit these statistics as MapReduce counters (when running with MapReduce as the engine) or Spark accumulators (when running with Spark as the engine); a rough sketch of the counter approach is shown below.
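
    To make the second approach concrete, the following is a rough sketch (not finalized runtime code) of how a stage could emit its record counts as Hadoop MapReduce counters, which the condition stage can then read after the phase completes. The counter group and counter names used here are assumptions, not a finalized scheme:

    Code Block
    import org.apache.hadoop.mapreduce.TaskInputOutputContext;

    /**
     * Sketch: emit per-stage record counts as MapReduce counters. Counters are
     * aggregated with the job result by the framework, unlike metrics, which are
     * emitted asynchronously and may lag behind.
     */
    public final class StageStatisticsEmitter {
      // Hypothetical counter group; the real naming scheme is not finalized.
      private static final String GROUP = "cdap.pipeline.stage.statistics";

      private StageStatisticsEmitter() { }

      public static void outputRecord(TaskInputOutputContext<?, ?, ?, ?> context, String stageName) {
        context.getCounter(GROUP, stageName + ".records.out").increment(1L);
      }

      public static void errorRecord(TaskInputOutputContext<?, ?, ?, ?> context, String stageName) {
        context.getCounter(GROUP, stageName + ".records.error").increment(1L);
      }
    }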

    Pipeline planner changes:

    Adding a condition node to the pipeline involves changes to the pipeline planner. However, adding conditions might result in a DAG that cannot be configured as a Workflow.

    For example, consider the following pipeline:

    Code Block
                                         |----------->Test (Sink)
                                        F|
    S1 (Source) ------->Condition1------|
                                        T|
                                         |------------>Rule (Transform)--------->Parser (Transform)---------->Avro (Sink)
                                                                                      ^
                                                                                      |
    S2 (Source) ---------                                                             |
                        |------Joiner---------Aggregator------------------------------|
    S3 (Source) ---------
     
     
    This would result in the following Workflow, which we currently do not allow:
     
    									
                                                  F
    (S1-C1 Connector MapReduce)------->Condition------>(C1 Connector - Test MapReduce)
                                          T|
                                           |
                                           |
    (S2/S3 - C2 Connector MapReduce)------------------>(C2 Connector - Rule-Parser-Avro MapReduce)

     

    Meeting Notes (July 14, 2017)

    Albert, Terence, Andreas, and Sagar had a discussion about conditional execution with respect to the above pipeline.

    There are a few issues in supporting it:

    1. It is not clear from the pipeline whether, when Condition1 evaluates to false, the path (S2 and S3 --> Avro sink) should still be executed. The answer to this type of question will usually depend on the user who develops the pipeline. We considered not allowing any external input on the branches of a condition, so that the pipeline above would be invalid; however, this limits simple use cases, for example when you want to perform a join only if the data in one of the sources contains a certain number of valid records, as shown in the code block below.

       

       

      Code Block
                                                          T
      S1 (Source)--->Filter (Transform)------>Condition---------->Joiner-------->Sink
                                                  |                  ^
                                                F |                  |
                                                  |                  |
                                             (Terminate)             |
                                                                S2 (Source)

      The above use case will not be supported if we do not allow an external incoming edge on a condition branch.

       

       

    2. Generating a plan for a pipeline where control flow (conditions) is added to the data flow is not trivial; based on the current plan generation algorithm, it might produce a Workflow that is not allowed in CDAP, as shown below.

      Code Block
                                                    F
      (S1-C1 Connector MapReduce)------->Condition------>(C1 Connector - Test MapReduce)
                                            T|
                                             |
      (S2/S3 - C2 Connector MapReduce)------------------>(C2 Connector - Rule-Parser-Avro MapReduce)

      In the Workflow API, we do not allow any incoming edge to the branches of a condition.
     



    3. Representing this in the UI so that it is clear which stages will be executed when might require significant work. One approach we thought of: when the pipeline is being developed and a condition node is added, we add two boxes for the sub-pipelines, one that is executed when the condition evaluates to true and the other when it evaluates to false. There will not be any incoming edge to any stage in a sub-pipeline from a stage outside its box.

      [Image: mock-up of a condition node with separate true/false sub-pipeline boxes]

      Making the control flow more explicit makes it clear to the user that when the condition is false, sources S2 and S3 will not be evaluated. The challenge is that once a pipeline is developed and the user wants to add conditions to it, the stages already in the pipeline will need to be re-adjusted into the correct box, which may be tedious. There can also be further nested conditions inside each of the boxes, which would be hard to represent in the UI.


    Conclusion:
    We propose that in 4.3 we support the use case of terminating the pipeline, provided that there is no other incoming edge on the condition branches. The other use case, conditionally executing sub-pipelines, can be satisfied by the trigger functionality.

    Approach

    Approach #1

    Approach #2

    API changes

    New Programmatic APIs

    New Java APIs introduced (both user facing and internal): the Condition plugin class, ConditionContext, and StageStatistics interfaces described in the "Java API for conditions" section above.

    Deprecated Programmatic APIs

    New REST APIs

    Path              | Method | Description                                          | Response Code
    ------------------|--------|------------------------------------------------------|---------------------------------------------
    /v3/apps/<app-id> | GET    | Returns the application spec for a given application | 200 - On success
                      |        |                                                      | 404 - When the application is not available
                      |        |                                                      | 500 - Any internal errors

     

         

    Deprecated REST API

    Path              | Method | Description
    ------------------|--------|------------------------------------------------------
    /v3/apps/<app-id> | GET    | Returns the application spec for a given application

    CLI Impact or Changes

    • Impact #1
    • Impact #2
    • Impact #3

    UI Impact or Changes

    • Impact #1
    • Impact #2
    • Impact #3

    Security Impact 

    What's the impact on Authorization, and how does the design take care of this aspect?

    Impact on Infrastructure Outages 

    System behavior (if applicable, document the impact of downstream component failures, e.g. YARN, HBase) and how the design takes care of these aspects.

    Test Scenarios

    Test ID | Test Description | Expected Results
    --------|------------------|------------------

    Releases

    Release X.Y.Z

    Release X.Y.Z

    Related Work

    • Work #1
    • Work #2
    • Work #3

     

    Future work