Metadata From Programs and Pipelines


Goal

We would like to access and emit metadata from CDAP pipelines and plugins at runtime.

User Stories

  • As a CDAP pipeline administrator, I want to be able to access (read) metadata of predefined CDAP resources (entities or custom entities) during the run of the pipeline.

  • As a CDAP pipeline administrator, I want to emit (write) metadata for different CDAP resources during the run of the pipeline.

  • As a CDAP pipeline administrator, I want to propagate some or all of the metadata of the source to the sink during the run of the pipeline.

  • As a CDAP plugin developer, I want to write a plugin which can programmatically access (read) metadata of different CDAP resources. The resources whose metadata needs to be available to the plugin “may” be predefined by the plugin.

  • As a CDAP plugin developer, I want to write a plugin that programmatically emits metadata for custom entities. These custom entities need not be predefined by the plugin, i.e. I should be able to emit metadata for any custom entity.

  • As a CDAP plugin developer, I want to write a custom plugin that programmatically propagates some or all of the metadata of the source, or of its schema fields, to the sink.

Design

Platform

The MetadataService will be responsible for providing metadata to plugins.

On Premise

When the program runs on-premise, the program runtime can query the MetadataService directly over HTTP to get the metadata. We can also cache this metadata in the program context as an optimization. The MetadataService will be available in the initialize() method only.
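The caching idea above can be sketched as follows. This is only an illustration under stated assumptions: `MetadataFetcher` is a hypothetical stand-in for the HTTP call to the MetadataService, `CachingMetadataReader` is not a CDAP class, and entities are represented as plain strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the HTTP call to the MetadataService (not a CDAP API).
interface MetadataFetcher {
  Set<String> fetchTags(String entity);
}

// Remembers the tags fetched for each entity so that repeated lookups during
// one program run reach the remote service only once per entity.
final class CachingMetadataReader {
  private final MetadataFetcher fetcher;
  private final Map<String, Set<String>> cache = new HashMap<>();
  private int remoteCalls;

  CachingMetadataReader(MetadataFetcher fetcher) {
    this.fetcher = fetcher;
  }

  Set<String> getTags(String entity) {
    Set<String> tags = cache.get(entity);
    if (tags == null) {
      // Cache miss: go to the remote service and remember the answer.
      remoteCalls++;
      tags = fetcher.fetchTags(entity);
      cache.put(entity, tags);
    }
    return tags;
  }

  // Exposed only so the effect of the cache can be observed.
  int getRemoteCalls() {
    return remoteCalls;
  }
}
```

A cache like this trades freshness for fewer HTTP calls: metadata changed after the first lookup is not seen for the rest of the run, which may or may not be acceptable for a given plugin.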


To emit metadata, the plugin can use the API exposed by the MetadataService, which just pushes the changes to TMS.

In Cloud

In a cloud install, the MetadataService cannot talk to the MetadataStore, so we will pre-populate the MetadataService with the metadata it needs before it is launched. This pre-population will happen in the prepareLaunch stage (once prepareLaunch is available). Even in cloud mode, the way plugins access metadata from the MetadataService remains the same: the MetadataService will be available in initialize(). The only difference is an extra hook in prepareLaunch where the plugin declares the MetadataEntities whose metadata it needs; that metadata is then shipped to the MetadataService in the cloud, which is local to that pipeline. (Note: this is not in scope for 5.0; see the Limitations section for details.)
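A minimal sketch of the pre-population idea, assuming a purely in-memory local service. `PrepopulatedMetadata` and its methods are hypothetical names, not CDAP APIs, and entities are again plain strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical local metadata service that can only answer for entities
// that were declared and shipped before the pipeline was launched.
final class PrepopulatedMetadata {
  private final Map<String, Set<String>> tagsByEntity = new HashMap<>();

  // Conceptually called on the master side (from a prepareLaunch-like hook)
  // once for every entity the plugin declared.
  void preload(String entity, Set<String> tags) {
    tagsByEntity.put(entity, Collections.unmodifiableSet(new HashSet<>(tags)));
  }

  // Conceptually called by the plugin at runtime, from initialize().
  Set<String> getTags(String entity) {
    Set<String> tags = tagsByEntity.get(entity);
    if (tags == null) {
      // Nothing was shipped for this entity, so there is nothing to serve.
      throw new IllegalStateException(
          "Metadata for " + entity + " was not declared before launch");
    }
    return tags;
  }
}
```

The sketch makes the trade-off visible: the plugin-facing read call looks the same as on-premise, but a lookup for an entity that was never declared cannot be satisfied.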


Emitting metadata remains the same in both in-prem and cloud modes. The plugin can use the API exposed by the MetadataService, which just pushes the changes to TMS in both modes.
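The "just push to TMS" behavior can be illustrated with the sketch below. It assumes an in-memory `Queue` as a stand-in for a TMS topic; `TagChange` and `QueueingMetadataWriter` are hypothetical names and the message shape is invented for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;

// One emitted change: "add these tags to this entity". Hypothetical message shape.
final class TagChange {
  final String entity;
  final Set<String> tags;

  TagChange(String entity, Set<String> tags) {
    this.entity = entity;
    this.tags = Collections.unmodifiableSet(new LinkedHashSet<>(tags));
  }
}

// Writer that only enqueues changes. It never touches the metadata store itself;
// a separate consumer would read the queue and apply the changes later.
final class QueueingMetadataWriter {
  private final Queue<TagChange> topic; // in-memory stand-in for a TMS topic

  QueueingMetadataWriter(Queue<TagChange> topic) {
    this.topic = topic;
  }

  void addTags(String entity, String... tags) {
    topic.add(new TagChange(entity, new LinkedHashSet<>(Arrays.asList(tags))));
  }
}
```

Because the writer depends only on the queue, the same code path works whether the store is reachable from the program runtime or not, which is why emitting can behave identically in both modes.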


Program/Plugin APIs

A plugin can read and emit metadata through the API exposed to it, which comes from MetadataReaderContext and MetadataWriterContext.

https://github.com/caskdata/cdap/blob/develop/cdap-api/src/main/java/co/cask/cdap/api/metadata/MetadataReaderContext.java

https://github.com/caskdata/cdap/blob/develop/cdap-api/src/main/java/co/cask/cdap/api/metadata/MetadataWriterContext.java

Source/Sink Plugins

Sources and sinks are aware of the dataset they are operating on and can access/emit metadata for that dataset or any of its sub-entities. Sources and sinks can also access/emit metadata for any custom entity by specifying the complete resource details. How this is done in a plugin is a contract between the plugin developer and the pipeline runner/administrator, which we propose later in this document.

Transform Plugins

Transforms are not aware of the dataset they are operating on and hence cannot easily specify the resource whose metadata they want to access or emit. Transform plugins can still access/emit metadata by specifying the complete resource details, based on a contract between the plugin developer and the pipeline administrator similar to the one for source/sink plugins.

Custom Action Plugins

Like other plugins, custom action plugins will have a similar API exposed to them. A custom action plugin can use it to access/emit metadata and to do any other kind of metadata processing.

Specifying MetadataEntity In Plugins

The other question is how a plugin will specify which MetadataEntities it needs metadata for. In the in-prem case, this must be specified when querying the MetadataService for the metadata. In the cloud case, it is also needed in prepareLaunch to specify which MetadataEntities' metadata to populate in the MetadataService.


We propose to specify MetadataEntities through either runtime arguments or pipeline configs. Let’s take runtime arguments as the choice for this design. While developing a plugin, a plugin developer can create a MetadataEntity using predefined runtime argument keys. The values for these keys need to be provided by the pipeline administrator when running the pipeline. Understanding the runtime arguments used for metadata is a contract between the plugin developer and the pipeline administrator.
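The contract described above could be sketched like this. The key names `source.dataset` and `source.field` are illustrative assumptions only, and the ordered map is a simplified stand-in for a MetadataEntity rather than the real class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class EntityFromArguments {
  // Hypothetical runtime argument keys agreed between the plugin developer
  // and the pipeline administrator.
  static final String DATASET_KEY = "source.dataset";
  static final String FIELD_KEY = "source.field";

  // Builds an ordered list of key-value pairs describing the entity,
  // for example {dataset=employee, field=empName}.
  static Map<String, String> resolve(Map<String, String> runtimeArgs) {
    String dataset = runtimeArgs.get(DATASET_KEY);
    if (dataset == null || dataset.isEmpty()) {
      // The administrator did not honor the contract; fail early and clearly.
      throw new IllegalArgumentException("Missing runtime argument: " + DATASET_KEY);
    }
    Map<String, String> entity = new LinkedHashMap<>();
    entity.put("dataset", dataset);

    // The field part is optional; without it the entity is the dataset itself.
    String field = runtimeArgs.get(FIELD_KEY);
    if (field != null && !field.isEmpty()) {
      entity.put("field", field);
    }
    return entity;
  }
}
```

Failing fast on a missing key keeps the contract explicit: a pipeline started without the agreed arguments stops with a clear message instead of silently reading or writing metadata for the wrong entity.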


The code snippet below shows an example of metadata being accessed and emitted from a program/pipeline:


  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    // Dataset names are supplied by the pipeline administrator as runtime arguments
    Arguments arguments = context.getArguments();
    String sourceDsName = arguments.get("source");
    String sinkDsName = arguments.get("sink");
    MetadataEntity sourceEntity = MetadataEntity.ofDataset(sourceDsName);
    MetadataEntity sinkEntity = MetadataEntity.ofDataset(sinkDsName);

    // Propagate all source metadata to the sink
    // (propagateMetadata is a helper written by the plugin developer; not shown here)
    propagateMetadata(context, sourceEntity, sinkEntity);

    // Propagate metadata of some fields
    MetadataEntity sourceFieldEntity = sourceEntity.append("field", "empName");
    MetadataEntity sinkFieldEntity = sinkEntity.append("field", "empName");
    propagateMetadata(context, sourceFieldEntity, sinkFieldEntity);

    // Check every input field for a particular tag
    for (Schema.Field field : context.getInputSchema().getFields()) {
      sourceFieldEntity = sourceEntity.append("field", field.getName());
      Metadata metadata = context.getMetadata(MetadataScope.USER, sourceFieldEntity);
      if (metadata.getTags().contains("confidential")) {
        // do something
      }
    }
  }
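The snippet above calls a propagation helper that it does not define. One possible shape for such a helper is sketched below against an in-memory stand-in, so that it can run outside CDAP. `InMemoryMetadata` and `MetadataPropagation` are hypothetical names, entities are plain strings, and the merge policy (keep existing sink tags, overwrite sink properties with the same key) is an assumption rather than something this design prescribes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// In-memory stand-in for the metadata read and write contexts (not a CDAP class).
final class InMemoryMetadata {
  private final Map<String, Set<String>> tags = new HashMap<>();
  private final Map<String, Map<String, String>> properties = new HashMap<>();

  Set<String> getTags(String entity) {
    return Collections.unmodifiableSet(
        tags.getOrDefault(entity, Collections.<String>emptySet()));
  }

  Map<String, String> getProperties(String entity) {
    return Collections.unmodifiableMap(
        properties.getOrDefault(entity, Collections.<String, String>emptyMap()));
  }

  void addTags(String entity, Set<String> newTags) {
    tags.computeIfAbsent(entity, key -> new LinkedHashSet<>()).addAll(newTags);
  }

  void addProperties(String entity, Map<String, String> newProperties) {
    properties.computeIfAbsent(entity, key -> new HashMap<>()).putAll(newProperties);
  }
}

final class MetadataPropagation {
  // Copies every tag and property of the source entity onto the sink entity.
  // Existing sink tags are kept; sink properties with the same key are overwritten.
  static void propagateMetadata(InMemoryMetadata metadata, String source, String sink) {
    // Copy first so the call is also safe when source and sink are the same entity.
    Set<String> sourceTags = new LinkedHashSet<>(metadata.getTags(source));
    Map<String, String> sourceProperties = new HashMap<>(metadata.getProperties(source));
    metadata.addTags(sink, sourceTags);
    metadata.addProperties(sink, sourceProperties);
  }
}
```

In a real plugin the reads would go through the reader context and the writes through the writer context; the structure of the helper (read everything from the source, then add it to the sink) would stay the same.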


Authorization and Auditing

Authorization and auditing for metadata access from pipelines/plugins differ from those for metadata access through REST, the CLI, clients, etc. The flowcharts below summarize when authorization and auditing are done when a program/pipeline accesses or emits metadata.

Emit

The race-condition window in the diagram below is the period during which a privilege might change before the metadata change is read from TMS and stored in HBase. For example, consider a pipeline running as user "alice" that emits metadata for a dataset called "employee", on which "alice" has privileges. Because of some cluster issue, either HBase is down and the metadata changes in TMS are not being synced to it, or the consumer is lagging behind, which widens the window in which privilege changes can happen. Suppose such a change does happen and "alice" no longer has privileges on the dataset. In this case, the consumer responsible for reading the metadata changes and storing them in HBase will drop the change and record it in the log as a warning. (ToDo: Consider whether it is safe to log the metadata change information in plain text in a log file. What if the metadata contains sensitive information that requires authorization?)
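The consumer-side behavior in the "alice" example can be sketched as follows, assuming an in-memory queue in place of TMS and a map in place of HBase. `PendingTagChange`, `AuthorizingConsumer`, and the privilege predicate are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiPredicate;

// One emitted change waiting in the messaging system. Hypothetical message shape.
final class PendingTagChange {
  final String user;
  final String entity;
  final String tag;

  PendingTagChange(String user, String entity, String tag) {
    this.user = user;
    this.entity = entity;
    this.tag = tag;
  }
}

// Applies queued changes, checking the privilege at consume time rather than at emit time.
final class AuthorizingConsumer {
  private final BiPredicate<String, String> hasPrivilege;          // (user, entity) -> allowed now
  private final Map<String, Set<String>> store = new HashMap<>();  // stand-in for HBase
  private int dropped;

  AuthorizingConsumer(BiPredicate<String, String> hasPrivilege) {
    this.hasPrivilege = hasPrivilege;
  }

  void drain(Queue<PendingTagChange> topic) {
    PendingTagChange change;
    while ((change = topic.poll()) != null) {
      if (hasPrivilege.test(change.user, change.entity)) {
        store.computeIfAbsent(change.entity, key -> new HashSet<>()).add(change.tag);
      } else {
        // Privilege was revoked inside the race window: drop the change.
        // The design would also log a warning at this point.
        dropped++;
      }
    }
  }

  Set<String> getStoredTags(String entity) {
    return store.getOrDefault(entity, new HashSet<String>());
  }

  int getDroppedCount() {
    return dropped;
  }
}
```

Checking at consume time means the outcome depends on the privileges in force when the change is applied, not when it was emitted, which is exactly what makes the length of the window matter.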

Read (In-Prem only)

The flowchart below covers in-prem reads only. Reading metadata from programs and pipelines in cloud mode is out of scope for 5.0; see the Limitations section for details.

Limitations

  • As discussed in the cloud-mode part of the Design section, to access metadata in the cloud the plugin must declare beforehand the resources whose metadata it will need, i.e. in prepareLaunch(), which will run on the CDAP master side. Currently, prepareLaunch() or any similar API that runs on the CDAP master is not scheduled for 5.0, so pipelines will not be able to read metadata in cloud mode, although they can emit (write) metadata.

  • In 5.0 it will not be possible to access or emit metadata on a per-record basis, i.e. the metadata APIs will not be available in the transform() method of a plugin. If this functionality is needed, we can make the MetadataService available in transform().

  • It is possible for the data pipeline app to initialize the MetadataService with some default MetadataEntities and also propagate their metadata from source to sink. In 5.0, due to lack of time, we will not implement any such default behavior. We can add it later, once we have seen some use cases and know which defaults are common. In 5.0 we expect the user to write the code for such propagation.
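One way to live with the per-record limitation, assuming the per-record logic only depends on metadata that is already known at startup, is to read the metadata once in initialize() and keep the result in a field for transform() to use. The sketch below illustrates that pattern outside CDAP; `MaskingTransform`, the `tagLookup` function, and the masking behavior are hypothetical.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class MaskingTransform {
  private final Set<String> confidentialFields = new HashSet<>();

  // Mirrors initialize(): the only place where metadata lookups happen.
  // tagLookup is a hypothetical stand-in for reading the tags of a field entity.
  void initialize(Iterable<String> fieldNames, Function<String, Set<String>> tagLookup) {
    for (String field : fieldNames) {
      if (tagLookup.apply(field).contains("confidential")) {
        confidentialFields.add(field);
      }
    }
  }

  // Mirrors transform(): uses only what initialize() remembered, no metadata calls.
  Map<String, String> transform(Map<String, String> record) {
    Map<String, String> out = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : record.entrySet()) {
      boolean mask = confidentialFields.contains(entry.getKey());
      out.put(entry.getKey(), mask ? "****" : entry.getValue());
    }
    return out;
  }
}
```

This does not cover emitting metadata per record, and it does not see metadata changes made after initialize() has run.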