Shaping Data with Pipelines

Load in required libraries

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

Define which columns are numerical versus categorical (and which is the label column)

label = "dependentvar"
categoricalColumns = ["col1",
                     "col2"]

numericalColumns = ["num1",
                    "num2"]

# Generate the names of the one-hot-encoded output columns, e.g. ["col1classVec", "col2classVec"]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]

Set up stages

stages = []

Index the categorical columns and perform One Hot Encoding

One Hot Encoding converts each categorical column into a sparse binary vector that indicates class membership. (This process is similar to dummy coding.)

for categoricalColumn in categoricalColumns:
  print(categoricalColumn)
  ## Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
  ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
  ## Add stages
  stages += [stringIndexer, encoder]
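
For intuition, here is a minimal sketch on a hypothetical toy DataFrame (the column name and values are illustrative, not part of the original dataset), assuming Spark 3.x, where OneHotEncoder is fit like any other estimator:

toy = spark.createDataFrame([("red",), ("green",), ("red",)], ["col1"])
indexed = StringIndexer(inputCol="col1", outputCol="col1Index").fit(toy).transform(toy)
encoded = OneHotEncoder(inputCol="col1Index", outputCol="col1classVec").fit(indexed).transform(indexed)
encoded.show(truncate=False)
## "red" (the most frequent value) is indexed as 0.0 and encoded as (1,[0],[1.0]);
## "green" is indexed as 1.0 and, with the default dropLast=True, becomes the all-zero vector (1,[],[])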

Index the label column

## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
stages += [label_stringIndexer]

Note: If you are preparing the data for use in regression algorithms, there's no need to index (or One Hot Encode) the label column, since it should already be numerical.
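
For that case, a minimal sketch (assuming the dependent variable column already holds numeric values) would simply expose it under the "label" name instead of adding an indexing stage:

from pyspark.sql.functions import col

## Hypothetical regression alternative: skip the StringIndexer stage and
## cast the numeric dependent variable to double under the name "label"
dataset = dataset.withColumn("label", col(label).cast("double"))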

Assemble the data together as a vector

This step combines all the numerical columns and the encoded categorical columns into a single feature vector per row using the VectorAssembler function.

assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
                            outputCol = "features")
stages += [assembler]
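
As a quick illustration of what the assembler does, here is a minimal sketch on hypothetical toy data (column names and values are illustrative): the encoded class vector and the numeric columns are concatenated into one features vector per row.

from pyspark.ml.linalg import Vectors

toy = spark.createDataFrame([(Vectors.dense([1.0, 0.0]), 3.0, 7.5)], ["col1classVec", "num1", "num2"])
toyAssembler = VectorAssembler(inputCols=["col1classVec", "num1", "num2"], outputCol="features")
toyAssembler.transform(toy).show(truncate=False)
## The features column for this row is [1.0, 0.0, 3.0, 7.5]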

Scale features using standardization

from pyspark.ml.feature import StandardScaler

## Standardize each feature to zero mean and unit variance
## Note: withMean = True centers the data and produces dense output vectors,
## so take care when applying it to very wide, sparse feature vectors
scaler = StandardScaler(inputCol = "features",
                        outputCol = "scaledFeatures",
                        withStd = True,
                        withMean = True)
stages += [scaler]

Set up the transformation pipeline using the stages you've created along the way

prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
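
At this point the transformed dataset contains every intermediate column added by the stages (the index, class vector, features, and scaledFeatures columns). For downstream model training you would typically keep just the label and the scaled feature vector, for example:

display(dataset.select("label", "scaledFeatures"))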

Pipeline Saving and Loading

Once your transformation pipeline has been created on your training dataset, it's a good idea to save these transformation steps for future use. For example, you can save the pipeline so that new data can be transformed in exactly the same way before being scored through a trained machine learning model. This also helps cut down on errors when the new data contains unseen classes (in categorical variables) or columns that weren't used during training.

Save the transformation pipeline

pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))

Load in the transformation pipeline

from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)
