Batch Scoring
This section is designed for use with a data orchestration tool that can call and execute Databricks notebooks. For more information on how to set up Azure Data Factory for this, see https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook.
Create date parameter
## Create a text widget for the report date (yyyyMMdd); the orchestrator overrides the default at run time
dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)
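If the orchestrator passes a malformed value, the notebook will still run and build a bad file path downstream. A minimal validation sketch, assuming the yyyyMMdd format used above, fails the run immediately instead:

from datetime import datetime

## Fail fast if the parameter is not a valid yyyyMMdd date
try:
    datetime.strptime(ReportDate, "%Y%m%d")
except ValueError:
    raise ValueError("varReportDate must be yyyyMMdd, got: " + ReportDate)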
Connect to storage
storage_account_name = "mystorage"
storage_account_access_key = ""

file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
file_type = "csv"

spark.conf.set(
  "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
  storage_account_access_key)
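Hard-coding the account key in a notebook is fine for a walkthrough but risky in production. A hedged alternative is to read it from a Databricks secret scope via dbutils.secrets.get; the scope and key names below are placeholders, not part of the original example:

## Pull the storage key from a secret scope instead of embedding it in the notebook
## (scope and key names here are illustrative only)
storage_account_access_key = dbutils.secrets.get(scope="storage-secrets", key="mystorage-key")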
Define input schema
from pyspark.sql.types import *

schema = StructType([
  StructField("ReportingDate", DateType(), True),
  StructField("id", StringType(), True),
  StructField("x1", IntegerType(), True),
  StructField("x2", DoubleType(), True)
])
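As an aside, spark.read.schema() also accepts a DDL-formatted string (Spark 2.3+), which can be a more compact way to express the same schema; this is an optional alternative, not what the original notebook uses:

## Equivalent schema expressed as a DDL string
schema_ddl = "ReportingDate DATE, id STRING, x1 INT, x2 DOUBLE"
# dataset = spark.read.format(file_type).option("header", "true").schema(schema_ddl).load(file_location)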
Read in new data
dataset = spark.read \
  .format(file_type) \
  .option("header", "true") \
  .schema(schema) \
  .load(file_location)
  ## You can avoid defining a schema by having Spark infer it from your data,
  ## but this doesn't always work and can be slow
  # .option("inferSchema", "true")

## Fill in NAs, if needed
# dataset = dataset.na.fill(0)

display(dataset)
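Before scoring, a couple of cheap sanity checks can make a bad or empty extract fail the orchestrated run instead of silently producing an empty forecast. A minimal sketch:

from pyspark.sql.functions import col

## Fail the job if the extract is empty, and report how many rows lack a reporting date
row_count = dataset.count()
if row_count == 0:
    raise ValueError("No rows found in " + file_location)
null_dates = dataset.filter(col("ReportingDate").isNull()).count()
print("Rows: {}, rows with null ReportingDate: {}".format(row_count, null_dates))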
Load in transformation pipeline and model
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round
from pyspark.ml.regression import GeneralizedLinearRegressionModel

## Load the fitted feature pipeline and the cross-validated model saved during training
mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
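Before trusting the model for batch scoring, it can help to confirm what the CrossValidatorModel actually selected during training. A small sketch using attributes the class exposes:

## Average cross-validation metric for each parameter combination tried during training
print(mymodel.avgMetrics)

## The winning model returned by the cross-validator (assumed here to be the regression model used below)
print(mymodel.bestModel)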
Score data using the model
## Transform new data using the pipeline
mydataset = mypipeline.transform(dataset)

## Score new data using a trained model
scoreddataset = mymodel.bestModel.transform(mydataset)

output = scoreddataset.select(col("id"),
                              col("ReportingDate"),
                              col("prediction").alias("MyForecast"))
display(output)
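The round function imported earlier is not used in the snippet above; if the downstream report only needs fixed precision, one option is to round the forecast before writing it out. A sketch, assuming two decimal places is appropriate:

from pyspark.sql.functions import col, round

## Optionally round the forecast to two decimal places before export
output = output.withColumn("MyForecast", round(col("MyForecast"), 2))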
Write data back out to storage
fileloc = "/mnt/output" + str(ReportDate)  # + ".csv"

output.write \
  .mode("overwrite") \
  .format("com.databricks.spark.csv") \
  .option("header", "true") \
  .csv(fileloc)
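Note that Spark writes the CSV as a directory of part files rather than a single file. If the downstream consumer expects one file and the output is small, a common workaround (a sketch, and not suitable for large outputs) is to coalesce to a single partition first:

## Collapse to a single part file; only sensible for small outputs
output.coalesce(1).write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv(fileloc)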