Dynamic pipelines

This guide shows you how to build a reusable Cloud Data Fusion pipeline in which plugin configurations are substituted dynamically at runtime from values served by an HTTP server.

Instructions

Reusable pipelines have a regular pipeline structure, but the configuration of each node can change based on configurations provided by an HTTP server. For example, a pipeline that reads data from GCS, applies transformations, and writes to BigQuery has a fixed structure; however, the transformations applied and the BigQuery output table can vary depending on which GCS file is read.

Prerequisites

Before you build a reusable Cloud Data Fusion pipeline, set up an HTTP endpoint that is reachable from your Cloud Data Fusion instance and that serves the following content.

{ "arguments" : [ { "name" : "input.path", "type" : "string", "value" : "gs://argument_setter/persons_data.csv" }, { "name" : "parser", "type" : "array", "value" : [ "parse-as-csv :body ',' true", "drop body", "set-type :id integer" ] }, { "name" : "output.schema", "type" : "schema", "value" : [ { "name" : "id", "type" : "int", "nullable" : true}, { "name" : "first_name", "type" : "string", "nullable" : true}, { "name" : "last_name", "type" : "string", "nullable" : true}, { "name" : "email", "type" : "string", "nullable" : true}, { "name" : "address", "type" : "string", "nullable" : true} ] }, { "name" : "bq.dataset", "type" : "string", "value" : "cdf_dataset"}, { "name" : "bq.table", "type" : "string", "value" : "cdf_persons_table"} ] }

To learn more about how the contents of this file map to the pipeline, see the Argument Setter plugin documentation.

This how-to guide uses a persons data file stored in Cloud Storage (gs://argument_setter/persons_data.csv, the value of the input.path argument above).
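The contents of the actual file are not shown here. Based on the parser directives and output schema above, it is a CSV file with a header row and the columns id, first_name, last_name, email, and address, so its layout is equivalent to the following illustrative, hypothetical rows:

    id,first_name,last_name,email,address
    1,Jane,Doe,jane.doe@example.com,123 Main St
    2,John,Smith,john.smith@example.com,456 Oak Ave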

Deploy the Argument Setter plugin from the Hub

1. Click Hub.

2. Click the Argument Setter action plugin and deploy it.

3. After the plugin is deployed, it appears under Conditions and Actions in the Pipeline Studio.

Build the reusable pipeline

1. In the Pipeline Studio, click the deployed HTTP Argument Setter plugin to add it to the pipeline canvas.

2. In the plugin configuration, set the URL field to the macro ${http.url}.

3. Click Source, click Google Cloud Storage, and connect the HTTP Argument Setter node to the Google Cloud Storage node.

4. Configure the Google Cloud Storage plugin (see the configuration sketch after this list).

5. Click Transforms, click Wrangler, and connect the Wrangler node to the Google Cloud Storage source.

6. Configure the Wrangler plugin (see the configuration sketch after this list). To set the Output Schema field to a macro, click Actions > Macro.

7. Click Sink, click BigQuery, and connect the BigQuery node to the Wrangler transform.

8. Configure the BigQuery plugin (see the configuration sketch after this list). To set the Output Schema field to a macro, click Actions > Macro.

9. Deploy the pipeline.

10. Click Run. Leave the runtime arguments at the values provided, except http.url. For the http.url runtime argument, enter the URL of your HTTP endpoint, and then run the pipeline.
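As a sketch of one plausible configuration, the plugin fields can be set to macros that match the argument names served by the HTTP endpoint (exact field labels may vary between plugin versions):

    HTTP Argument Setter   URL            = ${http.url}
    Google Cloud Storage   Path           = ${input.path}
    Wrangler               Directives     = ${parser}
    Wrangler               Output Schema  = ${output.schema}
    BigQuery               Dataset        = ${bq.dataset}
    BigQuery               Table          = ${bq.table}
    BigQuery               Output Schema  = ${output.schema}

With a mapping like this, http.url is the only runtime argument you need to supply yourself when you run the pipeline, for example http://your-server-host:8080/arguments.json (a hypothetical URL pointing at the endpoint described in the Prerequisites).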

After the pipeline run succeeds, a new dataset named cdf_dataset and a table named cdf_persons_table are created in BigQuery, populated with the data from the persons_data.csv file.
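To spot-check the loaded data, you could run a quick query with the BigQuery Python client library (assuming the google-cloud-bigquery package is installed and your default credentials point at the project that contains the dataset):

    from google.cloud import bigquery

    # Count the rows that the pipeline wrote to the output table.
    client = bigquery.Client()
    query = "SELECT COUNT(*) AS row_count FROM `cdf_dataset.cdf_persons_table`"
    for row in client.query(query).result():
        print(f"Rows loaded: {row.row_count}")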