...
Action interface to be implemented by action plugins:
Code Block language java /** * Represents custom action to be executed in the pipeline. */ public abstract class Action extends PipelineConfigurable { public static final String PLUGIN_TYPE = "action"; /** * Implement this method to execute the code as a part of action run. * @param context the action context, containing information about the pipeline run * @throws Exception when there is failure in method execution */ public abstract void run(ActionContext context) throws Exception; }
ActionContext interface will be available to the action plugins.
Code Block language java /** * Represents the context available to the actionAction plugin during runtime. */ public interface ActionContext extends DatasetContextTransactional, PluginContext { /** * Returns the logical start time of the batch job which triggers this instance of an action. * Logical start time is the time when the triggering Batch job is supposed to start if it is * started by the scheduler. Otherwise it * would be the current time when the action runs. * * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC). */ long getLogicalStartTime(); /** * @returnReturn runtimethe arguments ofwhich thecan Batchbe Jobupdated. */ Map<String, String> getRuntimeArguments(); /** * @return a {@link WorkflowToken}. */ WorkflowToken getTokenSettableArguments getArguments(); }
Example Plugin Class:
Code Block | ||
---|---|---|
| ||
package co.cask.hydrator.plugin.action; /** * SSH into a remote machine and execute a script on that machine. */ @Plugin(type = "action") @Name("SSHAction") @Description("Action to run a script on remote machine.") public class SSHAction extends Action { private final SSHActionConfig config; public SSHAction(SSHActionConfig config) { super(config); this.config = config; } @Override public void run(ActionContext context) throws Exception { try { Connection connection = new Connection(config.host); connection.connect(); if (config.usingPassAuth) { if (!connection.authenticateWithPassword(config.user, config.password)) { throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d", config.user, config.host, config.port)); } } else { connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private); } Session session = connection.openSession(); session.execCommand(config.scriptCMD); // Read stdout and stderr InputStream stdout = new StreamGobbler(session.getStdout()); BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout)); InputStream stderr = new StreamGobbler(session.getStderr()); BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr)); StringBuilder outBuilder = new StringBuilder(); String line = outBuffer.readLine(); while (line != null) { outBuilder.append(line + "\n"); line = outBuffer.readLine(); } StringBuilder errBuilder = new StringBuilder(); line = errBuffer.readLine(); while (line != null) { errBuilder.append(line + "\n"); line = errBuffer.readLine(); } LOG.info("Output:"); LOG.info(outBuilder.toString()); LOG.info("Errors:"); LOG.info(errBuilder.toString()); session.close(); } catch (IOException e) { LOG.error("Unable to establish connection.", e); } } @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); } /** * Config class that contains all the properties needed to SSH into the remote machine and run the script. */ public static class SSHActionConfig extends PluginConfig { @Nullable @Description("Host machine") public String host; @Nullable @Description("Port to connect to") public int port; @Nullable @Description("Path to Script File") public String scriptPath; @Nullable @Description("Script command") public String scriptCMD; @Nullable @Description("Arguments to pass into script") public Set<String> arguments; //Need to dig more about how to access the arguments. } } |
...