AWS DynamoDb batch source
- abhinavc (Unlicensed)
- Rashi Gandhi
Introduction
A batch source that ingests data from DynamoDB into Hydrator pipelines.
Use case(s)
- An organization dumps all of its spam emails into a DynamoDB table. As a data scientist, I want to train my machine learning models in Hydrator pipelines using the data from the DynamoDB tables.
User Stories
- User should be able to provide the table name in DynamoDB.
- User should be able to provide the AWS endpoint URL for the DynamoDB instance.
- User should be able to provide the AWS region ID for the DynamoDB instance.
- User should be able to provide the throughput for the DynamoDB instance. (DynamoDB charges are incurred based on throughput, so the user should be able to control it.)
- User should be able to provide the AWS access id.
- User should be able to provide the AWS access key.
Plugin Type
- Batch Source
- Batch Sink
- Real-time Source
- Real-time Sink
- Action
- Post-Run Action
- Aggregate
- Join
- Spark Model
- Spark Compute
Configurables
This section defines properties that are configurable for this plugin.
User Facing Name | Type | Description | Constraints |
---|---|---|---|
Table name | String | Name of the DynamoDB table | Naming convention constraints from AWS |
endpoint url | String | AWS endpoint URL for the DynamoDB instance | Optional, could be reconstructed using regionId |
region id | String | AWS region ID for the DynamoDB instance | Optional, with default value set as us_west_2 |
access id | password | AWS access key | |
access key | password | AWS access secret key | |
query | String | Query to get the data | |
filterQuery | String | Query to filter the fetched data before returning it to the client | (Optional) |
nameMap | String | Comma-separated list of key-value pairs, where the key is an attribute name placeholder used in Query/FilterQuery and the value is the attribute name that replaces it | (Optional) |
valueMap | String | Comma-separated list of key-value pairs, where the key is a value placeholder used in Query/FilterQuery for the items to be searched and the value is the actual value that replaces it | |
placeholderType | String | Attribute value placeholder and its type | |
readThroughput | String | Read throughput for the AWS DynamoDB table to connect to, as a double. Default is 1. | (Optional) |
readThroughputPercentage | String | Read throughput percentage for the AWS DynamoDB table to connect to. Default is 0.5. | (Optional) |
schema | String | Specifies the schema that has to be output. If not specified, then by default each item will be emitted as a JSON string, in the 'body' field of the StructuredRecord. | |
Design / Implementation Tips
- For testing purposes, tables can be created either using the AWS CLI or using Java code: http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/JavaDocumentAPITablesExample.html
- AWS DynamoDB CLI reference: http://docs.aws.amazon.com/cli/latest/reference/dynamodb/
- Java example for CRUD operations http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/batch-operation-document-api-java.html
- Java Example for working with queries http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/QueryingJavaDocumentAPI.html
- Please reuse/modify RecordReader, InputFormat classes present here https://github.com/awslabs/emr-dynamodb-connector
Design
DynamoDB JSON format:
{
  "name": "DynamoDb",
  "type": "batchsource",
  "properties": {
    "accessKey": "xyz",
    "secretAccessKey": "abc",
    "regionId": "us-east-1",
    "tableName": "Movies",
    "query": "Id = :v_Id",
    "filterQuery": "rating > :v_rating",
    "valueMap": ":v_Id|120,:v_rating|18",
    "placeholderType": ":v_Id|int,:v_rating|int"
  }
}
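The "valueMap" and "placeholderType" strings in the JSON above share the same 'token|value' layout, so one small parser can serve both. The following is a minimal sketch of how they might be combined into typed values, not the plugin's actual code: the class name PlaceholderParser and its methods are hypothetical, and it assumes values never contain a comma.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper; class and method names are illustrative only. */
final class PlaceholderParser {

  /** Splits "k1|v1,k2|v2" into an insertion-ordered map. */
  static Map<String, String> parsePairs(String spec) {
    Map<String, String> pairs = new LinkedHashMap<>();
    if (spec == null || spec.trim().isEmpty()) {
      return pairs;
    }
    // Assumption: values never contain a comma, so a plain split is enough.
    for (String pair : spec.split(",")) {
      // Limit of 2 keeps any further '|' characters inside the value.
      String[] kv = pair.split("\\|", 2);
      if (kv.length != 2 || kv[0].trim().isEmpty()) {
        throw new IllegalArgumentException("Expected 'token|value' but got: " + pair);
      }
      pairs.put(kv[0].trim(), kv[1].trim());
    }
    return pairs;
  }

  /** Converts a raw string to one of the placeholder types listed in this document. */
  static Object convert(String raw, String type) {
    switch (type.toLowerCase(Locale.ROOT)) {
      case "boolean": return Boolean.parseBoolean(raw);
      case "int":     return Integer.parseInt(raw);
      case "long":    return Long.parseLong(raw);
      case "float":   return Float.parseFloat(raw);
      case "double":  return Double.parseDouble(raw);
      case "string":  return raw;
      default:
        throw new IllegalArgumentException("Unsupported placeholder type: " + type);
    }
  }

  /** Pairs every value placeholder with its declared type and converts it. */
  static Map<String, Object> typedValues(String valueMap, String placeholderType) {
    Map<String, String> types = parsePairs(placeholderType);
    Map<String, Object> typed = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : parsePairs(valueMap).entrySet()) {
      String type = types.get(entry.getKey());
      if (type == null) {
        throw new IllegalArgumentException("No type declared for placeholder " + entry.getKey());
      }
      typed.put(entry.getKey(), convert(entry.getValue(), type));
    }
    return typed;
  }
}
```

Calling PlaceholderParser.typedValues with the two strings from the JSON above would yield a map from ":v_Id" and ":v_rating" to Integer values, which could then be handed to whatever builds the DynamoDB query. The same parsePairs method also fits the name placeholder list, for example '#yr|year'.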
Approach(s)
- A dropdown with the list of regions will be provided so that the user can select the region of the AWS DynamoDB instance to connect to. Supported regions are:
"us-gov-west-1", "us-east-1", "us-east-2", "us-west-1", "us-west-2", "eu-west-1", "eu-west-2", "eu-central-1", "ap-south-1", "ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-northeast-2", "sa-east-1", "cn-north-1", "ca-central-1". (Referred from: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/ and http://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region)
- If the user does not select a region, the default region, i.e. us_west_2, will be used.
- The user will provide the query as a key condition expression through the “Query” widget; it will be used to fetch the items from the DynamoDB table. For example: Id = :v_id and rating > :v_rating
Referred from: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/datamodeling/DynamoDBQueryExpression.html#setKeyConditionExpression-java.lang.String-
- If a filter query is required to fetch the data, it can be provided through the “Filter Query” widget, in the same way as the query.
- The user will provide the actual attribute names and values for the placeholders used in Query/FilterQuery through the nameMappings and valueMappings widgets, respectively.
- The user will provide the type of each attribute value placeholder through the placeholderType widget.
- The current implementation supports the 'boolean, int, long, double, float and string' types for searching, i.e. as attribute value types.
- If the user does not provide the output schema, then by default each item will be emitted as a JSON string, in the 'body' field of the StructuredRecord.
- Supported schema types for output fields are: "boolean, bytes, double, float, int, long and string"
- Conditions:
  - The table must exist in DynamoDB before the items are read. If it does not, the run will fail at runtime.
  - The query must follow the DynamoDB rules and supported format. Any mismatch in the query will result in a runtime failure.
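Since a malformed query only surfaces as a runtime failure, one option is to catch the simplest mismatch earlier: a placeholder used in the query or filter query that has no mapping. The sketch below illustrates that idea and is not part of the described implementation; the class QueryPlaceholderCheck is hypothetical, and it assumes tokens consist of ':' (value placeholder) or '#' (name placeholder) followed by letters, digits, or underscores.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical pre-run check; names are illustrative only. */
final class QueryPlaceholderCheck {

  // ':' introduces a value placeholder and '#' a name placeholder.
  private static final Pattern TOKEN = Pattern.compile("[:#][A-Za-z_][A-Za-z0-9_]*");

  /** Collects every placeholder token found in the given expressions. */
  static Set<String> tokens(String... expressions) {
    Set<String> found = new LinkedHashSet<>();
    for (String expression : expressions) {
      if (expression == null) {
        continue; // the filter query is optional
      }
      Matcher matcher = TOKEN.matcher(expression);
      while (matcher.find()) {
        found.add(matcher.group());
      }
    }
    return found;
  }

  /** Returns the tokens used in the expressions that were never declared. */
  static Set<String> missing(Set<String> declared, String... expressions) {
    Set<String> missing = tokens(expressions);
    missing.removeAll(declared);
    return missing;
  }
}
```

With the query 'Id = :v_Id', the filter query 'rating > :v_rating', and only ':v_Id' declared, missing(...) would report ':v_rating'. This check covers placeholder bookkeeping only; it does not validate the expression syntax or confirm that the table exists.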
Properties
- referenceName: This will be used to uniquely identify this source for lineage, annotating metadata, etc.
- accessKey: Access key for AWS DynamoDB to connect to. (Macro Enabled)
- secretAccessKey: Secret access key for AWS DynamoDB to connect to. (Macro Enabled)
- regionId: The region for AWS DynamoDB to connect to. Default is us_west_2 i.e. US West (Oregon).
- endpointUrl: The hostname and port for the AWS DynamoDB instance to connect to, separated by a colon. For example, localhost:8000.
- tableName: The DynamoDB table to read the data from.
- query: Query to read the items from DynamoDB table. Query must include a partition key value and an equality condition and it must be specified in the following format: 'hashAttributeName = :hashval'. For example, ID = :v_id or ID = :v_id AND Title = :v_title, if sort key condition is used to read the data from table. (Macro Enabled)
- filterQuery: Filter query to return only the items that satisfy the condition. All other items are discarded. It must be specified in the same format as the main query. For example, rating = :v_rating. (Macro Enabled)
- nameMappings: List of the placeholder tokens used as the attribute name in the 'Query or FilterQuery' along with their actual attribute names. This is a comma-separated list of key-value pairs, where each pair is separated by a pipe sign '|' and specifies the tokens and actual attribute names. For example, '#yr|year', if the query is like: '#yr = :yyyy'. This might be necessary if an attribute name conflicts with a DynamoDB reserved word. (Macro Enabled)
- valueMappings: List of the placeholder tokens used as the attribute values in the 'Query or FilterQuery' along with their actual values. This is a comma-separated list of key-value pairs, where each pair is separated by a pipe sign '|' and specifies the tokens and actual values. For example, ':v_id|256,:v_title|B', if the query is like: 'ID = :v_id AND Title = :v_title'. (Macro Enabled)
- placeholderType: List of the placeholder tokens used as the attribute values in the 'Query or FilterQuery' along with their data types. This is a comma-separated list of key-value pairs, where each pair is separated by a pipe sign '|' and specifies a token and its type. For example, ':v_id|int,:v_title|string', if the query is like: 'ID = :v_id AND Title = :v_title'. Supported types are: 'boolean, int, long, float, double and string'. (Macro Enabled)
- readThroughput: Read throughput for the AWS DynamoDB table to connect to, as a double. Default is 1. (Macro Enabled)
- readThroughputPercentage: Read Throughput Percentage for AWS DynamoDB table to connect to. Default is 0.5. (Macro Enabled)
- schema: Specifies the schema that has to be output. If not specified, then by default each item will be emitted as a JSON string, in the 'body' field of the StructuredRecord.
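The properties above do not spell out how readThroughput and readThroughputPercentage interact. A minimal sketch, assuming the two are simply multiplied to get a target read rate (this is an assumption, not something the properties state), with the documented defaults of 1 and 0.5 applied when a property is left empty. The class ReadRateSketch and its methods are hypothetical.

```java
/** Hypothetical helper; names and the multiplication rule are assumptions. */
final class ReadRateSketch {

  static final double DEFAULT_READ_THROUGHPUT = 1.0;
  static final double DEFAULT_READ_THROUGHPUT_PERCENTAGE = 0.5;

  /** Parses a string property, falling back to the default when it is unset. */
  static double parseOrDefault(String raw, double fallback) {
    if (raw == null || raw.trim().isEmpty()) {
      return fallback;
    }
    double value = Double.parseDouble(raw.trim());
    // Written as !(value > 0) so that NaN is rejected as well.
    if (!(value > 0)) {
      throw new IllegalArgumentException("Expected a positive number but got: " + raw);
    }
    return value;
  }

  /** Assumed rule: target read rate = readThroughput * readThroughputPercentage. */
  static double targetReadRate(String readThroughput, String readThroughputPercentage) {
    double throughput = parseOrDefault(readThroughput, DEFAULT_READ_THROUGHPUT);
    double percentage = parseOrDefault(readThroughputPercentage, DEFAULT_READ_THROUGHPUT_PERCENTAGE);
    return throughput * percentage;
  }
}
```

Under this assumption, leaving both properties empty gives a target of 0.5, and setting readThroughput to "10" with the default percentage gives 5.0. Both properties are parsed from strings because the Configurables table lists them with type String.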
Security
- The AWS access keys should be password fields and macro enabled.
NFR
1. This plugin should be able to read data from a DynamoDB table and emit structured records to the next stage successfully.
2. Only performance measurement is in scope as part of the NFR.
Limitation(s)
Future Work
Test Case(s)
DynamoDB batch source - query using partition key.
DynamoDB batch source - query using partition and sort key.
DynamoDB batch source - with query and filter query.
DynamoDB batch source - query with an attribute name that is a DynamoDB reserved word, using nameMappings to pass the attribute name.
DynamoDB batch source - query on 50K data
Sample Pipeline
DynamoDBSourcePipeline-cdap-data-pipeline.json
DynamoDBSource_WithSortQuery-cdap-data-pipeline.json
DynamoDBSource_WithFilterQuery-cdap-data-pipeline.json
DynamoDBSourceWithAttributeAsReservedWord-cdap-data-pipeline.json
Checklist
- User stories documented
- User stories reviewed
- Design documented
- Design reviewed
- Feature merged
- Examples and guides
- Integration tests
- Documentation for feature
- Short video demonstrating the feature