Hydrator Plugin Experience
Using and Installing plugins
User installs CDAP 3.3.0 by downloading the sdk or installing via packages. Included in CDAP is the Hydrator webapp, cdap-etl-batch-3.3.0, and cdap-etl-realtime-3.3.0 artifacts. Also included are all the v1.0.0 plugins in the hydrator-plugins repo, like cassandra-1.0.0. The user creates a pipeline that reads from cassandra and writes to a Table.
PUT /apps/cassdump -d ' { "artifact": { ... }, "config": { "source": { "name": "users", "plugin": { "name": "Cassandra", "properties": { ... } } }, "sink": { "name": "usersDump", "plugin": { "name": "Table", "properties": { ... } } } } }'
There are bugs found in the cassandra plugin, and the hydrator-plugins repo creates a new 1.0.1 release. The user wants to install the cassandra-1.0.1 plugin on CDAP 3.3.0.
User downloads or builds cassandra-1.0.1. Ideally the user can deploy the plugin jar through the UI. The user can also deploy through CLI or REST api directly. The cassandra plugin also includes a .json file for the UI, which decides how settings are rendered and the documentation to show. The user then updates the existing pipeline, which will pick up the new cassandra-1.0.1 plugin.
POST /apps/cassdump/update -d ' { "artifact": { ... } }'
After some time, CDAP 3.3.1 is released. Each CDAP release will include plugins from a specific hydrator-plugins release. So CDAP 3.3.0 may include 1.0.0 plugins, CDAP 3.3.1 may still include 1.0.2 plugins, while CDAP 3.4.0 may include 2.0.0 plugins. When the user installs CDAP, cassandra-1.0.2 will be added as well. None of the existing pipelines will change after the upgrade. They will continue to use whatever plugin version they were created with. Suppose the user wants to continue using the older 1.0.1 cassandra plugin, but wants to use the newer 1.0.2 table plugin. They can update the app config:
POST /apps/cassdump/update -d ' { "artifact": { ... }, "config": { "source": { "name": "users", "plugin": { "name": "Cassandra", "properties": { ... } "artifact": { "version": "1.0.1" } } }, "sink": { "name": "usersDump", "plugin": { "name": "Table", "properties": { ... } // If no version is given, the latest version is used. } } } }'
Note that the config is different than in 3.2.x. Name no longer refers to the plugin name, but is a unique name for that stage. There is also a object dedicated to the plugin.
Developing a new Plugin
Testing (unit and integration) a new plugin should be as easy as possible. Pain points today are with integrations with external systems like cassandra or elastic search. Ideally its not hard to add them. Maybe use Docker?
Unit test pain points are it's hard to test a source or sink in isolation, you end up copying table source or something like it into your plugin project. We could have a etl-test module that provides mock sources and sinks to help with this.
Building the sdk and packages
We want to ship cdap with plugins. This will require changes if the plugins are in a different repo than cdap itself.
To build the sdk and packages, we will have the build:
- checkout cdap
- mvn install cdap-etl
- checkout hydrator-plugins
- mvn package hydrator-plugins
- mvn command to build sdk
- mvn command to build packages
For steps 5 and 6, we can add a maven profile that takes a property that is a path for additional artifacts to include. It can copy those to the right place, then do whatever it is doing today.
New ETL Application features
Swapping between MR and Spark
A user has created a batch pipeline that runs using MapReduce.
PUT /apps/mypipeline -d ' { "artifact": { ... }, "config": { "engine": "mapreduce", ... } }'
The user would like to update the pipeline to run with Spark instead. Ideally they just change on setting:
POST /apps/mypipeline/update -d ' { "artifact": { ... }, "config": { "engine": "spark", ... } }'
If no engine is given, it defaults to mapreduce. So far, it looks like the plugins will not need to change, that plugins that work for mapreduce will also work for spark.
Fork in a pipeline (support DAG)
A User wants to create a pipeline that forks in the middle. For example, the pipeline reads from twitter. If the tweet is english, we want to run it through an english sentiment tagger transform. If not, we want to send it to a translate transform before sending it on to the tagger transform.
Forks are not conditional, the same data gets sent to all forks. This is represented in the config as:
{ "config": { "source": { "name": "twitter", "plugin": { "name": "Twitter" }, ... }, "sinks": [ { "name": "sink", "plugin": { "name": "Table" }, ... } ], "transforms": [ { "name": "english filter", "plugin": { "name": "ScriptFilter" }, ... }, { "name": "non-english filter", "plugin": { "name": "ScriptFilter" }, ... }, { "name": "translator", "plugin": { "name": "Translator" }, ... }, { "name": "tagger", "plugin": { "name": "Tagger" }, ... } ], "connections": [ { "from": "twitter", "to": "english filter" }, { "from": "twitter", "to": "non-english filter" }, { "from": "non-english filter", "to": "translator" }, { "from": "translator", "to": "tagger" }, { "from": "english filter", "to": "tagger" }, { "from": "tagger", "to": "sink"} ] } }
One requirement here is that each stage will require a unique name. Today, "name" is really the plugin name. Since we need to support versions for plugins, we are going to change the config api to have a separate "plugin" object, and for "name" to be the actual unique name of the stage.
{ "config": { "source": { "name": "tweets", "plugin": { "name": "Twitter", "artifact": { "name": "etl-lib", "version": "3.3.0", "scope": "system" } }, "properties": { ... } }, ... } }
plugin artifact information will be optional. If none is given, the most recent artifact will be chosen. Artifact fields are also optional. It will go through artifacts until it finds the first one that matches all fields given.
Realtime Stream source
We would like to be able to read from a source in realtime. Workers don't allow us to do this, but Flows do. But flows would wrap everything in a transaction. What happens if it times out? What happens if there is an external call and it fails and the flowlet retries the operation? Can you have a one flowlet flow? Or would we make it multiple flowlets? For a flow, maybe you wouldn't have to do partitioning yourself. For flow, you have access to a tick flowlet as well.
One option is to have both a worker and flow implementation, and the config specifies which one is used:
{ "config": { "engine": "worker", ... } }
vs.
{ "config": { "engine": "flow", ... } }
Note: Flow requires plugin integration with flows.
Controlling Realtime source poll frequency
Today the source is forced to sleep itself if it wants to poll less frequently. We want to be able to configure this:
{ "config": { "source": { "pollFrequency": "1s", ... } } }
Plugin configure time schema validation
Some plugins may want to perform some schema validation at configure time if the plugin does not work with all types of schemas. For example, Table sink does not support complex types. If the plugin knows the all records it gets as input always have a specific schema, it can fail early at configure time if it sees a complex type.
This assumes that for a particular pipeline, every plugin will always output records with the same schema. To do this, we can add getInputSchema() and setOutputSchema() methods to another configurer that is only for the stage and not the entire pipeline.
@Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException { StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); Schema inputSchema = stageConfigurer.getInputSchema(); // perform validation if (!isValid(inputSchema)) { throw new IllegalArgumentException("reason") } stageConfigurer.setOutputSchema(getOutputSchema(inputSchema)); }
If a plugin does not know what it's output schema will be, or if the output schema is not constant, it will return null. Plugins further in the pipeline will then get null as an input schema.
New Plugins
Kinesis source:
User wants to read from Kinesis source (AWS) and persist it into a dataset, as data in AWS kinesis stream is available only for 24hrs.
"config": { "source": { "name": "Kinesis", "properties": { "credentials": "...", "shardId" : "id442222", "limit" : 5MBps // we can skip this if we use KCL rather than the REST API (have to investigate) } .... } }
Kafka sink:
- User might want to move from existing streaming platform like kinesis to Apache Kafka?
HTTP source:
- User might want to read from an HTTP Endpoint exposed by services which provide public data, like YouTube, Foursquare, AWS API,etc instead of implementing separate clients for these.
Ingest data from existing system that serves data via HTTP API.
"config": { "source": { "name": "Http", "properties": { "method": "GET", "url": "http://example.com", "headers": "{...}", "body": "file-location" } .... } }
FTP Source:
"config": { "source": { "name": "Ftp", "properties": { "hostname": "example.com", "port" : "4433", "credentials": "...", //optional configs "page-size" : 25, "keep-alive" : 5m, // there are few parameters to set if the remote server is in different time-zone or uses non-english data,etc "language-code" : "..", "time-zone" : "..." } .... } }
Counting Dataset:
- User wants to use CounterTimeseriesTable as a sink to have counts data aggregated and stored in a table, and the time interval for the aggregation can be configured.