Field Level Lineage Storage and Retrieval

This document describes the design for storing and retrieving field level lineage information.

Access Patterns:

  1. For a given dataset, find the high level lineage going in the backward direction within a given time range. High level lineage means the field mapping between source and destination datasets, not the detailed operations that caused the conversion. The response should be multi-level. For example, suppose the "Employee" dataset is generated from the "Person", "HR", and "Skills" datasets. The response would contain the field mappings between the source datasets ("Person", "HR", and "Skills") and the "Employee" dataset. However, the source datasets may themselves have been created or updated within the given time range, so the response should also include the field mappings between the source datasets and the datasets they were created from.
  2. For a given dataset, find the high level lineage (the field mapping between source and destination datasets, not the detailed operations that caused the conversion) going in the forward direction within a given time range. As with the previous query, the response needs to be multi-level.
  3. Given a dataset and a field name, find the detailed lineage (the field mapping between the source and destination datasets along with the operations that caused the conversion) going in the backward direction. The response will only contain the operations belonging to a single level.
  4. Given a dataset and a field name, find the detailed lineage (the field mapping between the source and destination datasets along with the operations that caused the conversion) going in the forward direction. The response will only contain the operations belonging to a single level.
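As a minimal sketch of the multi-level behaviour in access patterns 1 and 2, the traversal can be viewed as a breadth-first walk over dataset-to-dataset edges, bounded by a time range and a number of hops. The `Edge` record and the `MultiLevelLineage.lineage` method are hypothetical and operate on an in-memory list rather than on the store described below; `levels` plays the role of the `level` query parameter.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: not the actual lineage store API.
final class MultiLevelLineage {
  // One program run that mapped fields from 'source' into 'destination' at time 'ts' (seconds).
  record Edge(String source, String destination, long ts) {}

  // Walks up to 'levels' hops from 'dataset', keeping only edges with ts in [start, end).
  // backward = true follows destination -> source; false follows source -> destination.
  static List<Edge> lineage(List<Edge> edges, String dataset, long start, long end,
                            int levels, boolean backward) {
    List<Edge> result = new ArrayList<>();
    Set<Edge> added = new HashSet<>();
    Set<String> visited = new HashSet<>(Set.of(dataset));
    List<String> frontier = List.of(dataset);
    for (int level = 0; level < levels && !frontier.isEmpty(); level++) {
      List<String> next = new ArrayList<>();
      for (String current : frontier) {
        for (Edge e : edges) {
          if (e.ts() < start || e.ts() >= end) {
            continue; // outside the requested time range
          }
          String near = backward ? e.destination() : e.source();
          String far = backward ? e.source() : e.destination();
          if (!near.equals(current)) {
            continue;
          }
          if (added.add(e)) {
            result.add(e);
          }
          if (visited.add(far)) {
            next.add(far); // explore this dataset on the next level
          }
        }
      }
      frontier = next;
    }
    return result;
  }

  public static void main(String[] args) {
    List<Edge> edges = List.of(
        new Edge("Person", "Employee", 10), new Edge("HR", "Employee", 20),
        new Edge("Skills", "Employee", 30), new Edge("RawPeople", "Person", 5));
    System.out.println(lineage(edges, "Employee", 0, 100, 1, true).size()); // 3
    System.out.println(lineage(edges, "Employee", 0, 100, 2, true).size()); // 4
  }
}
```

With `levels = 1` only the direct sources of "Employee" are returned; with `levels = 2` the walk also picks up the dataset that created "Person" within the same time range.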

REST API:

  1. Given a dataset and time range, get the high level lineage in both the forward and backward directions.

    GET /v3/namespaces/<namespace-id>/endpoints/<endpoint-name>/fields/lineage?start=<start-ts>&end=<end-ts>&level=<level>
    
    
    Where:
    namespace-id: namespace name
    endpoint-name: name of the endpoint
    start-ts: start timestamp (inclusive), in seconds
    end-ts: end timestamp (exclusive), in seconds
    level: number of hops to traverse in the backward and forward directions
    
    
    Sample response:
    [
      ...
      list of lineage mappings
      ...
    ]
    
    
    where each lineage mapping will be of the form:
    
    
    {
      "source": {
         "namespace": "ns",
         "name": "Person" 
      },
      "destination": {
         "namespace": "ns",
         "name": "Employee"
      },
      "fieldmap": [
         { "from": "id", "to": "id" },
         { "from": "first_name", "to": "name"},
         { "from": "last_name", "to": "name"}
      ] 
    }
  2. Given a dataset and field, find out the detailed lineage.

    GET /v3/namespaces/<namespace-id>/endpoints/<endpoint-name>/fields/<field-name>/lineage?start=<start-ts>&end=<end-ts>&direction=<backward/forward>

    Where:
    namespace-id: namespace name
    endpoint-name: name of the endpoint
    field-name: name of the field whose lineage is to be retrieved
    start-ts: start timestamp (inclusive), in seconds
    end-ts: end timestamp (exclusive), in seconds
    direction: backward or forward
    
    
    Sample response:
    {
      [
       ...
          list of nodes
       ...
      ],
      [
       ...
          list of operations
       ...
      ],
      [
       ...
          list of connections
       ...
      ]    
    }
    
    where each node is an object representing a field. A node has an id, which uniquely identifies the node (the combination of origin and field name), and a label, which is displayed in the UI. A node can have optional sourceEndPoint and destinationEndPoint members, which indicate whether the node is read directly from a source endpoint or written to a destination endpoint.
     
    {
      "id": "origin.fieldname",
      "label": "fieldname",
      "sourceEndPoint": {
         "name": "file",
         "namespace": "ns"
      }
    }
    
    
    Each operation is represented as:
    {
      "name": "IDENTITY",
      "description": "description associated with the operation"
    }
    Each connection represents a transformation between two nodes, along with the name of the operation that caused it:
    {
      "from": "Node1.id",
      "to": "Node2.id",
      "operation": "opname"
    }
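To make the shape of the detailed lineage response concrete, here is a hypothetical Java model of nodes, operations, and connections. The record names, the `Response` wrapper, and the `danglingConnections` consistency check (which assumes a connection's `operation` refers to the `name` of an operation in the same response) are illustrative assumptions, not part of the API described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical Java model of the detailed lineage response; field names follow the samples above.
final class DetailedLineage {
  record EndPoint(String namespace, String name) {}

  // sourceEndPoint/destinationEndPoint are null when the node is neither read nor written directly.
  record Node(String id, String label, EndPoint sourceEndPoint, EndPoint destinationEndPoint) {
    // Builds a node whose id combines the origin and the field name ("origin.fieldname").
    static Node of(String origin, String field) {
      return new Node(origin + "." + field, field, null, null);
    }
  }

  record Operation(String name, String description) {}

  record Connection(String from, String to, String operation) {}

  record Response(List<Node> nodes, List<Operation> operations, List<Connection> connections) {
    // Returns connections that reference a node id or operation name missing from this response.
    List<Connection> danglingConnections() {
      Set<String> nodeIds = new HashSet<>();
      for (Node n : nodes) {
        nodeIds.add(n.id());
      }
      Set<String> opNames = new HashSet<>();
      for (Operation op : operations) {
        opNames.add(op.name());
      }
      List<Connection> dangling = new ArrayList<>();
      for (Connection c : connections) {
        if (!nodeIds.contains(c.from()) || !nodeIds.contains(c.to())
            || !opNames.contains(c.operation())) {
          dangling.add(c);
        }
      }
      return dangling;
    }
  }

  public static void main(String[] args) {
    Node first = new Node("read.first_name", "first_name", new EndPoint("ns", "file"), null);
    Node name = Node.of("concat", "name");
    Response response = new Response(
        List.of(first, name),
        List.of(new Operation("concat", "concatenates first and last name")),
        List.of(new Connection(first.id(), name.id(), "concat"),
                new Connection("read.last_name", name.id(), "concat")));
    // "read.last_name" is not in the node list, so one connection is reported.
    System.out.println(response.danglingConnections().size()); // 1
  }
}
```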
    
    

Store:

Field level lineage information will be stored in the "FieldLevelLineage" dataset. 

This dataset will have the following types of rows:

  1. Data row: This row will store the actual operations data, keyed by the checksum of the operations.

    Row key:   c|<checksum-value>
    Column d:  FieldLineageInfo object
  2. Backward lineage row: From the perspective of the destination endpoints, the operations represent the backward lineage. A separate row will be created for each destination.

    Row key:   b | <endpoint_ns> | <endpoint_name> | <inverted-start-time> | <id.run>
    Column c:  <checksum>
    Column p:  <program-run-id>
  3. Forward lineage row: From the perspective of the source endpoints, the operations represent the forward lineage. A separate row will be created for each source.

    Row key:   f | <endpoint_ns> | <endpoint_name> | <inverted-start-time> | <id.run>
    Column c:  <checksum>
    Column p:  <program-run-id>
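Inverting the start time in the backward and forward row keys makes the newest runs sort first and turns a time range query into a single contiguous scan. The following sketch assumes non-negative timestamps and namespace/endpoint names without `|`, and uses a `TreeMap` with zero-padded string keys in place of the real table; `LineageRowKeys`, `rowKey`, and `scan` are hypothetical names, and a real implementation would more likely encode keys as bytes.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap stands in for the sorted table, and keys are strings
// rather than the byte[] keys a real store would likely use.
final class LineageRowKeys {
  // Cells of a backward/forward lineage row: column c (checksum) and column p (program run id).
  record Cells(String checksum, String programRunId) {}

  // Newer runs get smaller values; zero-padding keeps lexicographic order equal to numeric order.
  // Assumes ts >= 0 so that Long.MAX_VALUE - ts does not overflow.
  static String invertedTime(long ts) {
    return String.format("%019d", Long.MAX_VALUE - ts);
  }

  static String prefix(char direction, String ns, String name) {
    return direction + "|" + ns + "|" + name + "|";
  }

  // direction is 'b' (backward) or 'f' (forward), matching the row key layouts above.
  static String rowKey(char direction, String ns, String name, long startTime, String runId) {
    return prefix(direction, ns, name) + invertedTime(startTime) + "|" + runId;
  }

  // '~' sorts after '|', so this key is greater than every row key with the same inverted time.
  static String afterTime(char direction, String ns, String name, long ts) {
    return prefix(direction, ns, name) + invertedTime(ts) + "~";
  }

  // Returns rows whose start time is in [start, end), newest first.
  static SortedMap<String, Cells> scan(NavigableMap<String, Cells> table, char direction,
                                       String ns, String name, long start, long end) {
    if (end <= start) {
      return Collections.emptySortedMap();
    }
    return table.subMap(afterTime(direction, ns, name, end), true,
                        afterTime(direction, ns, name, start), false);
  }

  public static void main(String[] args) {
    NavigableMap<String, Cells> table = new TreeMap<>();
    table.put(rowKey('b', "ns", "Employee", 100L, "run1"), new Cells("chk1", "prog.run1"));
    table.put(rowKey('b', "ns", "Employee", 200L, "run2"), new Cells("chk2", "prog.run2"));
    table.put(rowKey('b', "ns", "Employee", 300L, "run3"), new Cells("chk3", "prog.run3"));
    // Range [100, 300): prints chk2 then chk1 (newest first); the run at 300 is excluded.
    for (Cells c : scan(table, 'b', "ns", "Employee", 100L, 300L).values()) {
      System.out.println(c.checksum());
    }
  }
}
```

Because `<inverted-start-time>` follows the endpoint directly in the key, both bounds of the `[start, end)` range share the same prefix, so the scan does not pick up rows for another endpoint or direction.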

The FieldLineageInfo object will store the following information:

  1. Collection<Operation>: operations representing the field lineage.
  2. Checksum of operations.
  3. Set of source endpoints.
  4. Set of destination endpoints.
  5. High level bi-directional mapping of fields from source endpoints to destination endpoints. This serves access patterns 1 and 2 described above.
  6. For each field of a source endpoint, a graph from that field to the destination fields.
  7. For each field of a destination endpoint, a graph leading to that field from the different source fields.
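Items 2, 6, and 7 can be illustrated with a simplified operation model in which every operation lists the node ids it consumes and produces. The sketch below computes an order-independent SHA-256 checksum (one possible way to let identical operations from different runs share a single data row) and walks backward from a destination field to the source fields it was derived from. The `Operation` record and both methods are hypothetical; the actual operation classes are likely richer.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model: not the actual Operation classes of any library.
final class FieldLineageSketch {
  // An operation consumes input node ids ("origin.field") and produces output node ids.
  record Operation(String name, List<String> inputs, List<String> outputs) {}

  // Order-independent checksum: canonicalize each operation, sort, then hash with SHA-256.
  static String checksum(Collection<Operation> operations) {
    List<String> canonical = new ArrayList<>();
    for (Operation op : operations) {
      canonical.add(op.name() + op.inputs() + "->" + op.outputs());
    }
    Collections.sort(canonical);
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      for (String s : canonical) {
        digest.update((s + "\n").getBytes(StandardCharsets.UTF_8));
      }
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest()) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 unavailable", e); // required on every Java platform
    }
  }

  // Walks backward from a destination field and returns the nodes no operation produces.
  static Set<String> sourceFields(Collection<Operation> operations, String destinationField) {
    Map<String, List<Operation>> producers = new HashMap<>();
    for (Operation op : operations) {
      for (String out : op.outputs()) {
        producers.computeIfAbsent(out, k -> new ArrayList<>()).add(op);
      }
    }
    Set<String> sources = new TreeSet<>();
    Set<String> seen = new HashSet<>();
    Deque<String> pending = new ArrayDeque<>();
    pending.push(destinationField);
    while (!pending.isEmpty()) {
      String node = pending.pop();
      if (!seen.add(node)) {
        continue; // already visited; also guards against cycles
      }
      List<Operation> ops = producers.get(node);
      if (ops == null) {
        sources.add(node); // nothing produces this node, so it is a source field
        continue;
      }
      for (Operation op : ops) {
        op.inputs().forEach(pending::push);
      }
    }
    return sources;
  }

  public static void main(String[] args) {
    List<Operation> ops = List.of(
        new Operation("concat", List.of("Person.first_name", "Person.last_name"), List.of("concat.name")),
        new Operation("write", List.of("concat.name"), List.of("Employee.name")));
    System.out.println(sourceFields(ops, "Employee.name")); // [Person.first_name, Person.last_name]
  }
}
```

Calling `sourceFields` for every destination field yields the "from"/"to" pairs of the high level mapping in item 5, for example `first_name -> name` and `last_name -> name` as in the sample response.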

Open questions:

  1. The UI also needs to show the type of each field, which the API currently does not accept.
  2. What constitutes the dataset schema? For example, for a fileset, should we assume that the fields generated by the READ operations are part of the schema?