Shaping Data with Pipelines
Load in required libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
Define which columns are numerical versus categorical (and which is the label column)
label = "dependentvar"
categoricalColumns = ["col1",
"col2"]
numericalColumns = ["num1",
"num2"]
# Names for the one-hot encoded output columns, e.g. "col1classVec", "col2classVec"
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
Set up stages
stages = []
Index the categorical columns and perform One Hot Encoding
One Hot Encoding converts a categorical column into a binary vector with one slot per class (similar to dummy coding); in Spark the result is stored as a single SparseVector column.
for categoricalColumn in categoricalColumns:
    print(categoricalColumn)
    ## Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol=categoricalColumn + "Index").setHandleInvalid("skip")
    ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCol=categoricalColumn + "Index", outputCol=categoricalColumn + "classVec")
    ## Add stages
    stages += [stringIndexer, encoder]
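To see what the indexer and encoder produce before wiring them into the full pipeline, you can run the two stages on a small throwaway DataFrame. The sketch below is illustrative only: the toy values are made up, and it assumes an active SparkSession named spark.
# Illustrative only: a hypothetical toy DataFrame showing the indexer + encoder output
toy = spark.createDataFrame([("a",), ("b",), ("c",), ("a",)], ["col1"])
toyIndexer = StringIndexer(inputCol="col1", outputCol="col1Index")
toyEncoder = OneHotEncoder(inputCol="col1Index", outputCol="col1classVec")
Pipeline(stages=[toyIndexer, toyEncoder]).fit(toy).transform(toy).show(truncate=False)
# col1classVec holds a SparseVector such as (2,[0],[1.0]); the last category is
# dropped by default, so the encoding behaves like dummy coding with a reference level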
Index the label column
## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
stages += [label_stringIndexer]
Assemble the data together as a vector
This step combines the numerical columns and the encoded categorical columns into a single feature vector per row using the VectorAssembler transformer.
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
outputCol = "features")
stages += [assembler]
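As a quick sanity check (illustrative only, with made-up columns and an assumed SparkSession named spark), VectorAssembler simply concatenates its input columns into one vector per row:
# Illustrative only: VectorAssembler concatenates numeric and vector columns
# into a single vector per row
toyRows = spark.createDataFrame([(1.0, 10.0), (2.0, 20.0)], ["num1", "num2"])
VectorAssembler(inputCols=["num1", "num2"], outputCol="features") \
    .transform(toyRows).show(truncate=False)
# features is a vector such as [1.0,10.0] for the first row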
Scale features using standardization (StandardScaler)
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol = "features",
outputCol = "scaledFeatures",
withStd = True,
withMean = True)
stages += [scaler]
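One thing to keep in mind: setting withMean = True centers each feature, which forces the scaled output to be dense, so it can be memory-heavy on the wide, mostly sparse vectors that One Hot Encoding produces. The sketch below is illustrative only, using a made-up vector column and an assumed SparkSession named spark.
from pyspark.ml.linalg import Vectors
# Illustrative only: standardizing a hypothetical vector column; withMean=True
# centers the data and produces dense output vectors
toyVec = spark.createDataFrame([(Vectors.dense([1.0, 10.0]),),
                                (Vectors.dense([3.0, 30.0]),)], ["features"])
toyScalerModel = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                                withStd=True, withMean=True).fit(toyVec)
toyScalerModel.transform(toyVec).show(truncate=False)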
Set up the transformation pipeline using the stages you've created along the way
prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
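Note that the cell above fits the preparation pipeline on the entire dataset. If you plan to train and evaluate a model afterwards, a common refinement (sketched below, assuming dataset is the raw DataFrame before it was transformed) is to fit the pipeline on the training split only, so that statistics such as the scaler's mean and standard deviation never come from the test data.
# Sketch: fit the preparation pipeline on the training split only, then apply
# the fitted model to both splits
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
pipelineModel = prepPipeline.fit(trainDF)
trainPrepared = pipelineModel.transform(trainDF)
testPrepared = pipelineModel.transform(testDF)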
Pipeline Saving and Loading
Once your transformation pipeline has been fit on your training dataset, it's a good idea to save the fitted pipeline for future use. For example, you can save it so that new data goes through exactly the same transformations before being scored by a trained machine learning model. This also helps cut down on errors when new data contains unseen classes (in categorical variables) or previously unseen columns.
Save the transformation pipeline
pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))
Load in the transformation pipeline
from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)