實現百萬資料從Excel匯入到資料庫的方式

2024.04.09

場景分析

這個案例實際上涉及多個方面,需要我們系統地分析。讓我們先看看,從Excel中讀取百萬級資料並將其插入資料庫時可能遇到的問題:

  1. 記憶體溢位風險

載入如此龐大的Excel資料可能會導致記憶體溢出,需要注意記憶體管理。

  1. 效能瓶頸

處理百萬級資料的讀取和插入操作可能很耗時,效能最佳化至關重要。

  1. 例外處理策略

讀取和導入過程中會有各種潛在問題,我們需妥善處理各類異常情況。

記憶體溢位問題

處理百萬級數據,直接載入到記憶體中顯然不切實際。解決之道在於採用串流讀取,分批處理資料。

在技​​術選項上,選擇EasyExcel是明智之舉。它專為處理大數據量和複雜Excel檔案進行了最佳化。 EasyExcel在解析Excel時,不會將整個檔案一次載入記憶體中,而是按行從磁碟逐一讀取資料並解析。

效能問題

針對百萬級資料的處理,單線程顯然效率低。提升效能的關鍵在於多執行緒處理。

多執行緒應用涉及兩個場景:一個是多執行緒讀取文件,另一個是多執行緒實作資料插入。這涉及到生產者-消費者模式,多執行緒讀取並多執行緒插入,以最大程度提升整體效能。

在資料插入方面,除了利用多線程,還應結合資料庫的批次插入功能以進一步提升速度。

錯誤處理

在檔案讀取和資料庫寫入過程中,可能遇到諸多問題,如資料格式錯誤、不一致性和重複資料等。

因此,應分兩步驟處理。首先進行資料檢查,在插入操作前檢查資料格式等問題,然後在插入過程中處理異常情況。

處理方式多種多樣,可透過交易回溯或記錄日誌。一般不建議直接回滾操作,而是自動重試,若嘗試多次仍無效,則記錄日誌,隨後重新插入資料。

此外,在此過程中,需考慮資料重複問題,可在Excel中設定若干欄位為資料庫唯一約束。遇到資料衝突時,可覆蓋、跳過或報錯處理。根據實際業務情況選擇合適的處理方式,一般情況下,跳過並記錄日誌是相對合理的選擇。

解決思路

所以,總體方案如下:

利用EasyExcel進行Excel資料讀取,因其逐行讀取資料而非一次載入整個檔案至記憶體。為提高並發效率,將百萬級資料分佈在不同的工作表中,利用執行緒池和多執行緒同時讀取各個工作表。在讀取過程中,使用EasyExcel的ReadListener進行資料處理。

在處理過程中,並非每個資料都直接操作資料庫,以免對資料庫造成過大壓力。設定一個批次大小,例如每1000條數據,將從Excel中讀取的數據暫時儲存在記憶體中(可使用List實作)。每讀取1000個資料後,執行資料的批次插入操作,可簡單地借助mybatis實作批次插入。

此外,在處理過程中,需要考慮並發問題,因此我們將使用線程安全的佇列來儲存記憶體中的臨時數據,例如ConcurrentLinkedQueue。

經驗證,透過上述方案,讀取並插入100萬條資料的Excel所需時間約為100秒,不超過2分鐘。

具體實現

為了提升並發處理能力,我們將百萬級資料儲存在同一個Excel檔案的不同工作表中,然後透過EasyExcel並發地讀取這些工作表資料。

EasyExcel提供了ReadListener接口,允許在每批資料讀取後進行自訂處理。我們可以基於此功能實現文件的分批讀取。

pom依賴

首先,需要加入以下依賴:

<dependencies>
    <!-- EasyExcel -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>latest_version</version>
    </dependency>

    <!-- 数据库连接和线程池 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
</dependencies>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

並發讀取多個sheet

然後實作並發讀取多個sheet的程式碼:

@Service
public class ExcelImporterService {

    @Autowired
    private MyDataService myDataService;
    
