...

  • Support both transactional and non-transactional message publishing
  • Support both transactional and non-transactional message consumption
  • Messages are ordered by publish time
    • This helps address messages by timestamp
  • Maintain the same message ordering for all consumers
    • Consistent message ordering is the key to reprocessing messages safely
    • Makes it easy to write tools to inspect what's inside the queue
  • Each message has a unique message ID. Sorting by message ID should yield the same message ordering as seen by the consumers
  • Latency must be as low as possible.
    • Messages published non-transactionally should be available for consumption as soon as they are persisted in the messaging system
    • Messages published transactionally should be persisted to the messaging system as close as possible to the transaction commit time
      • Use either client-side buffering or an auxiliary payload table to minimize the "blocking" effect of uncommitted transactions. See the later section for the detailed design.
  • Consumers poll for new messages instead of pushing from the server
    • This gives consumers more control
  • Messages are immutable once they get "accepted" by the messaging system
    • For non-transactional messages, "accepted" means the message is persisted
    • For transactional messages, "accepted" means the transaction of that message is committed
    • Ordering is immutable as well
  • A centralized messaging service will be provided, and all messaging clients interact with the messaging service only through REST
    • Multiple instances of the messaging service can be up and running at the same time for scalability
  • The messaging system should have minimal dependencies on other CDAP services; otherwise we cannot use it as a way to publish error status when something fails.
    • Shouldn't depend on the Transaction Service
    • Shouldn't depend on the Dataset Service
    • Failure to emit logs or metrics shouldn't cause messaging operations to fail
      • Meaning no hard dependency on Kafka
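The sortable message ID requirement above can be sketched as follows. This is a hypothetical illustration, not the final design: the class name and the timestamp-plus-sequence layout are assumptions, chosen so that byte-wise comparison of the encoded ID matches publish order.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: a message ID composed of the publish timestamp plus a
// per-timestamp sequence number. Unsigned byte order of the encoded ID then
// matches publish order, satisfying the sortable-ID requirement.
public final class MessageId implements Comparable<MessageId> {
  private final long publishTimestamp; // milliseconds since epoch
  private final short seqId;           // disambiguates messages in the same millisecond

  public MessageId(long publishTimestamp, short seqId) {
    this.publishTimestamp = publishTimestamp;
    this.seqId = seqId;
  }

  /** Encodes the ID so that sorting the bytes equals sorting by publish order. */
  public byte[] toBytes() {
    return ByteBuffer.allocate(Long.BYTES + Short.BYTES)
        .putLong(publishTimestamp)
        .putShort(seqId)
        .array();
  }

  @Override
  public int compareTo(MessageId other) {
    int cmp = Long.compare(publishTimestamp, other.publishTimestamp);
    return cmp != 0 ? cmp : Short.compare(seqId, other.seqId);
  }
}
```

Because the timestamp occupies the most significant bytes, IDs from different milliseconds already sort correctly; the sequence number only breaks ties within a millisecond.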

Message Table

The Message Table provides ordering of messages, and entries in this table are immutable. Immutability is the key to providing consistent ordering across consumers. It also means entries can be cached and served to multiple consumers without consulting the backing storage until the cache runs out, which is an easy way to boost performance.
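A minimal sketch of how immutability enables a shared read cache (the class name and the long-typed message ID are illustrative assumptions, not the actual table schema):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: because accepted entries are immutable, a shared
// in-memory cache ordered by message ID can serve many consumers without
// touching the backing storage, until a consumer falls behind the oldest
// cached entry.
public final class MessageCache {
  private final ConcurrentSkipListMap<Long, String> entries = new ConcurrentSkipListMap<>();

  public void add(long messageId, String payload) {
    entries.put(messageId, payload);
  }

  /** Returns up to 'limit' messages strictly after 'afterId', in ID order. */
  public List<String> poll(long afterId, int limit) {
    List<String> result = new ArrayList<>();
    for (String payload : entries.tailMap(afterId, false).values()) {
      if (result.size() >= limit) {
        break;
      }
      result.add(payload);
    }
    return result;
  }
}
```

Since entries never change once accepted, every consumer polling the same range sees exactly the same payloads in the same order, with no invalidation logic needed.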

...

  • To avoid scanning HBase too often, the messaging service should cache recent scan results in memory. This relies on the assumption that consumers don't fall too far behind, which is one of the reasons we want a messaging system with faster reaction time.
  • Batch writes should be used to give higher throughput and lower latency for concurrent publishing
    • Refer to ConcurrentStreamWriter for the implementation approach
  • When publishing transactionally, it is important that the write to the messaging system is the last "persist" operation (i.e., it persists after all dataset writes), meaning the messaging client has to be the last TransactionAware used in the transaction context.
    • This helps lower the latency of transactional consumers, since that latency is essentially the time between message persist and the Tephra commit.
  • Multiple messaging service instances (not part of the initial scope)
    • When we reach the point where the messaging service needs to scale to multiple instances, there are a couple of choices for how it could look:
      1. All instances are equal, meaning every instance can handle publishing and consumption for all topics
        1. Different instances need to generate different seq_ids for the same timestamp to avoid key collisions. We could potentially use the instance ID as the first byte of the sequence id.
        2. (Pro) Easy to implement, as no coordination is needed
        3. (Con) The consumer scan cache can take up a lot of space because every instance would be caching for all topics
      2. Perform resource assignment through the ResourceCoordinator so that a given topic is handled by only a subset of instances
        1. Still needs to handle sequence id generation to avoid key collisions
        2. (Pro) Most flexible trade-off between scalability and resource usage (memory cache), and can be tuned per topic based on traffic
        3. (Con) Implementation can be complicated, as it involves
          1. Usage of the ResourceCoordinator
          2. Usage of a DistributedLock for topic transitions to guarantee the leader set for a topic
          3. Traffic forwarding / retries during topic transition from one instance to another
  • Entries in the MessageTable belonging to invalidated transactions also need to be removed. Refer to the Tephra implementation for how this should be done.
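The instance-ID-prefixed sequence id from option 1 above can be sketched as follows. This is a hypothetical illustration; the class name, id width, and counter scheme are assumptions, not CDAP APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: each messaging service instance prefixes its sequence
// ids with its own instance ID byte, so two instances writing in the same
// millisecond can never produce the same row key.
public final class SequenceIdGenerator {
  private final byte instanceId;
  private long lastTimestamp = -1L;
  private short counter = 0;

  public SequenceIdGenerator(byte instanceId) {
    this.instanceId = instanceId;
  }

  /** Returns the next sequence id for the given publish timestamp. */
  public synchronized byte[] next(long timestamp) {
    if (timestamp != lastTimestamp) {
      // New millisecond: restart the per-timestamp counter.
      lastTimestamp = timestamp;
      counter = 0;
    }
    // First byte is the instance ID, followed by a per-millisecond counter.
    return ByteBuffer.allocate(1 + Short.BYTES)
        .put(instanceId)
        .putShort(counter++)
        .array();
  }
}
```

Note that putting the instance ID first means ids from different instances interleave by instance rather than strictly by arrival order within a millisecond; that is the coordination-free trade-off option 1 accepts.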

Security

For authentication, it is much the same as all other intra-CDAP REST calls. For authorization, it can be done at the topic level.

API

REST

TBA

Programmatic

TBA