How can you handle skewed data in Hive or Spark?
How can we get rid of data skewness in Spark or Hive?
What is skewed data and how can we handle it?
The above are commonly asked interview questions, and the topic is also important to know because Data Engineers who handle huge amounts of data will definitely come across this situation.
What is Skewness in Data?
Data skew means data is distributed unevenly or asymmetrically.
Let's try to understand this in a better way. Assume that you are a data engineer working at some organization. You got a task to analyze a huge amount of data about people from different countries. You designed a MapReduce job for it, and it is taking a lot of time. You check the status of the job on the UI, which shows that all the tasks have completed except the ones for India and China.
A MapReduce job launches several Mappers and Reducers. Reducers start once the Mappers are done with their tasks, and each Reducer processes data based on keys. If we have used the country name as the key, then the reducers working on the India and China data will take a lot of time, because the population of these countries is much larger than that of other countries: the data contains many more rows for these two countries than for the rest. So the other reducer tasks complete, but not the ones for these two countries. This kind of unevenly distributed data is called Skewed Data.
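To make this concrete, below is a minimal Spark sketch (the data is made up purely for illustration) showing how such skew can be spotted by counting rows per key; in real data, the counts for the hot keys dwarf the rest:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("skew-check").master("local[*]").getOrCreate()
import spark.implicits._

// Made-up data: most rows carry the keys "India" and "China", so any shuffle
// keyed on country sends most of the work to the two partitions for those keys.
val people = Seq("India", "India", "India", "India", "China", "China", "China",
  "Norway", "Finland").toDF("country")

// Rows per key; the hot keys stand out at the top.
people.groupBy("country").count().orderBy($"count".desc).show()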
How can we handle skewed data in Hive?
Skewed Table:
A skewed table is a table in which some values appear far more often than others. The frequently appearing values are split into separate files, and the rest of the data goes into another file. In this case two operations happen:
- automatic splitting of the data into files
- skipping/including whole files where possible
Below is the command to create a skewed table:
CREATE TABLE TableName (col_1 STRING, col_2 STRING)
SKEWED BY (col_1) ON ('value1','value2');
List Bucketing:
This is an enhanced version of the skewed table. Here a separate directory is created for each skewed key, and the skewed data for that key goes into its directory. The remaining non-skewed data goes into a separate directory. The main characteristics in this case are:
- one directory per skewed key
- intended for small numbers of skewed keys
Below is the command to create this table:
CREATE TABLE TableName (col_1 STRING, col_2 STRING)
SKEWED BY (col_1) ON ('value1','value2') STORED AS DIRECTORIES;
It is also worth knowing about Skew Join in Hive.
Setting up the Hive properties to handle skewness:
When performing a join on skewed data, it is better to set the properties below. hive.optimize.skewjoin enables the runtime skew join optimization, and hive.skewjoin.key is the per-key row count above which a key is treated as skewed:
SET hive.optimize.skewjoin=true;
SET hive.skewjoin.key=500000;
And when performing a GROUP BY on skewed data, the parameter below should be set:
SET hive.groupby.skewindata=true;
How can we handle data skewness in Spark?
Skew Join:
Some Spark SQL distributions (for example, Databricks Runtime) allow users to use a skew join hint, similar to what Hive does. While supplying the SQL query to Spark SQL, we need to specify the skewed table in the hint, as shown in the command below:
SELECT /*+ SKEW('table1') */ *
FROM table1 AS t1, table2 AS t2
WHERE t1.col_name = t2.col_name;
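Note that open-source Spark 3.x does not ship this SKEW hint; instead, Adaptive Query Execution can split skewed partitions automatically at join time. Below is a minimal sketch of enabling it (the threshold values shown are illustrative, not tuning recommendations):
import org.apache.spark.sql.SparkSession

// Adaptive Query Execution (Spark 3.0+) detects and splits skewed partitions
// in sort-merge joins when these flags are on.
val spark = SparkSession.builder()
  .appName("aqe-skew-join")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  // A partition counts as skewed if it is larger than this factor times the
  // median partition size and also larger than the byte threshold below.
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
  .getOrCreate()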
Data Broadcast:
When joining a skewed dataset with another dataset, it is better to broadcast the smaller one. By doing so you avoid the shuffle and have a better chance of keeping evenly distributed partitions. But we should be careful with broadcast joins, as they can lead to out-of-memory issues. If the smaller dataset exceeds the default size limit, we can increase the property below so that Spark still broadcasts it automatically (it must still fit comfortably in each executor's memory):
spark.sql.autoBroadcastJoinThreshold
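The smaller dataset can also be broadcast explicitly with the broadcast function. A minimal sketch with made-up data:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join").master("local[*]").getOrCreate()
import spark.implicits._

// Made-up data: a large, skewed fact table and a small dimension table.
val people = Seq(("India", 1L), ("India", 2L), ("China", 3L), ("Norway", 4L))
  .toDF("country", "person_id")
val regions = Seq(("India", "Asia"), ("China", "Asia"), ("Norway", "Europe"))
  .toDF("country", "region")

// Broadcasting the small side avoids shuffling the skewed table at all,
// so the skew in "country" never concentrates on a single partition.
people.join(broadcast(regions), Seq("country")).show()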
Data Preprocessing:
If there are too many null values in the join key, they all hash to the same partition, and the join can skew the entire process. So preprocess the data before starting the actual job, for example by separating out the null keys, as in the sketch below.
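A minimal sketch of this idea (the dataset and column names are made up): join only the rows that can actually match, and handle the null-key rows separately.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("null-split").master("local[*]").getOrCreate()
import spark.implicits._

// Made-up input: many rows carry a null user_id; during a hash join they
// would all land on the same partition.
val events = Seq((1L, "user1"), (2L, null), (3L, null), (4L, "user2"))
  .toDF("event_id", "user_id")
val users = Seq(("user1", "India"), ("user2", "China")).toDF("user_id", "country")

// Join only the rows that can actually match.
val joined = events.filter($"user_id".isNotNull).join(users, Seq("user_id"))
// Null-key rows never match in an inner join, so keep or drop them separately.
val unmatched = events.filter($"user_id".isNull)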
Salting:
This is the most reliable solution for data skewness, and it is guaranteed to reduce the skew, but it is a slightly more complicated one. What is salting? Salting changes skewed data into more evenly distributed data by appending a random suffix (a "salt") to the join key, so that frequently appearing values are spread across several distinct keys.
Below is code that shows how salting is done: the frequently appearing key values are rewritten into several distinct salted keys, so the join work spreads across partitions.
import org.apache.spark.sql.{SparkSession, functions}

val spark = SparkSession.builder().appName("salting").master("local[*]").getOrCreate()
import spark.implicits._

val users = Seq("user1", "user2", "user3").toDF("id")
val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1"), (5L, "user1"),
  (6L, "user1"), (7L, "user1")).toDF("order_id", "user_id")
// 1) Take the bigger dataset and add a column with some randomness.
//    floor(rand * 3) appends a number between 0 and 2 to the join key.
val ordersWithSaltedCol = orders.withColumn("order_join_key",
  functions.concat($"user_id", functions.floor(functions.rand(2) * 3)))
// 2) On the smaller dataset, add a new column covering all random possibilities,
//    with one row per possibility. Here the explode function does that.
val usersWithSaltedCol = users
  .withColumn("salt", functions.array(functions.lit(0), functions.lit(1), functions.lit(2)))
  .withColumn("user_salt", functions.explode($"salt"))
  .withColumn("user_join_key", functions.concat($"id", $"user_salt"))
// 3) Make the join on the salted column; "user1" is now spread over up to 3 keys.
val result = usersWithSaltedCol.join(ordersWithSaltedCol, $"user_join_key" === $"order_join_key")
val mappedUsers = result.collect().map(row => s"${row.getAs[String]("id")}_${row.getAs[Long]("order_id")}")
assert(mappedUsers.length == 7)
assert(mappedUsers.toSet == Set("user1_1", "user2_2", "user3_3", "user1_4", "user1_5", "user1_6", "user1_7"))
result.show(false)
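Note the trade-off in this approach: the smaller dataset is duplicated once per salt value, so a larger salt range spreads the hot keys across more partitions but also multiplies the size of the replicated side.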
Please comment your thoughts about this post.