Extracting lineage from CDAP

CDAP collects data integration lineage at a dataset level and at a field level for all the pipelines that run. The lineage information represented in CDAP is accessible via REST APIs and can be used to export by users for any of their use cases. This document describes how to extract dataset lineage and field level lineage from CDAP APIs.

Lineage in CDAP is presented in a data centric view. The UI and REST APIs allow querying the lineage for each dataset and each schema fields in the datasets. To get an impact analysis from a pipeline point of view, the lineage from the dataset level and field level should be collected which provides the pipelines that impact the datasets and then get the inverse mapping.

Concepts

Dataset: Dataset in this context refers to a representation of an external source or a sink in Data Fusion. As an example, if a pipeline reads from a GCS source and writes to a BQ Sink, they are represented as external datasets. The name of the dataset is represented by the Reference name provided by the pipeline developers.

Application: Applications are deployable units in CDAP and are composed of programs and schedules. Each data pipeline corresponds to an application.

Programs: Programs process data in CDAP. Each batch pipeline is composed of a workflow program that contains one or more MapReduce or Spark programs. Each realtime pipeline is composed of a Spark streaming program.

Fetching Lineage via REST API

There are two levels at which lineage is represented:

  • Dataset level

    • Dataset level lineage represents all the origins of data access to the entity for a given time range.

  • Field level

    • A field lineage for a given dataset shows, for the specified time range, all the fields that were computed for a dataset and the fields from source datasets that participated in computation of those fields. Field lineage also shows the detail operations that caused the transformation from fields of a source dataset to the field of a given dataset.

Lineage information can be obtained by following the recipe:

1. Fetch all the namespaces.

2. For each namespace, fetch all the datasets (Step 1 below).

2.1 Dataset level lineage: For each dataset obtain the dataset level lineage (Step 2 below).

2.2 Field level lineage: For each datasets, fetch all the fields and for each field fetch the operations. (Step 3 below).

The

Step 1: Fetch all the Datasets

Please refer to the following doc about how to access CDAP REST APIs https://cloud.google.com/data-fusion/docs/reference/cdap-reference. For the section below, only the relative URL will be presented.

To list all the datasets in a given namespace, please use the following REST API:

GET v3/namespaces/<namespace-id>/data/datasets

Sample response:

[{ "name": "Delayed-Shipments-US", "type": "externalDataset", "properties": { "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"shipment_id\",\"type\":[\"string\",\"null\"]},{\"name\":\"shipping_date\",\"type\":[\"string\",\"null\"]},{\"name\":\"shipping_time\",\"type\":[\"string\",\"null\"]},{\"name\":\"origin_country\",\"type\":[\"string\",\"null\"]},{\"name\":\"origin_city\",\"type\":[\"string\",\"null\"]},{\"name\":\"destination_country\",\"type\":[\"string\",\"null\"]},{\"name\":\"destination_city\",\"type\":[\"string\",\"null\"]},{\"name\":\"customer_id\",\"type\":[\"string\",\"null\"]},{\"name\":\"size\",\"type\":[\"string\",\"null\"]},{\"name\":\"product\",\"type\":[\"string\",\"null\"]},{\"name\":\"region_id\",\"type\":[\"string\",\"null\"]},{\"name\":\"hazardous_goods\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"handle_with_care\",\"type\":[\"string\",\"null\"]},{\"name\":\"time_to_ship\",\"type\":[\"string\",\"null\"]},{\"name\":\"signature_required\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"weight\",\"type\":[\"double\",\"null\"]},{\"name\":\"zipcode\",\"type\":[\"long\",\"null\"]},{\"name\":\"price\",\"type\":[\"double\",\"null\"]}]}" } }, { "name": "Cleaned-Shipments", "type": "externalDataset", "properties": { "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"shipment_id\",\"type\":[\"string\",\"null\"]},{\"name\":\"shipping_date\",\"type\":[\"string\",\"null\"]},{\"name\":\"shipping_time\",\"type\":[\"string\",\"null\"]},{\"name\":\"origin_country\",\"type\":[\"string\",\"null\"]},{\"name\":\"origin_city\",\"type\":[\"string\",\"null\"]},{\"name\":\"destination_country\",\"type\":[\"string\",\"null\"]},{\"name\":\"destination_city\",\"type\":[\"string\",\"null\"]},{\"name\":\"customer_id\",\"type\":[\"string\",\"null\"]},{\"name\":\"size\",\"type\":[\"string\",\"null\"]},{\"name\":\"product\",\"type\":[\"string\",\"null\"]},{\"name\":\"region_id\",\"type\":[\"string\",\"null\"]},{\"name\":\"hazardous_goods\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"handle_with_care\",\"type\":[\"string\",\"null\"]},{\"name\":\"time_to_ship\",\"type\":[\"string\",\"null\"]},{\"name\":\"signature_required\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"weight\",\"type\":[\"float\",\"null\"]},{\"name\":\"zipcode\",\"type\":[\"int\",\"null\"]},{\"name\":\"price\",\"type\":[\"float\",\"null\"]}]}" } }]

