CDAP currently captures lineage at the dataset level. With lineage, users can determine which programs read from or wrote to a dataset in a given timeframe.

However, as a platform, CDAP understands the schemas of most datasets, and schemas contain fields. It would be useful to drill into how a field in a particular dataset was used (CREATE/READ/WRITE/DELETE) in a given time period.


  • Provide CDAP platform support (in the form of an API and storage) to track field level lineage.
  • Pipelines can then expose this functionality to plugins.
  • Plugins (such as Wrangler) will need to be updated to use this feature.

Use Cases 


As a data governance reviewer or information architect at a financial institution,

I would like to generate a report of how the PII field UID from the dataset DailyTransactions was consumed in a specified time period

so that

  1. UID is PII data, and I would like to know which processes accessed it in the given time period.
  2. If the data is breached, compromised, or improperly generated, I can understand its impact and take appropriate remedial action.
  3. I can also understand which fields may have been generated from the UID field in downstream processes, and judge the impact on those fields to take steps towards remediation.
  4. I can generate compliance reports to certify that my organization does not breach contracts with third-party data providers. My licenses with third-party providers require me to enforce strict retention policies on their data and any generated downstream data. I would like to ensure that those retention policies are adhered to at all times.

As a data scientist at a healthcare organization,

I would like to trace the provenance of the field patient_medical_score in the dataset PatientRecords over the last month

so that

  1. I can understand the true source(s) from which patient_medical_score was generated.
  2. I can understand the operations that were performed on various source fields to generate patient_medical_score.
  3. I can use this information to determine whether patient_medical_score is a suitable field to rely on when building my ML model.

User Stories

  1. A Spark program can perform various transformations on the input fields of a dataset to generate new fields. For example, it can concatenate the first_name and last_name fields of the input dataset so that the resulting dataset only has a Name field. As a developer of a CDAP program (for example, a CDAP Spark program), I should be able to provide these transformations so that I can later tell how the field Name was generated.
  2. Similar transformations on fields can be performed in CDAP plugins as well. Plugin developers should be able to provide such transformations.
  3. Some plugins, such as the JavaScript transform and the Python transform, execute custom code provided by the pipeline developer. In this case, the pipeline developer should be able to provide the field transformations through the plugin config UI.


Consider a pipeline that processes the following sample structured record. For each pipeline stage, the fields emitted (with their values) and the field level operations performed are:

Stage 1: File
  Fields emitted with values:
    body: John,Smith,31,Santa Clara,CA
  Field level operations:
    • read (input: file; output: body): read the file to generate the body field

Stage 2
  Fields emitted with values:
    first_name: John, last_name: Smith, age: 31, city: Santa Clara, state: CA
  Field level operations:
    • parse (input: body; output: first_name, last_name, age, city, state): parse the body field

Stage 3
  Fields emitted with values:
    name: John Smith, age: 31, city: Santa Clara, state: CA
  Field level operations:
    • concat (input: first_name, last_name; output: name): concatenate the first_name and last_name fields
    • drop (input: first_name): delete first_name
    • drop (input: last_name): delete last_name

Stage 4
  Fields emitted with values:
    id: JohnSmith007, name: John Smith, age: 31, city: Santa Clara, state: CA
  Field level operations:
    • create (input: name; output: id): generate a unique id

Stage 5
  Fields emitted with values:
    id: JohnSmith007, name: John Smith, age: 31, city: Santa Clara, state: CA
  Field level operations: none

There are two ways to show the lineage graph for the field id:

  1. Simple view: In this view, the lineage graph for each field has only two types of nodes: the source datasets from which the field is ultimately created, and the field itself. The edge between these nodes represents the set of operations performed on the sources to produce the field. Intermediate fields are not shown.

    Code Block
    file ----(read, parse, concat, create)----> id
  2. Detailed view: In this view, each node of the graph represents a field (including any intermediate fields generated), and each edge represents a single operation that transforms the input node into the corresponding output node.

    Code Block
                       (parse)                 (concat)
                +-----------> first_name -----------+
        body ---|                                   |-----> name ----(create)----> id
                +-----------> last_name  -----------+
                       (parse)                 (concat)
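Both views can be derived mechanically from the recorded operations. The following Java sketch is illustrative only. It models the sample pipeline's operations with plain strings, and the names LineageViews, Op, Edge and SimpleView are hypothetical, not part of the proposed API. It builds the detailed view by walking backwards from a field and then collapses that into the simple view.

```java
import java.util.*;

// Hypothetical sketch: derive the detailed and simple lineage views of a field
// from the sample pipeline's operations. Names are illustrative only.
public class LineageViews {

    // A field-level operation: its name, input nodes, and output fields.
    record Op(String name, List<String> inputs, List<String> outputs) {}

    // One edge of the detailed view: input node --(op)--> output field.
    record Edge(String from, String op, String to) {}

    // Simple view: ultimate sources plus every operation on the path to the field.
    record SimpleView(Set<String> sources, Set<String> operations) {}

    // Operations performed by the sample pipeline, in execution order.
    static final List<Op> OPS = List.of(
        new Op("read", List.of("file"), List.of("body")),
        new Op("parse", List.of("body"), List.of("first_name", "last_name", "age", "city", "state")),
        new Op("concat", List.of("first_name", "last_name"), List.of("name")),
        new Op("drop", List.of("first_name"), List.of()),
        new Op("drop", List.of("last_name"), List.of()),
        new Op("create", List.of("name"), List.of("id")));

    // Walk backwards from `field`, emitting one edge per (input, operation, output).
    static List<Edge> detailedView(String field) {
        List<Edge> edges = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        Deque<String> pending = new ArrayDeque<>(List.of(field));
        while (!pending.isEmpty()) {
            String current = pending.pop();
            if (!visited.add(current)) {
                continue; // this node was already expanded
            }
            for (Op op : OPS) {
                if (op.outputs().contains(current)) {
                    for (String input : op.inputs()) {
                        edges.add(new Edge(input, op.name(), current));
                        pending.push(input);
                    }
                }
            }
        }
        return edges;
    }

    // Collapse the detailed view: sources are nodes that no operation produces.
    static SimpleView simpleView(String field) {
        Set<String> sources = new TreeSet<>();
        Set<String> operations = new TreeSet<>();
        for (Edge edge : detailedView(field)) {
            operations.add(edge.op());
            boolean produced = OPS.stream().anyMatch(op -> op.outputs().contains(edge.from()));
            if (!produced) {
                sources.add(edge.from());
            }
        }
        return new SimpleView(sources, operations);
    }

    public static void main(String[] args) {
        detailedView("id").forEach(System.out::println);
        System.out.println(simpleView("id"));
        // SimpleView[sources=[file], operations=[concat, create, parse, read]]
    }
}
```

For id, the simple view contains the single source file and the operations read, parse, concat and create. The two drop operations never appear because they generate no output field, so they cannot lie on the path to any field.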

CDAP Platform API changes

We will add a FieldOperation class, along with the classes it depends on, to cdap-api to represent an individual field operation.

Code Block
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Represents the input for the field operation
public interface Input {
   // Name of the input
   String getName();
   // Optional description associated with the input
   String getDescription();
   // Get the type associated with the input.
   // Probably can be used to distinguish between different types of inputs, such as source input or field input.
   Type getType();
}

// Represents a Source as an input for the field operation.
// AbstractInput (not shown) is assumed to provide the common Input implementation.
public class SourceInput extends AbstractInput {
   Source source;

   public SourceInput(Source source) {
      this.source = source;
   }
}

// Represents a Field as an input for the field operation
public class FieldInput extends AbstractInput {
   Schema.Field field;

   public FieldInput(Schema.Field field) {
      this.field = field;
   }
}

public class FieldOperation {
   // Operation name
   String name;
   // Optional detailed description about the operation
   String description;
   // Set of inputs participating in the operation
   Set<Input> inputs;
   // Set of output fields generated as a part of this operation.
   // Question: Can this be null? For example, in the case of a Drop operation.
   // However, if the field is dropped and it is not present in the destination dataset, can it be reached?
   Set<Schema.Field> outputs;

   // Builder for the FieldOperation
   public static class Builder {
      private String name;
      private String description;
      private final Set<Input> inputs = new HashSet<>();
      private final Set<Schema.Field> outputs = new HashSet<>();

      public Builder setName(String name) {
         this.name = name;
         return this;
      }

      public Builder setDescription(String description) {
         this.description = description;
         return this;
      }

      public Builder addInputSource(Source source) {
         inputs.add(new SourceInput(source));
         return this;
      }

      public Builder addInputField(Schema.Field field) {
         inputs.add(new FieldInput(field));
         return this;
      }

      public Builder addOutput(Schema.Field field) {
         outputs.add(field);
         return this;
      }

      public FieldOperation build() {
         FieldOperation operation = new FieldOperation();
         operation.name = name;
         operation.description = description;
         operation.inputs = Collections.unmodifiableSet(new HashSet<>(inputs));
         operation.outputs = Collections.unmodifiableSet(new HashSet<>(outputs));
         return operation;
      }
   }
}

// Identifies the source from where the data is read
public class Source {
   String namespace;
   String id;           // could be the name of the dataset, Kafka topic etc.
   String description;  // Optional description associated with the source

   public static Source from(String namespace, String id, String description) {
      Source source = new Source();
      source.namespace = namespace;
      source.id = id;
      source.description = description;
      return source;
   }
}


Once the field operations are constructed, the following platform API can be used to record them:

Code Block
/**
 * This interface provides methods that allow programs to record field level operations.
 */
public interface LineageRecorder {
   /**
    * Record the field level operations against the given destination.
    * @param destination the destination for which to record field operations
    * @param fieldOperations the list of field operations
    */
   void record(Destination destination, List<FieldOperation> fieldOperations);
}

public class Destination {
   String namespace;
   String id;
   String description;
}


LineageRecorder will be available in the initialize method of CDAP programs through the program runtime context.
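Storage is the other half of the platform support. The sketch below is a rough illustration only. It shows how a LineageRecorder could be backed by a simple in-memory store that timestamps each record() call. The store can then answer the governance question from the first use case: which operations consumed a field in a given time window. Destination and FieldOperation here are simplified stand-ins for the proposed classes, with fields as plain strings rather than Schema.Field. FieldLineageStoreSketch, InMemoryLineageRecorder and operationsReading are hypothetical names.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical sketch of a store-backed LineageRecorder. Destination and
// FieldOperation are simplified stand-ins (fields are plain strings).
public class FieldLineageStoreSketch {

    record Destination(String namespace, String id) {}

    record FieldOperation(String name, Set<String> inputs, Set<String> outputs) {}

    interface LineageRecorder {
        void record(Destination destination, List<FieldOperation> fieldOperations);
    }

    // Operations from one record() call, stamped with the time they were recorded.
    record RecordedBatch(Instant recordedAt, List<FieldOperation> operations) {}

    static class InMemoryLineageRecorder implements LineageRecorder {
        private final Clock clock;
        private final Map<Destination, List<RecordedBatch>> batches = new HashMap<>();

        InMemoryLineageRecorder(Clock clock) {
            this.clock = clock;
        }

        @Override
        public void record(Destination destination, List<FieldOperation> fieldOperations) {
            batches.computeIfAbsent(destination, d -> new ArrayList<>())
                   .add(new RecordedBatch(clock.instant(), List.copyOf(fieldOperations)));
        }

        // Operations recorded against `destination` in [from, to) that read `field`.
        List<FieldOperation> operationsReading(Destination destination, String field,
                                               Instant from, Instant to) {
            return batches.getOrDefault(destination, List.of()).stream()
                    .filter(b -> !b.recordedAt().isBefore(from) && b.recordedAt().isBefore(to))
                    .flatMap(b -> b.operations().stream())
                    .filter(op -> op.inputs().contains(field))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2018-05-01T00:00:00Z");
        InMemoryLineageRecorder recorder =
                new InMemoryLineageRecorder(Clock.fixed(t0, ZoneOffset.UTC));
        Destination destination = new Destination("myns", "mytableds");
        recorder.record(destination, List.of(
                new FieldOperation("concat", Set.of("first_name", "last_name"), Set.of("name")),
                new FieldOperation("drop", Set.of("first_name"), Set.of()),
                new FieldOperation("create", Set.of("name"), Set.of("id"))));
        // Which operations consumed first_name during the first hour?
        recorder.operationsReading(destination, "first_name", t0, t0.plusSeconds(3600))
                .forEach(op -> System.out.println(op.name()));
    }
}
```

Injecting a Clock keeps the time-window query testable. Running main prints concat and then drop.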

The following sample code records, from ETLMapReduce, the operations performed by the pipeline above. Note that this code is called from the CDAP program, not from the plugin.

Code Block
import java.util.ArrayList;
import java.util.List;

public class ETLMapReduce extends AbstractMapReduce {
   @Override
   public void initialize() throws Exception {
      MapReduceContext context = getContext();
      List<FieldOperation> operations = new ArrayList<>();

      // Fields of the records flowing through the pipeline
      Schema.Field bodyField = Schema.Field.of("body", Schema.of(Schema.Type.STRING));
      Schema.Field firstNameField = Schema.Field.of("first_name", Schema.of(Schema.Type.STRING));
      Schema.Field lastNameField = Schema.Field.of("last_name", Schema.of(Schema.Type.STRING));
      Schema.Field ageField = Schema.Field.of("age", Schema.of(Schema.Type.INT));
      Schema.Field cityField = Schema.Field.of("city", Schema.of(Schema.Type.STRING));
      Schema.Field stateField = Schema.Field.of("state", Schema.of(Schema.Type.STRING));
      Schema.Field nameField = Schema.Field.of("name", Schema.of(Schema.Type.STRING));
      Schema.Field idField = Schema.Field.of("id", Schema.of(Schema.Type.STRING));

      Source source = Source.from("myns", "user_data", "file containing user information");

      // read: file -> body
      operations.add(new FieldOperation.Builder()
         .setName("read")
         .setDescription("reading the data from the file")
         .addInputSource(source)
         .addOutput(bodyField)
         .build());

      // parse: body -> first_name, last_name, age, city, state
      operations.add(new FieldOperation.Builder()
         .setName("parse")
         .setDescription("parsing the input body field")
         .addInputField(bodyField)
         .addOutput(firstNameField)
         .addOutput(lastNameField)
         .addOutput(ageField)
         .addOutput(cityField)
         .addOutput(stateField)
         .build());

      // concat: first_name, last_name -> name
      operations.add(new FieldOperation.Builder()
         .setName("concat")
         .setDescription("concatenating the first_name and last_name fields")
         .addInputField(firstNameField)
         .addInputField(lastNameField)
         .addOutput(nameField)
         .build());

      // drop operations consume a field without generating any output field
      operations.add(new FieldOperation.Builder()
         .setName("drop")
         .setDescription("deleting the first_name field")
         .addInputField(firstNameField)
         .build());
      operations.add(new FieldOperation.Builder()
         .setName("drop")
         .setDescription("deleting the last_name field")
         .addInputField(lastNameField)
         .build());

      // create: name -> id
      operations.add(new FieldOperation.Builder()
         .setName("create")
         .setDescription("creating unique id from the name field")
         .addInputField(nameField)
         .addOutput(idField)
         .build());

      // Record all operations against the destination dataset
      context.record(Destination.from("myns", "mytableds"), operations);
   }
}
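Once operations like these are recorded, the impact analysis from the first use case becomes a forward traversal over the operations. That use case asks which fields were generated downstream from a given field. The following hypothetical sketch uses the same operations as the sample above, again with fields as plain strings. DownstreamImpact, Op and derivedFrom are illustrative names, not proposed API.

```java
import java.util.*;

// Hypothetical sketch: find every field derived, directly or transitively,
// from a given field by following operations from their inputs to their outputs.
public class DownstreamImpact {

    record Op(String name, Set<String> inputs, Set<String> outputs) {}

    static Set<String> derivedFrom(String field, List<Op> ops) {
        Set<String> derived = new TreeSet<>();
        Deque<String> pending = new ArrayDeque<>(List.of(field));
        while (!pending.isEmpty()) {
            String current = pending.pop();
            for (Op op : ops) {
                if (op.inputs().contains(current)) {
                    for (String output : op.outputs()) {
                        // Only expand fields seen for the first time (guards against cycles).
                        if (derived.add(output)) {
                            pending.push(output);
                        }
                    }
                }
            }
        }
        return derived;
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(
            new Op("read", Set.of("file"), Set.of("body")),
            new Op("parse", Set.of("body"), Set.of("first_name", "last_name", "age", "city", "state")),
            new Op("concat", Set.of("first_name", "last_name"), Set.of("name")),
            new Op("drop", Set.of("first_name"), Set.of()),
            new Op("drop", Set.of("last_name"), Set.of()),
            new Op("create", Set.of("name"), Set.of("id")));
        System.out.println(derivedFrom("first_name", ops)); // [id, name]
    }
}
```

This is the mirror image of the backward walk used for provenance. A breach of first_name would affect name and, through it, id.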


