In the previous two blogs in this series I showed how solving an apparently simple problem about loading a lot of data into RAM using mmap() also turned out to require a solution that improved CPU use across cores.
In this blog, I’ll show how we dealt with the bottlenecks that ensued and, finally, how we turned to coarse threading to use the available cores as well as possible whilst keeping physical memory usage manageable.
These are the stages we went through:
- Preprocessing
- Loading the Data
- Fine-grained Threading
Now we move to Stage 4, where we tackle the bottleneck problem:
4 Preprocessing Reprise
So far so good, right? Well, yes and no. We've improved things by using multithreading and SIMD, but profiling in VTune still showed bottlenecks, specifically in the IO subsystem: paging the data from disk into system memory (via mmap()). The access pattern through the data is the classic one we see with textures in OpenGL when the texture data doesn't all fit into GPU memory: it ends up thrashing the texture cache with the typical throw-out-the-least-recently-used-stuff policy, when of course the oldest stuff is exactly what we need again on the next iteration of the outer loop.
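To make the thrashing concrete, here is a toy model. It is a strict LRU cache of `capacity` pages scanned cyclically, the way the outer loop sweeps over the columns. This is a simplification: the real kernel page cache is more sophisticated than plain LRU. `LruCache` and `cyclicScanHits` are hypothetical names for this illustration. If the working set fits, every pass after the first hits. If it is just one page too big, every single access misses.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <unordered_map>

// Toy LRU page cache (a simplified model, not the kernel's actual policy).
class LruCache {
public:
    explicit LruCache(std::size_t capacity) : capacity_(capacity) { assert(capacity > 0); }

    // Returns true on a hit. On a miss, inserts the page, evicting the least recently used one.
    bool access(std::size_t page)
    {
        auto it = where_.find(page);
        if (it != where_.end()) {
            order_.splice(order_.begin(), order_, it->second); // mark as most recently used
            return true;
        }
        if (order_.size() == capacity_) {
            where_.erase(order_.back()); // evict the least recently used page
            order_.pop_back();
        }
        order_.push_front(page);
        where_[page] = order_.begin();
        return false;
    }

private:
    std::size_t capacity_;
    std::list<std::size_t> order_; // front = most recently used
    std::unordered_map<std::size_t, std::list<std::size_t>::iterator> where_;
};

// Scans pages 0..numPages-1 in order, `passes` times, and counts cache hits.
std::size_t cyclicScanHits(std::size_t numPages, std::size_t capacity, std::size_t passes)
{
    LruCache cache(capacity);
    std::size_t hits = 0;
    for (std::size_t p = 0; p < passes; ++p)
        for (std::size_t page = 0; page < numPages; ++page)
            hits += cache.access(page) ? 1 : 0;
    return hits;
}
```

For example, `cyclicScanHits(100, 100, 5)` returns 400 (only the first pass misses), while `cyclicScanHits(101, 100, 5)` returns 0. One page too many and LRU always evicts exactly the page that will be needed next.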
This is where the expanded, preprocessed data is biting us in the backside. We saved runtime cost at the expense of disk and RAM usage, and this is now the biggest bottleneck, to the point where we can't feed the data from disk (SSD) to the CPU fast enough to keep it fully occupied.
The obvious thing would be to reduce the data size, but how? We can't use the old BED file format, as its quantization is too coarse for the offset and scaled data. We can't use lower-precision floats, as that only reduces the size by a small constant factor. Inspecting the data in some columns of the matrix, I noticed that there are very many repeated values. This makes total sense given the highly quantized input data. So we tried compressing each column using zlib. This worked like magic: the preprocessed data came out only 5% larger than the original quantized BED data file!
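A quick way to see why the columns compress so well is to count the distinct values in one of them. The sketch assumes that each raw column holds genotype codes 0, 1 or 2 (ignoring missing calls) and that preprocessing centres and scales it. The exact preprocessing is not spelled out in this post, so treat that as an illustration. Under those assumptions, every float in the column takes one of at most three values. `standardizeColumn` and `countDistinct` are hypothetical names. A long run of 4-byte floats drawn from three bit patterns is highly redundant, which is the kind of redundancy a general-purpose compressor such as zlib (DEFLATE) exploits.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <set>
#include <vector>

// Hypothetical preprocessing: centre and scale a column of genotype codes (assumed 0, 1 or 2).
std::vector<float> standardizeColumn(const std::vector<int>& genotypes)
{
    assert(!genotypes.empty());
    const double n = static_cast<double>(genotypes.size());
    double mean = 0.0;
    for (int g : genotypes)
        mean += g;
    mean /= n;
    double var = 0.0;
    for (int g : genotypes)
        var += (g - mean) * (g - mean);
    const double sd = std::sqrt(var / n);

    std::vector<float> out;
    out.reserve(genotypes.size());
    for (int g : genotypes) // the same code always maps to the same float
        out.push_back(static_cast<float>(sd > 0.0 ? (g - mean) / sd : 0.0));
    return out;
}

// Number of distinct float values in a column.
std::size_t countDistinct(const std::vector<float>& column)
{
    return std::set<float>(column.begin(), column.end()).size();
}
```

For a column of 1,000 people cycling through the codes 0, 1, 2, `countDistinct` reports 3, and it would still report 3 for a million people.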
Because we compress each column of the matrix independently, and the compression ratio varies depending on the needed dictionary size and the distribution of repeated elements throughout the column, we need a way to find the start and end of each column in the compressed, preprocessed BED file. So, whilst preprocessing, we also write out a binary index companion file which, for each column, stores the offset of the column start in the main file and its byte size.
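The index needs only two numbers per column. The sketch below assumes fixed-size records of two 64-bit integers written in native byte order. The actual on-disk layout is not given in this post, so this is only one plausible choice. `ColumnExtent`, `appendColumns`, `serializeIndex` and `deserializeIndex` are hypothetical names. In-memory byte vectors stand in for the two files, which real code might write with `std::ofstream::write`.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical record in the companion index file: one per column.
struct ColumnExtent {
    std::uint64_t offset; // where the compressed column starts in the main file
    std::uint64_t size;   // compressed size of the column in bytes
};

// Appends each compressed column to `mainFile` and records where it landed.
std::vector<ColumnExtent> appendColumns(const std::vector<std::vector<unsigned char>>& compressed,
                                        std::vector<unsigned char>& mainFile)
{
    std::vector<ColumnExtent> index;
    index.reserve(compressed.size());
    for (const auto& column : compressed) {
        index.push_back({static_cast<std::uint64_t>(mainFile.size()),
                         static_cast<std::uint64_t>(column.size())});
        mainFile.insert(mainFile.end(), column.begin(), column.end());
    }
    return index;
}

// Raw fixed-size records in native byte order (fine when the same machine reads them back).
std::vector<unsigned char> serializeIndex(const std::vector<ColumnExtent>& index)
{
    std::vector<unsigned char> bytes(index.size() * sizeof(ColumnExtent));
    if (!bytes.empty())
        std::memcpy(bytes.data(), index.data(), bytes.size());
    return bytes;
}

std::vector<ColumnExtent> deserializeIndex(const std::vector<unsigned char>& bytes)
{
    assert(bytes.size() % sizeof(ColumnExtent) == 0);
    std::vector<ColumnExtent> index(bytes.size() / sizeof(ColumnExtent));
    if (!index.empty())
        std::memcpy(index.data(), bytes.data(), bytes.size());
    return index;
}
```

Because the records have a fixed size, the extent of column `i` sits at byte `i * sizeof(ColumnExtent)` of the index file. The index file could therefore itself be mmap()'d and read without parsing.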
So when we want to process a column of data in the inner loop, we look up in the index file the extent of the column's compressed representation in the mmap()'d file, decompress that into a buffer of the right size (we know how many elements each column has: it's the number of people) and then wrap that up in the Eigen Map helper.
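The lookup-and-decompress step might look like the sketch below. It rests on several assumptions. The mapped file is treated as a plain byte pointer. The index uses a hypothetical two-field `ColumnExtent` record (offset, size). Elements are 4-byte floats. The actual decompressor is passed in as a callback, so the sketch does not depend on zlib. `loadColumn` and `Decompressor` are illustrative names, not the project's code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>
#include <vector>

// Hypothetical index record: where a compressed column lives in the mapped file.
struct ColumnExtent {
    std::uint64_t offset; // start of the compressed column
    std::uint64_t size;   // compressed size in bytes
};

// Hypothetical decompressor interface: expand srcSize bytes at src into exactly
// dstSize bytes at dst, returning false on failure. A zlib-based version could
// wrap uncompress(dest, &destLen, source, sourceLen) and check for Z_OK.
using Decompressor = std::function<bool(const unsigned char* src, std::size_t srcSize,
                                        unsigned char* dst, std::size_t dstSize)>;

// Looks up one column's extent and expands it into a buffer whose size is known
// up front: one element per person.
std::vector<float> loadColumn(const unsigned char* mapped, const std::vector<ColumnExtent>& index,
                              std::size_t column, std::size_t numPeople,
                              const Decompressor& decompress)
{
    assert(column < index.size());
    const ColumnExtent& extent = index[column];
    std::vector<float> buffer(numPeople);
    const bool ok = decompress(mapped + extent.offset, static_cast<std::size_t>(extent.size),
                               reinterpret_cast<unsigned char*>(buffer.data()),
                               buffer.size() * sizeof(float));
    if (!ok)
        throw std::runtime_error("failed to decompress column");
    return buffer;
}
```

The returned buffer can then be viewed without copying, e.g. `Eigen::Map<Eigen::VectorXf> x(col.data(), col.size());`, as long as `col` outlives the map.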
Using zlib like this really helped in reducing the storage and memory needed. However, now profiling showed that the bottleneck had shifted to the decompression of the column data. Once again, we have improved things, but we still can't keep the CPU fed with enough data to occupy it for that inner loop workload.
5 Coarse Threading
How to proceed from here? What we need is a way to balance the CPU threads and cycles used for decompressing the column data with the threads and cycles used to then analyze each column. Remember that we are already using SIMD vectorization and parallel_for and parallel_reduce for the inner loop workload.
After thinking over this problem for a while, I decided to have a go at solving it with another feature of Intel TBB: the flow graph. The flow graph is a high-level, data-driven interface for constructing parallel algorithms. Once again, behind the scenes this eventually gets decomposed into the thread pool and tasks used by parallel_for and friends.
The idea is that you construct a graph from various pre-defined node types into which you can plug lambdas for performing certain operations. You can then set options on the different node types and connect them with edges to form a data flow graph. Once set up, you send data into the graph via a simple message class/struct and it flows through until the results fall out of the bottom of the graph.
There are many node types available but for our needs just a few will do:
- Function node: Use this, along with a provided lambda, to perform some operation on your data, e.g. decompress a column of data or perform the doStuff() inner loop work. The number of tasks this node type may run in parallel can be configured, from serial behavior up to any positive number. We will need both, as we shall see.
- Sequencer node: Use this node to ensure that data arrives at later parts of the flow graph in the correct order. Internally it buffers incoming messages and uses a provided functor, which returns each message's sequence number, to release them to successor nodes in order.
- Limiter node: Use this node type to throttle the throughput of the graph. We give it a threshold: the maximum number of messages it will let through to its successors. Once it reaches this limit, it rejects further input messages (a buffering predecessor keeps hold of them) until another node signals it to continue.
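The sequencer's job is easy to mimic in plain C++, which may help if the flow graph is new to you. The class below is not TBB's sequencer node. It is a hypothetical, single-threaded stand-in (`ReorderBuffer` is an illustrative name): each message carries a sequence number, early arrivals are parked, and everything that has become contiguous is released as soon as the next expected number turns up.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Single-threaded stand-in for a sequencer: releases messages strictly in
// sequence-number order, buffering any that arrive early.
template <typename T>
class ReorderBuffer {
public:
    // Accepts the message with sequence number `seq` and returns every message
    // that can now be released in order (possibly none).
    std::vector<T> push(std::size_t seq, T message)
    {
        assert(seq >= next_ && pending_.count(seq) == 0);
        pending_.emplace(seq, std::move(message));
        std::vector<T> released;
        // std::map is ordered, so the smallest buffered sequence number is at begin().
        while (!pending_.empty() && pending_.begin()->first == next_) {
            released.push_back(std::move(pending_.begin()->second));
            pending_.erase(pending_.begin());
            ++next_;
        }
        return released;
    }

    std::size_t buffered() const { return pending_.size(); }

private:
    std::size_t next_ = 0;             // sequence number we are waiting for
    std::map<std::size_t, T> pending_; // early arrivals, keyed by sequence number
};
```

If messages 2 and 1 arrive before 0, both calls return nothing. Pushing 0 then releases `a, b, c` in one go, which is exactly the behavior the graph relies on after the parallel decompression stage.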
I've made some very simple test cases of the flow graph in case you want to see how it works and how I built up to the final graph we used in practice.
The final graph used looks like this:
A few things to note here:
- We have a function node to perform the column decompression. This is allowed to use multiple parallel tasks as each column can be decompressed independently of the others due to the way we compressed the data at preprocess time.
- To stop this from decompressing the entire data set as fast as possible and blowing up our memory usage, we throttle it with a limiter node whose threshold is set to a small number roughly equal to the number of cores.
- We have a second function node limited to sequential behavior that calls back to our algorithm class to do the actual work on each decompressed column of data.
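The limiter also gives a rough bound on the decompressed data held in memory at once. Each decompressed column belongs to a message that has passed the limiter but has not yet been processed. So, roughly, at most $L$ decompressed columns (the limiter threshold) exist at any one time. Take $N$ people per column and $b$ bytes per element, and plug in purely hypothetical numbers: $L = 6$ for a hexacore machine, $N = 500\,000$ people, 4-byte floats.

$$
M_{\text{peak}} \approx L \times N \times b = 6 \times 500\,000 \times 4\ \text{bytes} = 12\,000\,000\ \text{bytes} \approx 12\ \text{MB}
$$

This covers only the decompressed buffers. The compressed columns themselves are paged in on demand through mmap().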
Then we have the two ordering nodes. Why do we need two of them? The latter one ensures that the data coming out of the decompression node tasks arrives in the order that we expect (as queued up by the inner loop). This is needed because, with the kernel time-slicing the CPU threads, the decompression tasks may finish in a different order from the one in which they were enqueued.
The requirement for the first ordering node is a little more subtle. Without it, the limiter node may accept messages from its input in an order such that it reaches its threshold without having let through the first message, which is the one the second ordering node needs to emit next. The combination of the second ordering node and the limiter node can then effectively deadlock the graph: the second ordering node is waiting for the nth message, but the limiter node is already at its threshold with messages that do not include the nth one.
Finally, the last function node, which processes the "sequential" (but still SIMD and parallel_for pimped-up) part of the work, uses a graph edge to signal back to the limiter node when it is done, so that the limiter node can then throw the next column of data at the decompressor function node.
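For readers without TBB to hand, here is a rough standard-library analogue of the same shape. It is not the code from the repository and not the TBB flow graph API. `runPipeline`, `decode` and `process` are hypothetical names. Worker threads play the parallel decompression node. A counter capped at `maxInFlight` plays the limiter. A `std::map` keyed by column index plays the second ordering node. The calling thread is the serial processing node, and it releases a slot each time it finishes a column, which plays the feedback edge. Handing out column indices in order, under the same lock that grants a slot, stands in for the first ordering node: the lowest unprocessed column always holds a slot, so the deadlock described above cannot occur.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// decode(col) -> std::vector<float> runs on numWorkers threads;
// process(col, data) runs serially, in column order, on the calling thread.
template <typename Decode, typename Process>
void runPipeline(std::size_t numColumns, std::size_t numWorkers, std::size_t maxInFlight,
                 Decode decode, Process process)
{
    assert(numWorkers > 0 && maxInFlight > 0);
    std::mutex m;
    std::condition_variable cv;
    std::size_t nextToAdmit = 0;   // next column a worker may take (first "ordering node")
    std::size_t inFlight = 0;      // admitted but not yet processed ("limiter" count)
    std::size_t nextToProcess = 0; // column the serial stage needs next
    std::map<std::size_t, std::vector<float>> ready; // reorder buffer (second "ordering node")

    auto worker = [&] {
        for (;;) {
            std::size_t col;
            {
                std::unique_lock<std::mutex> lock(m);
                cv.wait(lock, [&] { return inFlight < maxInFlight || nextToAdmit == numColumns; });
                if (nextToAdmit == numColumns)
                    return;          // nothing left to admit
                col = nextToAdmit++; // admit strictly in order while taking a slot
                ++inFlight;
            }
            std::vector<float> data = decode(col); // parallel "decompression"
            {
                std::lock_guard<std::mutex> lock(m);
                ready.emplace(col, std::move(data));
            }
            cv.notify_all();
        }
    };

    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < numWorkers; ++i)
        workers.emplace_back(worker);

    while (nextToProcess < numColumns) {
        std::vector<float> data;
        {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return ready.count(nextToProcess) != 0; });
            auto it = ready.find(nextToProcess);
            data = std::move(it->second);
            ready.erase(it);
        }
        process(nextToProcess, data); // serial "doStuff()" stage
        {
            std::lock_guard<std::mutex> lock(m);
            ++nextToProcess;
            --inFlight; // feedback edge: free a limiter slot
        }
        cv.notify_all();
    }
    for (auto& t : workers)
        t.join();
}
```

A real version would call the zlib-based decompression in `decode` and the existing SIMD/parallel_for work in `process`. One design difference matters. TBB schedules the graph's tasks on its shared thread pool alongside the parallel_for work, whereas this sketch dedicates fixed worker threads. The sketch also omits error handling, so an exception thrown from either callback would terminate the program.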
With this setup, we have a high-level algorithm which is self-balancing between the decompression steps and the sequential doStuff() processing! That is really nice, plus it is super simple to express in just a few lines of code, and it remains readable for future maintenance. The code to set up this graph and to queue up the work for each iteration is available on GitHub.
The resulting code now uses 100% of all available cores and balances the work of decompressing and processing the data. Meanwhile, the data processing itself also utilizes all cores well. The upside of representing the inner loop as a flow graph is that decompression plus column processing went from 12.5s per iteration (on my hexacore i7) to 3s, roughly a 4x speedup. The 12.5s was measured with the sequential workload already using parallel_for and SIMD, so this is another very good saving.
Summary
We have shown how a simple "How do I use mmap()?" mentoring project has grown beyond its initial scope, and how we have used mmap, Eigen, parallel_for/parallel_reduce, flow graphs and zlib to make the problem tractable. This has yielded a nice set of performance improvements whilst keeping disk and RAM usage within feasible limits.
- Shifted work that can be done once to a preprocessing step
- Kept the preprocessed data size down as low as possible with compression
- Managed to load even large datasets into memory at once with mmap
- Parallelized the inner loop operations at a low level with parallel_for
- Parallelized the high-level loop using the flow graph and made it self-balancing
- Utilized the available cores fairly optimally whilst keeping physical memory usage down (roughly number of threads used * column size)
Thanks for reading this far! I hope this helps reduce your troubles when dealing with big data issues.
2 Comments
28 - Aug - 2019
Dominik Haumann
Instead of zlib, another de/compression algorithm that is very fast is LZ4 (see the benchmark at https://github.com/lz4/lz4/blob/dev/README.md). You will lose some compression ratio, but in terms of speed there is nothing (???) better. Of course it's a tradeoff, as always :)
2 - Sept - 2019
Sean Harmer
Thanks for the pointer. Another one worth looking at is https://github.com/facebook/zstd.