XML Reader

Introduction

The XML Reader plugin is a source plugin that allows users to read XML files stored on HDFS.

Use-case 

Customer XML feeds that are dropped onto HDFS. These can be small to very large XML  files. The files have to read, parsed and when used in conjunction with XML Parser they are able to extract fields. This reader will emit one XML event that is specified by a XPath to be extracted from the file.

Conditions

  • XML file(s) are not splittable

  • Streaming processing with XPath has to be implemented

Options

  • User is able to specify a directory or multiple directory paths

  • User is able to specify a pattern

  • User is able to specify an XPath for extracting an individual event

  • User is able to specify what should be done with the file(s) that has been processed -- whether it should be archived, move or deleted

    • If the file(s) has to be archived, the user specifies the target directory

    • If the file(s) has to be moved, the user specifies the target directory

    • If the file(s) has to be deleted, the system deletes the file reliably

  • Source keeps track of the file(s) are processed and user is able to specify whether the they should be reprocessed or no

Reference

  • Here is the rough implementation of XML Input format that needs to be improved

  • StAx Parser

Design

Approaches Evaluated and Challenges:

  1. StAx library supports streaming; however it does not support XPath, however XPath processing logic can be implemented with limitations.
  2. Similar library Trax supports XPath, but does not support streaming (https://docs.oracle.com/cd/E17802_01/webservices/webservices/docs/1.6/tutorial/doc/SJSXP2.html)
  3. Reading one block at a time in case of streaming implementation: In case of streaming, we would be reading certain number of bytes everytime from input XML file. How would we be able to read exact XML block from file
  4. XPathStAX library was evaluated. However it has handler and methods associated with it; which is difficult to map with CDAP method implementation. ( http://polyglotted.io/xpath-stax/ )
  5. StreamsXMLRecordReader class
  6. Apply xpath on the XML blocks between start and end tags where start / end tags can be derived from xpath expression.
    I have tried this XML file having size with more than 1.5GB. It is working as expected and there is no loss of any XML block.
    I have specified maxSplitSize and file get divided in chunks on the basis of maxSplitSize.
    Example : for xpath = "/catalog/book/price"
                         startTag = "book"
                         endTag = "book"
                    Now get the XML block between start and end tag, like below .
                        <book>
                            <author>Ralls, Kim</author>
                            <title>Midnight Rain</title>
                            <genre>Fantasy</genre>
                            <price>5.95</price>
                            <publish_date>2000-12-16</publish_date>
                           <description>A former architect battles corporate zombies,
                            an evil sorceress, and her own childhood to become queen
                            of the world.</description>
                       </book>

            apply xpath= "/book/price" on the XML block, which will derive record = "<price>5.95</price>"

    Limitations - This may fail if -
    a. XML block is larger than the max split size.
    b. XPath is too long or complicated.

          Flaw:

            This approach will fail if same startTag exist in different hierarchy.

            Example: if startTag="price" exist in Xpath "/catalog/book/price" and "/catalog/toy/price" then for XPath="/catalog/book/price" it will give invalid price node.

Approach to follow:

       On the basis of approach #6 evaluated above, will derive XML record on the basis of start and end tag specified by user and then apply xPath on it.

      XMLReader will read file(s) splits using record reader and will emit schema as "offset","fileName" and "XMLRecord",
      Where XMLRecord will be derived on the basis of start and end tag.
      XMLParser will take XMLRecords as input from XMLReader and apply xPath to it, and create an output schema. (refer to Example section for more clarity.)

      If user has selected any of the after processing action (Delete, Archive, Move) as well as selected to "Yes" for reprocessing of files,
      then user must get error message that "Please select either 'After Processing Action' or 'Reprocessing Required'"
     because,
          a. If user selected to delete file after process, then reprocessing of file not possible.
          b. If user selected to Archive / Move after process, then reprocessing of file not possible as files get moved from its original position.

Update on approach:

     As per the discussion, evaluating approach #1.

Plug-in Representation:

{
"name": "XMLReaderBatchSource",
"plugin":{
"name": "XMLReaderBatchSource",
"type": "batchsource",
"properties":{
"referenceName": "referenceName""
"path": "/source/xmls/",
"Pattern": "^catalog"
"nodePath": "/catalog/book"
"actionAfterProcess" : "Delete",
"targetFolder":"/target/", //this must be existing folder path
"reprocessingReq": "Yes",
"tableName": "table"
}
}
}

Open Questions with answers after analysis : 

  1. We need to figure out approach to achieve both streaming and support for XPath?
    Ans:- After analysing various streaming solution, we concluded that we cannot apply xPath directly to the streaming data. But we can derive XML record on the basis of start and end tag and then apply xPath over it. 
  2. How would we read one block from XML file with streaming?
    Ans:XML record between start and end tag.
  3. What would be the complexity of XPaths? Can we define the level to which XPaths can be defined? (If we can, then we may throw exception if XPath is more complex that defined level)?
    Ans: With XML record between start and end tag, can apply any complext xPath provided that xPath must have start / end tag element in it.
  4. As per approach 6, XMLReader is working as expected without streaming with a large XML file as well. So do we need to still consider file streaming?
    Ans: Hadoop RecordReader and FileInputFormat handles split very well, so that there is no loss of data while reading it. 
            Tried reading XML file with size of 1.5GB and it works as expected. It seems like no streaming required.

Sample Classes :

XML Reader will contains components -

  1. XMLReaderBatchSource - This class is XMLRecordReader plugin and of type "batchsource" 
    a. It contains static class XMLConfig which extends ReferencePluginConfig class -
        i. It will validate all the required fields like path, nodePath, actionAfterProcessing etc.
        ii. actionAfterProcessing and reprocessingReq cannot be selected at a time, it would produce conflict as reprocessing for deleted / moved / archived file cannot be done.
    c. It will set Context input with XMLInputFormat and XMLConfig values. 
    d. It will emit each XML record event created by XmlRecordReader.
  2. XMLInputFormat - 
    a. It extends FileInputFormat.
    b. It will create a record reader of type XmlRecordReader. 
  3. XmlRecordReader - This is static class inside XMLInputFormat class and will read actual XML records and emit it to batchsource.
    a. It extends RecordReader.
    b. It will read XML file and derive XML record on the basis of node path and will emit it.

XMLReader-batchsource.json

{
"metadata": {
"spec-version": "1.0"
  },
"configuration-groups": [
{
"label": "XML Reader Batch Source Properties",
"properties": [
{
"widget-type": "textbox",
"label": "Reference Name",
"name": "referenceName"
  },
{
"widget-type": "textbox",
"label": "Path",
"name": "path"
  },
{
"widget-type": "textbox",
"label": "File Pattern",
"name": "fileRegex"
  },
  {
   "widget-type": "textbox",
"label": "Node Path",
"name": "nodePath"
  },
  {
"widget-type": "select",
"label": "Action After Processing File",
"name": "actionAfterProcess",
"widget-attributes": {
"values": [
"None",
      "Delete",
"Move",
"Archive"
  ],
"default": "None"
  }
},
{
"widget-type": "textbox",
"label": "Target Folder",
"name": "targetFolder"
  },
{
"widget-type": "select",
"label": "Reprocessing Required",
"name": "reprocessingReq",
"widget-attributes": {
"values": [
"No",
"Yes"
  ],
"default": "Yes"
 }
},
{
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
 }
]

 

}
],
"outputs": [
{
"widget-type": "non-editable-schema-editor",
"schema": {
"offset": "long",
"fileName": "string",
"record": "string"
  }
}
]
}

 

Challenges:

  1. Update tracking table with processed file names
    Solution- 
    Step followed to update table
    a. Update a temporary file when nextKeyValue() method of XMLRecordReader return false (that is EOF File).
    b. Read temporary file in onRunFinish() method of XMLReaderBatchSource to update tracking table.

         Pseudo Code

Assumptions: 

  1. Multiple XML files will be processed provided that all files must have same document structure. This ensure that provided xPath will be valid for all files.
    Otherwise, no records will be emitted for files with no unmatched xPath.
    Example: Suppose there are two files below - 
                      a. catalog_1.xml
                          <catalog>
                               <book>
                                  <price>10.00</price>
                               </book>
                          </catalog>

                    b. catalog_2.xml
                          <catalog>
                               <book>
                                  <author>John</author>
                               </book>
                          </catalog>

    For xPath = "/book/price" will fail for catalog_2.xml.

Examples

Suppose XML file is present on HDFS. Details of the File(s) are entered by User and possible values are below –

  • path : Path of the XML file to be read or folder from where files are to be read. Folder path ends with “\”
  • pattern: Pattern to select specific file(s).
    Example -
    1. Use '^' to select file with name start with 'catalog', like '^catalog'.
    2. Use '$' to select file with name end with 'catalog.xml', like 'catalog.xml$'.
    3. Use '*' to select file with name contains 'catalogBook', like 'catalogBook*'.
  • nodePath: Node path to emit individual event from the schema. Example - '/book/price' to read only price under the book node
  • actionAfterProcess: Action to be taken after processing of the XML file.
    Possible actions are -
    1. Delete from the HDFS.
    2. Archived to the target location.
    3. Moved to the target location.
  • targetFolder: Target folder path if user select actionAfterProcess either ARCHIVE or MOVE.
  • reprocessingReq: Flag to know if file(s) to be reprocessed or not.
  • tableName: table to keep track of processed files.

Example: 
XMLReader Plugin (Source) :XML file will be ready by plugin from the HDFS location (details provided in config object). See below XML sample –         

<catalog>
        <book id="bk101">
            <author>Gambardella, Matthew</author>
            <title>XML Developer's Guide</title>
            <genre>Computer</genre>
            <price>

               <base>44.95</base>

                <tax>

                   <taxid>1</taxid>

                   <surcharge>10.00</surcharge>

                   <excise>10.00</excise>

              </tax>

            </price>
            <publish_date>2000-10-01</publish_date>
            <description><name>An in-depth look at creating applications
            with XML.</name></description>
     </book>
      <book id="bk102">
           <author>Ralls, Kim</author>
           <title>Midnight Rain</title>
           <genre>Fantasy</genre>
           <price>

                <base>7.99</base>

                <tax>

                  <taxid>2</taxid>

                  <surcharge>11.00</surcharge>

                  <excise>11.00</excise>

               </tax>

            </price>
            <publish_date>2000-12-16</publish_date>
            <description><name><name>A former architect battles corporate zombies,
             an evil sorceress, and her own childhood to become queen
            of the world.</name></name></description>
      </book>
</catalog>

Following are the User inputs for path and nodePath:

path : "file:///hadoop/hdfs/catalog.xml"

nodePath : "/catalog/book/price/tax"

It will emit below StructuredRecord -.

Structured Record:

offsetfileNamerecord
123catalog.xml

 <tax> <taxid>1</taxid><surcharge>10.00</surcharge><excise>10.00</excise></tax>

125catalog.xml

<tax> <taxid>2</taxid><surcharge>11.00</surcharge><excise>11.00</excise></tax>

 

Transformation: XMLParser (yet to be implemented), will apply xPathMap over the records received from XMLReader and pass it to Sink (Table, KVTable etc.)

Lets say xPathMap is :

xpathMap:{

        "id": "/tax/taxid",

         "exciseTax": "/tax/excise",

         "surchargeTax: "/tax/surcharge"

}

output schema will be -

idexciseTaxsurchargeTax
110.0010.00
211.0011.00


Sink : After the XMLParser, output can be saved to desired sink type. Below is the Table sink type:

 

TAXIDEXCISESURCHARGE
110.0010.00
211.0011.00

Table of Contents

Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examples and guides
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature