Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
The Operations Dashboard provides users with two views: Dashboard and Reporting. The Dashboard view is a real-time interactive interface that gives users a quick look at program run statistics and resource usage within any 24-hour window in the last week. A user can also select any hour within the 24-hour view to get program run details for that hour. The Reporting view provides a comprehensive report of program runs, resource usage, and other statistics selected by the user within a given time range, such as 24 hours or one week. However, once the user has selected the types of data to include in the report, the Reporting view does not support interaction, and the user may need to wait for the report to be generated.
Goal
Provide the APIs and data required to generate an interactive Dashboard view over a shorter time range and comprehensive reports over a longer time range.
User Stories
- As a cluster administrator, I want to see how many programs were running at 8pm yesterday and who started those programs.
- As a cluster administrator, I want to see how memory and core usage change over time in the last 24 hours.
- As a cluster administrator, I want to see what programs are scheduled to run in the next 24 hours.
- As a pipeline developer, I want to see how many programs failed in the last 24 hours and be able to investigate their logs.
More use cases: https://confluence.cask.co/pages/viewpage.action?title=Operational+Dashboard&spaceKey=PROD
Design
As shown in the diagram above, each program run currently publishes a message containing program run meta information (namespace, program name, status, etc.) to a TMS topic. Additional fields, such as launching method and user, will be added to these messages for report generation. These two fields will also be added to RunRecordMeta. The messages are currently processed by ProgramNotificationSubscriberService, which persists them in the HBase table RunRecordMeta. DashboardHandler handles requests for the dashboard view by scanning RunRecordMeta.
If the user chooses to enable the report generation feature, a Spark program will be launched. The Spark program contains the ProgramMetaSubscriberService, which subscribes to the TMS topic and writes the program run meta information into time-partitioned files. Requests for generating or getting reports will be handled by ReportHandler. Both ProgramMetaSubscriberService and ReportHandler run in the Spark driver. The handler will forward the request to ReportGenerator in the Spark executor, and ReportGenerator will start a report generation job and return a unique report ID. ReportGenerator will read the time-partitioned program run meta records and then write report files under directories named by report ID.
Open questions: How will the operational metadata be collected from the cloud? How will resource usage be measured and displayed?
Approach
End-to-End Report Generation
The client first gets a list of all the reports owned by the user by calling GET /reports. With the report IDs returned, the user can get further information about each report, such as the request used to generate it and the status of its generation, by calling GET /reports/<report-id>. If no existing report contains the desired information, the user can call POST /reports to generate a report. A report ID will be returned, which the user can use to query the status of the requested report. When the report is completed, the user can call GET /reports/<report-id>/summary and GET /reports/<report-id>/runs to get the report summary and details.
The diagram above is generated by https://www.websequencediagrams.com/ with the following lines:
No Format |
---|
title Report Query Sequence
Client->ProgramOperationsHTTPHandler: GET /reports
ProgramOperationsHTTPHandler->Client: Existing report ids owned by the user
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>
ProgramOperationsHTTPHandler -> Client: Report generation status
Client->ProgramOperationsHTTPHandler: POST /reports with query in body to generate report
ProgramOperationsHTTPHandler -> ReportGenerator: Generate report if the report doesn't exist and is not being generated
ReportGenerator -> ProgramOperationsHTTPHandler: Job status and Report ID
ProgramOperationsHTTPHandler->Client: Report ID
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>
ProgramOperationsHTTPHandler -> Client: Report generation status
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>/summary
ProgramOperationsHTTPHandler->Client: Report summary
Client->ProgramOperationsHTTPHandler: GET /reports/<report-id>/runs
ProgramOperationsHTTPHandler->Client: Report details |
Report Generator State Diagram
Initially, the report generator is in the Idle state, waiting for a report generation request. When a request arrives, the report generator transitions into the Started state and generates the report ID. The report ID is returned in the response to the request, and a new directory named by the report ID is created. In the directory, a Job Creation File is created, which includes the report generation request, the time when the job started, and the RunId of the job. After writing the file, the report generator transitions into the Running state, and it may end in the Completed or Failed state. If it completes successfully, a _SUCCESS file and the report will be written.
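The transitions described above can be sketched as a small state machine. This is only an illustration: the enum name `ReportGeneratorState` and its methods are hypothetical, and treating Completed and Failed as terminal is an assumption, since the text does not say whether the generator returns to Idle afterwards.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the report generator states named in this section. */
enum ReportGeneratorState {
  IDLE, STARTED, RUNNING, COMPLETED, FAILED;

  /** States reachable in one step, following the transitions described in the text. */
  Set<ReportGeneratorState> next() {
    switch (this) {
      case IDLE:
        return EnumSet.of(STARTED);            // a request arrives and the report ID is generated
      case STARTED:
        return EnumSet.of(RUNNING);            // the Job Creation File has been written
      case RUNNING:
        return EnumSet.of(COMPLETED, FAILED);  // the job ends one way or the other
      default:
        // Assumption: COMPLETED and FAILED are terminal in this sketch.
        return EnumSet.noneOf(ReportGeneratorState.class);
    }
  }

  /** True if the text allows moving directly from this state to the target state. */
  boolean canTransitionTo(ReportGeneratorState target) {
    return next().contains(target);
  }
}
```

A guard such as `state.canTransitionTo(ReportGeneratorState.RUNNING)` before each transition would make illegal jumps (for example Idle directly to Running) fail fast.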
Open questions: What should happen when a new request is received while the report generator is not in the Idle state? How should the report generator scale? Should an endpoint be exposed to stop a report generation job?
Data model
Message format
Program Run Status | Existing fields | Additional fields |
---|---|---|
STARTING | programRunId (JSON), startTime, status (STARTING), programStatus, userOverrides (JSON map), systemOverrides (JSON map), TwillRunId (optional) | parentArtifactId (JSON), principal (JSON) |
RUNNING | programRunId (JSON), startRunningTime, status (RUNNING) | |
COMPLETED | programRunId (JSON), endTime, status (COMPLETED) | |
FAILED | programRunId (JSON), endTime, status (FAILED), failureCause (JSON) | |
KILLED | programRunId (JSON), endTime, status (KILLED) | |
ProgramRunMetaRecord Schema
No Format |
---|
[
  {
    "namespace": "co.cask.report",
    "type": "record",
    "name": "ReportRecord",
    "fields": [
      /**
       * ID of the source of this programRunMetaRecord. Specifically, TMS messageId in implementation
       */
      {"name": "sourceId", "type": "string"},
      {"name": "application", "type": "string"},
      {"name": "version", "type": "string"},
      {"name": "type", "type": "string"},
      {"name": "program", "type": "string"},
      {"name": "run", "type": "string"},
      {"name": "time", "type": "long"},
      {"name": "status", "type": "string"},
      {"name": "statusInfo", "type": ["null", "co.cask.report.ProgramStartingInfo"]}
    ]
  },
  {
    "namespace": "co.cask.report",
    "type": "record",
    "name": "ProgramStartingInfo",
    "fields": [
      {"name": "artifactName", "type": "string"},
      {"name": "artifactVersion", "type": "string"},
      {"name": "user", "type": "string"},
      /**
       * A map of runtime arguments
       */
      {"name": "runtimeArgs", "type": {"type": "map", "values": "string"}},
      /**
       * Method of the program being launched. "method" can have value either "MANUAL" or "SCHEDULED". "triggerInfo" is null
       * if the program is started manually or contains the TriggerInfo in JSON format if it is started by schedule.
       */
      {
        "name": "startMethod",
        "type": {
          "type": "record",
          "name": "StartMethod",
          "fields": [
            {"name": "method", "type": "string"},
            {"name": "triggerInfo", "type": ["null", "string"]}
          ]
        }
      }
    ]
  }
] |
Report Schema
Report schema will be a subset of the ProgramRunMetaRecord schema depending on the query. sourceId will always be excluded from the report.
Mechanism for storage
ProgramRunMetaRecord File format
Format | Pros | Cons |
---|---|---|
Parquet | Fast reads and queries; more space efficient; can use row group statistics and page statistics to filter records. Most importantly, for a given query time range [BEGIN, END), predicate pushdown can efficiently select only the records with startTime < END && endTime > BEGIN, and dictionary pushdown can select program runs started by certain users or with certain statuses. | Slower writes (less relevant since writes are not frequent); cannot fully utilize the column chunk if new program runs are not generated frequently |
Avro | Faster write. Can append row by row with less delay, good for realtime dashboard view. | Slower filtering and random access; more space consumption |
Suppose records have an average size of 400 B and 100 new records arrive every minute. A file with all records generated in 24 hours is then roughly 58 MB (400 B × 100 × 1,440 minutes), which can easily fit into one HDFS block.
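The estimate above is simple arithmetic, and the sketch below makes it easy to rerun with other assumptions. The class and method names are hypothetical; the inputs (400 bytes per record, 100 records per minute) are the figures from the text.

```java
/** Hypothetical helper for estimating the size of a one-day meta record file. */
final class MetaFileSizeEstimate {

  /** Bytes written in 24 hours for a given average record size and record rate. */
  static long bytesPerDay(long avgRecordBytes, long recordsPerMinute) {
    long minutesPerDay = 24L * 60L; // 1,440 minutes
    return avgRecordBytes * recordsPerMinute * minutesPerDay;
  }

  public static void main(String[] args) {
    // 400 B * 100 records/min * 1,440 min = 57,600,000 bytes
    System.out.println(bytesPerDay(400, 100));
  }
}
```

Doubling either the record size or the record rate pushes a daily file past 100 MB, so the "one file per day fits in one block" assumption is worth rechecking against the cluster's configured block size.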
Since there are only around 20 fields and most fields will be included in a report, Parquet's columnar layout offers little advantage for querying. However, if each individual file is large and a query needs only a small fraction of its records, Parquet still has an advantage in reading.
ProgramRunMetaRecord Storage Layout
/<meta-file-base-directory>/<namespace>/<date>/<file-creation-time>.avro
Files are stored in HDFS directories as specified above according to namespace, and each file contains only program runs within a certain time range, as specified by the filename.
For instance, the file "/<meta-file-base-directory>/default/2017-12-02/1519095562.avro" contains the meta information of all program runs in namespace default from 12am UTC 2017-12-02 (inclusive) to 12am UTC 2017-12-03 (exclusive).
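A minimal sketch of how such a path and its UTC day boundaries could be computed is shown below. The class `MetaFilePaths` and its methods are hypothetical names, and the sketch assumes the date directory is the UTC day covered by the file while the file name is the creation time in epoch seconds, as in the example above.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical helper that follows the storage layout described in this section. */
final class MetaFilePaths {

  /** Builds <base>/<namespace>/<date>/<file-creation-time>.avro. */
  static String metaFilePath(String baseDir, String namespace, LocalDate utcDate, long creationEpochSeconds) {
    // LocalDate.toString() yields the ISO-8601 form, e.g. 2017-12-02
    return baseDir + "/" + namespace + "/" + utcDate + "/" + creationEpochSeconds + ".avro";
  }

  /** Inclusive start of the UTC day covered by the file, in epoch seconds. */
  static long dayStartEpochSeconds(LocalDate utcDate) {
    return utcDate.atStartOfDay(ZoneOffset.UTC).toEpochSecond();
  }

  /** Exclusive end of the UTC day covered by the file, in epoch seconds. */
  static long dayEndEpochSeconds(LocalDate utcDate) {
    return dayStartEpochSeconds(utcDate.plusDays(1));
  }
}
```

For example, `metaFilePath("/meta", "default", LocalDate.of(2017, 12, 2), 1519095562L)` yields `/meta/default/2017-12-02/1519095562.avro`, where `/meta` stands in for the base directory.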
TTL?
Report Files Layout
/<reports-base-directory>/<owner>/<report-id>/
Reports will be stored in HDFS directory with a path like above. There are three possible combinations of files in the directory:
- _STARTED
- _STARTED, _SUCCESS, report and summary
- _STARTED and _FAILED
The _STARTED file is created when the report generation job first starts; it includes the report generation request, the time when the job started, and the RunId of the job. If the RunId in _STARTED doesn't match the RunId of the current Spark program run, the report generation is considered failed and a _FAILED file will be created. The _SUCCESS file indicates that the report generation succeeded. The _FAILED file indicates that the report generation failed and contains the reason for the failure. The report and summary files are both JSON text files following the same format as specified in the REST API response.
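The file-based contract above can be sketched as a pure function from directory contents to a status. The class name, the `Status` enum, and the `UNKNOWN` value for a directory without _STARTED are assumptions made for illustration; only the marker file names and the RunId rule come from the text.

```java
import java.util.Objects;
import java.util.Set;

/** Hypothetical sketch: derive a report's status from the marker files in its directory. */
final class ReportDirectoryStatus {

  enum Status { RUNNING, COMPLETED, FAILED, UNKNOWN }

  /**
   * @param fileNames     names of the files present in the report directory
   * @param startedRunId  RunId recorded in the _STARTED file (may be null if unreadable)
   * @param currentRunId  RunId of the currently running Spark program
   */
  static Status fromFiles(Set<String> fileNames, String startedRunId, String currentRunId) {
    if (!fileNames.contains("_STARTED")) {
      return Status.UNKNOWN; // assumption: no _STARTED means no job was ever recorded here
    }
    if (fileNames.contains("_FAILED")) {
      return Status.FAILED;
    }
    if (fileNames.contains("_SUCCESS")) {
      return Status.COMPLETED;
    }
    // Only _STARTED exists: the job is still running unless it belongs to an earlier Spark run.
    return Objects.equals(startedRunId, currentRunId) ? Status.RUNNING : Status.FAILED;
  }
}
```

In the stale-RunId case the text also calls for writing a _FAILED file; this sketch only reports the status and leaves that side effect to the caller.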
Runtime Infrastructure
Report ID Generation
Report IDs are generated by the report generator.
Report Generation Job
Approach #1: A CDAP app that contains a service and a Spark program. The service is the endpoint for accepting new jobs and polling job status. If a report already exists or is being generated, the service will return the existing report link or the runId of the running job. The Spark program performs report generation and stores the result as a file. Pro: easy to develop. Con: overhead of launching a Spark program.
Approach #2: A long-running Spark program that contains an HTTP service handler. Pro: less overhead from launching a Spark program. Con: occupies resources even when no report is being generated.
Scalability
Scalability of the ProgramOperationHandler: TODO
Scalability of the ReportGenerator: TODO
Report Pagination
Approach #1: The request carries no offset; the response returns a body producer that can be used to call next.
Approach #2: Save the filtered and sorted results in a file and read the offset from the request.
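Approach #2 amounts to slicing an already filtered and sorted result by offset and limit. The sketch below shows that slicing on an in-memory list; the class `ReportPage` is a hypothetical name, and a real implementation would read from the saved file rather than hold all records in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of offset/limit pagination over saved, sorted report records. */
final class ReportPage {

  /** Returns at most {@code limit} records starting at {@code offset}; empty if offset is past the end. */
  static <T> List<T> page(List<T> sortedRecords, int offset, int limit) {
    if (offset < 0 || limit < 0) {
      throw new IllegalArgumentException("offset and limit must be non-negative");
    }
    if (offset >= sortedRecords.size()) {
      return Collections.emptyList();
    }
    // Use long arithmetic so offset + limit cannot overflow int.
    int end = (int) Math.min((long) offset + limit, sortedRecords.size());
    return new ArrayList<>(sortedRecords.subList(offset, end));
  }
}
```

Because the sorted result is persisted once, repeated calls with different offsets return consistent pages, which is the main advantage of this approach over re-running the query per page.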
Report Cleanup
Each report is created with a TTL for its files (for instance, 60 days). Before ReportHTTPHandler returns a report, it first checks whether the files have exceeded the TTL since creation; expired files won't be returned. A background thread will periodically clean up files that have existed for longer than TTL + delay (maybe 1 hour), to reduce conflicts between ReportHTTPHandler reading the files and their deletion.
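The two thresholds described above (TTL for hiding a report, TTL + delay for deleting it) can be sketched as follows. The class `ReportTtl` and its method names are hypothetical; the 60-day TTL and 1-hour delay are the example values from the text, passed in as parameters.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the expiry checks used by the handler and the cleanup thread. */
final class ReportTtl {

  /** The handler must not return a report once its TTL has elapsed. */
  static boolean isExpired(Instant created, Duration ttl, Instant now) {
    return now.isAfter(created.plus(ttl));
  }

  /** The cleanup thread deletes only after TTL + delay, so in-flight reads are not interrupted. */
  static boolean isDeletable(Instant created, Duration ttl, Duration delay, Instant now) {
    return now.isAfter(created.plus(ttl).plus(delay));
  }
}
```

Between the two thresholds a report is invisible to readers but still on disk, which is the window that avoids a read racing with a delete.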
Share ID Generation
Share ID is generated by encrypting the report id, owner of the report, a random seed and maybe the share ID creation time with a private key. A share id can be decrypted to get the information for locating the report.
Failure Handling
Failure while writing a ProgramRunMetaRecord file: after restart, the latest meta record files under each namespace will be read to find the latest sourceId. The message subscriber will then read messages after that sourceId. The unfinished runs from the latest complete row block will also be read into memory to be carried over to the next row block.
Failure while generating a report: since the report status won't be updated after a failure, any existing partial report will be overwritten when the report generation reruns. It is not feasible to continue from the existing partial report because, if some fields are sorted in the report, the point from which to resume processing meta records cannot be determined.
Query & Reporting structure
Sample Dashboard Query Scenarios
- Select all records from namespace A, B, C from 2017-12-10/13-00 to 2017-12-11/14-00 with fields application, version, program, run, start time, end time, principal name, parent artifact type, launching method, memory usage and core usage.
Sample Reporting Scenarios
- Select all records from namespace A from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and principal name, ranked by highest memory usage
- Select all records from namespace A from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and memory usage, filtered by a given principal name
- Select all records from namespaces A, B, C from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and principal name, ranked by highest memory usage filtered by a given principal name
- Select all records from namespaces A, B, C from 2017-12-10/13-00 to 2017-12-17/14-00 with fields application, version, program, run and principal name filtered by memory usage larger than 500MB
Security
Authentication Design:
On authentication-enabled clusters, the Spark service handler can get the user id from the HTTP header “CDAP-UserId”.
We will use the user id to create a directory under the reports structure and store the reports generated by this user in that directory.
On authentication-disabled clusters, this header won’t be present, and we will use the “system” directory to store the reports.
For listing reports, we construct the path based on the user id and list the reports under that directory.
For sharing reports, we will create an encrypted id (link) based on the user name and file and send it as the response. The user can give this share-id to their colleagues to share the report.
Users with a share-id use it to retrieve a report. On the backend, after receiving the share-id, we decrypt the user name and file name and construct the path to the report, which we then use to read and return the report.
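The owner-directory rule above (use the “CDAP-UserId” header if present, otherwise “system”) can be sketched as below. The class `ReportLocations` and its methods are hypothetical, and the plain `Map` stands in for whatever header abstraction the handler actually receives; real HTTP header lookups are case-insensitive, which this sketch does not model.

```java
import java.util.Map;

/** Hypothetical sketch of resolving report locations from the request's user header. */
final class ReportLocations {

  static final String USER_ID_HEADER = "CDAP-UserId";
  static final String SYSTEM_OWNER = "system";

  /** Owner is the user id from the header, or "system" when the header is absent. */
  static String owner(Map<String, String> headers) {
    String userId = headers.get(USER_ID_HEADER);
    return (userId == null || userId.isEmpty()) ? SYSTEM_OWNER : userId;
  }

  /** Directory listed when the user asks for their reports: <base>/<owner>. */
  static String ownerDirectory(String baseDir, Map<String, String> headers) {
    return baseDir + "/" + owner(headers);
  }

  /** Directory of a single report: <base>/<owner>/<report-id>. */
  static String reportDirectory(String baseDir, Map<String, String> headers, String reportId) {
    return ownerDirectory(baseDir, headers) + "/" + reportId;
  }
}
```

For a shared report the owner would come from the decrypted share-id instead of the header, but the path construction stays the same.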
Dashboard View
Program Runs Statistics:
- The average duration of the completed program runs in each time bucket within a time range.
- Future scheduled program runs with namespace, program name, parent app type and user (?)
Resource Information:
- Memory usage of namespace(s) (?), clusters
- Max memory available
- Core usage of namespace(s) (?), clusters
- Max number of cores available
Report View
Additional Program Runs information besides those in Dashboard view:
- Memory Usage
- Number of CPU
- Number of Containers
- Number of Log Warnings
- Number of Log Errors
- Number of records out
Summary Counts:
- Runs per namespace
- Time range
- Pipelines (Realtime vs Batch), custom apps
- Durations: min, max, average
- Last Started: Oldest and Newest
- List of users & count per user
- List of start methods & count per method
API changes
RunRecord API Change
Code Block |
---|
/**
 * This class records information for a particular run.
 */
public class RunRecord {
  ...
  @SerializedName("artifactname")
  private final String artifactName;

  @SerializedName("artifactversion")
  private final String artifactVersion;

  @SerializedName("principal")
  private final String principalName;

  @SerializedName("principaltype")
  private final String principalType;
  ...
} |
New Workflow Schedule REST APIs
Query Parameters:
namespace - the namespace of the schedules (at least one namespace must be specified)
start - the start time in seconds
duration - the duration in seconds (at most 86400, which is 24 hours)
Response Codes:
200 - On successfully returning the list of next runtimes
400 - Bad query parameters in the request
Response: a list of JSON objects
No Format |
---|
[
{
"id":"<id>",
"time":"<time>"
}
] |
where id is the "."-separated programId and time is the scheduled time in milliseconds
New REST APIs
Path | Method | Description | Request | Response Code | Response |
---|---|---|---|---|---|
/v3/reports?offset=<offset>&limit=<limit> | GET | List all reports owned by the given user | Query Parameters: offset - the offset at which to start reading the reports list limit - the max number of reports returned | 200 - On successfully returning the list of reports | Return a JSON object
Each entry in "reports" contains the name of the report, the description of the report (optional), whether the report is saved, the expiry time of the report as epoch time in seconds (if not saved), and the time when the report was created. The "request" contains the request object used to create the report in the "POST /v3/reports" endpoint. |
/v3/reports | POST | Request to generate program runs report with required fields, filtering, and sorting conditions specified in the request body. | Request body: The name of the report, the start (inclusive, all runs in the report will have no end time or end time >= start) and end (exclusive, all runs in the report will have start time < end) time in seconds to indicate the time range for generating the report. Specify required fields in an array in "fields" property. Use "sort" to indicate whether to sort records by a field in ascending or descending order. Specify the filtering conditions in "filters". To get records with certain values in a field, add the values to be matched in "whitelist". To exclude records with certain values in a field, add the values to be excluded in "blacklist". To get records with values within a certain range in a field, specify the "min" (inclusive) and "max" (exclusive) in "range". One of "min" or "max" can be omitted in "range" to indicate an open-ended range.
A complete list of all the possible fields in the report and the eligible properties for each field is given below:
Example:
| 200 - On successfully starting a new report generation job, or when a report generation is in progress or the requested report already exists 400 - Invalid time range, invalid query, or nonexistent namespace in the request | Return a JSON object
The "id" is a unique and deterministic identifier used to check the status of the report generation job and to retrieve the report result. |
/v3/reports/info?reportId=<report-id> (or) share-id=<share-id> | GET | Request to get meta info and status about a report | Query Parameters: reportId - id of the report share-id - the share-id granted by the owner of this report, if it exists Exactly one of reportId and shareId is expected. If neither or both are specified, a BadRequestException will be thrown. reportId will be used in conjunction with the user information header to identify the report, while shareId will be decoded to find the owner and reportId, and the decoded information will be used to identify the report. | 200 - On successfully returning the result 404 - When the given report does not exist 400 - Bad Request - missing reportId and shareId. | A JSON object including the creation time of the report, status, name, description (optional), the error that caused the report generation failure (optional, present only when the report generation failed), report generation request, and summary of the report (optional, present if the report generation succeeds):
 |
/v3/reports/<report-id>/save | POST | Request to save the report with a given name and description |
| 200 - On successfully updating the report 400 - Changing a saved report to be expiring, invalid name or timeout, or other mistakes in the request 404 - When the given report does not exist | |
/v3/reports/download?offset=<offset>&limit=<limit>&report-id=<reportId> (or) share-id=<share-id> | GET | Request to read a program runs report starting at the given offset and with at most the number of records specified by the limit | Query Parameters: offset - the offset at which to start reading the report limit - the max number of runs returned report-id - id of the report share-id - the share-id granted by the owner of this report, if it exists Exactly one of reportId and shareId is expected. If neither or both are specified, a BadRequestException will be thrown. report-id will be used in conjunction with the user information header to identify the report, while shareId will be decoded to find the owner and reportId, and the decoded information will be used to identify the report. | 200 - On successfully returning the details of a completed report 202 - When the report is still being generated 400 - Invalid offset or limit in the request, or the report generation has failed for the given report ID 404 - When the given report does not exist | A JSON format of the records:
 |
/v3/reports/<report-id> | DELETE | Delete the given report | | 200 - On successfully deleting the report 404 - When the given report is not available | |
/v3/reports/<report-id>/share | POST | Request a permalink that can be shared with other users to read the report. | | 200 - On successfully generating the permalink 404 - When the given report is not available | A JSON object containing the permalink
 |
/v3/dashboard?start=<start>&duration=<duration>&namespace=<namespace1>&namespace=<namespace2> | GET | Request for the program runs within the given time range. | Query Parameters: start - the start time in seconds of the dashboard view duration - the duration in seconds of the dashboard view (either 3600 or 86400) namespace - the namespace to be included in the dashboard view (there can be more than one) | 200 - On successfully returning the result 400 - Invalid time range in the request | A JSON object containing a list of the program run details within the query time range:
For program runs scheduled in the future, there will be no "start", "running", "end", and "status" fields. |
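The time-range and "range" filter semantics described for POST /v3/reports in the table above can be sketched as two predicates. The class `ReportFilters` and its method names are hypothetical; the inclusive and exclusive bounds follow the wording of the request description (a run is included if it has no end time or its end time >= start, and its start time < end; a "range" has an inclusive "min" and an exclusive "max", either of which may be omitted).

```java
/** Hypothetical sketch of the inclusion rules described for POST /v3/reports. */
final class ReportFilters {

  /**
   * @param runStart   start time of the run in seconds
   * @param runEnd     end time of the run in seconds, or null if the run has not ended
   * @param rangeStart inclusive start of the report time range
   * @param rangeEnd   exclusive end of the report time range
   */
  static boolean inReportTimeRange(long runStart, Long runEnd, long rangeStart, long rangeEnd) {
    boolean endsAtOrAfterStart = runEnd == null || runEnd >= rangeStart;
    return runStart < rangeEnd && endsAtOrAfterStart;
  }

  /** "range" filter: min is inclusive, max is exclusive, and a null bound is open-ended. */
  static boolean inRange(long value, Long min, Long max) {
    return (min == null || value >= min) && (max == null || value < max);
  }
}
```

A run that started before the range and is still running is therefore included, while a run that starts exactly at the range end is not.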
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release 5.0
Next Steps
- Alternative of adding parentArtifact in the message: lookup the artifactId per application
- Spark vs local process
- Conduct experiments with various query scenarios to determine query time with Parquet and Avro.
- Depending on the experiment result, determine the design:
- If Parquet is significantly faster than Avro (say 1 minute vs 10 minutes), we need to write meta record files in batches in order to leverage the performance advantage of Parquet. One possible implementation is to keep a long-running Spark program with an HTTP handler and let it periodically process and persist records in batches. When a report generation request arrives, if the record files do not yet contain the latest records requested in the query time range, Spark will read them directly from the TMS HBase table.
- If Avro doesn't have performance issues, we can just keep appending to Avro files without worrying about batch writing.
- Need to determine the contract for report existence and report generation status based solely on files. For instance, the presence of a _SUCCESS file means a job succeeded.
Future Work
#1 | /v3/namespaces/<namespace-id>/runs?statuses=<statuses>&start=<startTs>&end=<endTs>&limit=<limit> | GET | Returns the RunRecords under a given namespace within the given time range. Statuses are comma-separated ProgramRunStatus values, defaulting to ALL if left empty. | 200 - On success 500 - Any internal errors | RunRecords |
Report sharing: The report owner can choose to share the report with others or revoke the privilege of others to view the report. If a user only has the READ privilege on the report, granted by the original owner, this user cannot share the report with others. The REST APIs are listed below.
/v3/namespaces/<namespace-id>/report/<report-id>/privileges/grant | POST | Grant another user with the privilege to read the report | 200 - On success 403 - Current user is not authorized to grant privilege for this report 404 - When the given report is not available 409 - Privilege is already granted | { "entity": { "namespace": "<namespace>", "entity": "REPORT", "id" : "<report-id>" }, "principal": { "name": "admin", "type": "ROLE" }, "actions": [ "READ" ] } |
/v3/namespaces/<namespace-id>/report/<report-id>/privileges/revoke | POST | Revoke another user's privilege to read the report | 200 - On success 403 - Current user is not authorized to revoke privilege for this report 404 - When the given report is not available 409 - Privilege is already revoked | { "entity": { "namespace": "<namespace>", "entity": "REPORT", "id" : "<report-id>" }, "principal": { "name": "admin", "type": "ROLE" }, "actions": [ "READ" ] } |