...
- copy the data from one of the machines on the cluster to HDFS for processing.
- delete the data from the machine once it has been copied to HDFS.
- execute the (Shell/Python/Perl/R) script located on the remote machine during the execution of the pipeline.
- archive the data in the HDFS generated by older pipeline runs.
- execute a SQL Server stored procedure on the remote SQL Server during the execution of the pipeline.
- wait during the pipeline execution till the required data is generated.
- extend the action to be able to add custom plugins.
Scope for 3.5:
- Support the data pipeline - which will only include the DAG of actions to be executed.
- Support running the actions before Source or after the Sink stage of the DataPipelineApp.
- The following are a few actions we might want to support:
- SSH Action: To be able to execute Perl/Python/R/Shell/Revo R scripts located on the remote machine
- FileCopy action: To be able to copy files from FTP/SFTP to unix machine, from unix machine to the HDFS cluster, from FTP to HDFS
- FileMove action: To be able to move the files from a source location (HDFS/Unix machine/SFTP) to a destination location (HDFS/Unix machine/SFTP)
- Impala/Hive action: To be able to run the hive scripts on CDH or HDP cluster
- SQL action: To be able to run the SQL stored procedures located on the remote SQL server, Copy Hive data to SQL Server table
- Email action: To be able to send emails
...
Code Block | ||
---|---|---|
| ||
package co.cask.hydrator.plugin.action; /** * A {@link } SSH into a remote machine and execute a script on that machine. */ @Plugin(type = "action") @Name("SSH-ActionSSHAction") @Description("Custom Action to run a script on remote machine before starting workflow.") public class SSHAction implementsextends Action { private final SSHActionConfig config; private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class); public SSHAction(SSHActionConfig config) { super(config); this.config = config; } @Override public void run(ActionContext context) throws Exception { try { Connection connection = new Connection(config.host); connection.connect(); if (config.usingPassAuth) { if (!connection.authenticateWithPassword(config.user, config.password)) { throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d", config.user, config.host, config.port)); } } else { connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private); } Session session = connection.openSession(); session.execCommand(config.scriptCMD); // Read stdout and stderr InputStream stdout = new StreamGobbler(session.getStdout()); BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout)); InputStream stderr = new StreamGobbler(session.getStderr()); BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr)); StringBuilder outBuilder = new StringBuilder(); String line = outBuffer.readLine(); while (line != null) { outBuilder.append(line + "\n"); line = outBuffer.readLine(); } StringBuilder errBuilder = new StringBuilder(); line = errBuffer.readLine(); while (line != null) { errBuilder.append(line + "\n"); line = errBuffer.readLine(); } LOG.info("Output:"); LOG.info(outBuilder.toString()); LOG.info("Errors:"); LOG.info(errBuilder.toString()); session.close(); } catch (IOException e) { LOG.error("Unable to establish connection.", e); } } @Override public void configurePipeline(PipelineConfigurer 
pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); this.config.validate(); } /** * Config class that contains all the properties needed to SSH into the remote machine and run the script. */ public static class SSHActionConfig extends PluginConfig { @Nullable @Description("URL ofHost host machine") public String host; @Nullable @Description("UserPort to loginconnect credentialsto") public Stringint userport; @Nullable @Description("passwordPath orto privateScript key authFile") public BooleanString usingPassAuthscriptPath; @Nullable @Description("PasswordScript login credentialscommand") public String passwordscriptCMD; @Nullable @Description("PrivateArguments Keyto File")pass public FileSet privateKeyFile; @Nullable @Description("Private Key Passphraseinto script") public StringSet<String> privateKeyPassPhrasearguments; //Need to dig more about @Nullable @Description("Port how to connectaccess to") public int port; @Nullable @Description("Path to Script File") public String scriptPath; @Nullable @Description("Script command") public String scriptCMD; @Nullable @Description("Arguments to pass into script") public Set<String> arguments; //Unsure how the arguments would be passed. 
Can change this as necessary public SSHActionConfig(String referenceName, String host, int port, @Nullable String user, @Nullable String password, @Nullable FileSet privateKey, @Nullable String privateKeyPassPhrase, String scriptPath, String scriptCMD, @Nullable Set<String> arguments) { super(referenceName); this.host = host; this.port = port; this.user = user; this.privateKeyFile = privateKey; this.privateKeyPassPhrase = privateKeyPassPhrase; this.password = password; this.scriptPath = scriptPath; this.scriptCMD = scriptCMD; this.arguments = arguments; } public void validate() { //check that only password or privateKey is set if (!(password != null ^ privateKeyFile != null)) { throw new IllegalArgumentException("Must specify either a password or private key file"); } //either password or privateKey has been passed usingPassAuth = (password != null); //check that port is not negative if (port < 0) { throw new IllegalArgumentException("Port cannot be negative"); } }the arguments. } } |
Storing SSH keys:
- SSH private and public keys will be generated for the specific user. These keys will be used to SSH to the external machine.
- The user's public key can be hosted on the machine that the YARN container running the action needs to SSH into.
- The user's private key will need to be stored on the YARN cluster so that it can be accessed by any container. The following are a few options for doing so:
- Store the private key as dataset in the HDFS
- Store the private key in the SecureStore. If the program has access to the SecureStore, then the ActionContext will need to expose it.
...