Shaping Data with Pipelines

Load in required libraries

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
```
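Note: In Spark 3.0 and later, the functionality of `OneHotEncoderEstimator` was merged into `OneHotEncoder`, which is the class used throughout this section, so only `OneHotEncoder` needs to be imported; the same import also works on Spark 2.x.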

Define which columns are numerical versus categorical (and which is the label column)

```python
label = "dependentvar"

categoricalColumns = ["col1",
                      "col2"]

numericalColumns = ["num1",
                    "num2"]

# One-hot encoded output columns, e.g. "col1classVec", "col2classVec"
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
```
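If you'd rather not hard-code the column lists, they can be derived from the DataFrame's schema. The sketch below is only a rough starting point: it assumes the DataFrame is named `dataset`, that string columns are categorical, and that the listed numeric types cover your data.

```python
# Hypothetical helper: infer column roles from the DataFrame schema.
# Assumes `dataset` is your Spark DataFrame and `label` is defined as above.
categoricalColumns = [name for name, dtype in dataset.dtypes
                      if dtype == "string" and name != label]
numericalColumns = [name for name, dtype in dataset.dtypes
                    if dtype in ("int", "bigint", "float", "double") and name != label]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
```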

Set up stages

```python
stages = []
```

Index the categorical columns and perform One Hot Encoding

One Hot Encoding converts a categorical column into a set of binary columns, one per class. (This process is similar to dummy coding.)
```python
for categoricalColumn in categoricalColumns:
    print(categoricalColumn)
    # Category indexing with StringIndexer; skip rows with invalid/unseen values
    stringIndexer = StringIndexer(inputCol=categoricalColumn,
                                  outputCol=categoricalColumn + "Index").setHandleInvalid("skip")
    # Use OneHotEncoder to convert the indexed categories into binary SparseVectors
    encoder = OneHotEncoder(inputCol=categoricalColumn + "Index",
                            outputCol=categoricalColumn + "classVec")
    # Add both stages to the pipeline
    stages += [stringIndexer, encoder]
```
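To see what the indexing and encoding produce, here is a minimal sketch on a hypothetical toy DataFrame (the column name and values are invented for illustration, and an active SparkSession named `spark` is assumed):

```python
# Toy example (hypothetical data) illustrating StringIndexer + OneHotEncoder
toy = spark.createDataFrame([("red",), ("green",), ("blue",), ("red",)], ["col1"])

toyPipeline = Pipeline(stages=[
    StringIndexer(inputCol="col1", outputCol="col1Index"),
    OneHotEncoder(inputCol="col1Index", outputCol="col1classVec"),
])
toyPipeline.fit(toy).transform(toy).show(truncate=False)
# "col1classVec" holds a SparseVector of length numCategories - 1;
# the last category is dropped by default, as in dummy coding
```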

Index the label column

```python
# Convert the label into label indices using StringIndexer
label_stringIndexer = StringIndexer(inputCol=label,
                                    outputCol="label").setHandleInvalid("skip")
stages += [label_stringIndexer]
```
Note: If you are preparing the data for use in regression algorithms, there's no need to index the label column (since it should already be numerical).

Assemble the data together as a vector

This step combines all of the numerical columns and the encoded categorical columns into a single feature vector per row using VectorAssembler.
```python
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols=assemblerInputs,
                            outputCol="features")
stages += [assembler]
```
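As a rough illustration of what VectorAssembler does on its own, here is a minimal sketch on hypothetical data (the column names and values are made up):

```python
from pyspark.sql import Row

# Hypothetical two-column DataFrame; assumes an active SparkSession named `spark`
toy = spark.createDataFrame([Row(num1=1.0, num2=10.0), Row(num1=2.0, num2=20.0)])
VectorAssembler(inputCols=["num1", "num2"], outputCol="features") \
    .transform(toy).show(truncate=False)
# Each row's inputs are concatenated into one vector, e.g. [1.0, 10.0]
```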

Scale features using StandardScaler

```python
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features",
                        outputCol="scaledFeatures",
                        withStd=True,
                        withMean=True)
stages += [scaler]
```
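Note that setting withMean = True centers the data and therefore produces dense output vectors, which can be costly when the one-hot encoded features are wide and sparse; if memory becomes an issue, setting withMean = False keeps the scaled vectors sparse.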

Set up the transformation pipeline using the stages you've created along the way

```python
prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
```
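As a quick sanity check (assuming the transform above has run), you can confirm that the pipeline added the expected columns:

```python
# The pipeline should have added "<col>Index", "<col>classVec",
# "label", "features", and "scaledFeatures" columns
dataset.printSchema()
dataset.select("label", "scaledFeatures").show(5, truncate=False)
```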

Pipeline Saving and Loading

Once your transformation pipeline has been created on your training dataset, it's a good idea to save the transformation steps for future use. For example, you can save the pipeline so that new data can be transformed in exactly the same way before being scored with a trained machine learning model. This also helps cut down on errors when new data contains previously unseen classes (in categorical variables) or unexpected columns.

Save the transformation pipeline

```python
pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))
```

Load in the transformation pipeline

```python
from pyspark.ml import PipelineModel

pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)
```