Kafka 資料積壓與資料重複處理的案例
2024.04.23
資料積壓處理:
- 增加消費者數量:如果資料積壓嚴重,可以增加消費者實例的數量來提高消費速度。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
// 增加消费者数量
props.put("max.poll.records", 500); // 每次拉取的最大记录数
props.put("max.partition.fetch.bytes", 1048576); // 每次拉取的最大字节数
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 調整消費者群組的分區分配策略:Kafka將主題的分區分配給消費者群組中的消費者實例。透過調整分區分配策略,可以確保每個消費者實例處理的分區數量均衡,進而提高整體的消費能力。
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 在重新分配分区之前,进行一些清理工作
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 在分配新的分区之后,进行一些初始化工作
}
});
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 提升消費者的處理能力:優化消費者邏輯,例如使用批次處理訊息、使用多執行緒或非同步處理等方式,以提高消費者的處理速度。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<SomeRecord> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
SomeRecord processedRecord = processRecord(record);
batch.add(processedRecord);
if (batch.size() >= 100) {
// 批量处理消息
saveBatchToDatabase(batch);
batch.clear();
}
}
if (!batch.isEmpty()) {
// 处理剩余的消息
saveBatchToDatabase(batch);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 擴展Kafka叢集:增加更多的Kafka代理節點和分區,以提高整體的訊息處理能力。
資料重複處理:
- 使用訊息的唯一標識:在生產者端為每個訊息設定一個唯一的標識符,消費者在處理訊息時可以根據標識符進行去重。可以使用訊息中的某個欄位或產生全域唯一識別碼(GUID)作為訊息的識別碼。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageId = record.key();
if (!isMessageProcessed(messageId)) {
// 处理消息
processRecord(record);
// 标记消息为已处理
markMessageAsProcessed(messageId);
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 使用事務:如果訊息的處理涉及資料的修改操作,可以使用Kafka的事務功能來保證訊息的冪等性和一致性。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
// 设置事务ID
props.put("transactional.id", "kafka-transactional-id");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));
consumer.beginTransaction();
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
processRecord(record);
}
consumer.commitTransaction();
} catch (Exception e) {
consumer.abortTransaction();
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 消費者端去重:在消費者端維護一個已處理訊息的記錄,例如使用資料庫或緩存,每次接收到訊息時先查詢記錄,如果已存在則忽略該訊息。
Set<String> processedMessages = new HashSet<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageId = record.key();
if (!processedMessages.contains(messageId)) {
// 处理消息
processRecord(record);
// 添加到已处理消息集合
processedMessages.add(messageId);
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 消費者端冪等性處理:在消費者端的業務邏輯中實現冪等性,即使接收重複的訊息,也能確保最終的處理結果是一致的。
針對資料積壓和資料重複問題的解決方案需要根據具體的業務需求和系統情況進行調整和最佳化。此外,監控和度量系統也是非常重要的,可以幫助及時發現和解決資料積壓和重複問題