Kafka data backlog and data duplication processing cases
2024.04.23
Data backlog processing:
- Increase the number of consumers: If the data backlog is serious, you can increase the number of consumer instances to increase the consumption speed.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
// 增加消费者数量
props.put("max.poll.records", 500); // 每次拉取的最大记录数
props.put("max.partition.fetch.bytes", 1048576); // 每次拉取的最大字节数
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- Adjust the partition allocation strategy of the consumer group: Kafka allocates the partitions of the topic to the consumer instances in the consumer group. By adjusting the partition allocation strategy, you can ensure that the number of partitions processed by each consumer instance is balanced, thereby improving the overall consumption capacity.
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 在重新分配分区之前,进行一些清理工作
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 在分配新的分区之后,进行一些初始化工作
}
});
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- Improve consumer processing capabilities: Optimize consumer logic, such as using batch processing of messages, using multi-threading or asynchronous processing, to increase consumer processing speed.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<SomeRecord> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
SomeRecord processedRecord = processRecord(record);
batch.add(processedRecord);
if (batch.size() >= 100) {
// 批量处理消息
saveBatchToDatabase(batch);
batch.clear();
}
}
if (!batch.isEmpty()) {
// 处理剩余的消息
saveBatchToDatabase(batch);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- Expand Kafka cluster: Add more Kafka broker nodes and partitions to improve overall message processing capabilities.
Data duplication processing:
- Use the unique identifier of the message: Set a unique identifier for each message on the producer side, and the consumer can deduplicate the message based on the identifier when processing the message. You can use a field in the message or generate a globally unique identifier (GUID) as the identifier of the message.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageId = record.key();
if (!isMessageProcessed(messageId)) {
// 处理消息
processRecord(record);
// 标记消息为已处理
markMessageAsProcessed(messageId);
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- Use transactions: If the processing of messages involves data modification operations, you can use Kafka's transaction function to ensure the idempotence and consistency of the messages.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
// 设置事务ID
props.put("transactional.id", "kafka-transactional-id");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));
consumer.beginTransaction();
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
processRecord(record);
}
consumer.commitTransaction();
} catch (Exception e) {
consumer.abortTransaction();
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- Consumer side deduplication: Maintain a record of processed messages on the consumer side, such as using a database or cache. Each time a message is received, the record is first queried. If the message already exists, the message is ignored.
Set<String> processedMessages = new HashSet<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageId = record.key();
if (!processedMessages.contains(messageId)) {
// 处理消息
processRecord(record);
// 添加到已处理消息集合
processedMessages.add(messageId);
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- Consumer-side idempotent processing: Implement idempotence in the business logic of the consumer-side to ensure that the final processing result is consistent even if duplicate messages are received.
Solutions to data backlog and data duplication problems need to be adjusted and optimized based on specific business needs and system conditions. In addition, monitoring and measurement systems are also very important to help detect and resolve data backlog and duplication issues in a timely manner.