PySpark Planning — A Guide for 1st Timers
This is the 2nd post in the PySpark XP series (click for the 1st), and its main theme is Spark plans.
In this post you will learn how to decipher Spark plans, and how to use that knowledge to optimize the performance of your Spark jobs.
Tip #1 — Just try
Some Spark developers avoid reading the plan printed by .explain() at all costs.
I get it; it can be intimidating.
One could look up the meaning of every cryptic term that appears in the plan, and I believe that would help you write better Spark applications.
On the other hand, one can simply experiment with the plan. I believe that is a better learning experience.
Let’s try something out:
We created 2 dataframes, each with a single integer column computed over 2M generated rows (ids up to 2M), and then joined them. The resulting dataframe, df, has 2 columns, and each row contains the same value twice.
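To make the setup concrete, here is a small-scale, plain-Python model of it (not Spark code). It assumes, based on the plan's Project rows discussed in this tip, that df1's column_1 is 3*id % 5 and df2's column_2 is the id itself, and it scales the 2M rows down to 20 so that a naive join stays instant.

```python
# Plain-Python model of the example's two dataframes and their join (not Spark).
# Assumption: column_1 = 3*id % 5 and column_2 = id; N is scaled down from 2M.
N = 20

ids = range(N)                      # plays the role of .range(N)
df1 = [(3 * i) % 5 for i in ids]    # column_1: only the values 0..4
df2 = list(ids)                     # column_2: the ids 0..N-1

# Inner equi-join on column_1 == column_2 (naive nested loop, fine for small N)
df = [(c1, c2) for c1 in df1 for c2 in df2 if c1 == c2]

print(len(df), df[:5])
```

Every row of df1 finds exactly one partner in df2, which is why each row of df holds the same value twice. It also hints that column_1 carries only 5 distinct keys, which becomes important in Tip #3.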
Let’s try to guess what Spark does under the hood:
Even at a brief glance, I see something called SortMergeJoin, which already tells us something about how this join is executed under the hood.
Moreover, I bet you could interpret most of these lines without a problem.
Take a minute to do so.
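As for SortMergeJoin, the idea is to sort both sides by the join key and then walk through them together, matching runs of equal keys. In Spark this happens within each partition, after the Exchange steps have brought equal keys together; the sketch below is only a single-machine, plain-Python illustration of the sort-and-merge step, with rows reduced to bare keys.

```python
def sort_merge_join(left, right):
    """Inner equi-join of two key lists: sort both sides, then merge them."""
    left, right = sorted(left), sorted(right)
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        if left[i] < right[j]:
            i += 1  # left key is smaller: advance the left side
        elif left[i] > right[j]:
            j += 1  # right key is smaller: advance the right side
        else:
            key = left[i]
            # Find the end of the run of equal keys on each side
            i_end = i
            while i_end < len(left) and left[i_end] == key:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end] == key:
                j_end += 1
            # Every left row in the run matches every right row in the run
            out.extend([(key, key)] * ((i_end - i) * (j_end - j)))
            i, j = i_end, j_end
    return out

print(sort_merge_join([3, 0, 3, 1], [1, 3, 2]))  # prints [(1, 1), (3, 3), (3, 3)]
```

Once both inputs are sorted, each side is scanned only once, instead of comparing every pair of rows as a nested loop would.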
Now let’s go through it together. For a start:
- Rows where Range appears correspond to our use of the .range() function to generate data.
- Rows where Project appears correspond to our “select” clause over the data generated with .range(). The first one is our calculation of 3*id % 5 together with the column naming, and the second one is just the column naming.
And now a question — what is Exchange?
Tip #2 — Exchange
When you decipher Spark plans, you will probably encounter the word Exchange.
It translates to shuffling data.
Spark moves data from one executor to another so that it can process the requested dataframe, for example by bringing rows with the same join key into the same partition.
A common exchange method is hashpartitioning, as in Figure 2.
Let’s look at the lower hashpartitioning row in the plan.
It tells us that Spark distributes the data of df2 across our executors into 200 different partitions, using a hash over column_2.
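The core idea of hashpartitioning fits in a few lines of plain Python: each row goes to partition hash(key) mod the number of partitions. This is a simplified model, not Spark's implementation; Spark uses its own Murmur3-based hash, whereas Python's built-in hash() simply returns the integer itself for small non-negative ints. The helper hash_partition is hypothetical, and 2,000 distinct ids stand in for df2's column_2.

```python
def hash_partition(keys, num_partitions=200):
    """Put each key into partition hash(key) % num_partitions (simplified model)."""
    partitions = [[] for _ in range(num_partitions)]
    for key in keys:
        partitions[hash(key) % num_partitions].append(key)
    return partitions

# 2,000 distinct ids stand in for df2's column_2
partitions = hash_partition(range(2000))
sizes = [len(p) for p in partitions]
print(min(sizes), max(sizes))  # prints 10 10
```

With distinct keys, every one of the 200 partitions receives the same share of rows, which is the well-balanced case.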
More on the magic number 200 in the 4th tip.
Tip #3 — Skewed exchanging
What about the first hashpartitioning row? Is it different from the 2nd?
Let’s look:
I guess it looks the same, give or take.
Wait a second…
column_1 holds only 5 distinct values, since we computed it with the modulo 5 operator.
This means that df1’s data is distributed into at most 5 partitions! At least 195 of the 200 partitions are empty…
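The same plain-Python model of hashpartitioning lets us count how many of the 200 partitions df1's column_1 actually fills. Python's hash() again stands in for Spark's hash function and the data is scaled down to 2,000 rows, so the exact partition ids differ from Spark's, but the bound of at most 5 non-empty partitions does not.

```python
from collections import Counter

NUM_PARTITIONS = 200

# df1's column_1 = 3*id % 5, scaled down from 2M rows to 2,000
column_1 = [(3 * i) % 5 for i in range(2000)]

# Rows per partition, with hash() standing in for Spark's hash function
rows_per_partition = Counter(hash(k) % NUM_PARTITIONS for k in column_1)

non_empty = len(rows_per_partition)
empty = NUM_PARTITIONS - non_empty
print(non_empty, empty)  # prints 5 195
```

All 2,000 rows land in just 5 partitions of 400 rows each, so at most 5 tasks do real work while the rest receive nothing.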
Let’s check that out:
And I thought Spark was supposed to be highly parallel…
This is a common cause of a very slow, or even seemingly stuck, Spark application. It often shows up as a stage with 200 tasks, 199 of which have already finished.
Understanding Spark’s mechanisms, and hashpartitioning in particular, helps us diagnose our application and spot where bottlenecks and low concurrency may occur.
How would you solve it? Read on to the 5th tip for my suggestion.
Tip #4 — What is this magic number 200?
This is, of course, the default value of one of Spark’s configuration parameters.
This parameter decides how many partitions a shuffling operation will produce.
Joins and groupBy, for example, can cause shuffling, which practically guarantees that your Spark application will encounter this parameter.
Therefore, fine-tuning it can improve your application’s performance.
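To see what changing the number of shuffle partitions does, here is a plain-Python sketch using the hypothetical helper partition_stats (again with hash() as a stand-in for Spark's hash). It reports how many partitions are non-empty and how large the biggest one is. For evenly spread keys, more partitions means smaller ones; for the skewed column_1, no partition count gets past 5 non-empty partitions.

```python
from collections import Counter

def partition_stats(keys, num_partitions):
    """Return (number of non-empty partitions, size of the largest partition)."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return len(counts), max(counts.values())

even_keys = list(range(2000))                     # like df2's column_2, scaled down
skewed_keys = [(3 * i) % 5 for i in range(2000)]  # like df1's column_1, scaled down

for n in (50, 200, 1000):
    print(n, partition_stats(even_keys, n), partition_stats(skewed_keys, n))
```

Tuning the parameter therefore helps balanced data, but on its own it cannot spread a handful of hot keys.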
Check out the notebook to see how to change this parameter.
Tip #5 — Solving skewness
There are several ways of approaching this problem.
For some datasets, one can leverage prior knowledge of the data to overcome it.
If that is not the case:
- In Spark 3 there is a new feature called adaptive query execution that “solves” the problem automatically.
- Otherwise, there is a method called salting that might solve our problem.
Many posts have been written about salting (a reference is at the end of this post). It is a cool trick, but not very intuitive at first glance.
Here is the concept behind salting. For a more thorough explanation and a code example, check out the notebook (the same notebook as in the 4th tip).
- We concatenate a random value, the salt (for example an integer from 1 to 100), to the key of the “OK” dataframe, that is, the one that does not cause the skew: df2 in our case.
- Now we want a join with df1 to still yield the desired result. We do so by replicating df1 100 times (once per possible salt value) and adding a salt column with values 1 to 100 (a different value for each replica of a row).
- Join the dataframes on the salted key in df2 and the concatenation of the key and the salt in the replicated dataframe.
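The three salting steps above can be modeled in plain Python to check that they preserve the join result. This is a sketch, not the notebook's PySpark code: the "_" separator used for concatenation, the fixed random seed, the helper salted, and the scaled-down N are all assumptions, and the join itself is a dictionary lookup rather than Spark's shuffle join.

```python
import random
from collections import Counter

SALT_RANGE = 100   # salt values 1..100, as in the text
N = 2000           # scaled down from 2M rows
random.seed(0)     # fixed seed so the example is reproducible

df1 = [(3 * i) % 5 for i in range(N)]   # skewed side: only keys 0..4
df2 = list(range(N))                    # "OK" side: distinct keys

def salted(key, salt):
    # Hypothetical helper: concatenate key and salt with a separator
    return f"{key}_{salt}"

# Step 1: append a random salt to each key of the "OK" dataframe, df2
df2_salted = [(k, salted(k, random.randint(1, SALT_RANGE))) for k in df2]

# Step 2: replicate every df1 row SALT_RANGE times, one salt value per replica
df1_replicated = [(k, s) for k in df1 for s in range(1, SALT_RANGE + 1)]

# Step 3: join df2's salted key with df1's key concatenated with its salt
df2_by_salted_key = {}
for k, sk in df2_salted:
    df2_by_salted_key.setdefault(sk, []).append(k)
salted_join = [(k1, k2)
               for k1, s in df1_replicated
               for k2 in df2_by_salted_key.get(salted(k1, s), [])]

# Reference result: the ordinary join (df2's keys are distinct)
df2_keys = set(df2)
plain_join = [(k, k) for k in df1 if k in df2_keys]

keys_before = len(set(df1))                                  # distinct keys in df1
keys_after = len({salted(k, s) for k, s in df1_replicated})  # distinct salted keys
print(keys_before, keys_after, Counter(salted_join) == Counter(plain_join))
```

Each df1 row matches exactly one of its replicas, so the result is unchanged, while df1’s 5 hot keys become 500 salted keys that can spread across many partitions. The price is that the replicated side grows 100-fold, here from 2,000 to 200,000 rows. The separator matters: without it, key 1 with salt 11 and key 11 with salt 1 would both become "111" and match by mistake.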
Now it is your turn to try and understand Spark plans. Good luck!
Thanks for reading my 5 tips about PySpark planning.