Data Quality Application
...
Suppose a user wants to ingest a stream of Apache access log data (CLF format), generate several histograms of status codes and referrers (partitioned by time), and then query ranges of timestamps for aggregated histogram data.
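As a hedged sketch of this first use case, the snippet below parses Common Log Format (CLF) lines with a regular expression and builds a histogram of status codes. The class and method names are illustrative only; real logs may need a more forgiving parser than this plain-CLF regex.

```java
import java.util.*;
import java.util.regex.*;

public class ClfHistogramSketch {
    // Plain CLF: host ident authuser [date] "request" status bytes
    private static final Pattern CLF = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+)$");

    /** Count occurrences of each HTTP status code; unparseable lines are skipped. */
    public static Map<String, Integer> statusHistogram(List<String> lines) {
        Map<String, Integer> histogram = new HashMap<>();
        for (String line : lines) {
            Matcher m = CLF.matcher(line);
            if (m.matches()) {
                histogram.merge(m.group(6), 1, Integer::sum);  // group 6 = status code
            }
        }
        return histogram;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326",
            "127.0.0.1 - - [10/Oct/2000:13:55:38 -0700] \"GET /missing HTTP/1.0\" 404 153");
        System.out.println(statusHistogram(lines));
    }
}
```

In the application itself, each histogram would additionally be keyed by the time partition the log line falls into, so the per-interval histograms can later be combined over a queried range.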
Another such example would be if a user wanted to figure out which IP addresses were trying to access a given service, and the user had the related Apache Access Log data stored in a table — partitioned by time. The application could be configured to read in the IP addresses from the table and compute a unique aggregation on each set of IP addresses for each time interval. After that, the user could once again query the aggregated data while specifying relevant timestamps.
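The flow of this second example can be sketched in plain Java. All names here are illustrative, and the real application would read from a time-partitioned table rather than an in-memory list:

```java
import java.util.*;

public class UniqueIpSketch {
    /** One parsed access-log line: timestamp (epoch seconds) and client IP. */
    public static final class LogLine {
        final long ts;
        final String ip;
        public LogLine(long ts, String ip) { this.ts = ts; this.ip = ip; }
    }

    /** Bucket log lines into hour intervals and collect the distinct IPs seen in each. */
    public static Map<Long, Set<String>> uniqueIpsPerHour(List<LogLine> lines) {
        Map<Long, Set<String>> result = new TreeMap<>();
        for (LogLine line : lines) {
            long bucket = line.ts / 3600 * 3600;  // start of the hour interval
            result.computeIfAbsent(bucket, k -> new HashSet<>()).add(line.ip);
        }
        return result;
    }

    public static void main(String[] args) {
        List<LogLine> lines = Arrays.asList(
            new LogLine(1423371600L, "10.0.0.1"),
            new LogLine(1423371700L, "10.0.0.1"),   // duplicate IP in the same hour
            new LogLine(1423375200L, "10.0.0.2"));  // next hour
        System.out.println(uniqueIpsPerHour(lines));
    }
}
```

A query over a range of timestamps would then union (or count) the per-interval sets that fall inside that range.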
Requirements
Read in multiple data formats from a generic source
Data quality should be assessed in batch, with results computed at periodic intervals (e.g., each hour)
A MapReduce program that takes log data as input, applies an aggregation function, and stores the results of the aggregation in a table.
Allow the user to query aggregated data over various time intervals.
Library of aggregation functions
Allow the aggregation function logic to be pluggable (allow users to plug in their own aggregation functions)
Allow the source type and input data format to be configurable
Determine when there are anomalies/suspicious patterns (using the aggregated data).
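As one illustration of the last requirement, a simple threshold rule over the aggregated counts can flag suspicious intervals. The rule below (flag any interval whose count exceeds a multiple of the mean) is a hedged sketch of one possible detector, not the application's actual anomaly logic:

```java
import java.util.*;

public class AnomalySketch {
    /** Return the interval timestamps whose count exceeds factor * mean of all counts. */
    public static List<Long> anomalousIntervals(Map<Long, Integer> countsPerInterval, double factor) {
        double mean = countsPerInterval.values().stream()
            .mapToInt(Integer::intValue).average().orElse(0);
        List<Long> flagged = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : countsPerInterval.entrySet()) {
            if (e.getValue() > factor * mean) {
                flagged.add(e.getKey());
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        // Hourly counts of HTTP 500 responses, as returned by an aggregation query.
        Map<Long, Integer> counts = new TreeMap<>();
        counts.put(1423371600L, 3);
        counts.put(1423375200L, 4);
        counts.put(1423378800L, 90);  // suspicious spike in this hour
        System.out.println(anomalousIntervals(counts, 2.0));
    }
}
```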
Design
Implicit Table Schema
...
Service for combinable aggregations (example: frequency/histogram)
Handler for retrieving aggregations
Returns a JSON of aggregated values
Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations/{aggregationName}?startTimestamp={start_ts}&endTimestamp={end_ts}
Path param: source ID
Path param: field name
...
Handler for retrieving the fields that have been aggregated for a given sourceID.
Returns a JSON of field names
Request format: /v1/source/{sourceID}/fields/?startTimestamp={start_ts}&endTimestamp={end_ts}
Path param: source ID
Query param: start timestamp
...
The service will return a JSON containing the available field names for the given parameters (sourceID, time interval). For example, if the user wants to see the available field names and their aggregation functions for stream1 between timestamps 1423371600 and 1423372900, the service might return:
{
"content_length": [
{ "name":"median", "combinable": false },
{ "name":"max", "combinable": true },
],
...
}
Handler for retrieving the aggregation types for a given sourceID, fieldName, and time interval.
Returns a JSON of aggregation types
Request format: /v1/source/{sourceID}/fields/{fieldname}/aggregations?startTimestamp={start_ts}&endTimestamp={end_ts}
Path param: source ID
Path param: field name
...
The service will return a JSON containing the available aggregation types for the given parameters (sourceID, fieldName, time interval). For example, if the user wants to see the available aggregation functions for stream1 and content_length between timestamps 1423371600 and 1423372900, the service might return:
[
{ "name":"median", "combinable": false },
{ "name":"max", "combinable": true },
...
]
Service for non-combinable aggregations (example: standard deviation)
Handler for retrieving aggregations
Returns a timeseries
Request format:
...
This is important because it dictates the structure of users’ custom aggregation functions.
Interfaces
Code Block
/**
 * Basic Aggregation Function Interface: All aggregation function classes -- combinable or
 * non-combinable -- should implement these methods.
 * @param <T> Type of the aggregation
 */
public interface BasicAggregationFunction<T> {
  /**
   * Incrementally add values to the aggregation
   * @param value is a DataQualityWritable that we want to add to the running aggregation
   */
  void add(DataQualityWritable value);

  /**
   * Return the aggregation accumulated using add() in the form of a byte[]
   * @return a byte[] that represents the final aggregated value
   */
  byte[] aggregate();

  /**
   * Deserialize a given value appropriately
   * @param value is a byte[] representing a serialized aggregated value
   * @return the deserialized value of type T
   */
  T deserialize(byte[] value);
}
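A hedged sketch of what a concrete BasicAggregationFunction might look like: a running maximum over integer field values (e.g., content_length). The interface and DataQualityWritable are stubbed inline so the example compiles on its own; in the application they come from the data-quality codebase, and byte serialization would go through Bytes.toBytes(...) rather than raw UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;

public class MaxExample {
    /** Stand-in for the application's DataQualityWritable: just wraps a string value. */
    public static final class DataQualityWritable {
        private final String value;
        public DataQualityWritable(String value) { this.value = value; }
        public String get() { return value; }
    }

    /** Stub of the interface described above, so this sketch is self-contained. */
    public interface BasicAggregationFunction<T> {
        void add(DataQualityWritable value);
        byte[] aggregate();
        T deserialize(byte[] value);
    }

    /** Running-maximum aggregation over integer-valued fields. */
    public static final class MaxAggregation implements BasicAggregationFunction<Integer> {
        private Integer max = null;

        @Override
        public void add(DataQualityWritable value) {
            int v = Integer.parseInt(value.get());
            max = (max == null) ? v : Math.max(max, v);
        }

        @Override
        public byte[] aggregate() {
            // Simplified serialization for the sketch: the value as a UTF-8 string.
            return String.valueOf(max).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Integer deserialize(byte[] value) {
            return Integer.valueOf(new String(value, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        MaxAggregation max = new MaxAggregation();
        max.add(new DataQualityWritable("512"));
        max.add(new DataQualityWritable("2048"));
        max.add(new DataQualityWritable("128"));
        System.out.println(max.deserialize(max.aggregate()));  // the largest value seen
    }
}
```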
Code Block
/**
 * Combinable Aggregation Function Interface
 * This is for aggregations that remain meaningful when combined.
 *
 * An example of such a function would be a discrete values histogram. If several
 * histograms (each corresponding to a different time interval) were combined,
 * the result would be a histogram representing the frequencies of the various
 * values over the combined time interval.
 * @param <T> Aggregation type
 */
public interface CombinableAggregationFunction<T> extends BasicAggregationFunction {
  /**
   * Retrieve the combined aggregation
   * @return the combined aggregation of type T
   */
  T retrieveAggregation();

  /**
   * Combine existing aggregations one by one
   * @param values is a byte[] that we want to add to the running combined aggregation
   */
  void combine(byte[] values);
}
...
Application API Usages
Usage of void add(DataQualityWritable value) and byte[] aggregate()
Code Block
@Override
public void reduce(Text key, Iterable<DataQualityWritable> values, Context context)
    throws IOException, InterruptedException {
  if (fieldsSet.contains(key.toString()) || fieldsCSV.equals("")) {
    try {
      Class<?> aggregationClass =
        Class.forName("data.quality.app.functions." + aggregationFunctionClassName);
      BasicAggregationFunction instance =
        (BasicAggregationFunction) aggregationClass.newInstance();
      for (DataQualityWritable value : values) {
        instance.add(value);
      }
      FieldTimestampKey fieldTimestampKey = new FieldTimestampKey(timeKey, key.toString());
      context.write(fieldTimestampKey.getTableRowKey(),
                    new Put(fieldTimestampKey.getTableRowKey(),
                            Bytes.toBytes(aggregationFunctionClassName),
                            instance.aggregate()));
    } catch (ClassNotFoundException | RuntimeException | InstantiationException
        | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
  }
}
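Both the reducer above and the scan in the histogram handler below depend on FieldTimestampKey, whose implementation is not shown in this document. The sketch below is purely an assumption about its shape: for the handler's range scan over one field to work, the row key must sort by field name first and timestamp second, so that all rows for a field over a time interval are contiguous.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FieldTimestampKeySketch {
    /**
     * Hypothetical row-key layout: UTF-8 field name followed by a big-endian
     * 8-byte timestamp. Big-endian means lexicographic byte order matches
     * numeric timestamp order within a field.
     */
    public static byte[] tableRowKey(String fieldName, long timestamp) {
        byte[] field = fieldName.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(field.length + Long.BYTES)
            .put(field)
            .putLong(timestamp)
            .array();
    }

    public static void main(String[] args) {
        byte[] a = tableRowKey("status", 1423371600L);
        byte[] b = tableRowKey("status", 1423375200L);
        // Later timestamps for the same field produce byte-wise greater keys.
        System.out.println(java.util.Arrays.compareUnsigned(a, b) < 0);
    }
}
```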
...
Usage of T retrieveAggregation() and void combine(byte[] values)
Code Block
/**
 * Handler class for Data Quality Histogram Service
 */
@Path("/v1")
public static final class HistogramLookup extends AbstractHttpServiceHandler {
  @UseDataSet("logDataStore")
  Table logDataStore;

  @Path("fields/{fieldName}/aggregations/{aggregationType}")
  @GET
  public void handler(HttpServiceRequest request, HttpServiceResponder responder,
                      @PathParam("fieldName") String fieldName,
                      @PathParam("aggregationType") String aggregationType,
                      @QueryParam("startTimestamp") @DefaultValue("0") long startTimestamp,
                      @QueryParam("endTimestamp") @DefaultValue("9223372036854775807")
                      long endTimestamp) throws IOException {
    FieldTimestampKey fieldStartTimestampKey = new FieldTimestampKey(startTimestamp, fieldName);
    // Add 1 so the scan is inclusive of endTimestamp
    FieldTimestampKey fieldEndTimestampKey = new FieldTimestampKey(endTimestamp + 1, fieldName);
    Scanner scanner = logDataStore.scan(fieldStartTimestampKey.getTableRowKey(),
                                        fieldEndTimestampKey.getTableRowKey());
    try {
      Class<?> aggregationClass = Class.forName("data.quality.app.functions." + aggregationType);
      CombinableAggregationFunction aggregationClassInstance =
        (CombinableAggregationFunction) aggregationClass.newInstance();
      Row row;
      byte[] aggregationTypeBytes = Bytes.toBytes(aggregationType);
      try {
        while ((row = scanner.next()) != null) {
          Map<byte[], byte[]> columnsMapBytes = row.getColumns();
          byte[] output = columnsMapBytes.get(aggregationTypeBytes);
          if (output != null) {
            aggregationClassInstance.combine(output);
          }
        }
      } finally {
        scanner.close();
      }
      Object output = aggregationClassInstance.retrieveAggregation();
      responder.sendJson(output == null ? 404 : 200,
                         output == null
                           ? aggregationType + " is not a valid CombinableAggregationFunction"
                           : output);
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
  }
}
...
Below is code demonstrating what a discrete values histogram aggregation function would look like using the aforementioned APIs.
Code Block
/**
 * Discrete Values Histogram Aggregation Function
 */
public class DiscreteValuesHistogram
  implements BasicAggregationFunction, CombinableAggregationFunction<Map<String, Integer>> {
  private static final Gson GSON = new Gson();
  private static final Type TOKEN_TYPE_MAP_STRING_INTEGER =
    new TypeToken<Map<String, Integer>>() { }.getType();
  private Map<String, Integer> histogramMap = Maps.newHashMap();
  private Map<String, Integer> aggregatedHistogramMap = Maps.newHashMap();

  public void combine(byte[] values) {
    Map<String, Integer> outputMap = deserialize(values);
    for (Map.Entry<String, Integer> entry : outputMap.entrySet()) {
      Integer val = aggregatedHistogramMap.get(entry.getKey());
      aggregatedHistogramMap.put(entry.getKey(),
                                 val == null ? entry.getValue() : val + entry.getValue());
    }
  }

  public Map<String, Integer> deserialize(byte[] valueBytes) {
    return GSON.fromJson(Bytes.toString(valueBytes), TOKEN_TYPE_MAP_STRING_INTEGER);
  }

  public Map<String, Integer> retrieveAggregation() {
    return aggregatedHistogramMap.isEmpty() ? null : aggregatedHistogramMap;
  }

  public void add(DataQualityWritable value) {
    Integer mapVal = histogramMap.get(value.get().toString());
    if (mapVal != null) {
      histogramMap.put(value.get().toString(), mapVal + 1);
    } else {
      histogramMap.put(value.get().toString(), 1);
    }
  }

  public byte[] aggregate() {
    return Bytes.toBytes(GSON.toJson(histogramMap));
  }
}
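To make the combine semantics concrete, here is a minimal, self-contained walkthrough of merging two per-interval histograms. JSON serialization is replaced by plain Maps so the sketch has no Gson dependency; the merge logic mirrors the loop in DiscreteValuesHistogram.combine().

```java
import java.util.*;

public class HistogramCombineSketch {
    /** Merge one per-interval histogram into a running aggregated histogram. */
    public static void combine(Map<String, Integer> aggregated, Map<String, Integer> interval) {
        for (Map.Entry<String, Integer> e : interval.entrySet()) {
            aggregated.merge(e.getKey(), e.getValue(), Integer::sum);
        }
    }

    public static void main(String[] args) {
        // Two hourly status-code histograms, as the MapReduce job might have stored them.
        Map<String, Integer> hour1 = Map.of("200", 40, "404", 3);
        Map<String, Integer> hour2 = Map.of("200", 55, "500", 2);

        Map<String, Integer> aggregated = new HashMap<>();
        combine(aggregated, hour1);
        combine(aggregated, hour2);
        System.out.println(aggregated);  // counts summed across the queried interval
    }
}
```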
...