電商真實對帳系統是如何設計並優化的
往期文章在熱點資料如何更新的一篇文章中有提到對帳系統。其實我在實際業務場景中是曾經遇到類似對帳的最佳化問題的。說優化之前要掌握一點就是一定要掌握Java並發包的相關特性。本章節對此有很大依賴。
- 熱點數據高效率更新文章:
inventory hint,解決熱點資料如何有效率地更新
Java並發包簡說
CountDownLatch和CyclicBarrier
差別
倒數計時鎖存器:
- 不可以重複使用,計數器無法被重置
經典案例例如門衛休息休要等所有人下班才可以關門休息CyclicBarrier:
- 可以重複使用,是一個同步輔助類,允許一組執行緒相互等待,直到到達某個公共屏障點(common barrier point)。假設設計一組固定大小的執行緒的程式中,這些執行緒必須不是的互相等待,此時就可以使用CyclicBarrier。因為該barrier在釋放等待線程後可以重複使用,從而稱為循環的barrier。
經典案例:例如運動員跑步,需要所有人準備好之後裁判才可以發令讓大家在同一時刻去跑。有依賴關係
案例
有一天,老大匆忙趕來,提到對帳系統最近越來越緩慢,希望能迅速進行最佳化。經過了解對帳系統的業務流程,發現其實相當簡單:使用者透過線上商城下單會產生電子訂單並儲存在訂單資料庫中;隨後物流會產生派送單用於出貨,派送單則儲存於派送單庫中。為了避免漏發或重複派送,對帳系統每天會查核是否有異常訂單。
目前對帳系統的處理邏輯很簡單:先查詢訂單,然後查詢派送單,接著比對訂單和派送單,將差異記錄寫入差異庫。對帳系統的核心程式碼經過抽象化後,也不複雜,主要是在單執行緒中循環執行訂單和派送單的查詢,進行對帳操作,最後將結果寫入差異庫。
虛擬程式碼
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
利用Java並行優化對帳系統
首先要找出對帳系統的瓶頸所在。目前,由於訂單量和派送單量龐大,導致查詢未對帳訂單 getPOrders() 和查詢派送單 getDOrders() 的速度較慢。是否有一種快速優化的方法呢?目前對帳系統是單線程執行的。對於這樣的串列系統,優化效能的第一個想法是能否利用多執行緒並行處理。
因此,我們可以看出對帳系統的瓶頸在哪裡:查詢未對帳訂單 getPOrders() 和查詢派送單 getDOrders() 是否能夠並行處理呢?很顯然,這兩個操作之間並沒有依賴關係。將這兩個耗時操作並行化後,與單執行緒執行相比,您會發現在相同時間段內,並行執行的吞吐量接近單執行緒的兩倍,最佳化效果頗為明顯。
有了這個思路,接下來我們來看看如何用程式碼實作。在下面的程式碼中,我們建立了兩個執行緒T1 和T2,分別並行執行查詢未對帳訂單 getPOrders() 和查詢派送單 getDOrders() 的操作。主執行緒則負責執行對帳操作 check()和將差異寫入 save() 的操作。值得注意的是:主執行緒需要等待執行緒T1 和T2 執行完畢後才能執行 check() 和 save() 這兩個操作。為此,我們透過呼叫T1.join() 和T2.join() 實作等待,當執行緒T1 和T2 結束時,呼叫了T1.join() 和T2.join() 的主執行緒將從阻塞狀態中解除,隨後執行後續的check() 和save() 操作。
虛擬程式碼
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
用CountDownLatch 實作執行緒等待
經過上述優化,基本上可以向老闆報告工作完成了,但仍有一些遺憾之處。我相信您也已經注意到了,在 while 循環中每次都會創建新的線程,而創建線程是一個耗時的操作。因此,最好能夠重複利用已建立的執行緒。您想到了線程池,確實,線程池能夠解決這個問題。
透過執行緒池進行最佳化後:我們先建立了一個固定大小為2的執行緒池,並在 while 迴圈中重複利用這些執行緒。一切看起來都進行得很順利,但似乎有一個問題無法解決,即主線程如何知道 getPOrders() 和 getDOrders() 這兩個操作何時執行完成。在前面的方案中,主執行緒透過呼叫執行緒 T1 和 T2 的 join() 方法來等待它們退出,但是在執行緒池方案中,執行緒根本就不會退出,因此 join() 方法失效了。
那麼,如何解決這個問題呢?您可以想出許多方法,其中最直接的方法是使用一個計數器。將其初始值設為2,執行完 pos = getPOrders(); 後減1,執行完 dos = getDOrders(); 後也減1。主執行緒在這之後等待計數器等於0;當計數器等於0時,表示這兩個查詢操作已執行完畢。等待計數器為0其實是一種條件變量,使用管程實作起來也不複雜。
然而,我並不建議在實際專案中實作上述方案,因為Java並發套件中已經提供了類似功能的工具類別:CountDownLatch,我們直接使用即可。在下面的程式碼範例中,我們在 while 迴圈中首先建立了一個CountDownLatch,計數器的初始值為2。在 pos = getPOrders(); 和 dos = getDOrders(); 兩個語句後,透過呼叫 latch.countDown(); 來實現對計數器的減1操作。在主執行緒中,透過呼叫 latch.await(); 來實現對計數器等於0的等待。
虛擬程式碼
// 创建 2 个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为 2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
進一步優化效能
經過以上一連串的優化,終於可以鬆一口氣,準備交付專案。然而,在交付之前,再次審視一番是值得的,或許還有優化的空間。
前面我們已經實現了將 getPOrders() 和 getDOrders() 這兩個查詢操作並行化,但是這兩個查詢操作與對帳操作 check() 和 save() 之間仍然是串行執行的。很顯然,這兩個查詢操作與對帳操作也可以並行執行,即在執行對帳操作的同時可以進行下一輪的查詢操作。
接下來,我們再思考如何實現這項優化。兩次查詢操作能夠與對帳操作並行執行,而對帳操作又依賴查詢操作的結果,明顯具有生產者-消費者模型的特徵。兩次查詢操作充當生產者,對帳操作為消費者。為了實現這種模型,我們需要一個佇列來儲存生產者產生的數據,而消費者則從佇列中取出資料執行對應操作。
針對這個對帳項目,我設計了兩個佇列,其元素之間存在對應關係。具體來說,訂單查詢操作將訂單查詢結果插入訂單隊列,派送單查詢作業將派送單插入派送單隊列,這兩個隊列的元素之間是一一對應的。使用兩個佇列的好處在於,對帳操作可以每次從訂單佇列取出一個元素和派送單佇列中取出一個元素,然後執行對帳操作,確保資料的一致性。
接下來,讓我們看看如何透過雙隊列實現完全並行化。一個直接的思路是:一個執行緒 T1 執行訂單查詢工作,另一個執行緒 T2 執行派送單查詢工作。當執行緒 T1 和 T2 都各自生產完一條資料時,通知執行緒 T3 執行對帳操作。這個想法看似簡單,實際上仍然存在一個條件:T1 和 T2 的工作節奏必須一致,保持同步,否則一個快一個慢將影響各自生產數據並通知 T3 的過程。
只有在T1和T2各自生產一條資料時才能繼續執行,也就是說,T1和T2需要互相等待,保持步調一致。同時,當T1和T2都生產完一筆資料時,還需能通知T3執行對帳操作。
用CyclicBarrier 實作線程同步
接下來我們將實作上述方案中所提到的方法。此方案的困難點在於兩個面向:一是確保執行緒 T1 和 T2 的步調一致,二是能夠有效通知執行緒 T3。
在解決這兩個困難的過程中,仍然可以利用一個計數器。將計數器初始化為2,每當執行緒 T1 和 T2 生產完一條資料時,都會將計數器減1。若計數器大於0,則執行緒 T1 或 T2需要等待。當計數器等於0時,通知執行緒 T3,喚醒等待的執行緒 T1 或 T2,並將計數器重設為2。如此,線程 T1 和 T2 在生產下一條資料時,可以繼續使用這個計數器。
建議不要在實際專案中直接實作這個邏輯,因為Java並發包中已經提供了相關的工具類別:CyclicBarrier。在下面的程式碼中,我們首先建立了一個初始值為2的CyclicBarrier計數器。需要注意的是,在建立CyclicBarrier時,傳入了一個回呼函數。當計數器減至0時,此回呼函數會被呼叫。
線程 T1 負責查詢訂單,每查到一條數據,調用barrier.await()將計數器減1,並等待計數器變為0。線程 T2 負責查詢派送單,處理方式與線程 T1 類似。當 T1 和 T2 都呼叫barrier.await()時,計數器會減至0,此時 T1 和 T2可以繼續執行下一步操作,並呼叫barrier的回呼函數執行對帳操作。
值得一提的是,CyclicBarrier的計數器具有自動重置功能。當計數器減至0時,會自動重新設定為您設定的初始值。這項特性確實方便實用。
虛擬程式碼
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
CountDownLatch 和CyclicBarrier 是Java並發套件提供的兩個非常方便的執行緒同步工具類別。在這裡,有必要再次強調它們之間的區別:CountDownLatch 主要用於解決一個線程等待多個線程的情況,可以類比於旅遊團團長必須等待所有遊客齊集後才能繼續前行;而CyclicBarrier 則是一組線程互相等待,有點像幾個驢子朋友的互助合作。此外,CountDownLatch 的計數器不支援重複利用,即一旦計數器降至0,後續呼叫await()的執行緒將直接通過。相較之下,CyclicBarrier 的計數器可以循環利用,同時具有自動重置功能,一旦計數器減至0,將會自動重設為設定的初始值。此外,CyclicBarrier 還支援設定回呼函數,功能更加豐富。