Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 34 Next »

Goals

Checklist

  • User stories documented (Albert/Vinisha)
  • User stories reviewed (Nitin)
  • Design documented (Albert/Vinisha)
  • Design reviewed (Terence/Andreas)
  • Feature merged ()
  • Examples and guides ()
  • Integration tests () 
  • Documentation for feature ()
  • Blog post

Terminology


Pipeline Developer - Somebody who create pipelines through the UI or REST API. May be a business analyst, researcher, plugin developer, etc.

Stage - a node in the pipeline. Corresponds to a plugin instance.

Phase - one or more stages in the pipeline. Corresponds to a node in the physical workflow.

Validation - static checks for plugin configuration

Debug - actually running plugin code on example input

Use Cases

  1. A pipeline developer is configuring a JavaScript transform and wants to make sure the logic is correct.  The developer fills in all the configuration settings, provides some input, and wants to see the corresponding output, or any errors that happened.
  2. A pipeline developer has configured several stages and connected them together. The developer wants to see the input and output (or errors) at each stage.
  3. A pipeline developer is using the database plugin and does not know if the jdbc connection string given is correct. Syntax might wrong, or maybe the host or port or credentials are wrong, and the developer wants to check these things before publishing and running the pipeline.

User Stories

  1. As a pipeline developer, I want to be able to see what a pipeline stage would output given the config for the stage and input to the stage
  2. As a pipeline developer, I want to be able to debug a single stage by providing config and input to see the output
  3. As a pipeline developer, I want to be able to see a message describing the cause of any errors encountered while debugging 
  4. As a pipeline developer, I want to be able to validate specific fields of a plugin, so that I can debug without publishing the pipeline
  5. As a pipeline developer, I want to be able to manually provide the input records for my pipeline while debugging

Approach (WIP)

  1. Preview application deployment:

    Application deployment stepsChanges to the Persistent storeRegular ApplicationPreview Application (Proposed)
    REST Endpoint PUT "/apps/{app-id}"PUT "/apps/{app-id}/preview"
    Application Id used As specified by app-idGenerate new application id with name as app_id_preview. Note that user application may have _preview as suffix, however we will still append it with _preview.
    Enforce that the current principal has write access to the namespace  Should be similar to the regular app since it will be deployed in the same namespace as the regular app
    Application class name co.cask.cdap.datapipeline.DataPipelineAppco.cask.cdap.datapipeline.DataPipelineApp
    Config json  

    Preview configuration will have three additional configuration parameters -

    {
      "name" : "mypipeline_preview",
      "previewMode" : "ON",
      "previewRecords": "10"
      "previewStageName": "" 
      ...
    }

    Preview stage name configuration can be the name of the pipeline in which case pipeline created so far will be executed, or it could be name of the stage in the pipeline in which case only single stage would be executed.

    LocalArtifactLoaderStage responsible for calling the Application.configure() method DataPipelineApp.configure() gets called to generate the Pipeline specifications. Before generating the pipeline specifications, configurations are validated by class BatchPipelineSpecGenerator. 1. Each stage should have valid configuration for name and plugin type 2. Pipeline should have at least one stage 3. All stages should have unique names 4. Check that from and to have actual stage names 5. Source do not have incoming connections and sink do not have outgoing connection. (Where is the validation happens when we simply have source or sink in the pipeline?)For preview mode we might want to add BatchPreviewPipelineSpecGenerator class which overrides the default validation behavior. What should be validated: 1. There should be at least one source. 2. All the connections should be valid. 3. Its ok if the pipeline does not contain sink. Ideal way to use preview is first drag source then preview, then add transform to the pipeline then preview. If issues in the transform, do preview on the transform stage only.
    ApplicationVerificationStage responsible for verifying the application specs  Behavior should be similar to the regular application.
    DeployDatasetModulesStage Nothing to do hereNothing to do here
    CreateDatasetInstanceStage responsible for instantiating the datasets used by the applicationDatasets are created. For preview application, we will have to create the preview dataset as well. We will still create the datasets specified in the application configurations. (TODO: Who all creates datasets for example table dataset creates one) It is advisable to the user to not to use the production table as sink for preview mode. We will also create one special fileset dataset responsible for storing the preview data. Whenever the pipeline is run in preview mode, before transferring the record to the next stage, record will be appended to the fileset dataset. (TODO exact details need to be figure out as to how the data will be stored in the fileset.) The reason for selecting the fileset dataset is that since it will be written from the MapReduce program, writing to the table dataset would happen in the same transaction as the MapReduce program. If MapReduce program fails for some reason, then the records in the preview dataset will not be visible.
    CreateStreamStage responsible for creating streams used in the application.Streams used by application are created. Behavior would be similar to the regular application.
    DeletedProgramHandlerStage Seems like this stage just stops the currently running program. There is special handling for FLOW but would not be applicable in the pipeline cases. Nothing to do hereNothing to do here
    ProgramGenerationStage receives the ApplicationDeployable instance as an input and emits ApplicationWithPrograms as output. ApplicationWithPrograms contains ProgramDescriptors which holds the id of the program.  Behavior should be similar to the regular application.
    ApplicationRegistrationStage responsible for storing the application specification in the meta data table and update the usage registry for the program. (TODO not sure why we require to update the registry, especially when we have dynamic dataset usage by the programs.)Application specifications are stored in the app meta table. Usage registry dataset gets updated by the program dataset usage. Behavior should be similar to the regular application.
    CreateScheduleStageSchedule store dataset gets updated from schedules in the application. Should be skipped.
    SystemMetadataWriterStageProgram mesatadata (program id and program specification) is written to the dataset Should be skipped. But need to check when the application is actually published the metadata should get register,
  2. Preview application runtime:

    Application runtime stagePersistent store read from/written toRegular applicationPreview application
    REST endpoint to start programRead preferences from preference store. Get application specifications from store to get the ProgramDescriptor.  
    Program lifecycle changesRun record for the program gets updated to the app metadata.  
    Workflow executionWorkflowToken gets stored in the meta data store. Workflow node states get stored in the meta data store. Local datasets may get created by Workflow runtime.  
    Workflow statisticsWorkflow statistics dataset gets updated by the workflow runtime information.  
    LineageLineage information for the dataset accesses is written to the lineage dataset.  
    Logging contextLogs are appended to the kafka(cluster) or file(SDK)  
    MetricsMetrics data is persisted in the leveldb or HBase.  
  3. Applications also write to the user datasets.

