Technology Blogs by SAP
DanielIngenhaag
Product and Topic Expert

This blog is part of a blog series from SAP Datasphere focusing on the integration of SAP Datasphere and Databricks. In this blog, we provide an overview of the integration between SAP Datasphere and Databricks using the premium outbound integration of Replication Flows within SAP Datasphere. A major prerequisite for the approach described in this blog was the delivery of the premium outbound integration in SAP Datasphere, which allows users to replicate data into non-SAP target systems as well, e.g. 3rd party object stores, Google BigQuery or Apache Kafka. Please check the following blog if you are interested in learning more about the premium outbound integration in SAP Datasphere:

https://blogs.sap.com/2023/11/16/replication-flow-blog-series-part-2-premium-outbound-integration/

In general, we follow a “federation first” approach to data integration in SAP Datasphere and aim to avoid data replication where possible. If, however, federation is not feasible in a certain scenario or does not meet your expectations, data replication can also be used to integrate data between SAP Datasphere and Databricks. Please find below a summary of links where you can already access various content around SAP Datasphere and Databricks:

In case you are interested in additional information about the partnership between SAP and Databricks, please check the following blog created by Katryn Cheng: https://blogs.sap.com/2023/12/12/maximize-the-value-of-your-business-data-with-sap-datasphere-and-d...

As a baseline for this blog, we will provide a step-by-step overview of a typical scenario for customers who want to use SAP Datasphere in conjunction with Databricks based on the current public features that SAP Datasphere and Databricks support. At the end of the blog, we will also provide a short outlook into the future about what additional functional enhancements we are planning.

For the integration scenario in this blog, we will first use SAP Datasphere to replicate data out of an SAP S/4HANA system (e.g. sales order and product data) and load it into an Azure Data Lake Gen 2 storage provisioned by Databricks, leveraging the Replication Flow technology. Second, Databricks will integrate the replicated data into Delta tables and apply machine learning algorithms to perform sales order prediction based on the replicated data from SAP S/4HANA:

DanielIngenhaag_0-1707901019839.png

 

Part 1: Replicate data from SAP S/4HANA into Azure Data Lake provisioned by Databricks

In this part of our blog, we will show you how to replicate the data from your SAP S/4HANA system into an Azure Data Lake using Replication Flows in SAP Datasphere. We will not go into every detailed setting of a Replication Flow. If you are interested in learning more about SAP Datasphere replication flows in general, please have a look at the following blog: https://blogs.sap.com/2023/11/16/replication-flow-blog-series-part-1-overview/ 

Step 1: Establish Connectivity to SAP S/4HANA and Azure Data Lake Generation 2

In our scenario, we assume that connections to SAP S/4HANA and Azure Data Lake Gen 2 are already available in your SAP Datasphere system. In general, the same scenario can also be implemented with any other object store that is supported by both Databricks and Replication Flows, e.g. Google Cloud Storage (GCS) or Amazon Simple Storage Service (Amazon S3).

In this case, we have created a connection “DATABRICKS_DEMO” leveraging the connection type “Azure Data Lake Storage Gen2” and a connection to SAP S/4HANA using the connection type “SAP S/4HANA on-Premise” via the SAP Cloud Connector.

DanielIngenhaag_1-1707901085040.png

In case you need more information about how to create a connection to Azure Data Lake Gen 2 & SAP S/4HANA in SAP Datasphere, please check the following links:


Step 2: Create a replication flow in SAP Datasphere

Please log in to an SAP Datasphere tenant and go to the Data Builder application to create a replication flow:

DanielIngenhaag_2-1707901085057.png

Please select the previously created connection to SAP S/4HANA as a source connection:

DanielIngenhaag_3-1707901085239.png

DanielIngenhaag_4-1707901085014.png

Please select the “CDS - CDS Views” folder as source container, which allows us to select CDS views as source data sets in our replication flow:

DanielIngenhaag_5-1707901085054.png

In this scenario, we will replicate three custom CDS Views from SAP S/4HANA containing information about business partner, product master data as well as sales order data:

DanielIngenhaag_6-1707901085129.png

Now we configure our target connection where we want to replicate the data, which is in this case an Azure Data Lake Gen 2 object store container:

DanielIngenhaag_7-1707901085063.png

Select the pre-created connection to Azure Data Lake Gen 2 from the connection dropdown. In our case the connection is “DATABRICKS_DEMO”:

DanielIngenhaag_8-1707901085029.png

In the next step, please select the root path in your Azure Data Lake connection in the target container dialog, e.g. in our example we choose folder “/Demo_data”. This will be used as the entry folder path where all your CDS Views will be replicated by the replication flow.

After your selection, the replication flow will be updated accordingly with the pre-selected CDS Views as target objects:

DanielIngenhaag_9-1707901085019.png

Optionally, you can rename the target data sets (= folders) for each replication object:

DanielIngenhaag_10-1707901085062.png

In this case we rename the three CDS Views using a custom description, which will later on be used as folder names:

DanielIngenhaag_11-1707901085039.png

Additionally, we can configure settings for the file handling in Azure Data Lake using the configuration icon in the replication flow:

DanielIngenhaag_12-1707901085080.png

In this case we choose the following settings to use:

  • File type - Parquet
  • File Compression - None
  • Group delta by - Date (which will group the delta records in folders on a daily basis)

DanielIngenhaag_13-1707901085061.png

Note: In such a scenario, we typically recommend using Parquet files, which can later be accessed by Databricks and which we have also tested end-to-end as part of this blog. Grouping the delta records is optional; you can also go ahead without any grouping.

Please find here more information about the handling of files in replication flows.

Now it is time to save and deploy your replication flow:

DanielIngenhaag_14-1707901085081.png

You can specify any name for your replication flow, e.g. RF_S4_DATABRICKS_ADL

DanielIngenhaag_15-1707901085058.png

After successfully deploying your replication flow, you can hit the Run button to start the replication:

DanielIngenhaag_16-1707901085059.png

Once the replication flow is running, you can go to the Data Integration Monitor to monitor the data replication:

DanielIngenhaag_17-1707901085126.png

In the detailed monitoring screen of your replication flow, you can check various detailed information during the execution, e.g. the status, number of replicated records etc.

DanielIngenhaag_18-1707901085243.png

Step 3: Check the replicated data in Azure Data Lake Gen 2

After the initial load of all CDS Views is executed successfully, you will find the following parquet files that have been replicated into Azure Data Lake Gen 2:

DanielIngenhaag_19-1707901085056.png

All the data that belongs to the initial load of the “BusinessPartner” will be stored in several parquet files in the folder “initial”. Within the “delta” folder, all changes that were executed in the source system for the selected CDS View will be replicated to parquet files that are grouped together in a separate subfolder per day. The reason for this is the setting we have defined in step 2, where we configured the setting “Group Delta by”: “Date” in our Replication Flow.
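To make this folder layout more concrete, here is a minimal Python sketch that sorts replicated file paths into initial-load files and per-day delta groups. The path pattern (<object>/initial/<file> and <object>/delta/<day>/<file>), the format of the day folder names and the file names are assumptions for illustration only; please check them against your own container before relying on them.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def group_replicated_files(paths):
    """Split file paths into initial-load files and delta files per day folder.

    Assumes a layout of <object>/initial/<file> and <object>/delta/<day>/<file>;
    the real folder names in your container may differ.
    """
    initial = []
    delta_by_day = defaultdict(list)
    for raw in paths:
        parts = PurePosixPath(raw).parts
        if "initial" in parts:
            initial.append(raw)
        elif "delta" in parts:
            idx = parts.index("delta")
            # The folder directly below "delta" is the grouping folder (one per day).
            # Without grouping, the file sits directly in "delta".
            day = parts[idx + 1] if idx + 2 < len(parts) else "ungrouped"
            delta_by_day[day].append(raw)
    return initial, dict(delta_by_day)


# Hypothetical file names, for illustration only.
example_paths = [
    "Demo_data/BusinessPartner/initial/part-0001.parquet",
    "Demo_data/BusinessPartner/initial/part-0002.parquet",
    "Demo_data/BusinessPartner/delta/2024-02-14/part-0001.parquet",
    "Demo_data/BusinessPartner/delta/2024-02-15/part-0001.parquet",
]
initial_files, delta_files = group_replicated_files(example_paths)
```

With the setting “Group Delta by”: “Date”, each key of delta_files would correspond to one daily subfolder, which makes it easy to reprocess or inspect the changes of a single day.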

Part 2: Load the data into Databricks using Auto Loader and Delta Live Tables

Once the data is replicated into Azure Data Lake Gen 2 using the Replication Flow, we can use Auto Loader to load it into the Databricks Data Intelligence Platform. Auto Loader efficiently processes new data files incrementally and idempotently as they arrive in the cloud storage. Its other advantages are that it does not require any additional setup, it can infer the schema, automatically handle schema evolution, and rescue data in case of errors.
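To illustrate what “incrementally and idempotently” means here, the following plain Python sketch keeps track of the files it has already loaded and skips them on later runs. This is only a conceptual illustration and not how Auto Loader is implemented; the function name, the in-memory checkpoint set and the file names are all assumptions made for this example.

```python
def ingest_new_files(listed_files, processed, load_file):
    """Load only files not seen before and record them as processed.

    `processed` plays the role of a checkpoint: calling this function again
    with the same listing loads nothing, which makes the ingestion idempotent.
    """
    newly_loaded = []
    for path in sorted(listed_files):
        if path in processed:
            continue  # already ingested in an earlier run
        load_file(path)
        processed.add(path)
        newly_loaded.append(path)
    return newly_loaded


loaded_log = []     # stands in for the bronze table
checkpoint = set()  # stands in for persisted checkpoint state

first_run = ingest_new_files(
    ["delta/a.parquet", "delta/b.parquet"], checkpoint, loaded_log.append
)
second_run = ingest_new_files(
    ["delta/a.parquet", "delta/b.parquet"], checkpoint, loaded_log.append
)
third_run = ingest_new_files(
    ["delta/a.parquet", "delta/b.parquet", "delta/c.parquet"], checkpoint, loaded_log.append
)
```

The second run loads nothing because both files are already recorded, and the third run only picks up the newly arrived file.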

Auto Loader works very well in conjunction with Delta Live Tables. Delta Live Tables is a declarative framework in Databricks that can help build medallion architecture ETL pipelines. It supports both Python and SQL languages. 

The Python code below will create the raw layer/bronze table which is using Auto Loader in Delta Live Tables. This will start ingesting all the new data files written by SAP Replication Flow into Databricks in real time using Auto Loader.

 

import dlt

# Bronze table: Auto Loader ("cloudFiles") incrementally picks up the Parquet files written by the Replication Flow
@dlt.table(comment="SAP Replication Flow Autoloader Sales Orders Bronze table")
def sap_sales_orders_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("abfss://sapdatabrickscontainer@sapdatabricksrep.dfs.core.windows.net/sapdatabrickscontainer/SalesOrders/")
    )

 

In Delta Live Tables, we can process change records (inserts, updates, deletes) incrementally using a simple, declarative “APPLY CHANGES INTO” SQL API or the Python equivalent shown below. Not only can we handle CDC data, but we can also enforce the quality of the data using expectations in Delta Live Tables and choose between SCD type 1 and SCD type 2. With this code, we create the silver layer for the Sales Order pipeline. For more information check this link.

 

import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table(
    name="sap_sales_orders_silver",
    comment="SAP sales orders CDC silver table",
    table_properties={"quality": "silver"},
    # Expectations: rows with a null ItemGuid are dropped
    expect_all_or_drop={"ItemGuid valid": "ItemGuid IS NOT NULL"},
)

# Apply changes...
dlt.apply_changes(
    target="sap_sales_orders_silver",
    source="sap_sales_orders_bronze",
    keys=["ItemGuid"],  # Primary key to match the rows to upsert/delete
    sequence_by=col("__sequence_number"),  # Order changes by sequence number so the most recent value wins
    apply_as_deletes=expr("__operation_type = 'D'"),  # Delete condition
    stored_as_scd_type=2,
)

 

Finally, the gold layer is created on top of this cleansed silver layer. Using expectations, we can set data quality rules for one or multiple columns of the silver layer. For example, the expectation above ensures that ItemGuid is not null; records that do not satisfy this criterion are dropped. There are many different ways expectations can be applied to ensure data quality; please explore more here.
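To make the combined effect of the expectation and the change handling above easier to follow, here is a small plain Python sketch of the same idea. It is a conceptual illustration only and not how Delta Live Tables is implemented: it keeps just the latest state per key (comparable to SCD type 1, whereas the pipeline above stores history as SCD type 2), and the operation type values "I" and "U" as well as the sample rows are assumptions; only the delete marker 'D' is taken from the code above.

```python
def passes_expectation(row):
    # Mirrors the "ItemGuid IS NOT NULL" expectation: failing rows are dropped.
    return row.get("ItemGuid") is not None


def apply_changes_scd1(change_rows):
    """Return the latest state per ItemGuid from a list of change records."""
    current = {}
    valid_rows = [r for r in change_rows if passes_expectation(r)]
    # Process changes in sequence order so the most recent record wins.
    for row in sorted(valid_rows, key=lambda r: r["__sequence_number"]):
        key = row["ItemGuid"]
        if row["__operation_type"] == "D":
            current.pop(key, None)  # delete condition
        else:
            # Upsert the business columns, leaving out the technical "__" columns.
            current[key] = {k: v for k, v in row.items() if not k.startswith("__")}
    return current


# Hypothetical change records, for illustration only.
changes = [
    {"ItemGuid": "A", "NetAmount": 100, "__operation_type": "I", "__sequence_number": 1},
    {"ItemGuid": "B", "NetAmount": 50, "__operation_type": "I", "__sequence_number": 2},
    {"ItemGuid": "A", "NetAmount": 120, "__operation_type": "U", "__sequence_number": 3},
    {"ItemGuid": "B", "NetAmount": 50, "__operation_type": "D", "__sequence_number": 4},
    {"ItemGuid": None, "NetAmount": 10, "__operation_type": "I", "__sequence_number": 5},
]
silver = apply_changes_scd1(changes)
```

In this example, item A ends up with its updated amount, item B is removed by the delete record, and the row without an ItemGuid never reaches the result.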

The gold layer (materialized view) is aggregated and pruned to the columns that are needed. A materialized view is precomputed and refreshed according to the update schedule of the pipeline. For more information please check this link.

DanielIngenhaag_0-1708536164774.png

Machine learning (Regression) in Databricks + MLflow:

The gold layer of the Delta Live Tables data is then subjected to regression analysis in Databricks. The Databricks Data Intelligence Platform is a cloud-based platform for big data analytics and machine learning/AI. MLflow, integrated with Databricks, is a machine learning lifecycle management tool. Regression analysis examines the relationships between variables and is often used for predictive modelling; in this scenario, it is used for sales prediction.

Once the gold tables are combined/joined, a feature store table is built using these joined gold tables in Databricks. A feature store is a centralized repository that enables data scientists to find and share features and also ensures that the same code used to compute the feature values is used for model training and inference.

 

from databricks.feature_store import FeatureLookup, FeatureStoreClient

fs = FeatureStoreClient()

# This will create the feature store table based on the joined data of the gold tables created by DLT
fs.create_table(
    name="fs_sap_sales_orders",
    primary_keys=["SalesOrderGuid"],
    df=df_sales_data,  # DataFrame holding the join of the gold tables created by the DLT pipelines
)

 

Then we can retrieve the features from the feature store to create the training set which will be used to train the model. Using the MLflow and Feature Store we can do the model training and then the model can be used to predict the sales. We can use MLflow to log the models and the evaluation metrics and use ML libraries to build the regression models as shown below.

 

import mlflow
from databricks.feature_store import FeatureLookup
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# The feature lookup maps the primary keys to the related records in the feature store.
model_feature_lookups = [
    FeatureLookup(
        table_name="fs_sap_sales_orders",
        lookup_key="SalesOrderGuid",
        feature_names=["Weight", "price", "Height"],
    )
]

# create_training_set uses model_feature_lookups to retrieve the specified features
# linked to the primary keys in the df_sales_data dataframe
training_set = fs.create_training_set(
    df=df_sales_data,
    feature_lookups=model_feature_lookups,
    label="NetAmount",
)

# Start a new run in the MLflow environment to log the model run details
with mlflow.start_run() as run:
    # Load the training set as a Spark dataframe
    training_df = training_set.load_df()

    # Split into train and test datasets with a ratio of 80:20 using a seed of 42
    train, test = training_df.randomSplit([0.8, 0.2], seed=42)

    # Vector assembler to combine the chosen feature columns into one features vector
    vec_assembler = VectorAssembler(inputCols=["price", "Height", "Weight"], outputCol="features")

    # New instance of the Linear Regression algorithm with NetAmount as the label
    lr = LinearRegression(featuresCol="features", labelCol="NetAmount")

    # Pipeline to chain the vector assembler and the Linear Regression algorithm together
    pipeline = Pipeline(stages=[vec_assembler, lr])

    # Fit the pipeline on the train split only, so that the test split stays unseen
    model = pipeline.fit(train)

    # Use the model to make predictions on the test dataset
    predictions = model.transform(test)

    # Regression evaluator comparing the NetAmount label with the prediction column
    evaluator = RegressionEvaluator(labelCol="NetAmount", predictionCol="prediction")

    # Log the test r2 metric in the MLflow environment
    mlflow.log_metric("r2", evaluator.evaluate(predictions, {evaluator.metricName: "r2"}))

    # Log the test root mean square error in the MLflow environment
    mlflow.log_metric("rmse", evaluator.evaluate(predictions, {evaluator.metricName: "rmse"}))

 

Here is the output of predictions for this regression model:

DanielIngenhaag_22-1707901951076.png
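For readers who want to interpret the two logged metrics, the following plain Python sketch computes r2 and rmse by hand using their standard textbook definitions. The sample values are made up for illustration and are not taken from the model above; the sketch is also not the evaluator's actual implementation.

```python
import math


def rmse(actual, predicted):
    """Root mean square error: the typical size of a prediction error."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r2(actual, predicted):
    """Coefficient of determination: share of the variance explained by the model."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot


# Made-up NetAmount values, for illustration only.
actual_net_amount = [100.0, 200.0, 300.0, 400.0]
predicted_net_amount = [110.0, 190.0, 310.0, 390.0]

example_rmse = rmse(actual_net_amount, predicted_net_amount)
example_r2 = r2(actual_net_amount, predicted_net_amount)
```

An rmse close to 0 and an r2 close to 1 indicate that the predictions track the actual values well; the rmse is expressed in the same unit as the label, here the net amount.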

Explicit logging of metrics and models is optional: by default, when a Python notebook is attached to a cluster, Databricks Autologging calls mlflow.autolog() to set up tracking for your models, metrics etc. If you prefer to log everything explicitly, autologging can be disabled, for example with mlflow.autolog(disable=True). This can be customized; for more info go to this link. This regression machine learning model with MLflow is just the tip of the iceberg; there are many other features like AutoML, MLOps, LLMOps, AI Governance, Vector search, Large Language Model integration/fine-tuning, etc. For more information on the complete capabilities check out this link.

Conclusion and Outlook

Done! We have now successfully demonstrated the integration between SAP Datasphere and Databricks using the features that are currently publicly available in both products.

Thanks a lot to Krishna Satyavarapu, a close collaborator and co-author of this blog, as well as to the rest of the SAP Datasphere and Databricks teams!

Additionally, as part of our ongoing partnership with Databricks, we are investigating approaches for more seamless integration between SAP Datasphere and Databricks in the future. For any news around this topic please check our SAP Datasphere roadmap explorer, where we will announce any additional enhancements in this area. 

In case you have any questions or feedback, please feel free to leave a comment on this blog.

Important links & references:

Replication Flow Blog - Introduction to Replication Flows in SAP Datasphere 

SAP Datasphere - Create a Replication Flow

SAP Datasphere - Handling of target object stores in Replication Flows

SAP Datasphere - Connectivity

Databricks - Delta Lake

Databricks - Auto Loader & Delta Live Tables

Databricks - Machine Learning
