How is a real e-commerce reconciliation system designed and optimized?

Preface

A previous article on efficiently updating hot data mentioned the reconciliation system. I have in fact run into optimization problems similar to reconciliation in real business scenarios. One prerequisite before talking about optimization: you must be familiar with the relevant classes in the Java concurrency package, which this chapter relies on heavily.

  • Articles on efficient update of hot data:

Inventory hint, solves how to update hot data efficiently

A brief introduction to the Java concurrency package

The difference between CountDownLatch and CyclicBarrier

CountDownLatch:

  • Cannot be reused, counter cannot be reset
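The counting behavior can be sketched in a few lines of Java (the worker count and messages here are made up for illustration):

```java
import java.util.concurrent.CountDownLatch;

public class DoormanDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3; // hypothetical number of people in the office
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            // each worker counts down once when leaving
            new Thread(latch::countDown).start();
        }
        latch.await(); // the "doorman" blocks until the counter reaches 0
        System.out.println("everyone has left, locking up");
    }
}
```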

A classic example: a doorman must wait until everyone has left work before he can lock the door and take a break.

CyclicBarrier:

  • Reusable. It is a synchronization helper class that allows a group of threads to wait for each other until a common barrier point is reached. Suppose you design a program with a fixed-size set of threads, and these threads must occasionally wait for each other; in that case you can use CyclicBarrier. Because the barrier can be reused after the waiting threads are released, it is called a cyclic barrier.

A classic case: when athletes run a race, every runner must be ready before the referee can give the order for all of them to start at the same time. The threads depend on one another.
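The athletes analogy maps directly onto the API (the runner count and messages are invented for this sketch):

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class RaceDemo {
    public static void main(String[] args) {
        int runners = 3;
        // the barrier action plays the referee: it fires once all parties arrive
        CyclicBarrier barrier = new CyclicBarrier(runners,
                () -> System.out.println("go!"));
        for (int i = 1; i <= runners; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("runner " + id + " ready");
                    barrier.await(); // wait until every runner is ready
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}
```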

Case

One day the boss rushed over and said that the reconciliation system had been getting slower and slower, and he hoped it could be optimized quickly. After studying the business process of the reconciliation system, I found it is actually quite simple: when a user places an order through the online mall, an electronic order is generated and stored in the order database; logistics then generates a delivery order for shipping, and the delivery order is saved in the delivery order database. To avoid missed or duplicate deliveries, the reconciliation system checks for abnormal orders every day.

The processing logic of the current reconciliation system is very simple: first query the orders, then query the delivery orders, compare the two, and write the difference records into the difference database. Abstracted, the core code is not complicated: a single thread queries orders and delivery orders, performs the reconciliation, and finally writes the result to the difference database.

pseudocode

while(there are unreconciled orders){
  // query unreconciled orders
  pos = getPOrders();
  // query delivery orders
  dos = getDOrders();
  // perform the reconciliation
  diff = check(pos, dos);
  // write differences to the difference database
  save(diff);
}

Using Java to optimize the reconciliation system in parallel

The first step is to identify the bottleneck. Because the volume of orders and delivery orders is huge, the two queries, getPOrders() for unreconciled orders and getDOrders() for delivery orders, are slow. Is there a quick way to optimize this? The reconciliation system currently runs in a single thread, and for such a serial system the first optimization idea is to ask whether multi-threaded parallel processing can be applied.

From this we can see where the bottleneck lies: can getPOrders() and getDOrders() be processed in parallel? Obviously there is no dependency between these two operations. After parallelizing the two time-consuming queries, the throughput in the same period is nearly twice that of single-threaded execution, a quite noticeable improvement.

With this idea in mind, let's see how to implement it in code. In the code below we create two threads, T1 and T2, to run getPOrders() and getDOrders() in parallel. The main thread is responsible for the reconciliation operation check() and for writing the differences with save(). Note that the main thread must wait for T1 and T2 to finish before it can run check() and save(), so it waits by calling T1.join() and T2.join(). When T1 and T2 end, the main thread blocked in those calls is released and then performs the subsequent check() and save() operations.

pseudocode

while(there are unreconciled orders){
  // query unreconciled orders
  Thread T1 = new Thread(()->{
    pos = getPOrders();
  });
  T1.start();
  // query delivery orders
  Thread T2 = new Thread(()->{
    dos = getDOrders();
  });
  T2.start();
  // wait for T1 and T2 to finish
  T1.join();
  T2.join();
  // perform the reconciliation
  diff = check(pos, dos);
  // write differences to the difference database
  save(diff);
}

Use CountDownLatch to implement thread waiting

After the above optimization, you could basically report to your boss that the work is done, but there is still something unsatisfying. As you may have noticed, a new thread is created on every iteration of the while loop, and creating a thread is an expensive operation, so it is best to reuse threads that have already been created. You probably thought of the thread pool, and indeed a thread pool solves this problem.

Optimizing with a thread pool: we first create a thread pool with a fixed size of 2 and reuse its threads inside the while loop. Everything seems to go smoothly, but one issue remains: how does the main thread know when the two operations getPOrders() and getDOrders() have completed? In the previous version the main thread waited for T1 and T2 by calling their join() methods, but threads in a pool never exit, so join() no longer works.
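The intermediate thread-pool version described above would look roughly like this (a sketch in the same pseudocode style as the other listings; the missing wait is the point):

```
// create a thread pool with 2 threads
Executor executor = 
  Executors.newFixedThreadPool(2);
while(there are unreconciled orders){
  // query unreconciled orders
  executor.execute(()-> {
    pos = getPOrders();
  });
  // query delivery orders
  executor.execute(()-> {
    dos = getDOrders();
  });
  // problem: the pool threads never exit, so there is no join()
  // to tell the main thread when both queries have finished
  diff = check(pos, dos); // may run before the queries complete!
  save(diff);
}
```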

So how do we solve this? There are many options, the most straightforward of which is a counter. Set its initial value to 2, decrement it by 1 after pos = getPOrders(); finishes, and decrement it by 1 again after dos = getDOrders(); finishes. The main thread then waits for the counter to reach 0; when it does, both query operations have completed. Waiting for the counter to reach 0 is really a condition variable, and it is not hard to implement with a monitor.
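The counter-plus-monitor idea can be sketched like this (a hand-rolled illustration; the class and method names are my own, and as the next paragraph recommends, real projects should use CountDownLatch instead):

```java
// A minimal hand-rolled latch built on a Java monitor, for illustration only
public class SimpleLatch {
    private int count;

    SimpleLatch(int count) { this.count = count; }

    synchronized void countDown() {
        if (--count <= 0) {
            notifyAll(); // wake every thread blocked in await()
        }
    }

    synchronized void await() throws InterruptedException {
        while (count > 0) { // the "counter reaches 0" condition
            wait();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(2);
        // stand-ins for the getPOrders()/getDOrders() worker threads
        new Thread(latch::countDown).start();
        new Thread(latch::countDown).start();
        latch.await(); // main thread blocks until both have counted down
        System.out.println("both queries done");
    }
}
```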

However, I do not recommend implementing the above solution in actual projects, because the Java concurrency package already provides a tool class with similar functions: CountDownLatch, which we can use directly. In the code example below, we first create a CountDownLatch in a while loop with an initial value of 2. After the two statements pos = getPOrders(); and dos = getDOrders();, the counter is decremented by 1 by calling latch.countDown();. In the main thread, wait for the counter to equal 0 by calling latch.await();.

pseudocode

// create a thread pool with 2 threads
Executor executor = 
  Executors.newFixedThreadPool(2);
while(there are unreconciled orders){
  // initialize the counter to 2
  CountDownLatch latch = 
    new CountDownLatch(2);
  // query unreconciled orders
  executor.execute(()-> {
    pos = getPOrders();
    latch.countDown();
  });
  // query delivery orders
  executor.execute(()-> {
    dos = getDOrders();
    latch.countDown();
  });
  
  // wait for both queries to finish
  latch.await();
  
  // perform the reconciliation
  diff = check(pos, dos);
  // write differences to the difference database
  save(diff);
}

Further optimize performance

After this series of optimizations, we can finally breathe a sigh of relief and prepare to deliver the project. Before delivering, though, it is worth taking one more look; there may still be room for optimization.

Previously, we have implemented parallelization of the two query operations getPOrders() and getDOrders(), but these two query operations and the reconciliation operations check() and save() are still executed serially. Obviously, these two query operations and reconciliation operations can also be executed in parallel, that is, the next round of query operations can be performed while performing the reconciliation operation.

Next, let's think about how to achieve this. The two query operations can run in parallel with the reconciliation operation, while the reconciliation operation depends on the results of the queries. This is exactly the shape of the producer-consumer model: the two query operations are producers and the reconciliation operation is the consumer. To implement this model we need queues to hold the data the producers generate; the consumer takes data out of the queues and processes it.

For this reconciliation project, I designed two queues with corresponding relationships between their elements. Specifically, the order query operation inserts the order query results into the order queue, and the delivery order query operation inserts the delivery order into the delivery order queue. There is a one-to-one correspondence between the elements of the two queues. The advantage of using two queues is that the reconciliation operation can take out one element from the order queue and one element from the delivery order queue each time, and then perform the reconciliation operation to ensure data consistency.

Next, let's see how to achieve full parallelization with the two queues. A direct idea: one thread T1 queries orders, another thread T2 queries delivery orders, and once T1 and T2 have each produced one piece of data, thread T3 is notified to perform the reconciliation. The idea looks simple, but there is one more constraint: T1 and T2 must keep the same working rhythm; otherwise one will be faster and the other slower, which breaks the scheme of producing data in step and notifying T3.

Execution can only continue when T1 and T2 each produce a piece of data. In other words, T1 and T2 need to wait for each other to keep in step. At the same time, when both T1 and T2 have produced a piece of data, they need to be able to notify T3 to perform the reconciliation operation.

Using CyclicBarrier to achieve thread synchronization

Next we will implement the methods mentioned in the above scheme. The difficulty of this solution lies in two aspects: first, ensuring that threads T1 and T2 are in sync, and second, being able to effectively notify thread T3.

In the process of solving these two difficulties, a counter can still be used. Initialize the counter to 2, and decrement the counter by 1 every time threads T1 and T2 finish producing a piece of data. If the counter is greater than 0, thread T1 or T2 needs to wait. When the counter equals 0, notify thread T3, wake up the waiting thread T1 or T2, and reset the counter to 2. In this way, threads T1 and T2 can continue to use this counter when producing the next piece of data.

It is recommended not to directly implement this logic in actual projects, because the Java concurrency package already provides related tool classes: CyclicBarrier. In the code below, we first create a CyclicBarrier counter with an initial value of 2. It should be noted that when creating CyclicBarrier, a callback function is passed in. This callback function will be called when the counter reaches 0.

Thread T1 is responsible for querying orders. Every time a piece of data is found, barrier.await() is called to decrement the counter by 1 and wait for the counter to become 0. Thread T2 is responsible for querying delivery orders, and the processing method is similar to thread T1. When both T1 and T2 call barrier.await(), the counter will be reduced to 0. At this time, T1 and T2 can continue to perform the next operation and call the barrier's callback function to perform the reconciliation operation.

It is worth mentioning that CyclicBarrier's counter has an automatic reset function. When the counter decreases to 0, it will automatically reset to the initial value you set. This feature is really convenient and practical.
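The automatic reset can be observed directly in a small standalone sketch (two parties, two trips over the same barrier; no reset call is ever needed):

```java
import java.util.concurrent.CyclicBarrier;

public class ResetDemo {
    public static void main(String[] args) throws Exception {
        // the barrier action runs once per trip, when both parties have arrived
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> System.out.println("trip complete"));
        for (int round = 0; round < 2; round++) {
            Thread t = new Thread(() -> {
                try {
                    barrier.await();
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            barrier.await(); // the main thread is the second party
            t.join();
            // the counter has automatically reset to 2 at this point
        }
    }
}
```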

pseudocode

// order queue
Vector<P> pos;
// delivery order queue
Vector<D> dos;
// thread pool that runs the barrier callback
Executor executor = 
  Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
  new CyclicBarrier(2, ()->{
    executor.execute(()->checkOne());
  });
  
void checkOne(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // perform the reconciliation
  diff = check(p, d);
  // write differences to the difference database
  save(diff);
}
  
void checkAll(){
  // loop querying the order database
  Thread T1 = new Thread(()->{
    while(there are unreconciled orders){
      // query the order database
      pos.add(getPOrders());
      // wait for T2 to catch up
      barrier.await();
    }
  });
  T1.start();  
  // loop querying the delivery order database
  Thread T2 = new Thread(()->{
    while(there are unreconciled orders){
      // query the delivery order database
      dos.add(getDOrders());
      // wait for T1 to catch up
      barrier.await();
    }
  });
  T2.start();
}

CountDownLatch and CyclicBarrier are two very convenient thread-synchronization tool classes provided by the Java concurrency package. It is worth emphasizing their differences once more. CountDownLatch is mainly used when one thread waits for multiple other threads, like a tour guide who must wait for all tourists to gather before moving on. CyclicBarrier is for a group of threads waiting for each other, a bit like several friends cooperating and keeping pace. In addition, CountDownLatch's counter cannot be reused: once it drops to 0, any later call to await() passes straight through. CyclicBarrier's counter, by contrast, can be recycled; it resets automatically to the configured initial value whenever it reaches 0. CyclicBarrier also supports a callback function, giving it richer functionality.
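The non-reusability of CountDownLatch can be seen in a tiny sketch: once the counter hits 0, every later await() returns immediately and the count stays at 0.

```java
import java.util.concurrent.CountDownLatch;

public class LatchReuseDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();           // counter drops to 0
        latch.await();               // returns immediately
        latch.await();               // still returns immediately: no reset
        System.out.println("count=" + latch.getCount());
    }
}
```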