Batch Scoring

This section is designed for use with a data orchestration tool that can call and execute Databricks notebooks, such as Azure Data Factory. For more information on setting up such a pipeline, see the Azure Data Factory documentation.

Create date parameter

dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
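The widget value arrives as a plain `yyyyMMdd` string from the orchestrator, so it can be worth validating before using it to build file paths. A minimal plain-Python sketch (the helper name is illustrative, not part of the notebook above):

```python
from datetime import datetime

def validate_report_date(raw: str) -> str:
    """Raise ValueError if raw is not a yyyyMMdd date; return it unchanged."""
    datetime.strptime(raw, "%Y%m%d")  # raises on a malformed string
    return raw

print(validate_report_date("19000101"))  # -> 19000101
```

Failing fast here is cheaper than discovering a malformed path when the read or write step errors out mid-pipeline.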

Connect to storage

storage_account_name = "mystorage"
storage_account_access_key = ""

## Register the account key so Spark can read from blob storage
spark.conf.set(
    "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
    storage_account_access_key)

file_location = "wasbs://<container>@" + storage_account_name + ".blob.core.windows.net/" + ReportDate + ".csv"
file_type = "csv"
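For reference, a fully qualified wasbs URL has the shape `wasbs://<container>@<account>.blob.core.windows.net/<path>`: the container comes first, then the storage account host. A plain-Python sketch of assembling it (the container and account names below are placeholders):

```python
def build_blob_path(container: str, account: str, report_date: str) -> str:
    # wasbs URLs name the container first, then the storage account host
    return f"wasbs://{container}@{account}.blob.core.windows.net/{report_date}.csv"

print(build_blob_path("mycontainer", "mystorage", "19000101"))
# -> wasbs://mycontainer@mystorage.blob.core.windows.net/19000101.csv
```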


Define input schema

from pyspark.sql.types import *

schema = StructType([
    StructField("ReportingDate", DateType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("x2", DoubleType(), True)
])
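Spark's `.schema()` also accepts a DDL-formatted string, which is a terser way to express the same four columns (a sketch; column names taken from the `StructType` above):

```python
# DDL string equivalent to the StructType schema above
ddl_schema = "ReportingDate DATE, id STRING, x1 INT, x2 DOUBLE"
```

Passing this string to `.schema()` instead of the `StructType` object yields the same result.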

Read in new data

dataset = spark.read.format(file_type)\
               .option("header", "true")\
               .schema(schema)\
               .load(file_location)

## You can avoid defining a schema by having spark infer it from your data
## This doesn't always work and can be slow
#.option("inferSchema", "true")

## Fill in NAs, if needed
# dataset = dataset.fillna(0)

Load in transformation pipeline and model

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round
from pyspark.ml.regression import GeneralizedLinearRegressionModel

mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")

Score data using the model

## Transform new data using the pipeline
mydataset = mypipeline.transform(dataset)
## Score new data using a trained model
scoreddataset = mymodel.bestModel.transform(mydataset)

## Keep the id plus the rounded prediction (adjust the column list to your use case)
output = scoreddataset.select(col("id"), round(col("prediction"), 2).alias("prediction"))

Write data back out to storage

fileloc = "/mnt/output/" + str(ReportDate) #+ ".csv"
output.write.mode("overwrite").parquet(fileloc)  ## Parquet shown here; use .csv(fileloc) for CSV output
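Writing each run to a date-stamped folder keeps daily batches separate and makes reruns idempotent. The path logic can be sketched in plain Python (the `/mnt/output` mount comes from above; the helper name is illustrative):

```python
def build_output_path(report_date: str, base: str = "/mnt/output") -> str:
    # One folder per reporting date keeps daily batch runs separate
    return f"{base}/{report_date}"

print(build_output_path("19000101"))  # -> /mnt/output/19000101
```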
