...
Goal: Introduce the ability for CDAP Programs to create streams and datasets, and to register the use of Plugins.
Use Case:
Outside of ETL Artifacts - In regular CDAP applications:
Say we want to write to a stream that is used only in a Worker. Instead of adding the Stream at the Application level, we can add it at the Worker level.
Code Block
public class MyApp extends AbstractApplication {
  @Override
  public void configure() {
    addWorker(new MyWorker());
  }
}

public class MyWorker extends AbstractWorker {
  @Override
  public void configure() {
    // The stream is declared by the only program that uses it.
    addStream(new Stream("random"));
    setName("RandomWriter");
  }

  @Override
  public void run() {
    for (int i = 0; i < 1000; i++) {
      getContext().write("random", Integer.toString(i));
    }
  }
}
Inside ETL Artifacts:
Code Block
public class MyApp extends AbstractApplication<ETLBatchConfig> {
  @Override
  public void configure() {
    addMapReduce(new ETLMapReduce(getConfig()));
  }
}

public class ETLMapReduce extends AbstractMapReduce {
  private final ETLBatchConfig config;

  public ETLMapReduce(ETLBatchConfig config) {
    this.config = config;
  }

  @Override
  public void configure() {
    BatchSource batchSource = usePlugin("batchSource", config.source.name, "source", config.source.properties);
    BatchSink batchSink = usePlugin("batchSink", config.sink.name, "sink", config.sink.properties);
    // Wrap the configurer to expose only the addition of stream/dataset/plugin
    // and delegate it to MapReduceConfigurer
    ETLStageConfigurer configurer = new ETLStageConfigurer(getConfigurer());
    batchSource.configurePipeline(configurer);
    batchSink.configurePipeline(configurer);
  }

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    // ... Invoke the prepareRun of all the plugins
  }
}
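The wrapping step described in the comment above (expose only stream/dataset/plugin registration and delegate the rest to the program's configurer) could look roughly like the following. This is a minimal, self-contained sketch and not the actual ETLStageConfigurer: every type in it (ProgramConfigurer, StageConfigurer, DelegatingStageConfigurer, RecordingConfigurer) is a hypothetical stand-in, and the method signatures are simplified to plain strings so that the example runs without CDAP on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are CDAP classes.
class DelegatingConfigurerSketch {

  /** Stand-in for a full program configurer such as MapReduceConfigurer. */
  interface ProgramConfigurer {
    void setName(String name);
    void addStream(String streamName);
    void createDataset(String datasetName, String typeName);
    void usePlugin(String pluginType, String pluginName, String pluginId);
  }

  /** Narrow view handed to plugins: streams, datasets and plugins only. */
  interface StageConfigurer {
    void addStream(String streamName);
    void createDataset(String datasetName, String typeName);
    void usePlugin(String pluginType, String pluginName, String pluginId);
  }

  /** Forwards each call to the wrapped configurer; setName is not reachable through it. */
  static final class DelegatingStageConfigurer implements StageConfigurer {
    private final ProgramConfigurer delegate;

    DelegatingStageConfigurer(ProgramConfigurer delegate) {
      this.delegate = delegate;
    }

    @Override
    public void addStream(String streamName) {
      delegate.addStream(streamName);
    }

    @Override
    public void createDataset(String datasetName, String typeName) {
      delegate.createDataset(datasetName, typeName);
    }

    @Override
    public void usePlugin(String pluginType, String pluginName, String pluginId) {
      delegate.usePlugin(pluginType, pluginName, pluginId);
    }
  }

  /** Records every call so that the delegation can be observed. */
  static final class RecordingConfigurer implements ProgramConfigurer {
    final List<String> calls = new ArrayList<>();

    @Override
    public void setName(String name) {
      calls.add("setName(" + name + ")");
    }

    @Override
    public void addStream(String streamName) {
      calls.add("addStream(" + streamName + ")");
    }

    @Override
    public void createDataset(String datasetName, String typeName) {
      calls.add("createDataset(" + datasetName + ", " + typeName + ")");
    }

    @Override
    public void usePlugin(String pluginType, String pluginName, String pluginId) {
      calls.add("usePlugin(" + pluginType + ", " + pluginName + ", " + pluginId + ")");
    }
  }

  public static void main(String[] args) {
    RecordingConfigurer program = new RecordingConfigurer();
    program.setName("ETLMapReduce");

    // A plugin only ever sees the narrow view.
    StageConfigurer stage = new DelegatingStageConfigurer(program);
    stage.addStream("events");
    stage.createDataset("sinkTable", "KeyValueTable");

    program.calls.forEach(System.out::println);
  }
}
```

The narrow interface is the point of the design: a source or sink can register the streams, datasets and plugins it needs, but it cannot rename or otherwise reconfigure the MapReduce program that hosts it.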
Status Quo: Currently, streams and datasets can be created only at the CDAP Application level, and Plugins can be registered only in Adapters (through AdapterConfigurer). If we want to remove the concept of Application Templates and Adapters, we have a couple of options:
i) Introduce the ability to register plugins in Application's configure method.
...
We will look at how the configure method of two program types will change:
Worker:
Code Block
public class SimpleWorker extends AbstractWorker {
  @Override
  public void configure() {
    createDataset("etlrealtimestate", KeyValueTable.class);
    addStream(new Stream("hello"));
    addDatasetModule("abcModule", ABCModule.class);
    usePlugin("realtimesource", "kafka", "source", PluginProperties.EMPTY);
    usePlugin("realtimesink", "stream", "sink", PluginProperties.EMPTY);
  }
}
MapReduce:
Very similar to the Worker.
Flows and Flowlets:
Code Block
public static class SimpleFlow extends AbstractFlow {
  public void configureFlow() {
    addFlowlet("abc", new SimpleFlowlet());
    addStream(new Stream("hello"));
    connectStream("hello", "abc");
  }
}

public static class SimpleFlowlet extends AbstractFlowlet {
  public void configure() {
    createDataset("data", KeyValueTable.class);
    setName("SimpleFlowlet");
  }
}