How to adjust the Replication Factor of an internal topic in Kafka

The Kafka replication factor is a crucial mechanism for ensuring data consistency and availability. I've previously covered it in another post, which you can read here.

The internal topic __consumer_offsets plays a pivotal role in Kafka. It stores consumer group offsets, that is, the last offset committed by each consumer, along with consumer group metadata. In the event of a broker failure, it facilitates rebalancing within the consumer group.

It is very important to configure this topic properly to ensure fault tolerance for consumers and data consistency. But first, let me give you some context on how this problem happened to us.

I worked on a project that used Kafka in a 3-node configuration. Most topics had a Replication Factor (RF) of 3, but this specific topic had an RF of 1. Kafka was fully managed by AWS through Amazon MSK, and every month AWS applies a security patch to the Kafka instances. This update follows a rolling strategy, addressing one node at a time.

As you may have noticed, this led to Kafka downtime for us: during the rolling update, when the node containing the __consumer_offsets data was temporarily taken down, all the consumers stopped working because they couldn't sync with __consumer_offsets until the node rejoined the cluster. (You can skip directly to the code here if you want.)
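Before building the fix, it helps to confirm the topic's current replication factor and partition count. A sketch using the standard kafka-topics.sh script is below; the broker endpoint, port, and client.properties file are placeholders for your own cluster settings:

```shell
# Describe the internal topic to see its partition count, replication factor,
# and which broker holds each replica. Replace the placeholders with your
# cluster's values before running.
./kafka-topics.sh \
  --bootstrap-server <kafka_broker_endpoint>:9198 \
  --command-config client.properties \
  --describe --topic __consumer_offsets
```

In our case, this is how we confirmed that every partition had a single replica, all sitting on the same broker.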

To address this issue, we needed to update the Replication Factor for this specific topic. How did we accomplish this? We began by creating a JSON file with the new replica distribution for the topic, as shown below:

Adjusting Replication Factor

{
    "version": 1,
    "partitions": [
      {"topic": "__consumer_offsets", "partition": 0, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 1, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 2, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 3, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 4, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 5, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 6, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 7, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 8, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 9, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 10, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 11, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 12, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 13, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 14, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 15, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 16, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 17, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 18, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 19, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 20, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 21, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 22, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 23, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 24, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 25, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 26, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 27, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 28, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 29, "replicas": [2, 0, 1]},
      {"topic": "__consumer_offsets", "partition": 30, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 31, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 32, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 33, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 34, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 35, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 36, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 37, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 38, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 39, "replicas": [0, 1, 2]},
      {"topic": "__consumer_offsets", "partition": 40, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 41, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 42, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 43, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 44, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 45, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 46, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 47, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 48, "replicas": [1, 2, 0]},
      {"topic": "__consumer_offsets", "partition": 49, "replicas": [1, 2, 0]}
    ]
}

If you pay attention to the replica distribution pattern, we simply rotate the replica list by one position every ten partitions, so each broker leads an even share of partitions. The partition count was taken from the topic's metadata.
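Writing fifty nearly identical JSON lines by hand is tedious and error-prone, so here is a minimal sketch that generates the same file with a shell loop. The file name, the 50-partition count, the broker IDs 0–2, and the ten-partition rotation all match the example above; adjust them to your own cluster:

```shell
#!/usr/bin/env sh
# Generate the reassignment JSON for __consumer_offsets:
# 50 partitions, brokers 0-2, replica list rotated every 10 partitions.
{
  echo '{'
  echo '  "version": 1,'
  echo '  "partitions": ['
  for p in $(seq 0 49); do
    # Rotate the starting broker once per block of 10 partitions.
    start=$(( (p / 10) % 3 ))
    r0=$start
    r1=$(( (start + 1) % 3 ))
    r2=$(( (start + 2) % 3 ))
    sep=","
    [ "$p" -eq 49 ] && sep=""   # no trailing comma on the last entry
    echo "    {\"topic\": \"__consumer_offsets\", \"partition\": $p, \"replicas\": [$r0, $r1, $r2]}$sep"
  done
  echo '  ]'
  echo '}'
} > file-inc-replication-factor.json
```

This produces exactly the distribution shown above; for a different topic you would change the topic name, partition count, and broker IDs.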

With this file in place, you can run the Kafka script that reassigns partitions, kafka-reassign-partitions.sh.

Below is an example:

./kafka-reassign-partitions.sh --bootstrap-server <kafka_broker_endpoint>:9198 --command-config client.properties --reassignment-json-file file-inc-replication-factor.json --execute

You can download the Kafka shell scripts here.

When you run this command, Kafka executes the reassignment for the topic. The same approach can be used with any topic you wish to reassign.
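The reassignment runs asynchronously, so it is worth checking that it actually finished before trusting the new replication factor. The same script accepts a --verify flag that reports the status of each partition in the JSON file (again, the endpoint and client.properties are placeholders):

```shell
# Check the progress of the reassignment: each partition is reported as
# "completed successfully" or "is still in progress".
./kafka-reassign-partitions.sh \
  --bootstrap-server <kafka_broker_endpoint>:9198 \
  --command-config client.properties \
  --reassignment-json-file file-inc-replication-factor.json \
  --verify
```

Once every partition reports success, re-running the describe command on the topic should show three replicas per partition, and a rolling broker update will no longer take your consumers down.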
