Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Action interface to be implemented by action plugins: 

    Code Block
    languagejava
    /**
     * Represents custom action to be executed in the pipeline.	
     */
    public abstract class Action extends PipelineConfigurable {
    	public static final String PLUGIN_TYPE = "action";
     
    	/**
     	 * Implement this method to execute the code as a part of action run.
         * @param context the action context, containing information about the pipeline run
     	 * @throws Exception when there is failure in method execution
     	 */
    	public abstract void run(ActionContext context) throws Exception;	
    }

     

  2. ActionContext interface will be available to the action plugins. 

    Code Block
    languagejava
    /**
     * Represents the context available to the Action plugin during runtime.
     */
    public interface ActionContext extends Transactional, PluginContext {
    
      /**
       * Returns the logical start time of the batch job which triggers this instance of an action.
       * Logical start time is the time when the triggering Batch job is supposed to start if it is
       * started by the scheduler. Otherwise it would be the current time when the action runs.
       *
       * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
       */
      long getLogicalStartTime();
    
      /**
       * Return the arguments which can be updated.
       */
      SettableArguments getArguments();
    }

Example SSH Plugin Class:
 

Code Block
languagejava
package co.cask.hydrator.plugin.action;

/**
 * SSH into a remote machine and execute a script on that machine.
 */
@Plugin(type = "action")
@Name("SSHAction")
@Description("Action to run a script on remote machine.")
public class SSHAction extends Action {
  private final SSHActionConfig config;

  public SSHAction(SSHActionConfig config) {
    super(config);
    this.config = config;
  }

  @Override
  public void run(ActionContext context) throws Exception {
    try {
      Connection connection = new Connection(config.host);
      connection.connect();

      if (config.usingPassAuth) {
        if (!connection.authenticateWithPassword(config.user, config.password)) {
          throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d",
                                              config.user, config.host, config.port));
        }
      } else {
        connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private);
      }

      Session session = connection.openSession();
      session.execCommand(config.scriptCMD);

      // Read stdout and stderr
      InputStream stdout = new StreamGobbler(session.getStdout());
      BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout));
      InputStream stderr = new StreamGobbler(session.getStderr());
      BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr));

      StringBuilder outBuilder = new StringBuilder();
      String line = outBuffer.readLine();
      while (line != null) {
        outBuilder.append(line + "\n");
        line = outBuffer.readLine();
      }

      StringBuilder errBuilder = new StringBuilder();
      line = errBuffer.readLine();
      while (line != null) {
        errBuilder.append(line + "\n");
        line = errBuffer.readLine();
      }

      LOG.info("Output:");
      LOG.info(outBuilder.toString());
      LOG.info("Errors:");
      LOG.info(errBuilder.toString());

      session.close();
    } catch (IOException e) {
      LOG.error("Unable to establish connection.", e);
    }
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
  }

  /**
   * Config class that contains all the properties needed to SSH into the remote machine and run the script.
   */
  public static class SSHActionConfig extends PluginConfig {
    @Nullable
    @Description("Host machine")
    public String host;

    @Nullable
    @Description("Port to connect to")
    public int port;

    @Nullable
    @Description("Path to Script File")
    public String scriptPath;

    @Nullable
    @Description("Script command")
    public String scriptCMD;

    @Nullable
    @Description("Arguments to pass into script")
    public Set<String> arguments; //Need to dig more about how to access the arguments.
  }
}

Additional Plugins:


SQL Action:

  1. Extends Action Class
  2. Similar logic to QueryAction Plugin
  3. Config Properties:
    1. use QueryConfig

 

HDFS Action:

  1. Extends Action Class
  2. Config Properties:
    1. sourcePath: Path of file/directory
    2. destPath: Path of desired destination
    3. fileRegex: wildcard used to identify types of files to run
  3. Run Method:
    1. Pull all files in given path and filter them if `fileRegex` is set using a `WildCardFileFilter`
    2. For each file, execute a `FileSystem.rename(src, dest)` call to move it to the desired location. Would require getting the hdfs fileSystem first.

     To-do:

  • Allow moving of files from FTP to HDFS.
    • Proposal: parse source path config to see if it is in FTP or HDFS

 

File Action:

  • Moves files between remote machines
  1. Extends SSHAction Class
  2. Config Properties:
    1. Authentication to ssh into destination machine
    2. Authentication to get file(s) from source machine
  3. Run Method:
    1. call super class run method.
    2. build scp or ftp command and pass it to super's constructor


Storing SSH keys:

  1. SSH private and public keys will be generated for the specific user. These keys will be used to SSH to the external machine.
  2. User's public key can be hosted on a machine to which we want to do SSH from YARN container running action.
  3. User's private key will need to be stored on the YARN cluster so that it can be accessed by any container. Following are few options for the same - 
    1. Store the private key as dataset in the HDFS
    2. Store private key in the SecureStore. If program has access to the SecureStore then the ActionContext will need to expose it.

...