AWS DynamoDB Batch Source

Introduction

A batch source that ingests data from DynamoDB into Hydrator pipelines.

Use case(s)

  • An organization dumps all of its spam emails into a DynamoDB table. As a data scientist, I want to train my machine learning models in Hydrator pipelines using the data from the DynamoDB tables.

User Stories

  • User should be able to provide the name of the DynamoDB table.
  • User should be able to provide the AWS endpoint URL for the DynamoDB instance.
  • User should be able to provide the AWS region id for the DynamoDB instance.
  • User should be able to provide the throughput for the DynamoDB instance. (DynamoDB charges are incurred based on throughput, so the user should be able to control it.)
  • User should be able to provide the AWS access id.
  • User should be able to provide the AWS access key.

Plugin Type

  • Batch Source
  • Batch Sink 
  • Real-time Source
  • Real-time Sink
  • Action
  • Post-Run Action
  • Aggregate
  • Join
  • Spark Model
  • Spark Compute

Configurables

This section defines properties that are configurable for this plugin. 

User Facing Name | Type | Description | Constraints
Table name | String | Name of the DynamoDB table | Naming convention constraints from AWS
endpoint url | String | AWS endpoint URL for the DynamoDB instance | Optional, could be reconstructed using regionId
region id | String | AWS region id for the DynamoDB instance | Optional, with default value set as us_west_2
access id | password | AWS access key |
access key | password | AWS access secret key |
query | String | Query to get the data |
filterQuery | String | Query to filter the fetched data before returning it to the client | Optional
nameMap | String | Comma-separated list of key-value pairs, where the key is the attribute name placeholder used in Query/FilterQuery and the value replaces the placeholder | Optional
valueMap | String | Comma-separated list of key-value pairs, where the key is the value placeholder used in Query/FilterQuery for the items to be searched and the value replaces the placeholder |
placeholderType | String | Attribute value placeholder and its type |
readThroughput | String | Read throughput for the AWS DynamoDB table to connect to, as a double. Default is 1 | Optional
readThroughputPercentage | String | Read throughput percentage for the AWS DynamoDB table to connect to. Default is 0.5 | Optional
schema | String | Specifies the schema that has to be output. If not specified, then by default each item will be emitted as a JSON string, in the 'body' field of the StructuredRecord |

Design / Implementation Tips

Design

DynamoDB JSON format:

{
  "name": "DynamoDb",
  "type": "batchsource",
  "properties": {
    "accessKey": "xyz",
    "secretAccessKey": "abc",
    "regionId": "us-east-1",
    "tableName": "Movies",
    "query": "Id = :v_Id",
    "filterQuery": "rating > :v_rating",
    "valueMap": ":v_Id|120,:v_rating|18",
    "placeholderType": ":v_Id|int,:v_rating|int"
  }
}
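
The Configurables table notes that the endpoint URL is optional and could be reconstructed from regionId. A minimal Python sketch of that reconstruction follows. It is an illustration, not the plugin's code: the function name resolve_endpoint is hypothetical, the underscore-to-hyphen normalisation of the documented default us_west_2 is an assumption, and the host pattern used is the public regional DynamoDB endpoint naming (dynamodb.<region>.amazonaws.com, with the .cn suffix for China regions).

```python
import json

# Default region as written in this document. Assumption: "us_west_2"
# refers to the AWS region id "us-west-2", so it is normalised below.
DEFAULT_REGION = "us_west_2"


def resolve_endpoint(properties):
    """Return endpointUrl when it is set, otherwise derive it from regionId."""
    endpoint = properties.get("endpointUrl")
    if endpoint:
        return endpoint
    region = (properties.get("regionId") or DEFAULT_REGION).replace("_", "-").lower()
    # China regions are served under a separate DNS suffix.
    suffix = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
    return "https://dynamodb.{}.{}".format(region, suffix)


config = json.loads("""
{
  "name": "DynamoDb",
  "type": "batchsource",
  "properties": {"regionId": "us-east-1", "tableName": "Movies"}
}
""")
print(resolve_endpoint(config["properties"]))
```

An explicit endpointUrl always wins in this sketch, which keeps local test endpoints such as localhost:8000 usable.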

Approach(s)

  1. A dropdown with the list of regions will be provided so that the user can select the AWS DynamoDB region to connect to. Supported regions are:
    "us-gov-west-1", "us-east-1", "us-east-2", "us-west-1", "us-west-2", "eu-west-1", "eu-west-2", "eu-central-1", "ap-south-1", "ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-northeast-2", "sa-east-1", "cn-north-1", "ca-central-1". (Referred from: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/ and
    http://docs.aws.amazon.com/general/latest/gr/rande.html#ddb_region)
  2. If the user does not select a region, the default region, i.e. us_west_2, will be used.
  3. The user will provide the query as a key condition expression through the “Query” widget; it will be used to fetch the items from the DynamoDB table. For example: Id = :v_id and rating > :v_rating
    Referred from: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/datamodeling/DynamoDBQueryExpression.html#setKeyConditionExpression-java.lang.String-
  4. If a filter query is required to fetch the data, it can be provided through the “Filter Query” widget, in the same way as the query.
  5. The user will provide the actual attribute names and values for the placeholders used in Query/FilterQuery through the nameMappings and valueMappings widgets, respectively.
  6. The user will provide the type of each attribute value placeholder through the placeholderType widget.
  7. The current implementation supports the 'boolean, int, long, double, float and string' types for searching, i.e. as attribute value types.
  8. If the user does not provide the output schema, then by default each item will be emitted as a JSON string, in the 'body' field of the StructuredRecord.
  9. Supported schema types for output fields are: "boolean, bytes, double, float, int, long and string".
  10. Conditions:

    10.1) The table must exist in DynamoDB before the items are read. If it does not, the run will fail at runtime.
    10.2) The query must follow the DynamoDB rules and supported format. Any mismatch in the query will result in a runtime failure.
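
The placeholder handling described in the approach above (values supplied as 'token|value' pairs, types supplied as 'token|type' pairs) can be sketched in Python as follows. This is only an illustration under stated assumptions: the helper names parse_pairs and typed_values are hypothetical, 'long' and 'double' are mapped onto Python's int and float, and values that themselves contain a comma or a pipe are not handled.

```python
def parse_pairs(text):
    """Split 'k1|v1,k2|v2' into a dict, rejecting entries without a pipe."""
    pairs = {}
    for item in text.split(","):
        item = item.strip()
        if not item:
            continue
        key, sep, value = item.partition("|")
        if not sep:
            raise ValueError("expected 'token|value', got: " + item)
        pairs[key.strip()] = value.strip()
    return pairs


# Converters for the types listed as supported for searching.
CASTERS = {
    "boolean": lambda s: s.lower() == "true",
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "string": str,
}


def typed_values(value_map, placeholder_type):
    """Cast every placeholder value to the type declared for its token."""
    types = parse_pairs(placeholder_type)
    result = {}
    for token, raw in parse_pairs(value_map).items():
        type_name = types.get(token)
        if type_name not in CASTERS:
            raise ValueError("missing or unsupported type for " + token)
        result[token] = CASTERS[type_name](raw)
    return result


# Values taken from the JSON sample in the Design section.
print(typed_values(":v_Id|120,:v_rating|18", ":v_Id|int,:v_rating|int"))
```

Failing early on a missing or unsupported type mirrors the conditions above, where a malformed query is expected to surface as a runtime failure rather than return wrong data.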

Properties

  • referenceName: This will be used to uniquely identify this source for lineage, annotating metadata, etc.
  • accessKey: Access key for AWS DynamoDB to connect to. (Macro Enabled)
  • secretAccessKey: Secret access key for AWS DynamoDB to connect to. (Macro Enabled)
  • regionId: The region for AWS DynamoDB to connect to. Default is us_west_2 i.e. US West (Oregon).
  • endpointUrl: The hostname and port for the AWS DynamoDB instance to connect to, separated by a colon. For example, localhost:8000.
  • tableName: The DynamoDB table to read the data from.
  • query: Query to read the items from the DynamoDB table. The query must include an equality condition on the partition key value, specified in the following format: 'hashAttributeName = :hashval'. For example, 'ID = :v_id', or 'ID = :v_id AND Title = :v_title' if a sort key condition is also used to read the data from the table. (Macro Enabled)
  • filterQuery: Filter query that returns only the items that satisfy the condition; all other items are discarded. It must be specified in the same format as the main query. For example, rating = :v_rating. (Macro Enabled)
  • nameMappings: List of the placeholder tokens used as the attribute name in the 'Query or FilterQuery' along with their actual attribute names. This is a comma-separated list of key-value pairs, where each pair is separated by a pipe sign '|' and specifies the tokens and actual attribute names. For example, '#yr|year', if the query is like: '#yr = :yyyy'. This might be necessary if an attribute name conflicts with a DynamoDB reserved word. (Macro Enabled)
  • valueMappings: List of the placeholder tokens used as the attribute values in the 'Query or FilterQuery' along with their actual values. This is a comma-separated list of key-value pairs, where each pair is separated by a pipe sign '|' and specifies the tokens and actual values. For example, ':v_id|256,:v_title|B', if the query is like: 'ID = :v_id AND Title = :v_title'. (Macro Enabled)
  • placeholderType: List of the placeholder tokens used as the attribute values in the 'Query or FilterQuery' along with their data types. This is a comma-separated list of key-value pairs, where each pair is separated by a pipe sign '|' and specifies a token and its type. For example, ':v_id|int,:v_title|string', if the query is like: 'ID = :v_id AND Title = :v_title'. Supported types are: 'boolean, int, long, float, double and string'. (Macro Enabled)
  • readThroughput: Read throughput for the AWS DynamoDB table to connect to, as a double. Default is 1. (Macro Enabled)
  • readThroughputPercentage: Read Throughput Percentage for AWS DynamoDB table to connect to. Default is 0.5. (Macro Enabled)

  • schema: Specifies the schema that has to be output. If not specified, then by default each item will be emitted as a JSON string, in the 'body' field of the StructuredRecord.
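
To show how the properties above fit together, the following Python sketch assembles them into the parameter names used by the DynamoDB Query API (TableName, KeyConditionExpression, FilterExpression, ExpressionAttributeNames, ExpressionAttributeValues). It is a sketch under assumptions, not the plugin's implementation: build_query_request and its helpers are hypothetical names, and the mapping of the plugin's types onto DynamoDB's N, S and BOOL attribute value tags is assumed.

```python
def parse_pairs(text):
    """Split 'k1|v1,k2|v2' into a dict of stripped keys and values."""
    pairs = {}
    for item in (text or "").split(","):
        key, sep, value = item.strip().partition("|")
        if sep:
            pairs[key.strip()] = value.strip()
    return pairs


def to_attribute_value(raw, type_name):
    """Wrap a raw string in a DynamoDB attribute value (numbers stay strings)."""
    if type_name in ("int", "long", "float", "double"):
        return {"N": raw}
    if type_name == "boolean":
        return {"BOOL": raw.lower() == "true"}
    if type_name == "string":
        return {"S": raw}
    raise ValueError("missing or unsupported placeholder type: {}".format(type_name))


def build_query_request(props):
    """Assemble Query parameters from the plugin properties."""
    request = {
        "TableName": props["tableName"],
        "KeyConditionExpression": props["query"],
    }
    if props.get("filterQuery"):
        request["FilterExpression"] = props["filterQuery"]
    names = parse_pairs(props.get("nameMappings"))
    if names:
        request["ExpressionAttributeNames"] = names
    types = parse_pairs(props.get("placeholderType"))
    values = parse_pairs(props.get("valueMappings"))
    if values:
        request["ExpressionAttributeValues"] = {
            token: to_attribute_value(raw, types.get(token))
            for token, raw in values.items()
        }
    return request


# Property values taken from the examples in the list above.
request = build_query_request({
    "tableName": "Movies",
    "query": "ID = :v_id AND Title = :v_title",
    "valueMappings": ":v_id|256,:v_title|B",
    "placeholderType": ":v_id|int,:v_title|string",
})
print(request)
```

Optional properties are simply left out of the request when they are empty, so a query without a filter or name mappings yields only the table name, key condition and values.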

Security

  • The AWS access keys should be password fields and macro enabled.

NFR

1. This plugin should be able to read data from a DynamoDB table and emit structured records to the next stage successfully.

2. Only performance measurement is in scope as part of NFR.

Limitation(s)

Future Work

Test Case(s)

DynamoDB batch source - query using partition key.

DynamoDB batch source - query using partition and sort key.

DynamoDB batch source - with query and filter query.

DynamoDB batch source - query with an attribute name that is a DynamoDB reserved word, using nameMappings to pass the attribute name.

DynamoDB batch source - query on 50K data

Sample Pipeline

DynamoDBSourcePipeline-cdap-data-pipeline.json

DynamoDBSource_WithSortQuery-cdap-data-pipeline.json

DynamoDBSource_WithFilterQuery-cdap-data-pipeline.json

DynamoDBSourceWithAttributeAsReservedWord-cdap-data-pipeline.json

DynamoDBSourcePipeline_With50K_copy-cdap-data-pipeline.json

Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examples and guides 
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature