Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
//-------------- CDAP API changes --------------
/**
 * Context passed to an application while it is being configured.
 *
 * @param <T> the application's custom config type
 */
public interface ApplicationContext<T extends Config> {
  /** Returns the application's custom config object. */
  T getConfig();
}

/**
 * An application that is configured with a custom config object of type {@code T}.
 *
 * @param <T> the application's custom config type
 */
public interface Application<T extends Config> {
  /**
   * Configures the application.
   *
   * @param configurer the configurer used to set up the application
   * @param context the context that exposes the application's config
   */
  void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);
}
 
public abstract class AbstractApplication<T> implements Application<T extends Config> {
  ...
  protected final ApplicationContext<T> getContext() { return context; }
}
 
//-------------- Example Application --------------
public class MyApp extends AbstractApplication<MyApp.MyConfig> {
 
  public static class MyConfig extends Config {
    @Nullable
    @Description("The name of the stream to read from. Defaults to 'A'.")
    private String stream;
 
    @Nullable
    @Description("The name of the table to write to. Defaults to 'X'.")
    private String table;
 
    @Name("flow")
    private MyFlowConfig flowConfig;
 
    private MyConfig() {
      this.stream = "A";
      this.table = "X";
    }
  }
 
  public void configure() {
    // ApplicationContext now has a method to get a custom config object whose fields will
    // be injected using the values given in the RESTful API
    MyConfig config = getContext().getConfig();
    addStream(new Stream(config.stream));
    createDataset(config.table, Table.class);
    addFlow(new MyFlow(config.stream, config.table, config.flowConfig));
  }
}
 
public class MyFlow implements Flow {
  @Property
  private String stream;
  @Property
  private String table;
  @Property
  private FlowConfig flowConfig;
 
  public static final FlowConfig extends Config {
    private ReaderConfig reader;
    private WriterConfig writer;
  }
 
  MyFlow(String stream, String table, FlowConfig flowConfig) {
    this.stream = stream;
    this.table = table;
    this.flowConfig = flowConfig;
  }
 
  @Override
  public FlowSpecification configure() {
    return FlowSpecification.Builder.with()
      .setName("MyFlow")
      .setDescription("Reads from a stream and writes to a table")
      .withFlowlets()
        .add("reader", new StreamReader(flowConfig.reader))
        .add("writer", new TableWriter(flowConfig.writer))
      .connect()
        .fromStream(stream).to("reader")
        .from("reader").to("writer")
      .build();
  }
} 
 
public class StreamReader extends AbstractFlowlet {
  private OutputEmitter<Put> emitter;
  @Property
  private ReaderConfig readerConfig;
  private Reader reader; 

  public static class ReaderConfig extends Config {
    @Description("The name of the reader plugin to use.")
    String name; 

    @Description("The properties needed by the chosen reader plugin.")
    @PluginType("reader")
    PluginProperties properties;
  }

  public static interface Reader {
    Put read(StreamEvent);
  }
 
  StreamReader(ReaderConfig readerConfig) {
    this.readerConfig = readerConfig;
  }
 
  @Override
  public FlowletSpecification configure() {
    // arguments are: type, name, id, properties
    usePlugin("reader", readerConfig.name, "streamReader", readerConfig.properties);
  }

  @Override
  public void initialize(FlowletContext context) throws Exception {
    reader = context.newPluginInstance("streamReader");
  }
  @ProcessInput
  public void process(StreamEvent event) {
    emitter.emit(reader.read(event));
  }
}

 

Use Case Walkthrough

1. Deploying an Artifact

User builds their application jar the same way they build it today. They make a call to deploy their artifact (jar).

Code Block
POST /namespaces/default/artifacts/myapp --data-binary @myapp-1.0.0.jar

CDAP opens the jar, reads its bundle-version to use as the artifact version, determines which apps, programs, datasets, and plugins the artifact contains, and then stores the artifact on the filesystem and its metadata in a table.

The user can examine the metadata by making a call:

Code Block
GET /namespaces/default/artifacts/myapp/versions/1.0.0
 
{
 
@Plugin(type = "reader")
@Name("default")
@Description("Default stream reader. Expects the row key to come as a header.")
public class DefaultStreamReader implements StreamReader.Reader {
  private DefaultConfig config;
 
  public static class DefaultConfig extends PluginConfig {
    private String rowkey;
    
    private DefaultConfig() {
      rowkey = "rowkey";
    }
  }
 
  public Put read(StreamEvent event) {
    Put put = new Put(Bytes.toBytes(event.getHeaders().get(config.rowkey)));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
  }
}

 

Use Case Walkthrough

1. Deploying an Artifact

User builds their application jar the same way they build it today. They make a call to deploy their artifact (jar).

Code Block
POST /namespaces/default/artifacts/myapp --data-binary @myapp-1.0.0.jar

CDAP opens the jar, reads its bundle-version to use as the artifact version, determines which apps, programs, datasets, and plugins the artifact contains, and then stores the artifact on the filesystem and its metadata in a table.

The user can examine the metadata by making a call:

Code Block
GET /namespaces/default/artifacts/myapp/versions/1.0.0
 
{
  "name": "myapp",
  "version": "1.0.0",
  "meta": {
    "created": "1234567890000",
    ...
  },
  "classes": {
    "apps": [
      {
        "className": "co.cask.cdap.examples.myapp.MyApp",
        "properties": {
          "stream": { 
            "name": "stream", 
            "description": "The name of the stream to read from. Defaults to 'A'.", 
            "type": "string", 
            "required": false 
          },
          "table": {
            "name": "table",
            "description": "The name of the table to write to. Defaults to 'X'.",
            "type": "string",
            "required": false
          },
          "flowConfig": {
            "name": "flow",
            "description": "",
            "type": "config",
            "fields": {
              "reader": {
                "name": "reader",
                "description": "",
                "type": "config",
                "required": true,
                "fields": {
                  "name": {
                    "name": "name",
                    "description": "The name of the reader plugin to use.",
                    "type": "string",
                    "required": true
                  },
                  "properties": {
                    "name": "properties",
                    "description": "The properties needed by the chosen reader plugin.",
                    "type": "plugin",
                    "plugintype": "reader",
                    "required": true
                  }
                }
              },
              "writer": { ... }
            }
          }
        }
      }
    ],
    "plugins": [
    ]
  }
}

...