Streams/Datasets/Plugins creation in CDAP Programs
Goal: Introduce the ability in CDAP Programs to create streams and datasets, or to register the use of Plugins.
Use Case:
Outside of ETL Artifacts - In regular CDAP applications:
Say we want to write to a stream that is used only in a Worker. Instead of adding the Stream at the Application level, we can add it at the Worker level.

public class MyApp extends AbstractApplication {
  @Override
  public void configure() {
    addWorker(new MyWorker());
  }
}

public class MyWorker extends AbstractWorker {
  @Override
  public void configure() {
    // The stream is declared by the program that uses it
    addStream(new Stream("random"));
    setName("RandomWriter");
  }

  @Override
  public void run() {
    for (int i = 0; i < 1000; i++) {
      getContext().write("random", Integer.toString(i));
    }
  }
}
Inside ETL Artifacts:

public class MyApp extends AbstractApplication<ETLBatchConfig> {
  @Override
  public void configure() {
    addMapReduce(new ETLMapReduce(getConfig()));
  }
}

public class ETLMapReduce extends AbstractMapReduce {
  private ETLBatchConfig config;

  public ETLMapReduce(ETLBatchConfig config) {
    this.config = config;
  }

  @Override
  public void configure() {
    BatchSource batchSource = usePlugin("batchSource", config.source.name, "source", config.source.properties);
    BatchSink batchSink = usePlugin("batchSink", config.sink.name, "sink", config.sink.properties);
    // Wrap the configurer to expose only the addition of stream/dataset/plugin
    // and delegate it to MapReduceConfigurer
    ETLStageConfigurer configurer = new ETLStageConfigurer(getConfigurer());
    batchSource.configurePipeline(configurer);
    batchSink.configurePipeline(configurer);
  }

  @Override
  public void beforeSubmit() {
    // ... Invoke the prepareRun of all the plugins
  }
}
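The wrapping step in configure() above can be pictured as a thin delegating class. The following is a minimal, self-contained sketch under stated assumptions, not the actual CDAP API: ProgramConfigurer, StageConfigurer, DelegatingStageConfigurer and RecordingConfigurer are hypothetical stand-ins (the real MapReduceConfigurer and ETLStageConfigurer may look different), and streams and datasets are reduced to plain names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a full program configurer (e.g. MapReduceConfigurer).
interface ProgramConfigurer {
  void setName(String name);
  void addStream(String streamName);
  void createDataset(String datasetName, String typeName);
}

// Hypothetical narrow view handed to plugins: resource registration only, no setName.
interface StageConfigurer {
  void addStream(String streamName);
  void createDataset(String datasetName, String typeName);
}

// Wrapper that exposes only the narrow view and forwards every call to the delegate.
final class DelegatingStageConfigurer implements StageConfigurer {
  private final ProgramConfigurer delegate;

  DelegatingStageConfigurer(ProgramConfigurer delegate) {
    this.delegate = delegate;
  }

  @Override
  public void addStream(String streamName) {
    delegate.addStream(streamName);
  }

  @Override
  public void createDataset(String datasetName, String typeName) {
    delegate.createDataset(datasetName, typeName);
  }
}

// Illustration-only configurer that records what was registered.
final class RecordingConfigurer implements ProgramConfigurer {
  String name;
  final List<String> streams = new ArrayList<>();
  final List<String> datasets = new ArrayList<>();

  @Override
  public void setName(String name) {
    this.name = name;
  }

  @Override
  public void addStream(String streamName) {
    streams.add(streamName);
  }

  @Override
  public void createDataset(String datasetName, String typeName) {
    datasets.add(datasetName + ":" + typeName);
  }
}
```

A plugin that receives only the StageConfigurer view can register streams and datasets, but it has no way to rename or otherwise reconfigure the enclosing program.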
Status Quo: Currently, streams and datasets can be created only at the CDAP Application level, and Plugins can be registered only in Adapters (through AdapterConfigurer). If we want to remove the concept of Application Templates and Adapters, we have a couple of options:
i) Introduce the ability to register plugins in the Application's configure method.
- ETL Plugins need to have the ability to create streams/datasets.
- Pros: Programs do not need the ability to add streams/datasets.
- Cons:
  - Not the intuitive place for it. For example, in the ETLRealtime and ETLBatch applications, we use these features in ETLWorker/ETLMapReduce, so it would be more intuitive to do this in programs instead of Applications.
  - We will have to use Program properties to pass the plugin names to the programs so that they can instantiate and use the plugins.
ii) Introduce the ability to create streams/datasets and register plugins in CDAP Programs:
- Through the Program Configurers, users can create streams/datasets, register plugins, etc. in CDAP Programs (think of them as local variables: create them when you need them).
- Pros: Simplifies some application code, since resources are created and used only where they are needed. Simplifies the ETLRealtime and ETLBatch applications.
- Cons:
  - Streams/Datasets created this way are NOT truly local, since they are accessible to all applications/programs in that namespace.
  - We need logic to handle the creation of the same stream/dataset (possibly with different properties) in different places: should we disallow it, or allow it as long as the properties are the same?
  - Some options are ambiguous in programs like Flows, Services, and Workflows: should we allow the addition of streams/datasets in them, or only in Flowlets/ServiceHandlers? (For example, stream connections are made only in Flows and not in Flowlets, even though Flows are just a collection of Flowlets.)
  - WorkflowAction cannot support these changes since it uses the builder pattern (there is a JIRA filed for this).
  - In some programs, certain features might not be useful. For example, creating a stream in a Service is useless since there is no way to use it programmatically in a Service/ServiceHandler.
  - Datasets created in the configure method have to be added to useDatasets for Programs that don't use DynamicDatasetContext.
  - Streams/Datasets are local to a namespace, but Plugins will be local to an application.
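One of the open questions listed for option ii) is what to do when the same stream or dataset is created in several places. As an illustration of the "allow it as long as it has the same properties" choice, here is a minimal sketch. DatasetDeclarations is a hypothetical class, not part of CDAP, and properties are modelled as a plain string map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry of the datasets declared by the programs of one namespace.
final class DatasetDeclarations {
  private final Map<String, Map<String, String>> declared = new HashMap<>();

  // Registers a declaration. Declaring the same name again is accepted only
  // when the properties are identical to the first declaration.
  void declare(String name, Map<String, String> properties) {
    Map<String, String> existing = declared.putIfAbsent(name, new HashMap<>(properties));
    if (existing != null && !existing.equals(properties)) {
      throw new IllegalArgumentException(
          "Dataset '" + name + "' is already declared with different properties");
    }
  }

  // Number of distinct dataset names declared so far.
  int size() {
    return declared.size();
  }
}
```

The stricter alternative, disallowing any repeated declaration, would amount to throwing whenever the name is already present, regardless of the properties.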
Assuming we go with option ii), we propose the following API changes:
We will look at how the configure method of two program types will change:
Worker:
public class SimpleWorker extends AbstractWorker {
  @Override
  public void configure() {
    createDataset("etlrealtimestate", KeyValueTable.class);
    addStream(new Stream("hello"));
    addDatasetModule("abcModule", ABCModule.class);
    usePlugin("realtimesource", "kafka", "source", PluginProperties.EMPTY);
    usePlugin("realtimesink", "stream", "sink", PluginProperties.EMPTY);
  }
}
MapReduce:
Very similar to what the Worker looks like.
Flows and Flowlets:
public static class SimpleFlow extends AbstractFlow {
  public void configureFlow() {
    addFlowlet("abc", new SimpleFlowlet());
    addStream(new Stream("hello"));
    connectStream("hello", "abc");
  }
}

public static class SimpleFlowlet extends AbstractFlowlet {
  public void configure() {
    createDataset("data", KeyValueTable.class);
    setName("SimpleFlowlet");
  }
}
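One drawback noted for option ii) is that datasets created in configure also have to be added to useDatasets for programs that do not use DynamicDatasetContext. One possible way to handle this, shown here only as a sketch, is for the program configurer to record every created dataset as used. SketchProgramConfigurer is a hypothetical class and does not correspond to an existing CDAP configurer. Dataset types are reduced to names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical program-level configurer, for illustration only.
final class SketchProgramConfigurer {
  // Dataset name -> dataset type name, for datasets this program asks to create.
  private final Map<String, String> createdDatasets = new LinkedHashMap<>();
  // Names of all datasets the program declares that it uses.
  private final Set<String> usedDatasets = new LinkedHashSet<>();

  // Explicit declaration of datasets that are created elsewhere.
  void useDatasets(String... names) {
    Collections.addAll(usedDatasets, names);
  }

  // Creating a dataset here also marks it as used by this program,
  // so the author does not have to call useDatasets separately.
  void createDataset(String name, String typeName) {
    createdDatasets.put(name, typeName);
    usedDatasets.add(name);
  }

  Set<String> getUsedDatasets() {
    return Collections.unmodifiableSet(usedDatasets);
  }

  Map<String, String> getCreatedDatasets() {
    return Collections.unmodifiableMap(createdDatasets);
  }
}
```

With this approach, a call such as createDataset("data", "KeyValueTable") in a Flowlet would be enough for "data" to appear in the program's used datasets.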