Retrieving Group Chat Messages

November 6, 2024

Introduction

Retrieving chat messages may initially appear straightforward, but there are unique considerations for chat applications that require a tailored approach. Chat applications often feature real-time interactions, with continuous updates that demand efficient, high-throughput data handling. In these applications, read and write operations are nearly balanced, and most reads involve recent data, as older messages are rarely accessed. Given these requirements, using a NoSQL database, such as Cassandra, is beneficial. This solution is implemented using Cassandra for data storage and Spring Boot with Kotlin for backend development.

Data Modeling in Cassandra

Data modeling in Cassandra differs significantly from traditional relational database design. Rather than normalizing data and managing relationships, Cassandra prioritizes optimizing tables for specific queries. The process starts by identifying the application’s query patterns, with each table structured to handle these efficiently, avoiding joins and complex operations. A key part of this approach is carefully selecting partition keys and clustering keys. These are structured to support fast, efficient data retrieval for each query, ensuring high performance and scalability.

Partition Key

The partition key is the primary element used to distribute data across Cassandra’s nodes. It determines which nodes store a particular row, which makes it essential for spreading data evenly across the cluster. When a row is written, Cassandra applies a hash function (Murmur3 with the default partitioner) to the partition key. The resulting token determines which node, and which replicas, own the partition. Hashing spreads distinct partition keys uniformly across the token range, so no single node has to hold a disproportionate share of the partitions.

Hashing alone cannot compensate for a poorly chosen key, however. A key with too few distinct values, or one where a few values receive most of the data and traffic, produces hotspots: a handful of nodes become overloaded while others remain underutilized. A key that is too granular has the opposite problem: data that is read together is scattered across many partitions, so a single query has to touch several of them. A well-chosen partition key keeps data evenly spread across all nodes, which allows Cassandra to scale horizontally and maintain high availability and performance as the dataset grows.

The partition key also enables efficient, direct access to the data associated with a particular key, without joins or additional lookups. Since all rows with the same partition key are stored together on the same replica nodes, Cassandra can read the relevant partition without searching the rest of the cluster.
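To make the hashing step concrete, here is a toy token ring in Kotlin. It is a sketch under simplifying assumptions, not how Cassandra is implemented: it uses String.hashCode() as a stand-in for the Murmur3 partitioner, gives each node a single token, and ignores virtual nodes and replication. The names TokenRing, tokenOf and ownerOf are hypothetical.

```kotlin
import java.util.TreeMap

// Toy token ring, illustrative only: each node owns the tokens up to and including its own token.
// Real Cassandra uses the Murmur3 partitioner, 64-bit tokens, virtual nodes and replication.
class TokenRing(nodeTokens: Map<String, Int>) {
    private val ring = TreeMap<Int, String>()

    init {
        nodeTokens.forEach { (node, token) -> ring[token] = node }
    }

    // Stand-in for Murmur3: any deterministic hash function illustrates the idea
    fun tokenOf(partitionKey: String): Int = partitionKey.hashCode()

    // The first node whose token is >= the key's token owns the key; keys past the last token wrap around
    fun ownerOf(partitionKey: String): String {
        val token = tokenOf(partitionKey)
        return (ring.ceilingEntry(token) ?: ring.firstEntry()).value
    }
}
```

Because the owner depends only on the partition key, every row that shares a key lands on the same node. That is what makes single-partition reads fast, and also why one very busy key can turn into a hotspot.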

Clustering Key

The clustering key determines the order of rows within each partition, enabling ordered retrieval of data. This is particularly useful in applications like chat systems where message ordering within a conversation is essential.
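A partition can be pictured as a map sorted by its clustering key. The following sketch is a hypothetical in-memory model (SortedPartition is not a driver or Cassandra API) that keeps rows newest first, similar to a table declared with a descending clustering order, and uses an Instant as the clustering key.

```kotlin
import java.time.Instant
import java.util.TreeMap

// One partition modeled as a map sorted by its clustering key (hypothetical model, not a driver API).
// Newest-first ordering mirrors a table declared WITH CLUSTERING ORDER BY (... DESC).
class SortedPartition {
    private val rows = TreeMap<Instant, String>(reverseOrder<Instant>())

    // Writing the same clustering key twice overwrites the row, like an upsert in Cassandra
    fun insert(sentAt: Instant, text: String) {
        rows[sentAt] = text
    }

    // Rows are already ordered, so "latest n" is simply the first n entries
    fun latest(n: Int): List<String> = rows.values.take(n)
}
```

Writing a second row with the same clustering key replaces the first, which mirrors Cassandra's upsert semantics for identical primary keys. The TIMEUUID section below addresses exactly this risk for messages created at the same moment.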

Challenges in the Chat Message Use Case

Two main challenges arise in retrieving messages for group chats:

1. Data Distribution and Hot Partitions: Partitioning by conversation ID can lead to hot partitions, especially if a highly active group sends a large volume of messages. This uneven data distribution strains certain nodes, impacting performance.
2. User Access Control: In group chats, users may leave and rejoin, creating intervals of participation. Messages sent during periods when a user was absent should not be visible to them upon rejoining. The data model must account for these intervals to enforce accurate message visibility.
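The visibility rule from the second challenge can be stated as a small predicate. The sketch below is a hypothetical model, not part of the data layer described here: PresenceInterval and isVisible are illustrative names, and both ends of an interval are treated as inclusive for simplicity.

```kotlin
import java.time.Instant

// One period of membership; leftAt == null means the user is still in the chat (hypothetical model)
data class PresenceInterval(val joinedAt: Instant, val leftAt: Instant?)

// A message is visible if some interval contains it: sent at or after the join and no later than the leave
fun isVisible(sentAt: Instant, intervals: List<PresenceInterval>): Boolean =
    intervals.any { interval ->
        val leftAt = interval.leftAt
        !sentAt.isBefore(interval.joinedAt) && (leftAt == null || !sentAt.isAfter(leftAt))
    }
```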

Table Design

When the application is opened, the initial query fetches all conversations a user is part of by querying the conversations_by_user table. This table serves as an entry point for loading a user's conversation history efficiently. A "conversation" is a general term that encompasses two distinct types of chats: user chats and group chats. A user chat represents a 1-on-1 interaction between two participants, offering direct and private communication. In contrast, a group chat involves multiple participants and supports a wider range of interactions, often including unique characteristics like roles, permissions, and shared content. Here’s how the table is structured:

Conversation Table

The partition key is the user’s unique identifier, which ensures that all conversations for a specific user are stored together. This structure allows for fast access to all of a user’s conversations. The table also includes essential details like the conversation type and last message in this particular chat, giving the user a quick overview. For group chats, additional fields like joined_at and left_at (stored as a set of timestamps) capture the user’s participation history.

Using a set instead of a list for these timestamps is critical to ensuring consistency and performance. In Cassandra, lists have inherent limitations and performance considerations. Operations like appending or prepending are not idempotent, meaning retries due to timeouts could lead to duplicate entries — an unacceptable risk in scenarios where data consistency is essential. Additionally, list operations such as setting or removing an element by position incur an internal read-before-write operation, which is resource-intensive and slows down performance. In contrast, sets avoid these pitfalls, offering better safety and efficiency for scenarios requiring consistent and unique data, such as maintaining accurate participation intervals for users. This ensures reliable and performant queries for an overview of all conversations.
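The retry problem can be mimicked with in-memory collections. The sketch below only illustrates the semantics: appending the same value twice to a list produces a duplicate, while adding it twice to a set does not. It does not model how Cassandra stores collections internally; in CQL the corresponding updates would be `joined_at = joined_at + [ ... ]` for a list and `joined_at = joined_at + { ... }` for a set. applyWithRetry is a hypothetical helper.

```kotlin
import java.time.Instant

// Hypothetical helper that simulates a retried write: the first attempt was applied on the server,
// but the client timed out and sent the same update again.
fun <C : MutableCollection<Instant>> applyWithRetry(collection: C, value: Instant): C {
    collection.add(value) // first attempt (applied, but the acknowledgement was lost)
    collection.add(value) // retry after the timeout
    return collection
}
```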

The initial query provides the data needed to display an overview of all conversations, including both individual and group chats, without any further lookups.

Messages Table

When a user selects a specific group chat, the app loads the conversation’s messages from the messages_by_group_chat table. Here, the partition key is a composite key made up of the columns group_chat_id and bucket. Including group_chat_id keeps the messages of a given group chat together, while the bucket column splits each chat's history into time-based partitions using a technique known as bucketing.

TimeUUID

We use TIMEUUID for message_id because it encodes both a timestamp and a unique identifier in a single value, which makes it ideal for applications that need chronological ordering and uniqueness. A TIMEUUID is a version 1 UUID: it contains a timestamp, measured in 100-nanosecond intervals since October 15, 1582, as well as a clock sequence and a node identifier (usually derived from the machine’s MAC address or a random value). Cassandra orders TIMEUUID values by their embedded timestamp, so with message_id declared as a clustering column in descending order (CLUSTERING ORDER BY (message_id DESC)), the most recent messages come first within each partition. This ordering is what allows the latest messages to be retrieved first with a simple LIMIT. Even if multiple messages are created at the same moment, the clock sequence and node identifier keep each TIMEUUID unique, avoiding collisions.
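To show what is packed into a TIMEUUID, the sketch below builds a version 1 UUID from a Unix timestamp and reads the timestamp back using only java.util.UUID. It is illustrative: in the application, TimeUUIDs would be generated by the driver (for example Uuids.timeBased() in the DataStax Java driver), which also takes care of sub-millisecond ticks, the clock sequence and the node identifier. The helpers timeUuidOf, unixMillisOf and newestFirst are hypothetical.

```kotlin
import java.util.UUID

// Offset between the UUID epoch (1582-10-15) and the Unix epoch (1970-01-01), in 100-nanosecond units
val UUID_EPOCH_OFFSET = 122_192_928_000_000_000L

// Builds a version 1 UUID for a Unix timestamp (illustrative only; drivers also generate sub-millisecond
// ticks and choose the clock sequence and node id themselves)
fun timeUuidOf(epochMillis: Long, clockSeq: Int, node: Long): UUID {
    val ts = epochMillis * 10_000 + UUID_EPOCH_OFFSET
    val timeLow = ts and 0xFFFFFFFFL
    val timeMid = (ts ushr 32) and 0xFFFFL
    val timeHi = (ts ushr 48) and 0x0FFFL
    // time_low (32 bits) | time_mid (16 bits) | version 1 (4 bits) | time_hi (12 bits)
    val msb = (timeLow shl 32) or (timeMid shl 16) or 0x1000L or timeHi
    // IETF variant bits "10", then a 14-bit clock sequence and a 48-bit node id
    val lsb = Long.MIN_VALUE or ((clockSeq.toLong() and 0x3FFFL) shl 48) or (node and 0xFFFFFFFFFFFFL)
    return UUID(msb, lsb)
}

// Recovers the Unix timestamp in milliseconds from a version 1 UUID
fun unixMillisOf(id: UUID): Long = (id.timestamp() - UUID_EPOCH_OFFSET) / 10_000

// Newest first by embedded timestamp; java.util.UUID.compareTo does not order version 1 UUIDs by time
val newestFirst: Comparator<UUID> = compareByDescending<UUID> { it.timestamp() }
```

The comparator matters when TimeUUIDs are sorted in application code: Cassandra's timeuuid type orders values by their timestamp, whereas java.util.UUID.compareTo compares the raw bits, starting with the low-order time field, and therefore does not produce chronological order.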

Bucketing

Bucketing is a technique for controlling the size of each partition by grouping data into a time range or up to a count threshold. Large partitions degrade Cassandra's performance, particularly in high-volume applications: they make reads, compaction and repairs more expensive and increase the risk of "hot partitions", partitions that receive disproportionate traffic and cause latency spikes. Bucketing messages prevents partitions from growing indefinitely.

In this case, the bucket value groups messages into three-month intervals. The bucket is determined by the formula (year * 12 + month) / 3 using integer division, which yields four buckets per year. Because month runs from 1 to 12, the boundaries are shifted by one month relative to calendar quarters: the buckets cover December to February, March to May, June to August and September to November, so December shares a bucket with the following January. Subtracting 1 from the month, (year * 12 + month - 1) / 3, would align the buckets with calendar quarters. Either way, bucket values increase monotonically and each covers three consecutive months, so a new bucket is started every three months.

Limiting the number of messages stored in any one partition helps avoid high read latencies. Spreading a chat's history across buckets keeps partition sizes manageable while preserving temporal ordering, which keeps both storage and retrieval efficient. Bucketing primarily bounds partition size over time: all new messages of a chat are written to its current bucket, and as buckets roll over, the chat's history is distributed across different partitions and therefore across different nodes.
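The month-to-bucket mapping is easy to check in code. The sketch below reuses the post's getBucketValue formula and adds a hypothetical quarter-aligned variant, getQuarterBucketValue, together with a helper that groups the months of a year by bucket.

```kotlin
import java.time.LocalDateTime
import java.time.YearMonth

// Bucket formula used in this post (integer division, monthValue runs from 1 to 12)
fun getBucketValue(dateTime: LocalDateTime): Int = (dateTime.year * 12 + dateTime.monthValue) / 3

// Hypothetical variant aligned with calendar quarters (January to March, April to June, ...)
fun getQuarterBucketValue(dateTime: LocalDateTime): Int = (dateTime.year * 12 + dateTime.monthValue - 1) / 3

// Groups the months of a year by bucket value to show which months share a partition
fun monthsPerBucket(year: Int, bucketOf: (LocalDateTime) -> Int): Map<Int, List<YearMonth>> =
    (1..12).map { YearMonth.of(year, it) }
        .groupBy { bucketOf(it.atDay(1).atStartOfDay()) }
```

For 2024, the original formula produces groups of 2, 3, 3, 3 and 1 months, because January and February belong to the bucket opened in December 2023, and December 2024 already opens the next bucket. Switching formulas in an existing system would change the bucket computed for already stored messages, so the choice has to be made before data is written.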

Breakdown of Message Retrieval

This section provides a detailed, step-by-step explanation of the requirements and scenarios that the message retrieval algorithm must address to handle various edge cases effectively. The algorithm and corresponding code are progressively enhanced to tackle increasingly complex scenarios.

Simple Retrieval

The initial scenario focuses on retrieving the latest 20 messages from a chat without implementing pagination or considering user presence intervals. This involves querying only the most recent bucket and limiting the results to 20 messages. The CQL query is structured as follows:

</> CQL
SELECT * FROM messages_by_group_chat 
WHERE group_chat_id = :groupChatId 
AND bucket = :bucket 
LIMIT :limit 
        

The corresponding Kotlin implementation for this query is presented below:

</> Kotlin
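// Compute the bucket in UTC so it matches the UTC timestamps embedded in message IDs
val currentBucket = getBucketValue(LocalDateTime.now(ZoneOffset.UTC))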

val messages = messageRepo.findMessages(
    groupChatId = groupChatId,
    bucket = currentBucket,
    limit = 20,
)

...

private fun getBucketValue(dateTime: LocalDateTime): Int {
    return (dateTime.year * 12 + dateTime.monthValue) / 3
}
        

Pagination

In some cases, a user wants to fetch messages starting from a specific message rather than just the most recent ones. This typically happens on subsequent requests after the client's initial call: when a user scrolls up in the chat, they eventually reach the top of the first 20 messages and need to load the next batch. This is achieved by passing the last known (oldest) messageId.

Traditional offset pagination, where rows are scanned sequentially to find the starting point and a fixed number of rows is then selected, is not suitable for a distributed database like Cassandra. CQL has no OFFSET clause, and emulating one would mean reading and discarding every preceding row, potentially from several nodes, on each request. Cursor-based pagination is therefore the preferred approach: the client provides a messageId, and the query returns messages older than it, starting from the bucket that corresponds to the timestamp embedded in that messageId. If no messageId is provided, the query defaults to the most recent bucket. The updated CQL query to fetch messages older than the provided messageId is as follows:

</> CQL
SELECT * FROM messages_by_group_chat 
WHERE group_chat_id = :groupChatId 
AND bucket = :bucket 
AND message_id < :messageId 
LIMIT :limit 
        

To execute this query, the bucket of the given messageId must be determined. This is done by extracting the timestamp from the TimeUUID and computing the bucket value from it. If the client provides no messageId, a synthetic TimeUUID that serves as the upper bound can be generated with the helper method Uuids.endOf from the DataStax Java driver, which returns the largest possible TimeUUID for a given millisecond timestamp.

</> Kotlin
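// Use UTC so the bucket matches the UTC timestamps embedded in message IDs
val currentTimestamp = LocalDateTime.now(ZoneOffset.UTC)
val currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: getBucketValue(currentTimestamp)
// Without a cursor, the largest possible TimeUUID for "now" serves as the exclusive upper bound
val lastMessageId = messageId ?: Uuids.endOf(currentTimestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli())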

val messages = messageRepo.findMessages(
    groupChatId = groupChatId,
    bucket = currentBucket,
    messageId = lastMessageId,
    limit = 20,
)

...

private fun getBucketValueFromMessageId(messageId: MessageId): Int {
    val instant = Instant.ofEpochMilli(Uuids.unixTimestamp(messageId))
    val localDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
    return getBucketValue(localDateTime)
}
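The cursor mechanics can be illustrated without a database. The sketch below is a hypothetical in-memory stand-in for a single partition of messages_by_group_chat: plain Long values take the place of TimeUUIDs and are kept newest first, and page(cursor, limit) mimics `message_id < :messageId LIMIT :limit`.

```kotlin
import java.util.TreeMap

// In-memory stand-in for one partition of messages_by_group_chat (hypothetical, no Cassandra involved).
// Keys play the role of message_id and are kept newest first, like CLUSTERING ORDER BY (message_id DESC).
class MessagePartition(messages: Map<Long, String>) {
    private val rows = TreeMap<Long, String>(reverseOrder<Long>()).apply { putAll(messages) }

    // Equivalent of: WHERE message_id < :cursor LIMIT :limit (cursor == null means "start at the newest")
    fun page(cursor: Long?, limit: Int): List<Pair<Long, String>> {
        // In a reverse-ordered map, the entries after the cursor are the numerically smaller keys
        val view = if (cursor == null) rows else rows.tailMap(cursor, false)
        return view.entries.take(limit).map { it.key to it.value }
    }
}
```

Each page returns the rows strictly older than the previous page's last row, so no rows are skipped or repeated, and the cost of a request does not grow with how far back the user has scrolled.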
        

Handling User Presence Interval

After addressing pagination, the next requirement is to ensure that users can only read messages from the period in which they were part of the group chat. A user may no longer be a member of the chat but still wish to read the messages sent while they were. Conversely, users who joined the chat later must not be able to read messages sent before they joined. To enforce this, the function retrieves the user's presence interval and restricts the query to it. The upper bound is already expressed through the messageId cursor; the query additionally needs a lower bound that filters out messages sent before the user joined. The updated CQL query is as follows:

</> CQL
SELECT * FROM messages_by_group_chat 
WHERE group_chat_id = :groupChatId 
AND bucket = :bucket 
AND message_id < :messageId 
AND message_id >= minTimeuuid(:firstMessageTimestamp)
LIMIT :limit 
        

To determine the time bounds of the messages a user may view, the userId (e.g., extracted from a JWT) and the groupChatId are used to load the chat conversation. In this scenario, the database call is assumed to return a single timestamp for when the user joined the chat and another for when they left, rather than sets of values. If leftAt is null, meaning the user is still part of the chat, the current timestamp is used as the end of the interval, and the upper bound TimeUUID is derived from it. The joinedAt timestamp is passed to the query, where minTimeuuid turns it into the smallest possible TimeUUID for that instant, so only messages sent after the user joined are returned.

</> Kotlin
val conversation = conversationRepo.findByUserIdAndChatId(userId, groupChatId) ?: return FindGroupMessagesResult.ConversationDoesNotExist
if (conversation.type != GROUP) {
    return FindGroupMessagesResult.NotGroupChat
}

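// All timestamps are handled in UTC, matching the timestamps embedded in message IDs
val currentTimestamp = LocalDateTime.now(ZoneOffset.UTC)
// A user who is still a member has no leftAt; "now" is then the end of the interval
val leftAtTimestamp = conversation.leftAt ?: currentTimestamp
val joinedAtTimestamp = conversation.joinedAt
val currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: getBucketValue(leftAtTimestamp)
// Cap the default upper bound at leftAt so messages sent after the user left stay hidden
val lastMessageId = messageId ?: Uuids.endOf(leftAtTimestamp.atZone(ZoneOffset.UTC).toInstant().toEpochMilli())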

val messages = messageRepo.findMessages(
    groupChatId = groupChatId,
    bucket = currentBucket,
    messageId = lastMessageId,
    firstMessageTimestamp = joinedAtTimestamp,
    limit = 20,
)
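The bounds used in this step can be expressed as a small pure function. The sketch below is hypothetical (queryWindow and QueryWindow are illustrative names) and works on Instant values rather than TimeUUIDs. It reads messages in the half-open range from joinedAt (inclusive) to the upper bound (exclusive), and lets a client cursor narrow that range but never widen it past leftAt.

```kotlin
import java.time.Instant

// Messages with from <= sentAt < before may be read
data class QueryWindow(val from: Instant, val before: Instant)

// Hypothetical helper: the window starts when the user joined and ends when they left (or now)
fun queryWindow(joinedAt: Instant, leftAt: Instant?, cursor: Instant?, now: Instant): QueryWindow? {
    val intervalEnd = leftAt ?: now
    // The cursor may only narrow the window; a cursor after leftAt must not expose later messages
    val before = if (cursor != null && cursor < intervalEnd) cursor else intervalEnd
    // Nothing is left to read once the upper bound reaches the join time
    return if (before <= joinedAt) null else QueryWindow(joinedAt, before)
}
```

Clamping the cursor guards against a client sending a messageId from after the user left, which would otherwise reveal messages the user should not see.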
        

Handling Multiple Intervals

When a user has joined and left the chat multiple times, several presence intervals may fall within a single bucket, and messages must only be retrieved for the periods in which the user was part of the chat. To achieve this, the joined_at and left_at timestamps are retrieved and sorted in descending order, so that the most recent intervals come first. Because a user's presence intervals never overlap, the i-th most recent join always belongs to the i-th most recent leave; if the user is still a member, there is one more join than leave, and the current timestamp closes the ongoing interval. The function must iterate over these intervals to fill the page size of 20, since the most recent interval may contain no messages or too few to fill a page. The Kotlin code snippet below demonstrates how to sort the joined_at and left_at timestamps and prepare for the subsequent message retrieval:

</> Kotlin
val joinedAtList = conversation.joinedAt.toList().sortedDescending()
val leftAtList = conversation.leftAt?.toMutableList() ?: mutableListOf()
if (joinedAtList.size == leftAtList.size + 1) {
    // The user is still a member: the ongoing interval ends now (UTC)
    leftAtList.add(LocalDateTime.now(ZoneOffset.UTC))
}
leftAtList.sortDescending()
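The pairing of joins and leaves can be written as a standalone function. presenceIntervals below is a hypothetical helper, not part of the post's code. It assumes that presence intervals never overlap, so after sorting both lists newest first, the i-th join belongs to the i-th leave.

```kotlin
import java.time.LocalDateTime

// Hypothetical helper: pairs joins and leaves into (joinedAt, leftAt) intervals, newest first.
// A missing final leave means the user is still a member, so "now" closes the ongoing interval.
fun presenceIntervals(
    joinedAt: Set<LocalDateTime>,
    leftAt: Set<LocalDateTime>?,
    now: LocalDateTime,
): List<Pair<LocalDateTime, LocalDateTime>> {
    val joins = joinedAt.sortedDescending()
    val leaves = (leftAt ?: emptySet()).toMutableList()
    if (joins.size == leaves.size + 1) {
        leaves.add(now)
    }
    leaves.sortDescending()
    require(joins.size == leaves.size) { "Every join except the latest needs a matching leave" }
    return joins.zip(leaves)
}
```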
        

With the intervals sorted, the function can now iterate over each interval, using the joined_at and left_at timestamps to retrieve the messages within the corresponding time frames. The query ensures that the message retrieval respects these bounds.

</> Kotlin
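var lastMessageId = messageId ?: Uuids.endOf(leftAtList.first().atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
val messageList = mutableListOf<MessageByGroupChat>()
for (i in joinedAtList.indices) {
    // Clamp the cursor to the end of the current interval so that messages sent while the
    // user was absent are never returned. A cursor that already lies further back
    // (e.g. a client-provided messageId) is left untouched.
    val intervalEnd = Uuids.endOf(leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
    if (Uuids.unixTimestamp(lastMessageId) > Uuids.unixTimestamp(intervalEnd)) {
        lastMessageId = intervalEnd
    }

    val messages = messageRepo.findMessages(
        groupChatId = groupChatId,
        bucket = currentBucket,
        messageId = lastMessageId,
        firstMessageTimestamp = joinedAtList[i],
        limit = 20,
    )
    messageList.addAll(messages)

    // Stop as soon as the page is full; concatenated intervals may overshoot
    if (messageList.size >= 20) {
        return FindGroupMessagesResult.Success(messageList.take(20))
    }

    // Continue below the oldest message collected so far
    messageList.lastOrNull()?.messageId?.let { lastMessageId = it }
}

return FindGroupMessagesResult.Success(messageList)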
        

From the second interval onward, lastMessageId is moved to the upper bound of the current interval. This gives each query the correct starting point and ensures that messages sent while the user was absent are never returned; a client-provided messageId that already lies further back is left untouched. Once the messages for an interval have been fetched, they are added to messageList. If messageList reaches the page size of 20, it is returned as the result. Because messages from several intervals may be concatenated, more than 20 messages can be collected, in which case only the first 20 are returned. If the page size is still not reached after all intervals have been processed, the collected messages are returned, which completes the page.
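The whole interval loop can be exercised in memory. The sketch below is a hypothetical simplification that works on plain Long timestamps within a single bucket: intervals are (joinedAt, leftAt) pairs sorted newest first, a message is readable when joinedAt <= ts <= leftAt, and the cursor is clamped to each interval's end before that interval is queried. pageAcrossIntervals is an illustrative name.

```kotlin
// Hypothetical in-memory version of the interval loop (single bucket, Long timestamps instead of TimeUUIDs)
fun pageAcrossIntervals(
    messageTimestamps: List<Long>,
    intervals: List<Pair<Long, Long>>,
    cursor: Long?,
    pageSize: Int = 20,
): List<Long> {
    val newestFirst = messageTimestamps.sortedDescending()
    val page = mutableListOf<Long>()
    // Exclusive upper bound for the next query: the client's cursor, or "nothing read yet"
    var before = cursor ?: Long.MAX_VALUE
    for ((joinedAt, leftAt) in intervals) {
        // Clamp to the end of this interval so messages sent while the user was absent are never read
        before = minOf(before, leftAt + 1)
        // Equivalent of: message_id < before AND message_id >= joinedAt LIMIT pageSize
        val batch = newestFirst.filter { it < before && it >= joinedAt }.take(pageSize)
        page.addAll(batch)
        if (page.size >= pageSize) return page.take(pageSize)
        // Continue below the oldest message collected so far
        batch.lastOrNull()?.let { before = it }
    }
    return page
}
```

Without the clamp, the query for an older interval would use the oldest message of the newer interval as its upper bound and would therefore also return messages sent between the user leaving and rejoining.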

Handling Multiple Buckets

In previous steps, it was assumed that all messages are stored within a single bucket, and the concept of bucketing was not considered. However, when an interval spans multiple buckets, the function must iterate through each bucket in that interval. The buckets are processed in descending order, starting from the latest bucket and working towards the earliest one.

</> Kotlin
val lastBucket = getBucketValue(leftAtList.first())
var currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: lastBucket
        

The first step is to calculate the highest possible bucket value, which is derived from the timestamp at which the user last left the chat, or from the current timestamp if the user is still a member. When paginating, it is preferable not to start from this highest bucket but from the bucket that corresponds to the timestamp of the provided messageId. This skips buckets that can only contain messages newer than the cursor and avoids unnecessary queries.

</> Kotlin
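for (i in joinedAtList.indices) {
    val firstBucketOfInterval = getBucketValue(joinedAtList[i])
    val lastBucketOfInterval = getBucketValue(leftAtList[i])

    // The cursor lies before this interval: skip it without querying
    if (currentBucket < firstBucketOfInterval) {
        continue
    }

    // Do not query buckets that lie after the end of this interval
    if (currentBucket > lastBucketOfInterval) {
        currentBucket = lastBucketOfInterval
    }

    // Never read past the end of this interval, i.e. hide messages sent while the user was absent
    val intervalEnd = Uuids.endOf(leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
    if (Uuids.unixTimestamp(lastMessageId) > Uuids.unixTimestamp(intervalEnd)) {
        lastMessageId = intervalEnd
    }

    while (currentBucket >= firstBucketOfInterval) {
        val messages = messageRepo.findMessages(
            groupChatId = groupChatId,
            bucket = currentBucket,
            messageId = lastMessageId,
            firstMessageTimestamp = joinedAtList[i],
            limit = 20,
        )

        messageList.addAll(messages)

        if (messageList.size >= 20) {
            return FindGroupMessagesResult.Success(messageList.take(20))
        }

        messageList.lastOrNull()?.messageId?.let { lastMessageId = it }

        currentBucket--
    }

    // Undo the extra decrement so a following interval in the same bucket is still queried
    currentBucket = firstBucketOfInterval
}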
        

Once the bucket values are determined, the function iterates over each interval in joinedAtList. For each interval, the first (firstBucketOfInterval) and last (lastBucketOfInterval) bucket values are calculated. The function then checks whether currentBucket, which comes from the client's messageId or from the previous interval, lies before the interval's first bucket. If it does, the interval cannot contain any messages older than the cursor, so the function skips to the next interval without querying the database. If currentBucket lies after the interval's last bucket, it is lowered to lastBucketOfInterval so that no buckets outside the interval are queried. The while loop then processes each bucket of the interval, decrementing currentBucket after each query. If the page size has not been reached once all buckets of an interval have been processed, currentBucket is reset to the interval's lowest bucket (firstBucketOfInterval). This is necessary because the last iteration of the while loop decrements currentBucket one extra time; without the reset, a following interval that starts in the same bucket would be skipped.
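The bucket bookkeeping can be checked with a dry run that records which buckets would be queried instead of calling the database. plannedBucketQueries below is a hypothetical helper: it takes the (firstBucketOfInterval, lastBucketOfInterval) pair of each interval, newest first, assumes that no interval fills the page, and returns (interval index, bucket) pairs in query order.

```kotlin
// Hypothetical dry run of the bucket loop: returns the (intervalIndex, bucket) pairs that would be queried,
// assuming no interval fills the page. startBucket is the bucket of the cursor or of the last leave.
fun plannedBucketQueries(intervalBuckets: List<Pair<Int, Int>>, startBucket: Int): List<Pair<Int, Int>> {
    val queries = mutableListOf<Pair<Int, Int>>()
    var currentBucket = startBucket
    for ((i, bounds) in intervalBuckets.withIndex()) {
        val (firstBucketOfInterval, lastBucketOfInterval) = bounds
        // The cursor lies before this interval: skip it without querying
        if (currentBucket < firstBucketOfInterval) continue
        // Do not query buckets after the end of this interval
        if (currentBucket > lastBucketOfInterval) currentBucket = lastBucketOfInterval
        while (currentBucket >= firstBucketOfInterval) {
            queries.add(i to currentBucket)
            currentBucket--
        }
        // Undo the extra decrement so a following interval in the same bucket is still queried
        currentBucket = firstBucketOfInterval
    }
    return queries
}
```

Removing the final reset of currentBucket from the sketch makes the second of two intervals that share a bucket disappear from the plan, which is exactly the case the reset is meant to handle.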

Conclusion & Complete Code

This post explored an efficient approach to retrieving group chat messages by considering user presence intervals and bucketing. The solution processes presence intervals, dynamically adjusts bucket values, and retrieves messages in the correct order while avoiding unnecessary queries. By addressing hot partitioning and enabling seamless pagination, this method ensures both performance and scalability. Here is the complete code:

</> Kotlin
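import com.datastax.oss.driver.api.core.uuid.Uuids
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset

fun findMessages(userId: UserId, conversationId: ConversationId, messageId: MessageId?): FindGroupMessagesResult {
    val conversation = conversationRepo.findByUserIdAndConversationId(userId, conversationId) ?: return FindGroupMessagesResult.ConversationDoesNotExist
    if (conversation.type != GROUP) {
        return FindGroupMessagesResult.NotGroupChat
    }

    // Sorted newest first, the i-th join and the i-th leave form one presence interval
    val joinedAtList = conversation.joinedAt.toList().sortedDescending()
    val leftAtList = conversation.leftAt?.toMutableList() ?: mutableListOf()
    if (joinedAtList.size == leftAtList.size + 1) {
        // Still a member: the ongoing interval ends now (all timestamps are UTC)
        leftAtList.add(LocalDateTime.now(ZoneOffset.UTC))
    }
    leftAtList.sortDescending()

    // Start at the cursor's bucket when paginating, otherwise at the bucket of the last leave
    val lastBucket = getBucketValue(leftAtList.first())
    var currentBucket = messageId?.let { getBucketValueFromMessageId(it) } ?: lastBucket
    var lastMessageId = messageId ?: Uuids.endOf(leftAtList.first().atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
    val messageList = mutableListOf<MessageByGroupChat>()

    for (i in joinedAtList.indices) {
        val firstBucketOfInterval = getBucketValue(joinedAtList[i])
        val lastBucketOfInterval = getBucketValue(leftAtList[i])

        // The cursor lies before this interval: skip it without querying
        if (currentBucket < firstBucketOfInterval) {
            continue
        }

        // Do not query buckets after the end of this interval
        if (currentBucket > lastBucketOfInterval) {
            currentBucket = lastBucketOfInterval
        }

        // Never read past the end of this interval, i.e. hide messages sent while the user was absent
        val intervalEnd = Uuids.endOf(leftAtList[i].atZone(ZoneOffset.UTC).toInstant().toEpochMilli())
        if (Uuids.unixTimestamp(lastMessageId) > Uuids.unixTimestamp(intervalEnd)) {
            lastMessageId = intervalEnd
        }

        while (currentBucket >= firstBucketOfInterval) {
            val messages = messageRepo.findMessages(
                groupChatId = conversationId,
                bucket = currentBucket,
                messageId = lastMessageId,
                firstMessageTimestamp = joinedAtList[i],
                limit = 20,
            )

            messageList.addAll(messages)

            if (messageList.size >= 20) {
                return FindGroupMessagesResult.Success(messageList.take(20))
            }

            // Continue below the oldest message collected so far
            messageList.lastOrNull()?.messageId?.let { lastMessageId = it }

            currentBucket--
        }

        // Undo the extra decrement so a following interval in the same bucket is still queried
        currentBucket = firstBucketOfInterval
    }

    return FindGroupMessagesResult.Success(messageList)
}

private fun getBucketValue(dateTime: LocalDateTime): Int {
    return (dateTime.year * 12 + dateTime.monthValue) / 3
}

private fun getBucketValueFromMessageId(messageId: MessageId): Int {
    val instant = Instant.ofEpochMilli(Uuids.unixTimestamp(messageId))
    val localDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
    return getBucketValue(localDateTime)
}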