Kuaishou's practice in processing massive model data

2024.02.15

1. Introduction to model scenarios

1. Real-time large model

[Figure]

*The figures in this article are time-sensitive snapshots and do not represent real-time data.

Kuaishou's model scenarios are mainly real-time large models. The real-time aspect is mainly reflected in social interaction: users upload more than 15 million new videos every day, more than 100 million users are active in live streaming every day, and upload volume keeps growing year over year.

The "large" aspect is mainly reflected in traffic scale. Kuaishou currently has 387 million daily active users, with daily exposures on the order of 100 billion and daily video plays on the order of 10 billion. The model is therefore very large, and it must also be real-time. Moreover, Kuaishou's core value is fairness and inclusiveness: when tens of millions of users are online at the same time, each personalized request should return different content.

In summary, Kuaishou's data processing is characterized by being both large-scale and real-time.

2. The recommendation business is complex

[Figure]

The general recommendation architecture is shown in the figure above. Starting from the video pool (say, tens of millions of videos), a request goes through a fixed funnel of stages (a simplified sketch follows the list):

  • Recall: recall tens of thousands of videos from the pool of tens of millions.
  • Coarse ranking: a coarse-ranking funnel narrows the candidates down to thousands of videos.
  • Fine ranking: the thousands of candidates go through fine ranking, which keeps the top few hundred.
  • Re-ranking: the remaining candidates enter re-ranking, where model scores are assigned and model validation is done.
  • Return: after applying various mechanisms and diversity operations, a few dozen results are finally selected and returned to the user. The whole funnel is very demanding.
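
Purely as an illustration (not Kuaishou's service code; the candidate counts and the stubbed per-stage scores below are made up), the funnel can be thought of as a chain of progressively smaller top-k selections:

```python
import random
from typing import Dict, List

Video = Dict[str, float]  # illustrative: a candidate with precomputed stage scores

def top_k(candidates: List[Video], key: str, k: int) -> List[Video]:
    """Keep the k highest-scoring candidates under one (stubbed) stage score."""
    return sorted(candidates, key=lambda v: v[key], reverse=True)[:k]

def recommend(pool: List[Video]) -> List[Video]:
    # Each later stage would use a more expensive but more accurate model;
    # here the per-stage scores are just precomputed stub fields.
    recalled = random.sample(pool, min(len(pool), 50_000))  # tens of millions -> tens of thousands
    coarse = top_k(recalled, "coarse_score", 2_000)         # coarse ranking -> thousands
    fine = top_k(coarse, "fine_score", 300)                 # fine ranking -> hundreds
    reranked = top_k(fine, "rerank_score", 100)             # re-ranking: model scoring
    return reranked[:30]                                    # mechanisms/diversity step omitted

# Tiny usage example with a toy pool.
pool = [{"coarse_score": random.random(),
         "fine_score": random.random(),
         "rerank_score": random.random()} for _ in range(100_000)]
print(len(recommend(pool)))  # -> 30
```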

Kuaishou's businesses are fairly diverse and can be broadly divided into large-scale businesses and small-to-medium-sized businesses.

Large-scale businesses have very large sample volumes; for example, the main app's recommendation samples for a single day can reach hundreds of billions, with storage at the PB level. Iteration mainly uses streaming iteration, that is, online iteration of features and models, which is very fast. If batch iteration were chosen instead, backtracking 30 days of samples would require dozens of times the resources of streaming iteration. Since large-scale businesses have plenty of traffic to allocate to experiments, we prefer online streaming-iteration experiments, which are fast and consume relatively few resources.

For small and medium-sized businesses, daily samples are around tens of billions and storage is around tens of terabytes. Choosing streaming iteration would mean frequent online iterations, and there would not be enough traffic to allocate to experiments. In this case batch iteration is generally used, which requires computing a large number of samples: backtracking at least 60 days, the backtracked samples can reach the PB level (for example, at tens of terabytes per day, 60 days of samples already approaches a petabyte). For large models, if the amount of data is insufficient and the model is under-trained, quality drops accordingly. Therefore, in these smaller business scenarios we tend to use batch iteration and look back over more days of samples so that the model reaches a more stable state; batch-iteration experiments are preferred here.

3. Data volume of the recommendation model

[Figure]

Here is a screenshot from a trillion-parameter model article previously published by Kuaishou. Kuaishou's model is a personalization model, so the number of parameters is very large. As the comparison in the figure shows, OpenAI's GPT-3 has 175B parameters, while Kuaishou's model has 1,900B, reaching the trillion level. This is mainly because Kuaishou uses the SIM long-sequence model, which captures long-term user interest and feeds those sequences into the model. Kuaishou has hundreds of millions of users, lifelong-interest sequences can exceed 100,000 items, and on top of that there are hundreds of billions of samples, so the parameter count is very large, reaching 1.9 trillion. Although these 1.9 trillion parameters differ in type from those of OpenAI's GPT-3, and the amount of computation also differs, in terms of sheer parameter magnitude Kuaishou's recommendation model is very large.

4. Evolution of language models

[Figure]

The recommendation model is closely related to language models. New model architectures are usually iterated on language models first, and once they prove successful they are introduced into recommendation models; DNN, RNN, and Transformer all followed this path. The figure above, released by Amazon in March, mainly summarizes recent progress in language models.

It can be seen that before 2017, RNN models were the mainstream. An RNN is trained by traversing the data in order; it does not demand much parallel computing power, but convergence is more troublesome because of problems such as vanishing gradients. After the Transformer appeared in 2017, language models broke through the original limitations and could be trained in parallel, so the computing power devoted to them grew massively.

The tree in the figure has three branches: (1) the red branch is the encoder-only line, the earliest being the BERT model; (2) the green branch is the encoder-decoder line, which Google mainly chose; (3) the blue branch is the decoder-only line, the type chosen by OpenAI for ChatGPT. This line has developed best because it is simple enough, only needs the decoder, requires less computation, and still achieves very good model quality.

2. Large-scale model data processing

1. Background: timeliness

[Figure]

Kuaishou has high requirements for data timeliness. After a user watches a video, the behavior is fed back into Kuaishou's log collection system and joined with the recommendation log in real time (the recommendation log contains the features dumped by the recommendation service). The feature stream plus the behavior stream becomes the sample stream, which goes into subsequent feature processing and then into model training. Once training completes, the model is pushed to online prediction in real time, and online prediction recommends the videos that best match the user's needs based on the updated model. The whole link requires latency within one second, and user behavior must be fed back into the model as quickly as possible, so the timeliness requirements for big-data processing are very high.

2. Large data volume processing

[Figure]

Kuaishou has tens of millions of users online at the same time. Even without considering the diversity of behaviors, QPS is at least in the tens of millions; if different behavior types are distinguished, the number of combinations explodes further. During peak periods, roughly 30 TB of join state has to be handled.

The mainstream industry solution is the Flink streaming framework, but using Flink's built-in stateful join directly would produce a large number of slow nodes at thousands of concurrent subtasks: with 30 TB of state at 1,000-way parallelism, each subtask has to hold 30 GB of state; at 10,000-way parallelism, 3 GB each. Even 3 GB per subtask at 10,000-way parallelism makes slow nodes very likely, and recovering from a slow node can take several hours, which is intolerable for a recommendation system.

So Kuaishou chose a compromise: sink the state into high-performance external storage and perform a stateless hash join in the streaming job to build the real-time join. As soon as both a user's behavior and the corresponding features are present, the sample is emitted immediately, ensuring that behavior is fed back to the model in time. Although features and behaviors arrive in different orders, large-scale high-performance joins are achieved through the external state together with the parallelism of the Flink streaming framework.
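
A minimal conceptual sketch of this idea (not Kuaishou's implementation: the plain Python dict stands in for the external high-performance state store, and the record fields are invented):

```python
from typing import Dict, Optional

# Stand-in for the external high-performance KV store that holds the join state.
external_state: Dict[str, dict] = {}

def on_feature(request_id: str, features: dict) -> Optional[dict]:
    """Called when a feature record arrives from the recommendation log."""
    entry = external_state.setdefault(request_id, {})
    entry["features"] = features
    return emit_if_complete(request_id, entry)

def on_behavior(request_id: str, behavior: dict) -> Optional[dict]:
    """Called when a user-behavior record (e.g. click, watch) arrives."""
    entry = external_state.setdefault(request_id, {})
    entry["behavior"] = behavior
    return emit_if_complete(request_id, entry)

def emit_if_complete(request_id: str, entry: dict) -> Optional[dict]:
    """As soon as both sides are present, emit a training sample immediately.

    The operator itself is stateless: all join state lives in the external
    store, so the arrival order of the two streams does not matter.
    """
    if "features" in entry and "behavior" in entry:
        external_state.pop(request_id, None)   # state can be dropped after the join
        return {"id": request_id, **entry["features"], "label": entry["behavior"]}
    return None

# Usage: the two streams may arrive in either order.
on_behavior("req-1", {"click": 1})
sample = on_feature("req-1", {"age_bucket": 3, "video_tags": ["music"]})
print(sample)
```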

3. Complex feature calculation

[Figure]

After the above processing is complete comes feature computation, which mainly includes two kinds of calculations: scalar computation and vector computation. Scalar computation is ordinary feature processing, such as summing or averaging certain values. In vector computation, the same operation is applied to the same column of an entire batch of samples and is executed on the GPU through CUDA. High-performance computing is thus achieved through CPU-GPU collaboration: scalar operations and memory access run on the CPU, and the data is then handed to the GPU for high-performance vectorized computation.
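
To make the scalar-versus-vector distinction concrete, here is a small NumPy sketch that runs on the CPU only; in the real system the batched column operations are the part offloaded to the GPU via CUDA, which is not shown here:

```python
import numpy as np

# A batch of samples: one row per sample, one column per feature (toy data).
batch = np.random.rand(4096, 16).astype(np.float32)

# Scalar-style computation: per-sample arithmetic such as sums and averages,
# the kind of work that stays on the CPU.
def scalar_features(sample: np.ndarray) -> tuple:
    return float(sample.sum()), float(sample.mean())

# Vector-style computation: the same operation applied to a whole column of the
# batch at once; in the real system this is what gets executed as a CUDA kernel.
def vector_features(batch: np.ndarray) -> np.ndarray:
    col = batch[:, 0]                                 # one feature column for the whole batch
    return (col - col.mean()) / (col.std() + 1e-6)    # e.g. normalizing a column in one shot

print(scalar_features(batch[0]))
print(vector_features(batch).shape)  # -> (4096,)
```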

To keep algorithm iteration flexible, a DSL abstraction is adopted, because SQL cannot fully describe all feature-processing scenarios. For example, some time-window operations would require custom UDFs if written in SQL, which hinders iteration. Our DSL is therefore described in Python, and users can directly call the efficient lower-level execution operators from Python. The computation layer is implemented in C++ as efficient operators, including both CUDA and CPU kernels, packaged as a C++ library. At runtime, the Flink distributed framework calls these C++ operators through JNI to achieve high-performance, high-throughput processing.
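
The article does not show the DSL itself, so the following is only a hypothetical sketch of what a Python-described feature definition might look like; every decorator, function, and `ctx` method name here is invented for illustration:

```python
# Hypothetical feature DSL: Python describes *what* to compute; the runtime
# would dispatch each step to efficient C++/CUDA operators via JNI under Flink.

FEATURE_REGISTRY = {}

def feature(name):
    """Register a feature definition under a name (illustrative decorator)."""
    def wrap(fn):
        FEATURE_REGISTRY[name] = fn
        return fn
    return wrap

@feature("user_watch_time_7d_avg")
def user_watch_time_7d_avg(ctx):
    # ctx.window / ctx.mean stand in for operators that a real runtime would
    # lower to native time-window and vectorized-mean kernels.
    events = ctx.window("watch_time", days=7)
    return ctx.mean(events)

@feature("video_ctr_norm")
def video_ctr_norm(ctx):
    ctr = ctx.column("video_ctr")        # a whole column of the sample batch
    return ctx.batch_normalize(ctr)      # would run as a CUDA kernel on the GPU

print(sorted(FEATURE_REGISTRY))          # the runtime compiles these definitions
```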

4. Characteristics of recommendation scenarios

Recommendation scenarios have two notable characteristics: one is batch-stream integration, and the other is the tide pattern of requests.

[Figure]

Both batch research and online experiments require batch-stream integration: after a feature or model structure has been researched in the batch scenario, it needs to go online, so a unified description language and a unified execution engine spanning batch and streaming are needed. When users do batch research, they use the DSL with Hadoop and Spark to compute over all the data and iterate on the model. Once the model iteration succeeds, the features go online, either into the general streaming feature-processing framework or into a framework specialized for a particular model. There are two nodes here because some features are shared by all models and only need to be computed once under the common framework, while the specialized operators handle features unique to a single model, which are processed separately; the two computation engines and the language description are nevertheless the same. Likewise, the generally processed data also has to be made available in the batch scenario, where many iterations start from the base features and add each experiment's own extra features, so the Delta (the experiment-specific increment) is computed in the batch scenario as well.
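
A schematic sketch of the "one definition, two engines" idea (all class and function names are invented; the two engines are trivial stubs standing in for Spark/Hadoop and the Flink-based runtime):

```python
# Hypothetical illustration of batch-stream integration: the same feature logic
# is handed either to a batch backend (research over historical data) or to a
# streaming backend (online serving).

def watch_time_mean(rows):
    """The shared feature logic: average watch time over a set of rows."""
    values = [r["watch_time"] for r in rows]
    return sum(values) / len(values) if values else 0.0

class BatchEngine:                        # stands in for Hadoop/Spark execution
    def run(self, feature_fn, dataset):
        return feature_fn(dataset)        # full historical dataset at once

class StreamEngine:                       # stands in for the Flink-based runtime
    def __init__(self, feature_fn, window_size=3):
        self.fn, self.window, self.size = feature_fn, [], window_size
    def on_event(self, row):
        self.window = (self.window + [row])[-self.size:]   # sliding window
        return self.fn(self.window)

history = [{"watch_time": t} for t in (10, 20, 30, 40)]
print(BatchEngine().run(watch_time_mean, history))          # batch research result

stream = StreamEngine(watch_time_mean)
for row in history:
    online_value = stream.on_event(row)                     # same logic, online
print(online_value)
```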

Once features go online, they are served by the online service, backed by a high-performance storage and computation library that is discussed later. In streaming scenarios the focus is on high throughput, low latency, and high availability; in batch scenarios the focus is on high throughput and high reliability.

[Figure]

The other characteristic is the request tide. The figure above is a schematic diagram of the request tide (not Kuaishou's actual traffic). As can be seen, there are two peaks, a morning peak and an evening peak. During peak periods, sufficient online computing power must be provided; during off-peak periods, the redundant computing power should be put to use.

Given this, Kuaishou's big-data processing framework and all online modules need to move toward a cloud-native architecture adapted to the tide pattern, supporting fast recovery, automatic scaling, and rapid scaling. Rapid scaling matters because automatic scaling alone is not guaranteed to be efficient: if scaling takes an hour or several hours, online requests suffer considerable losses during that time.

In addition, the resource pools of online services and of big-data processing need to be unified, so that during off-peak periods the redundant computing power can be used for batch scenarios, large-model pre-training, or large-model batch inference, putting all resources to use. All of Kuaishou's current architectures are evolving toward cloud native.

3. Large-scale model data storage

1. Storage characteristics

[Figure]

The first characteristic of large-scale data storage is ultra-low latency. Storage nodes hold state, and computing nodes need a large amount of state to do their work, so storage nodes mostly sit at the leaves of the request chain. A recommendation online experiment involves thousands of modules, and each module can only be given a timeout of under ten milliseconds, or at most a few tens of milliseconds. All storage nodes must therefore be low-latency, high-throughput, and highly available.

There is also mutual switching between recommendation experiments and the recommendation serving baseline. There are usually many parallel experiments, and once an experiment succeeds it is switched onto the online baseline, so the baseline carries very large traffic. For example, the service baseline includes recall baselines, coarse-ranking baselines, and fine-ranking baselines; each baseline has to carry tens of millions of QPS and provide ultra-high reliability. The online storage layer therefore makes heavy use of an all-in-memory architecture.

[Figure]

Second, Kuaishou needs very large storage. As mentioned earlier, the Kuaishou large model has 1.9 trillion parameters; stored as ordinary eight-dimensional floats, this already requires about 64 TB, and on top of that the full user-behavior sequences account for roughly 180 TB of state. Holding all of this purely in memory would require more than 2,000 machines. Moreover, all state must be recoverable within 30 minutes, because if the recommendation system stays down for more than 30 minutes the impact on the product is very large and the user experience becomes very poor.
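
As a rough sanity check (an interpretation not spelled out in the article): if each of the roughly 1.9 trillion sparse parameters corresponds to an eight-dimensional float32 embedding, the storage comes out close to the quoted 64 TB:

$$
1.9 \times 10^{12}\ \text{keys} \times 8\ \text{dims} \times 4\ \text{bytes} \approx 61\ \text{TB}
$$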

In response to the above needs, our solutions mainly include the following:

  • Feature-score-based admission: the feature score can be understood as feature importance; features with relatively low importance and minimal impact on prediction quality are simply not placed in online storage.
  • LRU/LFU eviction: because the model serves online traffic, reliability must be guaranteed, which means memory must stay within a stable range and cannot keep growing. We therefore preferentially evict the entries that have gone the longest without being updated or accessed and keep the most recently used ones (a toy sketch of such eviction follows this list).
  • New NVM hardware: the resource consumption of a fully in-memory architecture is itself a big problem, so we introduced NVM. NVM is persistent memory, a relatively new piece of Intel hardware that sits between DRAM and SSD, with speed close to memory and capacity close to SSD, balancing capacity and performance.
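
A toy sketch of LRU-style eviction under a hard capacity bound, using Python's OrderedDict; Kuaishou's actual store is of course a purpose-built high-performance structure, and the capacity of three entries is only for illustration:

```python
from collections import OrderedDict

class LRUStore:
    """Bounded key-value store: on overflow, evict the least recently used entry."""
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.data = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)            # mark as most recently used
        return self.data[key]

    def put(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.capacity:    # memory stays within a stable bound
            self.data.popitem(last=False)     # drop the least recently used entry

store = LRUStore(capacity=3)
for k in ("a", "b", "c"):
    store.put(k, k.upper())
store.get("a")            # touch "a" so it becomes most recently used
store.put("d", "D")       # evicts "b", the least recently used
print(list(store.data))   # -> ['c', 'a', 'd']
```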

2. Storage solution: NVM Table

[Figure]

The storage solution is NVM Table, which is divided into three layers of heterogeneous storage: the physical layer provides the underlying storage APIs, covering both NVM and memory; the middle memory-pool layer encapsulates unified management and manages both the NVM and memory modules; and the upper business layer uses the memory pool's API to access the underlying NVM and memory through unified query logic.

In terms of data layout, the memory pool is abstracted as blocks: NVM and memory are divided into blocks addressed through a global unified address space, enabling flexible zero-copy access. Frequently accessed keys are placed in memory (mem-key), while rarely accessed keys are placed on NVM. Index keys are accessed frequently, but once a key is located, its value is only needed when the result is finally returned upstream, and values are large, so values are placed in persistent storage. Keys are queried often and are relatively small, so they are kept in memory. This realizes zero-copy access across memory and NVM. The hash table uses industry-leading lock-free techniques to reduce critical-section contention and achieve efficient access.
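
A toy sketch of the "small hot keys in memory, large values in a slower persistent tier" layout; the Python dict and the file below are only stand-ins for the in-memory index and the NVM block storage:

```python
import os
import tempfile

class TwoTierKV:
    """Index (keys + offsets) in memory; large values appended to a persistent file.

    This mimics the NVM Table idea only in spirit: lookups touch the small
    in-memory index, and the big value bytes are read from the slower tier
    only when the caller actually needs them.
    """
    def __init__(self, path: str):
        self.index = {}                      # key -> (offset, length), stays in memory
        self.file = open(path, "wb+")        # stand-in for NVM block storage

    def put(self, key: str, value: bytes):
        offset = self.file.seek(0, os.SEEK_END)
        self.file.write(value)
        self.index[key] = (offset, len(value))

    def get(self, key: str) -> bytes:
        offset, length = self.index[key]     # cheap in-memory lookup
        self.file.seek(offset)
        return self.file.read(length)        # value fetched from the slow tier

path = os.path.join(tempfile.mkdtemp(), "values.bin")
kv = TwoTierKV(path)
kv.put("video:42", b"embedding-bytes...")
print(kv.get("video:42"))
```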

Test data from one NVM Table scenario shows that its peak throughput is limited by the network rather than by NVM: cross-network access generally hits the network limit, so NVM bandwidth fully covers the network bandwidth and the bottleneck lies mainly in the network. This means NVM delivers its cost benefit as well as the benefits of large capacity and high throughput. Recovery time also dropped by about 120x: restoring terabyte-scale data originally took about two hours, but with NVM it takes only about two minutes.

[Figure]

3. Storage solution: strong consistency

[Figure]

On the storage side there is also a need for strong consistency, mainly because the recommendation scenario also includes advertising and e-commerce recommendations, and many replicas have to be kept. When new short videos or new materials arrive, they are distributed concurrently to all downstream modules, and it must be guaranteed that they reach every recommendation service within 10 seconds and that the state in all recommendation services is consistent; otherwise the model's performance is strongly affected.

We adopt the Raft protocol plus a BT (BitTorrent-style) mode. Raft is responsible for leader election and data synchronization, while the BT part is an adaptation of the BitTorrent synchronization pattern for synchronizing across tens of thousands of machines. With plain master-slave synchronization, if there are more than a thousand slave nodes the master's egress bandwidth would have to be more than a thousand times that of a single replica, so bandwidth becomes the bottleneck, only very little state can be pushed out, and high-throughput data synchronization is impossible.

Our solution is distribution over a balanced tree: all master and slave nodes are organized into a balanced binary tree, and each node only pushes data to a limited number of child nodes, so the bandwidth needed at each hop from the master down to the leaves stays constant. Since the fan-out of any single node is at most 2, the whole cluster can be synchronized efficiently in a single pass, and all video state can be distributed to every node within 10 seconds.
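
A toy sketch of fan-out-bounded distribution over a balanced binary tree (node names and data are invented, and the real system additionally layers Raft on top for leader election and consistency):

```python
from typing import List, Optional

class Node:
    def __init__(self, name: str):
        self.name = name
        self.children: List["Node"] = []   # fan-out is capped at 2 by construction
        self.state: Optional[bytes] = None

    def push(self, update: bytes):
        """Apply an update locally, then forward it to at most two children."""
        self.state = update
        for child in self.children:        # each node uploads at most two copies downstream
            child.push(update)

def build_balanced_tree(nodes: List[Node]) -> Node:
    """Arrange nodes as a complete binary tree (array layout: children of i are 2i+1 and 2i+2)."""
    for i, node in enumerate(nodes):
        child_idx = [j for j in (2 * i + 1, 2 * i + 2) if j < len(nodes)]
        node.children = [nodes[j] for j in child_idx]
    return nodes[0]                        # the root plays the role of the master

cluster = [Node(f"replica-{i}") for i in range(15)]
root = build_balanced_tree(cluster)
root.push(b"new-video-metadata")
print(all(n.state == b"new-video-metadata" for n in cluster))  # -> True
```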

4. Outlook

[Figure]

The development of recommendation models tracks that of language models: from DNN models to Wide&Deep, to Transformer, to SIM long sequences and generative models, model size has grown many times over. Beyond model growth, the required computing power also grows exponentially with the number of videos and users. Statistically, the computing power of our recommendation models has grown nearly 10x over the past two years. Our solutions are mainly engineering-architecture optimization and new hardware technology.

[Figure]

Generative models will bring an explosion in computation, because they recommend token by token, and every step needs all previous tokens as context; this is when the generated results are best. Without token-based generation, computing power would not grow exponentially. The pressure on recommendation will therefore come mainly from a large increase in state storage: today's recommendation models are mostly pointwise, and the compute of long-sequence recommendation models is still limited, but if deep generative recommendation is fully adopted, state storage will grow by another 10x, which will be a huge challenge. We will need new hardware, such as CXL, NVM, and the newly launched Grace architecture, together with engineering optimizations such as state differencing and computation/transmission trade-offs, to meet these future challenges.