Context of this work

During the past two years of my master's degree, my time was split between studying at school and working at Orange. My work mainly involved software design and data engineering.
For this master's thesis I was allowed to work on a subject independent of my day-to-day projects at Orange.

1. Introduction to the problem

I will briefly explain where the problem addressed in this thesis comes from, beginning with an overview of Spark.

1.1. Spark overview

Spark is a top-level project of the Apache Software Foundation that has become a key framework for cluster computing in recent years. What makes Spark so appealing is its fluent API, which makes distributed jobs easy to write and to reason about, its state-of-the-art performance, and its versatility. Spark handles workloads from batch to streaming efficiently and ships with many analytics and machine learning features.
Among the four supported languages (Java, Scala, Python, R), we will use the Scala API in this work because it is the one I am most productive with.

1.1.1. Spark Core

Spark Core's programming paradigm is based on manipulating Resilient Distributed Datasets (RDDs) in a functional fashion. One can apply operations like map or reduce, which are familiar to users coming from Apache Hadoop MapReduce. This makes it possible to write concise, abstract programs that are completely unaware of the underlying cluster management and of how computations are distributed.
Operations applied to RDDs are of two types:

  1. The transformations:
    // wordsRDD is assumed to be an existing RDD[String]
    val wordCounts: RDD[(String, Int)] =
      wordsRDD.map(word => (word, 1)).reduceByKey(_ + _)
    

These operations are called “lazy”: they do not trigger any data processing because they do not require any output to be produced. They only add execution steps to the new RDD they return.

  2. The actions:
    val res: Array[(String, Int)] = wordCounts.collect()  
    

These operations (count, collect, saveAsTextFile…) trigger the execution of the job because they require an output. In the example, collect returns a Scala array to the user.
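The difference between the two kinds of operations can be observed with a small experiment. The following is only a sketch: it assumes a local Spark session, and the object name `LazyVsAction`, the sample words and the accumulator `mapCalls` are illustrative names that are not part of the original example. The accumulator is there purely to observe when the function passed to map actually runs.

```scala
import org.apache.spark.sql.SparkSession

object LazyVsAction {
  // Returns (map calls seen before the action, map calls seen after it, word counts).
  def run(spark: SparkSession): (Long, Long, Map[String, Int]) = {
    val sc = spark.sparkContext

    // Accumulator used only to observe when the map function is executed.
    val mapCalls = sc.longAccumulator("mapCalls")

    val wordsRDD = sc.parallelize(Seq("spark", "sql", "spark", "udf"))

    // Transformations: only the lineage of the new RDD is recorded here.
    val wordCounts = wordsRDD
      .map { word => mapCalls.add(1); (word, 1) }
      .reduceByKey(_ + _)
    val before = mapCalls.sum

    // Action: triggers the job and returns the result to the driver.
    val res = wordCounts.collect()
    val after = mapCalls.sum

    (before, after, res.toMap)
  }

  def main(args: Array[String]): Unit = {
    // Local master and application name are assumptions made for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]").appName("lazy-vs-action").getOrCreate()
    val (before, after, counts) = run(spark)
    println(s"map calls before collect: $before")
    println(s"map calls after collect:  $after")
    counts.toSeq.sortBy(_._1).foreach(println)
    spark.stop()
  }
}
```

The first counter should still be zero because no job has been submitted when it is read. The second one is only updated once collect has run; accumulator updates made inside transformations may be applied more than once if tasks are retried, so it should be read as an observation aid rather than an exact measure.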

1.1.2. Spark SQL

Spark SQL is a module built on top of Spark Core that was first released in 2014 with Spark 1.0.0. The version of the module shipped with Spark 1.3.0 was presented in 2015 in a central article [1].

Users no longer manipulate RDDs directly but DataFrames built on top of them. They can use either the DataFrame Domain Specific Language (DSL) or SQL, two equivalent APIs that can be freely combined.
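As an illustration of the two APIs, here is a minimal sketch that expresses the same aggregation once with the DSL and once with SQL. The object name `DslVsSql`, the view name `t`, the alias `total` and the sample rows are assumptions made for this example only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum}

object DslVsSql {
  // Aggregation written with the DataFrame DSL.
  def viaDsl(df: DataFrame): DataFrame =
    df.groupBy(col("col2")).agg(sum(col("col1")).as("total"))

  // The same aggregation written in SQL against a temporary view.
  def viaSql(spark: SparkSession, df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("t")
    spark.sql("SELECT col2, sum(col1) AS total FROM t GROUP BY col2")
  }

  def main(args: Array[String]): Unit = {
    // Local master and application name are assumptions made for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]").appName("dsl-vs-sql").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "a"), (3, "b")).toDF("col1", "col2")

    viaDsl(df).show()
    viaSql(spark, df).show()

    // The two APIs can be mixed: a SQL result refined with the DSL.
    viaSql(spark, df).filter(col("total") > 2).show()

    spark.stop()
  }
}
```

Both functions return a DataFrame, which is why the result of a SQL query can be further transformed with DSL calls and vice versa.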

Spark SQL’s main feature is its Catalyst optimizer, which acts as a compiler between SQL semantics and RDD jobs. The role of this optimizer is to determine the most efficient execution plan for the processing described by the user, by applying extensible optimization rules at different stages.

Spark SQL comes with many built-in SQL functions that help solve the majority of analytics problems, but it also lets the user write custom User-Defined Functions (UDFs) to address specific needs.
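For readers who have never written one, a UDF is simply a Scala function wrapped so that Spark SQL can call it on columns. The sketch below assumes a hypothetical `normalize` function, a view named `words` and a local session; none of these names come from the original text.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object UdfExample {
  // Hypothetical custom logic that no built-in function is assumed to cover.
  val normalize: String => String = _.trim.toLowerCase

  // Wrap the Scala function so it can be applied to columns in the DSL.
  val normalizeUdf = udf(normalize)

  // Use of the UDF through the DataFrame DSL.
  def viaDsl(df: DataFrame): DataFrame =
    df.select(normalizeUdf(col("word")).as("w"))

  // Use of the same UDF through SQL, after registering it under a name.
  def viaSql(spark: SparkSession, df: DataFrame): DataFrame = {
    spark.udf.register("normalize", normalizeUdf)
    df.createOrReplaceTempView("words")
    spark.sql("SELECT normalize(word) AS w FROM words")
  }

  def main(args: Array[String]): Unit = {
    // Local master and application name are assumptions made for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]").appName("udf-example").getOrCreate()
    import spark.implicits._

    val df = Seq(" Spark ", "SQL").toDF("word")
    viaDsl(df).show()
    viaSql(spark, df).show()

    spark.stop()
  }
}
```

From Catalyst's point of view the body of `normalize` is opaque: the optimizer sees a function call on a column but cannot look inside it.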

1.2. Definition of the problem

Catalyst is very effective at optimizations such as predicate push-down, constant folding and null propagation. However, one potentially game-changing feature appears to be missing: the sharing of deterministic UDFs’ results.

1.2.1. Determinism

The concept of a deterministic function is not entirely clear-cut. A function is said to be deterministic if it always returns the same output for a given input but, unlike a referentially transparent function, it is also allowed to perform weak side effects. Which side effects are allowed depends on the context, and we will stick to the Spark SQL team’s view: deterministic functions must tolerate retries in case of failure without changing the result of the job [2].

Note: A boolean flag .deterministic is associated with each UDF. By default UDFs are deterministic, but you can convert them into non-deterministic ones with .asNondeterministic().
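The flag mentioned in the note can be inspected directly on a UDF value. In this small sketch the two UDFs `square` and `noisy` are hypothetical examples introduced only to show the flag.

```scala
import org.apache.spark.sql.functions.udf

object DeterminismFlag {
  // Same input, same output: left deterministic, which is the default.
  val square = udf((x: Long) => x * x)

  // Depends on a random source, so it is explicitly marked non-deterministic.
  val noisy = udf((x: Long) => x + scala.util.Random.nextInt(10))
    .asNondeterministic()

  def main(args: Array[String]): Unit = {
    println(s"square.deterministic = ${square.deterministic}")
    println(s"noisy.deterministic  = ${noisy.deterministic}")
  }
}
```

Note that Spark does not verify the flag: it is a declaration made by the author of the UDF, and the optimizer relies on it when deciding which rewrites are safe.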

1.2.2. Problem statement

The main problem is that some UDFs can be very expensive, such as testing whether a large integer $n$ is prime [which takes $\tilde{O}(\log(n)^{12})$ time with the original AKS primality test] or querying a deterministic API on a remote server. Each call to such a function may take a few milliseconds, and when multiplied by the dimensions of big data workloads these calls become bottlenecks.

Given an expensive deterministic UDF f, we want Catalyst to at least internally optimize the query

SELECT col2, sum(f(col1)), avg(f(col1)) 
  FROM table 
  GROUP BY col2

so that f is run only once per row instead of twice. We will call this type of reuse row-scoped reuse. A second optimization would be to have f run only once per distinct value of col1; we will call this global-scoped reuse.
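To make the two scopes concrete, here is a plain-Scala model of the query above that counts how many times f is called under each strategy. It deliberately does not use Spark and says nothing about how Catalyst behaves: the names `ReuseScopes` and `Counted`, the sample rows and the stand-in function are all assumptions made for the illustration.

```scala
object ReuseScopes {
  // Wraps a function and counts its invocations.
  final class Counted(f: Long => Long) extends (Long => Long) {
    var calls = 0
    def apply(x: Long): Long = { calls += 1; f(x) }
  }

  // A row is modelled as (col1, col2).
  type Row = (Long, String)
  type Result = Map[String, (Long, Double)] // col2 -> (sum, avg)

  // No reuse: f is evaluated once per aggregate, hence twice per row.
  def noReuse(rows: Seq[Row], f: Long => Long): Result =
    rows.groupBy(_._2).map { case (key, group) =>
      val total = group.map(r => f(r._1)).sum
      val mean = group.map(r => f(r._1)).sum.toDouble / group.size
      key -> (total, mean)
    }

  // Row-scoped reuse: f is evaluated once per row and shared by both aggregates.
  def rowScoped(rows: Seq[Row], f: Long => Long): Result =
    rows.groupBy(_._2).map { case (key, group) =>
      val fx = group.map(r => f(r._1))
      key -> (fx.sum, fx.sum.toDouble / fx.size)
    }

  // Global-scoped reuse: f is evaluated once per distinct value of col1.
  def globalScoped(rows: Seq[Row], f: Long => Long): Result = {
    val cache: Map[Long, Long] = rows.map(_._1).distinct.map(v => v -> f(v)).toMap
    rowScoped(rows, cache) // a Map is itself a function from key to value
  }

  // Runs one strategy with a fresh counter and returns (calls, result).
  def measure(strategy: (Seq[Row], Long => Long) => Result, rows: Seq[Row]): (Int, Result) = {
    val f = new Counted(x => x * x) // cheap stand-in for an expensive function
    val result = strategy(rows, f)
    (f.calls, result)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq((7L, "a"), (7L, "a"), (11L, "b"), (7L, "b"))
    println(s"no reuse:      ${measure(noReuse, rows)._1} calls")
    println(s"row-scoped:    ${measure(rowScoped, rows)._1} calls")
    println(s"global-scoped: ${measure(globalScoped, rows)._1} calls")
  }
}
```

With 4 rows and 2 distinct values of col1, the three strategies call f 8, 4 and 2 times respectively while producing the same aggregates, which is exactly the saving each scope is after.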

I found that neither of these two optimizations is actually performed, and this became the problem I investigated in this work: achieving global-scoped or row-scoped reuse of the results of Spark’s deterministic UDFs in a way that is as natural as possible.
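One possible way to check this on a given Spark version is to count the invocations of f with an accumulator and compare the total with the number of rows and of distinct values. This is only a measurement sketch under assumptions: the object name `CountUdfCalls`, the sample data and the stand-in body of f are hypothetical, and the count that is printed depends on the Spark version and on the plan Catalyst chooses, so no particular value is claimed here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, sum, udf}

object CountUdfCalls {
  // Returns (number of rows, number of distinct col1 values, observed calls to f).
  def run(spark: SparkSession): (Long, Long, Long) = {
    import spark.implicits._

    // Accumulator used only to observe how many times f is invoked.
    val fCalls = spark.sparkContext.longAccumulator("fCalls")

    // Cheap stand-in for an expensive deterministic UDF.
    val f = udf { (x: Long) => fCalls.add(1); x * x }

    val df = Seq((7L, "a"), (7L, "a"), (11L, "b"), (7L, "b")).toDF("col1", "col2")

    // Same shape as the query of interest: two aggregates over f(col1).
    df.groupBy(col("col2"))
      .agg(sum(f(col("col1"))), avg(f(col("col1"))))
      .collect()
    val observed = fCalls.sum

    val rows = df.count()
    val distinctValues = df.select("col1").distinct().count()
    (rows, distinctValues, observed)
  }

  def main(args: Array[String]): Unit = {
    // Local master and application name are assumptions made for this sketch.
    val spark = SparkSession.builder()
      .master("local[*]").appName("count-udf-calls").getOrCreate()
    val (rows, distinctValues, observed) = run(spark)
    println(s"rows = $rows, distinct col1 = $distinctValues, calls to f = $observed")
    spark.stop()
  }
}
```

A count equal to the number of rows would indicate row-scoped reuse, a count equal to the number of distinct values would indicate global-scoped reuse, and a count of twice the number of rows would indicate no reuse at all. Task retries can inflate the accumulator, so the experiment is best run on a small local dataset.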