...

  1. Message consumption
    1. During message consumption, we can simply apply a lower bound on the scan start row, derived from the TTL setting of the topic being consumed.
  2. Message cleanup
    1. In local mode, LevelDB is used as the backing store. The messaging system runs a cleanup thread that periodically scans and deletes old entries based on the TTL setting of each topic.
    2. In distributed mode, since HBase is used as the backing store, the best way to do cleanup is to use a coprocessor that drops expired cells at flush/compaction time. The TTL setting can be made available to the coprocessor through the table attribute.
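
The TTL handling above can be sketched as follows. This is a minimal illustration, not the actual implementation: the class name `TtlBounds`, the method names, and the row key layout (a topic prefix followed by an 8-byte big-endian publish timestamp in milliseconds) are all assumptions made for the example. The same expiry check could in principle be applied by the local cleanup thread or by a coprocessor at flush/compaction time.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

/**
 * Sketch only. The row key layout (topic prefix + 8-byte big-endian publish
 * timestamp in milliseconds) is an assumption made for this example.
 */
public final class TtlBounds {

  private TtlBounds() { }

  /** Oldest publish timestamp (ms) that is still live under the given TTL. */
  public static long oldestLiveTimestamp(long nowMillis, long ttlSeconds) {
    return Math.max(0L, nowMillis - TimeUnit.SECONDS.toMillis(ttlSeconds));
  }

  /** Start row for a consumer scan: never earlier than the TTL boundary. */
  public static byte[] scanStartRow(byte[] topicPrefix, long requestedTimestamp,
                                    long nowMillis, long ttlSeconds) {
    long start = Math.max(requestedTimestamp, oldestLiveTimestamp(nowMillis, ttlSeconds));
    return ByteBuffer.allocate(topicPrefix.length + Long.BYTES)
        .put(topicPrefix)
        .putLong(start)  // big-endian, so byte order matches numeric order for non-negative values
        .array();
  }

  /** Expiry check that a cleanup pass could apply to each entry. */
  public static boolean isExpired(long publishTimestamp, long nowMillis, long ttlSeconds) {
    return publishTimestamp < oldestLiveTimestamp(nowMillis, ttlSeconds);
  }
}
```

With a TTL of 5 seconds and a current time of 10000 ms, a consumer asking to start from timestamp 500 would be moved forward to 5000, while a request starting from 7000 is left unchanged.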

Implementation

...

Considerations

  • To avoid scanning HBase too often, the messaging service should cache recent scans in memory. This is based on the assumption that consumers shouldn't fall too far behind, which is one of the reasons why we want a messaging system: to provide faster reaction time.
  • Batch writes should be used to give higher throughput and lower latency for concurrent publishing.
    • Refer to ConcurrentStreamWriter for the implementation.
  • When doing transactional publishing, it is important to have the messages written to the messaging system as the last "persist" operation (i.e. it persists after all dataset writes), meaning it has to be the last TransactionAware to be used in the transaction context.
    • This helps lower the latency of transactional consumers, since the latency is basically the time between the message persist and the commit to Apache Tephra.
  • Multiple messaging service instances (not part of the initial scope)
    • When we reach the point where we need to scale the messaging service to multiple instances, there are a couple of choices for how it could look:
      1. All instances are equal. This means all instances can handle publishing and consumption for all topics.
        1. Different instances need to generate different seq_id for the same timestamp to avoid key collision. We potentially can use the instance ID as the first byte in the sequence id.
        2. (Pro) Easy to implement as there is no coordination needed
        3. (Con) The consumer scan cache can take up a lot of space because all instances would be caching for all topics
      2. Perform resource assignment through the ResourceCoordinator such that a given topic would only be handled by a sub-set of instances
        1. Still need to handle sequence id generation so as to avoid key collision
        2. (Pro) Most flexible: allows a trade-off between scalability and resource usage (memory cache), and can be tuned per topic based on traffic
        3. (Con) Implementation can be complicated as it involves:
          1. Usage of ResourceCoordinator
          2. Usage of DistributedLock for the topic transition to have a guaranteed leader set for a topic
          3. Traffic forwarding and retry are needed during topic transition from one instance to another
  • Entries in the MessageTable that were written by invalid transactions also need to be removed. Refer to the Tephra implementation for how this should be done.
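
The idea of using the instance ID as the first byte of the sequence id, mentioned under the multi-instance options above, could look roughly like the sketch below. Only the "instance ID as first byte" part comes from the discussion; the class name `SequenceIdGenerator`, the 4-byte width, and the per-timestamp counter in the remaining three bytes are hypothetical choices made for illustration. The sketch also assumes that the timestamps passed in by a given instance never go backwards.

```java
import java.nio.ByteBuffer;

/**
 * Sketch only. The 4-byte width and the per-timestamp counter are assumptions
 * for illustration; timestamps are assumed to be non-decreasing per instance.
 */
public final class SequenceIdGenerator {

  private static final int MAX_COUNTER = 0xFFFFFF;  // three bytes of counter

  private final byte instanceId;
  private long lastTimestamp = -1L;
  private int counter;

  public SequenceIdGenerator(int instanceId) {
    if (instanceId < 0 || instanceId > 0xFF) {
      throw new IllegalArgumentException("instance ID must fit in one byte: " + instanceId);
    }
    this.instanceId = (byte) instanceId;
  }

  /** Returns the next sequence id for a message published at the given timestamp. */
  public synchronized byte[] next(long timestamp) {
    if (timestamp != lastTimestamp) {
      // New timestamp: restart the counter.
      lastTimestamp = timestamp;
      counter = 0;
    } else if (counter == MAX_COUNTER) {
      throw new IllegalStateException("sequence id space exhausted for timestamp " + timestamp);
    } else {
      counter++;
    }
    return ByteBuffer.allocate(4)
        .put(instanceId)               // first byte separates instances
        .put((byte) (counter >>> 16))
        .put((byte) (counter >>> 8))
        .put((byte) counter)
        .array();
  }
}
```

Because the first byte differs per instance, two instances publishing at the same timestamp cannot produce the same sequence id, so no coordination between them is needed. The cost is that the number of instances is capped by what fits in one byte.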

...