Shaping Data with Pipelines
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label = "dependentvar"
categoricalColumns = ["col1",
                      "col2"]
numericalColumns = ["num1",
                    "num2"]
# categoricalColumnsclassVec = ["col1classVec",
#                               "col2classVec"]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
stages = []
One-hot encoding converts a categorical column into multiple binary columns, one for each class. (This process is similar to dummy coding.) The loop below adds a StringIndexer and a OneHotEncoder stage for each categorical column; a small standalone sketch after the loop shows what these stages produce.
for categoricalColumn in categoricalColumns:
    print(categoricalColumn)
    ## Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol=categoricalColumn + "Index").setHandleInvalid("skip")
    ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCol=categoricalColumn + "Index", outputCol=categoricalColumn + "classVec")
    ## Add stages
    stages += [stringIndexer, encoder]

## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol=label, outputCol="label").setHandleInvalid("skip")
stages += [label_stringIndexer]
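To make the output of these stages concrete, here is a small self-contained sketch run against a toy DataFrame. The values, the column name col1, and the spark session are assumptions taken from the notebook environment; they are not part of the dataset used elsewhere in this section.
## Toy data, purely for illustration
toy = spark.createDataFrame(
    [("red",), ("green",), ("blue",), ("green",)],
    ["col1"]
)
toyIndexer = StringIndexer(inputCol="col1", outputCol="col1Index")
toyEncoder = OneHotEncoder(inputCol="col1Index", outputCol="col1classVec")
## Pipeline.fit handles both transformer and estimator stages, so this runs
## regardless of whether OneHotEncoder is an estimator in your Spark version
Pipeline(stages=[toyIndexer, toyEncoder]).fit(toy).transform(toy).show(truncate=False)
Each row ends up with a numeric col1Index and a col1classVec SparseVector that has one slot per class (minus one, because the last category is dropped by default).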
Note: If you are preparing the data for use in regression algorithms, there is no need to index or one-hot encode the label column, since it should already be numerical.
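In that case, a minimal sketch (assuming the dependent variable is already numeric; this is not part of the classification flow in this section) would simply expose it as the label column instead of adding a StringIndexer stage:
from pyspark.sql.functions import col

## Regression only (do not combine with the label_stringIndexer stage above):
## keep the numeric target as-is and surface it as "label"
dataset = dataset.withColumn("label", col(label).cast("double"))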
This step transforms all the numerical data, along with the encoded categorical data, into a series of vectors using the VectorAssembler transformer.
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols=assemblerInputs,
                            outputCol="features")
stages += [assembler]
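As a standalone illustration (toy values again, with the spark session assumed from the notebook), VectorAssembler simply concatenates its input columns into a single vector column:
from pyspark.ml.linalg import Vectors

## Hypothetical rows: one already-encoded vector column plus two numeric columns
toy = spark.createDataFrame(
    [(Vectors.dense([0.0, 1.0]), 1.0, 10.0),
     (Vectors.dense([1.0, 0.0]), 2.0, 20.0)],
    ["col1classVec", "num1", "num2"]
)
VectorAssembler(inputCols=["col1classVec", "num1", "num2"],
                outputCol="features").transform(toy).show(truncate=False)
## Each features value is the row's inputs flattened into one vector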
from pyspark.ml.feature import StandardScaler

## Standardize the assembled feature vector (zero mean, unit variance)
scaler = StandardScaler(inputCol="features",
                        outputCol="scaledFeatures",
                        withStd=True,
                        withMean=True)
stages += [scaler]
prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
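Before saving anything, it is worth a quick check that the transform added the expected columns (column names as defined above):
## Peek at the assembled features, the scaled features, and the indexed label
display(dataset.select("features", "scaledFeatures", "label"))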
Once your transformation pipeline has been created on your training dataset, it's a good idea to save these transformation steps for future use. For example, we can save the pipeline so that we can apply exactly the same transformations to new data before scoring it with a trained machine learning model. This also helps to cut down on errors when new data contains previously unseen classes (in categorical variables) or previously unused columns.
pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))
from pyspark.ml import PipelineModel

## Reload the saved pipeline and apply the same transformations;
## in practice this would be run against new data prior to scoring
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
dataset = pipelineModel.transform(dataset)
display(dataset)