Field level metadata tagging:
- Capabilities to add and retrieve metadata or tags at field level for dataset schemas.
- The capability needs to be exposed to data pipelines and plugins, as well as be enterable through the UI.
- It should be possible to add tags at the field level in both an automated and a manual way. Andreas: not quite clear what that means - automated/manual?
Example 1: Metadata based processing: Consider a CDAP data pipeline which reads a user profile dataset. A user profile contains several PII fields, such as social security number, phone number, etc. A stage in the data pipeline is responsible for obfuscating the PII data. This stage looks at all the fields, and if a field is tagged as 'PII', it obfuscates it.
Example 2: Metadata based discovery: A data governance officer might want to get the list of PII fields in the user profile dataset present in the data lake. In this case he would like to get the social security number, phone number, and address fields.
Example 3: Metadata based discovery: A data scientist wants to use a user profile dataset. To comply with regulations, he wants to use a dataset where the phone numbers are anonymized. He searches for "profile phone:anonymized".
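To make the field-level tagging requirement concrete, here is a minimal sketch in Java. The FieldMetadataContext interface and all method names are hypothetical (this is not the CDAP API); the sketch only illustrates how a plugin could read field tags to drive the obfuscation behaviour of Example 1.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch only: a context object that lets plugins and programs
// read and write tags for individual schema fields.
interface FieldMetadataContext {
    void addFieldTag(String dataset, String field, String tag); // automated tagging from code
    Set<String> getFieldTags(String dataset, String field);     // lookup at processing time
}

// Example 1 as code: an obfuscation stage that checks each field for the 'PII' tag.
class PiiObfuscator {
    private final FieldMetadataContext metadata;

    PiiObfuscator(FieldMetadataContext metadata) {
        this.metadata = metadata;
    }

    // Assumes the record is a mutable map of field name to value.
    Map<String, Object> process(String dataset, Map<String, Object> record) {
        for (Map.Entry<String, Object> e : record.entrySet()) {
            if (metadata.getFieldTags(dataset, e.getKey()).contains("PII")) {
                e.setValue(obfuscate(String.valueOf(e.getValue())));
            }
        }
        return record;
    }

    private String obfuscate(String value) {
        // Placeholder obfuscation; a real implementation might hash or mask the value.
        return "*".repeat(value.length());
    }
}
```

The manual path (a data governance officer tagging a field through the UI or REST API) would write to the same field-level metadata store that getFieldTags reads from.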
- Capabilities to annotate metadata at the object level. (We need to define what the different types of objects are, and also whether it is possible to specify custom objects.)
For example:
- How was the directory on HDFS created?
- Which files are responsible for creating this partition in a dataset?
- Run id of the pipeline that created a specific file in a dataset
- Checksum, number of errors, and other operational metadata for a file in a dataset
- through the UI, CLI, and REST
- through APIs for plugins and programs
- through configuration for a pipeline (unclear what that means; perhaps it is a plugin configuration that controls how the plugin uses the API)
This helps answer compliance-related questions such as how a file got created. It also provides traceability, so that if a file is bad we know the origin of the file.
Example 1: An organization can have multiple business units, such as 'HR', 'Finance', etc. SFTP is a commonly used mechanism for sharing data within a business unit in an organization. Since these files are used only within a single business unit, the format in which the files are stored might not be consistent across the organization. Files from HR may be stored as CSV, while files from Finance may be stored in a rich XML format. In order to perform analysis on the files, they must be imported into HDFS by using separate CDAP data pipelines, or different configurations of the same pipeline. While importing, normalization is done on the files to store them in HDFS in a common format.
The data governance officer still needs to look at the data in HDFS at the file level rather than at the abstracted CDAP dataset level. By looking at a file, he would want to know the checksum associated with the file, which business unit the file belongs to, when the file was created, etc.
Example 2: As an extension to the above example, multiple different services (such as CDAP, a custom Hadoop stack, etc.) are responsible for pumping files into the data lake. If we tag a file with the name of the process, such as 'CDAP', then the data governance officer will know which service brought the file in.
Example 3: A pipeline ingests data into files. It gathers metrics about the data quality, such as how many schema errors, how many syntax errors, etc. were encountered during processing. At the end of the pipeline, a custom action computes a data quality score (between 0 and 100) and assigns it as meta data to the output file (or directory) in the sink dataset. Downstream pipelines that consume this dataset will decide based on each file's data quality score whether to process or to ignore that file.
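A minimal sketch of Example 3, assuming a hypothetical ObjectMetadataClient interface (not an actual CDAP API): a custom action computes the quality score from the run's metrics and attaches it as a metadata property of the output path.

```java
import java.util.Map;

// Hypothetical: attach a metadata property to an arbitrary object
// (file, directory, partition, ...).
interface ObjectMetadataClient {
    void addProperty(String objectPath, String key, String value);
}

class DataQualityScoreAction {
    private final ObjectMetadataClient metadata;

    DataQualityScoreAction(ObjectMetadataClient metadata) {
        this.metadata = metadata;
    }

    // metrics could contain e.g. "records.total", "errors.schema", "errors.syntax"
    void run(String outputPath, Map<String, Long> metrics) {
        long total = metrics.getOrDefault("records.total", 0L);
        long errors = metrics.getOrDefault("errors.schema", 0L)
                    + metrics.getOrDefault("errors.syntax", 0L);
        // Score between 0 and 100: the fraction of records without errors.
        long score = total == 0 ? 0 : Math.max(0, 100 - (errors * 100 / total));
        metadata.addProperty(outputPath, "dataQualityScore", Long.toString(score));
    }
}
```

A downstream pipeline could then read "dataQualityScore" for each file and skip files whose score is below a configured threshold, as described in the example.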
Example 1: Consider a data pipeline which processes files containing news viewership feeds from multiple publishers. Files are tagged with the id of the publisher. In the data lake we want to store the viewership from all publishers in a single dataset, so that we can perform analysis such as which news item got the highest number of views. However, it would still be useful to get information such as which news item is the most popular for a particular publisher. Since the files are tagged with the publisher id, we would like to store the publisher id in the record itself for analysis in the data lake.
Example 2: A dataset contains data from many publishers. For each publisher, the same pipeline is used for ingestion, with different runtime arguments, one of which provides the publisher id for the data being ingested. This publisher id as well as the source file name are added as extra fields to every output record, in order to be able to identify the publisher if one of the records is later reported as inaccurate, offensive or otherwise non-compliant. This will help trace every record back to its publisher and source file.
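A small illustrative sketch of Example 2, using a simplified, hypothetical transform (record as a map) rather than the real CDAP plugin API: the publisher id supplied at runtime and the source file name are appended to every output record.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical transform: appends provenance fields to each record so it can be
// traced back to its publisher and source file.
class RecordTagger {
    private final String publisherId;

    // In a real pipeline the publisher id would come from the run's runtime
    // arguments (e.g. a "publisher.id" argument); here it is a constructor argument.
    RecordTagger(String publisherId) {
        this.publisherId = publisherId;
    }

    Map<String, Object> transform(Map<String, Object> record, String sourceFileName) {
        Map<String, Object> out = new LinkedHashMap<>(record);
        out.put("publisher_id", publisherId);
        out.put("source_file", sourceFileName);
        return out;
    }
}
```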
Field level lineage:
Andreas: I think this example is implementation, not a use case. Can you rewrite this from the perspective of the user? The Data Officer (or Business Analyst) wants to understand the meaning of a field by reading how it was produced. For example, the "fullName" field was created by concatenating the "firstName" and "lastName" fields, both of which were extracted as positional fields from a CSV record in the source named "passengerList". The Operations team (and sometimes a developer) wants to identify how a field was computed, typically in a triage/debugging scenario where some fields show up with wrong values.
```
SFTP to HDFS Copy -------> File -------> Parser -------> Name-Normalizer -------> IdGenerator -------> Store
```
Consider a sample record which is read by the File source: John,Smith,31,Santa Clara,CA
| Pipeline Stage | Fields Emitted (with values) | Field Level Operations |
|---|---|---|
| File | body: John,Smith,31,Santa Clara,CA; offset: &lt;line offset&gt; | operation: read; input: file; output: body, offset; description: read the file to generate the body field |
| Parser | first_name: John; last_name: Smith; age: 31; city: Santa Clara; state: CA | operation: parse; input: body; output: first_name, last_name, age, city, state; description: parsed the body field |
| Name-Normalizer | name: John Smith; age: 31; city: Santa Clara; state: CA | operation: concat; input: first_name, last_name; output: name; description: concatenate first_name and last_name fields. operation: drop; input: first_name; description: delete first_name. operation: drop; input: last_name; description: delete last_name |
| IdGenerator | id: JohnSmith007; name: John Smith; age: 31; city: Santa Clara; state: CA | operation: create; input: name; output: id; description: generated a unique id based on the name |
| Store | id: JohnSmith007; name: John Smith; age: 31; city: Santa Clara; state: CA | No field level operations performed |
Now suppose you want to look at the lineage of the field id, which is part of the CDAP dataset generated by the Store stage of the above pipeline:
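The sketch below is an illustration only (not the CDAP lineage API): it models the operations from the table above as plain Java objects and walks backwards from id to the fields that contributed to it. All class and method names (FieldOperation, lineageOf) are hypothetical.

```java
import java.util.*;

// A minimal model of a field-level operation emitted by a pipeline stage.
final class FieldOperation {
    final String stage;         // pipeline stage that performed the operation
    final String name;          // e.g. "read", "parse", "concat", "create"
    final List<String> inputs;  // input field names
    final List<String> outputs; // output field names

    FieldOperation(String stage, String name, List<String> inputs, List<String> outputs) {
        this.stage = stage; this.name = name; this.inputs = inputs; this.outputs = outputs;
    }
}

public class FieldLineageSketch {
    // Walk backwards from a field to all fields that (transitively) contributed to it.
    static Set<String> lineageOf(String field, List<FieldOperation> ops) {
        Set<String> contributors = new LinkedHashSet<>();
        Deque<String> toVisit = new ArrayDeque<>(List.of(field));
        while (!toVisit.isEmpty()) {
            String current = toVisit.poll();
            for (FieldOperation op : ops) {
                if (op.outputs.contains(current)) {
                    for (String in : op.inputs) {
                        if (contributors.add(in)) {
                            toVisit.add(in);
                        }
                    }
                }
            }
        }
        return contributors;
    }

    public static void main(String[] args) {
        List<FieldOperation> ops = List.of(
            new FieldOperation("File", "read", List.of("file"), List.of("body", "offset")),
            new FieldOperation("Parser", "parse", List.of("body"),
                List.of("first_name", "last_name", "age", "city", "state")),
            new FieldOperation("Name-Normalizer", "concat",
                List.of("first_name", "last_name"), List.of("name")),
            new FieldOperation("IdGenerator", "create", List.of("name"), List.of("id")));

        // Prints: [name, first_name, last_name, body, file]
        System.out.println(lineageOf("id", ops));
    }
}
```

With such a structure, both the business user's question (which source fields contribute to a field of the dataset) and the developer's triage question can be answered by the same backward walk over the recorded operations.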
Sagar (updated the example): Example: Consider a CDAP data pipeline which imports airline passenger information stored on an SFTP server into HDFS. Since the passenger information comes from different sources, it needs to be normalized into a standard format while importing. Consider a few sample records from the SFTP file (shown as a table for better understanding). OK, I guess this example simply has a lot more detail than the other examples. An important aspect is that the business user thinks of this as a property of a dataset, whereas a developer would see it as a property of a pipeline (run). That gets a little lost in all the details.
| FullName | FirstName | LastName | Date of Birth | Gender | Country of Residence | Place of Birth |
|---|---|---|---|---|---|---|
| | Bruce | Wayne | 12/10/1975 | 1 | United States of America | San Jose |
| Kent, clark | | | Nov 11, 1980 | M | Canada | Toronto |
| | Kathey | doe | 11/11/1987 | Female | England | Manchester |
| | brian | smith | April 05, 1992 | Male | America | Palo Alto |
We want to store this information in HDFS in the following format:
| PassengerId | Name | DOB | Gender | Place of Birth | Country Code |
|---|---|---|---|---|---|
| PQR6248 | Wayne, Bruce | 12/10/1975 | Male | San Jose | USA |
| XLM0912 | Kent, Clark | 11/11/1980 | Male | Toronto | CAN |
| YQR8765 | Doe, Kathey | 11/11/1987 | Female | Manchester | ENG |
| NULL | Smith, Brian | 04/05/1992 | Male | Palo Alto | NULL |
A custom algorithm is responsible for generating the PassengerId in the above table, using the normalized fields Name, DOB, and Country Code.
Now the Data Officer (or Business Analyst) wants to understand which fields are critical for generating the PassengerId field. By looking at the field-level lineage graph given below, he would be able to know which fields from the source contribute to the PassengerId field.
```
FullName ----------------------------(Normalize)----------------------------+
                                                                             |
FirstName --+                                                                +--> Name ------------------+
            +--(concat)--> ConcatenatedName --(Normalize)-------------------+                            |
LastName ---+                                                                                            |
                                                                                                         +--(create)--> PassengerId
Date of Birth -------------(Date Normalizer)-------------> DOB -----------------------------------------+
                                                                                                         |
Country of Residence ------(Code Generator)--------------> Country Code --------------------------------+
```
Example: Consider a CDAP data pipeline which reads the user profile files from SFTP. Once the files are copied to HDFS, they are parsed, the user name is normalized into the required format, a unique id is generated for the user, and the result is stored in a CDAP dataset on HDFS.
A software developer is notified that the PassengerId in the above dataset for user Smith, Brian was generated as NULL. By looking at the field-level lineage graph for PassengerId, the developer can figure out that since Country Code itself is NULL, it produces a NULL PassengerId.
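For illustration only, here is a hypothetical version of that custom algorithm; the real algorithm is not specified in this document. It simply shows why a missing (NULL) Country Code necessarily yields a NULL PassengerId, which is exactly what the lineage graph lets the developer conclude.

```java
import java.util.Locale;

// Hypothetical sketch of the "custom algorithm" deriving PassengerId from the
// normalized Name, DOB and Country Code fields.
public class PassengerIdGenerator {

    static String generate(String name, String dob, String countryCode) {
        // If any contributing field is missing, no id can be generated.
        if (name == null || dob == null || countryCode == null) {
            return null;
        }
        // Illustrative only: three letters from the name plus a 4-digit hash.
        String letters = name.replaceAll("[^A-Za-z]", "").toUpperCase(Locale.ROOT);
        String prefix = letters.substring(0, Math.min(3, letters.length()));
        int hash = Math.abs((name + dob + countryCode).hashCode()) % 10000;
        return prefix + String.format("%04d", hash);
    }

    public static void main(String[] args) {
        System.out.println(generate("Wayne, Bruce", "12/10/1975", "USA")); // some non-null id
        System.out.println(generate("Smith, Brian", "04/05/1992", null));  // null: country code missing
    }
}
```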
Metadata provenance:
- This helps answer questions such as who changed what metadata. (We need both pieces: who changed it and what the change was.)
- Provenance information should be available through the REST API as well as through the UI.
- Technical and business (Andreas: all? Sagar: added this to open questions) metadata should be tracked.
Example 1: Debugging: Consider that a data governance officer is looking at the fields of a particular dataset and the metadata associated with those fields. He notices that the 'User Address' field is missing a PII tag. He looks at other provenance-related tags, such as 'Owner Process' and 'Creation Date', and can notify the appropriate parties to look into it further. For example, if the 'Owner Process' is 'Data import team', he can notify them. The data import team can then drill in further based on the 'Creation Date' and adjust the pipeline accordingly.
Example 2: Metadata change based processing: Only trigger processing if the metadata associated with the source has changed since the last time the source was processed. (Example not clear yet.)
- Metadata propagation: By default, propagate metadata at the field level; this can be overridden by the pipeline developer.
Example: Consider a CDAP data pipeline which reads user profile information containing PII data, for example the social security number associated with a user. Based on the social security number field, the pipeline fetches additional information about the user, such as the user's current address and phone number. The user information is then stored in the data lake. Once the data is in the data lake, a generic anonymization process can look at the PII fields of a given dataset and anonymize them. Since the user's phone number and address (which are generated from the social security number) are also considered sensitive personal information, it is useful to propagate the PII tag to them too, so that those fields also get anonymized.
Andreas: I think this is not an example of propagation, but of explicit tagging. There is no clear way the pipeline app could know that these two fields are generated from the SSN. A simpler example is a pipeline whose source has a field "address" that has PII=true. The pipeline performs address normalization and stores both the original and the normalized form in output fields. For the original form, since it was simply carried forward from the source, all metadata should automatically propagate (by the platform). For the normalized address field, the "address normalizer" plugin needs to have a way to either copy the "address" metadata to its output field "normalizedAddress", or (better) have a programmatic way to signal that all metadata from "address" must be propagated to "normalizedAddress". Furthermore, if the plugin anonymizes rather than normalizes, it needs a way to either declare that its output field is not PII, or add an extra tag "anonymized=true" to that field.
A big question here is when that should happen. For every run of a pipeline? What if different runs produce/propagate different metadata? How is that resolved? Sagar: Added to open questions.
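A sketch of what the propagation hooks discussed above could look like, assuming hypothetical FieldMetadataEmitter, propagate, and setTag APIs that do not exist in CDAP today:

```java
// Hypothetical hooks: the platform propagates metadata for fields carried forward
// unchanged; a plugin explicitly declares propagation for derived fields and can
// override individual tags (e.g. drop PII, add anonymized=true).
interface FieldMetadataEmitter {
    void propagate(String fromField, String toField);        // copy all tags/properties
    void setTag(String field, String tag, boolean present);  // explicit override
}

class AddressNormalizer {
    void declareFieldMetadata(FieldMetadataEmitter emitter, boolean anonymize) {
        // "address" is carried forward unchanged: the platform propagates its metadata.
        // "normalizedAddress" is derived, so the plugin signals the propagation itself.
        emitter.propagate("address", "normalizedAddress");
        if (anonymize) {
            // If the plugin anonymizes instead of normalizing, it overrides the tags.
            emitter.setTag("normalizedAddress", "PII", false);
            emitter.setTag("normalizedAddress", "anonymized", true);
        }
    }
}
```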
Accessing the captured metadata:
- The captured metadata should be accessible through APIs, plugins, and the UI.
- This will help apply rules based on the retrieved metadata.
Example 1: If a field is marked as PII, then perform the obfuscation process on it.
Example 2: In-flight metadata: Consider a CDAP data pipeline which reads user profile information. The dataset contains firstname and lastname fields, which are tagged as 'User Identifier'. The pipeline performs title casing on these fields, so that the first character is capitalized, to generate the fields UpperFirstName and UpperLastName. The next stage in the pipeline concatenates the UpperFirstName and UpperLastName fields to generate the Name field. Since the UpperFirstName, UpperLastName, and Name fields are in flight and not persisted to any dataset, a stage in the pipeline that sees these fields should still be able to access the tag associated with them, which is 'User Identifier'.
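A minimal sketch of Example 2 (in-flight metadata), using hypothetical helper classes rather than any real CDAP API: tags follow derived, in-flight fields from stage to stage even though those fields are never persisted.

```java
import java.util.*;

// Hypothetical: tags attached to in-flight fields travel with the record's schema
// between stages.
class InFlightFieldTags {
    private final Map<String, Set<String>> tagsByField = new HashMap<>();

    void tag(String field, String tag) {
        tagsByField.computeIfAbsent(field, f -> new HashSet<>()).add(tag);
    }

    Set<String> tagsOf(String field) {
        return tagsByField.getOrDefault(field, Set.of());
    }

    // When a stage derives a new in-flight field, it carries the tags along.
    void derive(String newField, String... fromFields) {
        for (String from : fromFields) {
            tagsOf(from).forEach(t -> tag(newField, t));
        }
    }
}

class InFlightExample {
    public static void main(String[] args) {
        InFlightFieldTags tags = new InFlightFieldTags();
        tags.tag("firstname", "User Identifier");
        tags.tag("lastname", "User Identifier");

        // Title-casing stage produces UpperFirstName/UpperLastName (in flight only).
        tags.derive("UpperFirstName", "firstname");
        tags.derive("UpperLastName", "lastname");
        // Concatenation stage produces Name (also in flight only).
        tags.derive("Name", "UpperFirstName", "UpperLastName");

        // A later stage can still see the tag on the in-flight field.
        System.out.println(tags.tagsOf("Name")); // [User Identifier]
    }
}
```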
- Integration with Enterprise metadata systems:
- Consume metadata events from external systems such as Atlas, Collibra, etc. into CDAP.
- Push metadata events from CDAP to external systems such as Atlas, Collibra.
- What metadata is pushed from CDAP to the external systems needs to be configurable.
- Integration should also maintain referential integrity with the external systems. For example, if I browse for a field in the UI, the corresponding tags should also be fetched from the external systems. (Not very clear about this requirement.)
- This helps achieve a fully automated metadata framework.
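A hedged sketch of the push integration, assuming a hypothetical ExternalMetadataSystem connector and MetadataChangeEvent type (no real Atlas or Collibra client API is used here): a configurable filter decides which metadata changes are forwarded.

```java
import java.util.function.Predicate;

// Hypothetical event describing a metadata change in CDAP.
record MetadataChangeEvent(String entity, String key, String value, boolean business) {}

// Hypothetical connector, e.g. backed by an Atlas or Collibra integration.
interface ExternalMetadataSystem {
    void publish(MetadataChangeEvent event);
}

class MetadataPusher {
    private final ExternalMetadataSystem target;
    private final Predicate<MetadataChangeEvent> filter; // configurable: what gets pushed

    MetadataPusher(ExternalMetadataSystem target, Predicate<MetadataChangeEvent> filter) {
        this.target = target;
        this.filter = filter;
    }

    void onMetadataChange(MetadataChangeEvent event) {
        if (filter.test(event)) {
            target.publish(event);
        }
    }
}

// Example configuration: push only business metadata to the external system.
// MetadataPusher pusher = new MetadataPusher(externalConnector, MetadataChangeEvent::business);
```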
- Operational metadata:
- System should be able to generate operational metadata.
Example: A file was generated, and during its processing 1000 warnings were generated. This information needs to be captured.
Andreas: see my example 3 under object-level metadata. Sagar: understood.
Open Questions:
- The requirement document from the customer mentions that we need an automated/manual way to add metadata/tags at the field level.
Sagar: My understanding is that the automated way means tags are emitted programmatically (by plugins or CDAP programs), while the manual way is assigning tags through the UI (or REST API). For example, a plugin can be implemented to mark the user address field as 'PII' in an automated fashion; however, once the data has landed in the data lake, the data governance officer may decide that another field, say phone number, should also have been marked as 'PII'. He should be able to do that through the UI, which is the manual way. We can confirm this requirement though.
Andreas: OK, that makes sense, but isn't that already covered in b.? Maybe you can consolidate b. and c. and rephrase: this must be exposed.
- The requirement document from the customer mentions that metadata provenance is required for business as well as technical metadata. Should we have provenance for all types of technical metadata? For example, if the number of WARNINGs changed across multiple pipeline runs, should we track that change too?
- Should metadata be stored for every run of the pipeline? What if different runs produce/propagate different meta data? How is that resolved?
Sagar: Since there is a possible use case of manually tagging a field through the UI and REST endpoint (which won't have any run id), we may not want to store it at the run level. For aggregating metadata from multiple runs, we can provide policies (OVERRIDE, MERGE, etc.). Rohit Sinha, is this addressed in any of your design documents? If not, we can add it.
- When do we emit the metadata to TMS? Does it happen when the code executes, or is the metadata cached in memory and emitted in the "destroy" method of the program? If we create a new field, then we also need to copy its metadata. Should the metadata of the newly created field be emitted to TMS instantaneously, or should it be emitted in the "destroy" method when the field is actually created?
Following are just notes for Sagar:
...