The Common Data Model in Azure Data Lake Storage – Azure Data Services

If, like me, you’ve been keeping tabs on what Microsoft has been up to in the Power Platform world, you’ll have noticed two concepts that are regularly referenced in their architectures and generally associated with each other: Azure Data Lake Storage (ADLS) Gen 2 and the Common Data Model (CDM).

As Francesco noted in his blog, Microsoft’s ultimate vision is for the CDM to become the de facto standard data model. However, although there is a fair amount of material covering its capabilities and features, it can be confusing to understand how you can actually store your data in the CDM format in ADLS and use it to run data analytics workloads such as data warehousing, Power BI reporting and machine learning.

In this blog series, I’ll cover six different ways to instantiate a CDM model in ADLS.

As John Macintyre brilliantly summarised back in late 2018, “Azure Data Services enable advanced analytics that let you maximize the business value of data stored in CDM folders in the data lake. Data engineers and data scientists can use Azure Databricks and Azure Data Factory dataflows to cleanse and reshape data, ensuring it is accurate and complete. Data from different sources and in different formats can be normalized, reformatted, and merged to optimize the data for analytics processing. Data scientists can use Azure Machine Learning to define and train machine learning models on the data, enabling predictions and recommendations that can be incorporated into BI dashboards and reports, and used in production applications. Data engineers can use Azure Data Factory to combine data from CDM folders with data from across the enterprise to create an historically accurate, curated enterprise-wide view of data in Azure SQL Data Warehouse. At any point, data processed by any Azure Data Service can be written back to new CDM folders, to make the insights created in Azure accessible to Power BI and other CDM-enabled apps or tools.”

In this blog, I’ll start by demonstrating how to consume data from CDM folders and extend it with Adventure Works data (stored in Azure SQLDB) using Azure Databricks. I’ll then highlight a few limitations I came across while testing this approach and finish by explaining how to further explore the transformed data, either by writing it back to ADLS as a new CDM folder or by saving it to a table in Azure Synapse Analytics.

To copy the data from Azure SQLDB and save it as a csv file in ADLS, I created a simple Data Factory pipeline with a Copy Data activity.

Next, I created an Azure Databricks cluster with Databricks Runtime 6.2 (Apache Spark 2.4.4) and installed spark-cdm, a Scala library developed by Microsoft that lets you read and write CDM folders just like other native Spark data sources. During my tests, I installed two versions, 0.3 and 0.4. As the version numbers suggest, the library is still in its early days and not stable enough for production.

The first step in the notebook is to configure access to the storage accounts. The secrets are stored in Azure Key Vault.

# Set up ADLS and Blob storage account access key in the notebook session conf
spark.conf.set("fs.azure.account.key.cdmdemosaukw.dfs.core.windows.net", dbutils.secrets.get(scope = "key-vault-secrets", key = "StorageAccountKey"))
spark.conf.set("fs.azure.account.key.cdmdemobsukw.blob.core.windows.net", dbutils.secrets.get(scope = "key-vault-secrets", key = "BlobStorageAccountKey"))

Next, I define the parameters, which include the service principal details required by the spark-cdm methods, the name of the new CDM model produced by Databricks, the input locations of the model.json and csv files, the output location and the Synapse Analytics settings.

# Spark-cdm related settings
appId = dbutils.secrets.get(scope = "key-vault-secrets", key = "AppID")
appKey = dbutils.secrets.get(scope = "key-vault-secrets", key = "AppKey")
tenantId = dbutils.secrets.get(scope = "key-vault-secrets", key = "TenantID")
cdmModelName = "DatabricksModel"

# Synapse Analytics related settings (used to set up the connection to the SQL DW instance)
asaDatabase = "cdmdemoasaukw"  # <databaseName>
asaServer = "cdmdemosqlserveruks"  # <servername>
asaUser = "CDMDemoAdmin"  # <sqlUser>
asaPass = dbutils.secrets.get(scope = "key-vault-secrets", key = "SQLServerPassword")  # <sqlUserPassword>
asaJdbcPort = "1433"
asaJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;loginTimeout=30;"
asaUrl = "jdbc:sqlserver://" + asaServer + ".database.windows.net:" + asaJdbcPort + ";database=" + asaDatabase + ";user=" + asaUser + ";password=" + asaPass + ";" + asaJdbcExtraOptions

# Blob Storage related settings (used for temporary storage)
# The value is the storage account url in the format of <accountName>.blob.core.windows.net
blobStorage = "cdmdemobsukw.blob.core.windows.net"  # <blobStorageAccount>
blobContainer = "synapseanalytics"  # <storageContainer>

# Input/Output file details
cdsInputLocation = "https://cdmdemosaukw.dfs.core.windows.net/commondataservice-josesdevenvironment-org6832e392/model.json"
dataflowsInputLocation = "https://cdmdemosaukw.dfs.core.windows.net/power-platform-dataflows/environments/org58af6d71/CDS Dataflow/model.json"
adventureworksInputLocation = "abfss://adventureworks@cdmdemosaukw.dfs.core.windows.net/Customer/*.csv"
databricksOutputLocation = "https://cdmdemosaukw.dfs.core.windows.net/databricks"

In this notebook, I’m combining data from the CDM Contact entity with the Adventure Works Customer entity. Because the entities have a different number of fields, I created a custom function that unions two dataframes with different structures.

# Create function to union two dataframes with different structure
from pyspark.sql.functions import lit

def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """ return ordered dataFrame by the columns order list with null in missing columns """
    if not df_missing_fields:  # no missing fields for the df
        return df.select(columns_order_list)
    else:
        columns = []
        for colName in columns_order_list:
            if colName not in df_missing_fields:
                columns.append(colName)
            else:
                columns.append(lit(None).alias(colName))
        return df.select(columns)


def __add_missing_columns(df, missing_column_names):
    """ Add missing columns as null in the end of the columns list """
    list_missing_columns = []
    for col in missing_column_names:
        list_missing_columns.append(lit(None).alias(col))

    return df.select(df.schema.names + list_missing_columns)


def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
    """ return union of data frames with ordered columns by left_df. """
    left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
    right_df_all_cols = __order_df_and_add_missing_cols(right_df, left_df_all_cols.schema.names,
                                                        right_list_miss_cols)
    return left_df_all_cols.union(right_df_all_cols)


def union_d_fs(left_df, right_df):
    """ Union between two dataFrames, if there is a gap of column fields,
     it will append all missing columns as nulls """
    # Check for None input
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')

    # For data frames with equal columns and order - regular union
    if left_df.schema.names == right_df.schema.names:
        return left_df.union(right_df)
    else:  # Different columns
        # Save dataFrame columns name list as set
        left_df_col_list = set(left_df.schema.names)
        right_df_col_list = set(right_df.schema.names)
        # Diff columns between left_df and right_df
        right_list_miss_cols = list(left_df_col_list - right_df_col_list)
        left_list_miss_cols = list(right_df_col_list - left_df_col_list)
        return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)
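
To illustrate how the function behaves, here is a small standalone example (not part of the original notebook) with two dummy dataframes: the columns missing on either side come back as nulls, and the column order follows the left dataframe.

# Example only: union two dataframes with partially overlapping columns
left_df = spark.createDataFrame([("1", "Ana")], ["contactId", "firstName"])
right_df = spark.createDataFrame([("2", "Bob")], ["contactId", "salesPerson"])

union_d_fs(left_df, right_df).show()
# +---------+---------+-----------+
# |contactId|firstName|salesPerson|
# +---------+---------+-----------+
# |        1|      Ana|       null|
# |        2|     null|        Bob|
# +---------+---------+-----------+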

In the first blog of the series, I demonstrated how to extract CDS data using the Export to data lake service. While trying to read the generated data, I came across my first issue: as soon as I executed the cell, an exception with the text No value found for ‘guid’ was thrown. One user reported the same issue on the GitHub page, but there’s still no update on how to solve it.

# Read CDS Contact entity generated by Export to Data Lake service
cdsContactDf = (spark.read.format("com.microsoft.cdm")
             .option("cdmModel", cdsInputLocation)
             .option("entity", "contact")
             .option("appId", appId)
             .option("appKey", appKey)
             .option("tenantId", tenantId)
             .load()
            )

In the second blog of the series, I demonstrated how to extract the same CDS data, this time using Power BI Dataflows. In this scenario, the cell executed successfully and I could see the structure of the entity. I believe this worked because the library was built to read the CDM structure generated by Dataflows rather than the one generated by the Export to data lake service, but that’s just a theory.

# Read CDS Contact entity generated by Power BI Dataflows
dataflowsContactDf = (spark.read.format("com.microsoft.cdm")
             .option("cdmModel", dataflowsInputLocation)
             .option("entity", "Contact")
             .option("appId", appId)
             .option("appKey", appKey)
             .option("tenantId", tenantId)
             .load()
            )

The second issue happened when I tried to display the dataframe. Here, I got different error messages depending on the version of the library.

Using version 0.3, java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String.

Using version 0.4, java.io.InvalidClassException: com.microsoft.cdm.utils.DataConverter; local class incompatible: stream classdesc serialVersionUID = -628576079859327802, local class serialVersionUID = 7378383711017995819

On their GitHub page, the team mentioned they fixed this problem in version 0.3; however, it’s clearly not stable yet. In short, I couldn’t display the data.

display(dataflowsContactDf)
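
Even though display() fails, the entity structure can still be inspected, because printing the schema doesn’t force Spark to read any rows. This is simply a workaround I found convenient, not something the library documents.

# Workaround: inspect the entity structure without materialising any data
dataflowsContactDf.printSchema()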

The next two steps read the Adventure Works data and apply two transformations. The first selects the intended columns and renames them according to the CDM Contact entity; if the columns had kept different names, the union function would have created extra columns rather than appending the data. The second transformation is required due to another limitation of the spark-cdm library: apparently, timestamp and Int32 are not valid formats, which is why I converted CustomerID from integer to string and ModifiedDate from timestamp to date.

# Read Customer entity from Adventure Works
adventureworksCustomerDf = (spark.read
                           .option("header", True)
                           .option("inferSchema", True)
                           .csv(adventureworksInputLocation)
            )
# Restructure Customer entity
adventureworksContactDf = (adventureworksCustomerDf
                           .selectExpr("CAST(CustomerID AS String) AS contactId", "Title AS salutation", "FirstName AS firstName", "MiddleName AS middleName", 
                                       "LastName AS lastName", "Suffix as suffix", "CompanyName AS company", "SalesPerson AS salesPerson", "EmailAddress AS EMailAddress1",
                                       "Phone AS telephone1", "PasswordHash AS passwordHash", "CAST(ModifiedDate AS Date) AS modifiedOn")
                          )

The next step invokes the union function. The outcome is a new dataframe that contains all the columns of the CDM Contact entity plus a new one from the Adventure Works Customer entity, SalesPerson.

# Union Contact dataframes
databricksContactEntity = union_d_fs(dataflowsContactDf, adventureworksContactDf)
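
As a quick sanity check (my own addition, not a required step), the column list of the unioned dataframe can be inspected without triggering a read of the CDM data.

# Confirm the union kept the CDM Contact columns and added salesPerson from Adventure Works
print("salesPerson" in databricksContactEntity.columns)
print(len(databricksContactEntity.columns), "columns in total")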

At this point, I have consumed CDM data from ADLS and enriched it with external data from Adventure Works. The next logical step is to save these changes so further analysis can be performed.

One of the recommended approaches is to save it back to ADLS as a new CDM folder. Once the data is landed in the lake, we can attach it to Power BI as an external source, as demonstrated here, and consume it directly in Power BI.

Unfortunately, the spark-cdm library creates some issues here. As indicated above, there are a few problems when trying to display a dataframe with CDM data, so, not surprisingly, the outcome of the cell below is an exception. A CDM folder is still created, but the strictly necessary model.json file is not, preventing data consumers from properly reading and understanding the data.

Interestingly enough, saving the Adventure Works entity as a new CDM folder works as expected, which indicates the problem is once again related to unsupported data types.

# Save Contact entity to a new CDM folder in ADLS
(databricksContactEntity.write.format("com.microsoft.cdm")
                             .option("entity", "Contact")
                             .option("appId", appId)
                             .option("appKey", appKey)
                             .option("tenantId", tenantId)
                             .option("cdmFolder", databricksOutputLocation)
                             .option("cdmModelName", cdmModelName)
                             .save())
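
For comparison, the write for the Adventure Works dataframe, which did complete successfully in my tests, looks roughly like the cell below. Treat it as a sketch: the entity name is illustrative and I’m reusing the same output location.

# Save the Adventure Works-only dataframe as a CDM folder (this write completed without errors)
(adventureworksContactDf.write.format("com.microsoft.cdm")
                             .option("entity", "Customer")   # entity name chosen for illustration
                             .option("appId", appId)
                             .option("appKey", appKey)
                             .option("tenantId", tenantId)
                             .option("cdmFolder", databricksOutputLocation)
                             .option("cdmModelName", cdmModelName)
                             .save())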

The other option is to persist the data to a database, such as Synapse Analytics. To do that, we can use ADF and PolyBase to read from the new CDM folder, as demonstrated in this tutorial, or ingest directly from Databricks.

Unfortunately, this option also has a few limitations. Some of the data types need to be converted before writing to a Synapse Analytics table; an unsupported data type results in an exception such as java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String.
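
As a rough sketch of what that conversion can look like, the snippet below casts any remaining timestamp columns to date before the write. The column selection logic is my own illustration, not something prescribed by the connector; adjust it to whichever types your schema actually contains.

# Illustrative only: cast timestamp columns to date before writing to Synapse Analytics
from pyspark.sql.functions import col

timestamp_cols = [f.name for f in databricksContactEntity.schema.fields
                  if f.dataType.simpleString() == "timestamp"]
for c in timestamp_cols:
    databricksContactEntity = databricksContactEntity.withColumn(c, col(c).cast("date"))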

# Save Contact entity in Synapse Analytics
(databricksContactEntity.write
 .format("com.databricks.spark.sqldw")
 .option("url", asaUrl)
 .option("forwardSparkAzureStorageCredentials", "true")
 .option("dbTable", "Contact")
 .option("tempDir", "wasbs://"+blobContainer+ "@" + blobStorage + "/tmpdir")  #only Azure BlobStore locations are supported 
 .mode("overwrite")
 .save()
)
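
If the write succeeds, the same connector can read the table back as a final sanity check. This is not part of the original flow, just a quick way to confirm the load; it reuses the url and tempDir settings defined earlier.

# Read the Contact table back from Synapse Analytics to verify the load
contactCheckDf = (spark.read
 .format("com.databricks.spark.sqldw")
 .option("url", asaUrl)
 .option("forwardSparkAzureStorageCredentials", "true")
 .option("dbTable", "Contact")
 .option("tempDir", "wasbs://"+blobContainer+ "@" + blobStorage + "/tmpdir")
 .load())
display(contactCheckDf.limit(10))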