...
Action interface to be implemented by action plugins:
Code Block language java /** * Represents custom action to be executed in the pipeline. */ public interface Action extends PipelineConfigurable, StageLifecycle<ActionContext> { /** * Implement this method to execute the code as a part of action run. * @throws Exception when there is failure in method execution */ void run() throws Exception; }
ActionContext interface will be available to the action plugins.
Code Block language java /** * Represents context available to the action plugin during runtime. */ public interface ActionContext extends DatasetContext, ServiceDiscoverer, PluginContext { /** * Returns the logical start time of the batch job which triggers this instance of an action. * Logical start time is the time when the triggering Batch job is supposed to start if it is started by the scheduler. Otherwise it * would be the current time when the action runs. * * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC). */ long getLogicalStartTime(); /** * @return runtime arguments of the Batch Job. */ Map<String, String> getRuntimeArguments(); /** * @return the state of the Action. Useful in the {@link Action#destroy()} method, * to perform any activity based on the the action state. * This method can be moved to PluginContext so that its available to all plugins? */ ActionState getState(); }
- How the ActionConfig will look?
Example Plugin Class:
Code Block | ||
---|---|---|
| ||
package co.cask.hydrator.plugin.action;
import co.cask.cdap.api.annotation.Description;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.etl.api.PipelineConfigurer;
import java.util.Set;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link } SSH into a remote machine and execute a script on that machine.
*/
@Plugin(type = "action")
@Name("SSH-Action")
@Description("Custom Action to run a script on remote machine before starting workflow")
public class SSHAction implements Action {
private final SSHActionConfig config;
private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class);
public SSHAction(SSHActionConfig config) {
super(config);
this.config = config;
}
@Override
public void run() throws Exception{
try {
Connection connection = new Connection(config.host);
connection.connect();
if (config.usingPassAuth) {
if (!connection.authenticateWithPassword(config.user, config.password)) {
throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d",
config.user, config.host, config.port));
}
} else {
connection.authenticateWithPublicKey(config.user, config.privateKeyFile, config.private);
}
Session session = connection.openSession();
session.execCommand(config.scriptCMD);
// Read stdout and stderr
InputStream stdout = new StreamGobbler(session.getStdout());
BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout));
InputStream stderr = new StreamGobbler(session.getStderr());
BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr));
StringBuilder outBuilder = new StringBuilder();
String line = outBuffer.readLine();
while (line != null) {
outBuilder.append(line + "\n");
line = outBuffer.readLine();
}
StringBuilder errBuilder = new StringBuilder();
line = errBuffer.readLine();
while (line != null) {
errBuilder.append(line + "\n");
line = errBuffer.readLine();
}
LOG.info("Output:");
LOG.info(outBuilder.toString());
LOG.info("Errors:");
LOG.info(errBuilder.toString());
session.close();
} catch (IOException e) {
LOG.error("Unable to establish connection.", e);
}
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
this.config.validate();
}
/**
* Config class that contains all the properties needed to SSH into the remote machine and run the script.
*/
public static class SSHActionConfig extends PluginConfig {
@Nullable
@Description("URL of host machine")
public String host;
@Nullable
@Description("User login credentials")
public String user;
@Description("password or private key auth")
public Boolean usingPassAuth;
@Nullable
@Description("Password login credentials")
public String password;
@Nullable
@Description("Private Key File")
public FileSet privateKeyFile;
@Nullable
@Description("Private Key Passphrase")
public String privateKeyPassPhrase;
@Nullable
@Description("Port to connect to")
public int port;
@Nullable
@Description("Path to Script File")
public String scriptPath;
@Nullable
@Description("Script command")
public String scriptCMD;
@Nullable
@Description("Arguments to pass into script")
public Set<String> arguments; //Unsure how the arguments would be passed. Can change this as necessary
public SSHActionConfig(String referenceName, String host, int port, @Nullable String user, @Nullable String password,
@Nullable FileSet privateKey, @Nullable String privateKeyPassPhrase, String scriptPath,
String scriptCMD, @Nullable Set<String> arguments) {
super(referenceName);
this.host = host;
this.port = port;
this.user = user;
this.privateKeyFile = privateKey;
this.privateKeyPassPhrase = privateKeyPassPhrase;
this.password = password;
this.scriptPath = scriptPath;
this.scriptCMD = scriptCMD;
this.arguments = arguments;
}
public void validate() {
//check that only password or privateKey is set
if (!(password != null ^ privateKeyFile != null)) {
throw new IllegalArgumentException("Must specify either a password or private key file");
}
//either password or privateKey has been passed
usingPassAuth = (password != null);
//check that port is not negative
if (port < 0) {
throw new IllegalArgumentException("Port cannot be negative");
}
}
}
}
|
Configurations for the action plugins:
...