# Spark, discarding bad rows, and tracing them
For years I have struggled with handling errors in Spark. You try to anticipate which errors could happen, which combination of operations could lead to an exception, and what to do next. But let's be honest: Murphy's Law is always in effect. In this post I will walk through how I usually handle errors in Apache Spark.
## Some context
Suppose that you are going to create a batch process in Spark using Scala 2. Your client, a fitness app, wants to know how much they are going to earn next month from their pro subscriptions. They know the input data could be better, but they want the process not to fail unexpectedly; it should finish and provide the best results possible. In other words, they don't want the process to fail. Ever.

Those are strong words, but you like a challenge. Your client also knows how big the challenge is, so they want a trace of whatever failed unexpectedly, written to a table, so that they can check it and correct things for future runs. The cause could be a data quality issue or a process logic issue.
## Creating a case scenario
Suppose that we have data of future scheduled subscriptions in our metastore as `next_payments`:

| amount | date | customerId |
|---|---|---|
| 29.99 | 2022-01-01 | 1234 |
| 10.00 | 2022-02-01 | 5678 |
Also, we have a table of customer status, `customer_status`:

| customerId | active |
|---|---|
| 1234 | true |
| 5678 | false |
These tables can be represented as `Dataset[T]` with these case classes:

```scala
case class Purchase(amount: Double, date: java.sql.Date, customerId: Int)
case class CustomerStatus(customerId: Int, active: Boolean)
```
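Reading those tables into typed Datasets is not shown here, but a minimal sketch (assuming a `SparkSession` named `spark` and the metastore table names above) could be:

```scala
import org.apache.spark.sql.Dataset
import spark.implicits._

// Pull the metastore tables and encode them with the case classes above.
val nextPayments: Dataset[Purchase] =
  spark.table("next_payments").as[Purchase]
val customerStatus: Dataset[CustomerStatus] =
  spark.table("customer_status").as[CustomerStatus]
```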
The output table should look something like this:

| amount | month |
|---|---|
| 29.99 | 2022-01 |
| 0 | 2022-02 |
Notice that customer `5678` is inactive, so its payment should not be in `next_payments`. But it's there. So what do we do? And, more importantly: how do we code this so that it never fails?
## Expecting the unexpected
The goal is to be completely fault tolerant and to keep traceability of the discarded data, so we can analyze and fix it for the next executions. The first thing you probably think of is validating our inputs before encoding our dataframe as `Dataset[Purchase]`.
### First layer of error handling: expected things
We could define a new dataframe called `validated_next_payments` like this:
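A minimal version of that validation, assuming the join is on `customerId`, could be:

```sql
-- Keep only payments whose customer is known and active.
CREATE OR REPLACE TEMPORARY VIEW validated_next_payments AS
SELECT NP.*
FROM next_payments NP
JOIN customer_status CS ON NP.customerId = CS.customerId
WHERE CS.active
```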
But remember! We want to keep the discarded rows, so we have to run another query to get them:
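A sketch of that complementary query, under the same join assumption:

```sql
-- The complement of the validation: payments whose customer is missing or inactive.
SELECT NP.*
FROM next_payments NP
LEFT JOIN customer_status CS ON NP.customerId = CS.customerId
WHERE NOT coalesce(CS.active, false)
```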
This keeps a trace of the discarded `next_payments`, but we get this dataframe:
| amount | date | customerId |
|---|---|---|
| 10.00 | 2022-02-01 | 5678 |
Why did it fail? We have to dig into `customer_status` to realize that customer `5678` is inactive. The client wants an easy way to see why a row was discarded.
So you make another attempt.
### Attempt #2: proper error handling in validations
You dig into Spark SQL and notice that arrays are an accepted data type in Spark. So, what if we create an array of validations and collect them all? That seems good. Let's try:
```sql
WITH error_next_payment AS (
  SELECT
    NP.*,
    ARRAY(
      STRUCT('customerId cannot be inactive' AS msg, NOT coalesce(CS.active, false) AS is_error)
      -- And other validations with STRUCT(msg AS STRING, is_error AS BOOLEAN)
    ) AS error_array
  FROM next_payments NP
  LEFT JOIN customer_status CS ON NP.customerId = CS.customerId
)
SELECT
  amount,
  date,
  customerId,
  CONCAT_WS(' | ', transform(filter(error_array, el -> el.is_error), errs -> errs.msg)) AS errors
FROM error_next_payment
WHERE size(filter(error_array, el -> el.is_error)) > 0
```
Let's explain this:

- First, we create `error_next_payment`, where we collect all validations into an array of structs of `(msg: STRING, is_error: BOOLEAN)`, which gives us the validation message and whether it is an error (`true`) or not (`false`).
- Then we select from it to collect all the errors, concatenating the messages with a spaced-out pipe ` | `, and filtering the rows where there is at least one error.
This is much better; now we have a clearer picture of what's happening:

| amount | date | customerId | errors |
|---|---|---|---|
| 10.00 | 2022-02-01 | 5678 | customerId cannot be inactive |
Now that we have our discarded rows, we can get the valid ones:
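A self-contained sketch of that query, reusing the same `error_next_payment` CTE and inverting the error filter:

```sql
-- Rows with no failed validations become the trusted input for the rest of the job.
CREATE OR REPLACE TEMPORARY VIEW validated_next_payments AS
WITH error_next_payment AS (
  SELECT
    NP.*,
    ARRAY(
      STRUCT('customerId cannot be inactive' AS msg, NOT coalesce(CS.active, false) AS is_error)
    ) AS error_array
  FROM next_payments NP
  LEFT JOIN customer_status CS ON NP.customerId = CS.customerId
)
SELECT amount, date, customerId
FROM error_next_payment
WHERE size(filter(error_array, el -> el.is_error)) = 0
```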
Then we can go to Scala and do this:

```scala
import spark.implicits._

val nextPurchases: Dataset[Purchase] =
  spark.sql("SELECT * FROM validated_next_payments").as[Purchase]
```
### Second (and ultimate) layer of error handling: unexpected things
As I previously mentioned, Murphy's law is always with us. So what if, after the validations, we start aggregating and applying some business logic:

```scala
val advancedMetrics: Dataset[AdvancedMetrics] =
  nextPurchases
    .map(purchase => AdvancedMetrics(purchase))
```

and, during the initialization of `AdvancedMetrics`, something bad happens; something we didn't expect? Spark would retry the task until the maximum number of attempts is reached, and then the Spark application would end up `FAILED`. So we are not achieving our main goal. How do we get rid of this?
Then I realized that we could encapsulate this in a `Try`:

```scala
val advancedMetrics: Dataset[Try[AdvancedMetrics]] =
  nextPurchases
    .map(purchase => Try(AdvancedMetrics(purchase)))
```
So then we could do some Scala pattern matching, collect all the `Success[T]` values and all the `Failure[Throwable]` values, and convert the failures into a case class in the shape of the first layer, something like:

```scala
case class Error(customerId: Int, errors: String)
```

But the problem here is that `Try[AdvancedMetrics]` does not have an encoder in `spark.implicits`.
So we use `Encoders.kryo` to serialize with Kryo. Easy fix, but it means bigger serialized sizes and more shuffle, especially if you use that dataset afterwards for joins, for example.
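With Kryo, the encoder for the wrapped type can be declared explicitly; a sketch:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import scala.util.Try

// Kryo-serialized encoder: works for any class, at the cost of opaque binary payloads.
implicit val tryMetricsEncoder: Encoder[Try[AdvancedMetrics]] =
  Encoders.kryo[Try[AdvancedMetrics]]

val advancedMetrics: Dataset[Try[AdvancedMetrics]] =
  nextPurchases.map(purchase => Try(AdvancedMetrics(purchase)))
```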
Also, if you then want to convert all the errors into a `Dataset[Error]`, you don't have the `customerId` anymore (the `Failure` only carries the `Throwable`); you would have to traverse the data twice.
So that's where `Either` comes to the rescue:
```scala
implicit val eitherMetricsEncoder: Encoder[Either[AdvancedMetrics, Error]] =
  Encoders.kryo[Either[AdvancedMetrics, Error]]

val advancedMetrics: Dataset[Either[AdvancedMetrics, Error]] =
  nextPurchases
    .map(purchase => Try(AdvancedMetrics(purchase)) match {
      case Success(value) => Left(value)
      case Failure(exc)   => Right(Error(purchase.customerId, exc.getLocalizedMessage)) // For example...
    })
```
`Either[A, B]` has the same encoding problem: it does not have an encoder in `spark.implicits` either, which leads us back to Kryo. So what can we do?
Well, we could make a little transformation into something that Spark likes a lot, something like `Dataset[(List[AdvancedMetrics], List[Error])]`:
```scala
val advancedMetrics: Dataset[(List[AdvancedMetrics], List[Error])] =
  nextPurchases
    .map(purchase => Try(AdvancedMetrics(purchase)) match {
      case Success(value) => (value :: Nil, Nil)
      case Failure(exc)   => (Nil, Error(purchase.customerId, exc.getLocalizedMessage) :: Nil) // For example...
    })
```
This has the advantage of using Spark primitives, so `spark.implicits` applies here and you don't have to serialize with Kryo.

But let's be honest, this is quite dirty, and we don't want to clutter the code with these detours just so Spark can use its product encoders.

So this is probably one of the few cases where implicit conversions might actually be good.
So here we are supercharging `Either[A, B]` by providing it a new method called `encode`, which transforms it into a `(List[A], List[B])` tuple.
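A minimal sketch of that supercharging, as an implicit class (the object name matches the `SparkFriendlyConversions` import used in the main code):

```scala
object SparkFriendlyConversions {
  implicit class SparkFriendlyEither[A, B](val either: Either[A, B]) extends AnyVal {
    // Left becomes a singleton first list, Right a singleton second list,
    // so Spark's default product encoders can handle the result.
    def encode: (List[A], List[B]) = either match {
      case Left(a)  => (a :: Nil, Nil)
      case Right(b) => (Nil, b :: Nil)
    }
  }
}
```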
So our main code would look like:

```scala
import SparkFriendlyConversions._

val advancedMetrics = // We can even use type inference
  nextPurchases
    .map { purchase =>
      val eitherMetrics = Try(AdvancedMetrics(purchase)) match {
        case Success(value) => Left(value)
        case Failure(exc)   => Right(Error(purchase.customerId, exc.getLocalizedMessage))
      }
      eitherMetrics.encode
    }
```
```scala
val goodAdvancedMetrics: Dataset[AdvancedMetrics] =
  advancedMetrics.flatMap(_._1)

val errorsMetrics: Dataset[Error] =
  advancedMetrics.flatMap(_._2)
```
This keeps our main code readable and gets rid of the workarounds we would otherwise need in order to keep using the default encoders.
## Conclusion
Making a Spark process fault tolerant in terms of business logic can be hard, especially when any of the following factors appear:

- Data quality is bad.
- Data is not strongly typed (i.e. you are using relaxed types like `String`).
- You are not fully aware of the consequences of your business logic.
For that, encapsulating your transformations in `Try[T]` and always keeping traceability of your discarded rows will give you high visibility into what's happening.

Performance is also a priority to take into account while writing such a process. That is why we got rid of Kryo encoding and used product encoding instead.

This approach especially pays off when your business logic is complex (much more so than what has been presented in this post) and `DataFrame` is no longer your best friend.
## What should be in a perfect world
This scenario was created in the worst of worlds, where the data quality is a mess and you are not sure about the business logic and what could happen. To get closer to a better world, consider these:

- Validate your data as soon as possible, and make the validated version available to all the Spark processes that use the same inputs as yours. Filtering out the bad rows once makes your processes less redundant, since each one no longer has to assume the data is bad.
- Strongly type your databases and sources, to massively reduce the probability of bad inputs.
## Where to go next
I have in mind to give Spark `Accumulator`s a try, but that raises a few questions I haven't resolved yet, like:

- What if EVERYTHING is discarded?
- What about duplicates? A discard would be recorded as many times as the Spark stage is executed (this is a well-known caveat of accumulators).
Thank you for reading, and feel free to contact me to discuss this!