Text Data Preparation

Tokenization and Vectorization

Load the required libraries

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql.functions import col, lower, regexp_replace
def clean_text(c):
    """Return a cleaned, lower-cased version of a text column.

    Lower-cases the text, then strips links, dd/dd/dd-style dates,
    @usernames, digits and a fixed set of punctuation symbols. Line breaks,
    tabs and punctuation are replaced with a space (not deleted) so that the
    words on either side remain separate tokens for the downstream tokenizer.

    Args:
        c: Spark ``Column`` holding the raw text.

    Returns:
        A ``Column`` expression with the cleaned text.
    """
    c = lower(c)
    c = regexp_replace(c, r"https?://\S+", "")  # Remove links
    # Line breaks / tabs -> space, so "end<newline>start" stays two words.
    c = regexp_replace(c, r"[\r\n\t]", " ")
    # Remove dates such as 12/31/2020 or 10:15:30.
    c = regexp_replace(c, r"(?:[0-9]{2}[:/,]){2}[0-9]{2,4}", "")
    c = regexp_replace(c, r"@[A-Za-z0-9_]+", "")  # Remove usernames
    c = regexp_replace(c, r"[0-9]", "")  # Remove numbers
    # Symbols -> space, so "end.next" or "a,b" stay separate words.
    c = regexp_replace(c, r'[:/#.?!&",]', " ")
    return c
# Replace the raw "text" column with its cleaned form.
dataset = dataset.withColumn(
    "text",
    clean_text(col("text")),
)

RegEx tokenization

# Split each cleaned "text" value into a "words" array, using non-word
# characters as delimiters.
regexTokenizer = RegexTokenizer(
    pattern=r"\W",
    inputCol="text",
    outputCol="words",
)

Remove stop words

# Add web-specific stop words on top of Spark's default English list.
# setStopWords() replaces the remover's list wholesale, so the defaults are
# loaded explicitly; otherwise only the handful of tokens below would be
# removed and common English stop words would survive.
add_stopwords = ["http","https","amp","rt","t","c","the","@","/",":"] # standard web stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(
    StopWordsRemover.loadDefaultStopWords("english") + add_stopwords
)

Count words

# Bag of words: turn each "filtered" token list into a term-count vector in
# "features", with a vocabulary of at most 10,000 terms, each required to
# occur in at least 5 documents.
countVectors = CountVectorizer(
    minDF=5,
    vocabSize=10000,
    inputCol="filtered",
    outputCol="features",
)

Index class label strings

# Map the string "class" column onto a numeric "label" column.
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(
    outputCol="label",
    inputCol="class",
)

Create transformation pipeline

from pyspark.ml import Pipeline

# Stages run in order: tokenize -> drop stop words -> count terms -> index labels.
pipeline = Pipeline(
    stages=[
        regexTokenizer,
        stopwordsRemover,
        countVectors,
        label_stringIdx,
    ]
)
# Fit the pipeline to the training documents, then apply it to that same
# DataFrame; the transformed result becomes ``dataset``.
# NOTE(review): clean_text() was applied to ``dataset`` earlier, but the
# pipeline reads ``data`` and overwrites ``dataset`` here, so that cleaning
# appears to be unused -- confirm which DataFrame should feed the pipeline.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
Once the transformation pipeline has been fit, you can train standard classification algorithms on the resulting `features` and `label` columns to classify the text.


Get the numeric label assigned to each class

from pyspark.sql import *
from pyspark.sql.functions import col

# One row per distinct (class, label) pair: shows which numeric "label" the
# StringIndexer stage produced for each "class" string.
labelset = dataset.select(col("class"), col("label")).distinct()

Split text body into sentences

from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.functions import col, split, explode, posexplode, row_number

# Split the text body on literal periods; split() already yields an
# array of strings, so no cast is needed.
array_df = data.withColumn("text", split(col("text"), r"\."))

# Emit one row per sentence. posexplode also returns each sentence's index
# within its source row. Every sentence exploded from one row shares the same
# "id", so ordering by "id" alone would number those sentences arbitrarily;
# the index is added as a tie-breaker so part_number follows sentence order.
split_df = (
    array_df.select("*", posexplode(col("text")).alias("_sentence_pos", "_sentence"))
    .withColumn("text", col("_sentence"))
    .withColumn(
        "part_number",
        row_number().over(
            Window.partitionBy("internet_message_id").orderBy("id", "_sentence_pos")
        ),
    )
    .drop("_sentence", "_sentence_pos")
)
data = split_df

Create `part_number` for the split sentences

from pyspark.sql.window import *
from pyspark.sql.functions import row_number

# Display sentence numbering per "body_id". The result is only shown, not
# assigned back to ``data``.
# NOTE(review): the sentence-split step partitions by "internet_message_id"
# while this uses "body_id" -- confirm which key identifies a message body.
# Ordering by "id" alone can tie for sentences exploded from the same row.
data.withColumn(
    "part_number",
    row_number().over(
        Window.partitionBy("body_id").orderBy("id")
    ),
).show()