
Orchestrating a Data Fusion pipeline from external services is a common use case. For example, Composer might start data pipelines based on data availability in a GCS bucket and then wait for each pipeline to complete. This article walks you through the APIs needed to trigger a pipeline, monitor the pipeline run until it completes, and get the pipeline status when it finishes.

Approach

  1. Triggering the pipeline should return the Run ID of the run that was started. To get the Run ID, use the following endpoint:

POST v3/namespaces/<namespaceid>/start 

Request Body: [{"appId": "<pipelinename>", "programType": "workflow",
                "programId": "DataPipelineWorkflow", "runtimeargs": {"arg1": "val1"}}]

Sample Call

Request: curl -X POST http://localhost:11015/v3/namespaces/default/start \
  -d '[{"appId": "Test", "programType": "workflow", "programId": "DataPipelineWorkflow", "runtimeargs": {"path": "gs://mybucket/csv/data.txt"}}]'

Response: [{"statusCode":200,"runId":"e6523380-a73a-11ea-a21e-acde48001122","appId":"Test","programType":"Workflow","programId":"DataPipelineWorkflow"}]


Note: The calling program should handle any non-200 response from the REST API, including 504 (timeout errors) and 4xx (user errors). For the start endpoint, also check the statusCode field of each entry in the response body.
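The start call can be scripted from an orchestrator such as Composer. The following is a minimal Python sketch using only the standard library, not an official client. It assumes the localhost:11015 endpoint and default namespace from the sample call, and the helper names (build_start_body, parse_run_id, start_pipeline) are hypothetical. Authentication is omitted; a managed Data Fusion instance will typically need its own API endpoint URL and an Authorization header.

```python
import json
import urllib.error
import urllib.request

# Assumption: endpoint and namespace taken from the sample call; adjust for your instance.
BASE_URL = "http://localhost:11015/v3/namespaces/default"


def build_start_body(pipeline_name, runtime_args=None):
    """Builds the JSON body for POST v3/namespaces/<namespaceid>/start."""
    return json.dumps([{
        "appId": pipeline_name,
        "programType": "workflow",
        "programId": "DataPipelineWorkflow",
        "runtimeargs": runtime_args or {},
    }]).encode("utf-8")


def parse_run_id(response_text):
    """Returns the runId from a start response, raising if the program did not start."""
    entry = json.loads(response_text)[0]
    # The body carries a per-program statusCode in addition to the HTTP status.
    if entry.get("statusCode") != 200:
        raise RuntimeError(f"Pipeline did not start: {entry}")
    return entry["runId"]


def start_pipeline(pipeline_name, runtime_args=None, timeout=60):
    """Starts a batch pipeline and returns its Run ID."""
    request = urllib.request.Request(
        f"{BASE_URL}/start",
        data=build_start_body(pipeline_name, runtime_args),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return parse_run_id(response.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        # Error responses such as 504 timeouts or 4xx user errors raise HTTPError.
        detail = err.read().decode("utf-8", "replace")
        raise RuntimeError(f"Start request failed with HTTP {err.code}: {detail}") from err
```

For example, start_pipeline("Test", {"path": "gs://mybucket/csv/data.txt"}) sends the same request as the sample curl call and returns the Run ID. Keeping the response parsing in its own function lets it be checked against the sample response without a running instance.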

2. Once the pipeline is started, the pipeline run can be monitored by using the status endpoint:

GET v3/namespaces/<namespaceid>/apps/<pipelinename>/workflows/DataPipelineWorkflow/runs/<runid>

The run status is returned in the top-level status field of the response. The nested cluster.status field describes the cluster, not the run.

Sample Call

Request: curl http://localhost:11015/v3/namespaces/default/apps/Test/workflows/DataPipelineWorkflow/runs/e6523380-a73a-11ea-a21e-acde48001122

Response: {"runid":"e6523380-a73a-11ea-a21e-acde48001122","starting":1591368200,"start":1591368209,"end":1591368220,"status":"FAILED","properties":{"runtimeArgs":"{\"logical.start.time\":\"1591368200231\"}","phase-1":"ebdb21e1-a73a-11ea-bb32-acde48001122"},"cluster":{"status":"DEPROVISIONED","end":1591368223,"numNodes":0},"profile":{"profileName":"native","namespace":"system","entity":"PROFILE"}}
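Waiting for completion amounts to polling this endpoint until the run reaches a final state. The following minimal Python sketch assumes the localhost:11015 endpoint and default namespace from the samples; the helper names (run_url, parse_status, fetch_status, wait_for_run), the polling defaults, and the set of terminal states are assumptions, not part of the API. FAILED appears in the sample response; the full set of run states depends on the CDAP version, so check it before relying on this list.

```python
import json
import time
import urllib.request

# Assumption: endpoint and namespace taken from the sample call; adjust for your instance.
BASE_URL = "http://localhost:11015/v3/namespaces/default"

# Assumption: run states treated as final. Verify against your CDAP version.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "KILLED"}


def run_url(pipeline_name, run_id):
    """Builds the run-record URL for a batch pipeline run."""
    return f"{BASE_URL}/apps/{pipeline_name}/workflows/DataPipelineWorkflow/runs/{run_id}"


def parse_status(response_text):
    """Reads the top-level run status (not cluster.status) from a run record."""
    return json.loads(response_text)["status"]


def fetch_status(pipeline_name, run_id, timeout=60):
    """Fetches the current run status; HTTP errors raise urllib.error.HTTPError."""
    with urllib.request.urlopen(run_url(pipeline_name, run_id), timeout=timeout) as response:
        return parse_status(response.read().decode("utf-8"))


def wait_for_run(get_status, poll_interval=30.0, max_wait=3600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Polls get_status() until a terminal status is seen or max_wait seconds pass."""
    deadline = clock() + max_wait
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Run still {status} after {max_wait} seconds")
        sleep(poll_interval)
```

With a Run ID from the start call, wait_for_run(lambda: fetch_status("Test", run_id)) blocks until the run finishes and returns its final status, which the caller can then compare against COMPLETED. Passing the status function, sleep, and clock in as parameters keeps the loop testable without a live instance.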