Hydrator 3.3.0 User Stories
Hydrator Plugin Experience
Installing custom plugins
In CDAP 3.2, installing Hydrator plugins requires uploading the artifact containing the plugins, placing a .json the UI uses to configure widgets into a special folder on the UI machine, and adding docs to cdap's website.
This is problematic for a few reasons:
1. Too complex. There are multiple steps required, and they need to happen to multiple systems (against cdap and cdap-ui).
2. The same UI configs are shared across all plugin artifacts, which is incorrect. Artifacts are versioned, and different versions may require different configs. Additionally, artifacts with the same name and version may still be different if they are in different namespaces.
3. No way for users to add docs for their own plugins, since the docs are on the Cask hosted docs page.
The problem is that widgets and docs don't belong in CDAP, but they also don't really belong in the UI either. The UI can live on multiple machines, and we also don't want to force operations on the filesystem when a user is simply installing a new set of plugins. Long term, what we really would want is a Hydrator backend, which is an app that manages pipelines, plugins, etc. In the shorter term, it would be desirable if there were some generic CDAP feature that could allow Hydrator to use CDAP to serve the widget and doc information.
The plan, then, is to add properties to artifacts. Hydrator can store the widgets and docs for each plugin in the properties of the artifact containing that plugin. You can already set properties on streams and datasets; this would be an analogous feature. This solves the 3 problems mentioned above, while keeping CDAP ignorant of widgets and UI specific things.
GET /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties
GET /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}?keys=key1,key2
PUT /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}
PUT /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties
DELETE /namespaces/{namespace-id}/artifacts/{artifact-name}/versions/{artifact-version}/properties/{property}
Installing a set of plugins could then be a single command:
load artifact <path/to/artifact> [config-file <config-file>]
with the properties added to the config file, in something of the form:
{
  "properties": {
    "widgets.batchsource.database": "<widgets json>",
    "doc.batchsource.database": "<doc>",
    ...
  }
}
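As a minimal sketch of the key convention in the example above, the helper below builds the `widgets.<plugin-type>.<plugin-name>` and `doc.<plugin-type>.<plugin-name>` keys for one plugin. The class and method names are hypothetical and not part of CDAP; the key pattern is inferred only from the two example keys shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a CDAP class): builds artifact property keys that
// follow the pattern of the example keys "widgets.batchsource.database" and
// "doc.batchsource.database".
final class PluginArtifactProperties {
  private PluginArtifactProperties() { }

  static String widgetsKey(String pluginType, String pluginName) {
    return "widgets." + pluginType + "." + pluginName;
  }

  static String docKey(String pluginType, String pluginName) {
    return "doc." + pluginType + "." + pluginName;
  }

  // Collects the widget JSON and the doc text for one plugin into a
  // properties map that could be written into the artifact config file.
  static Map<String, String> forPlugin(String pluginType, String pluginName,
                                       String widgetsJson, String doc) {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put(widgetsKey(pluginType, pluginName), widgetsJson);
    properties.put(docKey(pluginType, pluginName), doc);
    return properties;
  }
}
```

Calling `PluginArtifactProperties.forPlugin("batchsource", "database", widgetsJson, doc)` would yield the two keys from the example config.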
Installing plugins from hydrator-plugins repository
To install the plugins from hydrator-plugins, a user simply installs the hydrator-plugins rpm or deb. In addition, cdap-site.xml will have to set the app.artifact.dir setting to include the directory for those plugins (/opt/hydrator-plugins/artifacts).
The work for this will include setting up the hydrator-plugins build to create the rpms and debs. The pom will be changed so that in addition to the jars, the config json will be created. The json files will be created using an antrun script target that writes out the parents for the artifact, as well as properties for widget and doc information. Prepare phases can then copy jars and jsons to the right place for packaging to be done.
The CDAP build will still need to bundle the results of the hydrator-plugins build in order to build a standalone zip that contains hydrator plugins. To accomplish this, we will add a property ('-Dadditional.artifacts.dir=</path/to/additional/artifacts>') that can be used to copy any additional jars and jsons from some external directory. This property can then be used in standalone builds to pull in the hydrator-plugins artifacts and configs.
Plugin Selection
User installs CDAP 3.3.0 by downloading the sdk or installing via packages. Included in CDAP is the Hydrator webapp, cdap-etl-batch-3.3.0, and cdap-etl-realtime-3.3.0 artifacts. Also included are all the v1.0.0 plugins in the hydrator-plugins repo, like cassandra-1.0.0. The user creates a pipeline that reads from cassandra and writes to a Table.
PUT /apps/cassdump -d '
{
  "artifact": { ... },
  "config": {
    "source": {
      "name": "users",
      "plugin": {
        "name": "Cassandra",
        "properties": { ... }
      }
    },
    "sinks": [
      {
        "name": "usersDump",
        "plugin": {
          "name": "Table",
          "properties": { ... }
        }
      }
    ]
  }
}'
There are bugs found in the cassandra plugin, and the hydrator-plugins repo creates a new 1.0.1 release. The user wants to install the cassandra-1.0.1 plugin on CDAP 3.3.0.
The user downloads or builds cassandra-1.0.1. Ideally the user can deploy the plugin jar through the UI. The user can also deploy through the CLI or the REST API directly. The cassandra plugin also includes a .json file for the UI, which decides how settings are rendered and which documentation to show. The user then updates the existing pipeline, which will pick up the new cassandra-1.0.1 plugin.
POST /apps/cassdump/update -d ' { "artifact": { ... } }'
After some time, CDAP 3.3.1 is released. Each CDAP release will include plugins from a specific hydrator-plugins release. So CDAP 3.3.0 may include 1.0.0 plugins, CDAP 3.3.1 may include 1.0.2 plugins, while CDAP 3.4.0 may include 2.0.0 plugins. When the user installs CDAP 3.3.1, cassandra-1.0.2 will be added as well. None of the existing pipelines will change after the upgrade. They will continue to use whatever plugin version they were created with. Suppose the user wants to continue using the older 1.0.1 cassandra plugin, but wants to use the newer 1.0.2 table plugin. They can update the app config:
POST /apps/cassdump/update -d '
{
  "artifact": { ... },
  "config": {
    "source": {
      "name": "users",
      "plugin": {
        "name": "Cassandra",
        "properties": { ... },
        "artifact": {
          "version": "1.0.1"
        }
      }
    },
    "sinks": [
      {
        "name": "usersDump",
        "plugin": {
          "name": "Table",
          "properties": { ... }
          // If no version is given, the latest version is used.
        }
      }
    ]
  }
}'
Note that the config is different than in 3.2.x. "name" no longer refers to the plugin name, but is a unique name for that stage. There is also an object dedicated to the plugin.
Developing a new Plugin
Testing (unit and integration) a new plugin should be as easy as possible. Pain points today are with integrations with external systems like Cassandra or Elasticsearch. Ideally it's not hard to add them. Maybe use Docker?
The unit test pain point is that it's hard to test a source or sink in isolation; you end up copying the table source or something like it into your plugin project. We could have an etl-test module that provides mock sources and sinks to help with this.
Building the sdk and packages
We want to ship cdap with plugins. This will require changes if the plugins are in a different repo than cdap itself.
To build the sdk and packages, we will have the build:
1. checkout cdap
2. mvn install cdap-etl
3. checkout hydrator-plugins
4. mvn package hydrator-plugins
5. mvn command to build sdk
6. mvn command to build packages
For steps 5 and 6, we can add a maven profile that takes a property that is a path for additional artifacts to include. It can copy those to the right place, then do whatever it is doing today.
Upgrading Hydrator
We are making backwards incompatible changes to the etl config. This will require a Hydrator upgrade, which will have its own upgrade process that is not tied to the cdap upgrade process.
The Hydrator upgrade tool will have a command that converts an old config into a new config. It will also have a command that will take cdap host/port/credential information and convert all 3.2.x pipelines into 3.3.x pipelines.
New ETL Application features
Swapping between MR and Spark
A user has created a batch pipeline that runs using MapReduce.
PUT /apps/mypipeline -d ' { "artifact": { ... }, "config": { "engine": "mapreduce", ... } }'
The user would like to update the pipeline to run with Spark instead. Ideally they just change one setting:
POST /apps/mypipeline/update -d ' { "artifact": { ... }, "config": { "engine": "spark", ... } }'
If no engine is given, it defaults to mapreduce. So far, it looks like the plugins will not need to change: plugins that work for mapreduce will also work for spark.
DAG in a pipeline
A user wants to create a pipeline that forks in the middle. For example, the pipeline reads from Twitter. If the tweet is in English, we want to run it through an English sentiment tagger transform. If not, we want to send it to a translate transform before sending it on to the tagger transform.
Use case 1 (Fork):
The pipeline reads from some fileset. It writes to an avro fileset and a database. Before writing to the db, it needs to perform some lookups to add a couple fields.
Forks are not conditional; the same data gets sent to all forks. This is represented in the config as:
{
  "config": {
    "source": {
      "name": "fileset",
      "plugin": {
        "name": "File",
        "properties": { ... }
      },
      ...
    },
    "sinks": [
      {
        "name": "DB Sink",
        "plugin": {
          "name": "Database",
          "properties": { ... }
        },
        ...
      },
      {
        "name": "avro fileset",
        "plugin": {
          "name": "TPFSAvro",
          "properties": { ... }
        },
        ...
      }
    ],
    "transforms": [
      {
        "name": "lookup transform",
        "plugin": {
          "name": "Script"
        },
        ...
      }
    ],
    "connections": [
      { "from": "fileset", "to": "lookup transform" },
      { "from": "lookup transform", "to": "DB Sink" },
      { "from": "fileset", "to": "avro fileset" }
    ]
  }
}
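A minimal sketch of how the "connections" list in a config like the one above might be read as a directed graph. It assumes stage names are unique and rejects duplicates or connections to unknown stages. `PipelineGraph` and its method are hypothetical names for illustration, not the ETL application's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: turns stage names and (from, to) connections into an
// adjacency map of stage name -> names of the stages it outputs to.
final class PipelineGraph {
  private PipelineGraph() { }

  static Map<String, List<String>> build(List<String> stageNames,
                                         List<String[]> connections) {
    Map<String, List<String>> outputs = new LinkedHashMap<>();
    for (String name : stageNames) {
      // connections refer to stages by name, so names must be unique
      if (outputs.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate stage name: " + name);
      }
      outputs.put(name, new ArrayList<String>());
    }
    for (String[] connection : connections) {
      String from = connection[0];
      String to = connection[1];
      if (!outputs.containsKey(from) || !outputs.containsKey(to)) {
        throw new IllegalArgumentException(
          "Connection refers to unknown stage: " + from + " -> " + to);
      }
      // a fork is simply a stage with more than one output
      outputs.get(from).add(to);
    }
    return outputs;
  }
}
```

With the example config, "fileset" would map to both "lookup transform" and "avro fileset", which is the unconditional fork.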
One requirement here is that each stage will require a unique name. Today, "name" is really the plugin name. Since we need to support versions for plugins, we are going to change the config api to have a separate "plugin" object, and for "name" to be the actual unique name of the stage.
{
  "config": {
    "source": {
      "name": "tweets",
      "plugin": {
        "name": "Twitter",
        "properties": { ... },
        "artifact": {
          "name": "etl-lib",
          "version": "3.3.0",
          "scope": "system"
        }
      }
    },
    ...
  }
}
Plugin artifact information will be optional. If none is given, the most recent artifact will be chosen. The individual artifact fields are also optional. The lookup will go through artifacts until it finds the first one that matches all fields given.
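The matching rule can be sketched as follows. This is only an illustration under stated assumptions: the candidate list is assumed to be ordered most recent first (so that an empty filter picks the most recent artifact), a null filter field is treated as "not given", and `ArtifactSelector` and `Artifact` are hypothetical names rather than CDAP classes.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of "first artifact that matches all fields given".
final class ArtifactSelector {
  private ArtifactSelector() { }

  static final class Artifact {
    final String name;
    final String version;
    final String scope;

    Artifact(String name, String version, String scope) {
      this.name = name;
      this.version = version;
      this.scope = scope;
    }
  }

  // Assumes candidates are ordered most recent first. A null filter field
  // means the field was not given in the config and matches anything.
  static Optional<Artifact> select(List<Artifact> candidates,
                                   String name, String version, String scope) {
    for (Artifact candidate : candidates) {
      if (matches(name, candidate.name)
          && matches(version, candidate.version)
          && matches(scope, candidate.scope)) {
        return Optional.of(candidate);
      }
    }
    return Optional.empty();
  }

  private static boolean matches(String wanted, String actual) {
    return wanted == null || wanted.equals(actual);
  }
}
```

For example, passing only a version of "1.0.1" would skip a newer 1.0.2 artifact and return the first candidate whose version is 1.0.1.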
Use case 2 (selector):
The pipeline reads from Twitter. If the tweet is in English, we want to run it through an English categorizer transform. If not, we want to send it to a translate transform before sending it on to the categorizer transform. If the translate transform is unable to translate, we want to write the record to an error dataset. Also, if the tweet is categorized as spam, we want to write the record to the error dataset. Otherwise, we want to write our translated, categorized tweet to a Table. This could be represented purely with forks:
This seems unnatural though. After every fork, we have a filter on both paths, with each filter the 'not' of the other filter. It seems more natural to have a selector:
{
  "connections": [
    { "from": "twitter source", "to": "language tagger" },
    {
      "from": "language tagger",
      "selector": {
        // switch based on the value of the "lang" field
        "field": "lang",
        "switch": {
          // output stage -> field value
          "categorizer": "en"
        },
        // optional. if doesn't match anything in outputs, go here.
        // if absent, record is dropped
        "default": "translator"
      }
    },
    {
      "from": "translator",
      "selector": {
        "field": "lang",
        "switch": {
          "categorizer": "en"
        },
        "default": "invalid tweets table"
      }
    },
    {
      "from": "categorizer",
      "selector": {
        // switch based on the value returned by the script
        "script": "function (input) { return input.spam; }",
        "switch": {
          "categorized tweets table": true,
          "invalid tweets table": false
        }
      }
    }
  ]
}
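A minimal sketch of the routing semantics described in the comments of the selector config above: the "switch" map goes from output stage to field value, an unmatched value goes to "default", and the record is dropped when no default is given. `FieldSelector` is a hypothetical class for illustration; evaluating the "field" or "script" to obtain the value is left out.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of selector routing for a single connection.
final class FieldSelector {
  // output stage name -> value that routes a record to that stage
  private final Map<String, Object> switchMap;
  // stage for unmatched values; null means unmatched records are dropped
  private final String defaultStage;

  FieldSelector(Map<String, Object> switchMap, String defaultStage) {
    this.switchMap = switchMap;
    this.defaultStage = defaultStage;
  }

  // Returns the stage the record should go to, or empty if it is dropped.
  // 'value' is the field value or script result for the record.
  Optional<String> route(Object value) {
    for (Map.Entry<String, Object> entry : switchMap.entrySet()) {
      if (entry.getValue().equals(value)) {
        return Optional.of(entry.getKey());
      }
    }
    return Optional.ofNullable(defaultStage);
  }
}
```

Note that each record goes to at most one output here, which is what distinguishes a selector from a fork.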
This would not support a use case where we would want a record to go to multiple outputs. For example, suppose we are reading from an employees table, and we want to write the employee salary to a table that groups salaries by several categories. If an employee is over a certain age, we want to look up their retirement plan. If the employee's nationality is x, y, or z, we want to do an immigration status lookup. No matter what, we want to categorize employee performance before writing to the table:
For a use case like this, we could introduce an 'if' condition to the connection.
{
  "connections": [
    {
      "from": "employees table",
      "to": "retirement plan lookup",
      "if": {
        "script": "function (input) { return input.age > 65; }",
        "scriptEngine": "javascript"
      }
    },
    {
      "from": "employees table",
      "to": "immigration status lookup",
      "if": {
        "script": "function (input) { return input.nationality == x || input.nationality == y || input.nationality == z; }",
        "scriptEngine": "javascript"
      }
    },
    { "from": "employees table", "to": "performance categorizer" },
    { "from": "performance categorizer", "to": "salary by category table" },
    { "from": "immigration status lookup", "to": "salary by category table" },
    { "from": "retirement plan lookup", "to": "salary by category table" }
  ]
}
This same thing could also be represented as a fork with a filter on the top and bottom branches. So we may not do this.
Realtime Stream source
We may just add the ability to read from a stream to a worker instead of this.
We would like to be able to read from a source in realtime. Workers don't allow us to do this, but Flows do. But flows would wrap everything in a transaction. What happens if it times out? What happens if there is an external call and it fails and the flowlet retries the operation? Can you have a one flowlet flow? Or would we make it multiple flowlets? For a flow, maybe you wouldn't have to do partitioning yourself. For flow, you have access to a tick flowlet as well.
One option is to have both a worker and flow implementation, and the config specifies which one is used:
{ "config": { "engine": "worker", ... } }
vs.
{ "config": { "engine": "flow", ... } }
Note: Flow requires plugin integration with flows.
Controlling Realtime source poll frequency
Today the source is forced to sleep itself if it wants to poll less frequently. We want to allow the source to change this. For example, it may want to implement exponential backoff if the system it is polling is down. To accomplish this, we can add a couple methods to RealtimeContext to get and set the pollDelay:
import java.util.concurrent.TimeUnit;

public interface RealtimeContext extends TransformContext {
  ...
  // set poll frequency. Will try to run every x amount of time
  void setPollFrequency(long freq);
  // set poll frequency in the given time unit
  void setPollFrequency(long duration, TimeUnit unit);
  // get poll frequency
  long getPollFrequency();
  // get poll frequency in the given time unit
  long getPollFrequency(TimeUnit unit);
  // set poll delay in milliseconds
  void setPollDelay(long duration);
  // set poll delay in the given time unit
  void setPollDelay(long duration, TimeUnit unit);
  // get poll delay in milliseconds
  long getPollDelay();
  // get poll delay in the given time unit
  long getPollDelay(TimeUnit unit);
}

@Override
public SourceState poll(Emitter<T> writer, SourceState currentState) throws Exception {
  try {
    // perform request; on success, go back to the normal delay
    context.setPollDelay(100);
  } catch (Exception e) {
    // the polled system is failing: double the delay before the next poll
    context.setPollDelay(context.getPollDelay() * 2);
  }
  return currentState;
}
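The backoff arithmetic from the example above can be isolated as a small pure function. This is a sketch under assumptions: the upper bound on the delay is an addition that the example does not have, and `PollBackoff` with its parameter names is hypothetical, not part of the proposed RealtimeContext API.

```java
// Hypothetical sketch: computes the next poll delay using exponential
// backoff. The cap (maxDelayMillis) is an assumption added here so the
// delay cannot grow without bound while the polled system stays down.
final class PollBackoff {
  private PollBackoff() { }

  static long nextDelayMillis(long currentDelayMillis, boolean pollSucceeded,
                              long baseDelayMillis, long maxDelayMillis) {
    if (pollSucceeded) {
      // a successful poll resets the delay to the base value
      return baseDelayMillis;
    }
    // double the delay, guarding against overflow past the cap
    long doubled = currentDelayMillis >= maxDelayMillis / 2
      ? maxDelayMillis
      : currentDelayMillis * 2;
    // never go below the base delay (covers a current delay of 0)
    return Math.max(baseDelayMillis, Math.min(doubled, maxDelayMillis));
  }
}
```

A source could then call something like `context.setPollDelay(PollBackoff.nextDelayMillis(context.getPollDelay(), succeeded, 100, 60000))` at the end of each poll, where the 100 and 60000 values are illustrative only.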
Plugin configure time schema validation
Some plugins may want to perform some schema validation at configure time if the plugin does not work with all types of schemas. For example, Table sink does not support complex types. If the plugin knows that all records it gets as input always have a specific schema, it can fail early at configure time if it sees a complex type.
This assumes that for a particular pipeline, every plugin will always output records with the same schema. To do this, we can add getInputSchema() and setOutputSchema() methods to another configurer that is only for the stage and not the entire pipeline.
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  Schema inputSchema = stageConfigurer.getInputSchema();
  // perform validation
  if (!isValid(inputSchema)) {
    throw new IllegalArgumentException("reason");
  }
  stageConfigurer.setOutputSchema(getOutputSchema(inputSchema));
}
If a plugin does not know what its output schema will be, or if the output schema is not constant, its output schema will be null. Plugins further in the pipeline will then get null as an input schema.
Stage name in log messages
When a user logs something in a plugin, the message has the classname but not the stage name. So if a pipeline contains multiple stages that use the same plugin, it's hard to tell which message is from which stage.
New CDAP Features
App Verification
After 3.2, a user can deploy an artifact, then create an app by specifying an artifact and a configuration. At configure time, an app can perform some validation on its config and throw an exception to fail app creation. Sometimes, it would be useful just to run the configure() method without actually creating any streams, datasets, or app, in order to let the app validate its config. For example, in Hydrator, there is a validate button, which is implemented completely in the UI. The UI really shouldn't be validating anything, as all that logic is specific to the application code.
We could let the application throw a specific exception for cases where its config is invalid:
import javax.annotation.Nullable;

public class InvalidConfigurationException extends RuntimeException {
  // payload gets returned in the http response.
  private final String payload;

  public InvalidConfigurationException(String message) {
    this(message, null);
  }

  public InvalidConfigurationException(String message, @Nullable String payload) {
    super(message);
    this.payload = payload;
  }

  @Nullable
  public String getPayload() {
    return payload;
  }
}

public interface Application<T extends Config> {
  /**
   * Configures the Application.
   *
   * @param configurer Collects the Application configuration
   * @param context Used to access the environment, application configuration, and application (deployment) arguments
   * @throws InvalidConfigurationException if the config is invalid
   */
  void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);
}
If thrown, app fabric will return a 400 with the message of the exception. We can also add a dryrun option to the create app request that will call configure() without creating anything. In the end, it returns the ApplicationSpecification, which is sent as the output of the request.
PUT /apps/{app-name} -H 'Content-Type: application/json' -d '
{
  "artifact": { ... },
  "config": { ... },
  "dryrun": true
}'

{
  "message": "....",
  "payload": "..."
}
Specific to Hydrator, the etl apps could put the 'pipeline spec', which contains each stage, the artifact used for each plugin, and the input/output schema for each stage, in a program property. The UI could then read this information as well, instead of trying to manage schemas itself.
New Plugins
Kinesis source:
User wants to read from a Kinesis source (AWS) and persist the data into a dataset, as data in an AWS Kinesis stream is available only for 24 hours.
"config": {
  "source": {
    "name": "MyKinesisSource",
    "plugin": {
      "name": "Kinesis",
      "properties": {
        "credentials": "...",
        "shardId": "id442222",
        "limit": "5MBps" // we can skip this if we use KCL rather than the REST API (have to investigate)
      }
    }
    ....
  }
}
Kafka sink:
- User might want to move from an existing streaming platform like Kinesis to Apache Kafka.
HTTP source:
- User might want to read from an HTTP endpoint exposed by services that provide public data, like YouTube, Foursquare, AWS API, etc., instead of implementing separate clients for these.
Ingest data from existing system that serves data via HTTP API.
"config": {
  "source": {
    "name": "ExampleHTTPSource",
    "plugin": {
      "name": "Http",
      "properties": {
        "method": "GET",
        "url": "http://example.com",
        "headers": "{...}",
        "body": "file-location"
      }
    }
    ....
  }
}
FTP Source:
"config": {
  "source": {
    "name": "FTPExampleSource",
    "plugin": {
      "name": "Ftp",
      "properties": {
        "hostname": "example.com",
        "port": "4433",
        "credentials": "...",
        // optional configs
        "page-size": 25,
        "keep-alive": "5m",
        // there are a few parameters to set if the remote server is in a different time zone or uses non-English data, etc.
        "language-code": "..",
        "time-zone": "..."
      }
    }
    ....
  }
}
Counting Dataset:
- User wants to use CounterTimeseriesTable as a sink to have counts data aggregated and stored in a table, and the time interval for the aggregation can be configured.