Custom Actions

Goals

Ability to include custom actions in the Hydrator pipelines.

Checklist

  • User stories documented (Shankar/Sagar)
  • User stories reviewed (Nitin)
  • Design documented (Shankar/Sagar)
  • Design reviewed (Terence/Andreas)
  • Feature merged ()
  • Examples and guides ()
  • Integration tests () 
  • Documentation for feature ()
  • Blog post

Use Cases

  1. The source data may be present locally on one of the machines in the cluster, say 'M'. Before the Hydrator pipeline starts, that data needs to be moved to HDFS so that the pipeline can process it. Once the pipeline execution is complete, the data on 'M' is no longer required and can be deleted.
  2. Consider a Hydrator pipeline which performs some transformations on web access data that might contain some malformed records. Only if the malformed records are below a certain threshold should we copy the products and users datasets to HDFS for further processing by the pipeline. Once the data is in HDFS, it can be deleted from the source from which it was copied.
  3. Consider a scenario where a data scientist wants to perform a join between customer profiles stored in SQL Server and customer browsing history stored on HDFS. Customer profiles are generated on the fly using a SQL stored procedure. The data scientist wants to delay the generation of the customer profiles as much as possible to get a more accurate view during analysis, and also wants updated customer profiles each time the join is performed. A data pipeline can trigger the generation of the customer profiles asynchronously and then continue to process the customer browsing history data. Once the browsing data is processed, the pipeline should check whether the customer profiles have been generated; if not, it should wait until they are, at which point the pipeline continues to execute the join on the browsing history and customer profiles.
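The wait-and-check step in use case 3 is essentially a polling loop. A minimal, self-contained sketch is below; WaitSketch and waitUntil are illustrative names, not part of any Hydrator API:

```java
import java.util.function.BooleanSupplier;

class WaitSketch {
  /**
   * Polls until the condition reports true or the timeout elapses,
   * sleeping between checks; returns whether the condition was met.
   */
  static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(pollMillis);
    }
    return condition.getAsBoolean();
  }
}
```

In the pipeline, the condition would check whether the customer profiles dataset has been generated before proceeding to the join.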

User stories

As a creator of the data pipeline, I want an ability to:

  1. copy data from one of the machines in the cluster to HDFS for processing.
  2. delete the data from the machine once it has been copied to HDFS.
  3. execute a (Shell/Python/Perl/R) script located on a remote machine during the execution of the pipeline.
  4. archive data in HDFS generated by older pipeline runs.
  5. execute a SQL Server stored procedure on a remote SQL Server during the execution of the pipeline.
  6. wait during the pipeline execution until the required data is generated.
  7. extend actions, so that custom action plugins can be added.

Scope for 3.5:

  1. Support a data pipeline which includes only a DAG of actions to be executed.
  2. Support running the actions before the Source or after the Sink stage of the DataPipelineApp.
  3. The following are a few actions we might want to support:
    1. SSH Action: to be able to execute Perl/Python/R/Shell/Revo R scripts located on a remote machine
    2. FileCopy action: to be able to copy files from FTP/SFTP to a Unix machine, from a Unix machine to the HDFS cluster, or from FTP to HDFS
    3. FileMove action: to be able to move files from a source location (HDFS/Unix machine/SFTP) to a destination location (HDFS/Unix machine/SFTP)
    4. Impala/Hive action: to be able to run Hive scripts on a CDH or HDP cluster
    5. SQL action: to be able to run SQL stored procedures located on a remote SQL Server, and copy Hive data to a SQL Server table
    6. Email action: to be able to send emails
FileCopy/FileMove Action

Description: To be able to copy files from FTP/SFTP to a Unix machine, from a Unix machine to the HDFS cluster, or from FTP to HDFS; and to move files from a source location (HDFS/Unix machine/SFTP) to a destination location (HDFS/Unix machine/SFTP).

Properties required:

  • Source A (SSH into, see properties)
  • Source B
    • hostname
    • login credentials
    • /PATH to file

JSON Example:

    {
        ...
        "config":{
            "connections":[
             {
                "from": "Copy-File",
                "to": *other end*
             },
             ...
            ],
            "stages":[
             {
                "name":"SCP-Copy-File",
                "plugin":{
                   "name":"File-Manipulation",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                      "source-host": "hostA.com",
                      "source-login": "username",
                      "destination-host": "hostB.com",
                      "destination-login": "username",
                      "destination-file-path": "/filepath"
                   }
                }
             },
             ...
            ]
        }
    }
SSH Script Action

Description: To be able to execute Perl/Python/R/Shell/Revo R/Hive Query scripts located on a remote machine.

Properties required:

  • hostname
  • login credentials (keyfiles?)
  • /PATH to script
  • arguments for script

JSON Example:

    {
        ...
        "config":{
            "connections":[
             {
                "from": "Copy-File",
                "to": *other end*
             },
             ...
            ],
            "stages":[
             {
                "name":"Run-Remote-Script",
                "plugin":{
                   "name":"SSHShell",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                      "host": "example.com",
                      "script-path":"/tmp/myscript",
                      "login": "username",
                      "arguments": "{\"name\":\"timeout\",\"value\":\"10\"},{\"name\":\"user\",\"value\":\"some_user\"}"
                   }
                }
             },
             ...
            ]
        }
    }
SQL Action

Description: To be able to run SQL stored procedures located on a remote SQL Server, and copy Hive data to a SQL Server table.

Properties required:

  • username
  • database name
  • file to push

JSON Example: similar to the SSH properties.
Email Action

Description: To be able to send emails.

Properties required:

  • recipient/sender email
  • message
  • subject
  • username/password auth (if needed)
  • protocol (smtp, smtps, tls)
  • smtp host
  • smtp port

JSON Example:

    {
        ...
        "config":{
            "connections":[
             {
                "from": "Copy-File",
                "to": *other end*
             },
             ...
            ],
            "stages":[
             {
                "name":"Email-Bob",
                "plugin":{
                   "name":"Email",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                      "protocol": "smtp",
                      "recipient": "bob@example.com",
                      "subject": "PR Review",
                      "username": "username",
                      "password": "emailpass",
                      "message": "Hey Bob, could you take a look at this PR?"
                   }
                }
             },
             ...
            ]
        }
    }

Action Java API:

  1. Action interface to be implemented by action plugins: 

    /**
     * Represents a custom action to be executed in the pipeline.
     */
    public abstract class Action extends PipelineConfigurable {
      public static final String PLUGIN_TYPE = "action";

      /**
       * Implement this method to execute the code as part of an action run.
       *
       * @param context the action context, containing information about the pipeline run
       * @throws Exception when there is a failure in method execution
       */
      public abstract void run(ActionContext context) throws Exception;
    }

     

  2. ActionContext interface will be available to the action plugins. 

    /**
     * Represents the context available to the Action plugin during runtime.
     */
    public interface ActionContext extends Transactional, PluginContext {
    
      /**
       * Returns the logical start time of the batch job which triggers this instance of an action.
       * Logical start time is the time when the triggering Batch job is supposed to start if it is
       * started by the scheduler. Otherwise it would be the current time when the action runs.
       *
       * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
       */
      long getLogicalStartTime();
    
      /**
       * Return the arguments which can be updated.
       */
      SettableArguments getArguments();
    }
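To make the contract concrete, here is a minimal sketch of an action plugin written against simplified stand-ins for the interfaces above. The stub types and MarkerAction are illustrative only; the real types live in the CDAP/Hydrator APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the interfaces above, so this sketch is
// self-contained; they mirror but do not replace the CDAP types.
interface SettableArguments {
  String get(String name);
  void set(String name, String value);
}

interface ActionContext {
  long getLogicalStartTime();
  SettableArguments getArguments();
}

abstract class Action {
  public static final String PLUGIN_TYPE = "action";
  public abstract void run(ActionContext context) throws Exception;
}

/** A simple in-memory SettableArguments, for local experimentation. */
class MapArguments implements SettableArguments {
  private final Map<String, String> values = new HashMap<>();
  public String get(String name) { return values.get(name); }
  public void set(String name, String value) { values.put(name, value); }
}

/**
 * MarkerAction is a hypothetical plugin: it reads an input argument and
 * records a marker argument that downstream stages can consume, which is
 * the typical use of ActionContext.getArguments().
 */
class MarkerAction extends Action {
  @Override
  public void run(ActionContext context) throws Exception {
    String input = context.getArguments().get("input.path");
    context.getArguments().set("marker", "processed:" + input);
  }
}
```

Downstream stages would then read the "marker" argument through the same SettableArguments interface.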

Example SSH Plugin Class:
 

package co.cask.hydrator.plugin.action;

// This example uses the Ganymed SSH2 library for the SSH calls.
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.plugin.PluginConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Set;
import javax.annotation.Nullable;

/**
 * SSH into a remote machine and execute a script on that machine.
 */
@Plugin(type = "action")
@Name("SSHAction")
@Description("Action to run a script on remote machine.")
public class SSHAction extends Action {
  private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class);

  private final SSHActionConfig config;

  public SSHAction(SSHActionConfig config) {
    super(config);
    this.config = config;
  }

  @Override
  public void run(ActionContext context) throws Exception {
    try {
      Connection connection = new Connection(config.host);
      connection.connect();

      if (config.usingPassAuth) {
        if (!connection.authenticateWithPassword(config.user, config.password)) {
          throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d",
                                              config.user, config.host, config.port));
        }
      } else {
        connection.authenticateWithPublicKey(config.user, new java.io.File(config.privateKeyFile),
                                             config.privateKeyPassphrase);
      }

      Session session = connection.openSession();
      session.execCommand(config.scriptCMD);

      // Read stdout and stderr
      InputStream stdout = new StreamGobbler(session.getStdout());
      BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout));
      InputStream stderr = new StreamGobbler(session.getStderr());
      BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr));

      StringBuilder outBuilder = new StringBuilder();
      String line = outBuffer.readLine();
      while (line != null) {
        outBuilder.append(line + "\n");
        line = outBuffer.readLine();
      }

      StringBuilder errBuilder = new StringBuilder();
      line = errBuffer.readLine();
      while (line != null) {
        errBuilder.append(line + "\n");
        line = errBuffer.readLine();
      }

      LOG.info("Output:");
      LOG.info(outBuilder.toString());
      LOG.info("Errors:");
      LOG.info(errBuilder.toString());

      session.close();
    } catch (IOException e) {
      LOG.error("Unable to establish connection.", e);
    }
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
  }

  /**
   * Config class that contains all the properties needed to SSH into the remote machine and run the script.
   */
  public static class SSHActionConfig extends PluginConfig {
    @Nullable
    @Description("Host machine")
    public String host;

    @Nullable
    @Description("Port to connect to")
    public int port;

    @Description("User name used to log in to the host")
    public String user;

    @Description("Set to true to authenticate with a password instead of a private key")
    public boolean usingPassAuth;

    @Nullable
    @Description("Password, required when password authentication is used")
    public String password;

    @Nullable
    @Description("Path to the private key file, required when key-based authentication is used")
    public String privateKeyFile;

    @Nullable
    @Description("Passphrase protecting the private key file")
    public String privateKeyPassphrase;

    @Nullable
    @Description("Path to script file")
    public String scriptPath;

    @Nullable
    @Description("Script command")
    public String scriptCMD;

    @Nullable
    @Description("Arguments to pass into the script")
    public Set<String> arguments; // Need to dig more into how to access the arguments.
  }
}

Additional Plugins:


SQL Action:

  1. Extends Action Class
  2. Similar logic to QueryAction Plugin
  3. Config Properties:
    1. use QueryConfig

 

HDFS Action:

  1. Extends Action Class
  2. Config Properties:
    1. sourcePath: Path of file/directory
    2. destPath: Path of desired destination
    3. fileRegex: wildcard used to identify which files to move
  3. Run Method:
    1. List all files in the given path and, if `fileRegex` is set, filter them using a `WildCardFileFilter`.
    2. For each file, execute a `FileSystem.rename(src, dest)` call to move it to the desired location. This requires getting the HDFS `FileSystem` first.

     To-do:

  • Allow moving of files from FTP to HDFS.
    • Proposal: parse source path config to see if it is in FTP or HDFS
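The run method described above can be sketched with java.nio.file standing in for the Hadoop FileSystem API; Files.move plays the role of FileSystem.rename, and the names wildcardToRegex/moveMatching are illustrative:

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class FileMoveSketch {

  /** Convert a glob-style wildcard (e.g. "*.txt") to a regex, as WildCardFileFilter does. */
  static Pattern wildcardToRegex(String wildcard) {
    StringBuilder sb = new StringBuilder();
    for (char c : wildcard.toCharArray()) {
      switch (c) {
        case '*': sb.append(".*"); break;
        case '?': sb.append('.'); break;
        default:  sb.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.compile(sb.toString());
  }

  /** Move every file under sourceDir whose name matches the wildcard into destDir. */
  static List<Path> moveMatching(Path sourceDir, Path destDir, String wildcard) throws IOException {
    Pattern pattern = wildcardToRegex(wildcard);
    List<Path> moved = new ArrayList<>();
    Files.createDirectories(destDir);
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(sourceDir)) {
      for (Path file : stream) {
        if (pattern.matcher(file.getFileName().toString()).matches()) {
          // FileSystem.rename(src, dest) is the HDFS equivalent of this move.
          moved.add(Files.move(file, destDir.resolve(file.getFileName())));
        }
      }
    }
    return moved;
  }
}
```

The HDFS version follows the same shape, substituting `FileSystem.listStatus` for the directory stream and `FileSystem.rename` for the move.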

 

File Action:

  • Moves files between remote machines
  1. Extends the SSHAction class
  2. Config Properties:
    1. Authentication to SSH into the destination machine
    2. Authentication to get file(s) from the source machine
  3. Run Method:
    1. Build the scp or ftp command and pass it to the superclass's constructor.
    2. Call the superclass's run method.
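The command-building step might look like the following sketch; buildScpCommand and its parameters are illustrative, not the actual plugin API:

```java
/**
 * Sketch of how a File action might build the scp command it hands to
 * the SSHAction superclass. The command runs on the destination machine
 * after SSHing in, pulling the file from the source machine.
 */
class ScpCommandSketch {
  static String buildScpCommand(String srcUser, String srcHost, String srcPath, String dstPath) {
    return String.format("scp %s@%s:%s %s", srcUser, srcHost, srcPath, dstPath);
  }
}
```

A real implementation would also need to handle key/password options and quoting of paths containing spaces.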


Storing SSH keys:

  1. SSH private and public keys will be generated for the specific user. These keys will be used to SSH to the external machine.
  2. The user's public key can be hosted on any machine to which we want to SSH from the YARN container running the action.
  3. The user's private key will need to be stored on the YARN cluster so that it can be accessed by any container. The following are a few options:
    1. Store the private key as a dataset in HDFS.
    2. Store the private key in the SecureStore. If the program has access to the SecureStore, then the ActionContext will need to expose it.

Transaction capabilities for custom actions:

 Currently a custom action runs in a short transaction. An action in the pipeline may be executing a Hive script, which might take longer than the configured timeout for a short transaction. The following are a few possible approaches:

  1. Have the run method of the custom action execute in a long transaction.
  2. Instead of executing the run() method in a transaction, CustomActionContext can expose a method which gives the user the ability to control the transaction. This is similar to the WorkerContext.

    public interface CustomActionContext extends RuntimeContext, Transactional, WorkflowInfoProvider,
      PluginContext, SecureStore {

      /**
       * Return the specification of the custom action.
       */
      CustomActionSpecification getSpecification();

      /**
       * Returns the logical start time of the batch job which triggers this instance of an action.
       * Logical start time is the time when the triggering batch job is supposed to start if it is
       * started by the scheduler. Otherwise it would be the current time when the action runs.
       */
      long getLogicalStartTime();

      /**
       * Return the state of the custom action.
       */
      ProgramState getState();
    }
  3. We can add support for executing Workers in the Workflow. However, this would cause two additional containers to be launched: one for the application master and another for the worker itself. Workers also support running multiple instances, which is not required in this case. Generally, actions in the pipeline are not heavy duty, and it would be ideal to execute them in the Workflow driver itself.
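Option 2 can be sketched as follows, with TxRunnable, DatasetContext, and Transactional written here as simplified stand-ins for the corresponding CDAP interfaces:

```java
// Simplified stand-ins for the CDAP interfaces, for illustration only.
interface DatasetContext { }

@FunctionalInterface
interface TxRunnable {
  void run(DatasetContext context) throws Exception;
}

@FunctionalInterface
interface Transactional {
  void execute(TxRunnable runnable) throws Exception;
}

class TxControlSketch {
  /**
   * Keeps the long-running work (e.g. a Hive script) outside any
   * transaction and opens a short transaction only for the final
   * bookkeeping, which is the pattern option 2 enables.
   */
  static void runAction(Transactional tx, Runnable longRunningWork) throws Exception {
    longRunningWork.run();      // no transaction timeout applies here
    tx.execute(context -> {
      // update datasets / completion markers inside a short transaction
    });
  }
}
```

This mirrors how WorkerContext lets a worker decide when to open a transaction instead of wrapping its whole run() in one.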

Configurations for the action plugins:


  1. Consider the sample pipeline containing all the action nodes. 

    (Script)----->(CopyToHDFS)------->(Hive)------->(SQL Exporter)
                                        |
                                        |
                                         ------>(HDFSArchive)
     
    Where:
    1. The Script action is responsible for executing a (Shell, Perl, Python, etc.) script located on the specified machine. This action prepares the input data, which arrives in multiple formats (JSON, binary, CSV, etc.), into the single format (flattened records) expected by the next stages.
    2. The CopyToHDFS action is responsible for copying the flattened files generated in the previous stage to the specified HDFS directory.
    3. The Hive action is responsible for executing a Hive script which populates the Hive tables based on the business logic contained in the script.
    4. The SQLExporter action exports the Hive table to a relational database.
    5. In parallel, the HDFS files generated during step 2 are archived by the HDFSArchive action.
  2. Possible configurations for the pipeline:

    {		
        "artifact":{
          "name":"cdap-data-pipeline",
          "version":"3.5.0-SNAPSHOT",
          "scope":"SYSTEM"
        },
        "name":"MyActionPipeline",  
        "config":{
            "connections":[
             {
                "from":"Script",
                "to":"CopyToHDFS"
             },
             {
                "from":"CopyToHDFS",
                "to":"Hive"
             },
             {
                "from":"Hive",
                "to":"SQLExporter"
             },
             {
                "from":"Hive",
                "to":"HDFSArchive"
             }
            ],
            "stages":[
             {
                "name":"Script",
                "plugin":{
                   "name":"SSH",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                      "host": "scripthost.com",
                      "scriptFileName":"/tmp/myfile",
                      "command": "/bin/bash",
                      "arguments": [
                         { "name": "timeout", "value": 10 },
                         { "name": "user", "value": "some_user" }
                      ]
                   }
                }
             },
             {
                "name":"CopyToHDFS",
                "plugin":{
                   "name":"FileCopy",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                      "sourceHost": "source.host.com",
                      "sourceLocation":"/tmp/inputDir",
                      "wildcard": "*.txt",
                      "targetLocation": "hdfs://hdfs.cluster.example.com/tmp/output"
                   }
                }
             },
             {
                "name":"Hive",
                "plugin":{
                   "name":"HIVE",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                     "hiveScriptURL": "URL of the hive script",
                     "blocking": "true",
                     "arguments": [
                        { "name": "timeout", "value": 10 },
                        { "name": "user", "value": "some_user" }
                     ]
                   }
                }
             },
             {
                "name":"SQLExporter",
                "plugin":{
                   "name":"SQL",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                     "connection": "Connection configurations",
                     "blocking": "true",
                     "sqlFileName": "/home/user/sql_exporter.sql",
                     "arguments": [
                        { "name": "timeout", "value": 10 },
                        { "name": "user", "value": "some_user" }
                     ]
                   }
                }
             },
            {
                "name":"HDFSArchive",
                "plugin":{
                   "name":"FileMove",
                   "type":"action",
                   "artifact":{
                      "name":"core-action-plugins",
                      "version":"1.4.0-SNAPSHOT",
                      "scope":"SYSTEM"
                   },
                   "properties":{
                      "sourceLocation": "hdfs://hdfs.cluster.example.com/data/output",
                      "targetLocation": "hdfs://hdfs.cluster.example.com/data/archive/output",
                      "wildcard": "*.txt"
                   }
                }
             }
            ]
        }
    }
    
    

     

  3. Based on the connection information specified in the above application configuration, DataPipelineApp will configure the Workflow, which will have one custom action corresponding to each action-type plugin in the above config.


Open Questions:
1. Multiple SSH libraries are available for Java. We need to figure out which one best suits our use cases, for example the ability to collect logs emitted by the script.
2. A custom action executes on the Workflow driver, which runs as a YARN container. To SSH to a remote host that is not in the cluster, the container requires access to the private key file. One option is to store the private key file on HDFS, where all containers running in the cluster can access it.

Potential SSH Java Libraries

SSHJ

  • Advantages: Clear API with plenty of examples in the docs. Fewer lines of code are required to do the same thing compared to JSch.
  • Drawbacks: Same as Ganymed SSH2.
  • Info links:
    • Examples: https://github.com/hierynomus/sshj/tree/master/examples/src/main/java/net/schmizz/sshj/examples
    • Argument against using JSch: https://techtavern.wordpress.com/2008/09/30/about-jsch-open-source-project/
  • Can we pull logs from the remote machines? Yes: https://github.com/hierynomus/sshj/blob/master/examples/src/main/java/net/schmizz/sshj/examples/RudimentaryPTY.java
  • Library code source: https://github.com/hierynomus/sshj
  • License: Apache License

JSch

  • Advantages: Sticks very close to the SSH specs, so it could prove easier to learn for devs who are very familiar with them. It also gives devs more low-level control over the SSH protocol, which sshj and trilead do not as much.
  • Drawbacks: Documentation is nonexistent and the API is not necessarily the most intuitive, most likely leading to a high learning curve. That being said, there are many examples on the web.
  • Info links:
    • Example: http://www.jcraft.com/jsch/examples/
  • Can we pull logs from the remote machines? Yes: http://stackoverflow.com/questions/6902386/how-to-read-jsch-command-output
  • Library code source: http://www.jcraft.com/jsch/
  • License: BSD License

Ganymed SSH2

  • Advantages: Simplicity at its finest. The community claims that getting simple SSH actions working is significantly easier/faster than when attempting to use JSch.
  • Drawbacks: Low-level access to the SSH protocol is not possible, because the library is meant to provide abstraction to the developer.
  • Info links:
    • Examples: http://www.programcreek.com/java-api-examples/index.php?api=com.trilead.ssh2.Session
  • Can we pull logs from the remote machines? Yes, use the StreamGobbler class.
  • Library code source: https://code.google.com/archive/p/ganymed-ssh-2/
  • License: BSD License

 

 Platform support required
