...

  1. copy the data from one of the machines in the cluster to HDFS for processing.
  2. delete the data from the machine once it is copied to HDFS.
  3. execute a (Shell/Python/Perl/R) script located on a remote machine during the execution of the pipeline.
  4. archive the data in HDFS generated by older pipeline runs.
  5. execute a SQL Server stored procedure on the remote SQL server during the execution of the pipeline.
  6. wait during the pipeline execution until the required data is generated.
  7. extend the action framework so that custom action plugins can be added.
     

 

Scope for 3.5:

  1. Support a new type of data pipeline, the Action Pipeline, which will only include a DAG of actions to be executed.
  2. Support running actions before the Source or after the Sink stage of the DataPipelineApp.
  3. Following are a few actions we might want to support:
    1. SSH Action: To be able to execute Perl/Python/R/Shell/Revo R scripts located on a remote machine
    2. FileCopy Action: To be able to copy files from FTP/SFTP to a unix machine, from a unix machine to the HDFS cluster, or from FTP to HDFS
    3. FileMove Action: To be able to move files from a source location (HDFS/unix machine/SFTP) to a destination location (HDFS/unix machine/SFTP)
    4. Impala/Hive Action: To be able to run Hive scripts on a CDH or HDP cluster
    5. SQL Action: To be able to run SQL stored procedures located on a remote SQL server, and to copy Hive data to a SQL Server table
    6. Email Action: To be able to send emails

FileCopy/FileMove Action

Description: To be able to copy files from FTP/SFTP to a unix machine, from a unix machine to the HDFS cluster, or from FTP to HDFS. To be able to move files from a source location A (HDFS/unix machine/SFTP) to a destination location B (HDFS/unix machine/SFTP).

Properties required:
  • Source A (SSH into, see properties)
  • Destination B
    • hostname
    • login credentials
    • /PATH to file

JSON Example:
{
    ...
    "config":{
        "connections":[
            {
                "from": "Copy-File",
                "to": "*other end*"
            },
            ...
        ],
        "stages":[
            {
                "name":"Copy-File",
                "plugin":{
                    "name":"SCP",
                    "type":"action",
                    "artifact":{
                        "name":"core-action-plugins",
                        "version":"1.4.0-SNAPSHOT",
                        "scope":"SYSTEM"
                    },
                    "properties":{
                        "source": {
                            "host": "hostA.com",
                            "login": "username"
                        },
                        "destination": {
                            "host": "hostB.com",
                            "login": "username",
                            "file-path": "/filepath"
                        }
                    }
                }
            },
            ...
        ]
    }
}

SSH Script Action

Description: To be able to execute Perl/Python/R/Shell/Revo R/Hive Query scripts located on a remote machine.

Properties required:
  • hostname
  • login credentials (keyfiles?)
  • /PATH to script
  • arguments for script

JSON Example:
{
    ...
    "config":{
        "connections":[
            {
                "from": "Copy-File",
                "to": "*other end*"
            },
            ...
        ],
        "stages":[
            {
                "name":"Copy-File",
                "plugin":{
                    "name":"SSH",
                    "type":"action",
                    "artifact":{
                        "name":"core-action-plugins",
                        "version":"1.4.0-SNAPSHOT",
                        "scope":"SYSTEM"
                    },
                    "properties":{
                        "host": "scripthost.com",
                        "script-path": "/tmp/myscript",
                        "login": "username",
                        "arguments": [
                            { "name": "timeout", "value": 10 },
                            { "name": "user", "value": "some_user" }
                        ]
                    }
                }
            },
            ...
        ]
    }
}

SQL Action

Description: To be able to run SQL stored procedures located on a remote SQL server, and to copy Hive data to a SQL Server table.
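
No properties or JSON example have been specified for the SQL action yet. Purely as an illustration of how it could follow the same plugin shape as the other actions above, a configuration might look roughly like the sketch below; every property name here (connectionString, username, password, procedure) is an assumption, not a decided interface.

{
    ...
    "stages":[
        {
            "name":"Run-Stored-Procedure",
            "plugin":{
                "name":"SQL",
                "type":"action",
                "artifact":{
                    "name":"core-action-plugins",
                    "version":"1.4.0-SNAPSHOT",
                    "scope":"SYSTEM"
                },
                "properties":{
                    "connectionString": "jdbc:sqlserver://sqlhost.com:1433;databaseName=mydb",
                    "username": "username",
                    "password": "password",
                    "procedure": "dbo.nightly_rollup"
                }
            }
        },
        ...
    ]
}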

Email Action

Description: To be able to send emails.

Properties required:
  • recipient/sender email
  • message
  • subject
  • username/password auth (if needed)
  • protocol (smtp, smtps, tls)
  • smtp host
  • smtp port

JSON Example:
{
    ...
    "config":{
        "connections":[
            {
                "from": "Copy-File",
                "to": "*other end*"
            },
            ...
        ],
        "stages":[
            {
                "name":"Email-Bob",
                "plugin":{
                    "name":"Email",
                    "type":"action",
                    "artifact":{
                        "name":"core-action-plugins",
                        "version":"1.4.0-SNAPSHOT",
                        "scope":"SYSTEM"
                    },
                    "properties":{
                        "protocol": "smtp",
                        "recipient": "bob@cask.co",
                        "subject": "PR Review",
                        "message": "Hey Bob, could you take a look at this PR?",
                        "auth": {
                            "username": "username",
                            "password": "emailpass"
                        }
                    }
                }
            },
            ...
        ]
    }
}


 

Action Java API:

  1. Action interface to be implemented by action plugins: 
    Code Block
    languagejava
    /**
     * Represents a custom action to be executed in the pipeline.
     */
    public interface Action extends PipelineConfigurable, StageLifecycle<ActionContext> {
     
    	/**
     	 * Implement this method to execute the code as part of the action run.
     	 * @throws Exception when there is a failure during method execution
     	 */
    	void run() throws Exception;
    }
     
  2. ActionContext interface will be available to the action plugins. 
    Code Block
    languagejava
    /**
     * Represents context available to the action plugin during runtime.
     */
    public interface ActionContext extends DatasetContext, ServiceDiscoverer, PluginContext {
    	/**
     	 * Returns the logical start time of the batch job which triggers this instance of the action.
         * The logical start time is the time when the triggering batch job is supposed to start if it is
     	 * started by the scheduler. Otherwise it would be the current time when the action runs.
     	 *
     	 * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
     	 */
    	 long getLogicalStartTime();
     
    	/**
    	 * @return runtime arguments of the Batch Job.
     	 */
    	 Map<String, String> getRuntimeArguments();
     
    	/**
     	 * @return the state of the Action. Useful in the {@link Action#destroy()} method,
         * to perform any activity based on the action state.
         * This method can be moved to PluginContext so that
         * it is available to all plugins?
     	 */
    	 ActionState getState();
    }
     
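
To make the two interfaces above concrete, here is a minimal sketch of what an action plugin could look like. It is illustrative only: the FileDeleteAction name and its properties are made up, the annotations follow the pattern used by existing CDAP plugin types, and it assumes StageLifecycle supplies initialize(ActionContext) plus the destroy() hook referenced in the ActionContext javadoc.

    Code Block
    languagejava
    import java.util.Map;
     
    /**
     * Hypothetical action plugin (illustration only): deletes files on a
     * remote machine once the pipeline has finished processing them.
     */
    @Plugin(type = "action")
    @Name("FileDelete")
    @Description("Deletes already-processed files from a remote host.")
    public class FileDeleteAction implements Action {
     
      private ActionContext context;
     
      @Override
      public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Deploy-time validation of the plugin properties would go here.
      }
     
      @Override
      public void initialize(ActionContext context) throws Exception {
        this.context = context;
      }
     
      @Override
      public void run() throws Exception {
        // Runtime arguments of the batch job can override the configured path.
        Map<String, String> args = context.getRuntimeArguments();
        String path = args.getOrDefault("delete.path", "/tmp/processed");
     
        // The logical start time could be used to scope the delete to this run.
        long logicalStartTime = context.getLogicalStartTime();
     
        // ... SSH to the remote host and delete the files under 'path' (omitted) ...
      }
     
      @Override
      public void destroy() {
        // context.getState() can be inspected here to react to success or failure.
      }
    }
     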
Approach:

  1. A DAG consisting only of actions will result in the new ActionPipelineApp.
  2. Consider the sample action pipeline in the "Configurations for the action plugins" section below.
  3. How will the ActionConfig look? (One possible shape is sketched below.)
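
One possible answer to item 3, sketched here purely for discussion: the ActionConfig could follow the existing PluginConfig pattern, with one field per key under "properties" in the JSON examples above. The class name, field names, and use of PluginConfig/@Nullable here are assumptions, not a decided design.

    Code Block
    languagejava
    import javax.annotation.Nullable;
     
    /**
     * Sketch only: a config for the SSH action modeled on the existing
     * PluginConfig pattern. Each field maps to a key under "properties".
     */
    public class SSHActionConfig extends PluginConfig {
     
      @Description("Host on which the script is located.")
      private String host;
     
      @Description("User to log in as on the remote host.")
      private String login;
     
      @Description("Absolute path of the script on the remote host.")
      private String scriptPath;
     
      @Nullable
      @Description("Optional arguments to pass to the script.")
      private String arguments;
    }
     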

Configurations for the action plugins:

  1. Consider the following sample pipeline, composed entirely of action nodes.

    Code Block
    languagejava
    (Script)----->(CopyToHDFS)------->(Hive)------->(SQL Exporter)
                                         |
                                         |
                                          ------>(HDFSArchive)
     
    Where:
    1. The Script action is responsible for executing a (Shell, Perl, Python, etc.) script located on the specified machine. This action flattens the input data, which arrives in multiple formats (JSON, binary, CSV, etc.), into the single format (flattened records) expected by the next stages.
    2. The CopyToHDFS action is responsible for copying the flattened files generated in the previous stage to the specified HDFS directory.
    3. The Hive action is responsible for executing the Hive script, which populates the Hive tables based on the business logic contained in the script.
    4. The SQLExporter action exports the Hive table to a relational database.
    5. In parallel, the HDFS files generated during step 2 are archived by the HDFSArchive action.
  2. Possible configurations for the pipeline:

    Code Block
    languagejava
    {
        "artifact":{
          "name":"cdap-action-pipeline",
          "version":"3.5.0-SNAPSHOT",
          "scope":"SYSTEM"
        },
        "name":"MyActionPipeline",  
        "config":{
            "connections":[
             {
                "from":"Script",
                "to":"CopyToHDFS"
             },
             {
                "from":"CopyToHDFS",
                "to":"Hive"
             },
             {
                "from":"Hive",
                "to":"SQLExporter"
             },
             {
                "from":"Hive",
                "to":"HDFSArchive"
             }
            ],
            "stages":[
             {
                "name":"Script",
                "plugin":{
                   "name":"SSH",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
    				  "host": "scripthost.com",
                      "scriptFileName":"/tmp/myfile",
    				  "command": "/bin/bash",
    				  "arguments": [
    								 { "name": "timeout", "value": 10 },
    								 { "name": "user", "value": "some_user" }
    				  			   ]			
                   }
                }
             },
             {
                "name":"CopyToHDFS",
                "plugin":{
                   "name":"FileCopy",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
    				  "sourceHost": "source.host.com",
                      "sourceLocation":"/tmp/inputDir",
    				  "wildcard": "*.txt",
    				  "targetLocation": "hdfs://hdfs.cluster.com/tmp/output"	
                   }
                }
             },
             {
                "name":"Hive",
                "plugin":{
                   "name":"HIVE",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
    				 "hiveScriptURL": "URL of the hive script",
    				 "blocking": "true",
    				 "arguments": [
    								 { "name": "timeout", "value": 10 },
    								 { "name": "user", "value": "some_user" }
    				  			   ]				 
                   }
                }
    		},
    		{
                "name":"SQLExporter",
                "plugin":{
                   "name":"SQL",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
    				 "connection": "Connection configurations",
    				 "blocking": "true",
    				 "sqlFileName": "/home/user/sql_exporter.sql",
    				 "arguments": [
    								 { "name": "timeout", "value": 10 },
    								 { "name": "user", "value": "some_user" }
    				  			   ]				 
                   }
                }
    		},
            {
                "name":"HDFSArchive",
                "plugin":{
                   "name":"FileMove",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
    				  "sourceLocation": "hdfs://hdfs.cluster.com/data/output",
    				  "targetLocation": "hdfs://hdfs.cluster.com/data/archive/output",
    				  "wildcard": "*.txt"	
                   }
                }
             }
    	   ]	  
        }
    }
    
    
     

     

     

  3. Based on the connection information specified in the above application configuration, the ActionPipelineApp (and likewise the DataPipelineApp, when actions run before the Source or after the Sink) will configure the Workflow, which will have one custom action corresponding to each stage of plugin type "action" in the above config, as illustrated by the sketch below.
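
    As an illustration of this mapping (not the actual application code), the sketch below shows how the "connections" array could be turned into an execution order for the generated custom actions using a topological sort. The class and method names are invented for this example.

    Code Block
    languagejava
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
     
    /**
     * Illustrative only: derives an execution order for the action stages from
     * the "connections" section of the pipeline config (Kahn's algorithm).
     */
    public class ActionDagPlanner {
     
      /** A connection as it appears in the config: {"from": "...", "to": "..."}. */
      public static final class Connection {
        final String from;
        final String to;
     
        public Connection(String from, String to) {
          this.from = from;
          this.to = to;
        }
      }
     
      /**
       * Returns the stage names ordered so that every stage appears after all of its
       * predecessors, e.g. [Script, CopyToHDFS, Hive, SQLExporter, HDFSArchive].
       */
      public static List<String> executionOrder(Set<String> stages, List<Connection> connections) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> successors = new HashMap<>();
        for (String stage : stages) {
          inDegree.put(stage, 0);
          successors.put(stage, new ArrayList<>());
        }
        for (Connection c : connections) {
          successors.get(c.from).add(c.to);
          inDegree.put(c.to, inDegree.get(c.to) + 1);
        }
     
        // Stages with no incoming connections can be scheduled immediately.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
          if (entry.getValue() == 0) {
            ready.add(entry.getKey());
          }
        }
     
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
          String stage = ready.poll();
          order.add(stage); // here the app would add the custom action wrapping this stage
          for (String next : successors.get(stage)) {
            if (inDegree.merge(next, -1, Integer::sum) == 0) {
              ready.add(next); // all predecessors done; may run in parallel with other ready stages
            }
          }
        }
     
        if (order.size() != stages.size()) {
          throw new IllegalArgumentException("Connections contain a cycle; the pipeline must be a DAG");
        }
        return order;
      }
    }
     
    For the sample pipeline above, Script, CopyToHDFS, and Hive are strictly ordered, while SQLExporter and HDFSArchive both become ready once Hive completes and can therefore run in parallel branches of the Workflow.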


Open Questions:

...