...
Row Key Format
concat(topic, generation, publish_timestamp, seq_id)
topic
- String representation of the topic name
generation
- 4-byte int representing the generation of this topic. The generation increases by one each time the topic is created.
publish_timestamp
- 8 bytes time in milliseconds
seq_id
- 2-byte sequence ID that uniquely identifies messages published in the same millisecond.
- 2 bytes allow 65,536 messages per millisecond, roughly 64M messages per second per topic, which is well over the limit we need
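The row key layout above can be sketched in Java as follows. This is only an illustration: the class name `MessageRowKeys`, the UTF-8 encoding of the topic name, and the big-endian packing are assumptions, not something the design specifies. With big-endian fields and non-negative values, an unsigned byte-wise comparison of two keys of the same topic and generation follows publish order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of the design: packs the Message Table row key fields.
final class MessageRowKeys {

  private MessageRowKeys() { }

  // concat(topic, generation, publish_timestamp, seq_id)
  static byte[] encode(String topic, int generation, long publishTimestamp, int seqId) {
    if (seqId < 0 || seqId > 0xFFFF) {
      throw new IllegalArgumentException("seq_id must fit in 2 bytes: " + seqId);
    }
    // Assumption: the topic name is stored as UTF-8 bytes.
    byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(topicBytes.length + Integer.BYTES + Long.BYTES + Short.BYTES)
        .put(topicBytes)
        .putInt(generation)          // 4 bytes, big-endian
        .putLong(publishTimestamp)   // 8 bytes, milliseconds
        .putShort((short) seqId)     // 2 bytes
        .array();
  }

  public static void main(String[] args) {
    byte[] first = encode("topic1", 1, 1000L, 0);
    byte[] second = encode("topic1", 1, 1000L, 1);
    byte[] third = encode("topic1", 1, 1001L, 0);
    // Keys sort by timestamp first, then by sequence ID within the same millisecond.
    System.out.println(Arrays.compareUnsigned(first, second) < 0);
    System.out.println(Arrays.compareUnsigned(second, third) < 0);
  }
}
```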
...
Row Key Format
concat(topic, generation, transaction_write_pointer, write_timestamp, p_seq_id)
topic
- String representation of the topic name
generation
- 4-byte int representing the generation of this topic. The generation increases by one each time the topic is created.
transaction_write_pointer
- 8 bytes transaction write pointer of the transaction that the message was published from
write_timestamp
- 8 bytes time in milliseconds of the time when the message was persisted
- This is for ordering messages published in the same transaction
p_seq_id
- 2 bytes sequence ID. It is for uniquely identifying messages published in the same millisecond.
...
With the row key format described above, each message in a topic is uniquely identified by a message ID in this format:
...
The publishing client is responsible for telling the messaging system to roll back published messages (by Message IDs). To roll back, simply mark the corresponding entries in the Message Table as invalid (invert the sign of the transaction write pointer). We don't have to do anything to the entries in the Payload Table; the TTL of the topic will delete them. This works because a transactional read skips the invalidated entries in the Message Table and therefore never touches the Payload Table. This gives us a couple of benefits: i) rollback is faster, since we don't have to delete entries in the Payload Table; ii) non-transactional consumption gives repeatable results as long as the entries have not been removed by TTL expiration.
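As a rough illustration of the sign inversion used for rollback, the sketch below negates a stored write pointer and shows how a reader might decide visibility. The class and method names are hypothetical, and it assumes that valid write pointers are strictly positive; real transactional visibility would also depend on the transaction snapshot, which is not modeled here.

```java
// Hypothetical sketch: a Message Table entry is invalidated by negating its transaction write pointer.
final class RollbackSketch {

  private RollbackSketch() { }

  // Assumption: valid write pointers are strictly positive, so a negative value marks a rolled-back entry.
  static long invalidate(long txWritePointer) {
    if (txWritePointer <= 0) {
      throw new IllegalArgumentException("expected a valid (positive) write pointer");
    }
    return -txWritePointer;
  }

  static boolean isRolledBack(long storedWritePointer) {
    return storedWritePointer < 0;
  }

  // A transactional reader skips rolled-back entries; a non-transactional reader still sees them.
  static boolean isVisible(long storedWritePointer, boolean transactionalRead) {
    return !transactionalRead || !isRolledBack(storedWritePointer);
  }

  public static void main(String[] args) {
    long stored = invalidate(42L);
    System.out.println(isVisible(stored, true));   // transactional read
    System.out.println(isVisible(stored, false));  // non-transactional read
  }
}
```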
Consuming
The general principle of consuming messages is to scan the Message Table in ascending order. When it encounters an entry with the payload stored in the Payload Table, scan the Payload Table using the transaction write pointer as the prefix.
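The prefix scan of the Payload Table can be sketched as below. Given the Payload Table row key format, the prefix is taken here to be concat(topic, generation, transaction_write_pointer); that reading, the UTF-8 topic encoding, and all names in the code are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of prefix matching on Payload Table row keys.
final class PayloadScanSketch {

  private PayloadScanSketch() { }

  // Prefix shared by every payload entry of one transaction:
  // concat(topic, generation, transaction_write_pointer)
  static byte[] scanPrefix(String topic, int generation, long txWritePointer) {
    byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8); // assumed encoding
    return ByteBuffer.allocate(topicBytes.length + Integer.BYTES + Long.BYTES)
        .put(topicBytes)
        .putInt(generation)
        .putLong(txWritePointer)
        .array();
  }

  // Full key: the prefix followed by write_timestamp (8 bytes) and p_seq_id (2 bytes).
  static byte[] rowKey(byte[] prefix, long writeTimestamp, int seqId) {
    return ByteBuffer.allocate(prefix.length + Long.BYTES + Short.BYTES)
        .put(prefix)
        .putLong(writeTimestamp)
        .putShort((short) seqId)
        .array();
  }

  // True when the row key starts with the given prefix, i.e. the scan should return it.
  static boolean hasPrefix(byte[] rowKey, byte[] prefix) {
    return rowKey.length >= prefix.length
        && Arrays.equals(rowKey, 0, prefix.length, prefix, 0, prefix.length);
  }

  public static void main(String[] args) {
    byte[] prefix = scanPrefix("topic1", 1, 42L);
    byte[] sameTx = rowKey(prefix, 1000L, 0);
    byte[] otherTx = rowKey(scanPrefix("topic1", 1, 43L), 1000L, 0);
    System.out.println(hasPrefix(sameTx, prefix));
    System.out.println(hasPrefix(otherTx, prefix));
  }
}
```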
...
- Request body
- JSON array of objects, each containing a topic name and its properties, under the namespace
e.g. [ {"name" : "topic1", "properties" : { "ttl" : "123231" } }, {"name" : "topic2", "properties" : { "ttl" : "1211" } } ]
- Response
- 200 OK
Delete Topic
...
There are two separate endpoints to support different publishing cases as described above. They both share the same request format.
Request Body Schema
{
  "type" : "record",
  "name" : "PublishRequest",
  "fields" : [
    { "name" : "transactionWritePointer", "type" : [ "long", "null" ] },
    { "name" : "messages", "type" : { "type" : "array", "items" : "bytes" } }
  ]
}
- Request body
- Can be Avro binary or JSON, based on the request Content-Type: application/json or avro/binary
Schema Fields:
messages
- Contains an array of byte arrays that correspond to messages
transactionWritePointer
- Corresponds to a transaction write pointer.
Store Messages to Payload Table
Request method and URI
POST [base_url]/topics/[topic]/store
Request body should contain transactionWritePointer and messages
- Response:
404 NOT FOUND if the topic is not present
200 OK if message is persisted
Publish Messages to Message Table
- Request method and URI
POST [base_url]/topics/[topic]/publish
- Request
- The request can optionally contain transactionWritePointer.
- This call can be preceded by multiple calls to the 'store' endpoint. If it is preceded by calls to store with the same transactionWritePointer, then this endpoint should be called with an empty messages field. If it is not preceded by store calls with the same transactionWritePointer, then this call will store the messages to the MessageTable directly.
- If the call does not contain a transactionWritePointer, then the messages are stored non-transactionally (i.e., without a tx write ptr).
- Response
404 NOT FOUND if topic is not present
200 OK if messages are persisted
400 BAD REQUEST if the transactionWritePointer is null and the messages field is empty. The messages field can be empty if the transactionWritePointer is provided.
- Response Body
If the publish is non-transactional, the response body is empty.
If the publish is transactional, the response body will contain the information needed for rollback, with the following schema:
{
  "type" : "record",
  "name" : "PublishResponse",
  "fields" : [
    { "name" : "transactionWritePointer", "type" : [ "long", "null" ] },
    { "name" : "startTimestamp", "type" : "long" },
    { "name" : "startSequenceId", "type" : "int" },
    { "name" : "endTimestamp", "type" : "long" },
    { "name" : "endSequenceId", "type" : "int" }
  ]
}
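The publish endpoint distinguishes three cases by the presence of transactionWritePointer and of messages, and rejects the combination where both are missing with 400 BAD REQUEST. A minimal sketch of that decision, with hypothetical class, enum, and method names, might look like this:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of how a publish request could be classified before it is processed.
final class PublishRequestSketch {

  enum Mode { NON_TRANSACTIONAL, TRANSACTIONAL_WITH_MESSAGES, TRANSACTIONAL_FROM_STORE }

  private PublishRequestSketch() { }

  // Returns the publish mode, or throws for the case the endpoint answers with 400 BAD REQUEST.
  static Mode classify(Long transactionWritePointer, List<byte[]> messages) {
    boolean empty = messages == null || messages.isEmpty();
    if (transactionWritePointer == null) {
      if (empty) {
        throw new IllegalArgumentException("400 BAD REQUEST: no write pointer and no messages");
      }
      return Mode.NON_TRANSACTIONAL;
    }
    // With a write pointer, an empty messages field means the payloads were sent via 'store' earlier.
    return empty ? Mode.TRANSACTIONAL_FROM_STORE : Mode.TRANSACTIONAL_WITH_MESSAGES;
  }

  public static void main(String[] args) {
    List<byte[]> one = Collections.singletonList(new byte[] {1});
    System.out.println(classify(null, one));
    System.out.println(classify(7L, one));
    System.out.println(classify(7L, Collections.<byte[]>emptyList()));
  }
}
```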
Programmatic
The programmatic API will be used by clients to publish/poll messages from the Messaging Service. It is just a wrapper around the REST APIs.
- TopicId extends NamespaceId with a string that represents the topic name.
Creating Topic:
void create(TopicId topic) throws TopicAlreadyExistsException;
Updating Topic properties:
void update(TopicId topic, Map<String, String> properties) throws TopicNotFoundException;
Deleting Topic:
void delete(TopicId topic) throws TopicNotFoundException;
Publishing Message:
For publishing messages non-transactionally:
- void publish(TopicId topic, List<byte[]> messages) throws TopicNotFoundException;
- void publish(TopicId topic, byte[] message) throws TopicNotFoundException;
For storing messages in the PayloadTable (requires Transaction)
- void store(TopicId topic, byte[] message, Transaction transaction) throws TopicNotFoundException;
- void store(TopicId topic, List<byte[]> messages, Transaction transaction) throws TopicNotFoundException;
For publishing messages without using payload table or to publish messages stored earlier in the PayloadTable:
- void publish(TopicId topic, Transaction transaction) throws TopicNotFoundException;
- void publish(TopicId topic, byte[] message, Transaction transaction) throws TopicNotFoundException;
- void publish(TopicId topic, List<byte[]> messages, Transaction transaction) throws TopicNotFoundException;
For rolling back messages that were published transactionally:
- void rollback(TopicId topic, Transaction transaction) throws TopicNotFoundException;
Consuming Message:
For consuming messages non-transactionally:
- List<Message> poll(TopicId topic) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, int maxMessages) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive, int maxMessages) throws TopicNotFoundException;
For consuming messages transactionally (timestamp used for polling will correspond to the publish timestamp):
- List<Message> poll(TopicId topic, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId id, boolean inclusive, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId id, boolean inclusive, int maxMessages, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, int maxMessages, Transaction transaction) throws TopicNotFoundException;
Public Programmatic API
Consuming Message (when executed within a Transactional, messages are consumed transactionally; when used outside of a Transactional, messages are consumed non-transactionally):
...
] }
- The client shouldn't need to parse the response body, but should rather treat it as opaque bytes. On rollback, simply use it as the request body.
Rollback Transactionally published messages
Request method and URI
POST [base_url]/topics/[topic]/rollback
Request Body
- Use the response body as is from the publish call above
Response:
404 NOT FOUND if topic is not present
200 OK if messages are rolled back
Consume Message
- Request method and URI
POST [base_url]/topics/[topic]/poll
Request Schema
{
  "type" : "record",
  "name" : "ConsumeRequest",
  "fields" : [
    { "name" : "startFrom", "type" : [ "bytes", "long", "null" ] },
    { "name" : "inclusive", "type" : "boolean", "default" : true },
    { "name" : "limit", "type" : [ "int", "null" ] },
    { "name" : "transaction", "type" : [ "bytes", "null" ] }
  ]
}
Response Schema (will be in JSON or Avro binary based on the request Content-Type)
{
  "type" : "array",
  "items" : {
    "type" : "record",
    "name" : "Message",
    "fields" : [
      { "name" : "id", "type" : "bytes" },
      { "name" : "payload", "type" : "bytes" }
    ]
  }
}
- Request body
- Can be Avro binary or JSON, based on the request Content-Type: application/json or avro/binary
Schema Fields:
i) startFrom - can be bytes in which case it is considered as messageId, can be long in which case it is considered as timestamp
ii) inclusive - boolean field that says whether the messageId/timestamp should be inclusive
iii) limit - max number of responses to return [a hard limit will be set by the TransactionServer, which will be a cConf property]
iv) transaction - serialized bytes (TransactionCodec) of the transaction object
- Response
404 NOT FOUND if topic is not present
200 OK. The response body contains an Avro binary or JSON object, based on the request Content-Type, with the response schema
Data Cleanup In Tables
Several kinds of data cleanup need to be performed, and the cleanup strategy varies with the underlying storage: LevelDB vs HBase. The matrix below describes the combinations and types of cleanup we need to do and how each is going to be performed.
Cleanup Type | HBase | LevelDB |
---|---|---|
Cleanup Strategy | Coprocessors attached to tables (one for messageTable and another for payloadTable). No cleanup is required for MetadataTable. | Periodically scheduled thread that cleans up data in MessageTable and PayloadTable. Frequency of schedule is configurable via cdap-site.xml. |
TTL Expiration (MessageTable and Payload Table) | Get TableId (namespace, topic) from rowKey and get the TopicMetadata from the MetadataTable (cached). From the cell rowkey, get the timestamp of write and use the TTL and determine if that cell needs to be skipped. | Scan the tables, topic wise and remove rows that have exceeded TTL based on TopicMetadata info. |
Older Generation (MessageTable and PayloadTable) | Check the generation id of the row and compare it with the one we get from MetadataTable. If it is the current generation, then do nothing. If it is an older generation (gen < abs(currentgen) || gen == -1*currentgen), then skip the cell. | Same logic as in HBase. While pruning messages, the scan should start with generation '1' of that topic. |
Invalid Transactions (MessageTable only) | This requires (periodically refreshed) tx.snapshot in the Coprocessor. If the cell belongs to a TX_COL column, then get the tx id from it. If the transaction id is present in the invalid list (from the tx.snapshot), then invert the sign (-1 * tx_id) and put back the value. This way, the data is still visible to non-tx consumption and will be eventually cleared by TTL or when the topic is deleted. | Not necessary in LevelDB since we don't support pruning invalid transactions in SDK! |
Latest min tx timestamp (for manual invalid TX pruning, required only for MessageTable) | The last used tx.snapshot info can be directly used to prune the invalid transaction list! So we need to log that info. Also need to write a TMS table debugger tool that can print this info as well. |
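
The TTL and generation checks from the matrix can be sketched as simple predicates. The class and method names are hypothetical, and the TTL is taken in milliseconds purely for illustration (the design does not state the unit here). The generation condition is copied from the matrix as written; reading a negated current generation as a marker for a deleted topic is an assumption.

```java
// Hypothetical sketch of the per-row checks a cleanup pass could apply.
final class CleanupSketch {

  private CleanupSketch() { }

  // A row has expired once more than ttlMillis have passed since it was written.
  static boolean isExpired(long writeTimestampMillis, long ttlMillis, long nowMillis) {
    return nowMillis - writeTimestampMillis > ttlMillis;
  }

  // Condition from the matrix: gen < abs(currentgen) || gen == -1 * currentgen
  static boolean isOlderGeneration(int rowGeneration, int currentGeneration) {
    return rowGeneration < Math.abs(currentGeneration) || rowGeneration == -currentGeneration;
  }

  // A row is skipped (HBase) or removed (LevelDB) when either check fires.
  static boolean shouldSkip(int rowGeneration, int currentGeneration,
                            long writeTimestampMillis, long ttlMillis, long nowMillis) {
    return isOlderGeneration(rowGeneration, currentGeneration)
        || isExpired(writeTimestampMillis, ttlMillis, nowMillis);
  }

  public static void main(String[] args) {
    System.out.println(shouldSkip(3, 3, 1_000L, 500L, 1_200L));  // current generation, within TTL
    System.out.println(shouldSkip(2, 3, 1_000L, 500L, 1_200L));  // older generation
    System.out.println(shouldSkip(3, 3, 1_000L, 500L, 2_000L));  // TTL exceeded
  }
}
```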