...
To start the preview for an application:
Request Method and Endpoint
Code Block
POST /v3/namespaces/{namespace-id}/preview
where namespace-id is the name of the namespace
Response will contain the CDAP-generated unique preview-id, which can be used further to get the preview data.
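A minimal sketch of how a client might assemble this request in Python, using only the standard library. The base URL `http://localhost:11015` and the helper name `build_preview_request` are assumptions for illustration and are not part of this design. The preview section is nested under `config`, matching the request body in this section. The request is built but not sent.

```python
import json
import urllib.request

# Assumed router address; adjust for the actual CDAP instance.
BASE_URL = "http://localhost:11015"


def build_preview_request(namespace_id, app_config, preview_config):
    """Build (but do not send) the POST request that starts a preview."""
    body = dict(app_config)
    # Copy the config so the caller's dictionary is not modified.
    body["config"] = dict(app_config.get("config", {}))
    body["config"]["preview"] = preview_config
    url = f"{BASE_URL}/v3/namespaces/{namespace_id}/preview"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_preview_request(
    "default",
    {"name": "MyPipeline", "config": {"connections": [], "stages": []}},
    {"startStages": ["MyCSVParser"], "endStages": ["MyTable"], "useSinks": ["MyTable"]},
)
print(request.get_method(), request.full_url)
```

Passing the returned object to `urllib.request.urlopen` would send it; the response format beyond "contains the preview-id" is not specified here, so parsing it is left out.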
The request body will contain the application configuration along with a few additional configs in the preview section.
Code Block
{
  "artifact": {
    "name": "cdap-data-pipeline",
    "version": "3.5.0-SNAPSHOT",
    "scope": "SYSTEM"
  },
  "name": "MyPipeline",
  "config": {
    "connections": [
      { "from": "FTP", "to": "CSVParser" },
      { "from": "CSVParser", "to": "Table" }
    ],
    "stages": [
      {
        "name": "FTP",
        "plugin": {
          "name": "FTP",
          "type": "batchsource",
          "label": "FTP",
          "artifact": { "name": "core-plugins", "version": "1.4.0-SNAPSHOT", "scope": "SYSTEM" },
          "properties": { "referenceName": "myfile", "path": "/tmp/myfile" }
        },
        "outputSchema": "{\"fields\":[{\"name\":\"offset\",\"type\":\"long\"},{\"name\":\"body\",\"type\":\"string\"}]}"
      },
      {
        "name": "MyCSVParser",
        "plugin": {
          "name": "CSVParser",
          "type": "transform",
          "label": "CSVParser",
          "artifact": { "name": "transform-plugins", "version": "1.4.0-SNAPSHOT", "scope": "SYSTEM" },
          "properties": {
            "format": "DEFAULT",
            "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}",
            "field": "body"
          }
        },
        "outputSchema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"
      },
      {
        "name": "MyTable",
        "plugin": {
          "name": "Table",
          "type": "batchsink",
          "label": "Table",
          "artifact": { "name": "core-plugins", "version": "1.4.0-SNAPSHOT", "scope": "SYSTEM" },
          "properties": {
            "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}",
            "name": "mytable",
            "schema.row.field": "id"
          }
        },
        "outputSchema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}",
        "inputSchema": [
          { "name": "id", "type": "int", "nullable": false },
          { "name": "name", "type": "string", "nullable": false }
        ]
      }
    ],
    "preview": {
      "startStages": ["MyCSVParser"],
      "endStages": ["MyTable"],
      "useSinks": ["MyTable"],
      "outputs": {
        "FTP": {
          "numRecords": 10,
          "data": [
            {"offset": 1, "body": "100,bob"},
            {"offset": 2, "body": "200,rob"},
            {"offset": 3, "body": "300,tom"}
          ],
          "schema": {
            "type": "record",
            "fields": [
              {"name":"offset","type":"long"},
              {"name":"body","type":"string"}
            ]
          }
        }
      }
    }
  }
}
a. Simple pipeline
Code Block
Consider a simple pipeline represented by the following connections.
(FTP)-------->(CSV Parser)-------->(Table)

CASE 1: To preview the entire pipeline:
"preview": {
  "startStages": ["FTP"],
  "endStages": ["Table"],
  "useSinks": ["Table"],
  "numRecords": 10,
  "programName": "SmartWorkflow", // The program to be previewed
  "programType": "workflow"       // The program type to be previewed
}

CASE 2: To preview a section of the pipeline: (CSV Parser)-------->(Table)
"preview": {
  "startStages": ["CSV Parser"],
  "endStages": ["Table"],
  "useSinks": ["Table"],
  "outputs": {
    "FTP": {
      "data": [
        {"offset": 1, "body": "100,bob"},
        {"offset": 2, "body": "200,rob"},
        {"offset": 3, "body": "300,tom"}
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"offset","type":"long"},
          {"name":"body","type":"string"}
        ]
      }
    }
  }
}

CASE 3: To preview only a single stage (CSV Parser) in the pipeline:
"preview": {
  "startStages": ["CSV Parser"],
  "endStages": ["CSV Parser"],
  "outputs": {
    "FTP": {
      "data": [
        {"offset": 1, "body": "100,bob"},
        {"offset": 2, "body": "200,rob"},
        {"offset": 3, "body": "300,tom"}
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"offset","type":"long"},
          {"name":"body","type":"string"}
        ]
      }
    }
  }
}

CASE 4: To verify if records are read correctly from FTP:
"preview": {
  "startStages": ["FTP"],
  "endStages": ["FTP"],
  "numOfRecords": 10
}

CASE 5: To verify the data is getting written to Table properly:
"preview": {
  "startStages": ["Table"],
  "endStages": ["Table"],
  "useSinks": ["Table"],
  "outputs": {
    "CSV Parser": {
      "data": [
        {"id": 1, "name": "bob"},
        {"id": 2, "name": "rob"},
        {"id": 3, "name": "tom"}
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"id","type":"long"},
          {"name":"name","type":"string"}
        ]
      }
    }
  }
}
b. Fork in the pipeline (multiple sinks)
Code Block
Consider the following pipeline:
(S3 Source)--------->(Log Parser)--------->(Group By Aggregator)--------->(Python Evaluator)--------->(Aggregated Result)
      |
      |
      --------->(Javascript Transform)--------->(Raw Data)

CASE 1: To preview the entire pipeline
"preview": {
  "startStages": ["S3 Source"],
  "endStages": ["Aggregated Result", "Raw Data"],
  "useSinks": ["Aggregated Result", "Raw Data"], // useSinks seems redundant, since endStages already controls the point up to which the pipeline needs to run
  "numOfRecords": 10
}

CASE 2: To mock the source
"preview": {
  "startStages": ["Log Parser", "Javascript Transform"],
  "endStages": ["Aggregated Result", "Raw Data"],
  "useSinks": ["Aggregated Result", "Raw Data"],
  "outputs": {
    "S3 Source": {
      "data": [
        "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326",
        "127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326",
        "127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326"
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"log_line", "type":"string"}
        ]
      }
    }
  }
}

CASE 3: To preview a section of the pipeline: (Log Parser)--------->(Group By Aggregator)--------->(Python Evaluator)
"preview": {
  "startStages": ["Log Parser"],
  "endStages": ["Aggregated Result"],
  "useSinks": ["Aggregated Result"],
  "outputs": {
    "S3 Source": {
      "data": [
        "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326",
        "127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326",
        "127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326"
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"log_line", "type":"string"}
        ]
      }
    }
  }
}

CASE 4: To preview the single stage Python Evaluator
"preview": {
  "startStages": ["Python Evaluator"],
  "endStages": ["Python Evaluator"],
  "outputs": {
    "Group By Aggregator": {
      "data": [
        {"ip":"127.0.0.1", "counts":3},
        {"ip":"127.0.0.2", "counts":4},
        {"ip":"127.0.0.3", "counts":5},
        {"ip":"127.0.0.4", "counts":6}
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"ip", "type":"string"},
          {"name":"counts", "type":"long"}
        ]
      }
    }
  }
}
c. Join in the pipeline (multiple sources)
Code Block
Consider the following pipeline:
(Database)--------->(Python Evaluator)--------->
                                               |
                                               |------------>(Join)-------->(Projection)------->(HBase Sink)
                                               |
(FTP)--------->(CSV Parser)-------------------->

CASE 1: To preview the entire pipeline
"preview": {
  "startStages": ["Database", "FTP"],
  "endStages": ["HBase Sink"],
  "useSinks": ["HBase Sink"],
  "numOfRecords": 10
}

CASE 2: To mock both sources
"preview": {
  "startStages": ["Python Evaluator", "CSV Parser"],
  "endStages": ["HBase Sink"],
  "useSinks": ["HBase Sink"],
  "outputs": {
    "Database": {
      "data": [
        {"name":"tom", "counts":3},
        {"name":"bob", "counts":4},
        {"name":"rob", "counts":5},
        {"name":"milo", "counts":6}
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"name","type":"string"},
          {"name":"counts","type":"long"}
        ]
      }
    },
    "FTP": {
      "data": [
        {"offset":1, "body":"tom,100"},
        {"offset":2, "body":"bob,200"},
        {"offset":3, "body":"rob,300"},
        {"offset":4, "body":"milo,400"}
      ],
      "schema": {
        "fields": [
          {"name":"offset","type":"long"},
          {"name":"body","type":"string"}
        ]
      }
    }
  }
}

CASE 3: To preview the JOIN transform only
"preview": {
  "startStages": ["JOIN"],
  "endStages": ["JOIN"],
  "outputs": {
    "Python Evaluator": {
      "data": [
        {"name":"tom", "counts":3},
        {"name":"bob", "counts":4},
        {"name":"rob", "counts":5},
        {"name":"milo", "counts":6}
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"name","type":"string"},
          {"name":"counts","type":"long"}
        ]
      }
    },
    "CSV Parser": {
      "data": [
        {"offset":1, "body":"tom,100"},
        {"offset":2, "body":"bob,200"},
        {"offset":3, "body":"rob,300"},
        {"offset":4, "body":"milo,400"}
      ],
      "schema": {
        "fields": [
          {"name":"offset","type":"long"},
          {"name":"body","type":"string"}
        ]
      }
    }
  }
}
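In the cases shown in this section, the keys under "outputs" are the stages immediately upstream of the "startStages". That rule is inferred from the examples rather than stated by the design, so the following Python sketch is only an illustration of it. The function name `stages_to_mock` and the connection list (a reading of the join pipeline diagram) are hypothetical.

```python
def stages_to_mock(connections, start_stages):
    """Return the upstream stages whose output would need to be supplied in "outputs".

    connections: list of {"from": ..., "to": ...} dicts, as in the pipeline config.
    start_stages: the "startStages" list of the preview section.
    """
    start = set(start_stages)
    # A stage is mocked when it feeds a start stage and is not a start stage itself.
    return sorted(
        {c["from"] for c in connections if c["to"] in start and c["from"] not in start}
    )


# Assumed connection list for the join pipeline.
join_pipeline = [
    {"from": "Database", "to": "Python Evaluator"},
    {"from": "FTP", "to": "CSV Parser"},
    {"from": "Python Evaluator", "to": "JOIN"},
    {"from": "CSV Parser", "to": "JOIN"},
    {"from": "JOIN", "to": "Projection"},
    {"from": "Projection", "to": "HBase Sink"},
]

print(stages_to_mock(join_pipeline, ["JOIN"]))
print(stages_to_mock(join_pipeline, ["Python Evaluator", "CSV Parser"]))
print(stages_to_mock(join_pipeline, ["Database", "FTP"]))
```

Starting at the sources yields an empty list, which corresponds to the cases where only a record count is given and no mock data is needed.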
d. Preview for a single stage (TBD)
Code Block
Consider a pipeline containing only one transform, which has no connections yet:
(Javascript Transform)

The preview configuration can be provided as:
"preview": {
  "startStages": ["Javascript Transform"],
  "endStages": ["Javascript Transform"],
  "outputs": {
    "MockSource": {
      "data": [
        {"name":"tom", "id":3},
        {"name":"bob", "id":4},
        {"name":"rob", "id":5},
        {"name":"milo", "id":6}
      ],
      "schema": {
        "type": "record",
        "fields": [
          {"name":"name","type":"string"},
          {"name":"id","type":"long"}
        ]
      }
    }
  }
}
- How to specify the input data: The user can specify the input data for the preview either by entering it directly in table format in the UI or by uploading a file containing the records.
When the data is entered in table format, the UI will convert it into the appropriate JSON records.
When the user chooses to upload a file, they can upload a JSON file conforming to the schema of the next stage. Ideally we should allow uploading a CSV file as well; however, how to interpret the data is plugin dependent. For example, consider a list of CSV records. For the CSVParser plugin, the entire record will be treated as the body, whereas for the Table sink we will have to split the record on commas to create the multiple fields specified by the next stage's input schema.
- Once the preview is started, a unique preview-id will be generated for it. The runtime information (<preview-id, STATUS>) for the preview will be generated and stored (in memory or on disk).
- Once the preview execution is complete, its runtime information will be updated with the status of the preview (COMPLETED or FAILED).
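The plugin-dependent interpretation of uploaded CSV records can be illustrated with a small Python sketch. Both helper names are hypothetical, the type mapping covers only a few primitive types, and this is one possible approach under those assumptions rather than the planned UI behaviour.

```python
import csv


def csv_lines_for_parser(lines):
    """Keep each CSV line whole as the body, as a CSVParser-style stage would expect."""
    return [{"offset": i, "body": line} for i, line in enumerate(lines, start=1)]


def csv_lines_for_schema(lines, fields):
    """Split each CSV line into the fields of the next stage's input schema.

    fields: list of {"name": ..., "type": ...} entries.
    """
    casts = {"int": int, "long": int, "double": float, "string": str}
    records = []
    for row in csv.reader(lines):
        if len(row) != len(fields):
            raise ValueError(f"expected {len(fields)} values, got {len(row)}: {row!r}")
        records.append(
            {f["name"]: casts[f["type"]](value.strip()) for f, value in zip(fields, row)}
        )
    return records


lines = ["100,bob", "200,rob", "300,tom"]
table_fields = [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}]

print(csv_lines_for_parser(lines))
print(csv_lines_for_schema(lines, table_fields))
```

The same three lines produce records with a single body field in the first case and records with id and name fields in the second, which is why the upload cannot be interpreted without knowing the next stage.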
To get the status of the preview:
Request Method and Endpoint
Code Block
GET /v3/namespaces/{namespace-id}/previews/{preview-id}/status
where namespace-id is the name of the namespace
preview-id is the id of the preview for which status is to be requested
Response body will contain JSON encoded preview status and optional message if the preview failed.
Code Block
1. If preview is RUNNING
{
  "status": "RUNNING"
}

2. If preview is COMPLETED
{
  "status": "COMPLETED"
}

3. If preview application deployment FAILED
{
  "status": "DEPLOY_FAILED",
  "failureMessage": "Exception message explaining the failure"
}

4. If preview application FAILED during execution of the stages
{
  "status": "RUNTIME_FAILED",
  "failureMessage": "Failure message"
  /*
  "stages": {
    "stage_1": { "numOfInputRecords": 10, "numOfOutputRecords": 10 },
    "stage_2": { "numOfInputRecords": 10, "numOfOutputRecords": 7 },
    "stage_3": { "numOfInputRecords": 7, "numOfOutputRecords": 4, "errorMessage": "Failure reason for the stage" }
  }
  */
}
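A client would typically poll the status endpoint until the preview leaves the RUNNING state. The following Python sketch shows one way to do that; the function name, the polling parameters, and the injected `fetch_status` callable (standing in for the GET request) are assumptions for illustration.

```python
import time


def wait_for_preview(fetch_status, poll_seconds=1.0, max_polls=60, sleep=time.sleep):
    """Poll until the preview is no longer RUNNING and return the final status document.

    fetch_status: zero-argument callable returning the parsed JSON status response.
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status["status"] != "RUNNING":
            return status
        sleep(poll_seconds)
    raise TimeoutError("preview still RUNNING after the polling limit")


# Canned responses standing in for GET .../previews/{preview-id}/status.
responses = iter([
    {"status": "RUNNING"},
    {"status": "RUNNING"},
    {"status": "RUNTIME_FAILED", "failureMessage": "Failure message"},
])
final = wait_for_preview(lambda: next(responses), sleep=lambda seconds: None)
print(final["status"], final.get("failureMessage", ""))
```

Injecting the fetch and sleep functions keeps the loop testable without a running preview service.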
To get the preview data for stage:
Request Method and Endpoint
Code Block
GET /v3/namespaces/{namespace-id}/previews/{preview-id}/stages/{stage-id}
where namespace-id is the name of the namespace
preview-id is the id of the preview for which data is to be requested
stage-id is the unique name used to identify the emitter
Response body will contain JSON encoded input data and output data for the emitter as well as input and output schema.
Code Block
{
  "inputData": [
    {"first_name": "rob", "zipcode": 95131},
    {"first_name": "bob", "zipcode": 95054},
    {"first_name": "tom", "zipcode": 94306}
  ],
  "outputData": [
    {"name": "rob", "zipcode": 95131, "age": 21},
    {"name": "bob", "zipcode": 95054, "age": 22},
    {"name": "tom", "zipcode": 94306, "age": 23}
  ],
  "inputSchema": {
    "type": "record",
    "name": "etlSchemaBody",
    "fields": [
      {"name":"first_name", "type":"string"},
      {"name":"zipcode", "type":"int"}
    ]
  },
  "outputSchema": {
    "type": "record",
    "name": "etlSchemaBody",
    "fields": [
      {"name":"name", "type":"string"},
      {"name":"zipcode", "type":"int"},
      {"name":"age", "type":"int"}
    ]
  },
  "errorRecordSchema": {
    "type": "record",
    "name": "schemaBody",
    "fields": [
      {"name":"errCode", "type":"int"},
      {"name":"errMsg", "type":"string"},
      {"name":"invalidRecord", "type":"string"}
    ]
  },
  "errorRecords": [
    {
      "errCode": 12,
      "errMsg": "Invalid record",
      "invalidRecord": "{\"offset\":3,\"body\":\"This error record is not comma separated!\"}"
    }
  ],
  // Logs per stage may not make sense; this needs more thought.
  "logs": { "stage level logs" }
}
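As an illustration of how a client might use this response, the Python sketch below summarizes the record counts and the schema differences for one stage. The function name `summarize_stage` and the shape of its return value are hypothetical; the input is assumed to be the parsed JSON response with the field names shown above.

```python
def summarize_stage(stage_data):
    """Return record counts plus the fields added or dropped between input and output schema."""
    in_fields = {f["name"] for f in stage_data.get("inputSchema", {}).get("fields", [])}
    out_fields = {f["name"] for f in stage_data.get("outputSchema", {}).get("fields", [])}
    return {
        "inputRecords": len(stage_data.get("inputData", [])),
        "outputRecords": len(stage_data.get("outputData", [])),
        "errorRecords": len(stage_data.get("errorRecords", [])),
        "addedFields": sorted(out_fields - in_fields),
        "droppedFields": sorted(in_fields - out_fields),
    }


# Trimmed version of the sample response.
stage_data = {
    "inputData": [{"first_name": "rob", "zipcode": 95131}],
    "outputData": [{"name": "rob", "zipcode": 95131, "age": 21}],
    "inputSchema": {"fields": [{"name": "first_name", "type": "string"},
                               {"name": "zipcode", "type": "int"}]},
    "outputSchema": {"fields": [{"name": "name", "type": "string"},
                                {"name": "zipcode", "type": "int"},
                                {"name": "age", "type": "int"}]},
    "errorRecords": [{"errCode": 12, "errMsg": "Invalid record"}],
}
summary = summarize_stage(stage_data)
print(summary)
```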
To get the logs/metrics for the preview:
Request Method and Endpoint
Code Block
GET /v3/namespaces/{namespace-id}/previews/{preview-id}/logs
where namespace-id is the name of the namespace
preview-id is the id of the preview for which data is to be requested

The logs endpoint returns the entire preview logs.
Sample response for the logs endpoint. Note that it is similar to the regular app.
[
  { "log": "This is sample log - 0", "offset": "0.1466470037680" },
  { "log": "This is sample log - 1", "offset": "1.1466470037680" },
  { "log": "This is sample log - 2", "offset": "2.1466470037680" },
  { "log": "This is sample log - 3", "offset": "3.1466470037680" },
  { "log": "This is sample log - 4", "offset": "4.1466470037680" }
]

GET /v3/namespaces/{namespace-id}/previews/{preview-id}/metrics
where namespace-id is the name of the namespace
preview-id is the id of the preview for which data is to be requested

Sample response for the metrics. Note that it is similar to the regular app.
{
  "endTime": 1466469538,
  "resolution": "2147483647s",
  "series": [
    {
      "data": [ { "time": 0, "value": 4 } ],
      "grouping": {},
      "metricName": "user.Projection.records.out"
    },
    {
      "data": [ { "time": 0, "value": 4 } ],
      "grouping": {},
      "metricName": "user.Projection.records.in"
    },
    {
      "data": [ { "time": 0, "value": 4 } ],
      "grouping": {},
      "metricName": "user.Stream.records.out"
    },
    {
      "data": [ { "time": 0, "value": 4 } ],
      "grouping": {},
      "metricName": "user.JavaScript.records.in"
    },
    {
      "data": [ { "time": 0, "value": 4 } ],
      "grouping": {},
      "metricName": "user.Stream.records.in"
    }
  ],
  "startTime": 0
}
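A metrics response of the shape shown in this section (a "series" list whose entries carry "metricName" and "data" points) can be reduced to one total per metric. The Python sketch below is a minimal illustration; the function name `records_by_metric` is hypothetical.

```python
def records_by_metric(metrics_response):
    """Sum the data points of each series, keyed by metric name."""
    return {
        series["metricName"]: sum(point["value"] for point in series["data"])
        for series in metrics_response["series"]
    }


# Trimmed sample with two series.
metrics_response = {
    "endTime": 1466469538,
    "resolution": "2147483647s",
    "series": [
        {"data": [{"time": 0, "value": 4}], "grouping": {},
         "metricName": "user.Projection.records.in"},
        {"data": [{"time": 0, "value": 4}], "grouping": {},
         "metricName": "user.Projection.records.out"},
    ],
    "startTime": 0,
}
totals = records_by_metric(metrics_response)
print(totals)
```

Comparing the records.in and records.out totals of a stage gives a quick check on whether that stage dropped any records during the preview.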
Open Questions:
- How to make it easy for the user to upload the CSV file?
- Lookup data is a user dataset. Should we allow mocking of the lookup dataset as well?