
Status: INCOMPLETE

This page is a work in progress.

 


Motivation

If a workflow takes longer than anticipated (a rogue workflow), workflow-level statistics let us pinpoint where the problem is instead of going through a lot of unnecessary logs. For a system admin, this is useful for finding which jobs caused delays. For a developer, it shows how much time each part of the workflow took, which helps in deciding what to parallelize or optimize. From a scheduling standpoint, a user can determine the duration of average and worst-case runs and set the frequencies of the jobs accordingly. Setting a common standard for showing MapReduce and Spark metrics at the workflow level will help users understand and analyze their runs better.

User Stories


 

Number | User Story | Priority
1 | User wants to find which runs of a workflow experienced delays in meeting the SLA | H
2 | User wants to see the stats for a workflow | H
3 | User wants to know the reason behind the delay of a certain action/run | H
4 | User wants to make future resource allocation decisions based on the historical performance of past runs | M
5 | User wants to see a common metric across various action types so that results look uniform | L
6 | User wants to see common aggregations across all runs of a workflow (see below) | H
7 | User wants to see statistics across actions in a workflow | M

 


Design



WorkflowSlaStatsHandler is part of App-Fabric.

When the /stats endpoint is called:

  • We retrieve a list of all completed workflow runs within the provided time range from the Workflow Stats Dataset.

  • Using this list, we calculate the statistics (the percentiles requested by the user) for the workflow and all of its nodes, and return those stats to the user.
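
The calculation described in the bullets above can be sketched as follows. This is only an illustration, not the handler's actual code: it assumes the nearest-rank definition of a percentile (the page does not say which definition is used), and the function names and the (run_id, duration) input shape are hypothetical. The output keys mirror the sample response further down this page.

```python
import math


def percentile(sorted_values, p):
    """Nearest-rank percentile of an ascending list, for 0 < p <= 100."""
    rank = math.ceil(p * len(sorted_values) / 100)
    return sorted_values[max(rank, 1) - 1]


def workflow_stats(runs, percentiles):
    """runs: list of (run_id, duration_seconds) for completed runs (assumed shape)."""
    if not runs:
        raise ValueError("no completed runs in the requested time range")
    durations = sorted(duration for _, duration in runs)
    stats = {
        "avg_runTime": sum(durations) / len(durations),
        "percentile_completion": [],
        "runs_above_Percentile": [],
    }
    for p in percentiles:
        threshold = percentile(durations, p)
        stats["percentile_completion"].append({str(p): threshold})
        # Runs strictly slower than the threshold are the ones worth investigating.
        slow = [run_id for run_id, duration in runs if duration > threshold]
        stats["runs_above_Percentile"].append({str(p): slow})
    return stats
```

The same routine could be applied per node by feeding it the node-level durations stored with each run.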

The second endpoint lets the user dig deeper into what caused certain runs to take unusually long. The /stats endpoint will, for example, return the run-ids of all runs that took longer than the 99th percentile. Using those run-ids, we can analyze how a particular run differs from the normal runs of the workflow.

  • This endpoint lets the user configure, at a fine-grained level, the number of runs before and after the run of interest and the sampling time interval. For example, for a request with count=3 and time_interval=1day we would return 7 run-ids: the run of interest, 3 runs before it and 3 runs after it, each spaced at a 1-day interval.

  • The details will be collected from MRJobInfoFetcher, MetricsStore and MDS.

  • FUTURE IMPROVEMENT: Naively, this will just return an evenly spaced sample from the interval. We could optimize it to prefer runs from the range that are close to the average, so that the user does not end up seeing only abnormal runs.
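
The naive, evenly spaced sampling described above might look like the sketch below. It is an assumption-laden illustration: the function names, the (start_time, run_id) input shape, and the choice of "closest run to each target time" are all hypothetical, and the result keys follow the p{x}/current/n{x} convention used in the sample response on this page.

```python
import bisect


def nearest_run(runs, target_time):
    """runs: non-empty list of (start_time, run_id) sorted by start_time."""
    times = [start for start, _ in runs]
    i = bisect.bisect_left(times, target_time)
    # Only the runs just before and just after the target can be the closest.
    candidates = runs[max(i - 1, 0):i + 1]
    return min(candidates, key=lambda run: abs(run[0] - target_time))


def sample_neighbours(runs, current_start, count, interval_sec):
    """Pick `count` runs before and after the current run, one per interval."""
    result = {"current": nearest_run(runs, current_start)[1]}
    for x in range(1, count + 1):
        result[f"p{x}"] = nearest_run(runs, current_start - x * interval_sec)[1]
        result[f"n{x}"] = nearest_run(runs, current_start + x * interval_sec)[1]
    return result
```

With count=3 this yields 7 entries. Note that when runs are sparse, the same run can be picked for more than one slot; a real implementation would need to decide how to handle that.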


The third endpoint lets us compare 2 runs of a workflow side by side. This comes in handy when the user has 2 runs, one normal and one an outlier, and wants to compare them to see why the outlier run turned out that way.
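
As a rough illustration of how such a side-by-side comparison can be used, the sketch below takes the per-node metrics of two runs and reports only the numeric metrics that differ. The function name and the output shape are hypothetical; the metric names are taken from the sample response on this page.

```python
def compare_runs(run1, run2):
    """Return the numeric metrics whose values differ between two runs of a node."""
    diff = {}
    for key in sorted(set(run1) | set(run2)):
        a, b = run1.get(key), run2.get(key)
        both_numeric = isinstance(a, (int, float)) and isinstance(b, (int, float))
        if both_numeric and a != b:
            diff[key] = {"run1": a, "run2": b, "delta": b - a}
    return diff
```

Metrics that are identical in both runs (for example the HDFS byte counts in the sample) drop out, which leaves the user with the values most likely to explain the outlier.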



Workflow Analytics Service

This service should be a system service, with the ability to be turned on or off as the user desires.


 

Handler Endpoint

Response

Desc

app/{app-id}/workflows/{workflow-id}/statistics?start={st}&end={end}&percentile=80&percentile=90&percentile=95&percentile=99

{

  "startTime" : 1439200000,

  "endTime" : 1439300000,

  "total_successful_runs" : 100,

  "total_unsuccessful_runs" : 10,

  "avg_runTime": 30,

  "median_runtime": 15,

  "percentile_completion" : [

     {"80": 20},

     {"90": 28},

     {"95": 40},

     {"99": 45}

  ],

  "runs_above_Percentile": [

        {"80": ["32af2-44",  "54xxasdss", "..."]},

        {"90": ["234asd23", "..."]},

        {"95": ["65fgdf7657", "..."]},

        {"99": ["84xv6985", "..."]}

  ],

  "nodes": [{

    "node1": {

      "name": "PurchaseEventParser",

      "type": "MapReduce",

      "avg_run_time_seconds": 12,

      "80_percentile_seconds": 18,

      "90_percentile_seconds": 30,

      "95_percentile_seconds": 40,

      "99_percentile_seconds": 50

    }},

 {"node2": {

      "name": "PurchaseAnalyser",

      "type": "Spark",

      "avg_run_time_seconds": 17,

      "80_percentile_seconds": 18,

      "90_percentile_seconds": 30,

      "95_percentile_seconds": 40,

      "99_percentile_seconds": 50

    }

  }]

}

Returns basic stats about the workflow across all runs in the given time range. This will help us detect which jobs might be responsible for delays.

app/{app-id}/workflows/{workflow-id}/runs/{run-id}/stats?start={st}&end={end}&count={count}&time_interval_sec={H}

{

  "start_time": 1439200000,

  "end_time":  1439279804,

  "num_runs": 20,

  "nodes": {

     "node1" : {

      "name": "PurchaseEventParser",

      "p2": {  "runid" : "13fdssvqe423",

               "run_time_seconds": 15,

               "mappers": 4,

               "reducers": 3,

               "file_read_data_bytes": 800,

               "file_write_data_bytes": 900,

               "file_read_data_ops": 80,

               "file_write_data_ops": 90,

               "file_large_read_data_ops": 8,

               "hdfs_read_data_bytes": 700,

               "hdfs_write_data_bytes": 800,

               "hdfs_read_data_ops": 70,

               "hdfs_write_data_ops": 80,

               "hdfs_large_read_data_ops": 5,

               "total_time_spnt_by_map_tasks_seconds": 100,

               "total_time_spnt_by_red_tasks_seconds": 140

              },

      "p1": {

               "runid" : "245fdasfsdaw5ee",

               "run_time_seconds": 20,

               "mappers": 7,

               "reducers": 8,

               "file_read_data_bytes": 800,

               "file_write_data_bytes": 900,

               "file_read_data_ops": 80,

               "file_write_data_ops": 90,

               "file_large_read_data_ops": 8,

               "hdfs_read_data_bytes": 700,

               "hdfs_write_data_bytes": 800,

               "hdfs_read_data_ops": 70,

               "hdfs_write_data_ops": 80,

               "hdfs_large_read_data_ops": 5,

               "total_time_spnt_by_map_tasks_seconds": 11,

               "total_time_spnt_by_red_tasks_seconds": 9

     },

      "current":{},

      "n1": {},

      "n2": {}   

    },

   "node2" : {"name": "PurchaseSpark",

                  "p2": {},

                  "p1": {},

                  "current": {},

                  "n1": {},

                  "n2": {}

      }

 }

}

Gets the stats for each stage of a workflow run and compares them with the {count} runs before and after it that fall within the provided time window. The time interval controls how far apart the previous and next runs are spaced within the given range.

Reference -

p{x} is x runs before the current run

n{x} is x runs after the current run

app/{app-id}/workflows/{wf-id}/runs/{run-id}/compare?other_run={run-id-2}

[{"node1":[

{"run1": {

      "name": "PurchaseEventParser",

      "type": "MapReduce",

      "runid" : "13fdssvqe423",

      "run_time_seconds": 15,

       "mappers": 4,

       "reducers": 3,

       "file_read_data_bytes": 800,

       "file_write_data_bytes": 900,

       "file_read_data_ops": 80,

       "file_write_data_ops": 90,

       "file_large_read_data_ops": 8,

       "hdfs_read_data_bytes": 700,

       "hdfs_write_data_bytes": 800,

       "hdfs_read_data_ops": 70,

       "hdfs_write_data_ops": 80,

       "hdfs_large_read_data_ops": 5,

       "total_time_spnt_by_map_tasks_seconds": 100,

       "total_time_spnt_by_red_tasks_seconds": 140

    }},

{"run2": {

      "name": "PurchaseEventParser",

      "type": "MapReduce",

      "run_time_seconds": 50,

      "mappers": 10,

      "reducers": 10,

      "file_read_data_bytes": 800,

       "file_write_data_bytes": 900,

       "file_read_data_ops": 80,

       "file_write_data_ops": 90,

       "file_large_read_data_ops": 8,

       "hdfs_read_data_bytes": 700,

       "hdfs_write_data_bytes": 800,

       "hdfs_read_data_ops": 70,

       "hdfs_write_data_ops": 80,

       "hdfs_large_read_data_ops": 5,

      "total_time_spnt_by_map_tasks_seconds": 20,

      "total_time_spnt_by_red_tasks_seconds": 30

      }}

]},

{"node2":[

{"run1": {

       "name": "PurchaseSpark",

       "type": "Spark",

       "executors" : 2,

       "driver_running_stages": 4,

       "driver_mem_used_megabytes": 17,

       "driver_disk_space_used_megabytes": 12,

       "driver_all_jobs": 7,

       "driver_failed_stages": 2,

       "filesystem.file_write_ops": 1500,

       "filesystem.file_read_ops": 1000,

        "filesystem.file_large_read_ops": 100,

       "filesystem.file_write_bytes": 15000,

       "filesystem.file_read_bytes": 10000,

       "hdfs.file_write_ops": 1500,

       "hdfs.file_read_ops": 1000,

       "hdfs.file_large_read_ops": 100,

       "hdfs.file_write_bytes": 15000,

       "hdfs.file_read_bytes": 10000,

       "current_pool_size": 5,

       "max_pool_size": 8

    }},

{"run2" : {}}]

}

]



Compares 2 runs of a workflow side by side.

 


Design:

 Table

 rowkey : {ns_appId_workflowId_startTime}

 col : runid

 col: time_taken

 col: node_stats  example: [{program_name, runId, timeTaken_seconds}, {program_name, runId, timeTaken_seconds}]
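
A minimal sketch of how a row key of this shape could be built and used for a time-range scan is given below. It is not the actual dataset code: the function names are hypothetical, and zero-padding the start time to a fixed width is an assumption made here so that lexicographic key order matches chronological order. The sketch also assumes the ids themselves contain no underscore.

```python
def row_key(namespace, app_id, workflow_id, start_time):
    """Build {ns_appId_workflowId_startTime}; start_time is in epoch seconds."""
    # Fixed-width start time (assumption) keeps string order equal to time order.
    return f"{namespace}_{app_id}_{workflow_id}_{start_time:010d}"


def scan_bounds(namespace, app_id, workflow_id, start, end):
    """Start and stop keys covering all runs of one workflow in [start, end]."""
    return (
        row_key(namespace, app_id, workflow_id, start),
        row_key(namespace, app_id, workflow_id, end),
    )
```

With the workflow prefix first and the start time last, all runs of one workflow in a time range sit next to each other, which is what the /stats endpoint needs.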

 


CDAP Metrics System

  • All the metrics from Spark are emitted to the CDAP Metrics System.

  • User metrics and job-level metrics for MapReduce jobs are emitted to the CDAP Metrics System.

  • Metrics for custom-action and condition nodes aren’t available right now, but they will be emitted to the CDAP Metrics System when implemented.

  • The CDAP Metrics System aggregates metrics at multiple resolutions {seconds, minutes, hours}. When querying metrics for long-running batch programs, it is more useful and efficient to use the minutes or hours resolution, which gives higher-level details and reduces the number of data points.

  • Since we depend heavily on the CDAP Metrics System, CDAP must be operational with the metrics.processor and metrics.query services running.

MDS

  • MDS stores the run records of programs, which contain the following information:

    • <app-name> <prg-type> <prg-name> <run-id> <start-time> <stop-time> <final-status> <limit>

  • Use the store directly to access the run records.

Assumptions

  • CDAP Services are running.

  • Dataset service is running.

User Interface

curl calls
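
Since the endpoints are plain HTTP GET requests, the URL to pass to curl for the statistics endpoint could be assembled as in the sketch below. The function name and the base URL are placeholders, not real CDAP values; the path and query parameters are the ones listed in the endpoint table above.

```python
from urllib.parse import quote, urlencode


def statistics_url(base_url, app_id, workflow_id, start, end, percentiles):
    """Build the statistics request URL; base_url is a placeholder for the router address."""
    # The percentile parameter is repeated once per requested percentile.
    query = [("start", start), ("end", end)]
    query += [("percentile", p) for p in percentiles]
    path = f"app/{quote(app_id, safe='')}/workflows/{quote(workflow_id, safe='')}/statistics"
    return f"{base_url}/{path}?{urlencode(query)}"
```

The resulting string can be handed to curl as is, quoted so that the shell does not interpret the "&" characters.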

Implementation Plan

  1. Create a dataset that stores completed workflow runs

  2. Implement the handler

  3. /stats endpoint

  4. /runs/{run-id}/stats endpoint

  5. /compare endpoint


Limitations

JHS (the MapReduce Job History Server) stores data for only 7 days by default; older data is not retained. If older runs are returned, we might therefore have to fall back to CDAP metrics for those MR jobs.

At this point, only new workflow runs can be added to this dataset. There is no way to backfill the workflow stats table from historical runs.