Step 2: Getting dataset level Lineage

Dataset level lineage is represented by one or more dataset that is read by a data pipeline and one or more dataset where the data pipeline writes to. As an example consider the following representation:

This can be fetched by querying for the lineage of the any of the dataset in the graph.

Querying for Delayed-Shipments-US using the following API:

GET v3/namespaces/default/datasets/Delayed-Shipments-US/lineage?start=now-30d&end=now

will return the lineage information of the programs that wrote to Delayed-Shipment-US datasets and which datasets were written to.

Sample response below:

{ "start": 1581128699, "end": 1583720699, "relations": [ { "data": "dataset.default.Cleaned-Shipments", "program": "mapreduce.default.Delayed-Shipments-US.phase-1", "accesses": [ "read" ], "runs": [ "c727d571-5296-11ea-a4f4-42010a8a0019" ], "components": [] }, { "data": "dataset.default.On-Time-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US_BQ.phase-1", "accesses": [ "write" ], "runs": [ "beda6d41-57e3-11ea-9a86-42010a8a001e" ], "components": [] }, { "data": "dataset.default.Delayed-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US_BQ.phase-1", "accesses": [ "write" ], "runs": [ "84ca5fc1-57d4-11ea-b707-42010a8a001f" ], "components": [] }, { "data": "dataset.default.Delayed-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US_BQ.phase-1", "accesses": [ "write" ], "runs": [ "beda6d41-57e3-11ea-9a86-42010a8a001e" ], "components": [] }, { "data": "dataset.default.On-Time-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US_BQ_v1.phase-1", "accesses": [ "write" ], "runs": [ "b9b36b71-57d7-11ea-b352-42010a8a0041" ], "components": [] }, { "data": "dataset.default.Invalid-Shipment-Data", "program": "mapreduce.default.Shipment-Data-Cleansing.phase-1", "accesses": [ "write" ], "runs": [ "6dabc5c1-5295-11ea-b95e-42010a8a001f" ], "components": [] }, { "data": "dataset.default.Delayed-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US_BQ_v1.phase-1", "accesses": [ "write" ], "runs": [ "b9b36b71-57d7-11ea-b352-42010a8a0041" ], "components": [] }, { "data": "dataset.default.Cleaned-Shipments", "program": "mapreduce.default.TitanicCopy.phase-1", "accesses": [ "read" ], "runs": [ "2a264e31-5cde-11ea-a6c7-42010a8a002f" ], "components": [] }, { "data": "dataset.default.On-Time-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US.phase-1", "accesses": [ "write" ], "runs": [ "c727d571-5296-11ea-a4f4-42010a8a0019" ], "components": [] }, { "data": "dataset.default.On-Time-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US_BQ.phase-1", "accesses": [ "write" ], "runs": [ "84ca5fc1-57d4-11ea-b707-42010a8a001f" ], "components": [] }, { "data": "dataset.default.Raw_Shipping_Data", "program": "mapreduce.default.Shipment-Data-Cleansing.phase-1", "accesses": [ "read" ], "runs": [ "6dabc5c1-5295-11ea-b95e-42010a8a001f" ], "components": [] }, { "data": "dataset.default.Cleaned-Shipments", "program": "mapreduce.default.Delayed-Shipments-US_BQ_v1.phase-1", "accesses": [ "read" ], "runs": [ "b9b36b71-57d7-11ea-b352-42010a8a0041" ], "components": [] }, { "data": "dataset.default.Delayed-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US.phase-1", "accesses": [ "write" ], "runs": [ "c727d571-5296-11ea-a4f4-42010a8a0019" ], "components": [] }, { "data": "dataset.default.Cleaned-Shipments", "program": "mapreduce.default.Delayed-Shipments-US_BQ.phase-1", "accesses": [ "read" ], "runs": [ "beda6d41-57e3-11ea-9a86-42010a8a001e" ], "components": [] }, { "data": "dataset.default.Cleaned-Shipments", "program": "mapreduce.default.Delayed-Shipments-US_BQ.phase-1", "accesses": [ "read" ], "runs": [ "84ca5fc1-57d4-11ea-b707-42010a8a001f" ], "components": [] }, { "data": "dataset.default.Cleaned-Shipments", "program": "mapreduce.default.Shipment-Data-Cleansing.phase-1", "accesses": [ "write" ], "runs": [ "6dabc5c1-5295-11ea-b95e-42010a8a001f" ], "components": [] }, { "data": "dataset.default.On-Time-Shipments-US", "program": "mapreduce.default.TitanicCopy.phase-1", "accesses": [ "write" ], "runs": [ "2a264e31-5cde-11ea-a6c7-42010a8a002f" ], "components": [] } ], "programs": { "mapreduce.default.Shipment-Data-Cleansing.phase-1": { "entityId": { "application": "Shipment-Data-Cleansing", "version": "-SNAPSHOT", "type": "Mapreduce", "program": "phase-1", "namespace": "default", "entity": "PROGRAM" } }, "mapreduce.default.TitanicCopy.phase-1": { "entityId": { "application": "TitanicCopy", "version": "-SNAPSHOT", "type": "Mapreduce", "program": "phase-1", "namespace": "default", "entity": "PROGRAM" } }, "mapreduce.default.Delayed-Shipments-US_BQ_v1.phase-1": { "entityId": { "application": "Delayed-Shipments-US_BQ_v1", "version": "-SNAPSHOT", "type": "Mapreduce", "program": "phase-1", "namespace": "default", "entity": "PROGRAM" } }, "mapreduce.default.Delayed-Shipments-US_BQ.phase-1": { "entityId": { "application": "Delayed-Shipments-US_BQ", "version": "-SNAPSHOT", "type": "Mapreduce", "program": "phase-1", "namespace": "default", "entity": "PROGRAM" } }, "mapreduce.default.Delayed-Shipments-US.phase-1": { "entityId": { "application": "Delayed-Shipments-US", "version": "-SNAPSHOT", "type": "Mapreduce", "program": "phase-1", "namespace": "default", "entity": "PROGRAM" } } }, "data": { "dataset.default.On-Time-Shipments-US": { "entityId": { "dataset": "On-Time-Shipments-US", "namespace": "default", "entity": "DATASET" } }, "dataset.default.Invalid-Shipment-Data": { "entityId": { "dataset": "Invalid-Shipment-Data", "namespace": "default", "entity": "DATASET" } }, "dataset.default.Raw_Shipping_Data": { "entityId": { "dataset": "Raw_Shipping_Data", "namespace": "default", "entity": "DATASET" } }, "dataset.default.Cleaned-Shipments": { "entityId": { "dataset": "Cleaned-Shipments", "namespace": "default", "entity": "DATASET" } }, "dataset.default.Delayed-Shipments-US": { "entityId": { "dataset": "Delayed-Shipments-US", "namespace": "default", "entity": "DATASET" } } } }

