Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 12 Next »

Hydrator Plugin Experience

Using and Installing plugins

User installs CDAP 3.3.0 by downloading the sdk or installing via packages.  Included in CDAP is the Hydrator webapp, cdap-etl-batch-3.3.0, and cdap-etl-realtime-3.3.0 artifacts.  Also included are all the v1.0.0 plugins in the hydrator-plugins repo, like cassandra-1.0.0.  The user creates a pipeline that reads from cassandra and writes to a Table.

 

PUT /apps/cassdump -d '
{
  "artifact": { ... },
  "config": {
    "source": {
      "name": "users",
      "plugin": {
        "name": "Cassandra",
        "properties": { ... }
      }
    },
    "sinks": [
      {
        "name": "usersDump",
        "plugin": {
          "name": "Table",
          "properties": { ... }
        }
      }
    ]
  }
}'

 

There are bugs found in the cassandra plugin, and the hydrator-plugins repo creates a new 1.0.1 release.  The user wants to install the cassandra-1.0.1 plugin on CDAP 3.3.0.

User downloads or builds cassandra-1.0.1.  Ideally the user can deploy the plugin jar through the UI.  The user can also deploy through CLI or REST api directly.  The cassandra plugin also includes a .json file for the UI, which decides how settings are rendered and the documentation to show.  The user then updates the existing pipeline, which will pick up the new cassandra-1.0.1 plugin.

POST /apps/cassdump/update -d '
{
  "artifact": { ... }
}'

 

After some time, CDAP 3.3.1 is released.  Each CDAP release will include plugins from a specific hydrator-plugins release.  So CDAP 3.3.0 may include 1.0.0 plugins, CDAP 3.3.1 may still include 1.0.2 plugins, while CDAP 3.4.0 may include 2.0.0 plugins.  When the user installs CDAP, cassandra-1.0.2 will be added as well. None of the existing pipelines will change after the upgrade.  They will continue to use whatever plugin version they were created with.  Suppose the user wants to continue using the older 1.0.1 cassandra plugin, but wants to use the newer 1.0.2 table plugin.  They can update the app config:

POST /apps/cassdump/update -d '
{
  "artifact": { ... },
  "config": {
    "source": {
      "name": "users",
      "plugin": {
        "name": "Cassandra",
        "properties": { ... }
        "artifact": {
          "version": "1.0.1"
        }
      }
    },
    "sinks": [
      {
        "name": "usersDump",
        "plugin": {
          "name": "Table",
          "properties": { ... }
          // If no version is given, the latest version is used.
        }
      }
    ]
  }
}'

Note that the config is different than in 3.2.x.  Name no longer refers to the plugin name, but is a unique name for that stage.  There is also a object dedicated to the plugin.

Error rendering macro 'jira' : Unable to locate Jira server for this macro. It may be due to Application Link configuration.

Developing a new Plugin

Testing (unit and integration) a new plugin should be as easy as possible.  Pain points today are with integrations with external systems like cassandra or elastic search.  Ideally its not hard to add them.  Maybe use Docker?

 

Unit test pain points are it's hard to test a source or sink in isolation, you end up copying table source or something like it into your plugin project.  We could have a etl-test module that provides mock sources and sinks to help with this.

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

Building the sdk and packages

We want to ship cdap with plugins.  This will require changes if the plugins are in a different repo than cdap itself.  

To build the sdk and packages, we will have the build:

  1. checkout cdap
  2. mvn install cdap-etl
  3. checkout hydrator-plugins
  4. mvn package hydrator-plugins
  5. mvn command to build sdk
  6. mvn command to build packages

For steps 5 and 6, we can add a maven profile that takes a property that is a path for additional artifacts to include.  It can copy those to the right place, then do whatever it is doing today.

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

Upgrading Hydrator

We are making backwards incompatible changes to the etl config. This will require a Hydrator upgrade, which will have its own upgrade process that is not tied to the cdap upgrade process.

The Hydrator upgrade tool will have a command that converts an old config into a new config. It will also have a command that will take cdap host/port/credential information and convert all 3.2.x pipelines into 3.3.x pipelines. 

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

 

New ETL Application features

Swapping between MR and Spark

A user has created a batch pipeline that runs using MapReduce.

PUT /apps/mypipeline -d '
{
  "artifact": { ... },
  "config": {
    "engine": "mapreduce",
    ...
  }
}'

The user would like to update the pipeline to run with Spark instead. Ideally they just change on setting:

POST /apps/mypipeline/update -d '
{
  "artifact": { ... },
  "config": {
    "engine": "spark",
    ...
  }
}'

If no engine is given, it defaults to mapreduce.  So far, it looks like the plugins will not need to change, that plugins that work for mapreduce will also work for spark.

Fork in a pipeline (support DAG)

A User wants to create a pipeline that forks in the middle.  For example, the pipeline reads from twitter. If the tweet is english, we want to run it through an english sentiment tagger transform.  If not, we want to send it to a translate transform before sending it on to the tagger transform.

 

Forks are not conditional, the same data gets sent to all forks. This is represented in the config as:

{
  "config": {
    "source": {
      "name": "twitter",
      "plugin": {
        "name": "Twitter"
      },
      ...
    },
    "sinks": [
      {
        "name": "sink",
        "plugin": {
          "name": "Table"
        },
        ...
      }
    ],
    "transforms": [
      {
        "name": "english filter",
        "plugin": {
          "name": "ScriptFilter"
        },
        ...
      },      
      {
        "name": "non-english filter",
        "plugin": {
          "name": "ScriptFilter"
        },
        ...
      },      
      {
        "name": "translator",
        "plugin": {
          "name": "Translator"
        },
        ...
      },     
      {
        "name": "tagger",
        "plugin": {
          "name": "Tagger"
        },
        ...
      }
    ],
    "connections": [
      { "from": "twitter", "to": "english filter" },
      { "from": "twitter", "to": "non-english filter" },
      { "from": "non-english filter", "to": "translator" }, 
      { "from": "translator", "to": "tagger" },
      { "from": "english filter", "to": "tagger" },
      { "from": "tagger", "to": "sink"}
    ]
  }
}

