Hydrator Plugin Experience

Using and Installing plugins

Installing custom plugins

In CDAP 3.2, installing Hydrator plugins requires uploading the artifact containing the plugins, placing a .json file (used by the UI to configure widgets) into a special folder on the UI machine, and adding docs to the CDAP website.

This is problematic for a few reasons:

  1. Too complex: there are multiple steps required, and they need to happen against multiple systems (cdap and cdap-ui).

  2. The same UI configs are shared across all plugin artifacts, which is incorrect: artifacts are versioned, and different versions may require different configs. Additionally, artifacts with the same name and version may still differ if they are in different namespaces.

  3. There is no way to add docs, since they live on the Cask-hosted docs page.

The problem is that widgets and docs don't belong in CDAP, but they don't really belong in the UI either. The UI can live on multiple machines, and we also don't want to force filesystem operations when a user is simply installing a new set of plugins. Long term, what we really want is a Hydrator backend: an app that manages pipelines, plugins, and so on. In the shorter term, it would be desirable to have some generic CDAP feature that lets Hydrator use CDAP to serve the widget and doc information.

 

The plan, then, is to add properties to artifacts. Hydrator can store the widgets and docs for each plugin in the properties of the artifact containing that plugin. Properties can already be set on streams and datasets; this would be an analogous feature. This solves the three problems mentioned above while keeping CDAP ignorant of widgets and other UI-specific things.

Code Block
GET /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties
GET /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}?keys=key1,key2
PUT /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}
PUT /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties
DELETE /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}
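For example, the UI configuration for a database batch source plugin could then be uploaded with a single call against CDAP. This is only a sketch: the namespace, artifact name, property name, and file are illustrative, and it assumes the standard v3 REST prefix on a local standalone instance and that the property value is sent as the request body.

Code Block
curl -X PUT "http://localhost:10000/v3/namespaces/default/artifacts/database-plugins/versions/1.0.0/properties/widgets.batchsource.database" \
  -d @widgets-batchsource-database.json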

Installing a set of plugins could then be a single command:

Code Block
load artifact <path/to/artifact> [config-file <config-file>] 

with properties added to the config file, which would be of the form:

Code Block
{
  "properties": {
    "widgets.batchsource.database": "<widgets json>",
    "doc.batchsource.database": "<doc>",
    ...
  }
}
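For example, installing a database plugins artifact together with its UI properties might then look like this in the CDAP CLI (the artifact path and config file name are illustrative):

Code Block
load artifact /opt/hydrator-plugins/database-plugins-1.0.0.jar config-file /opt/hydrator-plugins/database-plugins-1.0.0.json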

 

Jira: CDAP-4280 (Cask Community Issue Tracker)

Installing plugins from hydrator-plugins repository

To install the plugins from hydrator-plugins, a user simply installs the hydrator-plugins rpm or deb. In addition, the app.artifact.dir setting in cdap-site.xml will have to include the directory for those plugins (/opt/hydrator-plugins/artifacts).
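A sketch of what the relevant cdap-site.xml entry might look like; the existing default artifact directory shown here and the use of a semicolon as the list separator are assumptions:

Code Block
<!-- cdap-site.xml -->
<property>
  <name>app.artifact.dir</name>
  <!-- existing system artifact directory plus the hydrator-plugins directory;
       the semicolon separator is an assumption -->
  <value>/opt/cdap/master/artifacts;/opt/hydrator-plugins/artifacts</value>
</property>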

 

The work for this will include setting up the hydrator-plugins build to create the rpms and debs. The pom will be changed so that, in addition to the jars, the config jsons will be created. The json files will be created using an antrun script target that writes out the parents for the artifact, as well as properties for widget and doc information. Prepare phases can then copy the jars and jsons to the right place for packaging.
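For instance, the generated config json for a cassandra plugin artifact might look something like the following; the parent artifact version ranges and property names are illustrative:

Code Block
{
  "parents": [
    "cdap-etl-batch[3.3.0,3.4.0)",
    "cdap-etl-realtime[3.3.0,3.4.0)"
  ],
  "properties": {
    "widgets.batchsource.Cassandra": "<widgets json>",
    "doc.batchsource.Cassandra": "<doc>"
  }
}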

 

The CDAP build will still need to bundle the results of the hydrator-plugins build in order to build a standalone zip that contains hydrator plugins. To accomplish this, we will add a property ('-Dadditional.artifacts.dir=</path/to/additional/artifacts>') that can be used to copy any additional jars and jsons from some external directory. This property can then be used in standalone builds to pull in the hydrator-plugins artifacts and configs.
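For example, a standalone build might then be invoked with something like the following; the exact goals and profiles depend on the build, only the proposed property is taken from above:

Code Block
mvn clean package -DskipTests -Dadditional.artifacts.dir=/path/to/hydrator-plugins/artifacts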

Plugin Selection

A user installs CDAP 3.3.0 by downloading the SDK or installing via packages. Included in CDAP are the Hydrator webapp and the cdap-etl-batch-3.3.0 and cdap-etl-realtime-3.3.0 artifacts. Also included are all the v1.0.0 plugins in the hydrator-plugins repo, like cassandra-1.0.0. The user creates a pipeline that reads from Cassandra and writes to a Table.
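As a rough illustration of how a stage could pin its plugin to a specific artifact, the pipeline config might reference the artifact name, version, and scope alongside the plugin. This is not a confirmed config format; the stage name, plugin type, and field layout are illustrative:

Code Block
{
  "name": "cassandraSource",
  "plugin": {
    "name": "Cassandra",
    "type": "batchsource",
    "artifact": { "name": "cassandra", "version": "1.0.0", "scope": "SYSTEM" },
    "properties": { ... }
  }
}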

...

Jira: CDAP-4230 (Cask Community Issue Tracker)

Use case 2 (... selector):

The pipeline reads from twitter. If the tweet is English, we want to run it through an English categorizer transform. If not, we want to send it to a translate transform before sending it on to the categorizer transform. If the translate transform is unable to translate, we want to write the record to an error dataset. Also, if the tweet is categorized as spam, we want to write the record to the error dataset. Otherwise, we want to write our translated, categorized tweet to a Table. This could be represented purely with forks:

...

This seems unnatural though. After every fork, we have a filter on both paths, with each filter the 'not' of the other. It seems more natural to have a selector:

 

Code Block
{
  "connections": [
    {
      "from": "twitter source",
      "to": "language tagger"
    },
    {
      "from": "language tagger",
      "selector": {
        // switch based on the value of the "lang" field
        "field": "lang",
        "switch": {
          // output stage -> field value
          "categorizer": "en"
        },
        // optional. if doesn't match anything in outputs, go here.
        // if absent, record is dropped
        "default": "translator"
      }
    },
    {
      "from": "translator",
      "selector": {
        "field": "lang",
        "switch": {
          "categorizer": "en"
        },
        "default": "invalid tweets table"
      }
    },
    {
      "from": "categorizer",
      "selector": {
        // switch based on the value returned by the script
        "script": "function (input) { return input.spam; }",
        "scriptEngine": "javascript",
        "switch": {
          "invalid tweets table": true,
          "categorized tweets table": false
        }
      }
    }
  ]
}

This would not support a use case where we want a record to go to multiple outputs. For example, suppose we are reading from an employees table, and we want to write the employee salary to a table that groups salaries by several categories. If an employee is over a certain age, we want to look up their retirement plan. If the employee's nationality is x, y, or z, we want to do an immigration status lookup. No matter what, we want to categorize employee performance before writing to the table:

 

For a use case like this, we could introduce an 'if' condition to the connection.

Code Block
{
  "connections": [
    {
      "from": "employees table",
      "to": "retirement plan lookup",
      "if": {
        "script": "function (input) { return input.age > 65; }",
        "scriptEngine": "javascript"
      }
    },
    {
      "from": "employees table",
      "to": "immigration status lookup",
      "if": {
        "script": "function (input) { return input.nationality == x || input.nationality == y || input.nationality == z; }",
        "scriptEngine": "javascript"
      }
    },
    {
      "from": "employees table",
      "to": "performance categorizer"
    },
    {
      "from": "performance categorizer",
      "to": "salary by category table"
    },
    {
      "from": "immigration status lookup",
      "to": "salary by category table"
    },
    {
      "from": "retirement plan lookup",
      "to": "salary by category table"
    }
  ]
}

One thing to note is that in this pipeline, an employee that is older than 65 with nationality x will get sent to all three branches to generate each type of category.

Also note that one side effect of this change would be that the filter transform would no longer be needed. However, this same thing could also be represented as a fork with a filter on the top and bottom branches, so we may not do this.

 

Realtime Stream source

Note

We may just add the ability to read from a stream to a worker instead of this.

...

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  Schema inputSchema = stageConfigurer.getInputSchema();
  // perform validation
  if (!isValid(inputSchema)) {
    throw new IllegalArgumentException("reason");
  }
  stageConfigurer.setOutputSchema(getOutputSchema(inputSchema));
}

If a plugin does not know what its output schema will be, or if the output schema is not constant, it will return null. Plugins further in the pipeline will then get null as an input schema.
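Plugins that accept arbitrary input therefore need to tolerate a null input schema at configure time. A minimal sketch building on the proposed API above (isValid and getOutputSchema are placeholder methods carried over from the earlier example):

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  Schema inputSchema = stageConfigurer.getInputSchema();
  if (inputSchema == null) {
    // upstream schema is unknown; skip validation and propagate the unknown schema
    stageConfigurer.setOutputSchema(null);
    return;
  }
  if (!isValid(inputSchema)) {
    throw new IllegalArgumentException("reason");
  }
  stageConfigurer.setOutputSchema(getOutputSchema(inputSchema));
}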

...