Installing Databricks Cluster Libraries from a Python notebook

On interactive clusters in Databricks you can install libraries manually through the workspace UI. You do it once and don’t have to worry about it again.

Creating job clusters is another story. You must take care of library installations before executing the notebooks that reference these libraries. If you’re using Azure Data Factory to orchestrate the whole process, you’re in luck, because appending libraries to job clusters is built in. For all other scenarios, the Databricks REST API is one possible option. In fact, you can call it right from a Python notebook, and that’s what I’m going to demonstrate in the following lines.

Let’s have a look at the REST API documentation first. We’ll be using only the Cluster Status and Install endpoints. To install a library, we need an HTTP request body in JSON format that contains the library source and its properties. Here’s one example:

{
  "pypi": {
    "package": "simplejson",
    "repo": "https://my-pypi-mirror.com"
  }
}

Here "pypi" is the source and {"package": "simplejson", "repo": "https://my-pypi-mirror.com"} are its properties. For more examples, see the documentation at https://docs.databricks.com/api/latest/libraries.html#install.
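The same source-plus-properties pattern carries over to a full Install request. As a minimal sketch, the body can be assembled from a plain dictionary and serialized with json.dumps. The helper name buildInstallBody and the cluster ID are made up for illustration and are not part of the notebook built below:

```python
import json

def buildInstallBody(clusterId, source, properties):
  """Build the Install request body for a single library (illustrative helper)."""
  return {
      "cluster_id": clusterId
    , "libraries": [{source: properties}]
  }

# Hypothetical cluster ID, used only to show the resulting JSON
body = buildInstallBody("1234-567890-abcde123", "pypi", {"package": "simplejson"})
print(json.dumps(body, indent = 2))
```

Building the dictionary first and serializing it afterwards avoids quoting mistakes that creep in when JSON is concatenated by hand.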

Let’s create a new Python notebook and then some functions in it. The code is simple and self-explanatory. Comments are available where needed.

We’ll be using only the json and requests libraries:

import json
import requests
from requests.auth import HTTPBasicAuth

Now, let’s create a REST API wrapper function:

def adbAPI(endPoint, body, method, region, token):
  """Execute HTTP request against Databricks REST API 2.0"""
  
  domain = region + ".azuredatabricks.net"
  baseURL = "https://%s/api/" % (domain)

  if method.upper() == "GET":
    response = requests.get(
        baseURL + endPoint
      , auth = HTTPBasicAuth("token", token)
      , json = body
    )
  else:
    response = requests.post(
        baseURL + endPoint
      , auth = HTTPBasicAuth("token", token)
      , json = body
    )

  return response

As parameters we’ll take the API endpoint, HTTP request body, HTTP method (GET or POST), Databricks workspace region (westeurope, northeurope, etc.) and, finally, a Databricks token.
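The wrapper authenticates with HTTP basic auth, using the literal user name "token". Databricks also accepts the token as a bearer token in the Authorization header. The sketch below is not part of the original notebook: it prepares such a request with the standard library's urllib and only inspects it, without sending anything. The function name buildRequest, the cluster ID and the token value are placeholders.

```python
import json
import urllib.request

def buildRequest(endPoint, body, method, region, token):
  """Prepare (but do not send) a Databricks REST API request using a bearer token."""
  url = "https://%s.azuredatabricks.net/api/%s" % (region, endPoint)
  # Serialize the body only when there is one; GET requests here carry none
  data = json.dumps(body).encode("utf-8") if body is not None else None
  request = urllib.request.Request(url, data = data, method = method.upper())
  request.add_header("Authorization", "Bearer " + token)
  if data is not None:
    request.add_header("Content-Type", "application/json")
  return request

# Placeholder cluster ID and token; nothing is sent over the network
req = buildRequest("2.0/libraries/cluster-status?cluster_id=1234-567890-abcde123", None, "get", "westeurope", "dapi-placeholder")
print(req.get_method(), req.full_url)
```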

Next is a helper function for translating the library status response into a human-readable format:

def describeClusterLibraryState(source, clusterId, status):
  """Converts cluster library status response to a verbose message"""
  
  result_map = {
      "NOT_INSTALLED"       : "{} library is not installed on cluster {}.".format(source.title(), clusterId)
    , "INSTALLED"           : "{} library is already installed on cluster {}.".format(source.title(), clusterId)
    , "PENDING"             : "Pending installation of {} library on cluster {} . . .".format(source, clusterId)
    , "RESOLVING"           : "Retrieving metadata for the installation of {} library on cluster {} . . .".format(source, clusterId)
    , "INSTALLING"          : "Installing {} library on cluster {} . . .".format(source, clusterId)
    , "FAILED"              : "{} library failed to install on cluster {}.".format(source.title(), clusterId)
    , "UNINSTALL_ON_RESTART": "{} library installed on cluster {} has been marked for removal upon cluster restart.".format(source.title(), clusterId)
  }

  return result_map[status.upper()]

The parameters here include the library source, the cluster ID and the API’s status response.

Now let’s get down to business. We need a way to check the current state of the library, to see whether it’s already installed or whether something went wrong during installation:

def getClusterLibraryStatus(source, properties, dbRegion, dbToken, verbose = True):
  """Gets the current library status """
  
  source = source.lower()

  # Validate input
  assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
    "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
  assert properties is not None, \
    "Error: Empty properties provided"

  # Get the cluster ID from the Spark environment
  clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

  # Set default result to not installed
  result = describeClusterLibraryState(source, clusterId, "NOT_INSTALLED") if verbose else "NOT_INSTALLED" 

  # Execute REST API request to get the library statuses
  libStatuses = adbAPI("2.0/libraries/cluster-status?cluster_id=" + clusterId, None, "GET", dbRegion, dbToken)

  if libStatuses.status_code == 200:
    statuses = libStatuses.json()
    if "library_statuses" in statuses:
      for status in statuses["library_statuses"]:
        if status["library"] == {source: properties}:
          if verbose:
            result = describeClusterLibraryState(source, clusterId, status["status"])
          else:
            result = status["status"]
  else:
    # The request failed, so print the HTTP response itself
    print(libStatuses)

  return result

Parameters here include the library source and properties, the Databricks region and token, and a verbose flag which, if set to True (the default), makes the function return a human-readable library status.
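To make the matching step easier to follow in isolation, here is a small sketch that runs the same comparison against a hand-written payload. The sample data is made up and contains only the fields the function reads; a real response may carry more. The helper name findLibraryStatus is also just for illustration.

```python
def findLibraryStatus(statuses, source, properties):
  """Return the status of the matching library, or NOT_INSTALLED if it is absent."""
  for entry in statuses.get("library_statuses", []):
    # A library matches only if both its source and its properties are identical
    if entry["library"] == {source: properties}:
      return entry["status"]
  return "NOT_INSTALLED"

# Made-up response payload, shaped like the fields read above
sampleResponse = {
    "cluster_id": "1234-567890-abcde123"
  , "library_statuses": [
        {"library": {"pypi": {"package": "simplejson"}}, "status": "INSTALLING"}
      , {"library": {"maven": {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}}, "status": "INSTALLED"}
    ]
}

print(findLibraryStatus(sampleResponse, "maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}))
print(findLibraryStatus(sampleResponse, "pypi", {"package": "numpy"}))
```

Because the comparison is an exact dictionary match, the properties have to be passed exactly as they were given at install time, including optional keys such as "repo".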

Finally, we have to create the most important function that will actually install the library:

def installClusterLibrary(source, properties, dbRegion, dbToken):
  """
  Installs a cluster library given correct source and properties are provided
  For examples see https://docs.databricks.com/api/latest/libraries.html#install
  """
  
  source = source.lower()

  # Validate input
  assert source in ("jar", "egg", "whl", "pypi", "maven", "cran"), \
    "Error: Invalid library source specified. Valid sources are: jar, egg, whl, pypi, maven, cran"
  assert properties is not None, \
    "Error: Empty properties provided"
  
  # Get the cluster ID from the Spark environment
  clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")

  status = getClusterLibraryStatus(source, properties, dbRegion, dbToken, False).upper()
  if status != "INSTALLED":
    # Create the HTTP request body based on the cluster ID, library source and properties
    body = json.loads('{"cluster_id": "' + clusterId + '", "libraries": [{"' + source + '": ' + json.dumps(properties) + '}]}')
    # Execute REST API request to install the library
    runAPI = adbAPI("2.0/libraries/install", body, "POST", dbRegion, dbToken)
    if runAPI.status_code == 200:
      print("Installation started . . .")
    else:
      print(runAPI)
  else:
    print(describeClusterLibraryState(source, clusterId, status))

  return status

The parameters here are the library source and properties, and the Databricks region and token. The function returns the library status as it was before the install request was sent.

Example function calls (azure-sqldb-spark library hosted on Maven)

Show the library status:

print(getClusterLibraryStatus("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************"))

Install the library:

installClusterLibrary("maven", {"coordinates": "com.microsoft.azure:azure-sqldb-spark:1.0.2"}, "westeurope", "dapi********************************")

One thing to note here is that the library installation starts and runs asynchronously. This means that you can execute the getClusterLibraryStatus function multiple times after installClusterLibrary and get different results until the installation finishes (or fails).
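If a later notebook cell depends on the library, one option is to poll until the status settles. The following is a minimal sketch rather than part of the original notebook: waitForLibrary is a hypothetical helper that takes any zero-argument function returning a raw status string, and the timeout and polling interval are arbitrary defaults. A fixed sequence of states stands in for the real API call.

```python
import time

def waitForLibrary(getStatus, timeoutSeconds = 600, pollSeconds = 10):
  """Poll getStatus() until it reports INSTALLED or FAILED, or the timeout expires."""
  deadline = time.monotonic() + timeoutSeconds
  while True:
    status = getStatus().upper()
    if status in ("INSTALLED", "FAILED"):
      return status
    if time.monotonic() >= deadline:
      raise TimeoutError("Library still in state %s after %s seconds" % (status, timeoutSeconds))
    time.sleep(pollSeconds)

# Stand-in for the real status call: replays a fixed sequence of states
fakeStates = iter(["PENDING", "RESOLVING", "INSTALLING", "INSTALLED"])
print(waitForLibrary(lambda: next(fakeStates), timeoutSeconds = 5, pollSeconds = 0))
```

In the notebook, getStatus could be a lambda that calls getClusterLibraryStatus with verbose set to False, so that the raw status string is returned.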
