Retrieving Group Chat Messages
November 6, 2024
Introduction
Retrieving chat messages may initially appear straightforward, but there are unique considerations for chat applications that require a tailored approach. Chat applications often feature real-time interactions, with continuous updates that demand efficient, high-throughput data handling. In these applications, read and write operations are nearly balanced, and most reads involve recent data, as older messages are rarely accessed. Given these requirements, using a NoSQL database, such as Cassandra, is beneficial. This solution is implemented using Cassandra for data storage and Spring Boot with Kotlin for backend development.
Data Modeling in Cassandra
Data modeling in Cassandra differs significantly from traditional relational database design. Rather than normalizing data and managing relationships, Cassandra prioritizes optimizing tables for specific queries. The process starts by identifying the application’s query patterns, with each table structured to handle these efficiently, avoiding joins and complex operations. A key part of this approach is carefully selecting partition keys and clustering keys. These are structured to support fast, efficient data retrieval for each query, ensuring high performance and scalability.
Partition Key
The partition key is the primary element used to distribute data across Cassandra’s nodes. It determines which node will store a particular row, making it essential for spreading data evenly across the cluster to ensure scalability. When a write request is made, Cassandra applies a hashing function to the partition key, generating a hash value that determines the partition's location on a specific node. This hashing mechanism distributes data uniformly across the cluster, preventing any one node from becoming a bottleneck.
The partition key’s design is critical to maintaining an even data distribution. If a partition key is too granular or too broad, it can result in hotspots, where certain nodes are overloaded with data while others remain underutilized. This can lead to performance issues and affect the overall scalability of the system. A well-chosen partition key spreads data evenly across all nodes, allowing Cassandra to scale horizontally and maintain high availability and performance even as the dataset grows.
In addition, the partition key enables efficient, direct access to the data associated with a particular key, minimizing the need for additional lookups. Since all rows with the same partition key are stored together on the same node, Cassandra can quickly access the relevant partition without needing to search multiple nodes, further speeding up query performance.
Clustering Key
The clustering key determines the order of rows within each partition, enabling ordered retrieval of data. This is particularly useful in applications like chat systems where message ordering within a conversation is essential.
Challenges in the Chat Message Use Case
Two main challenges arise in retrieving messages for group chats:
1. Data Distribution and Hot Partitions: Partitioning by conversation ID can lead to hot partitions, especially if a highly active group sends a large volume of messages. This uneven data distribution strains certain nodes, impacting performance.
2. User Access Control: In group chats, users may leave and rejoin, creating intervals of participation. Messages sent during periods when a user was absent should not be visible to them upon rejoining. The data model must account for these intervals to enforce accurate message visibility.
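The access-control rule in the second challenge can be stated as a small predicate. The following is a minimal in-memory sketch, not the query-based implementation developed later in this post; the PresenceInterval type and the isVisible function are hypothetical names introduced only for illustration.

```kotlin
import java.time.Instant

// Hypothetical model of one participation interval.
// `leftAt == null` means the user is still a member.
data class PresenceInterval(val joinedAt: Instant, val leftAt: Instant?)

// A message is visible if it was sent inside at least one interval
// (both interval ends are treated as inclusive in this sketch).
fun isVisible(sentAt: Instant, intervals: List<PresenceInterval>): Boolean =
    intervals.any { interval ->
        !sentAt.isBefore(interval.joinedAt) &&
            (interval.leftAt == null || !sentAt.isAfter(interval.leftAt))
    }
```

With intervals [t, t+100s] and [t+200s, open], a message sent at t+150s is hidden, while messages sent at t+50s or t+500s are visible.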
Table Design
When the application is opened, the initial query fetches all conversations a user is part of by querying the conversations_by_user table. This table serves as an entry point for loading a user's conversation history efficiently. A "conversation" is a general term that encompasses two distinct types of chats: user chats and group chats. A user chat represents a 1-on-1 interaction between two participants, offering direct and private communication. In contrast, a group chat involves multiple participants and supports a wider range of interactions, often including unique characteristics like roles, permissions, and shared content. Here’s how the table is structured:

The partition key is the user’s unique identifier, which ensures that all conversations for a specific user are stored together. This structure allows for fast access to all of a user’s conversations.
The table also includes essential details like the conversation type and the last message in each chat, giving the user a quick overview. For group chats, additional fields like joined_at and left_at (stored as a set of timestamps) capture the user’s participation history.
Using a set instead of a list for these timestamps is critical to ensuring consistency and performance.
In Cassandra, lists have inherent limitations and performance considerations. Appending and prepending are not idempotent, so a retry after a timeout can produce duplicate entries, an unacceptable risk in scenarios where data consistency is essential. Additionally, list operations such as setting or removing an element by position incur an internal read-before-write, which is resource-intensive and slows down performance. Sets avoid both pitfalls, offering better safety and efficiency for data that must stay consistent and unique, such as a user's participation intervals. This keeps the query for the overview of all conversations reliable and performant.
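The idempotency argument can be illustrated with plain Kotlin collections. This is only an in-memory analogy for a retried write, not Cassandra driver code, and applyWithRetry is a hypothetical helper: it applies the same "user joined" update twice, as happens when the first attempt succeeds but its acknowledgement is lost.

```kotlin
import java.time.Instant

// Applies the same update twice to simulate a retried write.
fun <C : MutableCollection<Instant>> applyWithRetry(target: C, joinedAt: Instant): C {
    target.add(joinedAt) // first attempt: applied, but the client sees a timeout
    target.add(joinedAt) // retry of the very same update
    return target
}
```

Calling it with a mutable list leaves two entries for the same timestamp, whereas a mutable set still holds exactly one, which mirrors why the set type is the safer choice for joined_at and left_at.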
The initial query provides the data needed to display an overview of all conversations, including
both individual and group chats, without any further lookups.

When a user selects a specific group chat, the app loads the conversation’s messages from the messages_by_group_chat table. In this case the partition key is a composite partition key made up of the columns conversation_id and bucket. Including conversation_id in the partition key ensures that the messages of a given group chat within one bucket are stored together, while the bucket column applies a concept known as bucketing.
TimeUUID
We use TIMEUUID for the message_id because it encodes both a timestamp and a unique identifier in one value, making it ideal for applications that need chronological ordering and uniqueness. A TIMEUUID is a version 1 UUID, which means it contains a timestamp (measured in 100-nanosecond intervals since October 15, 1582) as well as a node identifier (usually derived from the machine’s MAC address or a random value). Cassandra orders TIMEUUID values by their embedded timestamp, so records can be sorted by time; with a descending clustering order on message_id, the most recent messages appear first.
The key advantage of TIMEUUID is therefore that the latest messages can be retrieved first without a separate timestamp column. Additionally, even if multiple messages are created at the same time, the node identifier ensures that each TIMEUUID is still unique, which avoids collisions.
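To make the timestamp encoding concrete, the sketch below reads the creation time back out of a version 1 UUID using only java.util.UUID. The helper names timeUuidToInstant and timeUuidOf are hypothetical; timeUuidOf builds a version 1 UUID by hand purely for demonstration and is not a substitute for a real generator. The code later in this post relies on the driver's Uuids helpers for the same conversion.

```kotlin
import java.time.Instant
import java.util.UUID

// 100-nanosecond intervals between 1582-10-15 (UUID epoch) and 1970-01-01 (Unix epoch).
const val UUID_EPOCH_OFFSET = 0x01B21DD213814000L

// Extracts the creation time of a version 1 (time-based) UUID, truncated to milliseconds.
fun timeUuidToInstant(id: UUID): Instant {
    require(id.version() == 1) { "not a time-based UUID" }
    val hundredNanosSinceUnixEpoch = id.timestamp() - UUID_EPOCH_OFFSET
    return Instant.ofEpochMilli(hundredNanosSinceUnixEpoch / 10_000)
}

// Illustration only: assembles a version 1 UUID for a Unix timestamp in milliseconds.
fun timeUuidOf(unixMillis: Long, clockSeqAndNode: Long): UUID {
    val ts = unixMillis * 10_000 + UUID_EPOCH_OFFSET
    val timeLow = ts and 0xFFFFFFFFL
    val timeMid = (ts ushr 32) and 0xFFFFL
    val timeHi = (ts ushr 48) and 0x0FFFL
    // Layout of the most significant bits: time_low | time_mid | version (1) | time_hi.
    val msb = (timeLow shl 32) or (timeMid shl 16) or (1L shl 12) or timeHi
    // The two top bits of the least significant half are the RFC 4122 variant (binary 10).
    val lsb = (clockSeqAndNode and 0x3FFFFFFFFFFFFFFFL) or Long.MIN_VALUE
    return UUID(msb, lsb)
}
```

Two UUIDs built for the same millisecond with different clockSeqAndNode values are distinct but yield the same instant, which is the uniqueness property described above.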
Bucketing
Bucketing is a technique to control the size of each partition by grouping data within a time range or specific count threshold. In Cassandra, larger partitions can degrade performance, particularly with high-volume applications, because Cassandra reads are more resource-intensive than writes. By bucketing messages, we prevent partitions from growing indefinitely, reducing the risk of “hot partitions”, that is, partitions that receive disproportionate traffic and cause latency spikes.
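In this case, the bucket value is calculated to group messages within three-month intervals. Specifically, the bucket is determined by the formula (year * 12 + month) / 3 with integer division, which yields four buckets per year. Because month runs from 1 to 12, the three-month windows are December to February, March to May, June to August, and September to November, so they are offset by one month from the calendar quarters. Each bucket collects the messages of one such window, creating a natural boundary where, after three months, a new bucket is started.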
By using three-month buckets, this approach limits the number of messages stored in any one
partition, which helps avoid performance issues like high read latencies and hot partitions.
Spreading messages across buckets like this keeps partition sizes manageable, while also maintaining
temporal ordering, making it efficient for both storage and retrieval. This partitioning strategy
ensures that as new messages accumulate, the system directs them to the appropriate bucket, thereby
balancing load across the Cassandra nodes and preserving performance over time.
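The bucket arithmetic is easy to check by hand. The sketch below uses the same formula on a YearMonth and lists which months share a bucket; bucketOf and monthsInSameBucket are hypothetical helper names used only for this illustration.

```kotlin
import java.time.YearMonth

// Same arithmetic as the formula above: integer division groups three consecutive months.
fun bucketOf(month: YearMonth): Int = (month.year * 12 + month.monthValue) / 3

// Lists the months that fall into the same bucket as the given month.
fun monthsInSameBucket(month: YearMonth): List<YearMonth> {
    val bucket = bucketOf(month)
    return (-2L..2L).map { month.plusMonths(it) }.filter { bucketOf(it) == bucket }
}
```

For November 2024 the formula gives (2024 * 12 + 11) / 3 = 24299 / 3 = 8099, and January 2025 shares bucket 8100 with December 2024 and February 2025. Whatever boundaries are chosen, the formula must stay fixed once data has been written, because it decides in which partition a message is looked up.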
Breakdown of Message Retrieval
This section provides a detailed, step-by-step explanation of the requirements and scenarios that the message retrieval algorithm must address to handle various edge cases effectively. The algorithm and corresponding code are progressively enhanced to tackle increasingly complex scenarios.
Simple Retrieval
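The initial scenario retrieves the latest 20 messages from a chat, without pagination and without considering user presence intervals. Only the most recent bucket is queried, and the result is limited to 20 messages. Because the query has no ORDER BY clause, rows come back in the table's clustering order, so returning the newest messages first assumes that message_id is clustered in descending order. The CQL query is structured as follows: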
    SELECT * FROM messages_by_group_chat
    WHERE group_chat_id = :groupChatId
      AND bucket = :bucket
    LIMIT :limit
The corresponding Kotlin implementation for this query is presented below:
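    // Use UTC so the bucket matches the one derived from a TimeUUID later on.
    val currentBucket = getBucketValue(LocalDateTime.now(ZoneOffset.UTC))
    val messages = messageRepo.findMessages(
        groupChatId = groupChatId,
        bucket = currentBucket,
        limit = 20,
    )

    ...

    // Maps a point in time to its three-month bucket.
    private fun getBucketValue(dateTime: LocalDateTime): Int {
        return (dateTime.year * 12 + dateTime.monthValue) / 3
    }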
Pagination
In some cases, a user may want to fetch messages starting from a specific message, rather than just
retrieving the most recent ones. This typically occurs during subsequent requests after the initial
call from the client. For example, when a user scrolls up within the chat to view older messages,
they will reach the top of the first 20 messages and need to load the next batch. This can be
achieved by providing the last known (oldest) messageId.
Traditional offset pagination, where rows are scanned sequentially to find the starting point before the requested number of rows is selected, is not suitable for a distributed database like Cassandra. The system would have to read and discard every skipped row, potentially from different nodes, which is computationally expensive and inefficient. As a result, cursor-based pagination is the preferred approach for Cassandra.
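The idea behind cursor-based pagination can be shown on an in-memory list that stands in for one partition sorted newest first. This is a simplified sketch: the Message type and pageBefore function are hypothetical, IDs are plain numbers instead of TimeUUIDs, and the linear filter merely imitates what Cassandra does with a range restriction on the clustering column.

```kotlin
// Hypothetical stand-in for a stored message; a larger id means a newer message.
data class Message(val id: Long, val text: String)

// Returns up to `limit` messages strictly older than `cursor`,
// or the newest ones when no cursor is given.
// `partition` must already be sorted newest first.
fun pageBefore(partition: List<Message>, cursor: Long?, limit: Int): List<Message> =
    partition.asSequence()
        .filter { cursor == null || it.id < cursor }
        .take(limit)
        .toList()
```

The client remembers the ID of the oldest message it has received and passes it back as the cursor for the next page, so no page ever depends on counting the rows that came before it.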
In this case, the client provides a specific messageId to retrieve older messages. The query then starts from the bucket corresponding to the timestamp in the given messageId. If no messageId is provided, the query defaults to the most recent bucket. The updated CQL query to fetch messages older than the provided messageId is as follows:
    SELECT * FROM messages_by_group_chat
    WHERE group_chat_id = :groupChatId
      AND bucket = :bucket
      AND message_id < :messageId
    LIMIT :limit
To execute this query, the corresponding bucket for the given messageId must be determined. This can be done by extracting the timestamp from the TimeUUID and calculating the bucket value using this timestamp. If no messageId is provided by the user, a synthetic TimeUUID can be generated by defining an upper bound time using the helper method Uuids.endOf.
    // Use UTC throughout so the timestamp matches the UTC conversions below.
    val currentTimestamp = LocalDateTime.now(ZoneOffset.UTC)
    // Start in the bucket of the cursor, or in the current bucket without a cursor.
    val currentBucket = messageId?.let { getBucketValueFromMessageId(it) }
        ?: getBucketValue(currentTimestamp)
    // Without a cursor, use the largest TimeUUID for "now" as the upper bound.
    val lastMessageId = messageId
        ?: Uuids.endOf(currentTimestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli())

    val messages = messageRepo.findMessages(
        groupChatId = groupChatId,
        bucket = currentBucket,
        messageId = lastMessageId,
        limit = 20,
    )

    ...

    // Derives the bucket from the timestamp embedded in the TimeUUID.
    private fun getBucketValueFromMessageId(messageId: MessageId): Int {
        val instant = Instant.ofEpochMilli(Uuids.unixTimestamp(messageId))
        val localDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
        return getBucketValue(localDateTime)
    }
Handling User Presence Interval
After addressing pagination, the next requirement is to ensure that users can only read messages within the period when they were part of the group chat. In some cases, the user may no longer be a part of the chat but still wish to read messages from when they were a member. Additionally, users who joined the chat at a later point should not be allowed to read messages that were sent before their entry. To enforce this constraint, the function must retrieve the relevant presence interval for the user and validate access accordingly. This step requires updating the CQL query to ensure that it respects these boundaries. Specifically, the query needs to include a lower bound to filter messages based on the user's presence interval within the chat. The updated CQL query to handle this requirement is as follows:
    SELECT * FROM messages_by_group_chat
    WHERE group_chat_id = :groupChatId
      AND bucket = :bucket
      AND message_id < :messageId
      AND message_id >= minTimeuuid(:firstMessageTimestamp)
    LIMIT :limit
To determine the time bounds for messages the user is allowed to view, the userId (e.g., extracted from a JWT) and the corresponding groupChatId are used to retrieve the chat conversation. In this scenario, the database call is assumed to return a single timestamp for when the user joined the chat and another for when they left, rather than a set of multiple values. If the leftAt timestamp is null, indicating that the user is still part of the chat, the current timestamp is used as the endpoint.
Using the timestamp from when the user first joined the chat, the minimum possible TimeUUID for the messageId that the user can access is constructed. This is then passed into the CQL query.
    val conversation = conversationRepo.findByUserIdAndChatId(userId, groupChatId)
        ?: return FindGroupMessagesResult.ConversationDoesNotExist
    if (conversation.type != GROUP) {
        return FindGroupMessagesResult.NotGroupChat
    }

    val currentTimestamp = LocalDateTime.now(ZoneOffset.UTC)
    // A user who is still a member has no leftAt; read up to the present in that case.
    val leftAtTimestamp = conversation.leftAt ?: currentTimestamp
    val joinedAtTimestamp = conversation.joinedAt

    val currentBucket = messageId?.let { getBucketValueFromMessageId(it) }
        ?: getBucketValue(leftAtTimestamp)
    // Upper bound: the client's cursor, or else the end of the user's membership
    // (not the current time, so messages sent after leaving stay hidden).
    val lastMessageId = messageId
        ?: Uuids.endOf(leftAtTimestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli())

    val messages = messageRepo.findMessages(
        groupChatId = groupChatId,
        bucket = currentBucket,
        messageId = lastMessageId,
        firstMessageTimestamp = joinedAtTimestamp,
        limit = 20,
    )
Handling Multiple Intervals
In cases where a user has joined and left the chat multiple times, it is necessary to handle multiple presence intervals within a single bucket. This requires ensuring that messages are only retrieved for the periods during which the user was part of the chat. To achieve this, the joined_at and left_at timestamps must be retrieved and sorted in descending order, prioritizing the most recent intervals.
The function must iterate over these intervals to reach the desired page size of 20, as the first interval may contain no messages or not enough messages to fill the page. This iteration process ensures that all relevant messages are considered for the final result.
The Kotlin code snippet below demonstrates how to sort the joined_at and left_at timestamps and prepare for the subsequent message retrieval:
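    val joinedAtList = conversation.joinedAt.toList().sortedDescending()
    val leftAtList = conversation.leftAt?.toMutableList() ?: mutableListOf()
    // One more join than leave means the user is still a member:
    // close the open interval with the current time (UTC, like all other conversions).
    if (joinedAtList.size == leftAtList.size + 1) {
        leftAtList.add(LocalDateTime.now(ZoneOffset.UTC))
    }
    leftAtList.sortDescending()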
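The two sorted lists are paired by index: entry i of one list belongs to entry i of the other. The sketch below makes that pairing explicit by zipping the timestamps into a list of intervals. It is an alternative illustration under the same assumptions, not the structure used in the rest of this post; the Interval type and the toIntervals function are hypothetical.

```kotlin
import java.time.LocalDateTime

// Hypothetical value type for one presence interval.
data class Interval(val joinedAt: LocalDateTime, val leftAt: LocalDateTime)

// Pairs join and leave timestamps into intervals, newest interval first.
fun toIntervals(
    joinedAt: Set<LocalDateTime>,
    leftAt: Set<LocalDateTime>,
    now: LocalDateTime,
): List<Interval> {
    val joins = joinedAt.sortedDescending()
    val leaves = leftAt.toMutableList()
    // One more join than leave: the user is currently a member, so close the interval at `now`.
    if (joins.size == leaves.size + 1) leaves.add(now)
    require(joins.size == leaves.size) { "join and leave timestamps do not pair up" }
    leaves.sortDescending()
    return joins.zip(leaves) { joined, left -> Interval(joined, left) }
}
```

A user who joined in January, left in March, and rejoined in July yields two intervals: July until now, followed by January until March.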
With the intervals sorted, the function can now iterate over each interval, using the joined_at and left_at timestamps to retrieve the messages within the corresponding time frames. The query ensures that the message retrieval respects these bounds.
    var lastMessageId = messageId
        ?: Uuids.endOf(leftAtList.first().atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
    val messageList = mutableListOf<MessageByGroupChat>()

    for (i in joinedAtList.indices) {
        // Never read past the end of the current interval: if the cursor is newer
        // than the interval's left_at, move it down to that upper bound.
        val intervalEnd = leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli()
        if (Uuids.unixTimestamp(lastMessageId) > intervalEnd) {
            lastMessageId = Uuids.endOf(intervalEnd)
        }

        val messages = messageRepo.findMessages(
            groupChatId = groupChatId,
            bucket = currentBucket,
            messageId = lastMessageId,
            firstMessageTimestamp = joinedAtList[i],
            limit = 20,
        )
        messageList.addAll(messages)

        if (messageList.size >= 20) {
            return FindGroupMessagesResult.Success(messageList.take(20))
        }
        // Continue below the oldest message collected so far.
        messageList.lastOrNull()?.messageId?.let { lastMessageId = it }
    }
    return FindGroupMessagesResult.Success(messageList)
For every interval after the first, lastMessageId is moved to the upper bound of that interval (its left_at timestamp), so that the next query starts at the correct point and does not reach into a period when the user was absent. A messageId passed by the client is not overwritten while it is still the active cursor.
Once the messages for an interval have been fetched, they are added to the messageList. If the messageList reaches the page size of 20, this list is returned as the result.
In cases where more than 20 messages are retrieved due to concatenating messages from multiple intervals, only the first 20 messages are included. If the page size is not reached after iterating through all intervals, the messages collected so far are returned, completing the pagination process.
Handling Multiple Buckets
In previous steps, it was assumed that all messages are stored within a single bucket, and the concept of bucketing was not considered. However, when an interval spans multiple buckets, the function must iterate through each bucket in that interval. The buckets are processed in descending order, starting from the latest bucket and working towards the earliest one.
    val lastBucket = getBucketValue(leftAtList.first())
    var currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: lastBucket
The first step in this process is to calculate the highest possible bucket value, which is determined by providing the timestamp of when the user last left the chat. If the user is still in the chat, the current timestamp is used. In the case of pagination, it is preferable not to start with the highest possible bucket value, but rather to begin from the bucket corresponding to the timestamp of the provided messageId. This helps to potentially skip unnecessary buckets and optimize performance.
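    for (i in joinedAtList.indices) {
        val firstBucketOfInterval = getBucketValue(joinedAtList[i])
        val lastBucketOfInterval = getBucketValue(leftAtList[i])

        // The cursor is older than this whole interval: skip it without querying.
        if (currentBucket < firstBucketOfInterval) {
            continue
        }
        // The cursor is newer than this interval: start at the interval's last bucket.
        if (currentBucket > lastBucketOfInterval) {
            currentBucket = lastBucketOfInterval
        }
        // Cap the cursor at the interval's left_at so absent periods are never read.
        val intervalEnd = leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli()
        if (Uuids.unixTimestamp(lastMessageId) > intervalEnd) {
            lastMessageId = Uuids.endOf(intervalEnd)
        }

        while (currentBucket >= firstBucketOfInterval) {
            val messages = messageRepo.findMessages(
                groupChatId = groupChatId,
                bucket = currentBucket,
                messageId = lastMessageId,
                firstMessageTimestamp = joinedAtList[i],
                limit = 20,
            )
            messageList.addAll(messages)

            if (messageList.size >= 20) {
                return FindGroupMessagesResult.Success(messageList.take(20))
            }
            messageList.lastOrNull()?.messageId?.let { lastMessageId = it }
            currentBucket--
        }
        // The loop decremented once too often; step back to the interval's first bucket.
        currentBucket = firstBucketOfInterval
    }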
Once the bucket values are determined, the function iterates over each interval in the joinedAtList. For each interval, the start (firstBucketOfInterval) and end (lastBucketOfInterval) bucket values are calculated.
The function then checks whether the current bucket, which on the first pass is derived from the messageId passed by the client, lies before the first bucket of the interval. If it does, the cursor is older than the whole interval, so the function skips directly to the next interval and does not query the database.
The while loop processes each bucket inside the interval, decrementing the currentBucket with each iteration.
Once all messages for a given interval are collected, if the page size has not been reached, the currentBucket is reset to the lower bound of the interval (firstBucketOfInterval). This step is necessary because the currentBucket would otherwise be decremented one extra time in the last iteration of the while loop. If the next interval falls within the same bucket, this adjustment ensures that the function still queries that bucket for the subsequent interval.
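The order in which buckets are visited for a single interval can be checked in isolation. The following sketch reuses the bucket formula from this post and returns the buckets an interval touches, newest first, optionally starting at the bucket of a pagination cursor. The function bucketsDescending and its startBucket parameter are hypothetical and exist only to illustrate the traversal.

```kotlin
import java.time.LocalDateTime

// Bucket formula from this post: three-month windows via integer division.
fun getBucketValue(dateTime: LocalDateTime): Int =
    (dateTime.year * 12 + dateTime.monthValue) / 3

// Buckets touched by one presence interval, newest first.
// `startBucket` is the bucket of a pagination cursor, if any; buckets newer than it are skipped.
fun bucketsDescending(
    joinedAt: LocalDateTime,
    leftAt: LocalDateTime,
    startBucket: Int? = null,
): List<Int> {
    val first = getBucketValue(joinedAt)
    val last = getBucketValue(leftAt)
    val start = if (startBucket != null) minOf(startBucket, last) else last
    // An empty range results when the cursor is older than the whole interval.
    return (start downTo first).toList()
}
```

An interval from January 15, 2024 to November 6, 2024 covers buckets 8099, 8098, 8097, and 8096 in that order. With a cursor in bucket 8097 only 8097 and 8096 remain, and a cursor older than the interval yields no buckets at all, which corresponds to the skip described above.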
Conclusion & Complete Code
This post explored an efficient approach to retrieving group chat messages by considering user presence intervals and bucketing. The solution processes presence intervals, dynamically adjusts bucket values, and retrieves messages in the correct order while avoiding unnecessary queries. By addressing hot partitioning and enabling seamless pagination, this method ensures both performance and scalability. Here is the complete code:
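    import com.datastax.oss.driver.api.core.uuid.Uuids
    import java.time.Instant
    import java.time.LocalDateTime
    import java.time.ZoneOffset

    fun findMessages(userId: UserId, conversationId: ConversationId, messageId: MessageId?): FindGroupMessagesResult {
        val conversation = conversationRepo.findByUserIdAndConversationId(userId, conversationId)
            ?: return FindGroupMessagesResult.ConversationDoesNotExist
        if (conversation.type != GROUP) {
            return FindGroupMessagesResult.NotGroupChat
        }

        // Presence intervals, newest first; an open interval is closed with the current time.
        val joinedAtList = conversation.joinedAt.toList().sortedDescending()
        val leftAtList = conversation.leftAt?.toMutableList() ?: mutableListOf()
        if (joinedAtList.size == leftAtList.size + 1) {
            leftAtList.add(LocalDateTime.now(ZoneOffset.UTC))
        }
        leftAtList.sortDescending()

        val lastBucket = getBucketValue(leftAtList.first())
        var currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: lastBucket
        var lastMessageId = messageId ?: Uuids.endOf(leftAtList.first().toUtcMillis())
        val messageList = mutableListOf<MessageByGroupChat>()

        for (i in joinedAtList.indices) {
            val firstBucketOfInterval = getBucketValue(joinedAtList[i])
            val lastBucketOfInterval = getBucketValue(leftAtList[i])

            // The cursor is older than this whole interval: skip it without querying.
            if (currentBucket < firstBucketOfInterval) {
                continue
            }
            // The cursor is newer than this interval: start at the interval's last bucket.
            if (currentBucket > lastBucketOfInterval) {
                currentBucket = lastBucketOfInterval
            }
            // Cap the cursor at the interval's left_at so absent periods are never read.
            val intervalEnd = leftAtList[i].toUtcMillis()
            if (Uuids.unixTimestamp(lastMessageId) > intervalEnd) {
                lastMessageId = Uuids.endOf(intervalEnd)
            }

            while (currentBucket >= firstBucketOfInterval) {
                val messages = messageRepo.findMessages(
                    // Assumes the repository takes the conversation ID as the group chat ID.
                    groupChatId = conversationId,
                    bucket = currentBucket,
                    messageId = lastMessageId,
                    firstMessageTimestamp = joinedAtList[i],
                    limit = 20,
                )
                messageList.addAll(messages)

                if (messageList.size >= 20) {
                    return FindGroupMessagesResult.Success(messageList.take(20))
                }
                // Continue below the oldest message collected so far.
                messageList.lastOrNull()?.messageId?.let { lastMessageId = it }
                currentBucket--
            }
            // The loop decremented once too often; step back to the interval's first bucket.
            currentBucket = firstBucketOfInterval
        }
        return FindGroupMessagesResult.Success(messageList)
    }

    // Maps a point in time to its three-month bucket.
    private fun getBucketValue(dateTime: LocalDateTime): Int {
        return (dateTime.year * 12 + dateTime.monthValue) / 3
    }

    // Derives the bucket from the timestamp embedded in the TimeUUID.
    private fun getBucketValueFromMessageId(messageId: MessageId): Int {
        val instant = Instant.ofEpochMilli(Uuids.unixTimestamp(messageId))
        val localDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
        return getBucketValue(localDateTime)
    }

    // Interprets the stored timestamp as UTC and converts it to epoch milliseconds.
    private fun LocalDateTime.toUtcMillis(): Long =
        atZone(ZoneOffset.UTC).toInstant().toEpochMilli()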