1. Create an Application that uses config

1.1 Deploying the Artifact

A developer writes a configurable Application Class that uses a Flow to read from a stream and write to a Table.

Code Block
public class MyApp extends AbstractApplication<MyApp.MyConfig> {
 
  public static class MyConfig extends Config {
    @Nullable
    @Description("The name of the stream to read from. Defaults to 'A'.")
    private String stream;
 
    @Nullable
    @Description("The name of the table to write to. Defaults to 'X'.")
    private String table;
 
    private MyConfig() {
      this.stream = "A";
      this.table = "X";
    }
  }
 
  public void configure() {
    // ApplicationContext now has a method to get a custom config object whose fields will
    // be injected using the values given in the RESTful API
    MyConfig config = getContext().getConfig();
    addStream(new Stream(config.stream));
    createDataset(config.table, Table.class);
    addFlow(new MyFlow(config.stream, config.table));
  }
}
 
public class MyFlow implements Flow {
  @Property
  private String stream;
  @Property
  private String table;
 
  MyFlow(String stream, String table) {
    this.stream = stream;
    this.table = table;
  }
 
  @Override
  public FlowSpecification configure() {
    return FlowSpecification.Builder.with()
      .setName("MyFlow")
      .setDescription("Reads from a stream and writes to a table")
      .withFlowlets()
        .add("reader", new StreamReaderReader(table))
      .connect()
        .fromStream(stream).to("reader")
      .build();
  }
} 
 
public class StreamReaderReader extends AbstractFlowlet {
  @Property
  private String tableName;
  private Table table;
  
  StreamReaderReader(String tableName) {
    this.tableName = tableName;
  }  

  @Override
  public void initialize(FlowletContext context) throws Exception {
    table = context.getDataset(tableName);
  }
 
  @ProcessInput
  public void process(StreamEvent event) {
    // The row key is taken from the 'rowkey' header of the stream event.
    Put put = new Put(Bytes.toBytes(event.getHeaders().get("rowkey")));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
    table.put(put);
  }
}
 
public class TableWriter extends AbstractFlowlet {
  private OutputEmitter<Put> emitter;

  @ProcessInput
  public void process(StreamEvent event) {
    // The row key is taken from the 'rowkey' header of the stream event.
    Put put = new Put(Bytes.toBytes(event.getHeaders().get("rowkey")));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
    emitter.emit(put);
  }
}
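
The comment in configure() says that the config object's fields are injected from the values given in the RESTful API, while the private constructor supplies defaults. The following is a minimal sketch of how such injection could behave, using plain reflection. SketchConfig and ConfigInjector are hypothetical names used only for illustration; this is not the platform's actual mechanism, and only String fields are handled.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Map;

// Hypothetical stand-in for MyApp.MyConfig; not a platform class.
class SketchConfig {
  private String stream;
  private String table;

  // Defaults are set here, as in MyConfig.
  private SketchConfig() {
    this.stream = "A";
    this.table = "X";
  }

  String getStream() { return stream; }
  String getTable() { return table; }
}

// Hypothetical helper: builds a config object and overrides its defaults.
final class ConfigInjector {
  private ConfigInjector() { }

  static <T> T inject(Class<T> type, Map<String, String> values) {
    try {
      // Run the private no-arg constructor so that defaults are applied first.
      Constructor<T> constructor = type.getDeclaredConstructor();
      constructor.setAccessible(true);
      T config = constructor.newInstance();
      // Override only the String fields for which a value was supplied.
      for (Field field : type.getDeclaredFields()) {
        String value = values.get(field.getName());
        if (value != null && field.getType() == String.class) {
          field.setAccessible(true);
          field.set(config, value);
        }
      }
      return config;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create config " + type.getName(), e);
    }
  }

  public static void main(String[] args) {
    // Only 'stream' is supplied, so 'table' keeps its default.
    SketchConfig config = inject(SketchConfig.class, Map.of("stream", "purchases"));
    System.out.println(config.getStream() + " -> " + config.getTable()); // prints "purchases -> X"
  }
}
```

Running the constructor before applying the supplied values is what lets a @Nullable field fall back to its documented default when the request omits it.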

 

 


A jar named 'myapp-1.0.0.jar' is built which contains the Application Class. The jar is deployed via the RESTful API:

Code Block
POST /namespaces/default/artifacts/myapp --data-binary @myapp-1.0.0.jar

Version is determined from the Bundle-Version in the artifact Manifest. It can also be provided as a header. Artifact details are now visible through other RESTful API calls:

Code Block
GET /namespaces/default/artifacts
[
  {
    "name": "myapp",
    "version": "1.0.0"
  }
]
 
GET /namespaces/default/artifacts/myapp/versions/1.0.0
{
  "name": "myapp",
  "version": "1.0.0",
  "classes": {
    "apps": [
      {
        "className": "co.cask.cdap.examples.myapp.MyApp",
        "properties": {
          "stream": { 
            "name": "stream", 
            "description": "The name of the stream to read from. Defaults to 'A'.", 
            "type": "string", 
            "required": false 
          },
          "table": {
            "name": "table",
            "description": "The name of the table to write to. Defaults to 'X'.",
            "type": "string",
            "required": false
          }
        }
      }
    ],
    "flows": [ ... ],
    "flowlets": [ ... ],
    "datasetModules": [ ... ]
  }
}

In addition, a call can be made to get all Application Classes:

Code Block
GET /namespaces/default/appClasses
[
  {
    "className": "co.cask.cdap.examples.myapp.MyApp",
    "artifact": {
      "name": "myapp",
      "version": "1.0.0"
    }
  }
]
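
Section 1.1 states that the artifact version is taken from Bundle-Version in the manifest and can also be provided as a header. A minimal sketch of that lookup with the standard java.util.jar.Manifest class follows. ArtifactVersions and resolveVersion are hypothetical names, and letting the header take precedence over the manifest is an assumption, since the text does not say which one wins.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Manifest;

// Hypothetical helper; not part of the platform API.
final class ArtifactVersions {
  private ArtifactVersions() { }

  // Assumption: an explicit header value takes precedence over the manifest.
  static String resolveVersion(InputStream manifestStream, String versionHeader) {
    if (versionHeader != null && !versionHeader.isEmpty()) {
      return versionHeader;
    }
    try {
      Manifest manifest = new Manifest(manifestStream);
      String version = manifest.getMainAttributes().getValue("Bundle-Version");
      if (version == null) {
        throw new IllegalArgumentException("No Bundle-Version in manifest and no version header");
      }
      return version;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    String manifestText = "Manifest-Version: 1.0\nBundle-Version: 1.0.0\n";
    InputStream in = new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8));
    System.out.println(resolveVersion(in, null)); // prints "1.0.0"
  }
}
```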

1.2 Creating an Application

The user decides to create an application from the deployed artifact. From the calls above, the user gathers that input and output are both configurable. The user decides to create an Application that reads from the 'purchases' stream and writes to the 'events' table.

Code Block
PUT /namespaces/default/apps/purchaseDump -H 'Content-Type: application/json' -d '
{ 
  "artifact": {
    "name": "myapp",
    "version": "1.0.0"
  },
  "config": {
    "stream": "purchases",
    "table": "events" 
  }
}'

The Application now shows up in all the normal RESTful APIs, with all its programs, streams, and datasets.
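
For clients that issue the create call from code rather than from the command line, the request could be assembled roughly as follows with the JDK's java.net.http API (Java 11 or later). CreateAppRequest is a hypothetical helper, the base URL in main is a placeholder rather than a documented endpoint, and the body builder assumes that the values need no JSON escaping. The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; builds the request from section 1.2 without sending it.
final class CreateAppRequest {
  private CreateAppRequest() { }

  // Assumption: the arguments contain no characters that need JSON escaping.
  static String body(String artifactName, String artifactVersion, String stream, String table) {
    return String.format(
        "{\"artifact\":{\"name\":\"%s\",\"version\":\"%s\"},"
            + "\"config\":{\"stream\":\"%s\",\"table\":\"%s\"}}",
        artifactName, artifactVersion, stream, table);
  }

  static HttpRequest build(String baseUrl, String appName, String jsonBody) {
    return HttpRequest.newBuilder(URI.create(baseUrl + "/namespaces/default/apps/" + appName))
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }

  public static void main(String[] args) {
    // Placeholder host and port; substitute the address of the actual router.
    HttpRequest request = build("http://localhost:10000", "purchaseDump",
        body("myapp", "1.0.0", "purchases", "events"));
    System.out.println(request.method() + " " + request.uri().getPath());
  }
}
```

Sending the request would use HttpClient.send with a suitable BodyHandler; that step is left out so that the sketch runs without a server.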

 

1.3 Updating an Application

A bug is found in the code, a fix is provided, and a 'myapp-1.0.1.jar' release is made. The artifact is deployed:

Code Block
POST /namespaces/default/artifacts/myapp --data-binary @myapp-1.0.1.jar

A call can be made to determine if there are any Applications using the older artifact:

Code Block
GET /namespaces/default/apps?artifactName=myapp&artifactVersion=1.0.0
[ "purchaseDump" ]

Calls are made to stop running programs. Another call is then made to update the app:

Code Block
POST /namespaces/default/apps/purchaseDump/update -d '
{ 
  "artifact": {
    "name": "myapp",
    "version": "1.0.1"
  },
  "config": {
    "stream": "purchases",
    "table": "events" 
  }
}'

1.4 Rolling Back an Application

Version 1.0.1 turns out to contain an even worse bug and needs to be rolled back. The same update call is made, this time with the previous artifact version:

Code Block
POST /namespaces/default/apps/purchaseDump/update -d '
{ 
  "artifact": {
    "name": "myapp",
    "version": "1.0.0"
  },
  "config": {
    "stream": "purchases",
    "table": "events" 
  }
}'
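
The update and rollback steps in 1.3 and 1.4 treat an Application as a reference to an artifact version, so a rollback is simply another update that points at the earlier version. The in-memory model below illustrates that idea. AppRegistrySketch and its methods are hypothetical and do not reflect how the platform stores this information.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of applications and the artifact version each one uses.
final class AppRegistrySketch {
  private final Map<String, String> versionByApp = new TreeMap<>();

  void create(String app, String artifactVersion) {
    versionByApp.put(app, artifactVersion);
  }

  // Mirrors the query for applications that use a given artifact version.
  List<String> appsUsing(String artifactVersion) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, String> entry : versionByApp.entrySet()) {
      if (entry.getValue().equals(artifactVersion)) {
        result.add(entry.getKey());
      }
    }
    return result;
  }

  // Update and rollback are the same operation; only the target version differs.
  void update(String app, String artifactVersion) {
    if (!versionByApp.containsKey(app)) {
      throw new IllegalArgumentException("Unknown application: " + app);
    }
    versionByApp.put(app, artifactVersion);
  }

  String versionOf(String app) {
    return versionByApp.get(app);
  }

  public static void main(String[] args) {
    AppRegistrySketch registry = new AppRegistrySketch();
    registry.create("purchaseDump", "1.0.0");
    System.out.println(registry.appsUsing("1.0.0")); // prints "[purchaseDump]"
    registry.update("purchaseDump", "1.0.1");        // upgrade
    registry.update("purchaseDump", "1.0.0");        // rollback
    System.out.println(registry.versionOf("purchaseDump")); // prints "1.0.0"
  }
}
```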

1.5 Deploying an Artifact and Creating an App in One Step

For backwards compatibility, the deploy app API will remain the same and will internally deploy an artifact and create the app in one call. An additional header that specifies the Application Config will be supported.

Code Block
POST /namespaces/default/apps --data-binary @myapp-1.0.0.jar -H 'X-App-Config: { "stream": "purchases", "table": "events" }'

2. Create an Application that uses plugins

2.1 Code changes

Now we decide to update our Application Class to support pluggable ways of reading from a stream. We do this by introducing a 'Reader' interface:

Code Block
public class MyApp extends AbstractApplication<MyApp.MyConfig> {
 
  public static class MyConfig extends Config {
    @Nullable
    @Description("The name of the stream to read from. Defaults to 'A'.")
    private String stream;
 
    @Nullable
    @Description("The name of the table to write to. Defaults to 'X'.")
    private String table;
 
    @Name("flow")
    private MyFlow.FlowConfig flowConfig;
 
    private MyConfig() {
      this.stream = "A";
      this.table = "X";
    }
  }
 
  public void configure() {
    // ApplicationContext now has a method to get a custom config object whose fields will
    // be injected using the values given in the RESTful API
    MyConfig config = getContext().getConfig();
    addStream(new Stream(config.stream));
    createDataset(config.table, Table.class);
    addFlow(new MyFlow(config.stream, config.table, config.flowConfig));
  }
}
 
public class MyFlow implements Flow {
  @Property
  private String stream;
  @Property
  private String table;
  @Property
  private FlowConfig flowConfig;
 
  public static final class FlowConfig extends Config {
    private StreamReader.ReaderConfig reader;
    private WriterConfig writer;
  }
 
  MyFlow(String stream, String table, FlowConfig flowConfig) {
    this.stream = stream;
    this.table = table;
    this.flowConfig = flowConfig;
  }
 
  @Override
  public FlowSpecification configure() {
    return FlowSpecification.Builder.with()
      .setName("MyFlow")
      .setDescription("Reads from a stream and writes to a table")
      .withFlowlets()
        .add("reader", new StreamReader(flowConfig.reader))
        .add("writer", new TableWriter(flowConfig.writer))
      .connect()
        .fromStream(stream).to("reader")
        .from("reader").to("writer")
      .build();
  }
} 
 
public class StreamReader extends AbstractFlowlet {
  private OutputEmitter<Put> emitter;
  @Property
  private ReaderConfig readerConfig;
  private Reader reader; 

  public static class ReaderConfig extends Config {
    @Description("The name of the reader plugin to use.")
    String name; 

    @Description("The properties needed by the chosen reader plugin.")
    @PluginType("reader")
    PluginProperties properties;
  }

  public static interface Reader {
    Put read(StreamEvent event);
  }
 
  StreamReader(ReaderConfig readerConfig) {
    this.readerConfig = readerConfig;
  }
 
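  @Override
  public FlowletSpecification configure() {
    // arguments are: type, name, id, properties
    usePlugin("reader", readerConfig.name, "streamReader", readerConfig.properties);
    // configure() must return a FlowletSpecification; the default one is assumed here
    return super.configure();
  }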

  @Override
  public void initialize(FlowletContext context) throws Exception {
    reader = context.newPluginInstance("streamReader");
  }

  @ProcessInput
  public void process(StreamEvent event) {
    emitter.emit(reader.read(event));
  }
}
 
@Plugin(type = "reader")
@Name("default")
@Description("Writes timestamp and body as two columns and expects the row key to come as a header in the stream event.")
public class DefaultStreamReader implements StreamReader.Reader {
  private DefaultConfig config;
 
  public static class DefaultConfig extends PluginConfig {
    @Description("The header that should be used as the row key to write to. Defaults to 'rowkey'.")
    @Nullable
    private String rowkey;
    
    private DefaultConfig() {
      rowkey = "rowkey";
    }
  }
 
  public Put read(StreamEvent event) {
    Put put = new Put(Bytes.toBytes(event.getHeaders().get(config.rowkey)));
    put.add("timestamp", event.getTimestamp());
    put.add("body", Bytes.toBytes(event.getBody()));
    return put;
  }
}
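
The usePlugin and newPluginInstance calls above register a plugin under an id at configure time and instantiate it by that id at runtime. The following self-contained sketch shows one way such a lookup could work. PluginRegistrySketch, SketchReader, DefaultSketchReader, and addPlugin are hypothetical names, and events are simplified to strings; the sketch illustrates the idea and is not the platform's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified counterpart of the Reader interface (events are plain strings here).
interface SketchReader {
  String read(String event);
}

// Hypothetical counterpart of DefaultStreamReader; 'rowkey' defaults to "rowkey".
final class DefaultSketchReader implements SketchReader {
  private final String rowkey;

  DefaultSketchReader(Map<String, String> properties) {
    this.rowkey = properties.getOrDefault("rowkey", "rowkey");
  }

  @Override
  public String read(String event) {
    return rowkey + ":" + event;
  }
}

// Hypothetical registry: plugins are found by (type, name) and bound to an id.
final class PluginRegistrySketch {
  private final Map<String, Function<Map<String, String>, Object>> factories = new HashMap<>();
  private final Map<String, Supplier<Object>> registered = new HashMap<>();

  // Makes a plugin available under its type and name, like @Plugin and @Name.
  void addPlugin(String type, String name, Function<Map<String, String>, Object> factory) {
    factories.put(type + ":" + name, factory);
  }

  // Configure time: bind the chosen plugin and its properties to an id.
  void usePlugin(String type, String name, String id, Map<String, String> properties) {
    Function<Map<String, String>, Object> factory = factories.get(type + ":" + name);
    if (factory == null) {
      throw new IllegalArgumentException("No plugin of type '" + type + "' named '" + name + "'");
    }
    registered.put(id, () -> factory.apply(properties));
  }

  // Runtime: create a fresh instance of the plugin registered under the id.
  @SuppressWarnings("unchecked")
  <T> T newPluginInstance(String id) {
    Supplier<Object> supplier = registered.get(id);
    if (supplier == null) {
      throw new IllegalStateException("No plugin registered with id '" + id + "'");
    }
    return (T) supplier.get();
  }

  public static void main(String[] args) {
    PluginRegistrySketch registry = new PluginRegistrySketch();
    registry.addPlugin("reader", "default", DefaultSketchReader::new);
    registry.usePlugin("reader", "default", "streamReader", Map.of("rowkey", "user"));
    SketchReader reader = registry.newPluginInstance("streamReader");
    System.out.println(reader.read("event-1")); // prints "user:event-1"
  }
}
```

Separating the (type, name) lookup from the id keeps the flowlet independent of which plugin was chosen: the flowlet only needs the id it registered.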

 