One requirement here is that each stage will require a unique name.  Today, "name" is really the plugin name.  Since we need to support versions for plugins, we are going to change the config api to have a separate "plugin" object, and for "name" to be the actual unique name of the stage.

{
  "config": {
    "source": {
      "name": "tweets",
      "plugin": {
        "name": "Twitter",
        "properties": { ... }
        "artifact": {
          "name": "etl-lib",
          "version": "3.3.0",
          "scope": "system"
        }
      },
    }
    ...
  }
}

plugin artifact information will be optional.  If none is given, the most recent artifact will be chosen.  Artifact fields are also optional. It will go through artifacts until it finds the first one that matches all fields given.

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

Realtime Stream source

We may just add the ability to read from a stream to a worker instead of this.

 

We would like to be able to read from a source in realtime.  Workers don't allow us to do this, but Flows do.  But flows would wrap everything in a transaction. What happens if it times out?  What happens if there is an external call and it fails and the flowlet retries the operation?  Can you have a one flowlet flow? Or would we make it multiple flowlets?  For a flow, maybe you wouldn't have to do partitioning yourself.  For flow, you have access to a tick flowlet as well.

One option is to have both a worker and flow implementation, and the config specifies which one is used:

{
  "config": {
    "engine": "worker",
    ...
  }
}

vs.

{
  "config": {
    "engine": "flow",
    ...
  }
}

Note: Flow requires plugin integration with flows.

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

Controlling Realtime source poll frequency

Today the source is forced to sleep itself if it wants to poll less frequently.  We want to allow the source to change this. For example, it may want to implement exponential backoff if the system it is polling is down. To accomplish this, we can add a couple methods to RealtimeContext to get and set the pollDelay: 

public interface RealtimeContext extends TransformContext {
  ...
 
  // set poll delay in milliseconds
  void setPollDelay(long duration);
 
  // set poll delay in the given time unit
  void setPollDelay(long duration, TimeUnit unit);
 
  // get poll delay in milliseconds
  long getPollDelay();
 
  // get poll delay in the given time unit
  long getPollDelay(TimeUnit unit);
}
 
@Override
public abstract SourceState poll(Emitter<T> writer, SourceState currentState) throws Exception {
  try {
    // perform request
    context.setPollDelay(100);
  } catch (Exception e) {
    context.setPollDelay(context.getPollDelay() * 2);
    return currentState;
  }
}

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

Plugin configure time schema validation

Some plugins may want to perform some schema validation at configure time if the plugin does not work with all types of schemas. For example, Table sink does not support complex types.  If the plugin knows the all records it gets as input always have a specific schema, it can fail early at configure time if it sees a complex type. 

This assumes that for a particular pipeline, every plugin will always output records with the same schema. To do this, we can add getInputSchema() and setOutputSchema() methods to another configurer that is only for the stage and not the entire pipeline.

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  Schema inputSchema = stageConfigurer.getInputSchema();
  // perform validation
  if (!isValid(inputSchema)) {
    throw new IllegalArgumentException("reason")
  }
  stageConfigurer.setOutputSchema(getOutputSchema(inputSchema));
}

If a plugin does not know what it's output schema will be, or if the output schema is not constant, it will return null. Plugins further in the pipeline will then get null as an input schema.

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

 

Stage name in log messages

When a user logs something in a plugin, the message has the classname but not the stage name.  So if a pipeline contains multiple stages that use the same plugin, it's hard to tell which message is from which stage.

Unable to locate Jira server for this macro. It may be due to Application Link configuration.

 

New Plugins

 

Kinesis source:

  • User wants to read from Kinesis source (AWS) and persist it into a dataset, as data in AWS kinesis stream is available only for 24hrs.

    "config": {
        "source": {
          "name": "MyKinesisSource",
          "plugin": {
            "name": "Kinesis",
            "properties": {        
              "credentials": "...",
              "shardId" : "id442222",
              "limit" : 5MBps // we can skip this if we use KCL rather than the REST API (have to investigate) 
            }
          }
          ....
      }
    }

Kafka sink:

  • User might want to move from existing streaming platform like kinesis to Apache Kafka?

HTTP source:

  • User might want to read from an HTTP Endpoint exposed by services which provide public data, like YouTube, Foursquare, AWS API,etc instead of implementing separate clients for these.
  • Ingest data from existing system that serves data via HTTP API.

    "config": {
        "source": {
          "name": "ExampleHTTPSource",
          "plugin": {
            "name": "Http",
            "properties": {        
              "method": "GET",
              "url": "http://example.com",
              "headers": "{...}",
              "body": "file-location"
            }
          }
          ....
      }
    }

FTP Source:

"config": {
    "source": {
      "name": "FTPExampleSource",
      "plugin": {
        "name": "Ftp",
        "properties": {        
          "hostname": "example.com",
          "port" : "4433",
          "credentials": "...",
          //optional configs
		  "page-size" : 25, 
		  "keep-alive" : 5m,
		  // there are few parameters to set if the remote server is in different time-zone or uses non-english data,etc
		  "language-code" : "..",
		  "time-zone" : "..."
        }
      }
      ....
  }
}

 

Counting Dataset:

  • User wants to use CounterTimeseriesTable as a sink to have counts data aggregated and stored in a table, and the time interval for the aggregation can be configured.    

 

  • No labels