The key components of the response are:

  • data: The data is of the format dataset.<namespace-id>.<dataset-name>.

  • program: The data is of the format <program-type>.<namespace-id>.<pipeline-name>.<program-name>.

  • access: access type can be read/write or both.

"data": "dataset.default.On-Time-Shipments-US", "program": "mapreduce.default.Delayed-Shipments-US_BQ.phase-1", "accesses": [ "write" ],

 

Step 3: Getting Field Level Lineage

Each dataset has a schema that represents all the fields that the dataset is composed of. To get the field level lineage, the first step is to get a list of fields for the dataset.

3.1: Getting Fields for the Dataset

This can be achieved by querying the following end point:

GET v3/namespaces/<namespace-id>/datasets/<dataset-id>/lineage/fields?start=<start-duration>&end=<end-duration>

The response returns an array of fields which can be iterated over to get the field level lineage in the next step. Sample response:

 

3.2: Field Level Lineage Operations

This can be fetched using the following end-point:

GET v3/namespaces/<namespace-id>/datasets/<dataset-id>/lineage/fields/<field-name>/operations?start=<start-time>&end=<end-time>

Operations represent a two way relationship, incoming that represent the root cause analysis and outgoing that represent the impact analysis of the given field. The key components of the incoming and outgoing relationships are:

  • application: Pipeline name.

  • program: Program that wrote to (incoming operations) or read from (outgoing operation).

  • operations: Array of operations that were operated on the field.

As an example for hazardous_goods in the Delayed-Shipments-US the sample response is below:

 

Â