> For the complete documentation index, see [llms.txt](https://www.sparkitecture.io/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://www.sparkitecture.io/operationalization/batch-scoring.md).

# Batch Scoring

{% hint style="info" %}
This section is designed for use with a data orchestration tool that can call and execute Databricks notebooks. For more information on how to set up Azure Data Factory, see: <https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook>.
{% endhint %}

### Create date parameter

```python
dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)
```

### Connect to storage

```python
storage_account_name = "mystorage"
storage_account_access_key = ""

file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
file_type = "csv"

spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)
```

### Define input schema

```python
from pyspark.sql.types import *

schema = StructType([
    StructField("ReportingDate", DateType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("x2", DoubleType(), True)
])
```

### Read in new data

```python
dataset = spark.read\
               .format(file_type)\
               .option("header", "true")\
               .schema(schema)\
               .load(file_location)

## You can avoid defining a schema by having spark infer it from your data
## This doesn't always work and can be slow
#.option("inferSchema", "true")

## Fill in na's, if needed
# dataset = dataset.na.fill(0)
display(dataset)
```

### Load in transformation pipeline and model

```python
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, round
from pyspark.ml.regression import GeneralizedLinearRegressionModel

mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
```

### Score data using the model

```python
## Transform new data using the pipeline
mydataset = mypipeline.transform(dataset)
## Score new data using a trained model
scoreddataset = mymodel.bestModel.transform(mydataset)

output = scoreddataset.select(col("id"),
                              col("ReportingDate"),
                              col("prediction").alias("MyForecast"))
display(output)
```

### Write data back out to storage

```python
fileloc = "/mnt/output" + str(ReportDate) #+ ".csv"
output.write.mode("overwrite").format("com.databricks.spark.csv").option("header","true").csv(fileloc)
```


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://www.sparkitecture.io/operationalization/batch-scoring.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
