Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Action abstract class to be extended by action plugins:

    Code Block
    languagejava
    /**
     * Represents custom action to be executed in the pipeline.	
     */
    public abstract class Action extends PipelineConfigurable {
    	public static final String PLUGIN_TYPE = "action";
     
    	/**
     	 * Implement this method to execute the code as a part of action run.
         * @param context the action context, containing information about the pipeline run
     	 * @throws Exception when there is failure in method execution
     	 */
    	public abstract void run(ActionContext context) throws Exception;	
    }

     

  2. ActionContext interface will be available to the action plugins. 

    Code Block
    languagejava
    /**
     * Represents context available to the action plugin during runtime.
     */
    public interface ActionContext extends DatasetContext, PluginContext {
    	/**
     	 * Returns the logical start time of the batch job which triggers this instance of an action. 
         * Logical start time is the time when the triggering Batch job is supposed to start if it is started by the scheduler. Otherwise it 
     	 * would be the current time when the action runs.
     	 *
     	 * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
     	 */
    	 long getLogicalStartTime();
     
    	/**
    	 * @return runtime arguments of the Batch Job.
     	 */
    	 Map<String, String> getRuntimeArguments();
     
    	/**
     	 * @return a {@link WorkflowToken}.
     	 */
    	WorkflowToken getToken(); 
    }

Example Plugin Class:
 

 

Code Block
languagejava
package co.cask.hydrator.plugin.action;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.action.Action;
import co.cask.cdap.etl.api.action.ActionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Set;
import javax.annotation.Nullable;

/**
 * A {@link } SSH into a remote machine and execute a script on that machine.
 */
@Plugin(type = "action")
@Name("SSH-Action")
@Description("Custom Action to run a script on remote machine before starting workflow")
public class SSHAction implements Action {
  private final SSHActionConfig config;

  private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class);

  public SSHAction(SSHActionConfig config) {
    super(config);
    this.config = config;
  }

  @Override
  public void run() throws Exception{
    try {
      Connection connection = new Connection(config.host);
      connection.connect();

      if (config.usingPassAuth) {
        if (!connection.authenticateWithPassword(config.user, config.password)) {
          throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d",
                                              config.user, config.host, config.port));
        }
      } else {
        connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private);
      }

      Session session = connection.openSession();

      session.execCommand(config.scriptCMD);

      // Read stdout and stderr
      InputStream stdout = new StreamGobbler(session.getStdout());
      BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout));
      InputStream stderr = new StreamGobbler(session.getStderr());
      BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr));


      StringBuilder outBuilder = new StringBuilder();
      String line = outBuffer.readLine();
      while (line != null) {
        outBuilder.append(line + "\n");
        line = outBuffer.readLine();
      }

      StringBuilder errBuilder = new StringBuilder();
      line = errBuffer.readLine();
      while (line != null) {
        errBuilder.append(line + "\n");
        line = errBuffer.readLine();
      }

      LOG.info("Output:");
      LOG.info(outBuilder.toString());
      LOG.info("Errors:");
      LOG.info(errBuilder.toString());

      session.close();
    } catch (IOException e) {
      LOG.error("Unable to establish connection.", e);
    }
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    this.config.validate();
  }

  /**
   * Config class that contains all the properties needed to SSH into the remote machine and run the script.
   */
  public static class SSHActionConfig extends PluginConfig {
    @Nullable
    @Description("URL of host machine")
    public String host;

    @Nullable
    @Description("User login credentials")
    public String user;

    @Description("password or private key auth")
    public Boolean usingPassAuth;

    @Nullable
    @Description("Password login credentials")
    public String password;

    @Nullable
    @Description("Private Key File")
    public FileSet privateKeyFile;

    @Nullable
    @Description("Private Key Passphrase")
    public String privateKeyPassPhrase;

    @Nullable
    @Description("Port to connect to")
    public int port;

    @Nullable
    @Description("Path to Script File")
    public String scriptPath;

    @Nullable
    @Description("Script command")
    public String scriptCMD;

    @Nullable
    @Description("Arguments to pass into script")
    public Set<String> arguments; //Unsure how the arguments would be passed. Can change this as necessary

    public SSHActionConfig(String referenceName, String host, int port, @Nullable String user, @Nullable String password,
                           @Nullable FileSet privateKey, @Nullable String privateKeyPassPhrase, String scriptPath,
                           String scriptCMD, @Nullable Set<String> arguments) {
      super(referenceName);
      this.host = host;
      this.port = port;
      this.user = user;
      this.privateKeyFile = privateKey;
      this.privateKeyPassPhrase = privateKeyPassPhrase;
      this.password = password;
      this.scriptPath = scriptPath;
      this.scriptCMD = scriptCMD;
      this.arguments = arguments;
    }

    public void validate() {
      //check that only password or privateKey is set
      if (!(password != null ^ privateKeyFile != null)) {
        throw new IllegalArgumentException("Must specify either a password or private key file");
      }

      //either password or privateKey has been passed
      usingPassAuth = (password != null);

      //check that port is not negative
      if (port < 0) {
        throw new IllegalArgumentException("Port cannot be negative");
      }
    }
  }
}

...