Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Action abstract class to be extended by action plugins:

    Code Block
    languagejava
    /**
     * Represents custom action to be executed in the pipeline.	
     */
    public abstract class Action extends PipelineConfigurable {
    	public static final String PLUGIN_TYPE = "action";
     
    	/**
     	 * Implement this method to execute the code as a part of action run.
         * @param context the action context, containing information about the pipeline run
     	 * @throws Exception when there is failure in method execution
     	 */
    	public abstract void run(ActionContext context) throws Exception;	
    }

     

  2. ActionContext interface will be available to the action plugins. 

    Code Block
    languagejava
    /**
     * Represents the context available to the actionAction plugin during runtime.
     */
    public interface ActionContext extends DatasetContextTransactional, PluginContext {
    	
      /**
      	 * Returns the logical start time of the batch job which triggers this instance of an action. 
         * Logical start time is the time when the triggering Batch job is supposed to start if it is
       * started by the scheduler. Otherwise it 
     	 * would be the current time when the action runs.
      	 *
     	  * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
     	  */
    	  long getLogicalStartTime();
    
       	/**
    	   * @returnReturn runtimethe arguments ofwhich thecan Batchbe Jobupdated.
     	  */
    	 Map<String, String> getRuntimeArguments();
     
    	/**
     	 * @return a {@link WorkflowToken}.
     	 */
    	WorkflowToken getTokenSettableArguments getArguments(); 
    }

Example Plugin Class:
 

Code Block
languagejava
package co.cask.hydrator.plugin.action;

/**
 * SSH into a remote machine and execute a script on that machine.
 */
@Plugin(type = "action")
@Name("SSHAction")
@Description("Action to run a script on remote machine.")
public class SSHAction extends Action {
  private final SSHActionConfig config;

  public SSHAction(SSHActionConfig config) {
    super(config);
    this.config = config;
  }

  @Override
  public void run(ActionContext context) throws Exception {
    try {
      Connection connection = new Connection(config.host);
      connection.connect();

      if (config.usingPassAuth) {
        if (!connection.authenticateWithPassword(config.user, config.password)) {
          throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d",
                                              config.user, config.host, config.port));
        }
      } else {
        connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private);
      }

      Session session = connection.openSession();
      session.execCommand(config.scriptCMD);

      // Read stdout and stderr
      InputStream stdout = new StreamGobbler(session.getStdout());
      BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout));
      InputStream stderr = new StreamGobbler(session.getStderr());
      BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr));

      StringBuilder outBuilder = new StringBuilder();
      String line = outBuffer.readLine();
      while (line != null) {
        outBuilder.append(line + "\n");
        line = outBuffer.readLine();
      }

      StringBuilder errBuilder = new StringBuilder();
      line = errBuffer.readLine();
      while (line != null) {
        errBuilder.append(line + "\n");
        line = errBuffer.readLine();
      }

      LOG.info("Output:");
      LOG.info(outBuilder.toString());
      LOG.info("Errors:");
      LOG.info(errBuilder.toString());

      session.close();
    } catch (IOException e) {
      LOG.error("Unable to establish connection.", e);
    }
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
  }

  /**
   * Config class that contains all the properties needed to SSH into the remote machine and run the script.
   */
  public static class SSHActionConfig extends PluginConfig {
    @Nullable
    @Description("Host machine")
    public String host;

    @Nullable
    @Description("Port to connect to")
    public int port;

    @Nullable
    @Description("Path to Script File")
    public String scriptPath;

    @Nullable
    @Description("Script command")
    public String scriptCMD;

    @Nullable
    @Description("Arguments to pass into script")
    public Set<String> arguments; //Need to dig more about how to access the arguments.
  }
}

...