HTTP Batch Source
Table of Contents
General
As a framework Apache HttpComponents HttpClient is be used, a successor of Commons HttpClient.
It seems the most widely used/supported by community framework. It is very simple to find all kind of solutions and workaround already implemented, which makes plugin development and maintenance easy. Framework has a built in support for compession, https tunneling, digest auth and lot of other functions.
Properties:
Section | Name | Description | Default | Widget | Validations |
---|---|---|---|---|---|
General | URL | The url we will request. "{pagination.index}" can be included into url to represent a changing part needed for some pagination strategies. E.g: https://my.api.com/api/v1/user?maxResults=10&name=John&pageNumber={pagination_index} | Text Box | Validate it contains protocol. | |
HTTP Method | Possible values:
| GET | Radio group | ||
Headers | Key-value map of headers | KeyValue | |||
Request Body | Text Area | No validation [1] | |||
Error Handling | HTTP Errors Handling | This is a map in which user can define which error status codes produce which results. Possible values are: RETRY, FAIL, SKIP, SEND_TO_ERROR, ALERT Example: 500: RETRY 404: SEND_TO_ERROR *: FAIL Wildcard (*) means "otherwise" or "for all other codes do ..". If the field is empty. Any status_code>=400 will yield a pipeline failure. | KeyValue Dropdown | If using SEND_TO_ERROR or SKIP or SEND_TO_ALERTS and current pagination type does not support it throw a validation error. [2] | |
Non-HTTP Error Handling | Handling of type casting and any other unhandled exceptions thrown during transformation of a record: Possible values are:
| Stop on error | Dropdown list | If using "Send to error" or "Skip on error" and current pagination type does not support it throw a validation error. [2] | |
Retry Policy | Possible values are:
| Exponential | Radio group | ||
Linear Retry Interval | The interval between retries (seconds) | 30 | Number |
| |
Max retry duration | Max seconds it takes to do retries | 600 | Number | ||
Connect Timeout | Maximum seconds to connect to server. (seconds) 0 - wait forever | 120 | Number | ||
Read Timeout | Maximum seconds to wait for data. (seconds) 0 - wait forever | 120 | Number | ||
Basic authentication | Username | Used for basic authentication. | Text Box | ||
Password | Used for basic authentication. | Password | |||
HTTP Proxy: | Proxy URL | Example: http://proxy.com:8080 Note for me: test this with https proxies. | Text Box | ||
Username | Text Box | ||||
Password | Password |
[1] Unfortunately we cannot do validation here. Even though most commonly body in API requests is a JSON for JSON APIs or an XML for XML SOAP APIs. Theoretically it can be anything.
[2] Pagination types, where next page url is on previous taken from the previous page, are the one which do not support SEND_TO_ERROR or IGNORE.
Parallelization
There are two reasons why we should not parallelize the requests:
- We don't want to DDOS the API we are using and get a flood IP ban.
When running requests on paginated API, often APIs will return an url to next page in previous one, or in response header. The process is subsequent and cannot be parallelized.
Not parallelizing will be achieved by creating a single inputSplit for RecordReader. Also to avoid DDOSing the API, we should add a configurable interval between requests for paginated requests.
Pagination
Name | Description | Default | Widget | Validations |
---|---|---|---|---|
Pagination type | Possible values are:
| None | Select |
|
Start Index | Initial value for index which replaces {pagination.index} in url. See example here | Text Box |
| |
Max Index | Max value for index which replaces {pagination.index} in url. If this is empty, plugin will load pages until no results or 404 is returned. Also plugin may stop iteration before max index is reached, if no more records. | Text Box |
| |
Index Increment | Increment value for index which replaces {pagination.index} in url. | Text Box |
| |
Next Page JSON/XML Field Path | Link to a field in JSON or an XML containing next page url. See an example here | Text Box |
| |
Next Page Token Path | Link to a field in JSON or an XML containing next page token. |
| ||
Next Page Url Parameter | For type "Token in Response Body" this is used as next page token name in added to url | Text Box |
| |
Custom Pagination Python Code | A code fragment which determines how next page url is generated and also when to finish iteration. For more info see Custom Pagination | Python code | If set and pagination type is not "Custom" fail. | |
Wait time between pages | The number of milliseconds to wait before requesting the next page. | 1000 | Number |
|
The above is a bit messy cause we cannot dynamically change the content of widget depending on pagination type. Which makes it a mix of properties for different pag_types. Is not super user-friendly for end-user. For now I will a placeholder which says which pagination type property coresponds to.
Pagination type is none
Plugin will request a single page.
Pagination via url from response header
When accessing the page the response header contains a link to next page:
Link: <http://helloworld.voog.co/admin/api/pages?page=1&q.language.id=1>; rel="first", <http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="next", # <-------- HERE IT IS <http://helloworld.voog.co/admin/api/pages?page=2&q.language.id=1>; rel="last"
When there is no next page in header, iteration is finished.
Pagination via next page url in response body
This example is from Confluence API. When querying https://wiki.cask.co/rest/api/space/TEPHRA/content/page?limit=5&start=5
{ "_links": { "base": "https://wiki.cask.co", "context": "", "next": "/rest/api/space/TEPHRA/content/page?limit=5&start=10", "prev": "/rest/api/space/TEPHRA/content/page?limit=5&start=0", "self": "https://wiki.cask.co/rest/api/space/TEPHRA/content/page?limit=5&start=5" }, ... actual data ... }
User will need to set "Next Page Link" to /_links/next to get the pagination working.
Pagination by page number or offset
User must add a placeholder {pagination.index} to URL. How it is replaced is controlled by three properties:
- Start Index
- Max Index
- Index Increment
WordPress API has this kind of pagination. Here's widget configurations example for it:
url = "http://localhost/wp/v2/posts?page={pagination_index}" start_index = "1" max_index = "" index_increment = "1"
Another example:
url = "http://localhost:4532/api/v3/transactions?start_time={pagination_index}" start_index = "1389075585" max_index = "" index_increment = "10000000"
The plugin stops reading when a page returns no records or 404. Or when reached max_index (if it's not empty)
Pagination by next page token
Here's an example of pagination from youtube API. NextPageToken field contains a token, which should be included in url to get next page. "&page_token=CAEQAA"
{ ... "nextPageToken": "CAEQAA", # <---------- HERE IT IS "pageInfo": { "totalResults": 208, "resultsPerPage": 1 }, "items": [ { ... } } ] }
Here's how the links chain looks like:
${url} ${url}&nextPageToken=${nextPageToken1} ${url}&nextPageToken=${nextPageToken2} ...
Custom pagination
Different APIs use very different styles of pagination. In the simple cases they return link in header or some field of response JSON.
- For example API where user wants to paginate by time in the following way: &start_time={something}&end_time={something+10000}. Two dependent variables are involved here. It would be very problematic to give ability to configure something like this via widget.
- Let's images another case. User wants to download a webserver directory. So "pages" in this case are files on webserver. Let's say he analyses/backups a whole site. So we need to paginate based on results from parsing HTML.
Let's assume another example. User wants to skip certain pages in API. Let's say the API pagination is time based, meaning something like this is appended to url "&start_time=1389075585". But he only wants to get pages for the weekends.
I would suggest adding a property with Python code which would define how the pagination is done and when the iteration should finish.
context.pages_proccessed = 0 def get_next_page_url(url, page, headers): context.next_page = url + '&nextPageToken=' + page['nextPageToken'] context.pages_proccessed += 1 context.should_finish = (context.pages_proccessed > 1_000_000)
OR
context.start_time = 1389075585 def get_next_page_url(url, page, headers): context.start_time += 10000 end_time = context.start_time + 10000 context.next_page = url + '&start_time=' + str(start_time) + '&end_time=' + str(end_time)
For this Jython is used, so user does not need to have Python installed. "Context" object is a java object exposes to Python.
Transforming API responses into Records
No automatic schema generation is implemented. Since we don't know the value types.
Properties:
Section | Name | Description | Default | Widget | Validations | |||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Format | Format | Possible values:
| Dropdown list | |||||||||||||||
JSON/XML Result Path | For JSON a simple slash separated path is used e.g. /library/books/items. For XML an XPath is used. | Text Box | Fail if used with non JSON/XML format | |||||||||||||||
JSON/XML Fields Mapping | Mapping of schema field name to jsonPath (past the result path). Example (Jira API):
Schema fields which are not in the map, will use fieldName:/fieldName mapping. |
|
1 JSON format
JSON entries are converted into StructuredRecord using StructuredRecordStringConverter.java
To specify where we should take record from user needs to specify JSON Result Path.
Example:
{ "pageInfo": { "totalResults": 208, "resultsPerPage": 2 }, "items": [ { "kind": "youtube#searchResult", "etag": "\"Bdx4f4ps3xCOOo1WZ91nTLkRZ_c/yrJNwvacPS7tA7BQCQmeIZr7fg8\"", "id": { "kind": "youtube#channel", "channelId": "UCfkRcekMTa5GA2DdNKba7Jg" }, "snippet": { "publishedAt": "2015-02-12T22:12:43.000Z", "channelId": "UCfkRcekMTa5GA2DdNKba7Jg", "title": "Cask", "description": "Founded by developers for developers, Cask is an open source big data software company focused on developers. Cask's flagship offering, the Cask Data ...", "thumbnails": { ... }, "channelTitle": "Cask", "liveBroadcastContent": "upcoming" } }, { "kind": "youtube#searchResult", "etag": "\"Bdx4f4ps3xCOOo1WZ91nTLkRZ_c/uv6u8PSG0DsOqN9m77o06Jl4LnA\"", "id": { "kind": "youtube#video", "videoId": "ntOXeYecj7o" }, "snippet": { "publishedAt": "2016-12-21T19:32:03.000Z", "channelId": "UCfkRcekMTa5GA2DdNKba7Jg", "title": "Cask Product Tour", "description": "In this video, we take you on a product tour of CDAP, CDAP extensions and the Cask Market. Cask Data Application Platform (CDAP) is the first Unified ...", "thumbnails": { ... }, "channelTitle": "Cask", "liveBroadcastContent": "none" } } ] }
For above API response user can specify Result Path to be /items/snippet. In this case CDAP will take all elements in the closest list which is /items and than take snippet dictionary out of every of them and generate record with the following fields: "publishedAt": "2015-02-12T22:12:43.000Z","channelId": "UCfkRcekMTa5GA2DdNKba7Jg","title": "Cask", etc.
Another example:
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, ... ], "bicycle": { "color": "red", "price": 19.95 } }, }
Here user can specify result path to be /store/book. Which will produce records with fields like: "category": "reference","author": "Nigel Rees".
Handling unknown fields:
- Fields which are not present in schema get skipped.
- Schema fields, which are not present in JSON response, are set to null.
- If no fields from schema are found an exception is thrown. Which is handled according to "Non-HTTP Error Handling" property value.
2 XML format
We may add functionality for XML parsing to separate project so other projects can re-use that.
XML below will be used as basis for examples in this section.
<?xml version="1.0" encoding="UTF-8"?> <bookstores> <bookstore id="1"> <book category="cooking"> <title lang="en">Everyday Italian</title> <author>Giada De Laurentiis</author> <year>2005</year> <price> <value>15.0</value> <policy>Discount up to 50%</policy> </price> </book> <book category="web"> <title lang="en">XQuery Kick Start</title> <author>James McGovern</author> <author>Per Bothner</author> <year>2003</year> <price> <value>49.99</value> <policy>No discount</policy> </price> </book> ... </bookstore> <bookstore id="2"> ... </bookstore> </bookstores>
Parsing XML includes below steps:
- Get nodes from XML by result XPath (provided by user). For example /bookstores/bookstore/book[@category='web'].
- Convert resulting nodes to JSON (using org.json lib).
- Generate StructuredRecords from JSON.
2.1 STEP 1 - Get XML by XPath
XML parsing is done by default Java DOM parser. Which is able to get items by a specified XPath. XPath is super flexible it allows user to get nodes by attribute value, as well as to group nodes from different parents into single result, as well as chose nodes conditionally etc. etc.
Some XPath examples:
/bookstores/bookstore/book[position()<3] //title[@lang] //title[@lang='en'] /bookstores/bookstore/book/price[text()] # convert all subelements to string /bookstores/bookstore/book[price>35.00]/title
2.2 STEP 2 - Convert XML to JSON
Converting XML to StructuredRecord is non-trivial, we should:
- not lose information like attributes (id, category, lang from example)
- handle name collisions between tags. XML can have multiple tags with the same name in the same node.
- handle name collisions between tags and attributes, when tags are converted to record fields.
Org.json library can convert XML to JSON, which is just what we need. Below is a conversion result obtained by using org.json library.
{"bookstores": {"bookstore": [ { "book": [ { "year": 2005, "author": "Giada De Laurentiis", "price": { "value": 15, "policy": "Discount up to 50%" }, "category": "cooking", "title": { "lang": "en", "content": "Everyday Italian" } }, { "year": 2003, "author": [ "James McGovern", "Per Bothner" ], "price": { "value": 49.99, "policy": "No discount" }, "category": "web", "title": { "lang": "en", "content": "XQuery Kick Start" } }, ... ], "id": 1 }, { ... "id": 2 } ]}}
On a side note: look at author fields, they are of different type in the above JSON. This will be handled. If schema has field type = union, and there is a value not a list in place, we consider it as a list with a single element.
2.3 STEP 3 - Generate StructuredRecord from JSON
Converting JSON into Structured?ecord is a simple task. We do this via StructuredRecordStringConverter.java. Example:
{ "year": 2003, "author": [ "James McGovern", "Per Bothner" ], "price": { "value": 49.99, "policy": "No discount" }, "category": "web", "title": { "lang": "en", "content": "XQuery Kick Start" } }
will yield records compatible with below schema:
year: string author: array price: record - value:double - policy:string category: string title: record - lang:string - content:string
3 Delimited format
Will use the functionality from cdap-formats/DelimitedStringsRecordFormat.java to validate schema and convert input TCS/CSV to StructuredRecord. The class supports a columns mapping as last field of schema.
4 Text format
Will use the functionality from cdap-formats/TextRecordFormat.java to validate schema and convert input text to StructuredRecord.
OAuth2
Moved design information into a separate doc: Plugin OAuth2 Common Module
Properties:
Name | Description | Default | Widget | Validations |
---|---|---|---|---|
OAuth2 Enabled | True or false. | false | Radio group | |
Auth URL | A page, where the user is directed to enter his credentials. Example: https://www.facebook.com/dialog/oauth | Text Box | Assert to be empty if OAuth2 is disabled and the not empty if enabled. | |
Token URL | A page, where CDAP can exchange authCode for accessToken and refreshToken. Or refresh the accessToken. | Text Box | Assert to be empty if OAuth2 is disabled and the not empty if enabled. | |
Client ID | User should obtain this when registering the OAuth2 application in the service (e.g. Twitter). | Text Box | Assert to be empty if OAuth2 is disabled and the not empty if enabled. | |
Client Secret | User should obtain this when registering the OAuth2 application in the service (e.g. Twitter). | Password | Assert to be empty if OAuth2 is disabled and the not empty if enabled. | |
Scope | This is optional. Scope is a mechanism in OAuth 2.0 to limit an application's access to a user's account. An application can request one or more scopes, this information is then presented to the user in the consent screen, and the access token issued to the application will be limited to the scopes granted. | Text Box | Assert to be empty if OAuth2 is disabled. | |
Refresh Token | This is populated by the button "Login via OAuth 2.0". Since we save Refresh Token (not an access token which is short lived), this should be done only once, during initial pipeline deployment. For more information click here. UI should put an actual value into secure store and put macro function ${secure(key)} a value for extra safety. | Fail is empty and OAuth2 is enabled. |
SSL/TLS
Some general definitions for more context:
KeyStore file - contains a private key of client (in our case), which will be used for client-server SSL communication.
TrustStore file - stores certificate chain from trusted CAs. This allows to validate server's identity during SSL handshake.
Other things which are customized are Cipher Suites and TLS version. These things are negotiated between server and client during SSL handshake. By providing these we can set what the client sends as "available" for usage, than server will choose from those.
Should we provide an option for user to skip identity check during HTTPs connection? This is not recommended anywhere you read about it, but it might be useful in case user is testing some API which is in development stage.
Name | Description | Default | Widget | Validations |
---|---|---|---|---|
Verify HTTPs Trust Certificates | If false will allow connection to untrusted https sources. | true | ||
Keystore File | Path to a keystore file | Text Box | Check if file exists | |
Keystore Type | According to Oracle docs. There are 3 supported keystore types. Possible values:
| JKS | Radio Group | |
Keystore Password | Leave empty if keystore is not password protected | Password | Try to load keystore with given password | |
Keystore Key Algorithm | SunX509 is default in Java. | SunX509 | Text Box | |
TrustStore File | Path to a truststore file. If empty use default Java truststores. | Text Box | Check if file exists | |
TrustStore Type | According to Oracle docs. There are 3 supported truststore types. Possible values:
| JKS | Radio Group | |
TrustStore Password | Leave empty if keystore is not password protected | Password | Try to load truststore with given password | |
Truststore Trust Algorithm | SunX509 | Text Box | ||
Transport Protocols | User can add multiple protocols. Which will be offered by client during handshake. | TLSv1.2 | Array | Validate if names are correct |
Cipher Suites | User can add multiple cipher suites. They will be offered by client during handshake. If empty use default cipher suites. This is textBox with comma separated list of ciphers. Since sometimes there can be 20, 30 or more ciphers it is not usable for user to add every one of them manually into an array. | Text Box | Validate if supported by current java implementation |