Hydrator Plugin Experience

...

Installing custom plugins

The user installs CDAP 3.3.0 by downloading the SDK or installing via packages.  Included in CDAP are the Hydrator webapp and the cdap-etl-batch-3.3.0 and cdap-etl-realtime-3.3.0 artifacts.  Also included are all the v1.0.0 plugins from the hydrator-plugins repo, like cassandra-1.0.0.  The user creates a pipeline that reads from Cassandra and writes to a Table.

 

Code Block
PUT /apps/cassdump -d '
{
  "artifact": { ... },
  "config": {
    "source": {
      "name": "users",
      "plugin": {
        "name": "Cassandra",
        "properties": { ... }
      }
    },
    "sinks": [
      {
        "name": "usersDump",
        "plugin": {
          "name": "Table",
          "properties": { ... }
        }
      }
    ]
  }
}'

 

Bugs are found in the cassandra plugin, and the hydrator-plugins repo creates a new 1.0.1 release.  The user wants to install the cassandra-1.0.1 plugin on CDAP 3.3.0.

The user downloads or builds cassandra-1.0.1.  Ideally, the user can deploy the plugin jar through the UI.  The user can also deploy through the CLI or the REST API directly.  The cassandra plugin also includes a .json file for the UI, which determines how settings are rendered and which documentation to show.  The user then updates the existing pipeline, which will pick up the new cassandra-1.0.1 plugin.

Code Block
POST /apps/cassdump/update -d '
{
  "artifact": { ... }
}'

 

After some time, CDAP 3.3.1 is released.  Each CDAP release will include plugins from a specific hydrator-plugins release.  So CDAP 3.3.0 may include the 1.0.0 plugins, CDAP 3.3.1 may include the 1.0.2 plugins, and CDAP 3.4.0 may include the 2.0.0 plugins.  When the user installs CDAP 3.3.1, cassandra-1.0.2 will be added as well. None of the existing pipelines will change after the upgrade; they will continue to use whatever plugin version they were created with.

...

2. Installing Hydrator plugins requires uploading the artifact containing the plugins, placing a .json file that the UI uses to configure widgets into a special folder on the UI machine, and adding docs to CDAP's website.

This is problematic for a few reasons:

  1. Too complex.  There are multiple steps required, and they need to happen on multiple systems (against cdap and cdap-ui).

  2. The same UI configs are shared across all plugin artifacts, which is incorrect. Artifacts are versioned, and different versions may require different configs.  Additionally, artifacts with the same name and version may still be different if they are in different namespaces.

  3. No way to add docs, since they are currently on the Cask-hosted docs page.

The problem is that widgets and docs don't belong in CDAP, but they don't really belong in the UI either. The UI can live on multiple machines, and we also don't want to force operations on the filesystem when a user is simply installing a new set of plugins.  Long term, what we really want is a Hydrator backend: an app that manages pipelines, plugins, etc.  In the shorter term, it would be desirable if there were some generic CDAP feature that allowed Hydrator to use CDAP to serve the widget and doc information.

 

The plan, then, is to add properties to artifacts. Hydrator can store the widgets and docs for each plugin in the properties of the artifact containing that plugin. You can already set properties on streams and datasets; this would be an analogous feature. This solves the 3 problems mentioned above, while keeping CDAP ignorant of widgets and other UI-specific things.

Code Block
GET /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties?keys=key1,key2
GET /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}
PUT /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}
PUT /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties
DELETE /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}
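
For example, Hydrator (or the CLI) could attach the widget and doc properties for the cassandra-1.0.1 plugins in a single call. The request body shown here is illustrative and follows the key naming convention shown below:

Code Block
PUT /namespaces/default/artifacts/cassandra/versions/1.0.1/properties -d '
{
  "properties": {
    "widgets.batchsource.cassandra": "<widgets json>",
    "doc.batchsource.cassandra": "<doc>"
  }
}'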

Installing a set of plugins could then be a single command:

Code Block
load artifact <path/to/artifact> [config-file <config-file>] 

with properties added to the config file, which would be something of the form:

Code Block
{
  "properties": {
    "widgets.batchsource.database": "<widgets json>",
    "doc.batchsource.database": "<doc>",
    ...
  }
}
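
The UI could later read a single widget config back with one of the GET endpoints above, for example (the artifact name here is illustrative):

Code Block
GET /namespaces/default/artifacts/database-plugins/versions/1.0.0/properties/widgets.batchsource.database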

 

JIRA: CDAP-4280 (Cask Community Issue Tracker)

Installing plugins from hydrator-plugins repository

To install the plugins from hydrator-plugins, a user simply installs the hydrator-plugins rpm or deb.  In addition, cdap-site.xml will have to be updated so that the app.artifact.dir setting includes the directory for those plugins (/opt/hydrator-plugins/artifacts).
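
A sketch of the corresponding cdap-site.xml entry; whether multiple directories can be listed this way, and the default CDAP artifact directory shown first, are assumptions:

Code Block
<property>
  <name>app.artifact.dir</name>
  <!-- existing system artifact directory, plus the directory installed by the hydrator-plugins package -->
  <value>/opt/cdap/master/artifacts;/opt/hydrator-plugins/artifacts</value>
</property>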

 

The work for this will include setting up the hydrator-plugins build to create the rpms and debs.  The pom will be changed so that, in addition to the jars, the config JSON files are created.  The JSON files will be generated by an antrun script target that writes out the parent artifacts for each plugin artifact, as well as the properties for widget and doc information.  Prepare phases can then copy the jars and JSONs to the right place for packaging.

 

The CDAP build will still need to bundle the results of the hydrator-plugins build in order to build a standalone zip that contains hydrator plugins. To accomplish this, we will add a property ('-Dadditional.artifacts.dir=</path/to/additional/artifacts>') that can be used to copy any additional jars and jsons from some external directory. This property can then be used in standalone builds to pull in the hydrator-plugins artifacts and configs.
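
For example, a standalone build could then be invoked roughly like this; the build itself is whatever the SDK build does today, and only the additional.artifacts.dir property is new:

Code Block
mvn clean package -DskipTests \
    -Dadditional.artifacts.dir=/path/to/hydrator-plugins/artifacts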

Plugin Selection


Suppose the user, now on CDAP 3.3.1, wants to continue using the older 1.0.1 cassandra plugin, but wants to use the newer 1.0.2 table plugin.  They can update the app config:

Code Block
POST /apps/cassdump/update -d '
{
  "artifact": { ... },
  "config": {
    "source": {
      "name": "users",
      "plugin": {
        "name": "Cassandra",
        "properties": { ... }
        "artifact": {
          "version": "1.0.1"
        }
      }
    },
    "sinks": [
      {
        "name": "usersDump",
        "plugin": {
          "name": "Table",
          "properties": { ... }
          // If no version is given, the latest version is used.
        }
      }
    ]
  }
}'

Note that the config is different from the one in 3.2.x.  "Name" no longer refers to the plugin name, but is a unique name for that stage.  There is also an object dedicated to the plugin.
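
For comparison, a rough sketch of how the same source stage would have looked in a 3.2.x config, where "name" was the plugin name and there was no separate plugin object (reconstructed from the description above):

Code Block
"source": {
  "name": "Cassandra",
  "properties": { ... }
}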

JIRA: CDAP-4228 (Cask Community Issue Tracker)
JIRA: CDAP-4244 (Cask Community Issue Tracker)

Developing a new Plugin

Testing (unit and integration) a new plugin should be as easy as possible.  Pain points today are the integrations with external systems like Cassandra or Elasticsearch.  Ideally it's not hard to add them.  Maybe use Docker?

 

Unit test pain points: it's hard to test a source or sink in isolation; you end up copying the table source or something like it into your plugin project.  We could have an etl-test module that provides mock sources and sinks to help with this.
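
A rough sketch of what a plugin unit test could look like with such a module. ETLTestBase, MockSource, MockSink, record(), and runPipeline() are placeholder names for helpers the proposed etl-test module could provide, not an existing API:

Code Block
import org.junit.Assert;
import org.junit.Test;

// Hypothetical test built on the proposed etl-test module.
public class MyTransformTest extends ETLTestBase {

  @Test
  public void testTransform() throws Exception {
    // in-memory records instead of a real source like Cassandra
    MockSource source = MockSource.withRecords(
        record("id", 1, "name", "alice"),
        record("id", 2, "name", "bob"));

    // collects output in memory instead of writing to a real dataset
    MockSink sink = new MockSink();

    // run just this stage: source -> transform under test -> sink
    runPipeline(source, new MyTransform(new MyTransform.Config("name")), sink);

    Assert.assertEquals(2, sink.getRecords().size());
  }
}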

JIRA: CDAP-4227 (Cask Community Issue Tracker)

Building the sdk and packages

We want to ship CDAP with plugins.  This will require changes if the plugins are in a different repo than CDAP itself.

To build the SDK and packages, the build will:

  1. checkout cdap
  2. mvn install cdap-etl
  3. checkout hydrator-plugins
  4. mvn package hydrator-plugins
  5. mvn command to build sdk
  6. mvn command to build packages

For steps 5 and 6, we can add a Maven profile that takes a property pointing to a directory of additional artifacts to include.  It can copy those into the right place, then do whatever it does today.
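
A sketch of what such a profile might look like in the pom; the profile id, target directory, and the choice of maven-resources-plugin are illustrative:

Code Block
<profile>
  <id>additional-artifacts</id>
  <!-- active only when -Dadditional.artifacts.dir=... is passed -->
  <activation>
    <property>
      <name>additional.artifacts.dir</name>
    </property>
  </activation>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-additional-artifacts</id>
            <phase>prepare-package</phase>
            <goals>
              <goal>copy-resources</goal>
            </goals>
            <configuration>
              <!-- copy external jars/jsons next to the bundled system artifacts -->
              <outputDirectory>${project.build.directory}/artifacts</outputDirectory>
              <resources>
                <resource>
                  <directory>${additional.artifacts.dir}</directory>
                </resource>
              </resources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</profile>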

JIRA: CDAP-4226 (Cask Community Issue Tracker)

Upgrading Hydrator

We are making backwards incompatible changes to the etl config. This will require a Hydrator upgrade, which will have its own upgrade process that is not tied to the cdap upgrade process.

The Hydrator upgrade tool will have a command that converts an old config into a new config. It will also have a command that will take cdap host/port/credential information and convert all 3.2.x pipelines into 3.3.x pipelines. 
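
Usage could look something like this; the tool name, command names, and flags are illustrative, not a final CLI:

Code Block
# convert a single 3.2.x pipeline config file to the 3.3.x format
java -jar hydrator-upgrade-tool.jar convert --old-config pipeline-3.2.json --new-config pipeline-3.3.json

# connect to a CDAP instance and upgrade all 3.2.x pipelines it finds
java -jar hydrator-upgrade-tool.jar upgrade --host cdap.example.com --port 10000 --token <access-token>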

JIRA: CDAP-4225 (Cask Community Issue Tracker)

 

New ETL Application features

Swapping between MR and Spark

A user has created a batch pipeline that runs using MapReduce.

Code Block
PUT /apps/mypipeline -d '
{
  "artifact": { ... },
  "config": {
    "engine": "mapreduce",
    ...
  }
}'

The user would like to update the pipeline to run with Spark instead. Ideally, they just change one setting:

Code Block
POST /apps/mypipeline/update -d '
{
  "artifact": { ... },
  "config": {
    "engine": "spark",
    ...
  }
}'

If no engine is given, it defaults to mapreduce.  So far, it looks like the plugins will not need to change; plugins that work for MapReduce will also work for Spark.

DAG in a pipeline

A user wants to create a pipeline that forks in the middle.  For example, the pipeline reads from Twitter. If the tweet is English, we want to run it through an English sentiment tagger transform.  If not, we want to send it to a translate transform before sending it on to the tagger transform.

Use case 1 (Fork):

The pipeline reads from some fileset. It writes to an avro fileset and a database. Before writing to the db, it needs to perform some lookups to add a couple fields.  

[Image: fork pipeline — the fileset source forks to a lookup transform feeding the database sink, and directly to the avro fileset sink]

Forks are not conditional; the same data gets sent to all forks. This is represented in the config as:

Code Block
{
  "config": {
    "source": {
      "name": "fileset",
      "plugin": {
        "name": "File",
        "properties": { ... }
      },
      ...
    },
    "sinks": [
      {
        "name": "DB Sink",
        "plugin": {
          "name": "Database",
          "properties": { ... }
        },
        ...
      },
      {
        "name": "avro fileset",
        "plugin": {
          "name": "TPFSAvro",
          "properties": { ... }
        },
        ...
      }
    ],
    "transforms": [
      {
        "name": "lookup transform",
        "plugin": {
          "name": "Script",
          "properties": { ... }
        },
        ...
      }
    ],
    "connections": [
      { "from": "fileset", "to": "lookup transform" },
      { "from": "lookup transform", "to": "DB Sink" },
      { "from": "fileset", "to": "avro fileset" }
    ]
  }
}

One requirement here is that each stage will require a unique name.  Today, "name" is really the plugin name.  Since we need to support versions for plugins, we are going to change the config API to have a separate "plugin" object, and for "name" to be the actual unique name of the stage.

Code Block
{
  "config": {
    "source": {
      "name": "tweets",
      "plugin": {
        "name": "Twitter",
        "properties": { ... },
        "artifact": {
          "name": "etl-lib",
          "version": "3.3.0",
          "scope": "system"
        }
      }
    },
    ...
  }
}

Plugin artifact information will be optional.  If none is given, the most recent artifact will be chosen.  Artifact fields are also optional; it will go through artifacts until it finds the first one that matches all the fields given.
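
For example, giving only the scope would pick the most recent user-scoped artifact that contains the plugin:

Code Block
"plugin": {
  "name": "Cassandra",
  "properties": { ... },
  "artifact": {
    "scope": "user"
  }
}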

JIRA: CDAP-4230 (Cask Community Issue Tracker)

...



Use case 2 (selector):

The pipeline reads from Twitter. If the tweet is English, we want to run it through an English categorizer transform.  If not, we want to send it to a translate transform before sending it on to the categorizer transform. If the translate transform is unable to translate, we want to write the record to an error dataset. Also, if the tweet is categorized as spam, we want to write the record to the error dataset.  Otherwise, we want to write our translated, categorized tweet to a Table. This could be represented purely with forks:

[Image: fork-only representation, with complementary English / non-English filters after each fork]

This seems unnatural, though.  After every fork, we have a filter on both paths, with each filter the 'not' of the other.  It seems more natural to have a selector:

[Image: selector-based representation of the same pipeline]

 

Code Block
{
  "connections": [
    {
      "from": "twitter source",
      "to": "language tagger"
    },
    {
      "from": "language tagger",
      "selector": {
        // switch based on the value of the "lang" field
        "field": "lang",
        "switch": {
          // output stage -> field value
          "categorizer": "en"
        },
        // optional. if doesn't match anything in outputs, go here.
        // if absent, record is dropped
        "default": "translator"
      }
    },
    {
      "from": "translator",
      "selector": {
        "field": "lang",
        "switch": {
          "categorizer": "en"
        },
        "default": "invalid tweets table"
      }
    },
    {
      "from": "categorizer",
      "selector": {
        // switch based on the value returned by the script
        "script": "function (input) { return input.spam; }",
        "switch": {
          "categorized tweets table": true,
          "invalid tweets table": false
        }
      }
    }
  ]
}

This would not support a use case where we would want a record to go to multiple outputs.  For example, suppose we are reading from an employees table, and we want to write the employee salary to a table that groups salaries by several categories. If an employee is over a certain age, we want to look up their retirement plan. If the employee's nationality is x, y, or z, we want to do an immigration status lookup.  No matter what, we want to categorize employee performance before writing to the table:

[Image: employees table fanning out to retirement plan lookup, immigration status lookup, and performance categorizer, all feeding the salary by category table]

For a use case like this, we could introduce an 'if' condition to the connection.

Code Block
{
  "connections": [
    {
      "from": "employees table",
      "to": "retirement plan lookup",
      "if": {
        "script": "function (input) { return input.age > 65; }",
        "scriptEngine": "javascript"
      }
    },
    {
      "from": "employees table",
      "to": "immigration status lookup",
      "if": {
        "script": "function (input) { return input.nationality == x || input.nationality == y || input.nationality == z; }",
        "scriptEngine": "javascript"
      }
    },
    {
      "from": "employees table",
      "to": "performance categorizer"
    },
    {
      "from": "performance categorizer",
      "to": "salary by category table"
    },
    {
      "from": "immigration status lookup",
      "to": "salary by category table"
    },
    {
      "from": "retirement plan lookup",
      "to": "salary by category table"
    }
  ]
}

This same thing could also be represented as a fork with a filter on the top and bottom branches. So we may not do this.

 

Realtime Stream source

Note

We may just add the ability to read from a stream to a worker instead of this.

...

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  Schema inputSchema = stageConfigurer.getInputSchema();
  // perform validation
  if (!isValid(inputSchema)) {
    throw new IllegalArgumentException("reason");
  }
  stageConfigurer.setOutputSchema(getOutputSchema(inputSchema));
}
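
For context, a minimal sketch of what the isValid and getOutputSchema helpers might do for a stage that adds one field. This is illustrative only; a real plugin would validate against its own config:

Code Block
private boolean isValid(Schema inputSchema) {
  // require the field this stage operates on to be present
  return inputSchema != null && inputSchema.getField("body") != null;
}

private Schema getOutputSchema(Schema inputSchema) {
  // same fields as the input, plus one field added by this stage
  List<Schema.Field> fields = new ArrayList<>(inputSchema.getFields());
  fields.add(Schema.Field.of("body_length", Schema.of(Schema.Type.INT)));
  return Schema.recordOf("output", fields);
}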

...