Spark, discarding bad rows, and tracing them

In this post you can find how I managed to discard errors in Apache Spark using Scala, with Datasets and case classes.


For the past few years I have been struggling with handling errors in Spark. One tries to anticipate which errors could happen, which combination of operations could lead to an Exception, and what to do next. But let's be honest: Murphy's Law is always in effect. In this post I am going to go through how I usually handle errors in Apache Spark.

Some context

Suppose that you are going to create a batch process in Spark using Scala 2. Your client, a fitness app, wants to know how much they are going to earn next month from their pro subscriptions. They know the input data could be better, but they do not want this process to fail unexpectedly; they want it to finish and provide the best results possible. In other words, they don't want the process to fail. Ever.

Those are strong words, but you like the challenge. Your client also knows how big the challenge is, so they want a trace of whatever failed unexpectedly, written to a table, so that they can review it and correct the problem for future runs. The cause could be a data quality issue or a process logic issue.

Creating a case scenario

Suppose that we have data on future scheduled subscriptions in our metastore as next_payments:

amount date customerId
29.99 2022-01-01 1234
10.00 2022-02-01 5678

We also have a table of customer status, customer_status:

customerId active
1234 true
5678 false

These tables could be represented as Dataset[T] with these case classes:

case class Purchase(amount: Double, date: java.sql.Date, customerId: Int)

case class CustomerStatus(customerId: Int, active: Boolean)

The output table should be something like this:

amount month
29.99 2022-01
0 2022-02
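
To make the expected output concrete, here is a minimal plain-Scala sketch (no Spark involved) of one possible reading of it: payments are summed per month, and only payments from active customers contribute. The object name ExpectedOutput and the grouping logic are assumptions for illustration; the case classes follow the ones above, with the status flag named active to match the table column.

```scala
import java.sql.Date
import java.time.YearMonth

object ExpectedOutput {
  final case class Purchase(amount: Double, date: Date, customerId: Int)
  final case class CustomerStatus(customerId: Int, active: Boolean)

  // The two sample rows of next_payments and customer_status
  val payments: List[Purchase] = List(
    Purchase(29.99, Date.valueOf("2022-01-01"), 1234),
    Purchase(10.00, Date.valueOf("2022-02-01"), 5678)
  )
  val statuses: List[CustomerStatus] =
    List(CustomerStatus(1234, active = true), CustomerStatus(5678, active = false))

  private val activeIds: Set[Int] = statuses.filter(_.active).map(_.customerId).toSet

  // Sum per month, counting only payments that belong to active customers
  val byMonth: Map[String, Double] =
    payments
      .groupBy(p => YearMonth.from(p.date.toLocalDate).toString)
      .map { case (month, ps) =>
        month -> ps.filter(p => activeIds(p.customerId)).map(_.amount).sum
      }
}
```

With the sample rows, ExpectedOutput.byMonth holds 29.99 for 2022-01 and 0.0 for 2022-02.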

Notice that customer 5678 is inactive, so the payment should not be in next_payments.

But it's there. So what do we do? And more importantly: how do we code this so that it never fails?

Expecting the unexpected

The goal is to be completely fault tolerant and to keep the discarded data traceable, so that it can be analyzed and fixed before the next executions. The first thing you probably think of is validating the inputs before encoding the dataframe as Dataset[Purchase].

First layer of error handling: expected things

We could define a new dataframe called validated_next_payments like this:

-- validated_next_payments
SELECT
  NP.*
FROM next_payments NP
LEFT JOIN customer_status CS
  ON NP.customerId = CS.customerId
WHERE CS.active -- keep only payments from active customers
-- AND another validation
-- AND another validation
-- AND so on...

But remember! We want to keep the discarded ones. So we have to make another query to get those rows:

-- discarded_next_payments
SELECT
  NP.*
FROM next_payments NP
LEFT ANTI JOIN validated_next_payments VNP
  ON NP.customerId = VNP.customerId

This keeps track of the discarded next_payments, but all we get is this dataframe:

amount date customerId
10.00 2022-02-01 5678

Why was it discarded? We have to look into customer_status to realize that customerId 5678 is inactive. The client wants to see easily why each row was discarded.

So you make another attempt.

Attempt #2: Giving proper error handling in validations

You dig into Spark SQL and you notice that arrays are a supported data type. So, what if we create an array of validations and collect all their results? That seems good. Let's try:

-- errors_next_payment
WITH error_next_payment AS (
  SELECT
    NP.*,
    ARRAY(
      NAMED_STRUCT('msg', 'customerId cannot be inactive', 'is_error', IF(NOT CS.active, true, false))
      -- And other validations with the same struct (msg STRING, is_error BOOLEAN)
    ) AS error_array
  FROM next_payments NP
  LEFT JOIN customer_status CS
    ON NP.customerId = CS.customerId
)

SELECT
  amount,
  date,
  customerId,
  CONCAT_WS(' | ', TRANSFORM(FILTER(error_array, el -> el.is_error), err -> err.msg)) AS errors
FROM error_next_payment
WHERE SIZE(FILTER(error_array, el -> el.is_error)) > 0

Let's explain this:

  • First, we create error_next_payment, where every validation becomes a struct of a string and a boolean: the message of the validation and whether the row is in error (true) or not (false). All these structs are collected into one array.
  • Then we select from it, keeping only the rows with at least one error and concatenating their messages with a pipe surrounded by spaces ( | ).

This is much better. We now have a clearer understanding of what's happening:

amount date customerId errors
10.00 2022-02-01 5678 customerId cannot be inactive
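
The same "array of validations" idea can be sketched in plain Scala, which may help when reasoning about what the SQL does. This is only an illustration with assumed names (Row, validations, errorsFor), and the second rule is hypothetical; it is not part of the SQL above.

```scala
object ValidationArray {
  // Simplified row: a payment already joined with its customer status
  final case class Row(amount: Double, customerId: Int, active: Boolean)

  // Each validation is a message plus a predicate that returns true when the row is in error
  val validations: List[(String, Row => Boolean)] = List(
    "customerId cannot be inactive" -> ((r: Row) => !r.active),
    "amount must be positive"       -> ((r: Row) => r.amount <= 0) // hypothetical extra rule
  )

  // None when the row is valid, otherwise every failed message joined with " | "
  def errorsFor(row: Row): Option[String] = {
    val failed = validations.collect { case (msg, isError) if isError(row) => msg }
    if (failed.isEmpty) None else Some(failed.mkString(" | "))
  }
}
```

For the sample data, ValidationArray.errorsFor(Row(10.0, 5678, active = false)) yields Some("customerId cannot be inactive"), mirroring the errors column above.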

Now that we have our discarded rows, we can get the valid ones:

-- validated_next_payments
SELECT
  NP.*
FROM next_payments NP
LEFT ANTI JOIN errors_next_payment ENP
  ON NP.customerId <=> ENP.customerId

Then we can go to Scala and do this:

import spark.implicits._

val nextPurchases: Dataset[Purchase] = spark.sql("SELECT * from validated_next_payments").as[Purchase]

Second (and ultimate) layer of error handling: unexpected things

As I mentioned before, Murphy's Law is always with us. So what if, after the validations, we start aggregating and applying some business logic:

val advancedMetrics: Dataset[AdvancedMetrics] = 
  nextPurchases
    .map(purchase => AdvancedMetrics(purchase))

and, during the initialization of AdvancedMetrics, something bad happens; something that we didn't expect? Spark would retry the failing task until the maximum number of attempts is reached, and then the Spark application would end up as FAILED. So we are not achieving our main goal. How do we avoid this?

Then I realized that we could wrap this in a Try:

val advancedMetrics: Dataset[Try[AdvancedMetrics]] = 
  nextPurchases
    .map(purchase => Try(AdvancedMetrics(purchase)))    

Then we could use Scala pattern matching to separate every Success from every Failure, and convert the failures into a case class shaped like the output of the first layer, something like

case class Error(customerId: Int, errors: String)

But the problem here is that Try[AdvancedMetrics] does not have an encoder in spark.implicits.

So we could use Encoders.kryo to serialize it with Kryo. It is an easy fix, but the whole object is stored as a single binary blob, which means bigger sizes and more shuffle, especially if you use that dataset afterwards for a join, for example.

Also, if you collect all the errors and convert them into a Dataset[Error], you no longer have the customerId: a Failure only carries the exception, so you would have to go back to the input a second time to recover it.

So that's where Either comes to the rescue:
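
A small plain-Scala sketch of that last problem, using hypothetical stand-ins (a simplified Purchase and AdvancedMetrics, and a made-up toMetrics rule that rejects non-positive amounts). Once the Try values are separated, the failures only expose the exception, not the row that caused it:

```scala
import scala.util.{Failure, Success, Try}

object TryLosesContext {
  // Hypothetical, simplified versions of the post's classes
  final case class Purchase(amount: Double, customerId: Int)
  final case class AdvancedMetrics(customerId: Int, yearlyAmount: Double)

  // Hypothetical business rule: a non-positive amount is something we did not expect
  def toMetrics(p: Purchase): AdvancedMetrics = {
    require(p.amount > 0, s"amount must be positive, got ${p.amount}")
    AdvancedMetrics(p.customerId, p.amount * 12)
  }

  val purchases: List[Purchase] = List(Purchase(29.99, 1234), Purchase(-1.0, 5678))

  val attempts: List[Try[AdvancedMetrics]] = purchases.map(p => Try(toMetrics(p)))

  val good: List[AdvancedMetrics] = attempts.collect { case Success(m) => m }

  // All that is left of a failed row is the exception message; customerId 5678 is gone
  val failureMessages: List[String] = attempts.collect { case Failure(e) => e.getMessage }
}
```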

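import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import scala.util.{Failure, Success, Try}

// Either has no encoder in spark.implicits, so we fall back to Kryo
implicit val eitherAdvancedMetricsEncoder: Encoder[Either[AdvancedMetrics, Error]] =
  Encoders.kryo[Either[AdvancedMetrics, Error]]

val advancedMetrics: Dataset[Either[AdvancedMetrics, Error]] =
  nextPurchases
    .map { purchase =>
      // Left holds the good result, Right holds the error together with its customerId
      val result: Either[AdvancedMetrics, Error] = Try(AdvancedMetrics(purchase)) match {
        case Success(value) => Left(value)
        case Failure(exc)   => Right(Error(purchase.customerId, exc.getLocalizedMessage)) // For example...
      }
      result
    }
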

Either[A, B] has the same encoding problem: spark.implicits does not provide an encoder for it. That leads us back to Kryo. So what can we do?

Well, we could make a little transformation into something that Spark likes much more, something like Dataset[(List[AdvancedMetrics], List[Error])]:

val advancedMetrics: Dataset[(List[AdvancedMetrics], List[Error])] =
  nextPurchases
    .map(purchase => Try(AdvancedMetrics(purchase)) match {
        // Exactly one of the two lists has a single element, the other one is empty
        case Success(value) => (value :: Nil, List.empty[Error])
        case Failure(exc) => (List.empty[AdvancedMetrics], Error(purchase.customerId, exc.getLocalizedMessage) :: Nil) // For example...
      }
    )

This has the advantage of using only types that the built-in encoders understand, so spark.implicits applies here and you don't have to serialize with Kryo.

But let's be honest, this is quite dirty, and we don't want to clutter the business code with workarounds just so that Spark can use Encoders.product.

So this is probably one of the few cases where implicit conversions might be a good idea?

trait SparkEncoder {
  def encode: Any
}

object SparkFriendlyConversions {
  implicit class SparkFriendlyEither[A, B](either: Either[A, B]) extends SparkEncoder {
    // Left values go to the first list, Right values to the second one
    def encode: (List[A], List[B]) = either match {
      case Left(a)  => (a :: Nil, Nil)
      case Right(b) => (Nil, b :: Nil)
    }
  }
}
Scala 3 favors extension methods for this kind of thing, but let's enjoy implicit classes in the meantime.

So here we are supercharging Either[A, B] with a new method called encode, which transforms it into a (List[A], List[B]) tuple.

So our main code would be like:

import SparkFriendlyConversions._

val advancedMetrics = // We can even use type inference
  nextPurchases
    .map { purchase =>
      val eitherMetrics: Either[AdvancedMetrics, Error] = Try(AdvancedMetrics(purchase)) match {
        case Success(value) => Left(value)
        case Failure(exc)   => Right(Error(purchase.customerId, exc.getLocalizedMessage))
      }

      eitherMetrics.encode
    }

val goodAdvancedMetrics: Dataset[AdvancedMetrics] = 
  advancedMetrics.flatMap(_._1)

val errorsMetrics: Dataset[Error] =
  advancedMetrics.flatMap(_._2)

This keeps our main code readable and hides the workarounds we need in order to keep using the default encoders.
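
Because encode is plain Scala, its behaviour can be checked locally on a List before involving Spark at all. A minimal sketch, assuming a simplified Error and a Double standing in for AdvancedMetrics; the implicit class is repeated here only so that the snippet runs on its own:

```scala
object EncodeDemo {
  // Same shape as the extension method shown earlier
  implicit class SparkFriendlyEither[A, B](either: Either[A, B]) {
    def encode: (List[A], List[B]) = either match {
      case Left(a)  => (List(a), Nil)
      case Right(b) => (Nil, List(b))
    }
  }

  final case class Error(customerId: Int, errors: String)

  // One good result and one error, using made-up values
  val results: List[Either[Double, Error]] =
    List(Left(359.88), Right(Error(5678, "boom")))

  val encoded: List[(List[Double], List[Error])] = results.map(_.encode)

  // flatMap drops the empty lists, exactly like the Dataset version
  val good: List[Double]  = encoded.flatMap(_._1) // List(359.88)
  val errors: List[Error] = encoded.flatMap(_._2) // List(Error(5678, "boom"))
}
```

Each input element ends up in exactly one of the two outputs, which is the property the Dataset code relies on.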

Conclusion

Making a Spark process fault tolerant in terms of business logic can be hard, especially when any of the following factors apply:

  • Data quality is bad
  • Data is not strongly typed (i.e. you are using relaxed types like String)
  • You are not well aware of the consequences of your business logic

To deal with that, wrapping your transformations in Try[T] and always keeping your discarded rows traceable will give you good visibility into what's happening.

Performance is one of the priorities to keep in mind when writing this kind of code. That is why we get rid of Kryo encoding and try to use Product encoding instead.

This approach also pays off when your business logic is complex (much more so than what has been presented in this post) and the DataFrame API is not your best friend for it.

What it should look like in a perfect world

This scenario was set in the worst possible world, where the data quality is a mess and you are not sure about the business logic or what could happen. To move towards a better world, consider these:

  • Try to validate your data as early as possible, and make the validated data available to all the Spark processes that use the same inputs as yours. You filter out the bad rows once, and your processes become less redundant because they no longer have to assume every time that the data is bad.
  • Strongly type your databases and sources, in order to massively reduce the probability of bad inputs.

Where to go next

I have in mind to give Spark Accumulators a try, but that raises a lot of questions that I have not resolved yet, such as:

  • What if EVERYTHING is discarded?
  • How about duplication? A discard would be recorded once for every time the Spark stage is executed (this is a well-known issue).

Thank you for reading, and feel free to contact me to discuss this!