Introduction
CDAP has many use cases for a reliable, transactional messaging system to communicate between its components. Currently we use Kafka as an interim solution for some of these use cases, but Kafka has a few shortcomings: it is not transactional (that is, we cannot ensure that a state transition and the notification about that state change happen consistently), and it is not durable (that is, if we lose a file system, we may lose messages). This document outlines the use cases and requirements for the messaging system, and discusses the design of the implementation.
Use Cases
- The System (Dataset Framework) wants to publish events about programs accessing datasets. Consumers for these events are the lineage writer, usage registry, and audit trail.
- The System wants to publish events about metadata changes for entities. These events will be consumed by the Tracker extension.
- The System (Scheduler) wants to trigger runs of programs (workflows) based on events; for example, availability of data, status of another program, external events, etc.
- The System (App Fabric) wants to emit notifications about the state of running programs (or their completion). This may include changes to the workflow token as execution of a workflow progresses.
- The System wants to notify programs about a change in their configuration.
- The User (Developer, DevOps) wants to receive notifications about events in the system, such as metadata changes, program state transitions, data availability, etc.
- The User (DevOps) wants to trigger programs based on custom notifications.
- The User (Developer, DevOps) wants to send control messages to all containers of a program or an app.
The following are explicitly not intended as use cases:
- publishing logs
- emitting metrics
- transporting data (especially row/cell level data: the volume is too high)
Requirements
- Throughput is expected to be relatively low, in the thousands of messages per second at most
- Throughput must be able to scale linearly with the resources available to the messaging system
- Latency should be near real time, ideally sub-second. However, this is impacted by event ordering (see below for discussion)
- Messaging system must be capable of transactional operations, configurable at the topic level
- In the context of a larger transaction: for state change, we need to ensure that either both or none of the state change and the corresponding notification happen
- In a transaction by itself: for cases such as audit logging, we need mostly the C and D from ACID: consistent and durable
- Non-transactionally: for some use cases, performance trumps reliability; messages would be sent asynchronously, possibly with retries, but dropped in case of failure to send
- Publishing must be reliable and durable: if a message is published and the messaging system responds with OK, then the message is guaranteed to be persisted and eventually available to all consumers
- Publishing must be possible both synchronously and asynchronously, although that can be a client-side option.
- Publishing must be available as a service, via an RPC call. The protocol must support the TransactionAware interface.
- The messaging system must preserve the order in which messages are sent for consumers
- Order must be preserved within a topic, but need not be guaranteed across topics
- All consumers must see the same message ordering, regardless of when they consume
- The messaging system must provide visibility into its state
- Metrics about queue size, throughput, latency, ...
- Ability to inspect the content of a queue without consuming it (this is mostly for recovery from problems)
- The message queue must be addressable by time. That is, a consumer must be able to consume starting with messages that were sent at a given time.
- Messages must expire after a configurable time-to-live
- Configurable at the topic level
- Overridable per message, with a shorter lifetime than configured for the topic
- The messaging system must be as self-contained as possible
- It must not rely on the Dataset Service. This is to ensure that messages can be transported even in case of a DS Service outage.
- Non-transactional publishing must be available even when the Transaction Service is down
- Consumption of topics is not partitioned. That is, every consumer will see all messages in a topic.
Discussion
Latency vs. Ordering
Our goal is to reduce the latency of message delivery to a minimum (as long as we can assume that the consumers are able to keep up with the rate of incoming messages). However, if we want to preserve the order of messages, that can impact latency. The reason is that some messages may be enqueued but not immediately visible, because their transaction has not committed yet. That means that other messages that were published later may become visible first.
In response, we can either:
- Block those later messages from consumption until all earlier messages have become visible. That would impact the latency for the later messages (which are already available but can't be consumed).
- Or let the consumer retrieve the later messages, and deliver the earlier one(s) as soon as they become visible. That, however, would change the order of consumption.
For easy consumption and consistency between consumers, we choose ordering over latency. With a proper implementation (discussed below), latency can be minimized while maintaining proper ordering.
Design
Before going deep into the details, these are the key principles/features of the messaging system, based on the use cases and requirements described above:
- Support both transactional and non-transactional message publishing
- Support both transactional and non-transactional message consumption
- Messages are ordered by publish time
- This helps address messages by timestamp
- Maintain the same message ordering for all consumers
- Consistent message ordering is the key to reprocessing messages safely
- Easy to write a tool to inspect what's inside the queue
- Each message has a unique message ID. The message IDs should be sortable in the same order as the message ordering as seen by the consumers.
- Latency must be as low as possible.
- Messages published non-transactionally should be available for consumption as soon as they are persisted in the messaging system
- Messages published transactionally will be persisted to the messaging system as close as possible to the transaction commit time
- Use either client-side buffering or an auxiliary payload table to minimize the "blocking" effect of an uncommitted transaction. See later sections for the detailed design.
- Consumers poll for new messages instead of having messages pushed from the server
- This gives consumers more control
- Messages are immutable once they get "accepted" by the messaging system
- For non-transactional messages, "accepted" means the message is persisted
- For transactional messages, "accepted" means the transaction of that message is committed
- Ordering is immutable as well
- A centralized messaging service will be provided and all messaging clients will only interact with the messaging service through RESTful APIs
- Multiple instances of the messaging service can be up and running at the same time for scalability
- The messaging system should have minimal dependencies on other CDAP services, otherwise we cannot use it as a way to publish error status in case something has failed.
- Shouldn't depend on the Transaction Service
- Shouldn't depend on the Dataset Service
- Failure to emit log or metrics shouldn't be a failure for the messaging operations
- Meaning no hard dependency on Kafka
Message Table
The Message Table provides ordering of messages, and entries in this table are immutable. Immutability is the key to providing consistent ordering across consumers. Immutability also means entries can be cached and served to multiple consumers without consulting the backing storage until the cache runs out, which provides an easy way to boost performance.
In Standalone, the Message Table is backed by a non-transactional LevelDB table. In distributed mode, it will be backed by a non-transactional HBase table.
Row Key Format
concat(topic, publish_timestamp, seq_id)
topic
- String representation of the topic name
publish_timestamp
- 8 bytes time in milliseconds
seq_id
- 2 bytes sequence ID, used to uniquely identify messages published in the same millisecond
- Two bytes allow 65,536 messages per millisecond, or roughly 65 million messages per second per topic, well above the throughput we need
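For illustration, a minimal Java sketch of assembling such a row key (the class and method names are hypothetical, not CDAP's actual implementation):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: builds concat(topic, publish_timestamp, seq_id).
// ByteBuffer writes big-endian by default, so lexicographic byte order
// matches chronological order for non-negative timestamps.
public final class MessageRowKey {
  public static byte[] of(String topic, long publishTimestamp, short seqId) {
    byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(topicBytes.length + Long.BYTES + Short.BYTES)
        .put(topicBytes)           // topic name
        .putLong(publishTimestamp) // 8-byte publish time in milliseconds
        .putShort(seqId)           // 2-byte sequence id within the millisecond
        .array();
  }
}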
Column Format
Each row in the Message Table can have up to two columns and should have at least one column.
Column Name | Description |
---|---|
t | If present, stores the transaction write pointer (8 bytes) of the transaction that was used for the message publishing. Only messages published transactionally have this column. |
p | If present, stores the message payload. If it is absent, the transaction write pointer will be used to scan the Payload Table (described below) to fetch the messages published in that transaction. |
Payload Table
The Payload Table stores the message payload for messages published transactionally. Not all transactional message payloads are stored in this table, only those for which the client requests it. Typically, this table is used to store the payload of messages published from a long-running transaction, to minimize consumer latency. The details of the protocol are discussed below.
In Standalone, the Payload Table is backed by a non-transactional LevelDB table. In distributed mode, it will be backed by a non-transactional HBase table.
Row Key Format
concat(topic, transaction_write_pointer, write_timestamp, p_seq_id)
topic
- String representation of the topic name
transaction_write_pointer
- 8 bytes transaction write pointer of the transaction that the message was published from
write_timestamp
- 8 bytes time in milliseconds of the time when the message was persisted
- This is for ordering messages published in the same transaction
p_seq_id
- 2 bytes sequence ID, used to uniquely identify messages persisted in the same millisecond
Column Format
Column Name | Description |
---|---|
p | Stores the message payload |
Message Identifier
With the row key formats described above, each message in a topic is uniquely identified by a message ID in this format:
concat(publish_timestamp, seq_id, write_timestamp, p_seq_id)
For messages that don't store their payload in the Payload Table, the write_timestamp and p_seq_id are both 0.
Message IDs are also sortable in lexicographical order, which gives the ordering of messages as seen by any consumer.
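As a sketch, a big-endian encoding of this message ID preserves that ordering (the class name is hypothetical):

import java.nio.ByteBuffer;

// Hypothetical sketch: encodes a message ID as
// concat(publish_timestamp, seq_id, write_timestamp, p_seq_id).
public final class MessageIdCodec {
  public static byte[] encode(long publishTs, short seqId, long writeTs, short pSeqId) {
    return ByteBuffer.allocate(2 * Long.BYTES + 2 * Short.BYTES)
        .putLong(publishTs)  // 8-byte publish timestamp
        .putShort(seqId)     // 2-byte sequence id
        .putLong(writeTs)    // 0 when the payload lives in the Message Table
        .putShort(pSeqId)    // 0 when the payload lives in the Message Table
        .array();
  }
}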
Publishing
In this section, we will discuss the protocol for publishing messages, both transactionally and non-transactionally.
Non-transactional
Non-transactional publishing is relatively straightforward.
- Create the row key for the Message Table, using the topic, publish timestamp (same as current timestamp) and sequence id
- The sequence id can easily be generated by memorizing the last publish timestamp in the messaging service process
- Batching of concurrent requests to the same topic can use the same publish timestamp with an incremental sequence id
- Write the row with the payload stored under the payload column (the p column)
  - If requests are batched, multiple row keys are generated in the first step and the rows are inserted into the backing store as a single batch operation
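A minimal sketch of how the sequence id could be generated inside the messaging service (class and method names are hypothetical):

// Hypothetical sketch: remembers the last publish timestamp and bumps a
// counter for messages that fall into the same millisecond.
public final class SequenceIdGenerator {
  private long lastTimestamp = -1L;
  private short sequenceId;

  /** Returns the publish timestamp and sequence id for the next message. */
  public synchronized long[] next() {
    long now = System.currentTimeMillis();
    if (now == lastTimestamp) {
      sequenceId++;        // same millisecond: increment the 2-byte counter
    } else {
      lastTimestamp = now;
      sequenceId = 0;      // new millisecond: reset the counter
    }
    return new long[] { lastTimestamp, sequenceId };
  }
}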
Transactional
Transactional publishing is broken down into two cases, in order to provide better latency and throughput under different usage patterns.
Message Write Time is Close to Transaction Commit Time
This is mainly the case for messages published from a transaction running in the same JVM process; for example, from a Worker, Flowlet, Service, or the lifecycle methods of MapReduce and Spark. All published messages are buffered on the client side and only written to the messaging system when the transaction is ready to commit.
The mechanism on the messaging service side is essentially the same as the non-transactional one, with the addition that the transaction write pointer is recorded in the Message Table as well.
- Create the row key for the Message Table, using the topic, publish timestamp (same as current timestamp) and sequence id
- Write the row with the payload stored under the payload column (the p column) and the transaction write pointer under the transaction column (the t column)
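A sketch of the client-side buffering, assuming a hypothetical MessagingClient interface (this is not CDAP's actual API):

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: messages are held in memory and written to the
// messaging service in one batch when the transaction is about to commit.
public final class BufferedTransactionalPublisher {
  interface MessagingClient {
    void publish(String topic, List<byte[]> messages, long txWritePointer);
  }

  private final List<byte[]> buffer = new ArrayList<>();

  public void publish(byte[] message) {
    buffer.add(message);  // deferred: nothing is sent to the service yet
  }

  /** Invoked when the transaction is ready to commit. */
  public void onCommit(MessagingClient client, String topic, long txWritePointer) {
    client.publish(topic, buffer, txWritePointer);  // single batched write
    buffer.clear();
  }
}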
Message Write Time is Not Close to Transaction Commit Time
This includes messages published from MapReduce tasks and Spark drivers and executors, as well as the rare case where so many messages are published from the same JVM process that they exceed the client-side buffer.
Instead of writing messages directly to the Message Table, the payloads are written to the Payload Table as the messages are published. At transaction commit time, a single entry is inserted into the Message Table, which has the effect of making all messages published in that transaction visible at once.
- On message publish, create the row key for the Payload Table, using the topic, transaction write pointer, current timestamp and sequence id
  - The sequence id is generated as described above
  - Batch inserts can also be done as described above
- At transaction commit time (signaled by the client), create the row key for the Message Table, using the topic, publish timestamp (same as current timestamp) and sequence id
- Write the row with the transaction write pointer stored under the transaction column (the t column)
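A sketch of the resulting interaction, again using a hypothetical client interface rather than CDAP's actual API:

import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the Payload Table protocol for a long-running
// transaction.
public final class PayloadTablePublishExample {
  interface MessagingClient {
    void store(String topic, byte[] message, long txWritePointer);  // Payload Table write
    void publish(String topic, long txWritePointer);                // Message Table entry
  }

  static void run(MessagingClient client, long txWritePointer) {
    // While the transaction is open, each message goes to the Payload Table.
    client.store("topicA", "event-1".getBytes(StandardCharsets.UTF_8), txWritePointer);
    client.store("topicA", "event-2".getBytes(StandardCharsets.UTF_8), txWritePointer);
    // At commit time, one Message Table entry carrying only the transaction
    // write pointer makes all of the stored messages visible at once.
    client.publish("topicA", txWritePointer);
  }
}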
Rollback of Published Message
The publishing client is responsible for telling the messaging system to roll back published messages (by message ID). To roll back, simply delete the corresponding entries in the Message Table and the Payload Table. Since enough information is encoded in the message ID, the Payload Table can be scanned and deleted directly, without first reading from the Message Table.
Consuming
The general principle of consuming messages is to scan the Message Table in ascending order. When an entry's payload is stored in the Payload Table, scan the Payload Table using the topic and transaction write pointer as the prefix.
Non-transactional
This is the low-latency choice, since it never blocks on uncommitted messages. The side effect is that it will see uncommitted messages: if those messages are later rolled back, a non-transactional consumer will have seen more messages than a transactional consumer, although the ordering of committed messages remains the same.
- Scan the Message Table, with an optional start message ID or timestamp, which can be either inclusive or exclusive
- Depending on whether the row has the payload column (the p column), the handling differs:
  - With the payload column (the p column):
    - The row represents one message, and the payload is the value in the p column
    - The message ID is generated from the row key as concat(publish_timestamp, seq_id, 0L, 0)
  - Without the payload column (the p column), the transaction column (the t column) must exist:
    - Scan the Payload Table with the prefix concat(topic, transaction_write_pointer), where transaction_write_pointer is the value of the t column in the Message Table
    - Each row encountered during the scan is one message, and the payload is the value in the p column
    - The message ID is generated from the row key in the Message Table and the row key in the Payload Table, as concat(publish_timestamp, seq_id, write_timestamp, p_seq_id)
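For illustration, a Java sketch of this scan logic; Store and Row are assumed abstractions over the backing LevelDB/HBase tables, not CDAP classes:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: resolves each Message Table row either to its inline
// payload or to the messages stored in the Payload Table.
public final class ConsumeExample {
  interface Store {
    List<Row> scanMessageTable(String topic, byte[] startMessageId, boolean inclusive);
    List<Row> scanPayloadTable(String topic, long txWritePointer);
  }
  interface Row {
    byte[] column(String name);  // returns null when the column is absent
  }

  static List<byte[]> poll(Store store, String topic, byte[] startId) {
    List<byte[]> payloads = new ArrayList<>();
    for (Row row : store.scanMessageTable(topic, startId, false)) {
      byte[] payload = row.column("p");
      if (payload != null) {
        payloads.add(payload);  // payload stored inline in the Message Table
      } else {
        // 'p' is absent, so 't' must exist: fetch from the Payload Table.
        long txWritePointer = ByteBuffer.wrap(row.column("t")).getLong();
        for (Row payloadRow : store.scanPayloadTable(topic, txWritePointer)) {
          payloads.add(payloadRow.column("p"));
        }
      }
    }
    return payloads;
  }
}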
Transactional
Transactional consumption follows basically the same procedure as the non-transactional one, with the addition that it stops at the first uncommitted message when scanning the Message Table. The transaction information comes from the client, and it is the client's responsibility to open a new transaction in order to get a new snapshot of committed messages in the messaging system. This increases the latency of message consumption, but with the publishing techniques described above, the added latency should be minimal, typically under a second.
Message Retention
All published messages are stored in the messaging system until they expire; the expiration time is a property defined per topic. Since we use one Message Table for all topics, we cannot use the simple TTL mechanism provided by HBase. Support for message retention breaks down into two parts:
- Message consumption
- During message consumption, we can simply apply a lower bound on the scan start row, based on the TTL setting of the topic being consumed (see the sketch after this list)
- Message cleanup
- In local mode, LevelDB is used as the backing store. The messaging system runs a cleanup thread that periodically scans and deletes old entries, based on the TTL setting of each topic.
- In distributed mode, since HBase is used as the backing store, the best way to do cleanup is to use a coprocessor that drops expired cells at flush/compaction time. The TTL setting can be made available to the coprocessor through the table attribute.
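For the consumption-side part, a minimal sketch of applying the TTL as a lower bound on the scan start time (names are hypothetical):

// Hypothetical sketch: expired messages are excluded from scans even
// before the physical cleanup has removed them.
public final class TtlBound {
  static long effectiveStartTime(long requestedStartMs, long ttlSeconds) {
    long oldestAlive = System.currentTimeMillis() - ttlSeconds * 1000L;
    return Math.max(requestedStartMs, oldestAlive);  // never scan expired rows
  }
}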
Implementation Considerations
- To avoid scanning HBase too often, the messaging service should cache recent scans in memory. This is based on the assumption that consumers shouldn't fall too far behind, which is one of the reasons we want a messaging system with fast reaction time.
- Batch writes should be used to provide higher throughput and lower latency for concurrent publishing
  - Refer to ConcurrentStreamWriter for the implementation approach
- When publishing transactionally, it is important that the write to the messaging system is the last "persist" operation (i.e., it happens after all dataset writes), meaning the messaging client has to be the last TransactionAware used in the transaction context.
  - This helps lower the latency for transactional consumers, since the latency is essentially the time between the message persist and the commit to Apache Tephra
- Multiple instances of the messaging service (not part of the initial scope)
  - When we reach the point where the messaging service needs to scale to multiple instances, there are a couple of choices for how it could look:
    - All instances are equal; every instance can handle publishing and consumption for all topics
      - Different instances need to generate different seq_id values for the same timestamp to avoid key collisions. We could use the instance ID as the first byte of the sequence id.
      - (Pro) Easy to implement, as no coordination is needed
      - (Con) The consumer scan cache can take up a lot of space, because every instance would be caching all topics
    - Perform resource assignment through the ResourceCoordinator, such that a given topic is handled by only a subset of instances
      - Sequence id generation must still avoid key collisions
      - (Pro) The most flexible trade-off between scalability and resource usage (memory cache), and it can be tuned per topic based on traffic
      - (Con) The implementation can be complicated, as it involves:
        - Usage of the ResourceCoordinator, and of a DistributedLock during topic transitions, to guarantee a single leader set for each topic
        - Traffic forwarding and retries while a topic transitions from one instance to another
- Entries with invalid transactions also need to be removed from the MessageTable. Refer to the Tephra implementation for how this should be done.
Security
Authentication works the same way as for all other intra-CDAP RESTful calls. Authorization can be enforced at the topic level.
Tables used for storage
- MessageTable
- PayloadTable
- MetadataTable
APIs
RESTful
Base URL: /v1/namespaces/<ns-id> (since REST API is internal)
Create Topic
- Request method and URI
PUT [base_url]/topics/[topic]
- Request body
- Can be empty
- If provided, it is a JSON object containing topic properties
e.g. {"ttl" : [ttl-in-seconds]}
- Response
- 200 OK if the topic was created successfully
- 409 CONFLICT if a topic of the same name exists already
- 400 BAD REQUEST if the given TTL is invalid
Update Topic
- Request method and URI
PUT [base_url]/topics/[topic]/properties
- Request body
- JSON object containing topic properties. Note that this call will replace all the existing properties.
  e.g. {"ttl" : [ttl-in-seconds]}
- Response
- 200 OK if the topic properties were updated successfully
- 404 NOT FOUND if the topic is not present
- 400 BAD REQUEST if the properties were not correct
Get Topic Properties
- Request method and URI
GET [base_url]/topics/[topic]
- Response body
  - JSON object containing the topic name and its properties
    e.g. {"name" : "topic1", "properties" : { "ttl" : "123231" } }
- Response
- 200 OK
- 404 NOT FOUND if the topic is not present
List Topics
- Request method and URI
GET [base_url]/topics
- Response body
  - JSON array containing the topic names and their properties
    e.g. [ {"name" : "topic1", "properties" : { "ttl" : "123231" } }, {"name" : "topic2", "properties" : { "ttl" : "1211" } } ]
- Response
- 200 OK
Delete Topic
- Request method and URI
DELETE [base_url]/topics/[topic]
- Response
- 200 OK if the topic was deleted successfully
- 404 NOT FOUND if the topic is not present
Publish Message
There are two separate endpoints to support different publishing cases as described above. They both share the same request format.
Request Body Schema
{ "type" : "record", "name" : "PublishRequest", "fields" : [ { "name" : "transactionWritePointer", "type" : [ "long", "null" ] }, { "name" : "messages", "type" : { "type" : "array", "items" : "bytes" } } ] }
- Request body
  - Can be Avro binary or JSON, based on the request Content-Type: application/json or avro/binary
Schema Fields:
- messages - an array of byte arrays, one per message
- transactionWritePointer - the transaction write pointer of the publishing transaction
Store Messages to Payload Table
- Request method and URI
  POST [base_url]/topics/[topic]/store
- Request body must contain both transactionWritePointer and messages
- Response
  - 404 NOT FOUND if the topic is not present
  - 200 OK if the messages were persisted
Publish Messages to Message Table
- Request method and URI
  POST [base_url]/topics/[topic]/publish
- This call can be preceded by multiple calls to the 'store' endpoint. If it is preceded by store calls with the same transactionWritePointer, then this endpoint should be called with an empty messages field. If it is not preceded by store calls with the same transactionWritePointer, then this call stores the messages directly to the MessageTable.
- If the call does not contain a transactionWritePointer, the messages are stored non-transactionally (i.e., without a transaction write pointer).
- Response
  - 404 NOT FOUND if the topic is not present
  - 200 OK if the messages were persisted
  - 400 BAD REQUEST if the transactionWritePointer is null and the messages field is empty (the messages field may only be empty if a transactionWritePointer is provided)
- Response body
  - If the publish is non-transactional, the response body is empty
  - If the publish is transactional, the response body contains the information needed for rollback, with the following schema:
{ "type" : "record", "name" : "PublishedInfo", "fields" : [ { "name" : "transactionWritePointer", "type" : [ "long", "null" ] }, { "name" : "messages", "type" : { "type" : "array", "items" : { "type" : "record", "name" : "MessageInfo", "fields" : [ { "name" : "publishTimestamp", "type" : "long" }, { "name" : "sequenceId", "type" : "int" } ] } } } ] }
- The client shouldn't need to parse the response body; rather, it should treat it as opaque bytes and simply use it as the request body on rollback.
Rollback Transactionally Published Messages
- Request method and URI
  POST [base_url]/topics/[topic]/rollback
- Request Schema
  { "transactionWritePointer" : "long", "rollbackKeys" : { "type" : "array", "items" : "bytes" } }
- Response
  - 404 NOT FOUND if the topic is not present
  - 200 OK if the messages were rolled back
Consume Message
- Request method and URI
POST [base_url]/topics/[topic]/poll
- Request Schema
{ "startFrom" : [ "bytes", "long", "null" ], "inclusive" : "boolean", "limit" : [ "integer", "null" ], "transaction" : [ "bytes" , "null" ] }
- Response Schema (will be in JSON or Avro binary based on the request Content-Type)
{ "type" : "array", "items" : {"type": "record", "name": "Message", "fields": [ { "name" : "id" , "type" : "bytes" }, {"name" : "payload" : "type" : "bytes" }] }}
- Request body
  - Can be Avro binary or JSON, based on the request Content-Type: application/json or avro/binary
Schema Fields:
- startFrom - can be bytes, in which case it is treated as a message ID, or long, in which case it is treated as a timestamp
- inclusive - boolean field indicating whether the messageId/timestamp should be inclusive
- limit - maximum number of messages to return (a hard limit will be set by the TransactionServer via a cConf property)
- transaction - serialized bytes (TransactionCodec) of the transaction object
- Response
  - 404 NOT FOUND if the topic is not present
  - 200 OK, with the response body containing an Avro binary or JSON object (based on the request Content-Type) that follows the response schema
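For illustration, a non-transactional poll request that reads up to 100 messages published after a given timestamp could look like this (the values are made up):
e.g. { "startFrom" : 1471312945000, "inclusive" : false, "limit" : 100, "transaction" : null }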
Programmatic
The programmatic API will be used by clients to publish messages to and poll messages from the Messaging Service. It is a thin wrapper around the REST APIs.
- TopicId extends NamespaceId with a string that represents the topic name.
Creating Topic:
void create(TopicId topic) throws TopicAlreadyExistsException;
Updating Topic properties:
void update(TopicId topic, Map<String, String> properties) throws TopicNotFoundException;
Deleting Topic:
void delete(TopicId topic) throws TopicNotFoundException;
Publishing Message:
For publishing messages non-transactionally:
- void publish(TopicId topic, List<byte[]> messages) throws TopicNotFoundException;
- void publish(TopicId topic, byte[] message) throws TopicNotFoundException;
For storing messages in the PayloadTable (requires Transaction)
- void store(TopicId topic, byte[] message, Transaction transaction) throws TopicNotFoundException;
- void store(TopicId topic, List<byte[]> messages, Transaction transaction) throws TopicNotFoundException;
For publishing messages without using payload table or to publish messages stored earlier in the PayloadTable:
- void publish(TopicId topic, Transaction transaction) throws TopicNotFoundException;
- void publish(TopicId topic, byte[] message, Transaction transaction) throws TopicNotFoundException;
- void publish(TopicId topic, List<byte[]> messages, Transaction transaction) throws TopicNotFoundException;
For rolling back messages that were published transactionally:
- void rollback(TopicId topic, Transaction transaction) throws TopicNotFoundException;
Consuming Message:
For consuming messages non-transactionally:
- List<Message> poll(TopicId topic) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, int maxMessages) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive, int maxMessages) throws TopicNotFoundException;
For consuming messages transactionally (timestamp used for polling will correspond to the publish timestamp):
- List<Message> poll(TopicId topic, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId id, boolean inclusive, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId id, boolean inclusive, int maxMessages, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, int maxMessages, Transaction transaction) throws TopicNotFoundException;
Public Programmatic API
Consuming Message (when executed within a Transactional, messages are consumed transactionally; when used outside of a Transactional, messages are consumed non-transactionally):
- List<Message> poll(TopicId topic, MessageId id, boolean inclusive, int maxMessages) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, int maxMessages) throws TopicNotFoundException;
class Message {
MessageId id;
byte[] payload;
}
class MessageId {
long publish_timestamp;
short seq_id;
long write_timestamp;
short p_seq_id;
}
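For illustration, a sketch of a consumer loop built on the public poll API above; MessagingContext is an assumed handle to the messaging system, and the Message, MessageId, TopicId and TopicNotFoundException types are the ones described in this document:

import java.util.List;

// Hypothetical usage sketch: poll a batch, process it, and remember the
// last message ID so the next poll resumes where this one left off.
public final class PollLoopExample {
  interface MessagingContext {
    List<Message> poll(TopicId topic, MessageId id, boolean inclusive, int maxMessages)
      throws TopicNotFoundException;
  }

  static MessageId pollOnce(MessagingContext messaging, TopicId topic, MessageId lastSeen)
      throws TopicNotFoundException {
    List<Message> batch = messaging.poll(topic, lastSeen, false, 100);
    for (Message message : batch) {
      // process(message.payload) ...
      lastSeen = message.id;  // exclusive start for the next poll
    }
    return lastSeen;
  }
}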