...
- To avoid scanning HBase too often, the messaging service should cache recent scans in memory. This is based on the assumption that consumers should not fall too far behind, since faster reaction time is one of the reasons we want a messaging system in the first place.
- Batch writes should be used to achieve higher throughput and lower latency for concurrent publishing
- Refer to ConcurrentStreamWriter for the implementation
- When doing transactional publishing, it is important that the messages are written to the messaging system as the last "persist" operation (i.e. after all dataset writes have been persisted), meaning it has to be the last TransactionAware added to the transaction context.
- This helps lower the latency for transactional consumers, since that latency is essentially the time between the message persist and the commit to Apache Tephra.
- Multiple messaging service instances (not part of the initial scope)
- When we reach the point where we need to scale the messaging service to multiple instances, there are a couple of choices for how it could look:
- All instances are equal. This means all instances can handle publishing to and consumption from all topics
- Different instances need to generate different seq_id values for the same timestamp to avoid key collisions. We could potentially use the instance ID as the first byte of the sequence id.
- (Pro) Easy to implement as there is no coordination needed
- (Con) The consumer scan cache can take up a lot of space because all instances would be caching for all topics
- Perform resource assignment through the ResourceCoordinator such that a given topic is only handled by a subset of instances
- Sequence id generation still needs to be handled to avoid key collisions
- (Pro) Most flexible in trading off scalability against resource usage (memory cache), and can be tuned per topic based on traffic
- (Con) Implementation can be complicated, as it involves:
- Usage of ResourceCoordinator
- Usage of DistributedLock for the topic transition, to guarantee a leader set for a topic
- Traffic forwarding and retry, which are needed while a topic transitions from one instance to another
- Entries with invalid transactions in the MessageTable also need to be removed. Refer to the Tephra implementation for how this should be done.
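The in-memory scan cache from the first item above can be sketched for a single topic. This is a minimal sketch under stated assumptions:

- The class name RecentMessageCache, the long message ids, and the fixed entry capacity are hypothetical.
- Every persisted message is assumed to be appended to the cache in increasing id order.

A consumer whose start id is older than the oldest cached entry gets Optional.empty() and has to fall back to an HBase scan, which is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/**
 * Hypothetical in-memory cache of the most recent messages of one topic, keyed by message id.
 * Assumes every persisted message is appended here in increasing id order.
 */
final class RecentMessageCache {
  private final int capacity;
  private final TreeMap<Long, byte[]> messages = new TreeMap<>();

  RecentMessageCache(int capacity) {
    this.capacity = capacity;
  }

  /** Records a newly persisted message and evicts the oldest entries beyond capacity. */
  synchronized void append(long messageId, byte[] payload) {
    messages.put(messageId, payload);
    while (messages.size() > capacity) {
      messages.pollFirstEntry();
    }
  }

  /**
   * Returns up to {@code limit} payloads with id >= startId, or Optional.empty() when the
   * consumer has fallen behind the oldest cached id and must scan HBase instead.
   */
  synchronized Optional<List<byte[]>> scanFrom(long startId, int limit) {
    if (messages.isEmpty() || startId < messages.firstKey()) {
      return Optional.empty();
    }
    List<byte[]> result = new ArrayList<>();
    for (Map.Entry<Long, byte[]> entry : messages.tailMap(startId, true).entrySet()) {
      if (result.size() >= limit) {
        break;
      }
      result.add(entry.getValue());
    }
    return Optional.of(result);
  }
}
```

Bounding the cache by entry count keeps the sketch short; it could equally be bounded by bytes. The fallback path is exactly where a consumer that has fallen too far behind pays the cost of scanning HBase.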
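The batch-write item above can be illustrated with a group-commit sketch. It does not claim to mirror ConcurrentStreamWriter; refer to that class for the actual code. BatchingWriter and its batchWrite callback are hypothetical, and the callback stands in for a single batched HBase write.

Each publisher enqueues its payload and blocks. Whichever thread acquires the lock writes everything queued so far in one batch, so under concurrent publishing many payloads share one write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/**
 * Hypothetical group-commit writer: concurrent publishers enqueue their payloads, and whichever
 * thread holds the write lock flushes everything queued so far as a single batch.
 */
final class BatchingWriter {
  private static final class Pending {
    final byte[] payload;
    final CompletableFuture<Void> done = new CompletableFuture<>();

    Pending(byte[] payload) {
      this.payload = payload;
    }
  }

  private final ConcurrentLinkedQueue<Pending> queue = new ConcurrentLinkedQueue<>();
  private final ReentrantLock writeLock = new ReentrantLock();
  private final Consumer<List<byte[]>> batchWrite; // stand-in for one batched HBase write

  BatchingWriter(Consumer<List<byte[]>> batchWrite) {
    this.batchWrite = batchWrite;
  }

  /** Blocks until the payload has been written, possibly as part of another thread's batch. */
  void publish(byte[] payload) {
    Pending pending = new Pending(payload);
    queue.add(pending);
    writeLock.lock();
    try {
      // Another thread may already have flushed our entry while we waited for the lock.
      if (!pending.done.isDone()) {
        flushQueued();
      }
    } finally {
      writeLock.unlock();
    }
    pending.done.join(); // rethrows (wrapped) if the batch containing this payload failed
  }

  /** Drains the queue and writes it as one batch; called only while holding writeLock. */
  private void flushQueued() {
    List<Pending> batch = new ArrayList<>();
    for (Pending p = queue.poll(); p != null; p = queue.poll()) {
      batch.add(p);
    }
    List<byte[]> payloads = new ArrayList<>(batch.size());
    for (Pending p : batch) {
      payloads.add(p.payload);
    }
    try {
      batchWrite.accept(payloads);
      batch.forEach(p -> p.done.complete(null));
    } catch (RuntimeException e) {
      batch.forEach(p -> p.done.completeExceptionally(e));
    }
  }
}
```

Only one batch is in flight at a time. Latency stays low because a publisher that arrives during a write is picked up by the next batch rather than issuing its own write.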
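The ordering requirement for transactional publishing can be shown with a simplified sketch. TxParticipant and OrderedTxContext are hypothetical stand-ins, not Tephra's TransactionAware or TransactionContext.

The only point illustrated is the order: dataset participants are persisted first and the messaging publisher last, right before the commit. This keeps the window that transactional consumers wait on as short as possible.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a transaction participant (not Tephra's TransactionAware). */
@FunctionalInterface
interface TxParticipant {
  /** Flushes this participant's buffered writes for the current transaction. */
  void persist();
}

/**
 * Persists all dataset participants first and the messaging publisher last, so the gap between
 * persisting messages and committing the transaction (which transactional consumers wait on)
 * is as small as possible.
 */
final class OrderedTxContext {
  private final List<TxParticipant> datasets = new ArrayList<>();
  private TxParticipant messaging;

  void addDataset(TxParticipant participant) {
    datasets.add(participant);
  }

  /** Registers the messaging publisher; it is persisted last regardless of when it is registered. */
  void setMessaging(TxParticipant participant) {
    messaging = participant;
  }

  /** Called just before the commit to the transaction manager. */
  void persistAll() {
    for (TxParticipant dataset : datasets) {
      dataset.persist();
    }
    if (messaging != null) {
      messaging.persist();
    }
  }
}
```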
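The idea of using the instance ID as the first byte of the sequence id can be sketched as below. The key layout is an assumption made for illustration:

- an 8-byte publish timestamp, followed by
- a 2-byte seq_id, whose first byte is the instance ID and whose second byte is a per-timestamp counter.

The class name InstanceSeqIdGenerator is also hypothetical.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical generator for keys laid out as an 8-byte publish timestamp followed by a 2-byte
 * seq_id whose first byte is the instance ID and whose second byte is a per-timestamp counter.
 */
final class InstanceSeqIdGenerator {
  private final byte instanceId;
  private long lastTimestamp = Long.MIN_VALUE;
  private int counter;

  InstanceSeqIdGenerator(int instanceId) {
    if (instanceId < 0 || instanceId > 255) {
      throw new IllegalArgumentException("instance ID must fit in one byte: " + instanceId);
    }
    this.instanceId = (byte) instanceId;
  }

  /** Returns a 10-byte key that is unique across instances as long as instance IDs are unique. */
  synchronized byte[] nextKey(long timestamp) {
    if (timestamp < lastTimestamp) {
      // Reusing an older timestamp could repeat a key this instance already generated.
      throw new IllegalArgumentException("timestamp went backwards: " + timestamp);
    }
    if (timestamp > lastTimestamp) {
      lastTimestamp = timestamp;
      counter = 0;
    }
    if (counter > 255) {
      throw new IllegalStateException("seq_id space exhausted for timestamp " + timestamp);
    }
    return ByteBuffer.allocate(10)
        .putLong(timestamp)
        .put(instanceId)
        .put((byte) counter++)
        .array();
  }
}
```

Putting the instance ID first has two consequences in this layout. Within a single timestamp, messages sort by instance before publish order. Each instance can also issue at most 256 ids per timestamp.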
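For the ResourceCoordinator option, the sketch below picks a per-topic subset of instances using rendezvous (highest-random-weight) hashing. This is a swapped-in technique used only to illustrate "a given topic is handled by a subset of instances". It is not what ResourceCoordinator does, and TopicAssigner and its replicas parameter are hypothetical. The replicas count is where per-topic tuning based on traffic would go.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

/**
 * Rendezvous hashing: every instance gets a score per topic, and the topic is handled by the
 * {@code replicas} highest-scoring instances. A stand-in for a real ResourceCoordinator assignment.
 */
final class TopicAssigner {
  static List<String> assign(String topic, List<String> instances, int replicas) {
    List<String> ranked = new ArrayList<>(instances);
    // Highest score first; ties broken by name so the result is deterministic.
    ranked.sort(Comparator.comparingLong((String instance) -> score(topic, instance))
        .reversed()
        .thenComparing(Comparator.naturalOrder()));
    return new ArrayList<>(ranked.subList(0, Math.min(replicas, ranked.size())));
  }

  private static long score(String topic, String instance) {
    CRC32 crc = new CRC32();
    crc.update((topic + "/" + instance).getBytes(StandardCharsets.UTF_8));
    return crc.getValue();
  }
}
```

One property of this scheme is that removing an instance not assigned to a topic does not move that topic. That limits topic transitions, and therefore the forwarding and locking they require, when instances come and go.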
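The invalid-transaction cleanup can be sketched as a filter over MessageTable entries. This is a simplified illustration, not the Tephra implementation the text refers to, and it rests on these assumptions:

- The Entry class is hypothetical.
- Non-transactional entries have been skipped beforehand.
- The invalid list is available as an ascending array of transaction write pointers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InvalidTxCleanup {
  /** Hypothetical MessageTable entry: its row key and the write pointer of the publishing transaction. */
  static final class Entry {
    final String rowKey;
    final long txWritePtr;

    Entry(String rowKey, long txWritePtr) {
      this.rowKey = rowKey;
      this.txWritePtr = txWritePtr;
    }
  }

  /**
   * Returns the row keys whose publishing transaction is in the invalid list.
   * {@code sortedInvalids} must be sorted ascending, as Arrays.binarySearch requires.
   */
  static List<String> rowsToDelete(List<Entry> entries, long[] sortedInvalids) {
    List<String> rows = new ArrayList<>();
    for (Entry entry : entries) {
      if (Arrays.binarySearch(sortedInvalids, entry.txWritePtr) >= 0) {
        rows.add(entry.rowKey);
      }
    }
    return rows;
  }
}
```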
Security
Authentication works the same way as for all other intra-CDAP RESTful calls. Authorization can be done at the topic level.
Tables used for storage:
- MessageTable
- PayloadTable
- MetadataTable
APIs
RESTful
Base URL: /v3/namespaces/<ns-id>
Create Topic
- Request method and URI
PUT [base_url]/topics/[topic]
- Request body
- Can be empty
- If provided, it is a JSON object containing topic properties
e.g. {"ttl" : [ttl-in-seconds]}
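A create-topic request could be built with the JDK's java.net.http client (Java 11+), as in this sketch. The following are illustrative assumptions:

- CreateTopicRequests is a hypothetical class name.
- router is the base address of the CDAP router, for example a hypothetical http://localhost:11015.
- Namespace and topic names need no URL encoding.

The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class CreateTopicRequests {
  /**
   * Builds (but does not send) PUT [base_url]/topics/[topic]. The body is empty unless a TTL is
   * given. Assumes namespace and topic are plain identifiers that need no URL encoding.
   */
  static HttpRequest createTopic(String router, String namespace, String topic, Long ttlSeconds) {
    URI uri = URI.create(router + "/v3/namespaces/" + namespace + "/topics/" + topic);
    HttpRequest.BodyPublisher body = (ttlSeconds == null)
        ? HttpRequest.BodyPublishers.noBody()
        : HttpRequest.BodyPublishers.ofString("{\"ttl\" : " + ttlSeconds + "}");
    return HttpRequest.newBuilder(uri)
        .header("Content-Type", "application/json")
        .PUT(body)
        .build();
  }
}
```

The request could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.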
...
- Request body
- JSON object containing topic properties. Note that this call will replace all the existing properties.
e.g. {"ttl" : [ttl-in-seconds]}
- Response
- 200 OK if the topic properties were updated successfully
- 404 NOT FOUND if the topic is not present
- 400 BAD REQUEST if the properties were invalid
...
- Request method and URI
POST [base_url]/topics/[topic]/publish
- Request body
JSON object containing the payload and, optionally, a transaction write pointer
{ "payload" : [ payload byte array ], "tx_write_ptr" : 12345, "buffer" : true }
Fields:
i) payload - Contains the payload as an array of bytes
ii) tx_write_ptr (optional) - A long corresponding to a transaction write pointer. If this field is not provided, the message is published non-transactionally
iii) buffer (optional) - A boolean flag that indicates whether the messages can be buffered on the client side or must be persisted in the PayloadTable. This option is ignored when tx_write_ptr is not set (i.e., when the message is published non-transactionally)
- Response:
404 NOT FOUND if the topic is not present
200 OK if the message is persisted
- Commit transactionally published messages
POST [base_url]/topics/[topic]/commit
Request body should contain the transaction write pointer to be committed
{ "tx_write_ptr" : 12345 }
Response:
404 NOT FOUND if the topic is not present
200 OK if the messages are persisted
- Rollback transactionally published messages
POST [base_url]/topics/[topic]/rollback
Request body should contain the transaction write pointer to be rolled back
{ "tx_write_ptr" : 12345 }
Response:
404 NOT FOUND if the topic is not present
200 OK if the messages are rolled back
TBD: Should the client retry, and if so, when? If the messages cannot be rolled back, the transaction should be invalidated.
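The publish, commit, and rollback request bodies described above can be assembled as in this sketch. PublishBodies is a hypothetical helper, and it makes these choices:

- The payload is written as a JSON array of Java's signed byte values, so a byte such as 0xC8 appears as -56. This is an assumption about how the byte array is encoded.
- buffer is included only when tx_write_ptr is set, since buffer is not used for non-transactional publishing.
- Numbers are written without Java's L suffix, which is not valid JSON.

```java
import java.util.StringJoiner;

final class PublishBodies {
  /** Body for POST [base_url]/topics/[topic]/publish; txWritePtr == null means non-transactional. */
  static String publish(byte[] payload, Long txWritePtr, boolean buffer) {
    StringJoiner bytes = new StringJoiner(", ", "[", "]");
    for (byte b : payload) {
      bytes.add(Byte.toString(b)); // signed value, e.g. (byte) 0xC8 -> "-56"
    }
    StringBuilder json = new StringBuilder("{ \"payload\" : ").append(bytes);
    if (txWritePtr != null) {
      // buffer only matters for transactional publishing, so it is omitted otherwise.
      json.append(", \"tx_write_ptr\" : ").append(txWritePtr)
          .append(", \"buffer\" : ").append(buffer);
    }
    return json.append(" }").toString();
  }

  /** Body shared by the commit and rollback endpoints. */
  static String txWritePtr(long txWritePtr) {
    return "{ \"tx_write_ptr\" : " + txWritePtr + " }";
  }
}
```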
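The commit and rollback calls fit together as in this sketch. Once the outcome of the enclosing transaction is known, the client either commits or rolls back the messages for that write pointer. Following the TBD note above, a failed rollback leads to invalidating the transaction, so that its messages stay invisible.

TopicTxEndpoints, TxInvalidator, and TransactionalPublishFinisher are hypothetical. No retry is attempted, because the retry policy is still open.

```java
import java.io.IOException;

/** Hypothetical client for the commit and rollback endpoints; each call throws unless the response is 200 OK. */
interface TopicTxEndpoints {
  void commit(String topic, long txWritePtr) throws IOException;

  void rollback(String topic, long txWritePtr) throws IOException;
}

/** Hypothetical hook that marks a transaction as invalid in the transaction manager. */
@FunctionalInterface
interface TxInvalidator {
  void invalidate(long txWritePtr);
}

/** Finishes a transactional publish once the outcome of the enclosing transaction is known. */
final class TransactionalPublishFinisher {
  private final TopicTxEndpoints endpoints;
  private final TxInvalidator invalidator;

  TransactionalPublishFinisher(TopicTxEndpoints endpoints, TxInvalidator invalidator) {
    this.endpoints = endpoints;
    this.invalidator = invalidator;
  }

  void onTransactionOutcome(String topic, long txWritePtr, boolean committed) throws IOException {
    if (committed) {
      endpoints.commit(topic, txWritePtr); // failures propagate; the retry policy is still TBD
      return;
    }
    try {
      endpoints.rollback(topic, txWritePtr);
    } catch (IOException e) {
      // Messages that could not be rolled back must stay invisible, so invalidate the transaction.
      invalidator.invalidate(txWritePtr);
    }
  }
}
```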
...