Background
Orchestrating a Data Fusion pipeline from an external service is a common use case. For example, Composer might start a pipeline when data becomes available in a GCS bucket and then wait for the pipeline to complete. This guide walks you through the REST APIs needed to trigger a pipeline and wait for it to complete.
Approach
1. When you trigger a pipeline, capture the run ID of the run that it starts. You need this ID to monitor the run. To start the pipeline and get its run ID, use the following endpoint:
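POST v3/namespaces/<namespaceid>/start
Request body: [{"appId":"<pipelinename>", "programType":"workflow", "programId":"DataPipelineWorkflow", "runtimeargs": {"arg1":"val1"}}]
The runtimeargs object passes runtime arguments to the run, for example values for the pipeline's macros.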
Sample Call
Request: curl -X POST http://localhost:11015/v3/namespaces/default/start -d '[{"appId": "Test", "programType": "workflow", "programId":"DataPipelineWorkflow", "runtimeargs": { "path":"gs://mybucket/csv/data.txt" }}]'
Response: [{"statusCode":200,"runId":"e6523380-a73a-11ea-a21e-acde48001122","appId":"Test","programType":"Workflow","programId":"DataPipelineWorkflow"}]
Note: The calling program must handle any non-200 response from the REST API, including 504 (timeout) and 4xx (user) errors. It should also check the statusCode field of each element in the response body before reading its runId.
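The start call can be wrapped in a small Python helper for an orchestrator such as a Composer task. This is a minimal sketch using only the standard library, not an official client. It assumes the base URL is reachable as in the samples (http://localhost:11015 for a local instance). The names start_pipeline, build_start_body, parse_start_response, and PipelineStartError are hypothetical, as is the optional token parameter, which stands in for the OAuth access token that a Cloud Data Fusion instance expects. The helper raises on any non-200 HTTP response and also checks the statusCode field of the returned entry.

```python
import json
import urllib.error
import urllib.request


class PipelineStartError(RuntimeError):
    """Raised when the start request does not yield a run ID (hypothetical)."""


def build_start_body(pipeline_name, runtime_args=None):
    # The start endpoint takes a JSON array with one object per program to start.
    return [{
        "appId": pipeline_name,
        "programType": "workflow",
        "programId": "DataPipelineWorkflow",
        "runtimeargs": runtime_args or {},
    }]


def parse_start_response(payload):
    # Each element carries its own statusCode; read runId only from a 200 entry.
    results = json.loads(payload)
    if not results:
        raise PipelineStartError("empty response from start endpoint")
    entry = results[0]
    if entry.get("statusCode") != 200 or "runId" not in entry:
        raise PipelineStartError(f"pipeline did not start: {entry}")
    return entry["runId"]


def start_pipeline(base_url, namespace, pipeline_name, runtime_args=None,
                   token=None, timeout=60):
    url = f"{base_url}/v3/namespaces/{namespace}/start"
    data = json.dumps(build_start_body(pipeline_name, runtime_args)).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token:
        # Hypothetical: send a bearer token when the instance requires OAuth.
        headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(url, data=data, headers=headers, method="POST")
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return parse_start_response(response.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        # urlopen raises HTTPError for non-2xx codes (e.g. 4xx user errors, 504).
        body = err.read().decode("utf-8", "replace")
        raise PipelineStartError(f"HTTP {err.code}: {body}") from err
```

For example, start_pipeline("http://localhost:11015", "default", "Test", {"path": "gs://mybucket/csv/data.txt"}) would send the same request as the sample call and return the run ID. Whether to retry a 504 or fail fast on a 4xx is left to the caller, because the right policy depends on the orchestrator.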
2. Once the pipeline has started, monitor the run by calling the status endpoint with the run ID returned in step 1:
GET v3/namespaces/<namespaceid>/apps/<pipelinename>/workflows/DataPipelineWorkflow/runs/<runid>
The status field of the response holds the state of the run. Poll this endpoint until the status reaches a terminal state such as COMPLETED, FAILED, or KILLED.
Sample Call
Request: curl http://localhost:11015/v3/namespaces/default/apps/Test/workflows/DataPipelineWorkflow/runs/e6523380-a73a-11ea-a21e-acde48001122
Response: {"runid":"e6523380-a73a-11ea-a21e-acde48001122","starting":1591368200,"start":1591368209,"end":1591368220,"status":"FAILED","properties":{"runtimeArgs":"{\"logical.start.time\":\"1591368200231\"}","phase-1":"ebdb21e1-a73a-11ea-bb32-acde48001122"},"cluster":{"status":"DEPROVISIONED","end":1591368223,"numNodes":0},"profile":{"profileName":"native","namespace":"system","entity":"PROFILE"}}
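To wait for the run to finish, poll the status endpoint until the status field reaches a terminal value. The sketch below assumes that COMPLETED, FAILED, KILLED, and REJECTED are the terminal states; check this against your CDAP version. It uses fixed-interval polling with an overall time limit. The names run_status_url, parse_status, fetch_status, and wait_for_run, along with their parameters, are hypothetical. wait_for_run takes the status lookup as a function, so the polling logic can be exercised without a live instance.

```python
import json
import time
import urllib.request

# Assumed terminal run states; verify against the CDAP version you run.
TERMINAL_STATES = {"COMPLETED", "FAILED", "KILLED", "REJECTED"}


def run_status_url(base_url, namespace, pipeline_name, run_id):
    # Same path as the status endpoint described above.
    return (f"{base_url}/v3/namespaces/{namespace}/apps/{pipeline_name}"
            f"/workflows/DataPipelineWorkflow/runs/{run_id}")


def parse_status(payload):
    # The run record is a JSON object whose "status" field holds the run state.
    return json.loads(payload)["status"]


def fetch_status(url, token=None, timeout=60):
    # Hypothetical bearer token for instances that require OAuth.
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_status(response.read().decode("utf-8"))


def wait_for_run(get_status, poll_interval=30, max_wait=3600, sleep=time.sleep):
    """Call get_status() until it returns a terminal state.

    Raises TimeoutError if the run is still active after about max_wait
    seconds of sleeping between polls.
    """
    waited = 0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= max_wait:
            raise TimeoutError(f"run still {status} after {waited} seconds")
        sleep(poll_interval)
        waited += poll_interval
```

For example, with a run ID returned by the start call, wait_for_run(lambda: fetch_status(run_status_url("http://localhost:11015", "default", "Test", run_id))) blocks until the run ends and returns its final status. Fixed-interval polling keeps the sketch simple. An exponential backoff, or the orchestrator's own sensor mechanism, may suit long-running pipelines better. Errors raised by fetch_status, such as a 504 from the REST API, propagate to the caller and need the same handling as the start call.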