...
Action interface to be implemented by action plugins:
Code Block language java /** * Represents custom action to be executed in the pipeline. */ public abstract class Action extends PipelineConfigurable { public static final String PLUGIN_TYPE = "action"; /** * Implement this method to execute the code as a part of action run. * @param context the action context, containing information about the pipeline run * @throws Exception when there is failure in method execution */ public abstract void run(ActionContext context) throws Exception; }
ActionContext interface will be available to the action plugins.
Code Block language java /** * Represents context available to the action plugin during runtime. */ public interface ActionContext extends DatasetContext, PluginContext { /** * Returns the logical start time of the batch job which triggers this instance of an action. * Logical start time is the time when the triggering Batch job is supposed to start if it is started by the scheduler. Otherwise it * would be the current time when the action runs. * * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC). */ long getLogicalStartTime(); /** * @return runtime arguments of the Batch Job. */ Map<String, String> getRuntimeArguments(); /** * @return a {@link WorkflowToken}. */ WorkflowToken getToken(); }
Example Plugin Class:
Code Block | ||
---|---|---|
| ||
package co.cask.hydrator.plugin.action; import co.cask.cdap.api.annotation.Description; import ch.ethz.ssh2.Connection; import ch.ethz.ssh2.Session; import ch.ethz.ssh2.StreamGobbler; import co.cask.cdap.api.plugin.PluginConfig; import co.cask.cdap.api.dataset.lib.FileSet; import co.cask.cdap.etl.api.PipelineConfigurer; import java.util.Set; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A {@link } SSH into a remote machine and execute a script on that machine. */ @Plugin(type = "action") @Name("SSH-Action") @Description("Custom Action to run a script on remote machine before starting workflow") public class SSHAction implements Action { private final SSHActionConfig config; private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class); public SSHAction(SSHActionConfig config) { super(config); this.config = config; } @Override public void run() throws Exception{ try { Connection connection = new Connection(config.host); connection.connect(); if (config.usingPassAuth) { if (!connection.authenticateWithPassword(config.user, config.password)) { throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d", config.user, config.host, config.port)); } } else { connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private); } Session session = connection.openSession(); session.execCommand(config.scriptCMD); // Read stdout and stderr InputStream stdout = new StreamGobbler(session.getStdout()); BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout)); InputStream stderr = new StreamGobbler(session.getStderr()); BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr)); StringBuilder outBuilder = new StringBuilder(); String line = outBuffer.readLine(); while (line != null) { outBuilder.append(line + "\n"); line = outBuffer.readLine(); } StringBuilder errBuilder = new StringBuilder(); line = errBuffer.readLine(); while (line != null) { errBuilder.append(line + "\n"); line = errBuffer.readLine(); } LOG.info("Output:"); LOG.info(outBuilder.toString()); LOG.info("Errors:"); LOG.info(errBuilder.toString()); session.close(); } catch (IOException e) { LOG.error("Unable to establish connection.", e); } } @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); this.config.validate(); } /** * Config class that contains all the properties needed to SSH into the remote machine and run the script. */ public static class SSHActionConfig extends PluginConfig { @Nullable @Description("URL of host machine") public String host; @Nullable @Description("User login credentials") public String user; @Description("password or private key auth") public Boolean usingPassAuth; @Nullable @Description("Password login credentials") public String password; @Nullable @Description("Private Key File") public FileSet privateKeyFile; @Nullable @Description("Private Key Passphrase") public String privateKeyPassPhrase; @Nullable @Description("Port to connect to") public int port; @Nullable @Description("Path to Script File") public String scriptPath; @Nullable @Description("Script command") public String scriptCMD; @Nullable @Description("Arguments to pass into script") public Set<String> arguments; //Unsure how the arguments would be passed. Can change this as necessary public SSHActionConfig(String referenceName, String host, int port, @Nullable String user, @Nullable String password, @Nullable FileSet privateKey, @Nullable String privateKeyPassPhrase, String scriptPath, String scriptCMD, @Nullable Set<String> arguments) { super(referenceName); this.host = host; this.port = port; this.user = user; this.privateKeyFile = privateKey; this.privateKeyPassPhrase = privateKeyPassPhrase; this.password = password; this.scriptPath = scriptPath; this.scriptCMD = scriptCMD; this.arguments = arguments; } public void validate() { //check that only password or privateKey is set if (!(password != null ^ privateKeyFile != null)) { throw new IllegalArgumentException("Must specify either a password or private key file"); } //either password or privateKey has been passed usingPassAuth = (password != null); //check that port is not negative if (port < 0) { throw new IllegalArgumentException("Port cannot be negative"); } } } } |
...