...
- Response
  - 404 NOT FOUND if the topic does not exist
  - 200 OK on success; the response body contains an Avro binary or JSON object, depending on the request Content-Type, that conforms to the response schema
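A client wrapping this endpoint has to turn the two status codes into API-level outcomes. The following is a minimal sketch under stated assumptions: `ResponseHandler` is a hypothetical helper, `TopicNotFoundException` is a local stand-in for the exception the programmatic API declares, any other status code is treated as an error (an assumption), and the HTTP transport and Avro/JSON decoding are left out.

```java
import java.util.Arrays;

// Local stand-in for the TopicNotFoundException used by the programmatic API (hypothetical here).
class TopicNotFoundException extends Exception {
  TopicNotFoundException(String topic) {
    super("Topic not found: " + topic);
  }
}

// Hypothetical helper: enforces the status-code contract, leaving decoding to the caller.
final class ResponseHandler {
  static byte[] handle(String topic, int status, byte[] body) throws TopicNotFoundException {
    switch (status) {
      case 200:
        // Success: return a copy of the still-encoded (Avro binary or JSON) body.
        return Arrays.copyOf(body, body.length);
      case 404:
        // The topic does not exist.
        throw new TopicNotFoundException(topic);
      default:
        // Assumption: any other status is unexpected for this endpoint.
        throw new IllegalStateException("Unexpected HTTP status " + status + " for topic " + topic);
    }
  }
}
```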
Programmatic
The programmatic API will be used by clients to publish messages to, and poll messages from, the Messaging Service. It is a thin wrapper around the REST APIs.
- TopicId extends NamespaceId with a string that represents the topic name.
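To make the identifier relationship concrete, here is a simplified sketch of `TopicId` extending `NamespaceId`. The field and accessor names (`getNamespace`, `getTopic`) and the `namespace:topic` string form are assumptions for illustration, not the actual class definitions.

```java
import java.util.Objects;

// Simplified stand-in for NamespaceId: identifies a namespace by name.
class NamespaceId {
  private final String namespace;

  NamespaceId(String namespace) {
    this.namespace = Objects.requireNonNull(namespace, "namespace");
  }

  String getNamespace() {
    return namespace;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    // Exact class check: a NamespaceId never equals a TopicId and vice versa.
    if (o == null || getClass() != o.getClass()) return false;
    return namespace.equals(((NamespaceId) o).namespace);
  }

  @Override
  public int hashCode() {
    return namespace.hashCode();
  }
}

// TopicId adds the topic name to the namespace.
final class TopicId extends NamespaceId {
  private final String topic;

  TopicId(String namespace, String topic) {
    super(namespace);
    this.topic = Objects.requireNonNull(topic, "topic");
  }

  String getTopic() {
    return topic;
  }

  @Override
  public boolean equals(Object o) {
    // super.equals has already verified that o is a TopicId before the cast.
    return super.equals(o) && topic.equals(((TopicId) o).topic);
  }

  @Override
  public int hashCode() {
    return Objects.hash(getNamespace(), topic);
  }

  @Override
  public String toString() {
    return getNamespace() + ":" + topic;
  }
}
```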
Creating a topic:
void create(TopicId topic) throws TopicAlreadyExistsException;
Updating topic properties:
void update(TopicId topic, Map<String, String> properties) throws TopicNotFoundException;
Deleting a topic:
void delete(TopicId topic) throws TopicNotFoundException;
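The create and delete contracts can be illustrated with an in-memory stand-in for the topic metadata store. This is a sketch under assumptions: topics are plain `namespace:topic` strings instead of `TopicId`, the exception classes are local stand-ins, and `InMemoryTopicAdmin` is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-ins for the exceptions named in the API.
class TopicAlreadyExistsException extends Exception {
  TopicAlreadyExistsException(String topic) {
    super("Topic already exists: " + topic);
  }
}

class TopicNotFoundException extends Exception {
  TopicNotFoundException(String topic) {
    super("Topic not found: " + topic);
  }
}

// Hypothetical in-memory stand-in for the topic metadata store.
final class InMemoryTopicAdmin {
  private final Set<String> topics = ConcurrentHashMap.newKeySet();

  void create(String topic) throws TopicAlreadyExistsException {
    // Set.add returns false if the topic is already present, so check and insert are one atomic step.
    if (!topics.add(topic)) {
      throw new TopicAlreadyExistsException(topic);
    }
  }

  void delete(String topic) throws TopicNotFoundException {
    // Set.remove returns false if there was nothing to delete.
    if (!topics.remove(topic)) {
      throw new TopicNotFoundException(topic);
    }
  }

  boolean exists(String topic) {
    return topics.contains(topic);
  }
}
```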
Publishing messages:
For publishing messages non-transactionally:
- void publish(TopicId topic, List<byte[]> messages) throws TopicNotFoundException;
- void publish(TopicId topic, byte[] message) throws TopicNotFoundException;
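One way to picture the two non-transactional overloads is a single-message publish that delegates to the batch publish. In this sketch, `InMemoryTopicLog`, `StoredMessage`, and the explicit `nowMs` parameter (added only to make the example deterministic) are hypothetical, and a message id is assumed to be a publish timestamp plus a sequence number; the real id layout may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stored message: publish timestamp plus a per-timestamp sequence number.
final class StoredMessage {
  final long publishTimestamp;
  final int sequenceId;
  final byte[] payload;

  StoredMessage(long publishTimestamp, int sequenceId, byte[] payload) {
    this.publishTimestamp = publishTimestamp;
    this.sequenceId = sequenceId;
    this.payload = payload;
  }
}

// Hypothetical in-memory log for one topic, published to non-transactionally.
final class InMemoryTopicLog {
  private final List<StoredMessage> messages = new ArrayList<>();
  private long lastTimestamp = -1;
  private int nextSequence = 0;

  // Batch publish: all messages in one call share a timestamp and get consecutive sequence ids.
  synchronized void publish(List<byte[]> payloads, long nowMs) {
    // Never go back in time, even if the supplied clock does.
    long ts = Math.max(nowMs, lastTimestamp);
    if (ts != lastTimestamp) {
      lastTimestamp = ts;
      nextSequence = 0;
    }
    for (byte[] p : payloads) {
      messages.add(new StoredMessage(ts, nextSequence++, p));
    }
  }

  // Single-message publish is a batch of one.
  void publish(byte[] payload, long nowMs) {
    publish(Collections.singletonList(payload), nowMs);
  }

  synchronized List<StoredMessage> snapshot() {
    return new ArrayList<>(messages);
  }
}
```

Clamping the timestamp to the last used value keeps ids increasing even if the clock steps backwards, which is what an ordered poll by id depends on.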
For storing messages in the PayloadTable (requires a transaction):
- void store(TopicId topic, byte[] message, Transaction transaction) throws TopicNotFoundException;
- void store(TopicId topic, List<byte[]> messages, Transaction transaction) throws TopicNotFoundException;
For publishing messages transactionally, either directly (without using the PayloadTable) or by publishing messages stored earlier in the PayloadTable:
- void publish(TopicId topic, Transaction transaction) throws TopicNotFoundException;
- void publish(TopicId topic, byte[] message, Transaction transaction) throws TopicNotFoundException;
- void publish(TopicId topic, List<byte[]> messages, Transaction transaction) throws TopicNotFoundException;
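The store/publish split can be sketched as two tables: payloads staged by a transaction in the PayloadTable, and MessageTable entries that either carry payloads inline or point back at the staged rows. Here the transaction is reduced to its write pointer, and `TxTopicSketch` and `MessageEntry` are hypothetical names; this illustrates the data flow only, not the actual storage layout.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical MessageTable entry: inline payloads, or a reference to the PayloadTable.
final class MessageEntry {
  final long txWritePointer;
  final List<byte[]> inlinePayloads; // null when the entry references the PayloadTable

  MessageEntry(long txWritePointer, List<byte[]> inlinePayloads) {
    this.txWritePointer = txWritePointer;
    this.inlinePayloads = inlinePayloads;
  }
}

final class TxTopicSketch {
  // PayloadTable: payloads staged by a transaction, keyed by its write pointer.
  private final Map<Long, List<byte[]>> payloadTable = new HashMap<>();
  // MessageTable: entries in publish order.
  private final List<MessageEntry> messageTable = new ArrayList<>();

  // store(topic, messages, tx): stage payloads; nothing is part of the topic yet.
  synchronized void store(long txWritePointer, List<byte[]> payloads) {
    payloadTable.computeIfAbsent(txWritePointer, k -> new ArrayList<>()).addAll(payloads);
  }

  // publish(topic, tx): one MessageTable entry pointing at everything the tx stored.
  synchronized void publish(long txWritePointer) {
    messageTable.add(new MessageEntry(txWritePointer, null));
  }

  // publish(topic, messages, tx): write payloads directly, bypassing the PayloadTable.
  synchronized void publish(long txWritePointer, List<byte[]> payloads) {
    messageTable.add(new MessageEntry(txWritePointer, new ArrayList<>(payloads)));
  }

  // Resolves every MessageTable entry to payloads, following PayloadTable references.
  synchronized List<byte[]> readAll() {
    List<byte[]> out = new ArrayList<>();
    for (MessageEntry e : messageTable) {
      if (e.inlinePayloads != null) {
        out.addAll(e.inlinePayloads);
      } else {
        out.addAll(payloadTable.getOrDefault(e.txWritePointer, Collections.emptyList()));
      }
    }
    return out;
  }
}
```

Staging large payloads once and publishing a single reference keeps the MessageTable entry small, at the cost of a second lookup on read.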
For rolling back messages that were published transactionally:
- void rollback(TopicId topic, Transaction transaction) throws TopicNotFoundException;
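Rollback can be illustrated by marking, rather than deleting, the entries a transaction wrote. This sketch assumes positive write pointers and borrows the sign-inversion idea from the invalid-transaction cleanup in the Data Cleanup In Tables section; whether rollback uses exactly this encoding is an assumption, and `RollbackSketch` and `TxEntry` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical MessageTable entry; a negative write pointer marks a rolled-back transaction.
final class TxEntry {
  long txWritePointer;
  final byte[] payload;

  TxEntry(long txWritePointer, byte[] payload) {
    this.txWritePointer = txWritePointer;
    this.payload = payload;
  }
}

final class RollbackSketch {
  private final List<TxEntry> messageTable = new ArrayList<>();

  synchronized void publish(long txWritePointer, byte[] payload) {
    messageTable.add(new TxEntry(txWritePointer, payload));
  }

  // rollback(topic, tx): flip the sign of the write pointer on every entry the tx wrote.
  // Returns the number of entries updated; repeating the call updates nothing.
  synchronized int rollback(long txWritePointer) {
    int updated = 0;
    for (TxEntry e : messageTable) {
      if (e.txWritePointer == txWritePointer) {
        e.txWritePointer = -txWritePointer;
        updated++;
      }
    }
    return updated;
  }

  // Payloads a transactional consumer may still see: rolled-back entries are skipped.
  synchronized List<byte[]> visibleToTransactions() {
    List<byte[]> out = new ArrayList<>();
    for (TxEntry e : messageTable) {
      if (e.txWritePointer > 0) {
        out.add(e.payload);
      }
    }
    return out;
  }
}
```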
Consuming messages:
For consuming messages non-transactionally:
- List<Message> poll(TopicId topic) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, int maxMessages) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive, int maxMessages) throws TopicNotFoundException;
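The non-transactional poll variants differ only in how the start position and the batch size are given. A minimal sketch, assuming a hypothetical `MsgId` ordered by publish timestamp and then sequence number, and a linear scan standing in for a MessageTable range scan; `PollSketch` is also a hypothetical name, and only ids are stored to keep it short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical message id: ordered by publish timestamp, then sequence id.
final class MsgId implements Comparable<MsgId> {
  final long publishTimestamp;
  final int sequenceId;

  MsgId(long publishTimestamp, int sequenceId) {
    this.publishTimestamp = publishTimestamp;
    this.sequenceId = sequenceId;
  }

  @Override
  public int compareTo(MsgId o) {
    int c = Long.compare(publishTimestamp, o.publishTimestamp);
    return c != 0 ? c : Integer.compare(sequenceId, o.sequenceId);
  }
}

final class PollSketch {
  // Ids are assumed to be appended in increasing order, as a table scan would return them.
  private final List<MsgId> ids = new ArrayList<>();

  void append(MsgId id) {
    ids.add(id);
  }

  // poll(topic, messageId, inclusive, maxMessages): start after (or at) a given message.
  List<MsgId> poll(MsgId start, boolean inclusive, int maxMessages) {
    List<MsgId> out = new ArrayList<>();
    for (MsgId id : ids) {
      if (out.size() >= maxMessages) break;
      int c = id.compareTo(start);
      if (c > 0 || (inclusive && c == 0)) out.add(id);
    }
    return out;
  }

  // poll(topic, timestampInMs, inclusive, maxMessages): compare on the publish timestamp only.
  List<MsgId> poll(long timestampInMs, boolean inclusive, int maxMessages) {
    List<MsgId> out = new ArrayList<>();
    for (MsgId id : ids) {
      if (out.size() >= maxMessages) break;
      long ts = id.publishTimestamp;
      if (ts > timestampInMs || (inclusive && ts == timestampInMs)) out.add(id);
    }
    return out;
  }
}
```

Polling by message id is the natural way to resume: passing the last id received with `inclusive = false` continues exactly after it, whereas a timestamp alone cannot distinguish messages published in the same millisecond.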
For consuming messages transactionally (the timestamp used for polling corresponds to the publish timestamp):
- List<Message> poll(TopicId topic, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId id, boolean inclusive, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, MessageId id, boolean inclusive, int maxMessages, Transaction transaction) throws TopicNotFoundException;
- List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, int maxMessages, Transaction transaction) throws TopicNotFoundException;
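For transactional polling, visibility depends on the consumer's transaction snapshot. This sketch assumes Tephra-style snapshot fields (read pointer, in-progress set, invalid set) and one plausible policy: messages from invalid transactions are skipped, and polling stops at the first message whose transaction has not committed, since skipping it could lose that message if the transaction commits later. `TxSnapshot`, `TxMessage`, and `TxPollSketch` are hypothetical names, the payload is a `String` for readability, and the consumer's own writes are not considered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical snapshot of the consumer's transaction (Tephra-style fields).
final class TxSnapshot {
  final long readPointer;
  final Set<Long> inProgress;
  final Set<Long> invalid;

  TxSnapshot(long readPointer, Set<Long> inProgress, Set<Long> invalid) {
    this.readPointer = readPointer;
    this.inProgress = inProgress;
    this.invalid = invalid;
  }

  // Committed from this transaction's point of view: at or below the read pointer, not in progress.
  boolean isCommitted(long writePointer) {
    return writePointer <= readPointer && !inProgress.contains(writePointer);
  }
}

// Hypothetical MessageTable entry written by a transactional publish.
final class TxMessage {
  final long txWritePointer;
  final String payload;

  TxMessage(long txWritePointer, String payload) {
    this.txWritePointer = txWritePointer;
    this.payload = payload;
  }
}

final class TxPollSketch {
  // Entries are assumed to be in publish order, as a MessageTable scan would return them.
  static List<TxMessage> poll(List<TxMessage> entries, TxSnapshot tx, int maxMessages) {
    List<TxMessage> out = new ArrayList<>();
    for (TxMessage m : entries) {
      if (out.size() >= maxMessages) {
        break;
      }
      if (tx.invalid.contains(m.txWritePointer)) {
        continue; // invalid transaction: never visible to transactional consumers
      }
      if (!tx.isCommitted(m.txWritePointer)) {
        break; // not committed yet: stop so the message is not skipped if it commits later
      }
      out.add(m);
    }
    return out;
  }
}
```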
Public Programmatic API
Consuming messages (when executed within a Transactional, messages are consumed transactionally; when used outside of a Transactional, they are consumed non-transactionally):
...
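The context-dependent consumption of the public API can be sketched with a transaction bound to the current thread. `SimpleTransactional`, `Tx`, and `ContextAwareConsumer` are hypothetical stand-ins, not the actual Transactional interface, and the consumer returns a description of the path it took instead of calling the real `poll` overloads.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a transaction handle.
final class Tx {
  final long readPointer;

  Tx(long readPointer) {
    this.readPointer = readPointer;
  }
}

// Hypothetical stand-in for a Transactional: runs a body with a transaction bound to the thread.
final class SimpleTransactional {
  private static final ThreadLocal<Tx> CURRENT = new ThreadLocal<>();

  static <T> T execute(Tx tx, Supplier<T> body) {
    Tx previous = CURRENT.get();
    CURRENT.set(tx);
    try {
      return body.get();
    } finally {
      // Restore the previous binding so nested or later calls see the right context.
      if (previous == null) {
        CURRENT.remove();
      } else {
        CURRENT.set(previous);
      }
    }
  }

  static Tx current() {
    return CURRENT.get();
  }
}

final class ContextAwareConsumer {
  // Chooses the transactional or non-transactional path from the calling context.
  String poll(String topic) {
    Tx tx = SimpleTransactional.current();
    return tx == null
        ? "non-tx poll of " + topic
        : "tx poll of " + topic + " at read pointer " + tx.readPointer;
  }
}
```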
Data Cleanup In Tables
Several kinds of data cleanup need to be performed, and the cleanup strategy depends on the underlying storage (LevelDB vs. HBase). The matrix below describes the combinations and types of cleanup needed and how each one is performed.
Cleanup type | HBase | LevelDB |
---|---|---|
Cleanup strategy | Coprocessors attached to the tables (one for the MessageTable and another for the PayloadTable). No cleanup is required for the MetadataTable. | A periodically scheduled thread cleans up data in the MessageTable and PayloadTable. The schedule frequency is configurable via cdap-site.xml. |
TTL expiration (MessageTable and PayloadTable) | Extract the TopicId (namespace, topic) from the row key and look up the TopicMetadata in the MetadataTable (cached). From the row key, get the write timestamp and compare it against the TTL to determine whether the cell needs to be skipped. | Scan the tables topic by topic and remove rows that have exceeded the TTL given in the TopicMetadata. |
Older generation (MessageTable and PayloadTable) | Compare the generation id of the row with the current generation from the MetadataTable. If it is the current generation, do nothing. If it is an older generation (gen < abs(currentgen) or gen == -1 * currentgen), skip the cell. | Same logic as in HBase. |
Invalid transactions (MessageTable only) | Requires a periodically refreshed tx.snapshot in the coprocessor. If the cell belongs to a TX_COL column, read the transaction id from it. If that id is in the invalid list (from the tx.snapshot), invert its sign (-1 * tx_id) and write the value back. This way the data remains visible to non-transactional consumers and is eventually removed by TTL or when the topic is deleted. | Same logic as in HBase. |
Latest min tx timestamp (for manual pruning of invalid transactions; required only for the MessageTable) | The last used tx.snapshot info can be used directly to prune the invalid transaction list, so this info needs to be logged. A TMS table debugger tool that can print this info is also needed. | |
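The per-cell checks in the matrix reduce to a few small predicates that both the HBase coprocessors and the LevelDB cleanup thread could share. This sketch assumes millisecond timestamps, `int` generation ids, and `long` transaction ids, and applies the generation condition exactly as written in the table; `CleanupSketch` and its method names are hypothetical.

```java
import java.util.Set;

// Hypothetical helper collecting the per-cell cleanup checks from the matrix.
final class CleanupSketch {
  enum Action { KEEP, SKIP }

  // TTL expiration: the cell was written more than ttlMs before now.
  static boolean isExpired(long writeTimestampMs, long ttlMs, long nowMs) {
    return writeTimestampMs < nowMs - ttlMs;
  }

  // Older generation: gen < abs(currentgen) or gen == -1 * currentgen.
  static boolean isOlderGeneration(int gen, int currentGen) {
    return gen < Math.abs(currentGen) || gen == -currentGen;
  }

  // Invalid transactions: the value to write back into the TX_COL cell.
  // Assuming the invalid list holds only positive ids, an already-inverted id is left unchanged.
  static long invalidateIfNeeded(long txId, Set<Long> invalidTxIds) {
    return invalidTxIds.contains(txId) ? -txId : txId;
  }

  // Combined TTL and generation decision for one cell.
  static Action decide(long writeTimestampMs, long ttlMs, long nowMs, int gen, int currentGen) {
    boolean drop = isExpired(writeTimestampMs, ttlMs, nowMs) || isOlderGeneration(gen, currentGen);
    return drop ? Action.SKIP : Action.KEEP;
  }
}
```

Keeping the checks as pure functions lets the same logic run inside a coprocessor during compaction and in the periodic LevelDB scan, as the "Same logic as in HBase" entries suggest.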