This document is still under review
Overview
This is a design document for the Metadata and Data Discovery feature in CDAP. Metadata and Data Discovery will allow CDAP applications and datasets to be annotated with both business and system metadata. This will enable users to:
- Track lineage and provenance of datasets
- Discover datasets and applications based on metadata
Requirements
Id | Requirement | Priority | Description/Comments |
---|---|---|---|
R1 | CDAP should provide a Java API to annotate metadata | H | |
R2 | CDAP should have the ability to distinguish the type of metadata | H | |
R3 | Metadata can be associated with a Dataset (all metadata) as well as an Application (business and generic metadata) | H | |
R4 | User should have the ability to annotate Applications or Datasets with business metadata using the REST API / CLI, and programmatically using the API | H | |
R5 | User should have the ability to retrieve all business metadata using the REST API / CLI | H | |
R6 | User should have the ability to search for a Dataset or Application based on Generic Metadata | H | |
R7 | User should have the ability to search for a Dataset or Application based on Business Metadata | H | |
R8 | User should have the ability to view metadata in the UI: in the Dataset View for Datasets and in the Application View for Applications | H | |
R9 | User should have the ability to distinguish the type of metadata they are viewing in the UI | H | |
R10 | User should have the ability to search for an Application or Dataset by its metadata in the UI | H | |
R11 | CDAP system should automatically annotate datasets with machine-generated metadata | M | Enumerate: |
R12 | User should have the ability to search Datasets based on machine-generated metadata; specifically, we start with field names rather than values | M | |
R13 | User should have the ability to search Datasets based on the schema fields associated with the Dataset | M | |
R14 | User should have the ability to publish business tags on the notification system | H | |
R15 | User should have the ability to specify a filter on machine-generated metadata to be associated with a dataset | L | Enumerate. What kind of filters? |
User Stories
In no specific order...
Id | Description | Requirements fulfilled |
---|---|---|
U1 | As a user, I should be able to tag applications, programs, streams and datasets with business metadata at deploy time | R1 |
U2 | As a user, I should be able to tag application, programs, streams and datasets with business metadata after deployment | R4 |
U3 | As a user, I should be able to view business and system metadata separately | R2, R3, R6 |
U4 | As a user, I should be able to view metadata and runtime information that the CDAP system has automatically tagged applications and datasets with. | R11 |
U5 | As a user, I should be able to retrieve all the business and system metadata associated with applications and datasets | R5, R8 |
U6 | As a user, I should be able to view the lineage and provenance for a dataset in a given time period | |
U7 | As a user, I should be able to discover/search datasets based on metadata - business or system | R6, R7, R10 |
U8 | As a user, I should be able to discover/search datasets based on fields in the dataset schema | R13 |
U9 | As a user, I should be able to publish metadata to the notification system | R14 |
U10 | As a user, I should be able to specify filters on the kind of metadata that is automatically added to datasets and applications | R15 |
User Interactions
Interaction 1
- User goes to the metadata discovery page
- User types in a metadata keyword in the search box
- User gets a list of all possible metadata keys in the search results
- User selects (clicks) a single metadata key
- User gets a list of all datasets and applications that were annotated with the selected metadata key
- User selects a dataset from the above list
- User is taken to the dataset detail page
- User has the option to view the lineage of the dataset, for a given time interval
Interaction 2
- User is shown all available metadata keys
- User follows Interaction 1 from the fourth step (selecting a single metadata key) onwards
Interaction 3
- User is on a page showing a lineage diagram for a dataset for the specified time interval
- User clicks on a program in the lineage diagram
- User is shown all runs for the selected program in the specified time interval
- User selects a runId
- User is shown all metadata for the selected run
Interaction 4
- User is on a dataset/application/program detail page
- User clicks a button to view metadata
- User is shown all metadata for the selected dataset/application/program/runId
Scope (3.2)
In Scope:
Metadata Annotation and Retrieval
- Allow users to update and retrieve Scope.USER Metadata through REST endpoints
- Automatically associate application, runId with dataset access
- Publish business metadata to Kafka topic.
Lineage
- View lineage on a dataset through REST API
Search and Discovery
- Allow prefix-searching for a single metadata key or value. (Either just key, or a combination of key and value).
Out of Scope
Metadata Annotation and Retrieval
- No Java Clients/CLI support
- Annotating dataset/stream accesses as read/write is out of scope.
- No support for annotating applications or datasets programmatically
Lineage
- Lineage will not be directional.
Search and Discovery
- No support for boolean expressions.
- No free-text search.
Architecture
The Metadata Service will be implemented as a separate service that runs in YARN. One of the primary reasons for this is that it cuts across AppFabric and DataFabric. Running it separately will also allow us to scale it independently. This service will expose REST APIs for interaction, most of which will be public (routable) while some may be private (non-routable). This service will allow the following kinds of interactions:
- Direct interactions from users for setting and retrieving metadata for applications and datasets
- Interactions from AppFabric/Dataset Service during (the start/end of) program runs for annotating that a program accessed a dataset (as input or output).
This service will also be responsible for computing and serving the lineage of a dataset in a specified time interval.
An additional service may (TBD) also be exposed to fulfill the requirement of metadata search. This service may be a YARN service that runs Solr or Lucene to index metadata.
Design
Metadata System
Metadata Types
The metadata system supports the following kinds of metadata:
- Business (User) Metadata: Metadata that users annotate, using either the Java API or the REST API.
- System Metadata: Metadata that is annotated by the CDAP system. This may include two kinds of metadata:
  - Generic Metadata: Metadata that is added by default to every application/dataset - created_by, creation_time, last_updated_by, last_update_time (any more?)
  - Runtime Metadata: Metadata that contains runtime information like Workflow Token, Data Quality Stats, Program/Dataset Runtime Arguments, Preferences. We may not store this data in the metadata service, but may just reference it from the current MDS with a runId.
System Metadata Updates
Updates to system metadata will happen at four different times:
- App/Dataset Deployment: To update System Metadata
- Program start: To update the history table for the dataset access with the run
- During Program Run: To update dataset accesses for programs that update datasets (that are not set as input or output datasets) during their run
- Program end: To update the history table for the dataset access with the end time (and the metadata values?)
Search
We will use Solr or Lucene as an external search and indexing engine. An overview of the investigation of various available options can be found at External Search and Indexing Engine Investigation.
CDAP Search System Service
To support Solr in fault-tolerant mode, a new CDAP system service will act as an adapter for the search and indexing engine in YARN.
The primary role of the proposed search system service is to manage the external search and indexing engine and to act as a proxy between CDAP master and that engine.
Indexing and Searching Metadata Records Using IndexedTable
In the 3.2 release, we will support search on metadata records using the key*=value* and key* formats rather than free-text search.
To do this, we will use an IndexedTable instance to store the metadata records, which gives us a second table that acts as an inverted index of the metadata records.
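The prefix search described above can be illustrated with a small in-memory sketch. It is not the IndexedTable implementation: a sorted `TreeMap` stands in for the index table, and the class name, method names and the `key:value` term format are assumptions made only for this example.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

/** In-memory stand-in for an inverted index over metadata records; not a CDAP API. */
final class MetadataPrefixIndex {
  // Indexed term "key:value" -> ids of the targets annotated with that pair.
  private final TreeMap<String, Set<String>> index = new TreeMap<>();

  /** Records that the target carries the given key-value pair. */
  void annotate(String targetId, String key, String value) {
    index.computeIfAbsent(key + ":" + value, k -> new TreeSet<>()).add(targetId);
  }

  /** Returns every target that has an indexed term starting with the prefix. */
  Set<String> searchByPrefix(String prefix) {
    Set<String> result = new TreeSet<>();
    // In a sorted map, terms starting with the prefix form one contiguous range.
    SortedMap<String, Set<String>> range =
        index.subMap(prefix, prefix + Character.MAX_VALUE);
    for (Set<String> targets : range.values()) {
      result.addAll(targets);
    }
    return result;
  }
}
```

With this layout, a key-only search such as `own` and a key plus value-prefix search such as `owner:al` are both a single range read over the sorted index.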
Store
The metadata system will be composed of the following three tables:
Business Metadata Table
This table contains the most recent metadata associated with a dataset or an application. It contains no historical data. Its purpose is to optimally serve the annotate and retrieve APIs for business metadata.
RowKey:
<target-type> | <target-id> | <metadata-type> | <key> |
---|---|---|---|
where:
- <target-type>: APP/PROGRAM/DATASET/STREAM
- <target-id>: app-id(ns+app) / program-id(ns+app+pgtype+pgm) / dataset-id(ns+dataset) / stream-id(ns+stream)
- <metadata-type>: "p" for a property (key-value) record, "t" for a tag record
- <key>: the key
Col:
Key:Value | Value |
---|---|
We store both Key:Value and Value in the column to make indexing easier using an IndexedTable. We will be able to support prefix searches with this approach as well.
When an app is deleted, its business metadata in this table is also deleted.
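As a rough illustration of why this row-key layout serves retrieval well, the sketch below builds row keys as strings and reads all properties of one target with a single prefix scan over a sorted map. The separator character, the class name and the use of a `TreeMap` in place of the real table are assumptions for the example only.

```java
import java.util.Map;
import java.util.TreeMap;

/** Sketch of the business metadata row-key layout; names and separator are hypothetical. */
final class BusinessMetadataRows {
  private static final char SEP = '\0'; // assumed separator between row-key parts

  /** <target-type> | <target-id> | <metadata-type> | <key> */
  static String rowKey(String targetType, String targetId, String metadataType, String key) {
    return targetType + SEP + targetId + SEP + metadataType + SEP + key;
  }

  /** Reads all "p" (property) rows of one target from a sorted table via a prefix scan. */
  static Map<String, String> propertiesOf(TreeMap<String, String> table,
                                          String targetType, String targetId) {
    String prefix = targetType + SEP + targetId + SEP + "p" + SEP;
    Map<String, String> props = new TreeMap<>();
    for (Map.Entry<String, String> row
        : table.subMap(prefix, prefix + Character.MAX_VALUE).entrySet()) {
      // The part of the row key after the prefix is the metadata key.
      props.put(row.getKey().substring(prefix.length()), row.getValue());
    }
    return props;
  }
}
```

Because the target type and id lead the row key, deleting all business metadata for a deleted app would be the same kind of prefix scan followed by deletes.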
Lineage Table
This table contains historical information of program runs and the datasets that they accessed for read or write. It serves like an audit-trail. The primary goal of this table is to be able to compute the lineage for a dataset.
RowKey:
This table can have six kinds of row-keys
Dataset access from a CDAP Program
d | <dataset-id> | <inverted-start-time> | p | <run-id> | <access-type> |
---|---|---|---|---|---|
p | <run-id> | <inverted-start-time> | d | <dataset-id> | <access-type> |
where:
- d: Identifies a dataset access
- <dataset-id>: The dataset id containing ns+dataset
- <inverted-start-time>: The start time of the run; inverted because HBase 0.96 does not support reverse scan
- p: Identifies that this access was made from a CDAP Program
- <run-id>: The run id containing ns+app+program+runId
- <access-type>: r/w/rw
Stream access from a CDAP Program
s | <stream-id> | <inverted-start-time> | p | <run-id> | <access-type> |
---|---|---|---|---|---|
p | <run-id> | <inverted-start-time> | s | <stream-id> | <access-type> |
where:
- s: Identifies a stream access
- <stream-id>: The stream id containing ns+stream
- <inverted-start-time>: The start time of the run; inverted because HBase 0.96 does not support reverse scan
- p: Identifies that this access was made from a CDAP Program
- <run-id>: The run id containing ns+app+program+runId
- <access-type>: r/w/rw
Stream access from an external entity
s | <stream-id> | <inverted-start-time> | e | <external-entity-id> | <access-type> |
---|---|---|---|---|---|
e | <external-entity-id> | <inverted-start-time> | s | <stream-id> | <access-type> |
where:
- s: Identifies a stream access
- <stream-id>: The stream id containing ns+stream
- <inverted-start-time>: The start time of the run; inverted because HBase 0.96 does not support reverse scan
- e: Identifies that this access was made from an external entity
- <external-entity-id>: The id of the external entity, which is a combination of the source query param specified in the modified Stream Write REST APIs below and an access timestamp (e.g. day timestamp). It is okay to have a timestamp in the external entity id, since we will not be computing lineage for external entities.
- <access-type>: r/w/rw
Columns:
<stop-time> | <metadata> |
---|---|
- <stop-time>: Stop time of the program in ms, -1 if the program is still running
- <metadata>: JSON containing business and system metadata, even though storing system metadata could mean a duplication. This will be null for dataset and stream rows. This will not contain run information like runtime arguments, workflow token etc. These will be looked up from the run record table when required.
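The effect of inverting the start time in the row key can be shown with a short sketch: subtracting the timestamp from `Long.MAX_VALUE` makes the most recent run sort first in a forward scan. The `|` separator, the zero-padded decimal encoding and the class name are assumptions for illustration; the access type is left out of the key here.

```java
/** Sketch of a dataset-access row key with an inverted start time; encoding is hypothetical. */
final class LineageRowKeys {
  /** Later timestamps map to smaller values, so they sort first in a forward scan. */
  static long invert(long startTimeMillis) {
    return Long.MAX_VALUE - startTimeMillis;
  }

  /** d | <dataset-id> | <inverted-start-time> | p | <run-id> */
  static String datasetAccessKey(String datasetId, long startTimeMillis, String runId) {
    // Zero-pad to 19 digits (the width of Long.MAX_VALUE) so that
    // lexicographic order of the string matches numeric order.
    return String.format("d|%s|%019d|p|%s", datasetId, invert(startTimeMillis), runId);
  }
}
```

A real implementation would more likely encode the inverted time as fixed-width bytes; the padded string is used here only to keep the ordering visible.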
Lineage Computation
Lineage can be computed for streams and datasets using the lineage table. Lineage computation is a breadth first search on the lineage table.
Given a dataset dataset1 and a time range time1 and time2, the following scan and filter will compute one iteration of the breadth first search.
Scan
start row: d-dataset1-inverted-time2
stop row: d-dataset1-inverted-min(running-program-start-times)
Filter
stop time == -1 OR stop time >= time1
This scan will satisfy the following condition - (time1 <= stop-time < time2) OR (start-time < time2 AND (stop-time == -1 OR stop-time >= time2))
min(running program start times) <= start time < time2 AND (stop time == -1 OR stop time >= time1)
inverted min(running program start times) > inverted start time >= inverted time2 AND (stop time == -1 OR stop time >= time1)
The above scan will give us all programs that accessed dataset dataset1. We can continue the scans with the programs to go further down the search. Note that each node in the graph will need a separate scan to move forward in the search.
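The breadth-first search can be sketched over an in-memory list of access records. This is only an illustration under stated assumptions: the `Access` class, the node naming and the linear pass over the list (which stands in for one table scan per node) are hypothetical, and the traversal is undirected because lineage is not directional in 3.2.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** In-memory sketch of lineage computation; not the metadata service implementation. */
final class LineageSketch {
  /** One lineage-table row: a program run that accessed a dataset. stopTime is -1 while running. */
  static final class Access {
    final String dataset;
    final String program;
    final long startTime;
    final long stopTime;

    Access(String dataset, String program, long startTime, long stopTime) {
      this.dataset = dataset;
      this.program = program;
      this.startTime = startTime;
      this.stopTime = stopTime;
    }
  }

  /** Scan range plus filter from the text: started before time2, still running or stopped at/after time1. */
  static boolean overlaps(Access a, long time1, long time2) {
    return a.startTime < time2 && (a.stopTime == -1 || a.stopTime >= time1);
  }

  /** Returns all datasets and programs reachable from the dataset, in breadth-first order. */
  static Set<String> lineage(List<Access> table, String dataset, long time1, long time2) {
    Set<String> visited = new LinkedHashSet<>();
    Deque<String> queue = new ArrayDeque<>();
    visited.add(dataset);
    queue.add(dataset);
    while (!queue.isEmpty()) {
      String node = queue.remove();
      for (Access a : table) { // stands in for one scan per node
        if (!overlaps(a, time1, time2)) {
          continue;
        }
        String neighbor = null;
        if (a.dataset.equals(node)) {
          neighbor = a.program;
        } else if (a.program.equals(node)) {
          neighbor = a.dataset;
        }
        if (neighbor != null && visited.add(neighbor)) {
          queue.add(neighbor);
        }
      }
    }
    return visited;
  }
}
```

A `maxLevels` bound, as in the lineage REST endpoints, could be added by tracking the depth of each queued node.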
When is metadata stored in this table?
- Every time a dataset access happens - It will make querying expensive
- First access - May be less performant, since it will include a read + write
- Latest access - Easy to implement, and query, but may show inconsistent metadata for the same program run over time, if the metadata is updated using a REST API call during the run.
Should we store some notion of app spec/app version at the time of the run, since there is no way to get that information, given a run-id today?
When an app is deleted, its historical data is not deleted.
In the lineage table, for 3.2, the access type will always be null
REST APIs
The following REST APIs will be exposed from the metadata service.
Purpose | API | Body | Response | Routable | Comments | Approved? |
---|---|---|---|---|---|---|
Annotate business metadata for datasets | POST /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties | { "key1" : "value1", "key2" : "value2", //... } | 200: Successful 404: Dataset not found in specified namespace | Yes | | |
Annotate business metadata for apps | POST /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties | { "key1" : "value1", "key2" : "value2", //... } | 200: Successful 404: App not found in specified namespace | Yes | | |
Annotate business metadata for programs | POST /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties | { "key1" : "value1", "key2" : "value2", //... } | 200: Successful 404: Program not found in specified namespace | Yes | | |
Annotate business metadata for streams | POST /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties | { "key1" : "value1", "key2" : "value2", //... } | 200: Successful 404: Stream not found in specified namespace | Yes | | |
Note: will be part of client call. | POST /v3/metadata/history | { "programRun": { "namespace" : "ns1", "app": "app1", "programType": "MAPREDUCE", "name": "pgm1", "runId": "r1" }, "data": { "namespace": "ns1", "type": [STREAM or DATASET], "name": "d1" }, "accessType": "in", "metadata": { "user": { "key1": "value1", ... }, "system": { "k1": "v1", ... } } } | 200: Successful 404: App/dataset not found in specified namespace (proper error message) | No | | |
Retrieve business metadata for datasets | GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: Dataset not found in specified namespace | Yes | | |
Retrieve business metadata for apps | GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: App not found in specified namespace | Yes | | |
Retrieve business metadata for programs | GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: Program not found in specified namespace | Yes | | |
Retrieve business metadata for streams | GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: Stream not found in specified namespace | Yes | | |
Retrieve system metadata for datasets | GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties/system | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: Dataset not found in specified namespace | Yes | | |
Retrieve system metadata for apps | GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties/system | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: App not found in specified namespace | Yes | | |
Retrieve system metadata for programs | GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties/system | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: Program not found in specified namespace | Yes | | |
Retrieve system metadata for streams | GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties/system | N/A | 200: Successful { "key1" : "value1", "key2" : "value2", //... } 404: Stream not found in specified namespace | Yes | | |
Delete all business metadata for datasets | DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties | N/A | 200: Successful 404: Dataset not found in specified namespace | Yes | | |
Delete selected key from business metadata for datasets | DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/properties/{key} | N/A | 200: Successful 404: Dataset not found in specified namespace | Yes | | |
Delete all business metadata for apps | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties | N/A | 200: Successful 404: App not found in specified namespace | Yes | | |
Delete selected key from business metadata for apps | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/properties/{key} | N/A | 200: Successful 404: App not found in specified namespace | Yes | | |
Delete all business metadata for programs | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties | N/A | 200: Successful 404: Program not found in specified namespace | Yes | | |
Delete selected key from business metadata for programs | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/properties/{key} | N/A | 200: Successful 404: Program not found in specified namespace | Yes | | |
Delete all business metadata for streams | DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties | N/A | 200: Successful 404: Stream not found in specified namespace | Yes | | |
Delete selected key from business metadata for streams | DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/properties/{key} | N/A | 200: Successful 404: Stream not found in specified namespace | Yes | | |
Search Datasets containing business metadata | GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=dataset | N/A | 200: Successful ["dataset1", "dataset2"] | Yes | | |
Search Apps containing business metadata | GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=app | N/A | 200: Successful ["app1", "app2"] | Yes | | |
Search Programs containing business metadata | GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=program | N/A | 200: Successful ["app1", "app2"] | Yes | | |
Search Streams containing business metadata | GET /v3/namespaces/{namespace-id}/metadata/search?query=term&target=stream | N/A | 200: Successful ["stream1", "stream2"] | Yes | | |
View Dataset Lineage | GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels> | N/A | 200: Successful Response TBD, but will contain a DAG representation | Yes | | |
View Stream Lineage | GET /v3/namespaces/{namespace-id}/streams/{stream-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels> | N/A | 200: Successful Response TBD, but will contain a DAG representation | Yes | | |
View Run Id Accesses | POST /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/runs/{run-id}/accesses | N/A | 200: Successful Response Body TBD | Yes | | |
View Dataset Lineage after specified stage | POST /v3/namespaces/{namespace-id}/datasets/{dataset-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels> | TODO Note: Query params will become part of POST body. | | Yes | | |
View Stream Lineage after specified stage | POST /v3/namespaces/{namespace-id}/streams/{stream-id}/lineage?start=<start-ts>&end=<end-ts>&maxLevels=<max-levels> | TODO Note: Query params will become part of POST body. | | Yes | | |
Add tags to a dataset | POST /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags | ["tag1", "tag2"] | 200: Successful 404: Dataset not found in specified namespace | Yes | | |
Add tags to an app | POST /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags | ["tag1", "tag2"] | 200: Successful 404: App not found in specified namespace | Yes | | |
Add tags to a program | POST /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags | ["tag1", "tag2"] | 200: Successful 404: Program not found in specified namespace | Yes | | |
Add tags to a stream | POST /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags | ["tag1", "tag2"] | 200: Successful 404: Stream not found in specified namespace | Yes | | |
Retrieve dataset tags | GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags | N/A | ["tag1", "tag2"] | Yes | | |
Retrieve app tags | GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags | N/A | ["tag1", "tag2"] | Yes | | |
Retrieve program tags | GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags | N/A | ["tag1", "tag2"] | Yes | | |
Retrieve stream tags | GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags | N/A | ["tag1", "tag2"] | Yes | | |
Remove all dataset tags | DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags | N/A | 200: Successful 404: Dataset not found in specified namespace | Yes | | |
Remove specified dataset tag | DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata/tags/{tag} | N/A | 200: Successful 404: Dataset not found in specified namespace | Yes | | |
Remove all app tags | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags | N/A | 200: Successful 404: App not found in specified namespace | Yes | | |
Remove specified app tag | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata/tags/{tag} | N/A | 200: Successful 404: App not found in specified namespace | Yes | | |
Remove all program tags | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags | N/A | 200: Successful 404: Program not found in specified namespace | Yes | | |
Remove specified program tag | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata/tags/{tag} | N/A | 200: Successful 404: Program not found in specified namespace | Yes | | |
Remove all stream tags | DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags | N/A | 200: Successful 404: Stream not found in specified namespace | Yes | | |
Remove specified stream tag | DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata/tags/{tag} | N/A | 200: Successful 404: Stream not found in specified namespace | Yes | | |
Remove all business metadata for a dataset | DELETE /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata | N/A | 200: Successful 404: Dataset not found in specified namespace | Yes | Removes all properties and tags from a dataset. Will not happen in 3.2 | |
Remove all business metadata for an app | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/metadata | N/A | 200: Successful 404: App not found in specified namespace | Yes | Removes all properties and tags from an app. Will not happen in 3.2 | |
Remove all business metadata for a program | DELETE /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata | N/A | 200: Successful 404: Program not found in specified namespace | Yes | Removes all properties and tags from a program. Will not happen in 3.2 | |
Remove all business metadata for a stream | DELETE /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata | N/A | 200: Successful 404: Stream not found in specified namespace | Yes | Removes all properties and tags from a stream. Will not happen in 3.2 | |
Get all business metadata for a dataset | GET /v3/namespaces/{namespace-id}/datasets/{dataset-id}/metadata | N/A | 200: Successful 404: Dataset not found in specified namespace | Yes | Retrieves all properties and tags for a dataset. Will not happen in 3.2 | |
Get all business metadata for an app | GET /v3/namespaces/{namespace-id}/apps/{app-id}/metadata | N/A | 200: Successful 404: App not found in specified namespace | Yes | Retrieves all properties and tags for an app. Will not happen in 3.2 | |
Get all business metadata for a program | GET /v3/namespaces/{namespace-id}/apps/{app-id}/{program-type}/{program-id}/metadata | N/A | 200: Successful 404: Program not found in specified namespace | Yes | Retrieves all properties and tags for a program. Will not happen in 3.2 | |
Get all business metadata for a stream | GET /v3/namespaces/{namespace-id}/streams/{stream-id}/metadata | N/A | 200: Successful 404: Stream not found in specified namespace | Yes | Retrieves all properties and tags for a stream. Will not happen in 3.2 | |
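As a usage illustration for the endpoints in the table, the following sketch builds (but does not send) two requests with the JDK HTTP client types available from Java 11. The class and method names are hypothetical, and the base URL passed in by the caller (for example `http://localhost:10000`) is an assumption about the deployment, not something this design specifies.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds requests for the metadata REST endpoints. */
final class MetadataRequests {
  /** POST .../datasets/{dataset-id}/metadata/properties with a JSON object of key-value pairs. */
  static HttpRequest addDatasetProperties(String baseUrl, String namespace,
                                          String dataset, String jsonBody) {
    URI uri = URI.create(String.format(
        "%s/v3/namespaces/%s/datasets/%s/metadata/properties", baseUrl, namespace, dataset));
    return HttpRequest.newBuilder(uri)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8))
        .build();
  }

  /** GET .../metadata/search?query=term&target=dataset */
  static HttpRequest searchDatasets(String baseUrl, String namespace, String term) {
    // URL-encode the search term so that it is safe inside a query string.
    String query = URLEncoder.encode(term, StandardCharsets.UTF_8);
    URI uri = URI.create(String.format(
        "%s/v3/namespaces/%s/metadata/search?query=%s&target=dataset",
        baseUrl, namespace, query));
    return HttpRequest.newBuilder(uri).GET().build();
  }
}
```

The resulting requests could be sent with `java.net.http.HttpClient`; a 404 response would indicate that the dataset or namespace was not found, as listed in the table.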
Lineage Output JSON
{
  "start": "1441310434000",
  "end": "1441320599000",
  "relations": [
    { "data": "stream.default.purchaseStream", "program": "flow.default.PurchaseHistory.PurchaseFlow", "access": "read", "runs": ["283-afsd032-adsf90", "283-0rwedfk-09wrff"], "component": ["reader"] },
    { "data": "flow.default.PurchaseHistory.PurchaseFlow", "program": "dataset.default.purchases", "access": "write", "runs": ["283-afsd032-adsf90", "283-0rwedfk-09wrff"], "component": ["collector"] },
    { "data": "flow.default.PurchaseHistory.PurchaseFlow", "program": "dataset.default.frequentCustomers", "access": "write", "runs": ["283-0rwedfk-09wrff"], "component": ["collector"] },
    { "data": "flow.default.PurchaseHistory.PurchaseFlow", "program": "dataset.default.purchases", "access": "read", "runs": ["283-0rwedfk-09wrff"], "component": ["collector"] },
    { "data": "dataset.default.purchases", "program": "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder", "access": "read", "runs": ["283-afsd032-adsf90"] },
    { "data": "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder", "program": "dataset.default.history", "access": "write", "runs": ["283-99sd032-adsf90", "283-88wedfk-09wrff"] },
    { "data": "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder", "program": "dataset.default.frequentCustomers", "access": "write", "runs": ["283-99sd032-adsf90", "283-88wedfk-09wrff"] },
    { "data": "dataset.default.history", "program": "service.default.PurchaseHistory.PurchaseHistoryService", "runs": ["283-zsed032-adsf90"] }
  ],
  "programs": {
    "flow.default.PurchaseHistory.PurchaseFlow": { "id": { "namespace": "default", "application": "PurchaseHistory", "type": "flow", "id": "PurchaseFlow" } },
    "mapreduce.default.PurchaseHistory.PurchaseHistoryBuilder": { "id": { "namespace": "default", "application": "PurchaseHistory", "type": "mapreduce", "id": "PurchaseHistoryBuilder" } },
    "service.default.PurchaseHistory.PurchaseHistoryService": { "id": { "namespace": "default", "application": "PurchaseHistory", "type": "service", "id": "PurchaseHistoryService" } }
  },
  "data": {
    "dataset.default.frequentCustomers": { "id": { "namespace": "default", "type": "dataset", "id": "frequentCustomers" } },
    "dataset.default.history": { "id": { "namespace": "default", "type": "dataset", "id": "history" } },
    "dataset.default.purchases": { "id": { "namespace": "default", "type": "dataset", "id": "purchases" } },
    "stream.default.purchaseStream": { "id": { "namespace": "default", "type": "stream", "id": "purchaseStream" } }
  }
}
Java APIs
Applications
TBD - Out of scope for 3.2
Datasets
The following dataset APIs will be updated to help CDAP compute lineage.
We will add the ability to programmatically annotate dataset/stream accesses as read/write. Details TBD. This is out of scope for 3.2.
For 3.2, users will not be able to tell if a dataset/stream access is a read, write or read/write. They will only be able to see connections.
UseDataset
The @UseDataset annotation will be extended to accept an accessType. accessType can be one of IN, OUT or UNKNOWN to indicate whether the dataset is being used as input or output. If a program both reads from and writes to a dataset, it will have both IN and OUT accessType.
getDataset()
The getDataset API will be extended to accept an accessType. accessType can be one of IN, OUT or UNKNOWN to indicate whether the dataset is being used as input or output. If a program both reads from and writes to a dataset, it will have both IN and OUT accessType.
Both these API updates will be backward-compatible. For usages that don't specify accessType, we will presume it to be UNKNOWN.
Stream
No Java API changes needed. Whenever a StreamWriter gets a call to write to a stream, we will capture and register the call as an OUT call. Every other Stream interaction becomes an IN event.
Existing REST API changes
Stream
Optionally allow users to specify a query param for stream write (both batch and single enqueue endpoints) REST APIs to identify the source for the stream they are writing to.
Even though we accept a source for every stream event enqueue, for the same stream-id and source, we will only write to the Lineage table once per day.
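The once-per-day rule can be sketched as a check keyed by stream, source and day. This is a minimal illustration: the class name is hypothetical, the in-memory set stands in for the Lineage table, and the day is computed in UTC as an assumption.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/** Sketch of limiting external stream-write lineage records to one per stream, source and day. */
final class StreamSourceLineageRecorder {
  // Keys already recorded; in the design this role is played by the Lineage table.
  private final Set<String> recorded = new HashSet<>();

  /** Returns true if a lineage row should be written for this enqueue, false if one exists for the day. */
  boolean shouldRecord(String streamId, String source, long writeTimeMillis) {
    long day = TimeUnit.MILLISECONDS.toDays(writeTimeMillis); // days since epoch, UTC
    return recorded.add(streamId + "|" + source + "|" + day);
  }
}
```

This matches the external entity id described for the Lineage table, where the source is combined with a day timestamp, so repeated writes on the same day map to the same row.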
Notifications
Whenever metadata on a CDAP entity (Dataset/Stream/Application/Program) is updated, the metadata system will publish a notification using the Notification System. External systems that want to subscribe to these notifications will have to:
- Write a worker that subscribes to these notifications
- The worker can take any action that it wants (write to a dataset, send the notification to an external API, write to an external filesystem, start a program, etc.) when a notification is received
Implementation of the worker is out of scope of this document.
Whenever metadata on a CDAP entity (Dataset/Stream/Application/Program) is added, updated or deleted, the metadata system will publish a notification to a Kafka topic. The Kafka topic name will be configurable. External systems that would like to receive these notifications will have to subscribe to this Kafka topic. The content of each message will be a JSON representation of the following class:
/*
 * Copyright © 2015 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package co.cask.cdap.proto;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Represents a Metadata change for a given {@link Id.NamespacedId}, including its previous state, the new state after
 * the change occurred, the change itself, and the time that the change occurred.
 */
public final class MetadataChangeRecord {
  private final MetadataRecord previous;
  private final MetadataRecord updated;
  private final MetadataDiffRecord changes;
  private final Id.NamespacedId targetId;
  private final long updateTime;
  private final String updater;

  public MetadataChangeRecord(MetadataRecord previous, MetadataRecord updated, MetadataDiffRecord changes,
                              Id.NamespacedId targetId, long updateTime) {
    this(previous, updated, changes, targetId, updateTime, null);
  }

  public MetadataChangeRecord(MetadataRecord previous, MetadataRecord updated, MetadataDiffRecord changes,
                              Id.NamespacedId targetId, long updateTime, @Nullable String updater) {
    this.previous = previous;
    this.updated = updated;
    this.changes = changes;
    this.targetId = targetId;
    this.updateTime = updateTime;
    this.updater = updater;
  }

  public MetadataRecord getPrevious() {
    return previous;
  }

  public MetadataRecord getUpdated() {
    return updated;
  }

  public MetadataDiffRecord getChanges() {
    return changes;
  }

  public Id.NamespacedId getTargetId() {
    return targetId;
  }

  public long getUpdateTime() {
    return updateTime;
  }

  @Nullable
  public String getUpdater() {
    return updater;
  }

  /**
   * Represents the metadata state for a given {@link Id.NamespacedId} at a point in time
   */
  public static final class MetadataRecord {
    private final Map<String, String> properties;
    private final List<String> tags;

    public MetadataRecord(Map<String, String> properties, List<String> tags) {
      this.properties = properties;
      this.tags = tags;
    }

    public Map<String, String> getProperties() {
      return properties;
    }

    public List<String> getTags() {
      return tags;
    }
  }

  /**
   * Represents the changes between the previous and the new record
   */
  public static final class MetadataDiffRecord {
    private final MetadataRecord additions;
    private final MetadataRecord deletions;

    public MetadataDiffRecord(MetadataRecord additions, MetadataRecord deletions) {
      this.additions = additions;
      this.deletions = deletions;
    }

    public MetadataRecord getAdditions() {
      return additions;
    }

    public MetadataRecord getDeletions() {
      return deletions;
    }
  }
}
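One possible way to derive the additions and deletions carried by a change record is sketched below with plain maps and sets. The class and method names are hypothetical, and the treatment of a changed value (new value listed as an addition, old value as a deletion) is an assumption, since the design does not define it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch of computing the difference between two metadata states; not the CDAP implementation. */
final class MetadataDiffSketch {
  /** Entries of 'to' that are missing from 'from' or have a different value there. */
  static Map<String, String> propertyDifference(Map<String, String> to, Map<String, String> from) {
    Map<String, String> diff = new HashMap<>();
    for (Map.Entry<String, String> entry : to.entrySet()) {
      if (!entry.getValue().equals(from.get(entry.getKey()))) {
        diff.put(entry.getKey(), entry.getValue());
      }
    }
    return diff;
  }

  /** Tags present in 'to' but not in 'from'. */
  static Set<String> tagDifference(Set<String> to, Set<String> from) {
    Set<String> diff = new HashSet<>(to);
    diff.removeAll(from);
    return diff;
  }
}
```

Under these assumptions, additions would be `propertyDifference(updated, previous)` and `tagDifference(updated, previous)`, and deletions the same calls with the arguments swapped.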
Caveats/Assumptions
- Metadata can only be added to datasets created/updated with the first version containing this feature (3.2). There will not be a tool to allow tagging for historical data.
- We will only support Strings as the datatype for metadata values. Other types will not be supported, at least to begin with.
- The Usage Dataset has some overlap with the Metadata System. However, at least to begin with, there would be no integration with Usage Dataset. Usage Dataset can be thought of as a purely static representation of dataset usage with no notion of runtime.
CLI
TBD
Questions/TODOs
- Find a better term for metadata.
- What term is better? USER or BUSINESS?
- Should we build metadata as a list of tags or a list of key-value pairs?
- It will be stored as key-value pairs in the backend. UI can show it delimiter separated if that is how the mocks are.
- How do we store tags so that they are searchable?