Meeting notes from Friday, June 3 2016

  1. In preview mode we write to the records to the preview dataset. If mapper dies for some reason then records written to the preview dataset will not get cleaned.
  2. We should make clear to the user that in preview mode it is not advisable to write to the actual production sink.
  3. When would the preview datasets be cleaned up? If we decide to clean it up after the publishing the pipeline, what if user decides not to publish the pipeline?
  4. What if multiple users are doing preview on SDK? In order to isolate the writes to the preview dataset for the same pipeline from multiple user we can use some sort of session id. However ideally single SDK should not be shared by multiple users.
  5. Think about how the platform will expose a way to view preview data.
  6. Creating streams, datasets in preview?
  7. For logs we will need different appender.
  8. Pipeline preview may write to the actual user datasets used in pipeline as well as preview datasets. Should lineage information be written for the actual datasets and preview datasets?
  9. Is it possible to use separate in-memory injector for the preview process, so that preview data will be stored in memory and will not affect the actual metadata? Is it possible to write the data to different directory while in preview mode? In that case how it will be able to access the datasets stored in the directory as configured by "local.data.dir"
  10. How application communicate back to the UI something like preview collector interface?
  11. REST endpoints to start the preview and preview data collections.
  12. Design should be easy to extend for cluster mode and add more features such as schema propagation where we will simply call the pipeline.configure and return the data. 

Action Items:

  1. Try using separate injector for the preview mode.
  2. Come up with the REST endpoints and mechanism for preview and preview data collection.

