...

If no engine is given, it defaults to mapreduce. So far, it looks like the plugins will not need to change; plugins that work for mapreduce will also work for spark.
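As a rough sketch (assuming the engine is selected with a top-level "engine" property of the config, which is not finalized here), running the same pipeline on spark might look like:

Code Block
{
  "config": {
    "engine": "spark",
    "source": { ... },
    "transforms": [ ... ],
    "sinks": [ ... ]
  }
}

Omitting "engine" would behave the same as setting it to "mapreduce".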

...

DAG in a pipeline

...

A user wants to create a pipeline that forks in the middle. The use cases below illustrate the kinds of branching that need to be supported.

 

Use case 1 (Fork):

The pipeline reads from some fileset. It writes to an Avro fileset and a database. Before writing to the database, it needs to perform some lookups to add a couple of fields.

Use case 2:

The pipeline reads from Twitter. If the tweet is in English, we want to run it through an English sentiment tagger transform. If not, we want to send it to a translate transform before sending it on to the tagger transform.


Use case 3:

The pipeline reads purchase events from a stream. If the required userid field is null, it wants to write that record to an error table. It then performs a transform to add the user email to the records. If the email is invalid, it also wants to write that record to the same error table.
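Sketching just the shape of this DAG in the "connections" format described later on this page (all stage names here are made up for illustration), one fork-based way to express it would be:

Code Block
{
  "connections": [
    { "from": "purchases stream", "to": "null userid filter" },
    { "from": "purchases stream", "to": "valid userid filter" },
    { "from": "null userid filter", "to": "error table" },
    { "from": "valid userid filter", "to": "add email transform" },
    { "from": "add email transform", "to": "invalid email filter" },
    { "from": "add email transform", "to": "valid email filter" },
    { "from": "invalid email filter", "to": "error table" },
    { "from": "valid email filter", "to": "purchases table" }
  ]
}

Since forks are not conditional, every record goes down every branch; it is the filter stages that decide which records actually reach the error table.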


 

Use case 4:

The pipeline reads from an employees database table. It wants to write salaries for different categories of employees, such as male/female, manager/IC, and age group. One employee may fall into multiple categories. If the employee is in a certain age group, we want to add information about retirement and health plans, but we don't need it for other categories.
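A fork-based sketch of this DAG in the same "connections" format (again, all stage names are hypothetical):

Code Block
{
  "connections": [
    { "from": "employees table", "to": "male filter" },
    { "from": "employees table", "to": "female filter" },
    { "from": "employees table", "to": "age group filter" },
    { "from": "male filter", "to": "male salaries sink" },
    { "from": "female filter", "to": "female salaries sink" },
    { "from": "age group filter", "to": "retirement plan lookup" },
    { "from": "retirement plan lookup", "to": "age group salaries sink" }
  ]
}

Because an employee can fall into multiple categories, the same record is sent down every fork; the retirement and health plan lookup only happens on the age group branch.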


 

 


...

Forks are not conditional; the same data gets sent to all forks. This is represented in the config as:

Code Block
{
  "config": {
    "source": {
      "name": "fileset",
      "plugin": {
        "name": "File",
        "properties": { ... }
      },
      ...
    },
    "sinks": [
      {
        "name": "DB Sink",
        "plugin": {
          "name": "Database",
          "properties": { ... }
        },
        ...
      },
      {
        "name": "avro fileset",
        "plugin": {
          "name": "TPFSAvro",
          "properties": { ... }
        },
        ...
      }
    ],
    "transforms": [
      {
        "name": "lookup transform",
        "plugin": {
          "name": "Script"
        },
        ...
      }
    ],
    "connections": [
      { "from": "fileset", "to": "lookup transform" },
      { "from": "lookup transform", "to": "DB Sink" },
      { "from": "fileset", "to": "avro fileset" }
    ]
  }
}

One requirement here is that each stage will require a unique name. Today, "name" is really the plugin name. Since we need to support versions for plugins, we are going to change the config API to have a separate "plugin" object, with "name" becoming the actual unique name of the stage.

Code Block
{
  "config": {
    "source": {
      "name": "tweets",
      "plugin": {
        "name": "Twitter",
        "properties": { ... },
        "artifact": {
          "name": "etl-lib",
          "version": "3.3.0",
          "scope": "system"
        }
      },
      ...
    },
    "sinks": [
      {
        "name": "sink",
        "plugin": {
          "name": "Table",
          "properties": { ... }
        },
        ...
      }
    ],
    "transforms": [
      {
        "name": "english filter",
        "plugin": {
          "name": "ScriptFilter"
        },
        ...
      },
      {
        "name": "non-english filter",
        "plugin": {
          "name": "ScriptFilter"
        },
        ...
      },
      {
        "name": "translator",
        "plugin": {
          "name": "Translator"
        },
        ...
      },
      {
        "name": "tagger",
        "plugin": {
          "name": "Tagger"
        },
        ...
      }
    ],
    "connections": [
      { "from": "tweets", "to": "english filter" },
      { "from": "tweets", "to": "non-english filter" },
      { "from": "non-english filter", "to": "translator" },
      { "from": "translator", "to": "tagger" },
      { "from": "english filter", "to": "tagger" },
      { "from": "tagger", "to": "sink" }
    ]
  }
}


Plugin artifact information will be optional. If none is given, the most recent artifact will be chosen. Artifact fields are also optional; the framework will go through artifacts until it finds the first one that matches all of the given fields.
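For example (a sketch of the matching behavior described above), a stage could pin only the artifact version, and the first artifact containing the plugin that matches that version would be used, regardless of artifact name or scope:

Code Block
{
  "name": "tweets",
  "plugin": {
    "name": "Twitter",
    "properties": { ... },
    "artifact": {
      "version": "3.3.0"
    }
  }
}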

Jira: CDAP-4230 (Cask Community Issue Tracker)

Use case 2 (conditions):

The pipeline reads from Twitter. If the tweet is in English, we want to run it through an English categorizer transform. If not, we want to send it to a translate transform before sending it on to the categorizer transform. If the translate transform is unable to translate, we want to write the record to an error dataset. Also, if the tweet is categorized as spam, we want to write the record to the error dataset. Otherwise, we want to write our translated, categorized tweet to a Table. This could be represented purely with forks:

(Diagram: the pipeline represented purely with forks and filter transforms)

This seems unnatural, though. After every fork, we have a filter on both paths, with each filter being the 'not' of the other. It seems more natural to have a condition node:

(Diagram: the same pipeline with a condition node)

Code Block
{
  "conditions": [
    {
      "name": "english condition",
      "script": "function (input) { return input.language != null; }",
      "engine": "javascript"
    }
  ],
  "connections": [
    {
      "from": "twitter source",
      "to": "language tagger"
    },
    {
      "from": "language tagger",
      "condition": {
        "script": "function (input) { return input.language != null; }",
        "engine": "javascript",
        "ifTrue": "categorizer",
        "ifFalse": "translator"
      }
    },
    ...
  ]
}


Realtime Stream source

Note

We may just add the ability to read from a stream to a worker instead of this.

...