Geospatial analysis with Azure Databricks

A few months ago, I wrote a blog demonstrating how to extract and analyse geospatial data in Azure Data Lake Analytics (ADLA). The aim of that article was to prove that it is possible to run spatial analysis with U-SQL, even though it does not natively support spatial data analytics. The outcome of that experience was positive, but it came with serious limitations in terms of performance. ADLA dynamically provisions resources and can perform analytics on terabytes to petabytes of data; however, because it has to rely on the SQL Server Data Types and Spatial assemblies to perform spatial analysis, its parallelism is suddenly limited. For example, if you are running an aggregation, ADLA splits the processing between multiple vertices, making it faster, but when running intersections between points and polygons, which is a single-threaded SQL operation, it uses only one vertex, and consequently the job might take hours to complete.

Since I last wrote that blog, the data analytics landscape has changed and new options have become available, namely Azure Databricks. In this blog, I’ll demonstrate how to run spatial analysis with the Magellan library on Azure Databricks and export the results to a mount point.

Magellan is a distributed execution engine for geospatial analytics on big data. It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries (further details here).

Although the Magellan GitHub page mentions that version 1.0.5 of the library is available for Apache Spark 2.3+ clusters, I learned through a very painful process that the only way to make it work in Azure Databricks is on an Apache Spark 2.2.1 cluster with Scala 2.11. The cluster I used for this experiment consisted of a Standard_DS3_v2 driver type with 14GB of memory, 4 cores and auto scaling enabled.

In terms of datasets, I used the NYC Taxicab dataset to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON dataset to extract the polygons. Both datasets were stored in blob storage and added to Azure Databricks as a mount point.
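
For completeness, this is roughly how such a mount can be created from a notebook. It is a one-off sketch only; the container, storage account and key below are placeholders rather than the ones used in this post.

//One-off: mount the blob storage container to DBFS (container, account and key are placeholders)
dbutils.fs.mount(
  source = "wasbs://<container>@<storage-account>.blob.core.windows.net/",
  mountPoint = "/mnt/geospatial",
  extraConfigs = Map(
    "fs.azure.account.key.<storage-account>.blob.core.windows.net" -> "<storage-account-key>"))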

As always, first we need to import the libraries.

//Import Libraries
import magellan._
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

Next, we define the schema of the NYC Taxicab dataset and load the data into a DataFrame. While loading the data, we convert the pickup longitude and latitude into a Magellan Point. If needed, we could also create a second Magellan Point from the drop-off longitude and latitude in the same operation (see the short example after the code below).

//Define schema for the NYC taxi data
val schema = StructType(Array(
     StructField("vendorId", StringType, false),
     StructField("pickup_datetime", StringType, false),
     StructField("dropoff_datetime", StringType, false),
     StructField("passenger_count", IntegerType, false),
     StructField("trip_distance", DoubleType, false),
     StructField("pickup_longitude", DoubleType, false),
     StructField("pickup_latitude", DoubleType, false),
     StructField("rateCodeId", StringType, false),
     StructField("store_fwd", StringType, false),
     StructField("dropoff_longitude", DoubleType, false),
     StructField("dropoff_latitude", DoubleType, false),
     StructField("payment_type", StringType, false),
     StructField("fare_amount", StringType, false),
     StructField("extra", StringType, false),
     StructField("mta_tax", StringType, false),
     StructField("tip_amount", StringType, false),
     StructField("tolls_amount", StringType, false),
     StructField("improvement_surcharge", StringType, false),
     StructField("total_amount", DoubleType, false)))

//Read data from the NYC Taxicab dataset and create a Magellan point 
val trips = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("mode", "DROPMALFORMED")
       .schema(schema)
       .load("/mnt/geospatial/nyctaxis/*")
       .withColumn("point", point($"pickup_longitude",$"pickup_latitude"))

The next step is to load the neighbourhood data. As mentioned in its documentation, Magellan supports reading the ESRI, GeoJSON, OSM-XML and WKT formats. From the GeoJSON dataset, Magellan extracts a collection of polygons and reads the metadata into a Map.

There are three things to notice in the code below: first, the extraction of the polygon; second, the selection of the metadata key corresponding to the neighbourhood name; and finally, the provision of a hint that defines what the index precision should be. This operation, along with the injection of a spatial join rule into Catalyst, massively increases the performance of the queries. To get a better understanding of this operation, read this excellent blog.

//Read GeoJSON file and define index precision
val neighborhoods = sqlContext.read
       .format("magellan")
       .option("type", "geojson")
       .load("/mnt/geospatial/neighborhoods/neighborhoods.geojson")
       .select($"polygon", 
        $"metadata"("neighborhood").as("neighborhood"))
       .index(30)
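
As an optional sanity check, counting the loaded DataFrame should return the 310 neighbourhood polygons referred to in the next paragraph.

//Sanity check: should return the 310 NYC neighbourhood polygons
neighborhoods.count()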

Now that we have our two datasets loaded, we can run our geospatial query to identify which neighbourhood each pickup point falls in. To achieve our goal, we need to join the two DataFrames and apply a within predicate. As a curiosity, if we consider that m represents the number of points (12,748,987), n the number of polygons (310), p the average number of edges per polygon (104) and a naive complexity of O(mnp), then we would perform 12,748,987 × 310 × 104, roughly 400 billion calculations on a single node to determine where each point falls.

//Inject rules and join DataFrames with within predicate
magellan.Utils.injectRules(spark)

val intersected = trips.join(neighborhoods)
       .where($"point" within $"polygon")

The above code does not take longer than a second to execute, because Spark transformations are evaluated lazily. It is only when we trigger an action to obtain details about our DataFrame that the computing time becomes visible. For example, if we want to know which neighbourhood has the most pickups, we can write the following code, which takes on average 40 seconds.

//Neighbourhoods that received the most pickups
display(intersected 
       .groupBy('neighborhood)
       .count()
       .orderBy($"count".desc))

If we want to save the data and identify which pickups fall inside the NYC neighbourhoods, we have to drop the Magellan points and polygons from our intersected DataFrame so that it keeps only the original columns, use it to derive the trips that did not intersect any neighbourhood, add a flag column to both DataFrames and export the union back to the blob, as shown below.

//select pickup points that don't fall inside a neighbourhood
//(drop the Magellan point, polygon and neighbourhood columns so both sides of except share the same schema)
val intersectedTrips = intersected.drop("point", "polygon", "neighborhood")
val nonIntersected = trips
                       .select($"vendorId",$"pickup_datetime", $"dropoff_datetime", $"passenger_count", $"trip_distance", $"pickup_longitude", $"pickup_latitude", $"rateCodeId", $"store_fwd",$"dropoff_longitude", $"dropoff_latitude",$"payment_type",$"fare_amount", $"extra", $"mta_tax", $"tip_amount", $"tolls_amount", $"improvement_surcharge", $"total_amount")
                       .except(intersectedTrips)

//add new column intersected_flag
val intersectedFlag = "1"
val nonIntersectedFlag = "0"
val tripsIntersected = intersectedTrips.withColumn("intersected_flag",expr(intersectedFlag))
val tripsNotIntersected = nonIntersected.withColumn("intersected_flag",expr(nonIntersectedFlag))

//Union DataFrames
val allTrips = tripsNotIntersected.union(tripsIntersected)

//Save data to the blob
allTrips.write
   .format("com.databricks.spark.csv")
   .option("header", "true")
   .save("/mnt/geospatial/trips/trips.csv")

In summary, reading a 1.8GB dataset, applying geospatial analysis and exporting the results back to blob storage only took on average 1 minute, which is miles better when compared with my previous attempt with U-SQL.

As always, if you have any comments or questions, do let me know.