...
Description | Properties required | JSON Example | |
---|---|---|---|
FileCopy/FileMove Action | To be able to copy files from FTP/SFTP to unix machine, from unix machine to the HDFS cluster, from FTP to HDFS. To be able to move the files from source location A (HDFS/Unix machine/SFTP) to destination location B (HDFS/unix machine/SFTP). |
| { ... "config":{ "connections":[ ... ], "stages":[ { "name":"SCP-Copy-File", "plugin":{ "name":"File-Manipulation", "type":"action", "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "source-host": "hostA.com", ... } } }, ... ] } } |
SSH Script Action | To be able to execute Perl/Python/R/Shell/Revo R/Hive Query scripts located on the remote machine |
| { ... "config":{ "connections":[ ... ], "stages":[ { "name":"Run-Remote-Script", "plugin":{ "name":"SSHShell", "type":"action", "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "host": "scripthost.com", "script-path":"/tmp/myscript", ... } } }, ... ] } } |
SQL Action | To be able to run SQL stored procedures located on a remote SQL Server, and to copy Hive data to a SQL Server table |
| similar to SSH properties |
Email action | To be able to send emails |
| { ... "config":{ "connections":[ ... ], "stages":[ { "name":"Email-Bob", "plugin":{ "name":"Email", "type":"action", "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "protocol": "smtp", ... }
}
},
...
]
}
} |
Action Java API:
Action interface to be implemented by action plugins:
Code Block language java /** * Represents custom action to be executed in the pipeline. */ public abstract interfaceclass Action extends PipelineConfigurable, StageLifecycle<ActionContext> { { public static final String PLUGIN_TYPE = "action"; /** * Implement this method to execute the code as a part of action run. * @param context the action context, containing information about the pipeline run * @throws Exception when there is failure in method execution */ public abstract void run(ActionContext context) throws Exception; }
ActionContext interface will be available to the action plugins.
Code Block language java /** * Represents context available to the action plugin during runtime. */ public interface ActionContext extends DatasetContext, ServiceDiscoverer, PluginContext { /** * Returns the logical start time of the batch job which triggers this instance of an action. * Logical start time is the time when the triggering Batch job is supposed to start if it is started by the scheduler. Otherwise it * would be the current time when the action runs. * * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC). */ long getLogicalStartTime(); /** * @return runtime arguments of the Batch Job. */ Map<String, String> getRuntimeArguments(); /** * @return thea state of the Action{@link WorkflowToken}. Useful in the {@link Action#destroy()} method, * to perform any activity based on the the action state. * This method can be moved to PluginContext so that its available to all plugins? */ ActionState getState(); }
- How the ActionConfig will look?
*/ WorkflowToken getToken(); }
Example Plugin Class:
Code Block | ||
---|---|---|
| ||
package co.cask.hydrator.plugin.action; import co.cask.cdap.api.annotation.Description; import ch.ethz.ssh2.Connection; import ch.ethz.ssh2.Session; import ch.ethz.ssh2.StreamGobbler; import co.cask.cdap.api.plugin.PluginConfig; import co.cask.cdap.api.dataset.lib.FileSet; import co.cask.cdap.etl.api.PipelineConfigurer; import java.util.Set; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A {@link } SSH into a remote machine and execute a script on that machine. */ @Plugin(type = "action") @Name("SSH-Action") @Description("Custom Action to run a script on remote machine before starting workflow") public class SSHAction implements Action { private final SSHActionConfig config; private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class); public SSHAction(SSHActionConfig config) { super(config); this.config = config; } @Override public void run() throws Exception{ try { Connection connection = new Connection(config.host); connection.connect(); if (config.usingPassAuth) { if (!connection.authenticateWithPassword(config.user, config.password)) { throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d", config.user, config.host, config.port)); } } else { connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private); } Session session = connection.openSession(); session.execCommand(config.scriptCMD); // Read stdout and stderr InputStream stdout = new StreamGobbler(session.getStdout()); BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout)); InputStream stderr = new StreamGobbler(session.getStderr()); BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr)); StringBuilder outBuilder = new StringBuilder(); String line = outBuffer.readLine(); while (line != null) { outBuilder.append(line + 
"\n"); line = outBuffer.readLine(); } StringBuilder errBuilder = new StringBuilder(); line = errBuffer.readLine(); while (line != null) { errBuilder.append(line + "\n"); line = errBuffer.readLine(); } LOG.info("Output:"); LOG.info(outBuilder.toString()); LOG.info("Errors:"); LOG.info(errBuilder.toString()); session.close(); } catch (IOException e) { LOG.error("Unable to establish connection.", e); } } @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); this.config.validate(); } /** * Config class that contains all the properties needed to SSH into the remote machine and run the script. */ public static class SSHActionConfig extends PluginConfig { @Nullable @Description("URL of host machine") public String host; @Nullable @Description("User login credentials") public String user; @Description("password or private key auth") public Boolean usingPassAuth; @Nullable @Description("Password login credentials") public String password; @Nullable @Description("Private Key File") public FileSet privateKeyFile; @Nullable @Description("Private Key Passphrase") public String privateKeyPassPhrase; @Nullable @Description("Port to connect to") public int port; @Nullable @Description("Path to Script File") public String scriptPath; @Nullable @Description("Script command") public String scriptCMD; @Nullable @Description("Arguments to pass into script") public Set<String> arguments; //Unsure how the arguments would be passed. 
Can change this as necessary public SSHActionConfig(String referenceName, String host, int port, @Nullable String user, @Nullable String password, @Nullable FileSet privateKey, @Nullable String privateKeyPassPhrase, String scriptPath, String scriptCMD, @Nullable Set<String> arguments) { super(referenceName); this.host = host; this.port = port; this.user = user; this.privateKeyFile = privateKey; this.privateKeyPassPhrase = privateKeyPassPhrase; this.password = password; this.scriptPath = scriptPath; this.scriptCMD = scriptCMD; this.arguments = arguments; } public void validate() { //check that only password or privateKey is set if (!(password != null ^ privateKeyFile != null)) { throw new IllegalArgumentException("Must specify either a password or private key file"); } //either password or privateKey has been passed usingPassAuth = (password != null); //check that port is not negative if (port < 0) { throw new IllegalArgumentException("Port cannot be negative"); } } } } |
Storing SSH keys:
- SSH private and public keys will be generated for the specific user. These keys will be used to SSH to the external machine.
- The user's public key can be hosted on the machine that we want to SSH into from the YARN container running the action.
- The user's private key will need to be stored on the YARN cluster so that it can be accessed by any container. The following are a few options for doing so:
- Store the private key as a dataset in HDFS.
- Store the private key in the SecureStore. If the program has access to the SecureStore, then the ActionContext will need to expose it.
Configurations for the action plugins:
...