Salt Metrics Tables

Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

Metrics are written to the metrics table with a row key that is composed from all the dimensions in the metrics context and the metric name. The metric context contains the instance id of the container, which, in a MapReduce job, is the task ID of the mapper or reducer. A large MapReduce that runs 1000 mappers and emits 10 different metrics would therefore write to 10000 rows every second. Because the metrics table is not salted or otherwise spread out, all these writes (having the same prefix of namespace, app, program id, run id) go to the same region. This region goes under very heavy load, which can impact performance of other regions served on the same region server.

Goals

The goal is to salt following metrics tables to avoid hot spotting.

  • cdap_system:metrics.v2.table.ts.1
  • cdap_system:metrics.v2.table.ts.2147483647
  • cdap_system:metrics.v2.table.ts.3600
  • cdap_system:metrics.v2.table.ts.60

User Stories 

  • CDAP system should be able to salt metrics tables so that all the writes does not go to same region.

Design

In order to salt HBase metrics tables, we can create a new salted metrics tables:

  • cdap_system:metrics.v3.table.ts.1
  • cdap_system:metrics.v3.table.ts.2147483647
  • cdap_system:metrics.v3.table.ts.3600
  • cdap_system:metrics.v3.table.ts.60

Tables that do not require salting:

  • cdap_system:metrics.kafka.meta
  • cdap_system:metrics.v2.entity

Once new salted tables are created, all the writes will go to new salted tables only. However, implementation of queries becomes little bit tricky because:

  • There can be some in-flight data in the transport after new tables are created. This means that the same rows can get split between old and new tables. So when we apply scan we need to merge data from both the tables.
  • Now for scan, data can be present in both old and new tables. This means we have to find out the type of the metric (Gauge or Counter) and either increment value of the cell (counter) or return the latest cell value (gauge). 

For queries, we propose to read from both old and new tables. Now when we scan both the tables, we will need to differentiate between metrics types, i.e. Gauge or Counter, in order to merge and return the results. That means for Gauge type of metrics we will return the latest timeseries value and for Counter type we will aggregate the results. To distinguish the type, we can use HBase tags for cells. However, tags are only supported from HBase 0.98 onwards. This means with this approach for lower HBase versions metrics may not be accurate after the upgrade. 

Migration

Metrics Tables with resolution 1s, 1m and 1h have certain retention duration which can be configured from cdap-site.xml. Now, once data in these tables gets expired, a background thread running in metrics processor can delete these tables. However, totals resolution table retains metrics for Integer.MAX_VALUE time. This table we can copy each row from unsalted table to new table with additional column `u`. This additional column will be used to figure out if the row has been copied. 

Implementation

Implement new CombinedHBaseMetricsTable to apply merging of rows from unsalted and slated tables.

public class CombinedHBaseMetricsTable implements MetricsTable {

  private final MetricsTable unsaltedHBaseTable;
  private final MetricsTable saltedHBaseTable;

  @Nullable
  @Override
  public byte[] get(byte[] row, byte[] column) {
	// read from old and new tables
    return new byte[0];
  }

  @Override
  public void put(SortedMap<byte[], ? extends SortedMap<byte[], Long>> updates) {
    // write only to saltedHBaseTable
  }

  @Override
  public void putBytes(SortedMap<byte[], ? extends SortedMap<byte[], byte[]>> updates) {
    // write only to saltedHBaseTable
  }

  @Override
  public boolean swap(byte[] row, byte[] column, byte[] oldValue, byte[] newValue) {
    return false;
  }

  @Override
  public void increment(byte[] row, Map<byte[], Long> increments) {

  }

  @Override
  public void increment(NavigableMap<byte[], NavigableMap<byte[], Long>> updates) {
     // write only to saltedHBaseTable
  }

  @Override
  public long incrementAndGet(byte[] row, byte[] column, long delta) {
    return 0;
  }

  @Override
  public void delete(byte[] row, byte[][] columns) {
    // delete records from both the tables
  }

  @Override
  public Scanner rowScan(@Nullable byte[] start, @Nullable byte[] stop, @Nullable FuzzyRowFilter filter) {
    // scan and merge records from both the tables
    return null;
  }

  @Override
  public void close() throws IOException {

  }
}

Approach

Approach #1

Approach #2

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Deprecated Programmatic APIs

New REST APIs

PathMethodDescriptionResponse CodeResponse
/v3/apps/<app-id>GETReturns the application spec for a given application

200 - On success

404 - When application is not available

500 - Any internal errors

 

     

Deprecated REST API

PathMethodDescription
/v3/apps/<app-id>GETReturns the application spec for a given application

CLI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

System behavior (if applicable - document impact on downstream [ YARN, HBase etc ] component failures) and how does the design take care of these aspect

Test Scenarios

Test IDTest DescriptionExpected Results
   
   
   
   

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work