# Shaping Data with Pipelines

### Load in required libraries

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler
```

### Define which columns are numerical versus categorical (and which is the label column)

```python
label = "dependentvar"
categoricalColumns = ["col1",
                     "col2"]

numericalColumns = ["num1",
                    "num2"]

#categoricalColumnsclassVec = ["col1classVec",
#                              "col2classVec"]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
```

### Set up stages

```
stages = []
```

### Index the categorical columns and perform One Hot Encoding

One Hot Encoding will convert a categorical column into multiple columns for each class. (This process is similar to dummy coding.)

```python
for categoricalColumn in categoricalColumns:
  print(categoricalColumn)
  ## Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
  ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
  ## Add stages
  stages += [stringIndexer, encoder]
```

### Index the label column and perform One Hot Encoding

```python
## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
stages += [label_stringIndexer]
```

{% hint style="info" %}
&#x20;**Note:** If you are preparing the data for use in regression algorithms, there's no need to One Hot Encode the label column (since it should be numerical).
{% endhint %}

### Assemble the data together as a vector

This step transforms all the numerical data along with the encoded categorical data into a series of vectors using the `VectorAssembler` function.

```python
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
                            outputCol = "features")
stages += [assembler]
```

### Scale features using Normalization

```python
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol = "features",
                        outputCol = "scaledFeatures",
                        withStd = True,
                        withMean = True)
stages += [scaler]
```

### Set up the transformation pipeline using the stages you've created along the way

```python
prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
```

## Pipeline Saving and Loading

Once your transformation pipeline has been creating on your training dataset, it's a good idea to save these transformation steps for future use. For example, we can save the pipeline so that we can equally transform new data before scoring it through a trained machine learning model. This also helps to cut down on errors when using new data that has classes (in categorical variables) or previously unused columns.

### Save the transformation pipeline

```python
pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))
```

### Load in the transformation pipeline

```python
from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://www.sparkitecture.io/data-preparation/shaping-data-with-pipelines.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
