...

Code Block
languagejava
package co.cask.hydrator.plugin.action;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;

import co.cask.cdap.api.plugin.PluginConfig;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.etl.api.PipelineConfigurer;

import java.util.Set;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link Action} that connects to a remote machine over SSH and executes a script on that machine.
 */
@Plugin(type = "action")
@Name("SSH-Action")
@Description("Custom Action to run a script on a remote machine before starting the workflow")
public class SSHAction implements Action {
  private final SSHActionConfig config;

  private static final Logger LOG = LoggerFactory.getLogger(SSHAction.class);

  public SSHAction(SSHActionConfig config) {
    // No super(config) call: the class implements Action and has no superclass constructor taking a config
    this.config = config;
  }

  @Override
  public void run() throws Exception {
    try {
      // Connect on the configured port rather than the library default
      Connection connection = new Connection(config.host, config.port);
      connection.connect();

      if (config.usingPassAuth) {
        if (!connection.authenticateWithPassword(config.user, config.password)) {
          throw new IOException(String.format("Unable to establish SSH connection for %s@%s on port %d",
                                              config.user, config.host, config.port));
        }
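      } else {
        // NOTE: privateKeyFile is declared as a FileSet, but Ganymed's authenticateWithPublicKey expects a
        // java.io.File or a char[] holding the PEM key, so the key must first be resolved from the FileSet.
        if (!connection.authenticateWithPublicKey(config.user, config.privateKeyFile,
                                                  config.privateKeyPassPhrase)) {
          throw new IOException(String.format("Public key authentication failed for %s@%s on port %d",
                                              config.user, config.host, config.port));
        }
      }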

      Session session = connection.openSession();

      session.execCommand(config.scriptCMD);

      // Read stdout and stderr
      InputStream stdout = new StreamGobbler(session.getStdout());
      BufferedReader outBuffer = new BufferedReader(new InputStreamReader(stdout));
      InputStream stderr = new StreamGobbler(session.getStderr());
      BufferedReader errBuffer = new BufferedReader(new InputStreamReader(stderr));


      // Collect stdout, then stderr, one line at a time
      StringBuilder outBuilder = new StringBuilder();
      String line;
      while ((line = outBuffer.readLine()) != null) {
        outBuilder.append(line).append('\n');
      }

      StringBuilder errBuilder = new StringBuilder();
      while ((line = errBuffer.readLine()) != null) {
        errBuilder.append(line).append('\n');
      }

      LOG.info("Output:");
      LOG.info(outBuilder.toString());
      LOG.info("Errors:");
      LOG.info(errBuilder.toString());

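      session.close();
      connection.close();
    } catch (IOException e) {
      // Log and rethrow so that a failed SSH action fails the run instead of passing silently
      LOG.error("SSH action failed for {}@{}", config.user, config.host, e);
      throw e;
    }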
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Validate the configuration when the pipeline is configured
    config.validate();
  }

  /**
   * Config class that contains all the properties needed to SSH into the remote machine and run the script.
   */
  public static class SSHActionConfig extends PluginConfig {
    @Description("Hostname or IP address of the remote machine")
    public String host;

    @Nullable
    @Description("User name to log in as")
    public String user;

    @Description("Whether to authenticate with a password (true) or a private key (false); set by validate()")
    public Boolean usingPassAuth;

    @Nullable
    @Description("Password login credentials")
    public String password;

    @Nullable
    @Description("Private Key File")
    public FileSet privateKeyFile;

    @Nullable
    @Description("Private Key Passphrase")
    public String privateKeyPassPhrase;

    @Description("Port to connect to")
    public int port;

    @Nullable
    @Description("Path to Script File")
    public String scriptPath;

    @Nullable
    @Description("Script command")
    public String scriptCMD;

    @Nullable
    @Description("Arguments to pass into script")
    public Set<String> arguments; //Unsure how the arguments would be passed. Can change this as necessary

    public SSHActionConfig(String referenceName, String host, int port, @Nullable String user, @Nullable String password,
                           @Nullable FileSet privateKey, @Nullable String privateKeyPassPhrase, String scriptPath,
                           String scriptCMD, @Nullable Set<String> arguments) {
      super(referenceName);
      this.host = host;
      this.port = port;
      this.user = user;
      this.privateKeyFile = privateKey;
      this.privateKeyPassPhrase = privateKeyPassPhrase;
      this.password = password;
      this.scriptPath = scriptPath;
      this.scriptCMD = scriptCMD;
      this.arguments = arguments;
    }

    public void validate() {
      // Exactly one of password or privateKeyFile must be set
      if ((password != null) == (privateKeyFile != null)) {
        throw new IllegalArgumentException("Must specify exactly one of password or private key file");
      }

      //either password or privateKey has been passed
      usingPassAuth = (password != null);

      //check that port is not negative
      if (port < 0) {
        throw new IllegalArgumentException("Port cannot be negative");
      }
    }
  }
}
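
The config above leaves open how `arguments` would reach the script. As a minimal sketch of one possibility, assuming the remote side runs a POSIX shell, the command, script path, and arguments could be joined into the single string handed to `execCommand`. `RemoteCommandBuilder` and its methods are hypothetical names introduced here for illustration and are not part of the plugin. The sketch takes a `List` rather than the `Set` declared in the config, on the assumption that argument order matters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the plugin above) that assembles the remote command line.
final class RemoteCommandBuilder {

  private RemoteCommandBuilder() { }

  // Wraps a value in single quotes so that a POSIX shell treats it as one literal word.
  static String quote(String value) {
    // A single quote cannot appear inside single quotes, so close, escape, and reopen: ' becomes '\''
    return "'" + value.replace("'", "'\\''") + "'";
  }

  // Joins the command, the script path, and the arguments into one string.
  static String build(String scriptCMD, String scriptPath, List<String> arguments) {
    List<String> parts = new ArrayList<>();
    parts.add(scriptCMD);            // assumed to be a trusted command such as "bash"; passed through unquoted
    parts.add(quote(scriptPath));
    for (String argument : arguments) {
      parts.add(quote(argument));
    }
    return String.join(" ", parts);
  }
}
```

With this helper, `session.execCommand(RemoteCommandBuilder.build(config.scriptCMD, config.scriptPath, args))` would send one quoted command line, so arguments containing spaces or quotes are not re-split by the remote shell.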

 

...

Storing SSH keys:

  1. SSH private and public keys will be generated for the specific user. These keys will be used to SSH into the external machine.
  2. The user's public key must be installed on the remote machine that the YARN container running the action will SSH into.
  3. The user's private key must be stored on the YARN cluster so that any container can access it. There are a few options:
    1. Store the private key as a dataset in HDFS.
    2. Store the private key in the SecureStore. If the program has access to the SecureStore, the ActionContext will need to expose it.
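
Whichever storage option is chosen, the action only needs the key contents at run time. The following is a rough sketch, not a proposed API: `PrivateKeySource`, `InMemoryKeySource`, and `PrivateKeys` are hypothetical names, and the in-memory map merely stands in for an HDFS dataset or the SecureStore. It shows the key being loaded by name and converted to a `char[]`, which is the form that Ganymed's in-memory `authenticateWithPublicKey` overload accepts.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction over where the private key lives (HDFS dataset or secure store).
interface PrivateKeySource {
  byte[] read(String keyName) throws IOException;
}

// In-memory stand-in used only for illustration; a real source would read from HDFS or the secure store.
final class InMemoryKeySource implements PrivateKeySource {
  private final Map<String, byte[]> keys = new HashMap<>();

  void put(String keyName, String pem) {
    keys.put(keyName, pem.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public byte[] read(String keyName) throws IOException {
    byte[] key = keys.get(keyName);
    if (key == null) {
      throw new IOException("No private key stored under name " + keyName);
    }
    return key.clone(); // defensive copy so callers cannot modify the stored key
  }
}

final class PrivateKeys {
  private PrivateKeys() { }

  // Loads a PEM-encoded key by name and returns it as a char[].
  static char[] loadPem(PrivateKeySource source, String keyName) throws IOException {
    byte[] raw = source.read(keyName);
    return new String(raw, StandardCharsets.UTF_8).toCharArray();
  }
}
```

Keeping the lookup behind one small interface would let the action stay unchanged if the storage decision between HDFS and the SecureStore is revisited later.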

Transaction capabilities for custom actions:

Currently, a custom action runs in a short transaction. An action in the pipeline may execute a Hive script that takes longer than the timeout configured for short transactions. The following are a few possible approaches:

  1. Have the run method of the custom action execute in a long transaction.
  2. Instead of executing the run() method in a transaction, ActionContext (and WorkflowActionContext) can expose a method that gives the user control over the transaction. This is similar to WorkerContext.

    Code Block
    languagejava
    public interface ActionContext ... {

      /**
       * Executes a set of operations via a {@link TxRunnable} that are committed as a single transaction.
       * The {@link TxRunnable} can gain access to {@link Dataset} through the {@link DatasetContext} provided
       * to it.
       *
       * @param runnable the runnable to be executed in the transaction
       * @throws RuntimeException if failed to execute the given {@link TxRunnable} in a transaction
       */
      @Override
      void execute(TxRunnable runnable);
    }
  3. Add support for executing Workers in the Workflow. However, this causes two additional containers to be launched: one for the application master and another for the worker itself. Workers also support running multiple instances, which is not required in this case. Actions in a pipeline are generally not heavy-duty, so it would be ideal to execute them in the Workflow driver itself.
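
To illustrate the second approach, here is a minimal, self-contained sketch of an action that keeps its long-running work outside any transaction and uses `execute` only for short bookkeeping writes. The interfaces below are simplified stand-ins declared just for this example and are not the real CDAP types. In particular, the `put` method on `DatasetContext`, `RecordingActionContext`, and `LongRunningAction` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the CDAP types named above; these are NOT the real CDAP interfaces.
interface DatasetContext {
  // Hypothetical write method; the real DatasetContext hands out Dataset instances instead.
  void put(String key, String value);
}

interface TxRunnable {
  void run(DatasetContext context) throws Exception;
}

interface ActionContext {
  void execute(TxRunnable runnable);
}

// Toy context that records transaction boundaries so that the ordering can be inspected.
final class RecordingActionContext implements ActionContext {
  final List<String> events = new ArrayList<>();

  @Override
  public void execute(TxRunnable runnable) {
    events.add("tx-start");
    try {
      runnable.run((key, value) -> events.add("put " + key + "=" + value));
      events.add("tx-commit");
    } catch (Exception e) {
      events.add("tx-abort");
      throw new RuntimeException("Transaction failed", e);
    }
  }
}

final class LongRunningAction {
  private LongRunningAction() { }

  static void run(ActionContext context, Runnable longRunningWork) {
    // Short transaction: record that the action has started
    context.execute(datasets -> datasets.put("status", "STARTED"));
    // The long-running step (for example, a Hive script) runs outside any transaction,
    // so it is not bound by the short transaction timeout
    longRunningWork.run();
    // Second short transaction: record completion
    context.execute(datasets -> datasets.put("status", "FINISHED"));
  }
}
```

Calling `LongRunningAction.run(context, work)` with a `RecordingActionContext` leaves two start/commit pairs in `events`, with the long-running work falling between them rather than inside either transaction.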

Configurations for the action plugins:

...