PySpark DataFrame Transformation Chaining

After working with Databricks and PySpark for a while now, its clear there needs to be as much best practice defined upfront as possible when coding notebooks. While you can get away with quite a bit when writing SQL – which is all too familiar to most of us now, the transition into other languages (from a BI background) requires a bit more thought in terms of coding standards. In this short blog, I will talk about multiple DataFrame transformations and what I believe to be the best way to go about structuring your code for them.

More often than not, if you have not come from a programming background or are unfamiliar with a language, you will end up writing spaghetti code. Essentially code that reads a bit like a book, with no real structure to it, causing debugging/maintenance issues. We’ve all been there, especially at the start of a project, just wanting to get some output – but its worth trying to bear in mind some coding rules while designing to save the refactoring a few weeks down the line.

Jumping into the subject matter of the blog, this came about because I noticed some of our project code when it came to transformations was made up of re-assigning to a new data frame on each transformation. While functionality it was acceptable, it didn’t feel the right way to go about things. The same can be said with re-using the same data frame (although this caused other dependency issues). It also left us with huge chunks of code which was only possible to read by using the comments to see where some transformations stopped and others started.

After a small bit of research I discovered the concept of monkey patching (modifying a program to extend its local execution) the DataFrame object to include a transform function. This function is missing from PySpark but does exist as part of the Scala language already.

The following code can be used to achieve this, and can be stored in a generic wrapper functions notebook to separate it out from your main code. This can then be called to import the functions whenever you need them.

# Monkey patch the DataFrame object with a transform method
from pyspark.sql.dataframe import DataFrame

def transform(self, f):
    return f(self)

DataFrame.transform = transform

As part of your main code, we can then wrap the transformations into functions, passing in and returning a DataFrame. A separate statement can then be called specifying transform on the original DataFrame and the list of functions (transformations) you want to pass in. By using this method, the code is almost self-documenting as its clear what transformations you’ve then applied to move a DataFrame from one context into another. The example below only includes 2 transformations, but imagine you have 20+ to implement – this makes the code much easier to read and debug should there be an issue.

# Create started/finished timestamps and int dates in UTC as new columns in the DataFrame.
def append_utc_dates(df):
  df = df.withColumn("finishedTimestampUTC", to_utc_timestamp(col("finished"), "EST")) 
         .withColumn("startedTimestampUTC", to_utc_timestamp(col("started"), "EST")) 
         .withColumn("startedDateUTC", ConvertDateTimeToIntDate(col("startedTimestampUTC")))
         .withColumn("finishedDateUTC", ConvertDateTimeToIntDate(col("finishedTimestampUTC")))   
  return df

# Adds new attribute based on hand structure in to the DataFrame
def append_BB(df):
  df = df.join(refHandStructure, df["structure"] == refHandStructure["structure"], "left") 
         .select(df["*"], refHandStructure["handStructureName"]) 
         .withColumn("BB",when(col("HSName") == "Limit", col("x")).otherwise(col("y")))
  df = df.drop("handStructureName")
  return df

# Perform hand transformations
dfTransformed = dfRaw.transform(append_utc_dates) 
                     .transform(append_BB)
      
display(dfTransformed)

You should then use new DataFrames for changes in grain or changes in purpose (not just for each transformation).

This technique probably shouldn’t be used for some of the more basic transformations, especially if you only have 1 or 2, but more so when you have 50/100+ lines of code.  Not everything needs to be wrapped into functions, but it certainly reads better for larger notebooks!

One thought on “PySpark DataFrame Transformation Chaining

Comments are closed.