Processing error records
CDAP can capture error records while you wrangle your data: dirty records are routed separately from valid ones as data moves through your ETL pipeline.
In this how-to article, we will clean up data using CDAP and, in the process, filter out erroneous records for further investigation.
I have CSV data about property purchases in the Bay Area that I want to clean while filtering out erroneous records. By wrangling the file once in CDAP, I can create a pipeline that automates the same cleanup for large amounts of data and sets the erroneous records aside for further analysis.
For this demo my data is in a GCS bucket; I want to clean it up and load it into BigQuery for further analysis. For information about the permissions required when creating a GCP project for Data Fusion, please refer here (Link to IAM permissions and gce-enforcer).
Instructions
Read data from GCS
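Before wrangling, it can help to peek at the raw object sitting in the bucket. Below is a minimal sketch using the google-cloud-storage Python client; the bucket and object names are hypothetical placeholders for your own data.

from google.cloud import storage

# Uses Application Default Credentials for the GCP project.
client = storage.Client()

# Hypothetical bucket and object path; substitute your own.
blob = client.bucket("my-housing-data").blob("raw/bay_area_housing.csv")

# Print the first few rows of the tab-delimited file to inspect the raw data.
text = blob.download_as_text()
for line in text.splitlines()[:5]:
    print(line)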
Cleanse Data: We parse the data as CSV, clean it up, and change field types for further analysis. As a last step, we send all records where lot_size has an erroneous value (decimal values like 0.32, which are not valid for this field) to the error port; a local pandas equivalent of these checks follows the directive list below.
parse-as-csv :body '\t' true
drop body
send-to-error empty(Stories)
drop State
drop Zip
drop Street Address
find-and-replace Price s/,//g
set-type :Price long
cleanse-column-names
drop street_address
find-and-replace lot_size s/,//g
send-to-error lot_size =~ ".*\..*"
You can copy and paste this entire set of directives into the terminal-like box at the bottom of Wrangler.
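To make the cleanup and error criteria concrete, here is a rough local equivalent of the recipe above written with pandas. This is only a sanity-check sketch, not part of the pipeline; the file path is hypothetical and the original column names (Stories, Price, State, Zip, Street Address, Lot Size) are inferred from the directives.

import pandas as pd

# Load the tab-delimited file with a header row (parse-as-csv :body '\t' true).
df = pd.read_csv("bay_area_housing.csv", sep="\t")

# Normalize column names (cleanse-column-names) and drop unused columns.
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
df = df.drop(columns=["state", "zip", "street_address"])

# Strip thousands separators from price and lot_size (find-and-replace s/,//g).
for col in ("price", "lot_size"):
    df[col] = df[col].astype(str).str.replace(",", "", regex=False)

# Error conditions: an empty Stories value, or a lot_size containing a decimal point.
is_error = df["stories"].isna() | df["lot_size"].str.contains(r"\.")

errors = df[is_error]          # what Wrangler would send to the error port
clean = df[~is_error].copy()   # what Wrangler would emit on the output port
clean["price"] = clean["price"].astype("int64")   # set-type :Price long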
Operationalize using pipelines: Once the directives are applied, we can create a pipeline to automate this process.
State of the pipeline: The GCS bucket is our source. A Wrangler transform processes the data and emits each record either to its output port or to its error port.
Two BigQuery sinks write to separate tables: one for success records and the other for error records.
Pipeline preview: The preview shows some error records coming from our source.
Error records emitted by Wrangler:
Deploy & run pipeline: Once we verify our data in preview, we can go ahead and deploy the pipeline and run it.
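If you prefer to script this step, the pipeline can also be deployed and started through the CDAP REST API that the Data Fusion instance exposes. Below is a minimal sketch using Python's requests; the instance API endpoint, access token, pipeline name, and exported pipeline JSON file are placeholders, and it assumes the batch pipeline runs as the standard DataPipelineWorkflow program.

import json
import requests

# Placeholders: the instance's API endpoint and an OAuth access token.
CDAP_ENDPOINT = "https://<data-fusion-instance-api-endpoint>"
HEADERS = {"Authorization": "Bearer <access-token>"}
PIPELINE = "housing-cleanup"   # hypothetical pipeline name

# Deploy the pipeline from its exported JSON (artifact + config).
with open("housing-cleanup-cdap-data-pipeline.json") as f:
    app_spec = json.load(f)
resp = requests.put(
    f"{CDAP_ENDPOINT}/v3/namespaces/default/apps/{PIPELINE}",
    headers={**HEADERS, "Content-Type": "application/json"},
    data=json.dumps(app_spec),
)
resp.raise_for_status()

# Start a run of the deployed batch pipeline.
resp = requests.post(
    f"{CDAP_ENDPOINT}/v3/namespaces/default/apps/{PIPELINE}"
    "/workflows/DataPipelineWorkflow/start",
    headers=HEADERS,
)
resp.raise_for_status()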
Verifying pipeline run with metrics: Once we run our pipeline, we can see the corresponding metrics emitted by each node (including output and error records).
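These per-node counts can also be pulled programmatically from the CDAP metrics endpoint. A minimal sketch follows, reusing the placeholder endpoint and token from the previous snippet; it assumes the Wrangler stage is named Wrangler and that stage-level record counts are exposed as user.<stage>.records.out and user.<stage>.records.error, which may differ across CDAP versions.

import requests

CDAP_ENDPOINT = "https://<data-fusion-instance-api-endpoint>"
HEADERS = {"Authorization": "Bearer <access-token>"}

# Query aggregated output and error record counts for the Wrangler stage.
params = [
    ("tag", "namespace:default"),
    ("tag", "app:housing-cleanup"),
    ("metric", "user.Wrangler.records.out"),
    ("metric", "user.Wrangler.records.error"),
    ("aggregate", "true"),
]
resp = requests.post(f"{CDAP_ENDPOINT}/v3/metrics/query", headers=HEADERS, params=params)
resp.raise_for_status()
print(resp.json())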
Validating data in BigQuery: We should now be able to see our data in BigQuery, both in the success_records and the error_records tables.
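Row counts in both tables can also be checked from a script. Here is a minimal sketch with the google-cloud-bigquery client, assuming a hypothetical dataset named housing that contains the success_records and error_records tables.

from google.cloud import bigquery

# Uses Application Default Credentials; the dataset name is hypothetical.
client = bigquery.Client()

for table in ("success_records", "error_records"):
    rows = client.query(f"SELECT COUNT(*) AS n FROM `housing.{table}`").result()
    print(table, next(iter(rows)).n)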
Once we have captured the erroneous records, we can process them further: cleanse them again, or record them as malformed for later analysis. We demonstrated this using the Wrangler transform; however, any transform that emits error records can be used to filter erroneous records in your ETL workflow.