Elasticsearch Sink

Overview

Elasticsearch is designed as an alternative to relational databases, offering scalability and flexible querying. It stores objects as documents (similar to rows in traditional databases), which contain fields (analogous to columns). Unlike in relational databases, however, these fields can be searched with full-text search, which returns a result when the search terms appear in the body of the field rather than only when they match the field exactly. Data can be processed and accessed quickly by combining filters on exact-value fields (structured data) with searches on full-text fields (commonly referred to as unstructured data).
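
The difference between exact-value filtering and full-text search can be illustrated with a toy Python sketch. It is not how Elasticsearch works internally (Elasticsearch uses analyzers and an inverted index rather than a linear scan), and the documents, field names, and helper functions are hypothetical.

```python
# Toy illustration only: Elasticsearch uses analyzers and an inverted index,
# not a linear scan. Documents and field names here are hypothetical.
docs = [
    {"status": 200, "body": "User logged in from the mobile app"},
    {"status": 404, "body": "Page not found: /mobile/settings"},
    {"status": 200, "body": "Password reset requested"},
]


def filter_exact(documents, field, value):
    """Structured filter: keep documents whose field equals the value exactly."""
    return [d for d in documents if d.get(field) == value]


def search_full_text(documents, field, query):
    """Full-text search: keep documents whose field contains every query term."""
    terms = query.lower().split()
    results = []
    for d in documents:
        text = d.get(field, "").lower().replace(":", " ").replace("/", " ")
        words = set(text.split())
        if all(t in words for t in terms):
            results.append(d)
    return results


# Combine both: full-text search within the exact-value filter.
hits = search_full_text(filter_exact(docs, "status", 200), "body", "mobile")
```

Here an exact match on the body field for "mobile" finds nothing, while the full-text search finds every document whose body mentions the term.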

Motivation

By incorporating Elasticsearch as a sink for ETLBatch adapters, users will have more flexibility in analyzing their data; moreover, with products like Kibana, they can visualize and interpret their Elasticsearch data. Finally, an Elasticsearch sink gives CDAP users a wider choice of systems in which to store and manage their data.

We chose to implement Elasticsearch as a batch sink to match a customer use case.

Specific use cases may include analysis of access logs or any information processing that includes full-text analysis (bodies of tweets, for example).

Requirements

Elasticsearch provides packages for writing to Elasticsearch from Hadoop: a Hadoop OutputFormat and, more generally, the elasticsearch-hadoop library. This approach requires the Elasticsearch server host and port, as well as the index (analogous to a database in SQL) and the type (analogous to a table; discussed in more detail below).


The user can create the index before running CDAP. While CDAP will create the index in the back end if the user-specified index does not exist, creating it in advance gives the user more control over its configuration (for example, the number of shards, or partitions, and the number of replicas of the data). Moreover, the user can store analyzers in the index, which CDAP can then use (storing analyzers is not an operation intended for Hadoop), as well as mappings for the data. To make these mappings most effective, the user could opt to include a list of fields for the output. This feature is not yet implemented, but it would allow the user to drop unwanted fields, such as the timestamp, that are automatically passed as part of the structured record for the input event.
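
As a sketch of what creating the index in advance might involve, the Python snippet below builds the JSON body for an index-creation request. `number_of_shards` and `number_of_replicas` are standard Elasticsearch index settings; the helper function and the example values are illustrative assumptions, and the request is only described in a comment rather than sent.

```python
import json


def build_index_settings(shards, replicas):
    """Build the body for an index-creation request (hypothetical helper)."""
    if shards < 1:
        raise ValueError("an index needs at least one primary shard")
    if replicas < 0:
        raise ValueError("the replica count cannot be negative")
    return {
        "settings": {
            "number_of_shards": shards,
            "number_of_replicas": replicas,
        }
    }


# Assumed example values; choose them to fit the cluster.
body = json.dumps(build_index_settings(shards=3, replicas=1), indent=2)

# The body would be sent as: PUT http://<host>:<port>/<index-name>
print(body)
```

Analyzers and mappings would go into the same request body alongside the settings; they are omitted here because their contents depend on the data.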


Either CDAP or the user can easily create the type. The user supplies the name of the type, so all data from an adapter will be of the same type; this makes sense, as all data from the same source should be similarly formatted. Note that in addition to the types supported by CDAP, such as strings, ints, and doubles, Elasticsearch also supports dates (which CDAP stores as strings), and Elasticsearch will automatically infer that a field is of type date if it is a string formatted as a date.


Elasticsearch handles concurrent updates to a document by tracking the document's current version with a version number. CDAP handles transactions by rolling back unsuccessful ones. However, an unsuccessful transaction might still leave partial documents written to Elasticsearch. Therefore, we ask the user to specify the field from which the document ID is derived: when the data is written again, CDAP can simply overwrite the incorrect documents with the correct version.
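
Why a field-derived ID makes rewrites safe can be shown with a small simulation. The sketch below uses an in-memory dictionary as a stand-in for an index; it is not the sink's actual code, and the record layout and the `ts` ID field are assumptions borrowed from the sample configuration.

```python
def write_batch(index, records, id_field):
    """Store each record under the ID taken from id_field, overwriting any
    existing document with the same ID (in-memory stand-in for an index)."""
    for record in records:
        if id_field not in record:
            raise KeyError(f"record is missing the ID field {id_field!r}")
        index[str(record[id_field])] = record


records = [
    {"ts": 1431000000, "body": "first event"},
    {"ts": 1431000060, "body": "second event"},
]

index = {}

# A failed run leaves one incomplete document behind.
write_batch(index, [{"ts": 1431000000, "body": "fir"}], id_field="ts")

# The rerun writes the full batch; the leftover document is overwritten
# rather than duplicated because both copies share the same ID.
write_batch(index, records, id_field="ts")

print(len(index))  # 2
```

With automatically generated IDs, the rerun would instead add a third document next to the incomplete one.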


In summary, to create an Elasticsearch sink, the user needs to specify the name of an index (which may or may not already exist), the type name, the hostname and port of the Elasticsearch server, and the field from which to derive the document ID.
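
One way these four settings could be handed to elasticsearch-hadoop is sketched below. The keys `es.nodes`, `es.port`, `es.resource`, and `es.mapping.id` are elasticsearch-hadoop configuration options; the input property names follow the sample configuration in this document, and the translation function itself is a hypothetical illustration rather than the sink's actual implementation.

```python
def to_es_hadoop_conf(props):
    """Translate sink properties into elasticsearch-hadoop settings
    (hypothetical helper; input names follow the sample configuration)."""
    required = ("es.host", "es.index", "es.type", "es.idField")
    missing = [key for key in required if not props.get(key)]
    if missing:
        raise ValueError("missing sink properties: " + ", ".join(missing))

    host, sep, port = props["es.host"].rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError("es.host must look like hostname:port")

    return {
        "es.nodes": host,
        "es.port": port,
        # elasticsearch-hadoop addresses the write target as index/type.
        "es.resource": props["es.index"] + "/" + props["es.type"],
        # Field whose value becomes the document ID.
        "es.mapping.id": props["es.idField"],
    }


conf = to_es_hadoop_conf({
    "es.host": "localhost:9200",
    "es.index": "index",
    "es.type": "type",
    "es.idField": "ts",
})
```

Validating the properties up front means a missing or malformed value is reported when the adapter is configured rather than partway through a batch run.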

Specifications

  • The data will be written in batch, upon receipt from the source.
  • Data should be processed without any data loss, and exactly one document should exist for each entry or event in the source.
  • The user should be able to query Elasticsearch while the adapter is running.
  • The ID (equivalent to the row field) can be derived from a user-specified field.
  • CDAP will create a connection to Elasticsearch, write data, then close the connection without creating unnecessary nodes.
  • CDAP will only write data to Elasticsearch; it will not query or read data stored in Elasticsearch.
  • CDAP can create the type from the user-supplied name or use a pre-existing type.

Configuration JSON

Sample configuration:

 

{
    "template": "ETLBatch",
    "description": "Elasticsearch Configuration",
    "config": {
        "schedule": "*/1 * * * *",
        "source":{
            "name":"Stream",
            "properties":{
                "name":"myStream",
                "duration":"1d"
            }
        },
        "sink": {
            "name": "Elasticsearch",
            "properties": {
                "es.host": "localhost:9200",
                "es.index": "index",
                "es.type": "type",
                "es.idField": "ts"
            }
        },
        "transforms": []
    }
}