Azure Databricks – overwriting a table that is also being read from

Problem Definition

Recently I ran into an interesting problem in Azure Databricks with a non-Delta table. I tried to read data from a table (a table defined on top of files), slightly transform it, and write it back to the same location I had been reading from. Attempting to execute code like that fails with the exception: “org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from”.
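As a minimal illustration, the failing pattern looks roughly like the sketch below (the table name is hypothetical; spark is the SparkSession that a Databricks notebook provides):

    # Minimal sketch of the failing pattern (hypothetical table name)
    source = spark.table("my_db.history")                     # read from the table
    transformed = source.filter("load_date >= '2019-01-01'")  # some light transformation

    # Writing back into the very table we are still reading from fails with:
    #   org.apache.spark.sql.AnalysisException:
    #   Cannot insert overwrite into table that is also being read from
    transformed.write.insertInto("my_db.history", overwrite=True)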

So let's try to answer the question: how do we write into a table (Dataframe) that we are also reading from? This might be a common use case.

This problem is trivial, but it is very confusing if we do not understand how queries are processed in Spark.

Let me explain it with an example.

Let's define a process that will allow us to:

  • Filter out objects that we have already processed.
  • Store the history of loaded objects for 3 days only (the history needs to take up the minimum amount of space on the file system).
  • Make today's load part of the history for tomorrow's load.

The process for this task looks like this:

  1. Read CurrentLoad (the new objects that we want to load in).
  2. Read History limited to the last 3 days (objects that we have already processed).
  3. Create a new set called Unprocessed by removing existing History records from CurrentLoad (this is done via a left anti join).
  4. Process/transform Unprocessed (an optional step that may or may not use Unprocessed in further processing).
  5. Union only the last 3 days of History with Unprocessed (to maintain only the last 3 days of history and keep the set as small as possible).
  6. Overwrite History.

The process above is not the most efficient way of checking a new load against historic loads, but it is a good example of overwriting a Dataframe/table that we are reading from at the same time.
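To make the steps concrete, a naive version of this process could look like the sketch below (the paths, the join key object_id, and the load_date column are assumptions, not the exact schema used in the original notebook):

    from pyspark.sql import functions as F

    # 1. CurrentLoad – the new objects we want to load in
    current_load = spark.read.parquet("dbfs:/data/current_load")

    # 2. History limited to the last 3 days
    history = (spark.read.parquet("dbfs:/data/history")
               .filter(F.col("load_date") >= F.date_sub(F.current_date(), 3)))

    # 3. Unprocessed = CurrentLoad minus records already present in History
    unprocessed = current_load.join(history, on="object_id", how="left_anti")

    # 4. (optional) transform unprocessed here

    # 5. Last 3 days of History plus the newly processed records
    new_history = history.union(unprocessed)   # assumes identical schemas

    # 6. Overwriting History directly fails, because new_history still reads from it
    new_history.write.mode("overwrite").parquet("dbfs:/data/history")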

A diagram of this process looks like this:

[Diagram of the process described above]

So let's see what might go wrong in this process. We need to add the Unprocessed Dataframe (which holds a reference to the History Dataframe) to the existing History, and at the same time we need to remove the first day (to maintain only 3 days of history). This operation is not as straightforward as it may seem. The reason is that Spark, and therefore Databricks, uses lazy execution by default, which means the code is not executed immediately. A query is not evaluated until the last moment – that is, when we try to write the data down to the file system. Spark does this to optimize queries by building an execution plan; the execution plan helps Spark evaluate queries in the way that gives the best performance. If we postpone processing the Unprocessed Dataframe, it still holds references to two sets, History and CurrentLoad, while we try to write to History. This means we are trying to perform a write operation on a Dataframe we are reading from.
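One way to see this dependency is to print the execution plan of the Dataframe we are about to write (new_history from the sketch above):

    # The plan of the overwriting Dataframe still contains scans of the
    # History (and CurrentLoad) sources – the same location we want to overwrite
    new_history.explain()

The printed plan includes a scan of the History location among its sources, which is why Spark rejects the overwrite.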

There is a simple solution to this problem.

Solution

Materialize the Dataframe that you want to overwrite the data with (HistoryTemp), so that it clears out the dependencies on the tables we read from (History, CurrentLoad). The simplest way to do so is to write the Dataframe (HistoryTemp) to a temporary location on the file system and then re-read the data into a new Dataframe. This enables us to write the data to the History location. This approach requires you to set aside a temp location for your temp data, and to make sure that you overwrite HistoryTemp each time. You can also perform additional clean-up at the end of your script to delete the temp data and metadata (tables, files).
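Using the same hypothetical names as in the sketch above, the core of this idiom is only a few lines (the temp path is an assumption; adjust it to your environment):

    # Materialize HistoryTemp in a temporary location to break the lineage
    temp_path = "dbfs:/tmp/history_temp"
    new_history.write.mode("overwrite").parquet(temp_path)

    # Re-read the materialized data; this Dataframe no longer references History
    history_temp = spark.read.parquet(temp_path)

    # Now overwriting the History location is allowed
    history_temp.write.mode("overwrite").parquet("dbfs:/data/history")

    # Optional clean-up of the temporary data at the end of the notebook
    dbutils.fs.rm(temp_path, True)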

This process would look like:
1. Calculate the new Unprocessed set (CurrentLoad left anti join History).
2. Union Unprocessed with the current History to persist all records that we want to keep (HistoryTemp).
3. Write HistoryTemp (the overwriting set) to a temporary location on the file system.
4. Re-read the data that we wrote out (HistoryTemp) into a new Dataframe.
5. Write the new Dataframe to your History location. Make sure that the Unprocessed and HistoryTemp sets are not used further in the notebook; if you need to use them, perform the write operation at the end.
[Diagram of the solution process]

 

Code

You can download the full notebook from the Reference section.
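A minimal end-to-end sketch of the approach looks like this (the paths and column names are hypothetical and not necessarily the exact code in the notebook):

    from pyspark.sql import functions as F

    # Hypothetical locations and columns – adjust to your own layout
    current_load_path = "dbfs:/data/current_load"
    history_path = "dbfs:/data/history"
    temp_path = "dbfs:/tmp/history_temp"

    # 1. Read CurrentLoad and the last 3 days of History
    current_load = spark.read.parquet(current_load_path)
    history = (spark.read.parquet(history_path)
               .filter(F.col("load_date") >= F.date_sub(F.current_date(), 3)))

    # 2. Unprocessed = CurrentLoad records not yet present in History
    unprocessed = current_load.join(history, on="object_id", how="left_anti")

    # 3. HistoryTemp = last 3 days of History plus the newly processed records
    history_temp = history.union(unprocessed)

    # 4. Materialize HistoryTemp in a temporary location to clear the lineage
    history_temp.write.mode("overwrite").parquet(temp_path)

    # 5. Re-read the materialized data and overwrite the real History location
    spark.read.parquet(temp_path).write.mode("overwrite").parquet(history_path)

    # 6. Optional clean-up of the temporary data and files
    dbutils.fs.rm(temp_path, True)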


What did not work for me

I have seen some suggestions on different forums that I tried and that DID NOT WORK for me; I kept receiving the same exception:

  • Creating a temp view on the data and caching it:
    Unprocessed.createOrReplaceTempView("unprocessedTemp")
    Unprocessed.cache()
  • Checkpointing the Unprocessed Dataframe with these commands:
    spark.sparkContext.setCheckpointDir("dbfs:/unprocessed/checkpointDirectory")
    Unprocessed.rdd.checkpoint()

Reference

  1. You can download the code to play around with from here:
  2. Users with similar problems were posting here: