When you work with interactive clusters in Databricks, you can install libraries manually through the workspace UI. It’s done once and for all, and you don’t have to worry about it again.
Job clusters are another story. You must take care of library installation before executing the notebooks that reference those libraries. If you’re using Azure Data Factory to orchestrate the whole process, you’re lucky, because appending libraries to job clusters is out-of-the-box functionality. For all other scenarios, the Databricks REST API is one possible option. In fact, you can do it right from a Python notebook. That’s what I’m going to demonstrate in the following lines.
Let’s have a look at the REST API documentation first. We’ll be using only the Cluster Status and Install endpoints. To install a library, we need to create a proper HTTP request body in JSON format that provides the library source and its properties. Here’s one example:
{ "pypi": { "package": "simplejson", "repo": "https://my-pypi-mirror.com" } }
Here "pypi" is the source and {"package": "simplejson", "repo": "https://my-pypi-mirror.com"} are its properties. For more examples, check the Libraries API documentation.
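Another common source is maven, where the main property is the library coordinates. This is the same library we’ll install later in the post:

{ "maven": { "coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2" } }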
Let’s create a new Python notebook and then define some functions in it. The code is simple and self-explanatory, with comments where needed.
We’ll be using only the json and requests libraries:
import json
import requests
from requests.auth import HTTPBasicAuth
Now, let’s create a REST API wrapper function:
def adbAPI(endPoint, body, method = "GET", region = None, token = None):
    """Execute HTTP request against Databricks REST API 2.0"""
    domain = region + ".azuredatabricks.net"
    baseURL = "https://%s/api/" % (domain)

    if method.upper() == "GET":
        response = requests.get(
            baseURL + endPoint
            , auth = HTTPBasicAuth("token", token)
            , json = body
        )
    else:
        response = requests.post(
            baseURL + endPoint
            , auth = HTTPBasicAuth("token", token)
            , json = body
        )

    return response
As parameters we’ll take the API endpoint, the HTTP request body, the HTTP method (GET or POST, defaulting to GET), the Databricks workspace region (westeurope, northeurope, etc.) and, finally, a Databricks token.
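As a quick smoke test, here’s a hypothetical call that lists the clusters in the workspace through the wrapper (the region and token below are placeholders):

# List the clusters in the workspace (GET request with no body)
clusters = adbAPI("2.0/clusters/list", None, "GET", "westeurope", "dapi********************************")
print(clusters.status_code)
print(clusters.json())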
Next is a helper function for translating the library status response into a human readable format:
def describeClusterLibraryState(source, clusterId, status):
    """Converts cluster library status response to a verbose message"""
    result_map = {
        "NOT_INSTALLED"       : "{} library is not installed on cluster {}.".format(source.title(), clusterId)
        , "INSTALLED"           : "{} library is already installed on cluster {}.".format(source.title(), clusterId)
        , "PENDING"             : "Pending installation of {} library on cluster {} . . .".format(source, clusterId)
        , "RESOLVING"           : "Retrieving metadata for the installation of {} library on cluster {} . . .".format(source, clusterId)
        , "INSTALLING"          : "Installing {} library on cluster {} . . .".format(source, clusterId)
        , "FAILED"              : "{} library failed to install on cluster {}.".format(source.title(), clusterId)
        , "UNINSTALL_ON_RESTART": "{} library installed on cluster {} has been marked for removal upon cluster restart.".format(source.title(), clusterId)
    }

    return result_map[status.upper()]
The parameters here include the library source, the cluster ID and the API’s status response.
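For example, translating a PENDING status for a Maven library on a made-up cluster ID would look like this:

print(describeClusterLibraryState("maven", "0123-456789-abcde123", "PENDING"))
# Pending installation of maven library on cluster 0123-456789-abcde123 . . .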
Now let’s get to the serious business. Obviously, we need a way to check the current state of the library, to see whether it’s already installed or whether something went wrong during installation:
def getClusterLibraryStatus(source, properties, dbRegion, dbToken, verbose = True):
    """Gets the current library status"""
    source = source.lower()

    # Validate input
    assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
    assert properties is not None, "Error: Empty properties provided"

    # Get the cluster ID from the Spark environment
    clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

    # Set default result to not installed
    result = describeClusterLibraryState(source, clusterId, "NOT_INSTALLED") if verbose else "NOT_INSTALLED"

    # Execute REST API request to get the library statuses
    libStatuses = adbAPI("2.0/libraries/cluster-status?cluster_id=" + clusterId, None, "GET", dbRegion, dbToken)

    if libStatuses.status_code == 200:
        statuses = libStatuses.json()
        if "library_statuses" in statuses:
            for status in statuses["library_statuses"]:
                # Compare each installed library against the requested source and properties
                if status["library"] == {source: properties}:
                    if verbose:
                        result = describeClusterLibraryState(source, clusterId, status["status"])
                    else:
                        result = status["status"]
                else:
                    print(status)

    return result
Parameters here include the library source and properties, the Databricks region and token, and a verbose flag which, if set to True (the default), makes the function return a human readable library status.
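For instance, with verbose left at its default you get a readable message, while verbose = False returns the raw status string, which is what installClusterLibrary below relies on (the token is a placeholder):

props = {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}

# Verbose (default): e.g. "Maven library is already installed on cluster 0123-456789-abcde123."
print(getClusterLibraryStatus("maven", props, "westeurope", "dapi********************************"))

# Raw status: e.g. "INSTALLED", "PENDING" or "NOT_INSTALLED"
print(getClusterLibraryStatus("maven", props, "westeurope", "dapi********************************", False))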
Finally, we have to create the most important function that will actually install the library:
def installClusterLibrary(source, properties, dbRegion, dbToken):
    """
    Installs a cluster library given correct source and properties are provided
    For examples see https://docs.databricks.com/api/latest/libraries.html#install
    """
    source = source.lower()

    # Validate input
    assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
    assert properties is not None, "Error: Empty properties provided"

    # Get the cluster ID from the Spark environment
    clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

    status = getClusterLibraryStatus(source, properties, dbRegion, dbToken, False).upper()
    if status != "INSTALLED":
        # Create the HTTP request body based on the cluster ID, library source and properties
        body = json.loads('{"cluster_id": "' + clusterId + '", "libraries": [{"' + source + '": ' + json.dumps(properties) + '}]}')

        # Execute REST API request to install the library
        runAPI = adbAPI("2.0/libraries/install", body, "POST", dbRegion, dbToken)

        if runAPI.status_code == 200:
            print("Installation started . . .")
        else:
            print(runAPI)
    else:
        print(describeClusterLibraryState(source, clusterId, status))

    return status
The parameters here are: the library source and properties and the Databricks region and token.
Example function calls (azure-sqldb-spark library hosted on Maven)
Show the library status:
print(getClusterLibraryStatus("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************"))
Install the library:
installClusterLibrary("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************")
One thing to note here is that the library installation starts and runs asynchronously. This means that you can execute the getClusterLibraryStatus function multiple times after installClusterLibrary and get different results until the installation is done (or failed).
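If a downstream notebook needs the library before it can run, one option is to poll until the installation either succeeds or fails. Here’s a minimal sketch of such a helper, built on the functions above (the function name and polling interval are my own, not part of the API):

import time

def waitForClusterLibrary(source, properties, dbRegion, dbToken, pollSeconds = 10):
    """Kicks off the installation and polls the raw status until it is INSTALLED or FAILED"""
    status = installClusterLibrary(source, properties, dbRegion, dbToken)
    while status not in ("INSTALLED", "FAILED"):
        time.sleep(pollSeconds)
        status = getClusterLibraryStatus(source, properties, dbRegion, dbToken, False).upper()
    return status

# Example:
# waitForClusterLibrary("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************")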