Overview
Motivation
The goal of this application is to provide users with an extensible CDAP application to help them determine the quality of their data. Users will have the ability to assess the quality of their data using out-of-the-box functionality and libraries. Furthermore, users will be able to extend the application with their own aggregation functions and customizations. Finally, we'd like to provide users with a way to query the results of the quality metric computations via RESTful APIs.
Example Use Cases
- Suppose the user wants to take in a stream of Apache Access log data (CLF format) to generate several histograms of status codes and referrers (partitioned by time) and then wants to be able to query ranges of timestamps for aggregated histogram data.
- Another example: a user wants to figure out which IP addresses were trying to access a given service and has the related Apache Access Log data stored in a table partitioned by time. The application could be configured to read the IP addresses from the table and compute a unique-values aggregation on each set of IP addresses for each time interval. The user could then query the aggregated data for the relevant timestamps.
Requirements
- Read in multiple data formats from a generic source
  - Example: read in from a stream of CLF data
- Assess data quality in batch, computing results at periodic intervals (e.g., each hour)
  - A MapReduce program will take in log data as input, apply an aggregation function, and store the results of the aggregation in a table.
  - The table should be partitioned by time.
- Allow the user to query aggregated data over various time intervals.
- Library of aggregation functions
- Allow the aggregation function logic to be pluggable (allow users to plug in their own aggregation functions)
- Allow the source type and input data format to be configurable
- UI
- Determine when there are anomalies/suspicious patterns (using the aggregated data).
  - Explore whether it's possible to attach a level of confidence to these findings
  - Alert the user via email when such things occur
  - Examples:
    - Suppose 90% of the status codes are consistently 200s and all of a sudden, in the adjacent time interval, 70% of the status codes coming in are a mix of 404s and 500s. We would want to determine that this is a situation worth alerting the user about via email.
    - Suppose the frequencies of referrers over a standard-length time interval are fairly evenly distributed up until a certain interval, at which point the frequencies are largely skewed towards one referrer. We would want to alert the user that this referrer is likely a bot (possibly even with a confidence level).
Design
Implicit Table Schema
- The reducer writes the results of the data aggregations to a Table using the following implicit schema (sketched in code below):
  - Row key: v.SourceID.FieldName.Timestamp
  - Column key: aggregation function name
  - Value: aggregate for a field over some time window
- In addition, the reducer writes the list of aggregation types available for each source and time interval combination using the following implicit schema:
  - Row key: a.SourceID.Timestamp
  - Column key: FieldName
  - Value: a list of objects, each corresponding to an aggregation type
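A minimal sketch of how the reducer could construct these row keys; the helper class and method names are assumptions for illustration, not an existing API:

import java.lang.String;

// Hypothetical helper showing the implicit row-key layout described above.
public final class RowKeys {
  private RowKeys() { }

  // Values row, e.g. "v.stream1.status.1423371600"
  public static String valueRow(String sourceId, String fieldName, long timestamp) {
    return String.format("v.%s.%s.%d", sourceId, fieldName, timestamp);
  }

  // Aggregation-type index row, e.g. "a.stream1.1423371600"
  public static String aggregationTypeRow(String sourceId, long timestamp) {
    return String.format("a.%s.%d", sourceId, timestamp);
  }
}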
Services
We need two services: one for aggregated data that can be combined in a meaningful way across multiple timestamps, and one for data whose aggregations cannot be meaningfully combined (as with multiple standard deviations corresponding to multiple timestamps).
Service for combinable aggregations (example: frequency/histogram)
Handler for retrieving aggregations
- Returns a JSON of aggregated values
- Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations/{aggregationName}?startTimestamp={start_ts}&endTimestamp={end_ts}
- Path param: source ID
- Path param: field name
- Path param: aggregation type
- Query param: start timestamp
- Query param: end timestamp
- The service will return a JSON containing the aggregated values that were queried. Here is an example of what the service might return if the user wants to see a discrete values histogram over a given time range:
{
   "data": {
      "200": 504,
      "400": 12,
      "404": 23,
      "500": 2
   }
}
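For example, the histogram above could be fetched for a hypothetical source logs1 with a request like:

/v1/source/logs1/fields/status/aggregations/discreteValuesHistogram?startTimestamp=1423371600&endTimestamp=1423372900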
Handler for retrieving the fields that have been aggregated over for a given sourceID
- Returns a JSON of field names
- Request format: /v1/source/{sourceID}/fields?startTimestamp={start_ts}&endTimestamp={end_ts}
- Path param: source ID
- Query param: start timestamp
- Query param: end timestamp
- The service will return a JSON containing the available field names for the given parameters (sourceID, time interval). Here is an example of what the service might return if the user wants to see the fields that have been aggregated over, along with their available aggregation functions, for stream1 between timestamps 1423371600 and 1423372900:
{
   "content_length": [
      {
         "name": "median",
         "combinable": false
      },
      {
         "name": "max",
         "combinable": true
      }
   ]
}
Handler for retrieving the aggregation types for a given sourceID, fieldName, and time interval
- Returns a JSON of aggregation types
- Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations?startTimestamp={start_ts}&endTimestamp={end_ts}
- Path param: source ID
- Path param: field name
- Query param: start timestamp
- Query param: end timestamp
- The service will return a JSON containing the available aggregation types for the given parameters (sourceID, fieldName, time interval). Here is an example of what the service might return if the user wants to see the available aggregation functions for stream1, content_length, between timestamps 1423371600 and 1423372900:
[
   {
      "name": "median",
      "combinable": false
   },
   {
      "name": "max",
      "combinable": true
   }
]
Service for non-combinable aggregations (example: standard deviation)
Handler for retrieving aggregations
- Returns a timeseries
- Request format: /v1/source/{sourceID}/fields/{fieldName}/timeseries/{aggregationType}?startTimestamp={start_ts}&endTimestamp={end_ts}
- Path param: source ID
- Path param: field name
- Path param: aggregation type
- Query param: start timestamp
- Query param: end timestamp
- This service will return a JSON containing the aggregated values that were queried. For instance, if a user queries the average of content length over a given time range, the service will likely return something of the following format:
"timeValues": [
{
"timestamp": 1423371600,
"data": 122240
},
{
"timestamp": 1423375200,
"data": 122240
},
{
"timestamp": 1423378800,
"data": 121732
},
{
"timestamp": 1423382400,
"data": 122240
},
{
"timestamp": 1423386000,
"data": 121732
},
{
"timestamp": 1423389600,
"data": 122240
},
{
"timestamp": 1423393200,
"data": 121732
},
{
"timestamp": 1423396800,
"data": 47327
}
   ]
}
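For example, the timeseries above could be requested for a hypothetical source logs1 with:

/v1/source/logs1/fields/content_length/timeseries/average?startTimestamp=1423371600&endTimestamp=1423396800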
Handler for retrieving the field names that have been aggregated over for a given sourceID
- This will be identical to the second handler for combinable aggregations mentioned above.
Handler for retrieving the aggregation types for a given sourceID, fieldName, and time interval.
- This will be identical to the third handler for combinable aggregations mentioned above.
Library of Aggregation Functions
We want to provide a set of basic aggregation functions that users can use right out of the box and can also use as models for the structure of their own custom aggregation functions. The following are some of the functions we'd like to include:
- For discrete values:
  - For numerical values:
    - Generate histogram (sum of frequencies for each field)
    - Average
    - Standard deviation
    - Median
  - For categorical values:
    - Generate histogram (sum of frequencies for each field)
    - Unique values
- For continuous values:
  - Average
  - Standard deviation
  - Median
  - Determine buckets, and generate histogram
Structure of Aggregation Functions
This is important because it dictates the structure of users’ custom aggregation functions.
Interfaces
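The interface listings are not reproduced here, so below is a minimal sketch of what they could look like, inferred from the method signatures referenced in the next section; the interface names BasicAggregationFunction and CombinableAggregationFunction are assumptions:

// Sketch only: interface names are assumptions inferred from the
// signatures used elsewhere in this document.

// Every aggregation function consumes records one at a time and then
// emits a serialized aggregate for the current time window.
public interface BasicAggregationFunction {
  void add(DataQualityWritable value);  // consume one field value
  byte[] aggregate();                   // serialize this window's aggregate
}

// Implemented additionally by aggregations whose per-window results can
// be merged across windows (e.g. frequency counts, max); non-combinable
// aggregations such as standard deviation omit it.
public interface CombinableAggregationFunction<T> {
  void combine(byte[] values);          // fold in one serialized window aggregate
  T retrieveAggregation();              // the combined result across windows
}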
Application API Usages
Usage of void add(DataQualityWritable value) and byte[] aggregate()
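A minimal sketch of how the MapReduce reducer could drive these two methods for one (source, field, time window) group; the surrounding class is hypothetical:

// Hypothetical reducer-side helper: feed every value in the window to
// add(), then obtain the serialized aggregate to write to the Table.
public class WindowAggregator {
  public byte[] aggregateWindow(Iterable<DataQualityWritable> values,
                                BasicAggregationFunction function) {
    for (DataQualityWritable value : values) {
      function.add(value);        // accumulate one record's field value
    }
    return function.aggregate();  // bytes stored under v.SourceID.FieldName.Timestamp
  }
}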
Usage of T retrieveAggregation() and void combine(byte[] values)
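And a sketch of the service side, which folds every stored per-window aggregate in the queried range into one combined result; again the class name is hypothetical:

import java.util.List;

// Hypothetical service-side helper: merge the serialized aggregates read
// from the Table for each window in [start_ts, end_ts], then return the
// combined value that the handler serializes to JSON.
public class RangeCombiner {
  public <T> T combineRange(List<byte[]> storedAggregates,
                            CombinableAggregationFunction<T> function) {
    for (byte[] aggregate : storedAggregates) {
      function.combine(aggregate);        // fold in one window's aggregate
    }
    return function.retrieveAggregation();  // combined result across the range
  }
}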
Example Aggregation Function
Below is code demonstrating how a discrete values histogram aggregation function would look using the aforementioned APIs.
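This is a sketch against the assumed interfaces above; Gson serialization and the get() accessor on DataQualityWritable are assumptions:

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

// Counts the frequency of each distinct value seen in a time window.
// Frequency maps can be merged across windows, so this aggregation is
// combinable.
public class DiscreteValuesHistogram
    implements BasicAggregationFunction,
               CombinableAggregationFunction<Map<String, Integer>> {

  private static final Gson GSON = new Gson();
  private static final Type MAP_TYPE =
      new TypeToken<Map<String, Integer>>() { }.getType();

  private final Map<String, Integer> histogram = new HashMap<>();  // current window
  private final Map<String, Integer> combined = new HashMap<>();   // across windows

  @Override
  public void add(DataQualityWritable value) {
    // get() is an assumed accessor exposing the underlying field value
    String key = value.get().toString();
    histogram.merge(key, 1, Integer::sum);   // bump this value's count
  }

  @Override
  public byte[] aggregate() {
    // serialize the window's frequency map for storage in the Table
    return GSON.toJson(histogram).getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public void combine(byte[] values) {
    // merge one stored window's counts into the running total
    Map<String, Integer> window =
        GSON.fromJson(new String(values, StandardCharsets.UTF_8), MAP_TYPE);
    for (Map.Entry<String, Integer> entry : window.entrySet()) {
      combined.merge(entry.getKey(), entry.getValue(), Integer::sum);
    }
  }

  @Override
  public Map<String, Integer> retrieveAggregation() {
    return combined;  // combined histogram across the queried range
  }
}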
Aggregation Function Customization
Users can build custom aggregation functions using the APIs described above, in a manner similar to the example above. After writing a custom aggregation class, they simply pass the custom class name into the application configuration JSON.
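For example, assuming a user has packaged a class com.example.dq.TrimmedMeanAggregation (a hypothetical name and hypothetical field/schedule values), the config could reference it directly:

{
   "aggregationName": "com.example.dq.TrimmedMeanAggregation",
   "fieldsCSV": "content_length",
   "workflowScheduleMinutes": 60,
   "dataInputFormat": "clf",
   "sourceType": "stream"
}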
Jumping back to the first use case mentioned under Example Use Cases: suppose the user wants to take in a stream of CLF log data to generate several histograms of status codes and referrers, partitioned by time, and then wants to be able to query ranges of timestamps for aggregated histogram data.
They would create a Data Quality application by specifying the following config JSON:

{
   "aggregationName": "discreteValuesHistogram",
   "fieldsCSV": "status, referrer",
   "workflowScheduleMinutes": 5,
   "dataInputFormat": "clf",
   "sourceType": "stream"
}
After some time, suppose the user wants to query the stored data from timestamp 1436220867548 to 1436220868900.
The service requests would look like this:
/v1/source/{sourceID}/fields/status/aggregations/discreteValuesHistogram?startTimestamp=1436220867548&endTimestamp=1436220868900
/v1/source/{sourceID}/fields/referrer/aggregations/discreteValuesHistogram?startTimestamp=1436220867548&endTimestamp=1436220868900
The results produced by the service would look something like the following for status codes (the results for the referrer query would be of an identical format):
{
   "data": {
      "200": 504,
      "400": 12,
      "404": 23,
      "500": 2
   }
}
UI
The idea is to have a timeline on which the user can select a time range and sourceID. Once that selection happens, the user will be prompted with a list of fields over which they can aggregate. Once the user selects a field from this list, they will be prompted with a list of aggregation types. Once the user picks an aggregation type, they will be served a JSON containing the aggregated values that correspond to the previously selected time range, sourceID, field, and aggregation type.