Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

CDAP currently captures lineage at the dataset level. Dataset-level lineage lets users determine which programs read from or wrote to a dataset in a given time frame.

However, as a platform, CDAP understands schemas for most datasets. Schemas contain fields. It would be useful to be able to drill into how a field in a particular dataset was used (CREATE/READ/WRITE/DELETE) in a given time period.

Goals

  • Provide CDAP platform support (in the form of API and storage) to track field level lineage.
  • Pipelines can then expose this functionality to the plugins.
  • Plugins (such as wrangler) will need to be updated to use this feature.

Use Cases 

Id
Use Case
FLL-1

As a data governance reviewer or information architect at a financial institution,

I would like to generate a report of how a PII field UID from the dataset DailyTransactions was consumed in the specified time period

so that

  1. UID is PII data, and I would like to know which processes accessed it in the given time period
  2. If the data is breached/compromised or improperly generated, I can understand its impact, and take appropriate remedial action
  3. I can also understand which fields may have been generated from the UID field in downstream processes, and judge the impact on those fields to take steps towards remediation
  4. I can generate compliance reports to certify that my organization does not breach contracts with third-party data providers. My licenses with third party providers require me to enforce strict retention policies on their data and any generated downstream data. I would like to ensure that those retention policies are adhered to at all times.
FLL-2

As a data scientist at a healthcare organization,

I would like to trace the provenance of the field patient_medical_score in the dataset PatientRecords over the last month

so that

  1. I can understand the true source(s) from which the patient_medical_score was generated
  2. I can understand the operations that were performed on various source fields to generate the patient_medical_score
  3. I can use this information to determine if patient_medical_score is a suitable field that can be relied upon for generating my ML model.

User Stories

  1. A Spark program can perform various transformations on the input fields of a dataset to generate new fields. For example, it can concatenate the first_name and last_name fields of the input dataset so that the resulting dataset has only a Name field. As a developer of a CDAP program (for example, a CDAP Spark program), I should be able to provide these transformations so that I can later determine how the Name field was generated.
  2. Similar field transformations can be performed in CDAP plugins as well. A plugin developer should be able to provide such transformations.
  3. Some plugins, such as the Javascript transform and the Python transform, execute custom code provided by the pipeline developer. In this case, the pipeline developer should be able to provide the field transformations through the plugin config UI.

Design

CDAP Platform API changes

We will add a new class to cdap-api to represent an individual field mutation.

public class FieldMutation {
    // Set of input fields participating in this mutation operation.
    private final Set<Field> inputFields;
    // The mutation being performed on the input fields.
    private final String mutation;
    // Optional description associated with the mutation.
    @Nullable
    private final String description;
    // Optional output field, set when the mutation generates a new field. It is optional because
    // a mutation does not always generate a new field, for example when deleting fields from the
    // schema or normalizing a single field in place (such as toUpper).
    @Nullable
    private final Field outputField;
}
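As an illustration of how such objects might be built, here is a minimal sketch for the first_name/last_name concatenation from the first user story, plus a deletion that has no output field. The Field class, the FieldMutation constructor, the factory methods, and the mutation names "concatenate" and "delete" are all assumptions made for this example; the design above does not define them.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Sketch only: Field, the constructor, and the factory methods are assumptions,
// not part of the proposed cdap-api.
class FieldMutationExample {

  /** Assumed minimal field representation: a name and a type name. */
  static final class Field {
    final String name;
    final String type;

    Field(String name, String type) {
      this.name = name;
      this.type = type;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Field)) {
        return false;
      }
      Field other = (Field) o;
      return name.equals(other.name) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, type);
    }
  }

  /** Mirrors the proposed FieldMutation fields, with an assumed constructor. */
  static final class FieldMutation {
    final Set<Field> inputFields;
    final String mutation;
    final String description; // may be null
    final Field outputField;  // null when no new field is generated

    FieldMutation(Set<Field> inputFields, String mutation, String description, Field outputField) {
      this.inputFields = Collections.unmodifiableSet(new LinkedHashSet<>(inputFields));
      this.mutation = mutation;
      this.description = description;
      this.outputField = outputField;
    }
  }

  /** first_name + last_name -> Name, as in the first user story. */
  static FieldMutation concatenateNames() {
    Set<Field> inputs = new LinkedHashSet<>(Arrays.asList(
        new Field("first_name", "string"), new Field("last_name", "string")));
    return new FieldMutation(inputs, "concatenate", "Join first and last name",
        new Field("Name", "string"));
  }

  /** A deletion has neither a description (here) nor an output field. */
  static FieldMutation deleteField(Field field) {
    return new FieldMutation(Collections.singleton(field), "delete", null, null);
  }

  public static void main(String[] args) {
    FieldMutation concat = concatenateNames();
    System.out.println(concat.mutation + " produces " + concat.outputField.name);
    FieldMutation delete = deleteField(new Field("headers", "map"));
    System.out.println(delete.mutation + " has output field: " + (delete.outputField != null));
  }
}
```

Copying the input set into an unmodifiable set keeps a recorded mutation from changing after it has been created, which seems desirable for lineage data but is a choice of this sketch.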

Once the FieldMutation objects are created, programs can record them through the context that is available to programs at runtime (in the initialize method). We will expose the FieldMutationRecorder interface, which will be implemented by the runtime program context.

/**
 * This interface provides methods that allow programs to record field level mutations.
 */
public interface FieldMutationRecorder {
    /**
     * Record the field level mutations.
     *
     * @param datasetName The name of the dataset containing the fields for which the mutations need to be recorded
     * @param fieldMutations The list of field mutations
     */
    void record(String datasetName, List<FieldMutation> fieldMutations);

    /**
     * Record the field level mutations.
     *
     * @param namespace The name of the namespace containing the specified dataset
     * @param datasetName The name of the dataset containing the fields for which the mutations need to be recorded
     * @param fieldMutations The list of field mutations
     */
    void record(String namespace, String datasetName, List<FieldMutation> fieldMutations);
}
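To make the relationship between the two record overloads concrete, the following sketch shows one way a context could implement the interface in memory. It assumes that the overload without a namespace falls back to the namespace the program runs in; the design above does not state this. InMemoryRecorder, recordedFor, the "default" namespace, and the simplified FieldMutation stand-in are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: InMemoryRecorder and the simplified FieldMutation are assumptions
// made to keep the example self-contained.
class RecorderSketch {

  /** Stand-in for the proposed FieldMutation; only the mutation name is kept. */
  static final class FieldMutation {
    final String mutation;

    FieldMutation(String mutation) {
      this.mutation = mutation;
    }
  }

  /** Same two methods as the proposed interface. */
  interface FieldMutationRecorder {
    void record(String datasetName, List<FieldMutation> fieldMutations);

    void record(String namespace, String datasetName, List<FieldMutation> fieldMutations);
  }

  /** Keeps recorded mutations in a map keyed by "namespace:dataset". */
  static final class InMemoryRecorder implements FieldMutationRecorder {
    private final String programNamespace;
    private final Map<String, List<FieldMutation>> byDataset = new HashMap<>();

    InMemoryRecorder(String programNamespace) {
      this.programNamespace = programNamespace;
    }

    @Override
    public void record(String datasetName, List<FieldMutation> fieldMutations) {
      // Assumption: no namespace means "the namespace of the running program".
      record(programNamespace, datasetName, fieldMutations);
    }

    @Override
    public void record(String namespace, String datasetName, List<FieldMutation> fieldMutations) {
      byDataset.computeIfAbsent(namespace + ":" + datasetName, k -> new ArrayList<>())
          .addAll(fieldMutations);
    }

    List<FieldMutation> recordedFor(String namespace, String datasetName) {
      List<FieldMutation> found =
          byDataset.getOrDefault(namespace + ":" + datasetName, Collections.emptyList());
      return Collections.unmodifiableList(found);
    }
  }

  public static void main(String[] args) {
    InMemoryRecorder recorder = new InMemoryRecorder("default");
    recorder.record("DailyTransactions", Arrays.asList(new FieldMutation("rename")));
    recorder.record("default", "DailyTransactions", Arrays.asList(new FieldMutation("delete")));
    System.out.println(recorder.recordedFor("default", "DailyTransactions").size());
  }
}
```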

In order for plugins to be able to record this information at runtime, we can add the following method to the StageContext interface.

public interface StageContext {
...
    /**
     * Record the field level mutations.
     *
     * @param fieldMutations The list of field mutations
     */
    void recordFieldMutations(List<FieldMutation> fieldMutations);
...
}

This method will then be available to plugins in the prepareRun method, where field mutations can be recorded. Note that this method does not need a dataset name parameter: plugins operate on individual records and have no information about the dataset in which the current record will eventually land. The data pipeline app, however, knows the datasets used in the pipeline as well as the DAG, so it can determine which stages lead to which dataset. The data pipeline app can therefore call the platform method with the dataset name and the field mutations accumulated from the various stages.
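The DAG-based resolution described above could be sketched as a breadth-first walk from each stage to the sink datasets reachable from it. This is only an illustration under stated assumptions: the map-based DAG representation, the method names, the plain-string mutations, and the dataset name "customTable" are hypothetical and not part of the data pipeline app.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch only: stage names map to downstream stage names (connections) and
// sink stage names map to dataset names (sinkDatasets). Both shapes are assumed.
class DagLineageSketch {

  /** Returns the datasets written by any sink reachable from the given stage. */
  static Set<String> reachableDatasets(String stage,
                                       Map<String, List<String>> connections,
                                       Map<String, String> sinkDatasets) {
    Set<String> datasets = new TreeSet<>();
    Set<String> visited = new HashSet<>();
    Deque<String> queue = new ArrayDeque<>();
    queue.add(stage);
    while (!queue.isEmpty()) {
      String current = queue.poll();
      if (!visited.add(current)) {
        continue; // already processed through another path
      }
      String dataset = sinkDatasets.get(current);
      if (dataset != null) {
        datasets.add(dataset);
      }
      queue.addAll(connections.getOrDefault(current, Collections.emptyList()));
    }
    return datasets;
  }

  /** Groups the mutations recorded per stage by the datasets each stage leads to. */
  static Map<String, List<String>> mutationsByDataset(Map<String, List<String>> stageMutations,
                                                      Map<String, List<String>> connections,
                                                      Map<String, String> sinkDatasets) {
    Map<String, List<String>> result = new TreeMap<>();
    for (Map.Entry<String, List<String>> entry : stageMutations.entrySet()) {
      for (String dataset : reachableDatasets(entry.getKey(), connections, sinkDatasets)) {
        result.computeIfAbsent(dataset, k -> new ArrayList<>()).addAll(entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> connections = new LinkedHashMap<>();
    connections.put("File", Collections.singletonList("CSV Parser"));
    connections.put("CSV Parser", Collections.singletonList("Javascript Transform"));
    connections.put("Javascript Transform", Collections.singletonList("Custom Table Sink"));

    Map<String, String> sinkDatasets = Collections.singletonMap("Custom Table Sink", "customTable");

    Map<String, List<String>> stageMutations = new LinkedHashMap<>();
    stageMutations.put("CSV Parser", Collections.singletonList("parse body"));
    stageMutations.put("Javascript Transform", Collections.singletonList("drop secure fields"));

    System.out.println(mutationsByDataset(stageMutations, connections, sinkDatasets));
  }
}
```

With the grouped result, the data pipeline app would make one call to the platform's record method per dataset. A stage that fans out to several sinks contributes its mutations to each of them.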

Pipeline config changes

For some plugins, such as the Javascript transform or the Python transform, the prepareRun method is written by the plugin developer, but the transform method is supplied by the pipeline developer. Since the pipeline developer knows more about the transform method and is better placed to describe its field level mutations, we also need a way to provide the mutations through the pipeline config.

The following changes to the pipeline config are proposed:

{
 ...
    "stages" : [
		{
                "name": "Projection",
                "plugin": {
                    "name": "Projection",
                    "type": "transform",
                    "label": "Projection",
                    "artifact": {
                        "name": "core-plugins",
                        "version": "1.8.5-SNAPSHOT",
                        "scope": "SYSTEM"
                    },
                    "properties": {
                        "drop": "headers",
                        "rename": "ts:seqId,body:f1"
                    }
                },
 
                "fieldMutations": [
                    {
                        "inputFields": [
                            {"name": "ts", "type": "long"}
                        ],
                        "mutation": "rename",
                        "description": "renaming timestamp to the sequence id",
                        "outputField": {"name": "seqId", "type": "long"}
                    },
                    {
                        "inputFields": [
                            {"name": "body", "type": "string"}
                        ],
                        "mutation": "rename",
                        "description": "renaming body to f1",
                        "outputField": {"name": "f1", "type": "string"}
                    },
                    {
                        "inputFields": [
                            {"name": "headers", "type": {"type": "map", "keys": "string", "values": "string"}}
                        ],
                        "mutation": "delete",
                        "description": "deleting headers"
                    }
                ],
                "outputSchema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"seqId\",\"type\":\"long\"},{\"name\":\"f1\",\"type\":\"string\"}]}",
                "inputSchema": [
                    {
                        "name": "Stream",
                        "schema": "{\"name\":\"etlSchemaBody\",\"type\":\"record\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"},{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"keys\":\"string\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"string\"}]}"
                    }
                ],
                "type": "transform",
                "label": "Projection",
                "icon": "icon-projection"
            }
    ]
 ...
}
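One way to sanity-check a config like this is to replay the declared mutations against the input schema's field names and compare the result with the output schema. The sketch below does that for the Projection stage above. It handles only the "rename" and "delete" mutations that appear in this config; the Mutation class and the apply method are hypothetical helpers, not part of the proposal, and field types are ignored for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch only: replays "rename" and "delete" mutations over a list of field names.
class ProjectionMutationCheck {

  /** Simplified view of one fieldMutations entry (names only, no types). */
  static final class Mutation {
    final List<String> inputFields;
    final String mutation;
    final String outputField; // null when the mutation produces no field

    Mutation(List<String> inputFields, String mutation, String outputField) {
      this.inputFields = inputFields;
      this.mutation = mutation;
      this.outputField = outputField;
    }
  }

  /** Applies the mutations in order and returns the resulting field names. */
  static List<String> apply(List<String> inputFieldNames, List<Mutation> mutations) {
    List<String> fields = new ArrayList<>(inputFieldNames);
    for (Mutation m : mutations) {
      for (String input : m.inputFields) {
        if (!fields.contains(input)) {
          throw new IllegalArgumentException("Unknown input field: " + input);
        }
      }
      if ("rename".equals(m.mutation)) {
        if (m.inputFields.size() != 1 || m.outputField == null) {
          throw new IllegalArgumentException("rename needs one input and one output field");
        }
        // Replace the field in place so the original position is kept.
        fields.set(fields.indexOf(m.inputFields.get(0)), m.outputField);
      } else if ("delete".equals(m.mutation)) {
        fields.removeAll(m.inputFields);
      } else {
        throw new IllegalArgumentException("Unsupported mutation: " + m.mutation);
      }
    }
    return fields;
  }

  /** The three mutations declared for the Projection stage. */
  static List<Mutation> projectionMutations() {
    return Arrays.asList(
        new Mutation(Collections.singletonList("ts"), "rename", "seqId"),
        new Mutation(Collections.singletonList("body"), "rename", "f1"),
        new Mutation(Collections.singletonList("headers"), "delete", null));
  }

  public static void main(String[] args) {
    List<String> inputSchemaFields = Arrays.asList("ts", "headers", "body");
    // Prints [seqId, f1], which matches the fields of outputSchema.
    System.out.println(apply(inputSchemaFields, projectionMutations()));
  }
}
```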

Scenarios for Schema propagation

  1. Configure-time schema: When the pipeline is configured with a static schema, the prepareRun method of the plugin will have access to the schema. In this case, the plugin will be able to emit the required lineage information.
  2. Dynamic schema through macro: The schema may be provided as a macro at configure time, with its value supplied through runtime arguments. In this case, we might need some changes for the schema to be available in the prepareRun method. [TBD]
  3. Unknown schema: It is possible to implement custom plugins that work independently of the schema. For example, consider the following pipeline:

    File-------->CSV Parser--------->Javascript Transform-------->Custom Table Sink

    Assume that the Javascript transform looks for fields in the input StructuredRecord whose names contain "secure" (for example, "secure_id" and "secure_account") and drops them. The custom table sink takes all the fields from the input StructuredRecord and writes them to a Table dataset, using each field name as a column name. In this case, the schema of the Table varies from record to record, so even the pipeline developer won't be able to specify the field mutations correctly.
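The unknown-schema scenario can be illustrated with a small sketch in which records are plain maps instead of StructuredRecord objects. The method names and the sample fields "name" and "balance" are hypothetical; only "secure_id" and "secure_account" come from the scenario. The point is that the set of columns written, and therefore the set of fields dropped, is known only once each record has been seen.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch only: a record is modeled as a map from field name to value.
class UnknownSchemaSketch {

  /** Mimics the transform: drops every field whose name contains "secure". */
  static Map<String, Object> dropSecureFields(Map<String, Object> record) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : record.entrySet()) {
      if (!entry.getKey().contains("secure")) {
        result.put(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }

  /** Mimics the sink: every remaining field name becomes a column name. */
  static Set<String> columnsWritten(Map<String, Object> record) {
    return dropSecureFields(record).keySet();
  }

  public static void main(String[] args) {
    Map<String, Object> first = new LinkedHashMap<>();
    first.put("secure_id", 42);
    first.put("name", "alice");

    Map<String, Object> second = new LinkedHashMap<>();
    second.put("secure_account", "acct-1");
    second.put("name", "bob");
    second.put("balance", 10.5);

    // The two records lead to different column sets in the same Table dataset.
    System.out.println(columnsWritten(first));
    System.out.println(columnsWritten(second));
  }
}
```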

Approach

Approach #1

Approach #2

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Deprecated Programmatic APIs

New REST APIs

Path | Method | Description | Response Code | Response
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors |

Deprecated REST API

Path | Method | Description
/v3/apps/<app-id> | GET | Returns the application spec for a given application

CLI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What is the impact on authorization, and how does the design take care of this aspect?

Impact on Infrastructure Outages 

System behavior (if applicable, document the impact of failures in downstream components such as YARN and HBase) and how the design takes care of these aspects

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work