    public void doImport() {
        // Excel文件的路径
        String filePath = "users/paidaxing/workspace/excel/test.xlsx";

        // 需要读取的sheet数量
        int numberOfSheets = 20;

        // 创建一个固定大小的线程池,大小与sheet数量相同
        ExecutorService executor = Executors.newFixedThreadPool(numberOfSheets);

        // 遍历所有sheets
        for (int sheetNo = 0; sheetNo < numberOfSheets; sheetNo++) {
            // 在Java lambda表达式中使用的变量需要是final
            int finalSheetNo = sheetNo;

            // 向线程池提交一个任务
            executor.submit(() -> {
                // 使用EasyExcel读取指定的sheet
                EasyExcel.read(filePath, MyDataModel.class, new MyDataModelListener(myDataService))
                         .sheet(finalSheetNo) // 指定sheet号
                         .doRead(); // 开始读取操作
            });
        }

        // 启动线程池的关闭序列
  executor.shutdown();

        // 等待所有任务完成,或者在等待超时前被中断
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            // 如果等待过程中线程被中断,打印异常信息
            e.printStackTrace();
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.

這段程式碼透過建立一個固定大小的執行緒池來並發讀取一個包含多個sheets的Excel檔案。每個sheet的讀取作為一個單獨的任務提交給執行緒池。

我們在程式碼中用了一個MyDataModelListener,這個類別是ReadListener的一個實作類別。當EasyExcel讀取每一行資料時,它會自動呼叫我們傳入的這個ReadListener實例的invoke方法。在這個方法中,我們就可以定義如何處理這些資料。

MyDataModelListener也包含doAfterAllAnalysed方法,這個方法在所有資料都讀取完畢後被呼叫。這裡可以執行一些清理工作,或處理剩餘的資料。

讀監聽器

接下來,我們來實作這個我們的ReadListener:

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;

// 自定义的ReadListener,用于处理从Excel读取的数据
public class MyDataModelListener implements ReadListener<MyDataModel> {
    // 设置批量处理的数据大小
    private static final int BATCH_SIZE = 1000;
    // 用于暂存读取的数据,直到达到批量大小
    private List<MyDataModel> batch = new ArrayList<>();

    
    private MyDataService myDataService;

    // 构造函数,注入MyBatis的Mapper
    public MyDataModelListener(MyDataService myDataService) {
        this.myDataService = myDataService;
    }

    // 每读取一行数据都会调用此方法
    @Override
    public void invoke(MyDataModel data, AnalysisContext context) {
        //检查数据的合法性及有效性
        if (validateData(data)) {
            //有效数据添加到list中
            batch.add(data);
        } else {
            // 处理无效数据,例如记录日志或跳过
        }
        
        // 当达到批量大小时,处理这批数据
        if (batch.size() >= BATCH_SIZE) {
            processBatch();
        }
    }

    
    private boolean validateData(MyDataModel data) {
        // 调用mapper方法来检查数据库中是否已存在该数据
        int count = myDataService.countByColumn1(data.getColumn1());
        // 如果count为0,表示数据不存在,返回true;否则返回false
        if(count == 0){
         return true;
        }
        
        // 在这里实现数据验证逻辑
        return false;
    }


    // 所有数据读取完成后调用此方法
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        // 如果还有未处理的数据,进行处理
        if (!batch.isEmpty()) {
            processBatch();
        }
    }

    // 处理一批数据的方法
    private void processBatch() {
        int retryCount = 0;
        // 重试逻辑
        while (retryCount < 3) {
            try {
                // 尝试批量插入
                myDataService.batchInsert(batch);
                // 清空批量数据,以便下一次批量处理
                batch.clear();
                break;
            } catch (Exception e) {
                // 重试计数增加
                retryCount++;
                // 如果重试3次都失败,记录错误日志
                if (retryCount >= 3) {
                    logError(e, batch);
                }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.

透過自訂MyDataModelListener,在讀取Excel檔案過程中可實現資料處理。每讀取一條資料後,將其加入列表,在列表累積達到1000條時,執行一次資料庫批次插入操作。若插入失敗,則進行重試;若多次嘗試仍失敗,則記錄錯誤日誌。

批量插入

這裡批量插入,用到了MyBatis的批量插入,程式碼實作如下:

import org.apache.ibatis.annotations.Mapper;
import java.util.List;

@Mapper
public interface MyDataMapper {
    void batchInsert(List<MyDataModel> dataList);

    int countByColumn1(String column1);
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

mapper.xml檔:

<insert id="batchInsert" parameterType="list">
    INSERT INTO paidaxing_test_table_name (column1, column2, ...)
    VALUES 
    <foreach collection="list" item="item" index="index" separator=",">
        (#{item.column1}, #{item.column2}, ...)
    </foreach>
</insert>

<select id="countByColumn1" resultType="int">
    SELECT COUNT(*) FROM your_table WHERE column1 = #{column1}
</select>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.