...

The goal of templates was to write one piece of Application code that could be used to create multiple Applications. This requires that an Application be configurable at creation time rather than at compile time. For example, a user should be able to set the name of their dataset through configuration instead of hardcoding it. To support this, we plan to expose a configuration object through the ApplicationContext, which is available in the Application's configure() method. A user can then pass in a config when creating an Application through the RESTful API, and that config is used to configure the Application. The relevant programmatic API changes are shown below, with an example of how they might be used.

 

Code Block
//-------------- CDAP API changes --------------
public interface ApplicationContext<T extends Config> {
  T getConfig();
}

public interface Application<T extends Config> {
  void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);
}
 
public abstract class AbstractApplication<T extends Config> implements Application<T> {
  ...
  protected final ApplicationContext<T> getContext() { return context; }
}
 
//-------------- Example Application --------------
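The creation-time configuration described above can be pictured with a minimal, self-contained sketch. It is not the CDAP implementation: `Config` here is a stand-in class, and `DatasetConfig`, `createConfig`, and the plain string map used in place of the request body are assumptions made for illustration. It shows a config object whose constructor sets defaults and whose fields are then overwritten by whatever values were supplied at creation time.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

final class ConfigInjectionSketch {

  /** Stand-in for the proposed Config base class (assumption, not the CDAP class). */
  abstract static class Config { }

  /** Hypothetical config whose private constructor sets the default values. */
  static class DatasetConfig extends Config {
    private String stream;
    private String table;

    private DatasetConfig() {
      this.stream = "A";
      this.table = "X";
    }

    String getStream() { return stream; }
    String getTable() { return table; }
  }

  /**
   * Instantiates the config through its no-arg constructor, then overwrites only
   * the fields named in 'values'. Fields without a supplied value keep their defaults.
   */
  static <T extends Config> T createConfig(Class<T> type, Map<String, String> values) {
    try {
      Constructor<T> constructor = type.getDeclaredConstructor();
      constructor.setAccessible(true);
      T config = constructor.newInstance();
      for (Map.Entry<String, String> entry : values.entrySet()) {
        Field field = type.getDeclaredField(entry.getKey());
        field.setAccessible(true);
        field.set(config, entry.getValue());
      }
      return config;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot create config of type " + type.getName(), e);
    }
  }

  public static void main(String[] args) {
    // Values a user might supply at creation time; only 'table' is overridden.
    Map<String, String> values = new HashMap<>();
    values.put("table", "purchases");

    DatasetConfig config = createConfig(DatasetConfig.class, values);
    System.out.println(config.getStream() + " -> " + config.getTable());
  }
}
```

Running `main` prints `A -> purchases`: the `stream` field keeps its default because no value was supplied for it.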

 

Use Case Walkthrough

 We will use this example to walk through some use cases.

 

Code Block
public class MyApp extends AbstractApplication<MyApp.MyConfig> {

...

 
  public static class MyConfig extends Config {
    @Nullable
    @Description("The name of the stream to read from. Defaults to 'A'.")
    private String stream;
 

...


    @Nullable
    @Description("The name of the table to write to. Defaults to 'X'.")
    private String table;

...

 
    @Name("flow")
    private MyFlow.FlowConfig flowConfig;
 

...


    private MyConfig() {
      this.stream = "A";
      this.table = "X";
    }
  }

...

 
  public void configure() {
    // ApplicationContext now has a method to get a custom config object whose fields will
    // be injected using the values given in the RESTful API
    MyConfig config = getContext().getConfig();
    addStream(new Stream(config.stream));
    createDataset(config.table, Table.class);
    addFlow(new MyFlow(config.stream, config.table, config.flowConfig));
  }
}

...

 
public class MyFlow implements Flow {
  @Property
  private String stream;
  @Property
  private String table;
  @Property
  private FlowConfig flowConfig;
 

...


  public static final class FlowConfig extends Config {
    private ReaderConfig reader;
    private WriterConfig writer;
  }

...

 
  MyFlow(String stream, String table, FlowConfig flowConfig) {
    this.stream = stream;
    this.table = table;
    this.flowConfig = flowConfig;
  }

...

 
  @Override
  public FlowSpecification configure() {
    return FlowSpecification.Builder.with()
      .setName("MyFlow")
      .setDescription("Reads from a stream and writes to a table")
      .withFlowlets()
        .add("reader", new StreamReader(flowConfig.reader))
        .add("writer", new TableWriter(flowConfig.writer))
      .connect()
        .fromStream(stream).to("reader")
        .from("reader").to("writer")
      .build();
  }
} 

...

 
public class StreamReader extends AbstractFlowlet {
  private OutputEmitter<Put> emitter;
  @Property
  private ReaderConfig readerConfig;
  private Reader reader;

...

 

  public static class ReaderConfig extends Config {
    @Description("The name of the reader plugin to use.")
    String name; 

    @Description("The properties needed by the chosen reader plugin.")
    @PluginType("reader")
    PluginProperties properties;
  }

  public static interface Reader {
    Put read(StreamEvent event);
  }
 

...


  StreamReader(ReaderConfig readerConfig) {
    this.readerConfig = readerConfig;
  }

...

 
  @Override
  public void configure() {
    // arguments are: type, name, id, properties
    usePlugin("reader", readerConfig.name, "streamReader", readerConfig.properties);
  }

  @Override
  public void initialize(FlowletContext context) throws Exception {
    reader = context.newPluginInstance("streamReader");
  }

  @ProcessInput
  public void process(StreamEvent event) {
    emitter.emit(reader.read(event));
  }
}

...

 
@Plugin(type = "reader")
@Name("default")
@Description("Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.")
public class DefaultStreamReader implements StreamReader.Reader {
  private DefaultConfig config;

...

 
  public static class DefaultConfig extends PluginConfig {
    @Description("The header that should be used as the row key to write to. Defaults to 'rowkey'.")
    @Nullable
    private String rowkey;
    
    private DefaultConfig() {
      rowkey = "rowkey";
    }
  }

...

 
  public Put read(StreamEvent event) {
    Put put = new Put(Bytes.toBytes(event.getHeaders().get(config.rowkey)));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
    return put;
  }
}
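
The `usePlugin` and `newPluginInstance` calls in the example suggest a two-step lifecycle: a plugin is selected by type and name at configure time and registered under an id, then instantiated by that id at runtime. The following sketch illustrates that lifecycle in plain Java under stated assumptions. `PluginRegistrySketch`, `register`, and `PrefixReader` are hypothetical names, events are plain strings rather than `StreamEvent` objects, and nothing here reflects how CDAP actually discovers or loads plugins.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class PluginRegistrySketch {

  /** Simplified stand-in for the Reader interface; works on strings (assumption). */
  interface Reader {
    String read(String event);
  }

  /** Hypothetical plugin that tags each event with a configurable row key header. */
  static final class PrefixReader implements Reader {
    private final String rowkey;

    PrefixReader(Map<String, String> properties) {
      // Falls back to "rowkey" when the property is not supplied.
      this.rowkey = properties.getOrDefault("rowkey", "rowkey");
    }

    @Override
    public String read(String event) {
      return rowkey + "=" + event;
    }
  }

  // Plugins that are available, keyed by "type:name".
  private final Map<String, Function<Map<String, String>, Object>> available = new HashMap<>();
  // Plugins chosen at configure time, keyed by the id given to usePlugin.
  private final Map<String, Function<Map<String, String>, Object>> factoriesById = new HashMap<>();
  private final Map<String, Map<String, String>> propertiesById = new HashMap<>();

  /** Makes a plugin available under a type and name (hypothetical helper). */
  void register(String type, String name, Function<Map<String, String>, Object> factory) {
    available.put(type + ":" + name, factory);
  }

  /** Configure-time step: arguments are type, name, id, properties. */
  void usePlugin(String type, String name, String id, Map<String, String> properties) {
    Function<Map<String, String>, Object> factory = available.get(type + ":" + name);
    if (factory == null) {
      throw new IllegalArgumentException("No plugin of type '" + type + "' named '" + name + "'");
    }
    factoriesById.put(id, factory);
    propertiesById.put(id, properties);
  }

  /** Runtime step: creates a new instance of the plugin registered under the id. */
  @SuppressWarnings("unchecked")
  <T> T newPluginInstance(String id) {
    Function<Map<String, String>, Object> factory = factoriesById.get(id);
    if (factory == null) {
      throw new IllegalStateException("No plugin was registered under id '" + id + "'");
    }
    return (T) factory.apply(propertiesById.get(id));
  }

  public static void main(String[] args) {
    PluginRegistrySketch registry = new PluginRegistrySketch();
    registry.register("reader", "default", PrefixReader::new);

    // Configure time: choose the plugin and bind it to an id.
    registry.usePlugin("reader", "default", "streamReader",
        Collections.singletonMap("rowkey", "user"));

    // Runtime: instantiate by id only; type and name are no longer needed.
    Reader reader = registry.newPluginInstance("streamReader");
    System.out.println(reader.read("hello"));
  }
}
```

Running `main` prints `user=hello`. Separating selection from instantiation means a missing plugin can be rejected when `usePlugin` is called instead of surfacing later at runtime.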

 

...

1. Deploying an Artifact

A development team creates a project built on top of CDAP. Their CI build runs and produces a jar file. An administrator deploys the jar by making a REST call:

...