Versions Compared

...

Row Key Format

concat(topic, generation, publish_timestamp, seq_id)

  • topic
    • String representation of the topic name
  • generation
    • 4-byte int representing the generation of this topic. The generation increases by one each time the topic is created.
  • publish_timestamp
    • 8 bytes time in milliseconds
  • seq_id
    • 2-byte sequence ID. It uniquely identifies messages published in the same millisecond.
    • The 2 bytes give us 65,536 sequence IDs per millisecond (about 65M messages per second per topic), which is well over the limit we need
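
The row key layout above can be sketched in Java as follows. This is a minimal illustration, not the actual implementation: the class name `MessageRowKey` and the choice of UTF-8 for the topic string are assumptions. Big-endian encoding (the `ByteBuffer` default) is used so that, for non-negative values, byte-wise key order follows publish order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the name and the UTF-8 topic encoding are assumptions.
final class MessageRowKey {

  // Number of distinct sequence IDs that fit in 2 bytes (per millisecond).
  static final int SEQ_IDS_PER_MILLI = 1 << 16;

  // Builds concat(topic, generation, publish_timestamp, seq_id).
  static byte[] encode(String topic, int generation, long publishTimestampMs, short seqId) {
    byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(topicBytes.length + Integer.BYTES + Long.BYTES + Short.BYTES)
        .put(topicBytes)             // topic name
        .putInt(generation)          // 4-byte generation
        .putLong(publishTimestampMs) // 8-byte publish time in milliseconds
        .putShort(seqId)             // 2-byte sequence ID
        .array();
  }

  public static void main(String[] args) {
    byte[] key = encode("topic1", 1, System.currentTimeMillis(), (short) 0);
    System.out.println("row key length: " + key.length);
  }
}
```

Because the topic name has variable length and no delimiter, a real encoding would also need a way to tell where the topic ends; that detail is left out of this sketch.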

...

Row Key Format

concat(topic, generation, transaction_write_pointer, write_timestamp, p_seq_id)

  • topic
    • String representation of the topic name
  • generation
    • 4-byte int representing the generation of this topic. The generation increases by one each time the topic is created.
  • transaction_write_pointer
    • 8 bytes transaction write pointer of the transaction that the message was published from
  • write_timestamp
    • 8 bytes time in milliseconds of the time when the message was persisted
    • This is for ordering messages published in the same transaction
  • p_seq_id
    • 2 bytes sequence ID. It is for uniquely identifying messages published in the same millisecond.

...

With the row key format described above, each message in a topic is uniquely identified by a message ID in this format:

...

The publishing client is responsible for telling the messaging system to roll back published messages (by Message IDs). To roll back, simply delete the corresponding entries in the Message Table and the Payload Table. Since we encoded enough information in the Message ID, it should be just a scan and delete of the Payload Table, without the need to read from the Message Table.

To roll back, simply mark the corresponding entries in the Message Table as invalid (invert the sign of the transaction write pointer). We don't have to do anything to the entries in the Payload Table; the TTL of the topic will delete them. This works because transactional reads can simply skip the invalidated entries in the Message Table and therefore never touch the Payload Table. This gives us a couple of benefits: i) rollback is faster, since we don't have to delete entries in the Payload Table; ii) non-transactional consumers get repeatable results as long as the entries have not been removed by TTL expiration.
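
The sign-inversion idea can be sketched as below. This is only an illustration under the assumption that valid transaction write pointers are strictly positive; the class and method names are hypothetical.

```java
// Hypothetical helper illustrating rollback by sign inversion.
// Assumption: valid transaction write pointers are strictly positive.
final class TxPointerMarker {

  // Marks a stored write pointer as invalid; idempotent for already-inverted values.
  static long invalidate(long storedPointer) {
    return storedPointer > 0 ? -storedPointer : storedPointer;
  }

  // A transactional read would skip entries whose stored pointer is negative.
  static boolean isInvalidated(long storedPointer) {
    return storedPointer < 0;
  }

  // Non-transactional reads can still recover the original pointer.
  static long originalPointer(long storedPointer) {
    return Math.abs(storedPointer);
  }

  public static void main(String[] args) {
    long stored = invalidate(42L);
    System.out.println(isInvalidated(stored) + " " + originalPointer(stored));
  }
}
```

Since only the Message Table value changes, the Payload Table rows stay in place until the topic TTL removes them.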

Consuming

The general principle of consuming messages is to scan the Message Table in ascending order. When it encounters an entry with the payload stored in the Payload Table, scan the Payload Table using the transaction write pointer as the prefix.
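
A prefix scan of the Payload Table needs a start row and an exclusive stop row. The sketch below shows one common way to derive them from the Payload Table row key layout (topic, generation, transaction write pointer); the class name and the UTF-8 topic encoding are assumptions, and no HBase API is used here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for computing prefix scan bounds on the Payload Table.
final class PayloadScanRange {

  // Prefix shared by all payload rows of one transaction: topic + generation + write pointer.
  static byte[] startRow(String topic, int generation, long txWritePointer) {
    byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(topicBytes.length + Integer.BYTES + Long.BYTES)
        .put(topicBytes)
        .putInt(generation)
        .putLong(txWritePointer)
        .array();
  }

  // Smallest key that sorts after every key starting with the prefix.
  // Returns an empty array when no such key exists (the prefix is all 0xFF bytes).
  static byte[] stopRow(byte[] prefix) {
    for (int i = prefix.length - 1; i >= 0; i--) {
      if (prefix[i] != (byte) 0xFF) {
        byte[] stop = Arrays.copyOf(prefix, i + 1);
        stop[i]++;
        return stop;
      }
    }
    return new byte[0];
  }

  public static void main(String[] args) {
    byte[] start = startRow("topic1", 1, 1234L);
    System.out.println(Arrays.toString(start) + " -> " + Arrays.toString(stopRow(start)));
  }
}
```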

...

  • To avoid scanning HBase too often, the messaging service should cache recent scans in memory. This is based on the assumption that consumers shouldn't fall too far behind, which is one of the reasons why we want a messaging system that provides faster reaction time.
  • Batch writes should be used to give higher throughput and lower latency for concurrent publishing
    • Refer to ConcurrentStreamWriter for the implementation
  • When doing transactional publishing, it is important to write the messages to the messaging system as the last "persist" operation (i.e. it persists after all dataset writes), meaning it has to be the last TransactionAware used in the transaction context.
    • This helps lower the latency of transactional consumers, since the latency is basically the time between message persist and commit to Apache Tephra
  • Multiple messaging service instances (not part of the initial scope)
    • When we reach the point where we need to scale the messaging service to multiple instances, there are a couple of choices for how it could look
      1. All instances are equal. This means all instances can handle publishing and consumption for all topics
        1. Different instances need to generate different seq_id values for the same timestamp to avoid key collisions. We can potentially use the instance ID as the first byte of the sequence ID.
        2. (Pro) Easy to implement as there is no coordination needed
        3. (Con) The consumer scan cache can take up a lot of space because all instances would be caching for all topics
      2. Perform resource assignment through the ResourceCoordinator such that a given topic is only handled by a subset of instances
        1. Still need to handle sequence ID generation so as to avoid key collisions
        2. (Pro) Most flexible, allowing a trade-off between scalability and resource usage (memory cache), and can be tuned per topic based on traffic
        3. (Con) Implementation can be complicated as it involves
          1. Usage of ResourceCoordinator
          2. Usage of DistributedLock for the topic transition to have a guaranteed leader set for a topic
          3. Traffic forwarding and retry are needed during topic transition from one instance to another
  • Removing entries with invalid transaction in the MessageTable is also needed. Refer to the Tephra implementation on how this should be done.
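
The suggestion in option 1 above, using the instance ID as the first byte of the sequence ID, can be sketched as follows. The names are hypothetical, and the split (one byte for the instance, one byte for the per-millisecond counter) is an assumption. Note the trade-off: each instance is left with only 256 sequence IDs per millisecond.

```java
// Hypothetical sketch: pack an instance ID and a per-millisecond counter into a 2-byte seq_id.
final class InstanceSeqId {

  static short compose(int instanceId, int counter) {
    if (instanceId < 0 || instanceId > 0xFF || counter < 0 || counter > 0xFF) {
      throw new IllegalArgumentException("instanceId and counter must each fit in one byte");
    }
    return (short) ((instanceId << 8) | counter);
  }

  static int instanceOf(short seqId) {
    return (seqId >> 8) & 0xFF;
  }

  static int counterOf(short seqId) {
    return seqId & 0xFF;
  }

  public static void main(String[] args) {
    short seqId = compose(3, 7);
    System.out.println(instanceOf(seqId) + ":" + counterOf(seqId));
  }
}
```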

Security

For authentication, it is pretty much the same as all other intra-CDAP RESTful calls. For authorization, we can do it at the topic level.

Tables used for storage

...

  • MessageTable
  • PayloadTable
  • MetadataTable 

APIs

RESTful

Base URL: /

...

v1/namespaces/<ns-id> (since REST API is internal)

Create Topic

  • Request method and URI
    • PUT [base_url]/topics/[topic]
  • Request body
    • Can be empty
    • If provided, it is a JSON object containing topic properties
      • e.g. {"ttl" : [ttl-in-seconds]}
  • Response
    • 200 OK if the topic was created successfully
    • 409 CONFLICT if a topic of the same name exists already
    • 400 BAD REQUEST if the given TTL is invalid
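
As an illustration, a Create Topic request could be built with the JDK's `java.net.http` API (Java 11+) roughly as below. The host, port, and namespace in the base URL are placeholders, the class name is hypothetical, and the topic name is assumed to need no URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side sketch for PUT [base_url]/topics/[topic].
final class CreateTopicRequest {

  static HttpRequest build(String baseUrl, String topic, long ttlSeconds) {
    // Topic properties as a JSON object, e.g. {"ttl" : 3600}
    String body = "{\"ttl\" : " + ttlSeconds + "}";
    return HttpRequest.newBuilder(URI.create(baseUrl + "/topics/" + topic))
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    // Placeholder base URL; the real host and port depend on the deployment.
    HttpRequest request = build("http://localhost:11015/v1/namespaces/default", "topic1", 3600);
    System.out.println(request.method() + " " + request.uri());
  }
}
```

Sending the request with `HttpClient.newHttpClient().send(...)` and checking the status code would then distinguish 200, 409, and 400 as listed above.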

Update Topic

  • Request method and URI
    • PUT [base_url]/topics/[topic]/properties
  • Request body
    • JSON object containing topic properties. Note that this call will replace all the existing properties.
      • e.g. {"ttl" : [ttl-in-seconds]}
  • Response
    • 200 OK if the topic properties were updated successfully
    • 404 NOT FOUND if the topic is not present
    • 400 BAD REQUEST if the properties were not correct

...

Get Topic Properties

  • Request method and URI
    • GET [base_url]/topics/[topic]
  • Response body
    • JSON object containing the topic name and its properties.
      • e.g. {"name" : "topic1", "properties" : { "ttl" : "123231" } }
  • Response
    • 200 OK
    • 404 NOT FOUND if the topic is not present

List Topics

  • Request method and URI
    • GET [base_url]/topics
  • Response body
    • JSON array containing the names of the topics under the namespace
      • e.g. [ "topic1", "topic2" ]
  • Response
    • 200 OK

Delete Topic

  • Request method and URI
    • DELETE [base_url]/topics/[topic]
  • Response
    • 200 OK if the topic was deleted successfully
    • 404 NOT FOUND if the topic is not present

...

  • Request Schema
    • { "tx_write_ptr" : [ "string", "null" ], "messages" : { "type" : "array", "items" : "bytes" }}

  • Request body
    • Can be avro binary or JSON, based on the request - Content-Type: application/json or avro/binary
    • Schema Fields:
      i) messages - Contains an array of byte arrays that correspond to messages
      ii) tx_write_ptr - Corresponds to a transaction write pointer.

Store Messages to Payload Table

  • Request method and URI
    • POST [base_url]/topics/[topic]/store
  • Request body should contain tx_write_ptr and messages
  • Response
    • 404 NOT FOUND if the topic is not present
    • 200 OK if message is persisted

Commit Messages to Message Table 

...

  • POST [base_url]/topics/[topic]/publish
     
  • Request schema can optionally contain tx_write_ptr and can optionally contain messages. If store calls were made previously with the same tx_write_ptr, the messages array should be empty.

    Response:
    404 NOT FOUND if topic is not present
    200 OK if messages are persisted

     

Rollback Transactionally published messages 

    • POST [base_url]/topics/[topic]/rollback
    • Request schema should contain tx_write_ptr and the messages field is ignored

      Response:
      404 NOT FOUND if topic is not present
      200 OK if messages are rolled back
       

Consume Message

  • Request Schema
    • { "messageId" : [ "bytes", "null" ], "timestamp" : [ "string", "null" ], "inclusive" : [ "boolean", "null" ], "limit" : [ "integer", "null" ], "transaction" : [ "bytes" , "null" ] }
       

  • Response Schema (will be in JSON or Avro based on the request Content-Type)
    • { "response" : { "type" : "array", "items" : "record" }  }  - each record contains two fields - { "messageId" : "bytes", "message" : "bytes" }

       

  • Request body
    • Can be avro binary or JSON, based on the request - Content-Type: application/json or avro/binary
    • Schema Fields:
      i) messageId - Contains bytes that correspond to the messageId
      ii) timestamp - Timestamp in ms as a string
      iii) inclusive - boolean field that says whether the messageId/timestamp should be inclusive
      iv) limit - max number of responses to return
      v) transaction -  serialized bytes of the transaction object
       

...

Publish Message

There are two separate endpoints to support different publishing cases as described above. They both share the same request format.

  • Request Body Schema

    {
      "type" : "record", 
      "name" : "PublishRequest",
      "fields" : [
        { "name" : "transactionWritePointer", "type" : [ "long", "null" ] },
        { "name" : "messages", "type" : { "type" : "array", "items" : "bytes" } }
      ]
    }
  • Request body
    • Can be Avro binary or JSON, based on the request - Content-Type: application/json or avro/binary
    • Schema Fields:

      1. messages - Contains an array of byte arrays that correspond to messages

      2. transactionWritePointer - Corresponds to a transaction write pointer.

Store Messages to Payload Table 
  • Request method and URI

    • POST [base_url]/topics/[topic]/store
  • Request body should contain transactionWritePointer and messages

  • Response:
    404 NOT FOUND if the topic is not present
    200 OK if message is persisted
Publish Messages to Message Table 
  • Request method and URI
    • POST [base_url]/topics/[topic]/publish

  • Request body (for polling messages with POST [base_url]/topics/[topic]/poll)
    • Fields (all the fields are optional):

      i) messageId: Provide a messageId as an offset into the topic message queue
      ii) timestamp: Provide a timestamp as an offset into the topic message queue. Note that either messageId or timestamp needs to be provided. If both are provided, then messageId will be used.
      iii) inclusive: This boolean flag indicates whether the offset is inclusive or exclusive
      iv) limit: Max number of messages to return, by default it is set to 100.
      v) transaction: If message polling needs to be done transactionally, then this  

  • Response :
    404 NOT FOUND if topic is not present
    200 OK
    • Response body contains an avro binary or JSON object based on the request Content-Type with the schema as mentioned above

Programmatic

The programmatic API will be used by clients to publish and poll messages from the Messaging Service. It is just a wrapper around the REST APIs.

  • TopicId extends NamespaceId with a string that represents the topic name.

...

Publishing Message:

For publishing messages non-transactionally:

  • void publish(TopicId topic, byte[] payload) throws TopicNotFoundException;

For publishing messages transactionally, provide the Transaction object as well as a boolean variable that indicates whether the messages should be persisted in the payloadTable before the messages are

  • void publish(TopicId topic, byte[] payload, Transaction transaction, boolean buffer) throws TopicNotFoundException;  

For committing messages published transactionally:

  • void commit(TopicId topic, Transaction transaction) throws TopicNotFoundException;

For rolling back messages that were published transactionally:

  • void rollback(TopicId topic, Transaction transaction) throws TopicNotFoundException;

 

...

  • List<Message> poll(TopicId topic) throws TopicNotFoundException; 
    List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive) throws TopicNotFoundException;
    List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive) throws TopicNotFoundException;
    List<Message> poll(TopicId topic, long timestampInMs, boolean inclusive, long maxMessages) throws TopicNotFoundException;
    List<Message> poll(TopicId topic, MessageId messageId, boolean inclusive, long maxMessages) throws TopicNotFoundException;
    class Message {
        MessageId messageId;
        byte[] payload;
    }
    class MessageId {
      long publish_timestamp;
      short seq_id;
      long write_timestamp;
      short p_seq_id;
    }
  • Request
    • Request can optionally contain transactionWritePointer. If store calls were made previously with the same transactionWritePointer, the messages array should be empty.

    • This call can be preceded by multiple calls to the 'store' endpoint. If it is preceded by store calls with the same transactionWritePointer, this endpoint should be called with an empty messages field. Otherwise, this call stores the messages in the MessageTable directly.
    • If the call does not contain a transactionWritePointer, the messages are stored non-transactionally (i.e., without a transaction write pointer).
  • Response

    404 NOT FOUND if topic is not present
    200 OK if messages are persisted
    400 BAD REQUEST if transactionWritePointer is null and the messages field is empty. The messages field can be empty only if transactionWritePointer is provided

    Response Body:

    • If the publish is non-transactional, the response body is empty

    • If the publish is transactional, the response body contains the information needed for rollback, with the following schema

      {
        "type" : "record",
        "name" : "PublishResponse",
        "fields" : [
          { "name" : "transactionWritePointer", "type" : [ "long", "null" ] },
          { "name" : "startTimestamp", "type" : "long" },
          { "name" : "startSequenceId", "type" : "int" },
          { "name" : "endTimestamp", "type" : "long" },
          { "name" : "endSequenceId", "type" : "int" }
        ]
      }
    • The client shouldn't need to parse the response body, but should rather treat it as opaque bytes. On rollback, simply use it as the request body.
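
The status-code rules for the publish endpoint described above can be sketched as a small decision function. This is only an illustration: the class name is hypothetical, and checking for the missing topic before validating the request body is an assumption about ordering that the text does not specify.

```java
import java.util.List;

// Hypothetical sketch of the publish endpoint's status-code rules.
final class PublishValidation {

  // transactionWritePointer is null for a non-transactional publish.
  static int statusFor(boolean topicExists, Long transactionWritePointer, List<byte[]> messages) {
    if (!topicExists) {
      return 404; // NOT FOUND: topic is not present
    }
    if (transactionWritePointer == null && messages.isEmpty()) {
      return 400; // BAD REQUEST: nothing to publish and no prior store calls to refer to
    }
    return 200; // OK: messages are persisted
  }

  public static void main(String[] args) {
    System.out.println(statusFor(true, null, List.of()));
  }
}
```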

Rollback Transactionally published messages 

  • Request method and URI

    • POST [base_url]/topics/[topic]/rollback
  • Request Body

    • Use the response body as is from the publish call above
  • Response:

    404 NOT FOUND if topic is not present
    200 OK if messages are rolled back

Consume Message

  • Request method and URI
    • POST [base_url]/topics/[topic]/poll
  • Request Schema

    {
      "type" : "record",
      "name" : "ConsumeRequest",
      "fields" : [
        { "name" : "startFrom", "type" : [ "bytes", "long", "null" ] },
        { "name" : "inclusive", "type" : "boolean", "default" : true },
        { "name" : "limit", "type" : [ "int", "null" ] },
        { "name" : "transaction", "type" : [ "bytes", "null" ] }
      ]
    }
  • Response Schema (will be in JSON or Avro binary based on the request Content-Type)

    {
      "type" : "array", 
      "items" : {
        "type" : "record", 
        "name": "Message", 
        "fields": [ 
          { "name" : "id", "type" : "bytes" }, 
          { "name" : "payload", "type" : "bytes" }
        ] 
      }
    }
  • Request body
    • Can be Avro binary or JSON, based on the request - Content-Type: application/json or avro/binary
    • Schema Fields:
      i) startFrom - if bytes, it is treated as a messageId; if long, it is treated as a timestamp
      ii) inclusive - boolean field that says whether the messageId/timestamp should be inclusive
      iii) limit - max number of responses to return (a hard limit will be set by the TransactionServer, configured as a cConf property)
      iv) transaction -  serialized bytes (TransactionCodec) of the transaction object

  • Response
    404 NOT FOUND if topic is not present
    200 OK; the response body contains an Avro binary or JSON object, based on the request Content-Type, with the response schema
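
How a server might interpret the `startFrom` union and apply the limit can be sketched as below. Everything here is illustrative: the names are hypothetical, the handling of a null `startFrom` is left as a separate case because the schema allows it without saying what it means, and the default limit is passed in as a parameter rather than assumed.

```java
// Hypothetical sketch of interpreting ConsumeRequest fields.
final class ConsumeOptions {

  enum StartKind { MESSAGE_ID, TIMESTAMP, UNSPECIFIED }

  // bytes -> messageId, long -> timestamp, null -> unspecified.
  static StartKind kindOf(Object startFrom) {
    if (startFrom == null) {
      return StartKind.UNSPECIFIED;
    }
    if (startFrom instanceof byte[]) {
      return StartKind.MESSAGE_ID;
    }
    if (startFrom instanceof Long) {
      return StartKind.TIMESTAMP;
    }
    throw new IllegalArgumentException("startFrom must be bytes, long, or null");
  }

  // Applies the default when no limit is requested, then caps at the configured hard limit.
  static int effectiveLimit(Integer requested, int defaultLimit, int hardLimit) {
    int limit = (requested == null) ? defaultLimit : requested;
    return Math.min(limit, hardLimit);
  }

  public static void main(String[] args) {
    System.out.println(kindOf(1234L) + " " + effectiveLimit(null, 100, 1000));
  }
}
```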

Data Cleanup In Tables

Several kinds of data cleanup need to be performed, and the cleanup strategy varies with the underlying storage (LevelDB vs. HBase). The matrix below describes each type of cleanup and how it is performed on each storage.

Cleanup Strategy
  • HBase: Coprocessors attached to tables (one for MessageTable and another for PayloadTable). No cleanup is required for MetadataTable.
  • LevelDB: Periodically scheduled thread that cleans up data in MessageTable and PayloadTable. The schedule frequency is configurable via cdap-site.xml.

TTL Expiration (MessageTable and PayloadTable)
  • HBase: Get the TableId (namespace, topic) from the row key and get the TopicMetadata from the MetadataTable (cached). From the cell row key, get the write timestamp and use the TTL to determine whether that cell needs to be skipped.
  • LevelDB: Scan the tables topic by topic and remove rows that have exceeded the TTL, based on the TopicMetadata info.

Older Generation (MessageTable and PayloadTable)
  • HBase: Check the generation id of the row and compare it with the one we get from the MetadataTable. If it is the current generation, do nothing. If it is an older generation (gen < abs(currentgen) || gen == -1*currentgen), skip the cell.
  • LevelDB: Same logic as in HBase. While pruning messages, the scan should start with generation '1' of that topic.

Invalid Transactions (MessageTable only)
  • HBase: This requires a (periodically refreshed) tx.snapshot in the Coprocessor. If the cell belongs to a TX_COL column, get the tx id from it. If the transaction id is present in the invalid list (from the tx.snapshot), invert the sign (-1 * tx_id) and put the value back. This way, the data is still visible to non-tx consumption and will eventually be cleared by TTL or when the topic is deleted.
  • LevelDB: Not necessary, since we don't support pruning invalid transactions in the SDK.

Latest min tx timestamp (for manual invalid TX pruning, required only for MessageTable)
  • The last used tx.snapshot info can be used directly to prune the invalid transaction list, so we need to log that info. We also need to write a TMS table debugger tool that can print this info.
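
The TTL and generation checks described above can be written as two small predicates. This is a sketch only: the names are hypothetical, the TTL is assumed to be in seconds (as in the topic properties), and treating a cell as expired only when its age strictly exceeds the TTL is an assumption. The generation check follows the formula given in the text as written.

```java
// Hypothetical sketch of the per-cell cleanup checks.
final class CleanupRules {

  // True when the cell's write timestamp is older than the topic TTL.
  static boolean isTtlExpired(long writeTimestampMs, long ttlSeconds, long nowMs) {
    return nowMs - writeTimestampMs > ttlSeconds * 1000L;
  }

  // Implements: gen < abs(currentgen) || gen == -1*currentgen
  static boolean isOlderGeneration(int gen, int currentGen) {
    return gen < Math.abs(currentGen) || gen == -currentGen;
  }

  public static void main(String[] args) {
    System.out.println(isTtlExpired(0L, 10L, 10_001L) + " " + isOlderGeneration(2, 3));
  }
}
```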