Batch Scoring
This section is designed for use with a data orchestration tool, such as Azure Data Factory, that can call and execute Databricks notebooks. For more information on how to set up an Azure Data Factory pipeline that runs a Databricks notebook, see: https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook.

Create date parameter

dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)
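
The widget default ("19000101") keeps the notebook runnable interactively; when the orchestrator triggers the notebook, the real reporting date arrives through the varReportDate parameter. If you want the run to fail fast on a malformed value, a small validation step can help. This is only a sketch; the strict yyyyMMdd format is an assumption about how your orchestrator formats dates.

from datetime import datetime

# Fail the run early if the incoming parameter is not a valid yyyyMMdd date
# (assumes the orchestrator always sends dates in that format).
try:
    datetime.strptime(ReportDate, "%Y%m%d")
except ValueError:
    raise ValueError("varReportDate must be in yyyyMMdd format, got: " + ReportDate)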

Connect to storage

storage_account_name = "mystorage"
storage_account_access_key = ""  # placeholder: supply the key at run time rather than hard-coding it

file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
file_type = "csv"

spark.conf.set(
  "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
  storage_account_access_key)
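
Hard-coding the storage key in the notebook is only for illustration. On a real cluster, a Databricks secret scope keeps the key out of source control. A minimal sketch, assuming a secret scope already exists; the scope and key names here are placeholders.

# Pull the storage key from a Databricks secret scope instead of embedding it.
# "my-scope" and "storage-key" are placeholder names for your own scope and secret.
storage_account_access_key = dbutils.secrets.get(scope="my-scope", key="storage-key")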

Define input schema

from pyspark.sql.types import *

schema = StructType([
  StructField("ReportingDate", DateType(), True),
  StructField("id", StringType(), True),
  StructField("x1", IntegerType(), True),
  StructField("x2", DoubleType(), True)
])
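
If you prefer not to build the StructType by hand, recent Spark versions also accept a DDL-style schema string in .schema(). This sketch is intended to be equivalent to the StructType above.

# DDL-string form of the same schema; pass it to .schema() in place of the StructType.
ddl_schema = "ReportingDate DATE, id STRING, x1 INT, x2 DOUBLE"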

Read in new data

dataset = spark.read\
  .format(file_type)\
  .option("header", "true")\
  .schema(schema)\
  .load(file_location)

## You can avoid defining a schema by having Spark infer it from your data,
## but inference doesn't always work and can be slow:
#  .option("inferSchema", "true")

## Fill in NA values, if needed
# dataset = dataset.na.fill(0)
display(dataset)
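
If the daily file can legitimately be empty, it may be worth short-circuiting the notebook before scoring. dbutils.notebook.exit returns a value to the calling orchestrator; whether an empty file should be treated as an error or a no-op is a judgment call, so this is only a sketch.

# Optionally stop early when there is nothing to score.
# The exit value is surfaced to the calling tool (e.g. Azure Data Factory).
if dataset.rdd.isEmpty():
    dbutils.notebook.exit("No rows found for " + ReportDate)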

Load in transformation pipeline and model

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round
from pyspark.ml.regression import GeneralizedLinearRegressionModel

mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
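
CrossValidatorModel.bestModel holds the winning model from the earlier tuning run. If that model is a GeneralizedLinearRegressionModel, as the import above suggests, its fitted parameters can be inspected before scoring. A sketch only; the attributes available depend on the underlying model type.

# Inspect the tuned model before using it (assumes a GLM-style best model).
print(mymodel.bestModel.coefficients)
print(mymodel.bestModel.intercept)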

Score data using the model

## Transform new data using the pipeline
mydataset = mypipeline.transform(dataset)
## Score new data using the trained model
scoreddataset = mymodel.bestModel.transform(mydataset)

output = scoreddataset.select(col("id"),
                              col("ReportingDate"),
                              col("prediction").alias("MyForecast"))
display(output)
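
The round function imported earlier is not used above. If the downstream consumer expects a rounded forecast, it can be applied in the same select; two decimal places is an arbitrary choice in this sketch.

# Round the forecast before writing it out (optional).
output = scoreddataset.select(col("id"),
                              col("ReportingDate"),
                              round(col("prediction"), 2).alias("MyForecast"))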

Write data back out to storage

fileloc = "/mnt/output" + str(ReportDate) #+ ".csv"
output.write.mode("overwrite").option("header", "true").csv(fileloc)
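
By default Spark writes the CSV output as a directory of part files. If the downstream system expects a single file, or the orchestrator needs to know where the output landed, the sketch below covers both; coalescing to one partition is fine for modest batch sizes but forces the write through a single task.

# Write a single CSV part file and hand the output path back to the orchestrator.
output.coalesce(1).write.mode("overwrite").option("header", "true").csv(fileloc)
dbutils.notebook.exit(fileloc)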