...
- User stories documented (Shankar/Sagar)
- User stories reviewed (Nitin)
- Design documented (Shankar/Sagar)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
...
| Description | Properties required | JSON Example |
|---|---|---|
| FileCopy/FileMove Action | To be able to copy files from FTP/SFTP to a Unix machine, from a Unix machine to the HDFS cluster, and from FTP to HDFS. To be able to move files from a source location A (HDFS/Unix machine/SFTP) to a destination location B (HDFS/Unix machine/SFTP). | `{ ... "config":{ "connections":[ ... ], "stages":[ { "name":"SCP-Copy-File", "plugin":{ "name":"File-Manipulation", "type":"action", "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "source-host": "hostA.com", ... } } }, ... ] } }` |
| SSH Script Action | To be able to execute Perl/Python/R/Shell/Revo R/Hive Query scripts located on the remote machine. | `{ ... "config":{ "connections":[ ... ], "stages":[ { "name":"Run-Remote-Script", "plugin":{ "name":"SSHShell", "type":"action", "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "host": "scripthostexample.com", "script-path":"/tmp/myscript", ... } } }, ... ] } }` |
| SQL Action | To be able to run SQL stored procedures located on the remote SQL Server; to copy Hive data to a SQL Server table. | Similar to the SSH Script Action properties. |
| Email Action | To be able to send emails. | `{ ... "config":{ "connections":[ ... ], "stages":[ { "name":"Email-Bob", "plugin":{ "name":"Email", "type":"action", "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" }, "properties":{ "protocol": "smtp", ... } } }, ... ] } }` |
...
Action interface to be implemented by action plugins:
Code Block language java

/**
 * Represents custom action to be executed in the pipeline.
 */
public abstract class Action extends PipelineConfigurable {
  public static final String PLUGIN_TYPE = "action";

  /**
   * Implement this method to execute the code as a part of action run.
   * @param context the action context, containing information about the pipeline run
   * @throws Exception when there is failure in method execution
   */
  public abstract void run(ActionContext context) throws Exception;
}
ActionContext interface will be available to the action plugins.
Code Block language java

/**
 * Represents the context available to the Action plugin during runtime.
 */
public interface ActionContext extends Transactional, PluginContext {

  /**
   * Returns the logical start time of the batch job which triggers this instance of an action.
   * Logical start time is the time when the triggering Batch job is supposed to start if it is
   * started by the scheduler. Otherwise it would be the current time when the action runs.
   *
   * @return Time in milliseconds since epoch time (00:00:00 January 1, 1970 UTC).
   */
  long getLogicalStartTime();

  /**
   * @return the arguments which can be updated.
   */
  SettableArguments getArguments();
}
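For illustration, a minimal sketch of how an action's run() method might use this context; the argument key below is made up, and it assumes SettableArguments exposes a set(name, value) method:

Code Block language java

@Override
public void run(ActionContext context) throws Exception {
  // logical start time of the batch run that triggered this action
  long startTime = context.getLogicalStartTime();
  // update the runtime arguments so that downstream stages can read the value;
  // the key "last.action.start.time" is purely illustrative
  context.getArguments().set("last.action.start.time", Long.toString(startTime));
}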
Example SSH Plugin Class:
...
Example Plugin Class:
...
Code Block language java
package co.cask.hydrator.plugin.action;
/**
* SSH into a remote machine and execute a script on that machine.
*/
@Plugin(type = "action")
@Name("SSHAction")
@Description("Action to run a script on remote machine.")
public class SSHAction extends Action {
// SLF4J logger (org.slf4j.Logger / LoggerFactory imports assumed)
private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class);
private final SSHActionConfig config;
public SSHAction(SSHActionConfig config) {
super(config);
this.config = config;
}
@Override
public void run(ActionContext context) throws Exception {
try {
Connection connection = new Connection(config.host);
connection.connect();
if (config.usingPassAuth) {
if (!connection.authenticateWithPassword(config.user, config.password)) {
throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d",
config.user, config.host, config.port));
}
} else {
// key-based authentication using the private key file and an optional passphrase
connection.authenticateWithPublicKey(config.user, new File(config.privateKeyFile), config.privateKeyPassphrase);
}
Session session = connection.openSession();
session.execCommand(config.scriptCMD);
// Read stdout and stderr
InputStream stdout = new StreamGobbler(session.getStdout());
BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout));
InputStream stderr = new StreamGobbler(session.getStderr());
BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr));
StringBuilder outBuilder = new StringBuilder();
String line = outBuffer.readLine();
while (line != null) {
outBuilder.append(line + "\n");
line = outBuffer.readLine();
}
StringBuilder errBuilder = new StringBuilder();
line = errBuffer.readLine();
while (line != null) {
errBuilder.append(line + "\n");
line = errBuffer.readLine();
}
LOG.info("Output:");
LOG.info(outBuilder.toString());
LOG.info("Errors:");
LOG.info(errBuilder.toString());
session.close();
} catch (IOException e) {
LOG.error("Unable to establish connection.", e);
}
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
}
/**
* Config class that contains all the properties needed to SSH into the remote machine and run the script.
*/
public static class SSHActionConfig extends PluginConfig {
@Nullable
@Description("Host machine")
public String host;
@Nullable
@Description("Port to connect to")
public Integer port;
@Description("User name used to authenticate on the remote machine")
public String user;
@Description("Set to true to authenticate with a password instead of a private key")
public Boolean usingPassAuth;
@Nullable
@Description("Password (used when password authentication is selected)")
public String password;
@Nullable
@Description("Path to the private key file (used when key authentication is selected)")
public String privateKeyFile;
@Nullable
@Description("Passphrase of the private key")
public String privateKeyPassphrase;
@Nullable
@Description("Path to Script File")
public String scriptPath;
@Nullable
@Description("Script command")
public String scriptCMD;
@Nullable
@Description("Arguments to pass into script")
public Set<String> arguments; //Need to dig more about how to access the arguments.
}
}
Storing SSH keys:
...
Additional Plugins:
SQL Action:
- Extends Action Class
- Similar logic to the QueryAction plugin (see the sketch after this list)
- Config Properties:
- use QueryConfig
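A minimal sketch of what the SQL Action's run() method could look like, assuming plain JDBC and illustrative config property names (connectionString, user, password, query); this is a sketch, not the final implementation:

Code Block language java

@Override
public void run(ActionContext context) throws Exception {
  // java.sql.Connection, DriverManager and Statement imports assumed
  try (Connection conn = DriverManager.getConnection(config.connectionString, config.user, config.password);
       Statement stmt = conn.createStatement()) {
    // run the configured statement, e.g. a stored procedure call such as "EXEC my_proc"
    stmt.execute(config.query);
  }
}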
HDFS Action:
- Extends Action Class
- Config Properties:
- sourcePath: Path of file/directory
- destPath: Path of desired destination
- fileRegex: wildcard used to identify the files to copy or move
- Run Method:
- Pull all files in the given path and, if `fileRegex` is set, filter them using a `WildcardFileFilter`
- For each file, execute a `FileSystem.rename(src, dest)` call to move it to the desired location. This requires obtaining the HDFS `FileSystem` instance first (see the sketch after this list).
To-do:
- Allow moving of files from FTP to HDFS.
- Proposal: parse source path config to see if it is in FTP or HDFS
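A rough sketch of the HDFS Action run() method using the config properties listed above; the wildcard matching via commons-io FilenameUtils is just one possible way to implement fileRegex:

Code Block language java

@Override
public void run(ActionContext context) throws Exception {
  // org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.* and
  // org.apache.commons.io.FilenameUtils imports assumed
  FileSystem fs = FileSystem.get(new Configuration());   // obtain the HDFS FileSystem first
  Path dest = new Path(config.destPath);
  for (FileStatus status : fs.listStatus(new Path(config.sourcePath))) {
    String name = status.getPath().getName();
    // apply the wildcard filter only when fileRegex is set
    if (config.fileRegex == null || FilenameUtils.wildcardMatch(name, config.fileRegex)) {
      // move the file to the desired destination
      fs.rename(status.getPath(), new Path(dest, name));
    }
  }
}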
File Action:
- Moves files between remote machines
- Extends SSHAction Class
- Config Properties:
- Authentication to ssh into destination machine
- Authentication to get file(s) from source machine
- Run Method:
- Build the scp or ftp command from the source and destination configurations and pass it to the superclass constructor
- Call the superclass run method to execute the command over SSH (see the sketch after this list)
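A hypothetical sketch of that delegation; FileActionConfig and its fields (destHost, destUser, destPassword, sourceUser, sourceHost, sourceFilePath, destFilePath) do not exist yet and are named here only for illustration:

Code Block language java

public class FileAction extends SSHAction {
  public FileAction(FileActionConfig config) {
    super(toSSHConfig(config));
  }

  // Build an SSHActionConfig whose command pulls the file from the source machine
  // onto the destination machine; SSHAction then runs that command over SSH.
  private static SSHActionConfig toSSHConfig(FileActionConfig config) {
    SSHActionConfig sshConfig = new SSHActionConfig();
    sshConfig.host = config.destHost;
    sshConfig.user = config.destUser;
    sshConfig.usingPassAuth = true;              // password auth, for brevity
    sshConfig.password = config.destPassword;
    sshConfig.scriptCMD = String.format("scp %s@%s:%s %s",
        config.sourceUser, config.sourceHost, config.sourceFilePath, config.destFilePath);
    return sshConfig;
  }
}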
Storing SSH keys:
- SSH private and public keys will be generated for the specific user. These keys will be used to SSH to the external machine.
- The user's public key can be hosted on the machine to which we want to SSH from the YARN container running the action.
- The user's private key will need to be stored on the YARN cluster so that it can be accessed by any container. The following are a few options:
- Store the private key as a dataset in HDFS
- Store the private key in the SecureStore. If the program has access to the SecureStore, then the ActionContext will need to expose it.
...
- Have the run method of the custom action execute in a long transaction.
- Instead of executing the run() method in a transaction, the CustomActionContext can expose a method which gives the user the ability to control the transaction, similar to the WorkerContext (see the usage sketch after the interface below).
Code Block language java

public interface CustomActionContext extends RuntimeContext, Transactional, WorkflowInfoProvider, PluginContext, SecureStore {

  /**
   * Return the specification of the custom action.
   */
  CustomActionSpecification getSpecification();

  /**
   * Returns the logical start time of the batch job which triggers this instance of an action.
   * Logical start time is the time when the triggering Batch job is supposed to start if it is
   * started by the scheduler. Otherwise it would be the current time when the action runs.
   */
  long getLogicalStartTime();

  /**
   * Return the state of the custom action.
   */
  ProgramState getState();
}
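Since CustomActionContext extends Transactional, the action can run its dataset operations in an explicit transaction. A small sketch (the dataset name is made up):

Code Block language java

// Inside the custom action's run logic, given a CustomActionContext "context":
context.execute(new TxRunnable() {
  @Override
  public void run(DatasetContext datasetContext) throws Exception {
    // "statusTable" is an illustrative dataset name
    KeyValueTable statusTable = datasetContext.getDataset("statusTable");
    statusTable.write("last-run", String.valueOf(System.currentTimeMillis()));
  }
});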
- We can add support for executing Workers in the Workflow. However, this would cause two additional containers to be launched - one for the application master and another for the worker itself. Also, workers support running multiple instances, which is not required in this case. Generally, actions in the pipeline are not heavy duty, and it would be ideal to execute them in the Workflow driver itself.
...
Consider the following sample pipeline in which every node is an action.
Code Block

(Script)----->(CopyToHDFS)------->(Hive)------->(SQL Exporter)
                                    |
                                    |
                                    ------>(HDFSArchive)

Where:
1. The Script action is responsible for executing a (Shell, Perl, Python, etc.) script located on the specified machine. This action prepares the input data, which arrives in multiple formats (JSON, binary, CSV, etc.), into the single format (flattened records) expected by the next stages.
2. The CopyToHDFS action is responsible for copying the flattened files generated in the previous stage to a specified HDFS directory.
3. The Hive action is responsible for executing the Hive script, which populates the Hive tables based on the business logic contained in the script.
4. The SQL Exporter action exports the Hive table to a relational database.
5. In parallel, the HDFS files generated during step 2 are archived by the HDFSArchive action.
Possible configurations for the pipeline:
Code Block language json

{
  "artifact":{
    "name":"cdap-data-pipeline",
    "version":"3.5.0-SNAPSHOT",
    "scope":"SYSTEM"
  },
  "name":"MyActionPipeline",
  "config":{
    "connections":[
      { "from":"Script", "to":"CopyToHDFS" },
      { "from":"CopyToHDFS", "to":"Hive" },
      { "from":"Hive", "to":"SQLExporter" },
      { "from":"Hive", "to":"HDFSArchive" }
    ],
    "stages":[
      {
        "name":"Script",
        "plugin":{
          "name":"SSH",
          "type":"action",
          "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" },
          "properties":{
            "host": "scripthost.com",
            "scriptFileName":"/tmp/myfile",
            "command": "/bin/bash",
            "arguments": [
              { "name": "timeout", "value": 10 },
              { "name": "user", "value": "some_user" }
            ]
          }
        }
      },
      {
        "name":"CopyToHDFS",
        "plugin":{
          "name":"FileCopy",
          "type":"action",
          "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" },
          "properties":{
            "sourceHost": "source.host.com",
            "sourceLocation":"/tmp/inputDir",
            "wildcard": "*.txt",
            "targetLocation": "hdfs://hdfs.cluster.example.com/tmp/output"
          }
        }
      },
      {
        "name":"Hive",
        "plugin":{
          "name":"HIVE",
          "type":"action",
          "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" },
          "properties":{
            "hiveScriptURL": "URL of the hive script",
            "blocking": "true",
            "arguments": [
              { "name": "timeout", "value": 10 },
              { "name": "user", "value": "some_user" }
            ]
          }
        }
      },
      {
        "name":"SQLExporter",
        "plugin":{
          "name":"SQL",
          "type":"action",
          "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" },
          "properties":{
            "connection": "Connection configurations",
            "blocking": "true",
            "sqlFileName": "/home/user/sql_exporter.sql",
            "arguments": [
              { "name": "timeout", "value": 10 },
              { "name": "user", "value": "some_user" }
            ]
          }
        }
      },
      {
        "name":"HDFSArchive",
        "plugin":{
          "name":"FileMove",
          "type":"action",
          "artifact":{ "name":"core-action-plugins", "version":"1.4.0-SNAPSHOT", "scope":"SYSTEM" },
          "properties":{
            "sourceLocation": "hdfs://hdfs.cluster.example.com/data/output",
            "targetLocation": "hdfs://hdfs.cluster.example.com/data/archive/output",
            "wildcard": "*.txt"
          }
        }
      }
    ]
  }
}
Based on the connection information specified in the above application configuration, DataPipelineApp will configure the Workflow, which will have one custom action corresponding to each plugin of type "action" in the above config.
...