REST Endpoints:

  1. To start the preview for an application: 
    Request Method and Endpoint 

    POST /v3/namespaces/{namespace-id}/apps/{app-id}/preview
    where namespace-id is the name of the namespace
          app-id is the name of the application for which preview is to be previewed
    
    Note that user will have to provide the app-id for the preview request.
     
    Response will contain the CDAP generated unique preview_id which can be used further to get the preview data per stage.

    Request body will contain the application configuration along with following additional configs for the preview.

    a.  Connection configuration in the application JSON can be updated to specify the input mock data for the destination stage of the connection.

    Consider pipeline represented by following connections.
    
    "connections":[  
             {  
                "from":"FTP",
                "to":"CSVParser"
             },
             {  
                "from":"CSVParser",
                "to":"Table"
             }
          	]
    
    Now if user wants to provide the input data to the CSVParser instead of reading it from FTP source the inputData can be provided as:
    "connections":[  
             {  
                "from":"FTP",
                "to":"CSVParser",
    			"inputData"  : [
                                     {"offset": 1, "body": "100,bob"}, 
                                     {"offset": 2, "body": "200,rob"}, 
                                     {"offset": 3, "body": "300,tom"}
                                  ]
             },
             {  
                "from":"CSVParser",
                "to":"Table"
             }
          	]
     
    The above configuration indicates that connection from FTP to CSVParser need to be updated to read the inputData instead of reading it from FTP Source directly.


    b. In case there are multiple sources in the pipeline or multiple stages reading from the same source and user wants to provide the mock data for all of them, then every connection emitted from the source need to have the inputData associated with it.

    Consider the following pipeline:
     "connections": [
            {
                "from": "S3 Source",
                "to": "Log Parser"
            },
            {
                "from": "Log Parser",
                "to": "Group By Aggregator"
            },
            {
                "from": "Group By Aggregator",
                "to": "Aggregated Result"
            },
            {
                "from": "S3 Source",
                "to": "Raw Logs"
            }
        ]
    
    Now if user want to preview the pipeline but do now want to read the data from the S3 Source, connections can be updated with the inputData information as 
     "connections": [
            {
                "from": "S3 Source",
                "to": "Log Parser",
    			"inputData": [
    							"127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326"
    						 ]	
    
            },
            {
                "from": "Log Parser",
                "to": "Group By Aggregator"
            },
            {
                "from": "Group By Aggregator",
                "to": "Aggregated Result"
            },
            {
                "from": "S3 Source",
                "to": "Raw Logs",
    			"inputData": [
    							"127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326"	
    						 ]
            }
        ]


    c. If user wants to use actual source for the preview, the number of records read from the source can be limited by the numOfRecords property. 

     "connections": [
            {
                "from": "S3 Source",
                "to": "Log Parser",
    			"numOfRecords": 10
            },
            {
                "from": "Log Parser",
                "to": "Group By Aggregator"
            },
            {
                "from": "Group By Aggregator",
                "to": "Aggregated Result"
            },
            {
                "from": "S3 Source",
                "to": "Raw Logs",
                "numOfRecords": 10,
    			"inputData": [
    							"127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326"	
    
    						 ]
            }
        ]
     
    In the above example configuration, 10 records will be read from the S3 Source in order to pass them to the Log Parser, however since the inputData is specified for the connection S3 Source to Raw Logs, inputData will be passed to Raw Logs without reading from the S3 Source.

    d. If user do not want to write to the sink, following are few possible approaches:

    Approach a) Specify list of sinks to ignore using ignoreSinks property.
    
    "preview" : {
    				"ignoreSinks" : ["Raw Logs"]  
    			}
    
    Approach b) For each connection to the Sink we can add the ignoreConnection property and set it to true as
     "connections": [
            {
                "from": "S3 Source",
                "to": "Log Parser",
    			"numOfRecords": 10
            },
            {
                "from": "Log Parser",
                "to": "Group By Aggregator"
            },
            {
                "from": "Group By Aggregator",
                "to": "Aggregated Result"
            },
            {
                "from": "S3 Source",
                "to": "Raw Logs",
    			"numOfRecords": 10,
    			"ignoreConnection": "true"
            }
        ]
    
    In the example configuration above, preview will write to the Aggregated Results, however would not write to the Raw Logs.


    e. Preview single stage:

    Consider pipeline connections:
    "connections":[  
             {  
                "from":"FTP",
                "to":"CSVParser"
             },
             {  
                "from":"S3",
                "to":"CSVParser"
             },
             {  
                "from":"CSVParser",
                "to":"Table"
             }
          	]
     
    CSVParser in the above pipeline has two input connections, one from FTP and another from S3. 
    In order to preview the single stage CSVParser following configurations can be specified -
     
    "connections":[  
             {  
                "from":"FTP",
                "to":"CSVParser",
    			"inputData"  : [
                                     {"offset": 1, "body": "100,bob"}, 
                                     {"offset": 2, "body": "200,rob"}, 
                                     {"offset": 3, "body": "300,tom"}
                                  ]
             },
             {  
                "from":"S3",
                "to":"CSVParser",
    			"inputData"  : [
                                     {"offset": 1, "body": "500,milo"}, 
                                     {"offset": 2, "body": "600,whitney"}, 
                                     {"offset": 3, "body": "700,yosemite"}
                                  ]
             },
             {  
                "from":"CSVParser",
                "to":"Table"
             }
          	]
    "preview": {
    	"endStages": ["CSVParser"]
    }
     
    Note that in 3.5, only one stage can be provided as a endStages and when endStages is specified, inputData must be provided for all the incoming connections to that stage.


    f. NOT in 3.5   In order to execute the section of the pipeline, endStages can be provided.

    Consider the following pipeline:
     "connections": [
            {
                "from": "S3 Source",
                "to": "Log Parser"
            },
            {
                "from": "Log Parser",
                "to": "Group By Aggregator"
            },
            {
                "from": "Group By Aggregator",
                "to": "Aggregated Result"
            },
            {
                "from": "S3 Source",
                "to": "Raw Logs"
            }
        ]
     
     
    (S3 Source) --------->(Log Parser)--------->(Group By Aggregator)--------->(Aggregated Result)
                |
    			|
    			--------->(Raw Data)	
     
    Now if user wants to preview (Log Parser)--------->(Group By Aggregator) section of the pipeline, endStages can be provided as Group By Aggregator using following configurations:
     
    "connections": [
            {
                "from": "S3 Source",
                "to": "Log Parser",
    			"inputData": [
    							"127.0.0.1 - frank [10/Oct/2000:13:55:36 -0800] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - bob [10/Oct/2000:14:55:36 -0710] GET /apache_pb.gif HTTP/1.0 200 2326",
    							"127.0.0.1 - tom [10/Oct/2000:23:55:36 -0920] GET /apache_pb.gif HTTP/1.0 200 2326"
    						 ]	
            },
            {
                "from": "Log Parser",
                "to": "Group By Aggregator"
            },
            {
                "from": "Group By Aggregator",
                "to": "Aggregated Result"
            },
            {
                "from": "S3 Source",
                "to": "Raw Logs"
            }
        ]
    
    "preview": {
    	"endStages": ["Group By Aggregator"],
        "ignoreSinks": ["Raw Logs"]    
    }
     
      
  2.  

  3. Once the preview is started, the unique preview id will be generated for it. Preview id could be of the form: namespace_id.app_id.preview. The runtime information (<Preview_id, STATUS) for the preview will be generated and will be stored (in-memory or disk). While preview is running if user again sends the preview request for the same application and the STATUS is RUNNING, user will get 403 status code with "Preview already running for application." error message. This enforces only one preview is running at any instance of time for a given application in a given namespace.

  4. If the startStage in the preview configurations is not SOURCE plugin, then the preview system will generate the MOCK source in the pipeline which will read the JSON records specified in the inputData field and convert them into the StructureRecord.

  5. Once the preview execution is complete, its runtime information will be updated with the status of the preview (COMPLETED or FAILED).

  6. To get the status of the preview
    Request Method and Endpoint

    GET /v3/namespaces/{namespace-id}/apps/{app-id}/preview/status
    where namespace-id is the name of the namespace
          app-id is the name of the application for which preview data is to be requested

    Response body will contain JSON encoded preview status and optional message if the preview failed.

    1. If preview is RUNNING
    {
    	"status": "RUNNING" 
    }
    
    2. If preview is COMPLETED
    {
    	"status": "COMPLETED" 
    }
    
    3. If preview FAILED
    {
    	"status": "FAILED"
        "errorMessage": "Preview failure root cause message." 
    }
  7. To get the preview data for stage:
    Request Method and Endpoint

    GET /v3/namespaces/{namespace-id}/apps/{app-id}/preview/stages/{stage-name}
    where namespace-id is the name of the namespace
          app-id is the name of the application for which preview data is to be requested
          stage-name is the unique name used to identify the stage

    Response body will contain JSON encoded input data and output data for the stage as well as input and output schema.

    {
    	"inputData": [
                     	{"first_name": "rob", "zipcode": 95131},
                        {"first_name": "bob", "zipcode": 95054},
                        {"first_name": "tom", "zipcode": 94306}
                     ],
    	"outputData":[
    					{"name": "rob", "zipcode": 95131, "age": 21},
                        {"name": "bob", "zipcode": 95054, "age": 22},
                        {"name": "tom", "zipcode": 94306, "age": 23}
    				 ],
    	"inputSchema": {
    						"type":"record",
    						"name":"etlSchemaBody",
    						"fields":[
    									{"name":"first_name", "type":"string"},
    									{"name":"zipcode", "type":"int"}
    								 ]
    					},
    	"outputSchema": {
    						"type":"record",
    						"name":"etlSchemaBody",
    						"fields":[
    									{"name":"name", "type":"string"},
    									{"name":"zipcode", "type":"int"},
    									{"name":"age", "type":"int"}
    								 ]
    					}  
    }
  8. To get the logs/metrics for the preview:
    Request Method and Endpoint

    GET /v3/namespaces/{namespace-id}/apps/{app-id}/preview/logs
    GET /v3/namespaces/{namespace-id}/apps/{app-id}/preview/metric
    where namespace-id is the name of the namespace
          app-id is the name of the application for which preview data is to be requested

    Response would be similar to the regular app.











  • No labels