2. Tour of the approaches
I will summarize all the approaches I investigated to address the problem statement. They are presented in chronological order. Even though I only chose the last approach, the work done on each of the preceding ones helped me take a step back. Also, the fact that an approach was put aside does not mean that it is not feasible.
2.1. Pure SQL
For the previous example, one can just write a sub-query to achieve the row-scoped reuse:
SELECT col2, sum(res), avg(res)
FROM (SELECT col2, f(col1) AS res FROM table)
GROUP BY col2
We can go even further and achieve the global-scoped reuse by adding a GROUP BY and a JOIN:
SELECT col2, sum(res), avg(res)
FROM (
table
JOIN
(SELECT col1, f(col1) AS res
FROM table
GROUP BY col1
) cache
ON table.col1=cache.col1
) GROUP BY col2
This sub-query based approach works, but it is not satisfying:
- It makes the user less productive by requiring them to spend time thinking about these complex requests, losing the major benefit of using Spark.
- It can quickly become unreadable and unmaintainable when things get more complex, with several functions taking several arguments and nesting possibilities.
- It forces Spark to build intermediate tables containing many copies of the same results.
- The join trick is not straightforward when it comes to Spark Streaming.
- Catalyst rules tend to automatically flatten the requests in many cases; for example, the two following requests result in the same under-optimized physical execution plan:
SELECT res, f(res)
FROM (SELECT f(col) as res FROM table)
SELECT f(col), f(f(col))
FROM table
2.2. Catalyst rules tuning
Catalyst features a highly composable rule system. The user’s declaration is represented as a tree, and optimizations are rules applied one after another on these trees. Catalyst has been released with a vast pool of rules from dozens of contributors.
Catalyst exposes extension-friendly hooks like the Data Sources and the User Defined Types (UDTs), but it also lets users dig into the rule system for fine tuning.
To leverage this rule-based approach, one has to:
- Find the physical planning rules responsible for the flattening of the requests, in order to disable them.
- Add new rules performing row-scoped reuse (putting the global-scoped reuse objective aside); a sketch of the rule-injection entry point follows this list.
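As an illustration of the injection mechanism only (not of the final rules themselves), here is a minimal sketch based on Spark’s SparkSessionExtensions API. The class and object names are hypothetical and the rule body is a no-op placeholder; note also that the rules actually targeted in this work belong to physical planning rather than logical optimization:
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op rule: a real implementation would rewrite the plan to
// factor out repeated UDF calls within a row.
object RowScopedReuseRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Entry point, registered through the spark.sql.extensions configuration.
class ReuseExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectOptimizerRule(_ => RowScopedReuseRule)
}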
2.3. Common Sub-expression Elimination (CSE)
CSE is the optimization that consists in computing a value once and reusing it at the several later points where it is needed.
For example:
a := 10
b := f(a) * 2
c := f(a) + b*f(a)
can be factorized with CSE into:
a := 10
intermediate := f(a)
b := intermediate * 2
c := intermediate + b*intermediate
This mechanism tracks dependencies to ensure that the value does not change between two usages of the intermediate variable. The only cost added by CSE is that of storing the intermediate variable.
2.3.1. Digression: Javac, JIT and CSE
Here is the compilation pipeline that a user’s Spark SQL request goes through:
- After having performed the Analysis, Logical Optimization and Physical Planning phases on the user request, Catalyst finishes its work by generating Java source code for fast uninterpreted row processing.
- The Java source code generated by Catalyst is then passed to the Java compiler (javac), which compiles it into bytecode. Javac does nothing in terms of CSE.
- Then comes the Just In Time compiler (JIT): the bytecode in charge of processing the rows will be considered “hot” by the JIT at runtime and will be compiled into native machine code. The JIT is really limited in terms of CSE, in the sense that it only considers expressions as contiguous bytecode blocks, thus seeing x + y and y + x, or x = w; x + y and w + y, as different expressions [3]. Finer optimizations are possible but they are considered too expensive [4], as they would sacrifice the JIT’s lightning-fast compilation. One can figure out that the JIT is totally helpless in our case by looking at the generated code of any DataFrame (with df.queryExecution.debug.codegen()): UDFs are stored in an array of useful job references (UDFs, converters…), the same UDF is replicated in the array as many times as it is called per row, and the code contains plenty of intermediate variables. All this complexity makes the JIT’s CSE totally blind.
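For reference, here is a minimal way to print that generated code, assuming an active SparkSession named spark (the column expression is illustrative):
// Print the Java code produced by whole-stage code generation.
val df = spark.range(100).selectExpr("id * 2 AS doubled")
df.queryExecution.debug.codegen()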
2.3.2. Catalyst’s Codegen
Catalyst’s Codegen phase seems to be a suitable place to implement a CSE mechanism for UDFs. Note that we are not considering an external extension or package but an actual framework modification, which will have to be proposed to and discussed with the community in order to be usable.
It requires a deep dive into the sources to integrate the feature as cleanly as possible in Spark, especially in org.apache.spark.sql.execution.WholeStageCodegenExec and org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.
2.4. Memoization
Function caching is a general concept referring to the saving of a function result to avoid having to compute it again.
Here we will talk about a sub-type of function caching called Memoization. We will use this term to refer to the process of changing a deterministic function into another one that has been decorated with the ability to remember part of its results.
Memoization achieves approximate global-scoped reuse whereas CSE in Codegen achieves row-scoped reuse. Memoization may cause some overhead due to lookups and cache management costs. CSE is preferable for really cheap functions (like hashing), but when it comes to expensive UDFs and to leveraging the data distribution for global-scoped reuse, memoization should be favored.
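As a minimal sketch of the concept (the memoize helper and the example function are illustrative), memoization can be implemented by wrapping a function around a lookup table:
import scala.collection.mutable

// Wrap a deterministic function so that each distinct input is computed
// only once and then served from a lookup table.
def memoize[A, B](f: A => B): A => B = {
  val cache = mutable.Map.empty[A, B]
  a => cache.getOrElseUpdate(a, f(a))
}

val slowSquare: Int => Int = { x => Thread.sleep(100); x * x }
val fastSquare = memoize(slowSquare)
fastSquare(4) // computed
fastSquare(4) // retrieved from the lookup table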
A wise memoization implementation is crucial for performance. Three points have to be kept in mind when designing a memoization solution [5]:
- Equality
- Precise dependences
- Space management
We will focus on equality and space management, because precise dependences are too use-case-dependent for the general-purpose memoization we target.
2.4.1. Equality
2.4.1.1. Data structure
When it comes to retrieving a result from some sort of lookup table, a test of equality is involved to answer “Are my new inputs already present among the keys of my lookup table?”. A common data structure choice implements a key-value mapping, keys representing specific inputs for a given function. The equality of the inputs is dictated by the equality of their corresponding keys.
2.4.1.2. Computing keys
The way we compute keys affects both the speed of the lookups and the logic of the equality check. Here are some approaches:
- Hashing algorithms: like MD5 or the SHA-X family. They run in $O(input\space size)$ time (see the sketch after this list).
- Hash consing [6]: This approach considers that a given object in the system is unique, making its memory address a natural key for lookups, for free. The strong hypothesis this places on the system design makes hash consing hard to leverage in practice.
- Lazy structure sharing [7]: As the kind of system required by hash consing rarely exists, this approach is an alternative: every time a new object is seen, if it turns out to be equal to an already-seen object at another memory address, we make both point to the same location and release the duplicate.
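Below is a minimal sketch of the hashing approach; the md5Key helper is illustrative and the mkString-based serialization is deliberately naive (a real implementation would use a proper binary encoding of the inputs):
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Derive a lookup key from a serialized form of the inputs.
// MD5 runs in O(input size) time.
def md5Key(inputs: Seq[Any]): String = {
  val bytes = inputs.mkString("|").getBytes(StandardCharsets.UTF_8)
  MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString
}

md5Key(Seq(1, "foo", 3.14)) // 32-character hexadecimal key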
2.4.1.3. Inspecting Scala hashCode behavior
It is crucial to think about the equality checks because they can drastically change performance.
Let’s inspect a few interesting Scala statements.
These evaluate to true:
1.hashCode == 1L.hashCode
SortedSet(1, 2).hashCode == TreeSet(2, 1).hashCode
List(1, 2).hashCode == Vector(1, 2).hashCode
These evaluate to false:
1.hashCode == 1.0.hashCode
Array(1,2).hashCode == Array(1,2).hashCode
((i: Int) => i).hashCode == ((i: Int) => i).hashCode
One has to pay attention to these language-specific concerns and check whether these behaviors are the expected ones. For example, Scala Arrays are built directly on top of Java arrays and their hashing is based on their memory address, so if we want their equality to be checked against other sequences based on the order of their elements, we have to perform a pre-cast: Array(1, 2).toSeq.hashCode == List(1, 2).hashCode evaluates to true.
2.4.1.4. Function hashing
In a functional language like Scala, equality checking between functions is a core issue. Out of the box, Scala hashes and compares function instances by their memory address (hash consing). This is not satisfactory at all: “Claiming that functions are never equivalent, for example, is not satisfactory because the result of a call involving some function as a parameter will never be re-used” [5]. It is therefore crucial to find a way to compare functions by value, for example using a comparison based on their bytecode representation [8].
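As a hedged sketch of the bytecode-based direction: this assumes the function is compiled to a dedicated class whose .class file is available as a classpath resource, which holds for Scala 2.11 anonymous function classes but not necessarily for Scala 2.12+ lambdas, so it is only an illustration of the idea:
import java.security.MessageDigest

// Hash a function instance by the bytecode of its implementing class,
// returning None when the class file cannot be located.
def bytecodeHash(f: AnyRef): Option[Seq[Byte]] = {
  val resource = "/" + f.getClass.getName.replace('.', '/') + ".class"
  Option(f.getClass.getResourceAsStream(resource)).map { in =>
    try {
      val bytes = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
      MessageDigest.getInstance("MD5").digest(bytes).toSeq
    } finally in.close()
  }
}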
2.4.2. Space management and eviction policies
After the design of key computation come the choices concerning the cache itself. Caches are most of the time key-value mappings based on tree maps or hash tables for fast retrievals and insertions. All the possible implementations have a crucial issue in common: they are bounded by physical limits. We will dig into one of the main aspects to consider when designing a cache, directly introduced by their finite size: the eviction policy. The eviction policy is the algorithm in charge of managing introductions and deletions of entries when the cache has reached its maximum capacity.
2.4.2.1. Hit-ratio
The most widely used way to evaluate an eviction policy is based on a score it is intended to maximize, called the hit-ratio: $hit\space ratio=\frac{n_{hits}}{n_{hits} + n_{misses}}$, where $n_{hits}$ is the number of times a computation result has been retrieved directly from the cache and $n_{misses}$ is the number of times the result was not present in the cache and had to be computed. A hit-ratio of 0 means that the results are never retrieved and always have to be computed; conversely, a hit-ratio of 1 means that all the needed results were found in the cache.
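As an illustration with made-up numbers, a workload whose results are found in the cache 800 times and computed 200 times yields:
\[hit\space ratio = \frac{800}{800 + 200} = 0.8\]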
2.4.2.2. Tour of algorithms
This is one example of computer science problems where there is no “free lunch”, meaning that “In general the replacement policy must be application-specific, because for any fixed policy there are programs whose performance is made worse by that choice” [7]. Let’s introduce some of the common algorithms.
Bélády’s
Actually there is a perfect algorithm called Bélády’s optimal algorithm [9], also known as the FutureKnown or clairvoyant algorithm, but it requires knowing all future incoming requests to apply its replacements. It discards the entries that will not come up again for the longest time in the future. As it is impossible to satisfy the clairvoyant hypothesis in practice, this algorithm is used as a theoretical upper bound on the hit-ratio performance of eviction policies.
Random Replacement (RR)
This policy is by far the most naive, in the sense that it just evicts a random entry when needed. It is the cheapest eviction algorithm both in terms of space and time, making it a fine choice for use cases with low memory and computational resources.
First In First Out (FIFO)
This policy just evicts the oldest entry present in the cache. Despite its simplicity, it may be very efficient for workloads presenting some sort of drift pattern.
Least Recently Used (LRU)
This policy remembers when each entry was last requested. When the time comes to evict an entry, it chooses the one with the oldest last-usage date.
It excels on recency-biased workloads, i.e. request streams in which identical requests tend to arrive close together.
There are many variations of LRU. We can cite the Segmented Least Recently Used (SLRU), whose various implementations maintain several cache regions and associate a hotness score with entries. The hottest entries are placed in a protected part of the cache, making them harder to evict.
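A minimal LRU cache sketch, not tied to any particular library (class and parameter names are illustrative), relying on java.util.LinkedHashMap’s access-order mode to keep entries sorted by recency:
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Access-order LinkedHashMap: the eldest entry is the least recently used,
// and it is dropped once the cache exceeds maxEntries.
class LruCache[K, V](maxEntries: Int) {
  private val underlying = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      size() > maxEntries
  }
  def get(key: K): Option[V] = Option(underlying.get(key))
  def put(key: K, value: V): Unit = underlying.put(key, value)
}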
Least Frequently Used (LFU)
This policy manages counters representing how often each entry of the cache is requested. When the time comes to evict an entry, it chooses the one with the smallest frequency.
It excels on frequency-biased workloads, i.e. request streams in which many requests reappear in the long run.
This policy is good at leveraging long-term patterns in the request distribution: entries that have proved their frequency value can survive periods of inactivity while keeping their place in the cache.
LFU algorithms have a major downside: as time goes by, it becomes harder and harder for newly introduced entries to stay in the cache, because older entries have frequency scores that have been growing for a long time, and newcomers are evicted before having the time to prove their frequency value. To counter this effect, one can:
- Introduce a decay of the frequency score over time. An exponential decay is often used: $score = frequency \cdot e^{-k \cdot time}$ (see the sketch after this list).
- Keep in memory the frequency scores of evicted entries, at the cost of additional space.
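A one-line sketch of the decayed score, where k is a tunable decay constant and the age is measured since the last request (the names are illustrative):
// Exponential decay of a frequency score: entries that are not re-requested
// see their score shrink over time.
def decayedScore(frequency: Long, k: Double, ageSeconds: Double): Double =
  frequency * math.exp(-k * ageSeconds)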
Adaptive algorithms, WC-W-TinyLFU
Some algorithms have the ability to tune themselves over time, typically fitting their trade-off between frequency and recency to the workload they are facing. Because of their complexity, these algorithms might be more expensive in space and time than the previous ones, but they are actually the state of the art in terms of hit-ratio performance. We can cite the Adaptive Replacement Cache [10], Window Climbing Tiny Least Frequently Used (WC-W-TinyLFU) [11], or LeCaR, based on regret minimization [12].
WC-W-TinyLFU’s remarkable performance made it widely adopted by companies and projects like Cassandra, LinkedIn’s feed, Akka or Apache Druid, among many others. Let’s overview a few points of WC-W-TinyLFU’s behavior in steady state, i.e. when the maximum size of the cache is reached:
- WC-W-TinyLFU maintains three caches:
- Main cache: managed with an SLRU policy.
- Admission filter cache: managed by TinyLFU, a light-weight LFU approximation based on Bloom filters [13].
- Window cache: managed with an LRU policy.
- When a new entry comes:
- The newcomer is put directly into the window cache.
- Both the window and the main caches evict an entry.
- These two candidate entries are passed to the admission filter, which decides which of them will be introduced into the main cache, the other one being discarded.
- The share of the total cache size allocated to the window cache is dynamically tuned by stochastic hill climbing as the workload evolves.
Note: stochastic hill climbing is an optimization method that makes small changes to some system parameters and observes the effect on a performance metric. The changes bringing benefits are kept. This can be viewed as a form of reinforcement learning.
Cost aware policies
Hit-ratio may sometimes not be the right score to optimize, especially when requests have heterogeneous costs, as with search engine requests [14]. One can instead be interested in minimizing the global computation time of the workload, thus maximizing the cost savings score:
\[cost\space savings\space score = 1 - \frac{cost\space with\space caching}{cost\space without\space caching}\]
Note that “cost” will often refer to some sort of computation time, but things can be adapted to any other cost definition, like the cost in € of requests to a paid API.
A simple cost-aware algorithm is the weighted LFU (w-LFU): it is similar to LFU, but this time the score associated with each entry is the product of its frequency and its cost, with an optional exponential decay:
\[score = cost \cdot frequency \cdot e^{-k \cdot time}\]
2.4.2.3. Cache scalability
Now that we have built a general view of eviction policies, we will dig into the issues introduced by our specific needs. We are talking about big data workloads, so basically we want to handle the three Vs: Volume, Velocity and Variety. Because in this work we only deal with structured data, we will consider the variety point off topic. Volume and velocity remain: we need to find a way to make our memoization scale spatially and temporally.
Temporal scaling
Choosing an eviction policy wisely, based on the time complexity of its cache operations, is crucial to scale to big data throughputs. LRU and LFU implementations are considered to be $O(1)$ for both retrievals and insertions [15]. Note that this assumes the underlying hash tables also offer constant-time operations. For tree maps, for example, it relies on the assumption that the actual complexity $O(\log_x(n_{entries}))$ can be treated as $O(1)$ because $x$ is big enough with respect to the upper bound on the $n_{entries}$ manipulated. This latter statement can become false when dealing with extremely large data.
One can still find ways to achieve a true $O(1)$ for some operations, using randomization to approximate a given algorithm. For example, Random-LRU is an approximation of LRU with constant-time eviction that does not require retrieving any minimum value nor maintaining any sorted structure: it just picks $k$ entries at random and evicts the least recently used among them.
Random-LRU-2 is a variant that stores two values for each entry, the last usage and the penultimate one, and compares the penultimate usage values when performing the eviction. This solves an issue faced by LRU algorithms known as the “one-hit wonder” problem: a very rare new entry that will almost never be used again can stay in LRU caches for quite a long time, protected by its accidental recency.
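A sketch of the Random-LRU idea (names are illustrative; a production implementation would sample in constant time from an indexed backing structure instead of copying the key set):
import scala.collection.mutable
import scala.util.Random

// On eviction, sample k entries at random and evict the least recently used
// one among the sample, avoiding any globally sorted structure.
class RandomLruCache[K, V](maxEntries: Int, k: Int = 5) {
  private case class Entry(var value: V, var lastUsed: Long)
  private val entries = mutable.Map.empty[K, Entry]
  private var clock = 0L

  def get(key: K): Option[V] = entries.get(key).map { e =>
    clock += 1; e.lastUsed = clock; e.value
  }

  def put(key: K, value: V): Unit = {
    if (!entries.contains(key) && entries.size >= maxEntries) evictOne()
    clock += 1
    entries(key) = Entry(value, clock)
  }

  private def evictOne(): Unit = {
    val keys = entries.keys.toIndexedSeq // O(n) copy here, O(1) sampling in a real implementation
    val sample = Seq.fill(math.min(k, keys.size))(keys(Random.nextInt(keys.size)))
    val victim = sample.minBy(key => entries(key).lastUsed)
    entries.remove(victim)
  }
}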
Memory scaling
When it comes to memory scaling, one can come up with three axes of improvement:
- Minimize the space complexity of the cache management: a cache’s lower-bound space complexity is $\Theta(n_{contained\space entries})$, because it needs at least to store the entries’ keys and values. The vast majority of the previously presented algorithms do not change this complexity, but there are exceptions and we must discard them. For example, some implementations of LFU remember the scores of all encountered entries, even the evicted ones, leading to a $\Theta(n_{contained\space entries} + n_{evicted\space entries})$ space complexity.
- Maximize the available space usage: we consider SSD or HDD accesses too slow for memoization purposes and we want our cache to be in-memory, i.e. we want the cache’s data to be stored at least in Random Access Memory. Each running JVM application has an associated part of the memory reserved for it, the so-called on-heap. The on-heap size is configurable and often chosen small because of the additional cost caused by Garbage Collection (GC) when this size grows. One can bypass these JVM considerations and leverage the entire memory space available on the system by accessing the complementary space called off-heap. This gets rid of GC issues at the cost of managing memory allocations and releases ourselves (see the sketch after this list).
- Cluster-wide caching: our caching solution needs to be integrated into big data jobs running on clusters of several machines, called nodes, that can be leveraged. In that context, a key point for hit-ratio performance is the ability to share the caches’ data between all the nodes, thus getting closer to true global-scoped reuse.
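A tiny off-heap illustration using a direct ByteBuffer (the size and offsets are arbitrary; production caches typically rely on dedicated allocators):
import java.nio.ByteBuffer

// A direct ByteBuffer lives outside the JVM heap, so its contents are not
// scanned or moved by the garbage collector.
val offHeap: ByteBuffer = ByteBuffer.allocateDirect(64 * 1024 * 1024) // 64 MiB
offHeap.putLong(0, 42L)                 // write a long at offset 0
val readBack: Long = offHeap.getLong(0) // read it back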
2.4.2.4. Tour of existing projects
Now that we have defined points that must be taken into account when designing a scalable memoization solution, we can start a tour of existing projects that might help us. They might be ready-to-use memoization solutions, or just cache libraries on which a layer of memoization can be built.
Out-of-the-box memoization solutions
The use of memoization to speed up big data processing is common and many programmers have already used it [16], but none of the existing implementations completely matches our needs:
- They all restrict the format of the functions that can be memoized, whereas we need maximum flexibility.
- They are too application-specific, whereas we need a general-purpose solution.
- The underlying cache lacks scalability or does not implement any eviction policy at all.
Note that if one needs a light and fast memoization in Scala at small scale, with no additional features, a great choice is to pick Scalaz’s Memo.
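A usage sketch of Scalaz’s Memo (assuming the scalaz-core dependency; the memoized function is illustrative):
import scalaz.Memo

// mutableHashMapMemo backs the memoized function with a mutable HashMap.
val slowSquare: Int => Int = { x => Thread.sleep(100); x * x }
val memoizedSquare: Int => Int = Memo.mutableHashMapMemo(slowSquare)

memoizedSquare(4) // computed
memoizedSquare(4) // served from the backing map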
Key-value scalable caches
We will now take a little tour of renowned projects exposing a key-value store API, paying attention to built-in eviction policies, off-heap storage, cluster mode and embedding possibilities in Scala applications.
- Ehcache: released in 2006, Ehcache is a famous cache solution in the Java ecosystem, carried by Terracotta. It supports LRU, LFU, FIFO and Automatic Resource Control eviction policies. The downside is that benefiting from the off-heap storage capability, called BigMemory, and from the cluster mode requires subscribing to Terracotta’s paid offering.
- Redis: released in 2009, Redis is widely used in the industry. It is known to be fast and is totally open-source. It supports LRU, LFU and RR eviction policies. It leverages off-heap storage. Its cluster mode follows an asymmetric master-slave model. It is not straightforward to embed it entirely, even if user-made external libraries exist.
- Aerospike: released in 2012, Aerospike is generally described as a NoSQL in-memory database. It supports only LRU eviction. It leverages off-heap storage. Its cluster mode follows a peer-to-peer mesh grid model. Like Redis, its Java API is client-oriented and it is not straightforward to embed it entirely.
- Apache Ignite: released in 2014, Ignite is a top-level Apache Foundation project supported by GridGain. It is a multi-faceted project, but we will focus on its in-memory key-value API. It supports hybrid on-heap and off-heap storage. It supports FIFO and LRU eviction from on-heap to off-heap, and Random-LRU and Random-LRU-2 eviction for discarding from off-heap. Its cluster mode is similar to Aerospike’s, a peer-to-peer mesh grid where nodes discover each other across the network. The Java API allows embedding it out of the box.
- Caffeine: released in 2015, the Caffeine project describes itself as a near-optimal caching library. It is the main implementation of the adaptive WC-W-TinyLFU eviction algorithm. Caffeine is made to be really fast but not to scale in cache size, so there is no off-heap storage nor cluster mode. It is trivial to embed through the original Java 8 API or through the thin Scala wrapper called Scaffeine, as shown in the sketch after this list.
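A minimal embedding sketch through Scaffeine (assuming the com.github.blemale:scaffeine dependency; the key and value types are illustrative):
import com.github.blemale.scaffeine.{Cache, Scaffeine}

// Build a size-bounded Caffeine cache through the Scaffeine wrapper.
val cache: Cache[String, String] =
  Scaffeine()
    .maximumSize(10000)
    .build[String, String]()

cache.put("some-key", "some-value")
val hit: Option[String] = cache.getIfPresent("some-key")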