...

Each action below lists its description, the properties it requires, and a JSON example.

FileCopy/FileMove Action
Copies files from FTP/SFTP to a Unix machine, from a Unix machine to the HDFS cluster, or from FTP to HDFS.

Moves files from source location A (HDFS, Unix machine, or SFTP) to destination location B (HDFS, Unix machine, or SFTP).
  • Source A (the machine to SSH into; see properties)
  • Destination B
    • hostname
    • login credentials
    • /PATH to file
{
    ...
    "config": {
        "connections": [
            {
                "from": "SCP-Copy-File",
                "to": "*other end*"
            },
            ...
        ],
        "stages": [
            {
                "name": "SCP-Copy-File",
                "plugin": {
                    "name": "File-Manipulation",
                    "type": "action",
                    "artifact": {
                        "name": "core-action-plugins",
                        "version": "1.4.0-SNAPSHOT",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "source-host": "hostA.com",
                        "source-login": "username",
                        "destination-host": "hostB.com",
                        "destination-login": "username",
                        "destination-file-path": "/filepath"
                    }
                }
            },
            ...
        ]
    }
}
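The required properties above could be checked before the action ever connects to a host. A minimal sketch, assuming the property names from the JSON example (`source-host`, `source-login`, `destination-host`, `destination-login`, `destination-file-path`) arrive as a plain string map; `FileCopyPropertiesCheck` and its methods are hypothetical and not part of any CDAP API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical validator for the FileCopy/FileMove properties shown in the JSON example. */
final class FileCopyPropertiesCheck {
  // Property names taken from the JSON example; the validation rules themselves are an assumption.
  static final List<String> REQUIRED = Arrays.asList(
      "source-host", "source-login", "destination-host", "destination-login", "destination-file-path");

  /** Returns the required properties that are missing or blank, in declaration order. */
  static List<String> missing(Map<String, String> properties) {
    List<String> result = new ArrayList<>();
    for (String name : REQUIRED) {
      String value = properties.get(name);
      if (value == null || value.trim().isEmpty()) {
        result.add(name);
      }
    }
    return result;
  }

  /** Throws if any required property is missing, naming all of them in one message. */
  static void validate(Map<String, String> properties) {
    List<String> absent = missing(properties);
    if (!absent.isEmpty()) {
      throw new IllegalArgumentException("Missing required properties: " + String.join(", ", absent));
    }
  }
}
```

Reporting every missing property at once, rather than stopping at the first, lets the pipeline author fix the configuration in a single pass.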
SSH Script Action

Executes Perl, Python, R, shell, Revo R, or Hive query scripts located on a remote machine.
  • hostname
  • login credentials (keyfiles?)
  • /PATH to script
  • arguments for script
{
    ...
    "config": {
        "connections": [
            {
                "from": "Run-Remote-Script",
                "to": "*other end*"
            },
            ...
        ],
        "stages": [
            {
                "name": "Run-Remote-Script",
                "plugin": {
                    "name": "SSHShell",
                    "type": "action",
                    "artifact": {
                        "name": "core-action-plugins",
                        "version": "1.4.0-SNAPSHOT",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "host": "scripthost.com",
                        "script-path": "/tmp/myscript",
                        "login": "username",
                        "arguments": "[{\"name\":\"timeout\",\"value\":\"10\"},{\"name\":\"user\",\"value\":\"some_user\"}]"
                    }
                }
            },
            ...
        ]
    }
}
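The `script-path` and `arguments` properties eventually have to be combined into one command string for the remote shell. A minimal sketch, assuming each name/value argument is passed as a hypothetical `--name value` flag (the document leaves the argument convention open); every token is wrapped in POSIX single quotes so spaces or shell metacharacters in values are not interpreted by the remote shell. `RemoteCommandBuilder` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that turns script-path plus name/value arguments into one remote command. */
final class RemoteCommandBuilder {
  /** POSIX single-quoting: wrap in '...' and replace each embedded ' with '\''. */
  static String shellQuote(String token) {
    return "'" + token.replace("'", "'\\''") + "'";
  }

  /** Builds the command using the assumed "--name value" convention, preserving argument order. */
  static String build(String scriptPath, Map<String, String> arguments) {
    StringBuilder command = new StringBuilder(shellQuote(scriptPath));
    for (Map.Entry<String, String> arg : arguments.entrySet()) {
      command.append(' ').append(shellQuote("--" + arg.getKey()))
             .append(' ').append(shellQuote(arg.getValue()));
    }
    return command.toString();
  }

  public static void main(String[] args) {
    Map<String, String> arguments = new LinkedHashMap<>(); // keeps the order from the JSON example
    arguments.put("timeout", "10");
    arguments.put("user", "some_user");
    // prints '/tmp/myscript' '--timeout' '10' '--user' 'some_user'
    System.out.println(build("/tmp/myscript", arguments));
  }
}
```

Quoting every token, rather than only the ones that look unsafe, keeps the rule simple and avoids surprises with values such as `it's` or `a;b`.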
SQL Action

Runs SQL stored procedures located on a remote SQL server, or copies Hive data into a SQL Server table.
  • username
  • database name
  • file to push
Other properties are similar to those of the SSH Script Action.
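From Java, a stored procedure is typically invoked through JDBC's `CallableStatement` using the `{call ...}` escape syntax. A minimal sketch, assuming a JDBC URL, username, and password are supplied (the document lists only username and database name, so the URL, procedure name, and string-only parameters here are hypothetical); a SQL Server JDBC driver would also need to be on the classpath at runtime.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;

/** Hypothetical sketch of invoking a remote stored procedure through JDBC. */
final class StoredProcedureRunner {
  /**
   * Builds the JDBC escape syntax {call name(?, ?, ...)} with one placeholder per parameter.
   * The procedure name is assumed to come from trusted pipeline config, not end-user input.
   */
  static String callSql(String procedure, int parameterCount) {
    StringBuilder sql = new StringBuilder("{call ").append(procedure).append('(');
    for (int i = 0; i < parameterCount; i++) {
      sql.append(i == 0 ? "?" : ", ?");
    }
    return sql.append(")}").toString();
  }

  /** Executes the procedure; returns true if its first result is a ResultSet. */
  static boolean run(String jdbcUrl, String user, String password,
                     String procedure, List<String> parameters) throws SQLException {
    try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
         CallableStatement statement = connection.prepareCall(callSql(procedure, parameters.size()))) {
      for (int i = 0; i < parameters.size(); i++) {
        statement.setString(i + 1, parameters.get(i)); // JDBC parameter indexes start at 1
      }
      return statement.execute();
    }
  }
}
```

Binding parameters through `setString` rather than concatenating them into the SQL keeps parameter values from being interpreted as SQL.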
Email Action

Sends emails.
  • recipient/sender email
  • message
  • subject
  • username/password auth (if needed)
  • protocol (smtp, smtps, tls)
  • smtp host
  • smtp port
{
    ...
    "config": {
        "connections": [
            {
                "from": "Email-Bob",
                "to": "*other end*"
            },
            ...
        ],
        "stages": [
            {
                "name": "Email-Bob",
                "plugin": {
                    "name": "Email",
                    "type": "action",
                    "artifact": {
                        "name": "core-action-plugins",
                        "version": "1.4.0-SNAPSHOT",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "protocol": "smtp",
                        "recipient": "bob@cask.co",
                        "subject": "PR Review",
                        "username": "username",
                        "password": "emailpass",
                        "message": "Hey Bob, could you take a look at this PR?"
                    }
                }
            },
            ...
        ]
    }
}
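When the smtp port property is left out, the action could fall back to a conventional port for the chosen protocol. A minimal sketch, assuming the three protocol values listed above map to the customary ports 25 (plain SMTP), 465 (SMTP over implicit TLS) and 587 (message submission, typically upgraded with STARTTLS); `SmtpPortResolver` is a hypothetical name and the mapping is a design choice, not something the document specifies.

```java
import java.util.Locale;

/** Hypothetical resolution of the SMTP port for the Email action's protocol property. */
final class SmtpPortResolver {
  /** Returns the configured port if set, otherwise the conventional default for the protocol. */
  static int resolvePort(String protocol, Integer configuredPort) {
    if (configuredPort != null) {
      if (configuredPort < 1 || configuredPort > 65535) {
        throw new IllegalArgumentException("Invalid smtp port: " + configuredPort);
      }
      return configuredPort; // an explicit port always wins
    }
    switch (protocol.toLowerCase(Locale.ROOT)) {
      case "smtp":
        return 25;
      case "smtps":
        return 465;
      case "tls":
        return 587;
      default:
        throw new IllegalArgumentException(
            "Unsupported protocol '" + protocol + "'; expected smtp, smtps, or tls");
    }
  }
}
```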

Action Java API:

  1. Action interface to be implemented by action plugins: 

    /**
     * Represents a custom action to be executed in the pipeline.
     */
    public abstract class Action implements PipelineConfigurable, StageLifecycle<ActionContext> {
      public static final String PLUGIN_TYPE = "action";

      /**
       * Implement this method to execute the code as part of the action run.
       * @param context the action context, containing information about the pipeline run
       * @throws Exception when there is a failure in method execution
       */
      public abstract void run(ActionContext context) throws Exception;
    }

  2. The ActionContext interface will be available to action plugins.

    /**
     * Represents the context available to the action plugin during runtime.
     */
    public interface ActionContext extends DatasetContext, ServiceDiscoverer, PluginContext {
      /**
       * Returns the logical start time of the batch job that triggers this instance of the action.
       * The logical start time is the time at which the triggering batch job is supposed to start if it is
       * started by the scheduler; otherwise it is the current time when the action runs.
       *
       * @return time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC)
       */
      long getLogicalStartTime();

      /**
       * @return runtime arguments of the batch job
       */
      Map<String, String> getRuntimeArguments();

      /**
       * @return the state of the action. Useful in the {@link Action#destroy()} method
       * to perform any activity based on the action state.
       * Open question: should this method move to PluginContext so that it is available to all plugins?
       */
      ActionState getState();

      /**
       * @return the {@link WorkflowToken}
       */
      WorkflowToken getToken();
    }

  3. How will the ActionConfig look?
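The proposed lifecycle (initialize, run, destroy) and the two `ActionContext` accessors can be exercised without a cluster. A minimal, self-contained sketch: `SimpleContext`, `SimpleAction`, `DatedPathAction`, `runLifecycle`, and the `output.base` runtime argument are hypothetical stand-ins that mirror only `getLogicalStartTime()` and `getRuntimeArguments()`; they are not CDAP types. The example action derives a dated output path from the logical start time, so rerunning the same scheduled run produces the same path.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-ins for the proposed Action API, for illustration only. */
final class ActionLifecycleSketch {
  interface SimpleContext {
    long getLogicalStartTime();
    Map<String, String> getRuntimeArguments();
  }

  abstract static class SimpleAction {
    void initialize(SimpleContext context) { }
    abstract void run(SimpleContext context) throws Exception;
    void destroy() { }
  }

  /** Example action: builds a UTC-dated path such as /data/2016/07/01 from the logical start time. */
  static final class DatedPathAction extends SimpleAction {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);
    String outputPath;

    @Override
    void run(SimpleContext context) {
      String base = context.getRuntimeArguments().getOrDefault("output.base", "/data");
      outputPath = base + "/" + DAY.format(Instant.ofEpochMilli(context.getLogicalStartTime()));
    }
  }

  /** Calls the lifecycle methods in order; destroy() runs even if run() fails. */
  static List<String> runLifecycle(SimpleAction action, SimpleContext context) {
    List<String> calls = new ArrayList<>();
    action.initialize(context);
    calls.add("initialize");
    try {
      action.run(context);
      calls.add("run");
    } catch (Exception e) {
      throw new IllegalStateException("Action failed", e);
    } finally {
      action.destroy();
      calls.add("destroy");
    }
    return calls;
  }

  /** Builds a fixed context for testing. */
  static SimpleContext context(long logicalStartTime, Map<String, String> args) {
    return new SimpleContext() {
      @Override public long getLogicalStartTime() { return logicalStartTime; }
      @Override public Map<String, String> getRuntimeArguments() { return Collections.unmodifiableMap(args); }
    };
  }
}
```

Running `destroy()` in a `finally` block mirrors the intent of `getState()` above: cleanup happens whether or not the action succeeded.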

Example Plugin Class:

 

 

package co.cask.hydrator.plugin.action;

import ch.ethz.ssh2.ChannelCondition;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.PipelineConfigurer;
// Action and ActionContext are the proposed API above; this package name is an assumption.
import co.cask.cdap.etl.api.action.Action;
import co.cask.cdap.etl.api.action.ActionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;

/**
 * An action that SSHes into a remote machine and executes a script on that machine.
 */
@Plugin(type = "action")
@Name("SSH-Action")
@Description("Custom action to run a script on a remote machine before starting the workflow")
public class SSHAction extends Action {
  private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class);

  private final SSHActionConfig config;

  public SSHAction(SSHActionConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Fail fast at deployment time if the config is inconsistent.
    config.validate();
  }

  @Override
  public void initialize(ActionContext context) {
    // No per-run setup needed; all connection details come from the config.
  }

  @Override
  public void run(ActionContext context) throws Exception {
    Connection connection = new Connection(config.host, config.getPort());
    try {
      connection.connect();

      // validate() guarantees that exactly one of password and privateKey is set.
      boolean authenticated = config.password != null
        ? connection.authenticateWithPassword(config.user, config.password)
        : connection.authenticateWithPublicKey(config.user, config.privateKey.toCharArray(),
                                               config.privateKeyPassPhrase);
      if (!authenticated) {
        throw new IOException(String.format("Unable to authenticate SSH connection for %s@%s on port %d",
                                            config.user, config.host, config.getPort()));
      }

      Session session = connection.openSession();
      try {
        session.execCommand(config.scriptCMD);

        // StreamGobbler drains each stream in a background thread, so reading stdout to the end
        // before stderr cannot block the remote script on a full stderr buffer.
        String output = readFully(new StreamGobbler(session.getStdout()));
        String errors = readFully(new StreamGobbler(session.getStderr()));
        LOG.info("Output:\n{}", output);
        if (!errors.isEmpty()) {
          LOG.warn("Errors:\n{}", errors);
        }

        // Fail the action if the script exits with a non-zero status (timeout 0 waits indefinitely).
        session.waitForCondition(ChannelCondition.EXIT_STATUS, 0);
        Integer exitStatus = session.getExitStatus();
        if (exitStatus != null && exitStatus != 0) {
          throw new IOException(String.format("Command '%s' on %s exited with status %d",
                                              config.scriptCMD, config.host, exitStatus));
        }
      } finally {
        session.close();
      }
    } finally {
      // Propagate failures so the pipeline run fails instead of silently succeeding.
      connection.close();
    }
  }

  @Override
  public void destroy() {
    // Nothing to release; the connection is closed at the end of run().
  }

  /** Reads the whole stream as UTF-8 text, one line at a time. */
  private static String readFully(InputStream stream) throws IOException {
    StringBuilder builder = new StringBuilder();
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        builder.append(line).append('\n');
      }
    }
    return builder.toString();
  }

  /**
   * Config class that contains all the properties needed to SSH into the remote machine and run the script.
   */
  public static class SSHActionConfig extends PluginConfig {
    private static final int DEFAULT_PORT = 22;

    @Description("Hostname of the remote machine")
    public String host;

    @Nullable
    @Description("Port to connect to (defaults to 22)")
    public Integer port;

    @Description("User to log in as")
    public String user;

    @Nullable
    @Description("Password for the user; set either this or privateKey")
    public String password;

    // Changed from FileSet: plugin config properties are expected to be simple types, so the key is
    // passed as PEM text. Where that text comes from is still open (see "Storing SSH keys").
    @Nullable
    @Description("PEM-encoded private key; set either this or password")
    public String privateKey;

    @Nullable
    @Description("Passphrase for the private key, if it is encrypted")
    public String privateKeyPassPhrase;

    @Nullable
    @Description("Path to the script file")
    public String scriptPath;

    @Description("Command to run on the remote machine")
    public String scriptCMD;

    @Nullable
    @Description("Arguments to pass into the script")
    public String arguments; // Unsure how the arguments would be passed. Can change this as necessary.

    public SSHActionConfig(String host, @Nullable Integer port, String user, @Nullable String password,
                           @Nullable String privateKey, @Nullable String privateKeyPassPhrase,
                           @Nullable String scriptPath, String scriptCMD, @Nullable String arguments) {
      this.host = host;
      this.port = port;
      this.user = user;
      this.password = password;
      this.privateKey = privateKey;
      this.privateKeyPassPhrase = privateKeyPassPhrase;
      this.scriptPath = scriptPath;
      this.scriptCMD = scriptCMD;
      this.arguments = arguments;
    }

    public int getPort() {
      return port == null ? DEFAULT_PORT : port;
    }

    public void validate() {
      // Exactly one of password and privateKey must be set.
      if ((password == null) == (privateKey == null)) {
        throw new IllegalArgumentException("Must specify either a password or a private key, but not both");
      }
      if (port != null && (port < 1 || port > 65535)) {
        throw new IllegalArgumentException("Port must be between 1 and 65535");
      }
    }
  }
}

 

 

Storing SSH keys:

  1. SSH private and public keys will be generated for the specific user. These keys will be used to SSH into the external machine.
  2. The user's public key can be installed on each machine that the YARN container running the action needs to SSH into.
  3. The user's private key will need to be stored on the YARN cluster so that any container can access it. A few options:
    1. Store the private key as a dataset in HDFS.
    2. Store the private key in the SecureStore. If programs have access to the SecureStore, the ActionContext will need to expose it.
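Steps 1 and 2 (generating a key pair for the user and placing the public half on the target machine) can be sketched with the JDK alone. A minimal sketch that generates an RSA key pair and renders the public key as an OpenSSH `authorized_keys` line, `ssh-rsa <base64> <comment>`, where the base64 payload is the length-prefixed string `ssh-rsa` followed by the public exponent and modulus as SSH mpints (RFC 4253, section 6.6). How the private half is stored (HDFS dataset or SecureStore) is the open question above and is not shown; `SshKeySketch` and the comment string are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.interfaces.RSAPublicKey;
import java.util.Base64;

/** Sketch: generate an RSA key pair and format the public key for ~/.ssh/authorized_keys. */
final class SshKeySketch {
  static KeyPair generate(int bits) {
    try {
      KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
      generator.initialize(bits);
      return generator.generateKeyPair();
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform is required to support RSA key pair generation.
      throw new IllegalStateException(e);
    }
  }

  /** Encodes the key as "ssh-rsa <base64(string "ssh-rsa", mpint e, mpint n)> comment". */
  static String toAuthorizedKeysLine(RSAPublicKey key, String comment) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      writeString(out, "ssh-rsa".getBytes(StandardCharsets.US_ASCII));
      // BigInteger.toByteArray() is big-endian two's complement, matching the SSH mpint encoding.
      writeString(out, key.getPublicExponent().toByteArray());
      writeString(out, key.getModulus().toByteArray());
    } catch (IOException e) {
      throw new IllegalStateException("In-memory write failed", e); // not expected for a byte array
    }
    return "ssh-rsa " + Base64.getEncoder().encodeToString(bytes.toByteArray()) + " " + comment;
  }

  /** SSH wire format: uint32 big-endian length followed by the raw bytes. */
  private static void writeString(DataOutputStream out, byte[] data) throws IOException {
    out.writeInt(data.length);
    out.write(data);
  }
}
```

The resulting line would be appended to the target user's `authorized_keys` file on the remote machine; the private key never needs to leave the cluster.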

Configurations for the action plugins:

...