Retrieving Group Chat Messages
November 6, 2024
Introduction
Retrieving chat messages may initially appear straightforward, but there are unique considerations for chat applications that require a tailored approach. Chat applications often feature real-time interactions, with continuous updates that demand efficient, high-throughput data handling. In these applications, read and write operations are nearly balanced, and most reads involve recent data, as older messages are rarely accessed. Given these requirements, using a NoSQL database, such as Cassandra, is beneficial. This solution is implemented using Cassandra for data storage and Spring Boot with Kotlin for backend development.
Data Modeling in Cassandra
Data modeling in Cassandra differs significantly from traditional relational database design. Rather than normalizing data and managing relationships, Cassandra prioritizes optimizing tables for specific queries. The process starts by identifying the application’s query patterns, with each table structured to handle these efficiently, avoiding joins and complex operations. A key part of this approach is carefully selecting partition keys and clustering keys. These are structured to support fast, efficient data retrieval for each query, ensuring high performance and scalability.
Partition Key
The partition key is the primary element used to distribute data across Cassandra’s nodes. It determines which node stores a particular row, so it is essential for spreading data evenly across the cluster. When a write request arrives, Cassandra hashes the partition key, and the resulting token determines which node owns the partition. Because the hash spreads keys uniformly, no single node becomes a bottleneck as long as the keys themselves are well chosen.
The design of the partition key is therefore critical. If a key is too broad, too many rows land in the same partition and hotspots appear, where certain nodes are overloaded while others remain underutilized. If it is too granular, a single query has to touch many partitions. Both cases hurt performance and scalability. A well-chosen partition key spreads data evenly, allowing Cassandra to scale horizontally and maintain high availability and performance as the dataset grows.
In addition, the partition key enables direct access to the data associated with a particular key. Since all rows with the same partition key are stored together on the same node, Cassandra can read the relevant partition without searching multiple nodes, which speeds up queries.
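To make the placement idea concrete, here is a toy sketch in Kotlin. It is not Cassandra's partitioner: real clusters map Murmur3 tokens to token ranges owned by nodes, whereas this example simply takes a hash modulo a node count. The names `nodeFor` and `distribution` are hypothetical and exist only for illustration.

```kotlin
// Toy stand-in for partition placement: hash the partition key and map it to a node index.
// Cassandra itself uses a Murmur3-based token ring; the modulo here only illustrates the idea.
fun nodeFor(partitionKey: String, nodeCount: Int): Int =
    Math.floorMod(partitionKey.hashCode(), nodeCount)

// Counts how many keys land on each node, to eyeball how evenly a key design spreads data.
fun distribution(keys: List<String>, nodeCount: Int): Map<Int, Int> =
    keys.groupingBy { nodeFor(it, nodeCount) }.eachCount()
```

The same key always maps to the same node, which is what makes direct partition lookups possible. Calling `distribution` with a list of candidate keys gives a rough feel for how evenly a key design would spread.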
Clustering Key
The clustering key determines the order of rows within each partition, enabling ordered retrieval of data. This is particularly useful in applications like chat systems where message ordering within a conversation is essential.
Challenges in the Chat Message Use Case
Two main challenges arise in retrieving messages for group chats:
1. Data Distribution and Hot Partitions: Partitioning by conversation ID can lead to hot partitions, especially if a highly active group sends a large volume of messages. This uneven data distribution strains certain nodes, impacting performance.
2. User Access Control: In group chats, users may leave and rejoin, creating intervals of participation. Messages sent during periods when a user was absent should not be visible to them upon rejoining. The data model must account for these intervals to enforce accurate message visibility.
Table Design
When the application is opened, the initial query fetches all conversations a user is part of by querying the conversations_by_user table. This table serves as an entry point for loading a user's conversation history efficiently. A "conversation" is a general term that encompasses two distinct types of chats: user chats and group chats. A user chat represents a 1-on-1 interaction between two participants, offering direct and private communication. In contrast, a group chat involves multiple participants and supports a wider range of interactions, often including unique characteristics like roles, permissions, and shared content. Here’s how the table is structured:
                
 
                
The partition key is the user’s unique identifier, which ensures that all conversations for a specific user are stored together and can be fetched in a single query.
The table also includes essential details such as the conversation type and the last message of each chat, giving the user a quick overview. For group chats, the additional fields joined_at and left_at (each stored as a set of timestamps) capture the user’s participation history.
Using a set instead of a list for these timestamps matters for both consistency and performance. In Cassandra, appending or prepending to a list is not idempotent, so a retry after a timeout can produce duplicate entries, an unacceptable risk where data consistency is essential. In addition, list operations such as setting or removing an element by position incur an internal read-before-write, which is resource-intensive and slows down writes. Sets avoid both pitfalls, which makes them the safer and more efficient choice for unique values such as participation timestamps.
The initial query provides the data needed to display an overview of all conversations, including both individual and group chats, without any further lookups.
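The difference between list and set semantics under retries can be shown with an in-memory analogy. The sketch below is only a model: the `ConversationByUser` class and its field names are assumptions about how a row might be mapped in Kotlin, and `replayJoin` imitates a retried write with plain collections rather than with the Cassandra driver.

```kotlin
import java.time.LocalDateTime

// Hypothetical in-memory shape of one conversations_by_user row; field names are assumptions.
data class ConversationByUser(
    val userId: String,
    val conversationId: String,
    val type: String,                      // e.g. "USER" or "GROUP"
    val lastMessage: String?,
    val joinedAt: Set<LocalDateTime>,      // one entry per time the user joined
    val leftAt: Set<LocalDateTime>,        // one entry per time the user left
)

// Replays the same "user joined" write twice, as a client retry after a timeout would.
fun replayJoin(joinedAt: LocalDateTime): Pair<List<LocalDateTime>, Set<LocalDateTime>> {
    val asList = mutableListOf<LocalDateTime>()
    val asSet = mutableSetOf<LocalDateTime>()
    repeat(2) {
        asList.add(joinedAt)   // append is not idempotent: the retry duplicates the entry
        asSet.add(joinedAt)    // set insertion is idempotent: the retry changes nothing
    }
    return asList to asSet
}
```

After the replay, the list holds the timestamp twice while the set still holds it once, which is the property the table design relies on.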
                
 
                
When a user selects a specific group chat, the app loads the conversation’s messages from the messages_by_group_chat table. In this case the partition key is a composite partition key made up of the columns conversation_id and bucket. The conversation_id ensures that all messages for a given group chat are stored together, while the bucket column applies a concept known as bucketing.
                
TimeUUID
We use TIMEUUID for the message_id because it encodes both a timestamp and a unique identifier in one value, which makes it ideal for applications that need chronological ordering and uniqueness. A TIMEUUID is a version 1 UUID: it contains a timestamp (measured in 100-nanosecond intervals since October 15, 1582) as well as a node identifier (usually derived from the machine’s MAC address or a random value). This design allows Cassandra to sort records by time, so that messages are ordered correctly and the most recent ones can be retrieved first.
Even if multiple messages are created at the same moment, the node identifier keeps each TIMEUUID unique, which avoids collisions.
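The timestamp layout described above can be explored with nothing but the JDK. The following is a minimal sketch, not the driver's implementation: `timeUuidOf` and `unixMillisOf` are hypothetical helpers, and the node bits are a fixed placeholder. In the application itself, the DataStax driver's Uuids class covers this.

```kotlin
import java.util.UUID

// Offset between the UUID epoch (1582-10-15) and the Unix epoch, in 100 ns units.
const val UUID_EPOCH_OFFSET = 0x01B21DD213814000L

// Builds a version 1 UUID for a Unix timestamp; the node bits are an arbitrary placeholder.
fun timeUuidOf(unixMillis: Long, node: Long = 0x123456789ABCL): UUID {
    val ts = unixMillis * 10_000 + UUID_EPOCH_OFFSET
    val timeLow = ts and 0xFFFFFFFFL
    val timeMid = (ts ushr 32) and 0xFFFFL
    val timeHi = (ts ushr 48) and 0x0FFFL
    // Layout of the upper 64 bits: time_low | time_mid | version (1) | time_hi
    val msb = (timeLow shl 32) or (timeMid shl 16) or 0x1000L or timeHi
    // Lower 64 bits: variant bits "10", a zero clock sequence, and the 48-bit node
    val lsb = (0x8000L shl 48) or (node and 0xFFFFFFFFFFFFL)
    return UUID(msb, lsb)
}

// Reads the embedded timestamp back and converts it to Unix milliseconds.
fun unixMillisOf(timeUuid: UUID): Long =
    (timeUuid.timestamp() - UUID_EPOCH_OFFSET) / 10_000
```

A round trip such as `unixMillisOf(timeUuidOf(1730851200000L))` returns the original millisecond value. Note that `java.util.UUID.compareTo` does not order version 1 UUIDs chronologically, so time ordering is best left to Cassandra's clustering order.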
                
Bucketing
Bucketing is a technique to control the size of each partition by grouping data within a time range or up to a specific count threshold. In Cassandra, large partitions degrade performance, particularly in high-volume applications, because reads are more resource-intensive than writes. Bucketing prevents partitions from growing indefinitely and reduces the risk of “hot partitions”, partitions that receive disproportionate traffic and cause latency spikes.
In this case, the bucket value groups messages into three-month intervals. It is determined by the formula (year * 12 + month) / 3 using integer division, which yields four buckets per year. Because the month is 1-based, the boundaries are shifted by one month relative to calendar quarters: the buckets cover March to May, June to August, September to November, and December to February. After three months, a new bucket is started.
Three-month buckets limit the number of messages stored in any one partition, which helps avoid high read latencies and hot partitions. Partition sizes stay manageable while temporal ordering is preserved, making the layout efficient for both storage and retrieval. As new messages accumulate, they are directed to the current bucket, which spreads a conversation’s data across the Cassandra nodes and preserves performance over time.
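A small worked example makes the bucket boundaries visible. The sketch below reuses the formula from the text; `bucketsOfYear` is a hypothetical helper added only to list the bucket of each month.

```kotlin
import java.time.LocalDateTime

// Same formula as in the text: integer division groups three consecutive months.
fun getBucketValue(dateTime: LocalDateTime): Int =
    (dateTime.year * 12 + dateTime.monthValue) / 3

// Maps each month of a year to its bucket, to make the bucket boundaries visible.
fun bucketsOfYear(year: Int): Map<Int, Int> =
    (1..12).associateWith { month -> getBucketValue(LocalDateTime.of(year, month, 1, 0, 0)) }
```

For 2024, November maps to bucket 8099 and December to bucket 8100, which it shares with January and February 2025. A single calendar year therefore touches five bucket values, even though each bucket spans exactly three months.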
                
Breakdown of Message Retrieval
This section provides a detailed, step-by-step explanation of the requirements and scenarios that the message retrieval algorithm must address to handle various edge cases effectively. The algorithm and corresponding code are progressively enhanced to tackle increasingly complex scenarios.
Simple Retrieval
The initial scenario focuses on retrieving the latest 20 messages from a chat without implementing pagination or considering user presence intervals. This involves querying only the most recent bucket and limiting the results to 20 messages. The CQL query is structured as follows:
SELECT *
FROM messages_by_group_chat
WHERE group_chat_id = :groupChatId
  AND bucket = :bucket
LIMIT :limit
The corresponding Kotlin implementation for this query is presented below:
// Only the bucket of the current point in time is queried
val currentBucket = getBucketValue(LocalDateTime.now())
val messages = messageRepo.findMessages(
    groupChatId = groupChatId,
    bucket = currentBucket,
    limit = 20,
)
...
private fun getBucketValue(dateTime: LocalDateTime): Int {
    return (dateTime.year * 12 + dateTime.monthValue) / 3
}
Pagination
In some cases, a user may want to fetch messages starting from a specific message rather than just the most recent ones. This typically occurs during subsequent requests after the initial call from the client. For example, when a user scrolls up within the chat to view older messages, they reach the top of the first 20 messages and need to load the next batch. This can be achieved by providing the last known (oldest) messageId.
Traditional offset pagination, where rows are scanned sequentially to find the starting point before a specific number of rows is selected, is not suitable for a distributed database like Cassandra. The system would have to read and discard all skipped rows, potentially from different nodes, which is computationally expensive and inefficient. As a result, cursor-based pagination is the preferred approach for Cassandra.
In this case, the client provides a specific messageId to retrieve older messages. The query then starts from the bucket corresponding to the timestamp in the given messageId. If no messageId is provided, the query defaults to the most recent bucket. The updated CQL query to fetch messages older than the provided messageId is as follows:
                
SELECT *
FROM messages_by_group_chat
WHERE group_chat_id = :groupChatId
  AND bucket = :bucket
  AND message_id < :messageId
LIMIT :limit
To execute this query, the bucket for the given messageId must be determined. This can be done by extracting the timestamp from the TimeUUID and calculating the bucket value from it. If no messageId is provided by the client, a synthetic TimeUUID can be generated from an upper-bound time using the helper method Uuids.endOf.
                
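// Use UTC throughout, because the timestamp is converted with ZoneOffset.UTC below
val currentTimestamp = LocalDateTime.now(ZoneOffset.UTC)
// Start in the bucket of the cursor message, or in the current bucket on the first request
var currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: getBucketValue(currentTimestamp)
// Without a cursor, the largest possible TimeUUID for "now" serves as the exclusive upper bound
var lastMessageId = messageId ?: Uuids.endOf(currentTimestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
val messages = messageRepo.findMessages(
    groupChatId = groupChatId,
    bucket = currentBucket,
    messageId = lastMessageId,
    limit = 20,
)
...
private fun getBucketValueFromMessageId(messageId: MessageId): Int {
    // Extract the Unix timestamp (in milliseconds) embedded in the TimeUUID
    val instant = Instant.ofEpochMilli(Uuids.unixTimestamp(messageId))
    val localDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
    return getBucketValue(localDateTime)
}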
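The cursor mechanics are independent of Cassandra and can be sketched in memory. In the example below, the `Message` class, its numeric `id` (a stand-in for the time-ordered message_id) and the function `fetchPage` are all assumptions made for illustration.

```kotlin
// Hypothetical in-memory message; `id` stands in for the time-ordered message_id.
data class Message(val id: Long, val text: String)

// Returns up to `limit` messages strictly older than `cursor`, newest first.
// `messages` must already be sorted by id in descending order, like rows in a partition.
fun fetchPage(messages: List<Message>, cursor: Long?, limit: Int): List<Message> =
    messages.asSequence()
        .filter { cursor == null || it.id < cursor }
        .take(limit)
        .toList()
```

The first page is requested with a null cursor. For every following page, the client passes the id of the last (oldest) message it received, so no rows have to be counted or skipped.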
Handling User Presence Interval
After addressing pagination, the next requirement is to ensure that users can only read messages within the period when they were part of the group chat. In some cases, the user may no longer be a part of the chat but still wish to read messages from when they were a member. Additionally, users who joined the chat at a later point should not be allowed to read messages that were sent before their entry. To enforce this constraint, the function must retrieve the relevant presence interval for the user and validate access accordingly. This step requires updating the CQL query to ensure that it respects these boundaries. Specifically, the query needs to include a lower bound to filter messages based on the user's presence interval within the chat. The updated CQL query to handle this requirement is as follows:
SELECT *
FROM messages_by_group_chat
WHERE group_chat_id = :groupChatId
  AND bucket = :bucket
  AND message_id < :messageId
  AND message_id >= minTimeuuid(:firstMessageTimestamp)
LIMIT :limit
To determine the time bounds for messages the user is allowed to view, the userId (e.g., extracted from a JWT token) and the corresponding groupChatId are used to retrieve the chat conversation. In this scenario, the database call is assumed to return a single timestamp for when the user joined the chat and another for when they left, rather than a set of multiple values. If the leftAt timestamp is null, indicating that the user is still part of the chat, the current timestamp is used as the endpoint.
Using the timestamp from when the user first joined the chat, the minimum possible TimeUUID for the messageId that the user can access is constructed. This is then passed into the CQL query.
                
val conversation = conversationRepo.findByUserIdAndChatId(userId, groupChatId)
    ?: return FindGroupMessagesResult.ConversationDoesNotExist
if (conversation.type != GROUP) {
    return FindGroupMessagesResult.NotGroupChat
}
val currentTimestamp = LocalDateTime.now(ZoneOffset.UTC)
// A user who is still in the chat has no leftAt; fall back to the current time
val leftAtTimestamp = conversation.leftAt ?: currentTimestamp
val joinedAtTimestamp = conversation.joinedAt
val currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: getBucketValue(leftAtTimestamp)
// Upper bound: the cursor, or the end of the presence interval on the first request
var lastMessageId = messageId ?: Uuids.endOf(leftAtTimestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
val messages = messageRepo.findMessages(
    groupChatId = groupChatId,
    bucket = currentBucket,
    messageId = lastMessageId,
    firstMessageTimestamp = joinedAtTimestamp,
    limit = 20,
)
Handling Multiple Intervals
In cases where a user has joined and left the chat multiple times, it is necessary to handle multiple presence intervals within a single bucket. Messages must only be retrieved for the periods during which the user was part of the chat. To achieve this, the joined_at and left_at timestamps are retrieved and sorted in descending order, prioritizing the most recent intervals.
The function must iterate over these intervals to reach the desired page size of 20, as the first interval may contain no messages or not enough messages to fill the page. This iteration ensures that all relevant messages are considered for the final result.
The Kotlin code snippet below demonstrates how to sort the joined_at and left_at timestamps and prepare for the subsequent message retrieval:
                
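// Sort both sets newest first so that index i describes the i-th most recent interval
val joinedAtList = conversation.joinedAt.toList().sortedDescending()
val leftAtList = conversation.leftAt?.toMutableList() ?: mutableListOf()
// One more join than leave means the user is still a member: close the interval with "now" (UTC)
if (joinedAtList.size == leftAtList.size + 1) {
    leftAtList.add(LocalDateTime.now(ZoneOffset.UTC))
}
leftAtList.sortDescending()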
With the intervals sorted, the function can now iterate over each interval, using the joined_at and left_at timestamps to retrieve the messages within the corresponding time frames. The query ensures that the message retrieval respects these bounds.
                
var lastMessageId = messageId ?: Uuids.endOf(leftAtList.first().atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
val messageList = mutableListOf<MessageByGroupChat>()
for (i in joinedAtList.indices) {
    // Never read past the end of the current interval: cap the upper bound at its leftAt timestamp
    val intervalEnd = leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli()
    if (Uuids.unixTimestamp(lastMessageId) > intervalEnd) {
        lastMessageId = Uuids.endOf(intervalEnd)
    }
    val messages = messageRepo.findMessages(
        groupChatId = groupChatId,
        bucket = currentBucket,
        messageId = lastMessageId,
        firstMessageTimestamp = joinedAtList[i],
        limit = 20,
    )
    messageList.addAll(messages)
    if (messageList.size >= 20) {
        return FindGroupMessagesResult.Success(messageList.take(20))
    }
    // Continue below the oldest message collected so far
    messageList.lastOrNull()?.messageId?.let { lastMessageId = it }
}
return FindGroupMessagesResult.Success(messageList)
At the start of each iteration, lastMessageId is capped at the upper bound of the current interval: if it points to a later time than the interval’s left_at timestamp, it is replaced by the largest TimeUUID for that timestamp. This keeps messages sent while the user was absent out of the result, both when moving on to an older interval and when the client passes a messageId to start reading from.
Once the messages for an interval have been fetched, they are added to the messageList. If the messageList reaches the page size of 20, the first 20 messages are returned as the result. The list can hold more than 20 entries because messages from multiple intervals are concatenated. If the page size is not reached after iterating through all intervals, the collected messages are returned, completing the pagination process.
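The pairing of joined_at and left_at values into intervals can also be expressed as a small standalone function. This is a sketch under assumptions: the `PresenceInterval` type and the `presenceIntervals` function do not appear in the original code, and the current time is passed in as a parameter to keep the example deterministic.

```kotlin
import java.time.LocalDateTime

// One period during which the user was a member of the chat (hypothetical helper type).
data class PresenceInterval(val joinedAt: LocalDateTime, val leftAt: LocalDateTime)

// Pairs joined/left timestamps into intervals, newest first.
// An unmatched join means the user is still a member, so `now` closes that interval.
fun presenceIntervals(
    joinedAt: Set<LocalDateTime>,
    leftAt: Set<LocalDateTime>,
    now: LocalDateTime,
): List<PresenceInterval> {
    val joined = joinedAt.sortedDescending()
    val left = leftAt.toMutableList()
    if (joined.size == left.size + 1) left.add(now)
    require(joined.size == left.size) { "joined_at and left_at do not pair up" }
    left.sortDescending()
    return joined.zip(left) { j, l -> PresenceInterval(j, l) }
}
```

Working with explicit interval objects instead of two parallel lists removes the index bookkeeping, at the cost of one extra allocation per interval.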
                
Handling Multiple Buckets
In previous steps, it was assumed that all messages are stored within a single bucket, and the concept of bucketing was not considered. However, when an interval spans multiple buckets, the function must iterate through each bucket in that interval. The buckets are processed in descending order, starting from the latest bucket and working towards the earliest one.
val lastBucket = getBucketValue(leftAtList.first())
var currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: lastBucket
The first step in this process is to calculate the highest possible bucket value, which is determined from the timestamp of when the user last left the chat. If the user is still in the chat, the current timestamp is used. In the case of pagination, it is preferable not to start with the highest possible bucket value, but rather with the bucket corresponding to the timestamp of the provided messageId. This potentially skips unnecessary buckets and improves performance.
                
for (i in joinedAtList.indices) {
    val firstBucketOfInterval = getBucketValue(joinedAtList[i])
    val lastBucketOfInterval = getBucketValue(leftAtList[i])
    // The cursor is older than this whole interval: nothing to read here
    if (currentBucket < firstBucketOfInterval) {
        continue
    }
    // The cursor is newer than this interval: jump to the interval's last bucket
    if (currentBucket > lastBucketOfInterval) {
        currentBucket = lastBucketOfInterval
    }
    // Never read past the end of the current interval
    val intervalEnd = leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli()
    if (Uuids.unixTimestamp(lastMessageId) > intervalEnd) {
        lastMessageId = Uuids.endOf(intervalEnd)
    }
    while (currentBucket >= firstBucketOfInterval) {
        val messages = messageRepo.findMessages(
            groupChatId = groupChatId,
            bucket = currentBucket,
            messageId = lastMessageId,
            firstMessageTimestamp = joinedAtList[i],
            limit = 20,
        )
        messageList.addAll(messages)
        if (messageList.size >= 20) {
            return FindGroupMessagesResult.Success(messageList.take(20))
        }
        messageList.lastOrNull()?.messageId?.let { lastMessageId = it }
        currentBucket--
    }
    // Undo the extra decrement: the next interval may start in the same bucket
    currentBucket = firstBucketOfInterval
}
Once the bucket values are determined, the function iterates over each interval in the joinedAtList. For each interval, the start (firstBucketOfInterval) and end (lastBucketOfInterval) bucket values are calculated.
The function then checks whether currentBucket is older than the first bucket of the interval, which happens when the messageId passed by the client predates the interval. In that case it skips directly to the next interval without querying the database. If currentBucket is newer than the interval, it is lowered to lastBucketOfInterval, and lastMessageId is capped at the interval’s left_at timestamp so that messages sent while the user was absent are never returned.
The while loop processes each bucket inside the interval, decrementing currentBucket with each iteration.
Once all messages for a given interval are collected and the page size has not been reached, currentBucket is reset to the lower bound of the interval (firstBucketOfInterval). This step is necessary because currentBucket is decremented one extra time in the last iteration of the while loop. If the next interval starts in the same bucket, this adjustment ensures that it is not skipped.
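The bucket traversal for a single interval can be isolated into a pure function, which makes the skip and clamp rules easy to check. The sketch below is illustrative only; `bucketsToQuery` is a hypothetical helper that mirrors the loop conditions described above.

```kotlin
import java.time.LocalDateTime

fun getBucketValue(dateTime: LocalDateTime): Int =
    (dateTime.year * 12 + dateTime.monthValue) / 3

// Buckets that one presence interval touches, newest first.
// `startBucket` is the bucket of the cursor (or of the interval end on the first request).
fun bucketsToQuery(joinedAt: LocalDateTime, leftAt: LocalDateTime, startBucket: Int): List<Int> {
    val first = getBucketValue(joinedAt)
    val last = getBucketValue(leftAt)
    if (startBucket < first) return emptyList()   // cursor is older than the whole interval
    return (minOf(startBucket, last) downTo first).toList()
}
```

For an interval from January 15, 2024 to November 6, 2024, a request starting in bucket 8099 visits the buckets 8099, 8098, 8097 and 8096 in that order, while a cursor in bucket 8097 visits only 8097 and 8096.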
                
Conclusion & Complete Code
This post explored an efficient approach to retrieving group chat messages by considering user presence intervals and bucketing. The solution processes presence intervals, dynamically adjusts bucket values, and retrieves messages in the correct order while avoiding unnecessary queries. By addressing hot partitioning and enabling seamless pagination, this method ensures both performance and scalability. Here is the complete code:
fun findMessages(userId: UserId, conversationId: ConversationId, messageId: MessageId?): FindGroupMessagesResult {
    val conversation = conversationRepo.findByUserIdAndConversationId(userId, conversationId)
        ?: return FindGroupMessagesResult.ConversationDoesNotExist
    if (conversation.type != GROUP) {
        return FindGroupMessagesResult.NotGroupChat
    }

    // Presence intervals, newest first; an open interval is closed with the current time (UTC)
    val joinedAtList = conversation.joinedAt.toList().sortedDescending()
    val leftAtList = conversation.leftAt?.toMutableList() ?: mutableListOf()
    if (joinedAtList.size == leftAtList.size + 1) {
        leftAtList.add(LocalDateTime.now(ZoneOffset.UTC))
    }
    leftAtList.sortDescending()

    // Start at the cursor if the client passed one, otherwise at the end of the latest interval
    val lastBucket = getBucketValue(leftAtList.first())
    var currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: lastBucket
    var lastMessageId = messageId ?: Uuids.endOf(leftAtList.first().atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
    val messageList = mutableListOf<MessageByGroupChat>()

    for (i in joinedAtList.indices) {
        val firstBucketOfInterval = getBucketValue(joinedAtList[i])
        val lastBucketOfInterval = getBucketValue(leftAtList[i])
        // The cursor is older than this whole interval: nothing to read here
        if (currentBucket < firstBucketOfInterval) {
            continue
        }
        // The cursor is newer than this interval: jump to the interval's last bucket
        if (currentBucket > lastBucketOfInterval) {
            currentBucket = lastBucketOfInterval
        }
        // Never read past the end of the current interval
        val intervalEnd = leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli()
        if (Uuids.unixTimestamp(lastMessageId) > intervalEnd) {
            lastMessageId = Uuids.endOf(intervalEnd)
        }
        while (currentBucket >= firstBucketOfInterval) {
            val messages = messageRepo.findMessages(
                groupChatId = conversationId,
                bucket = currentBucket,
                messageId = lastMessageId,
                firstMessageTimestamp = joinedAtList[i],
                limit = 20,
            )
            messageList.addAll(messages)
            if (messageList.size >= 20) {
                return FindGroupMessagesResult.Success(messageList.take(20))
            }
            messageList.lastOrNull()?.messageId?.let { lastMessageId = it }
            currentBucket--
        }
        // Undo the extra decrement: the next interval may start in the same bucket
        currentBucket = firstBucketOfInterval
    }
    return FindGroupMessagesResult.Success(messageList)
}

private fun getBucketValue(dateTime: LocalDateTime): Int {
    return (dateTime.year * 12 + dateTime.monthValue) / 3
}

private fun getBucketValueFromMessageId(messageId: MessageId): Int {
    // Extract the Unix timestamp (in milliseconds) embedded in the TimeUUID
    val instant = Instant.ofEpochMilli(Uuids.unixTimestamp(messageId))
    val localDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
    return getBucketValue(localDateTime)
}