Text Data Preparation

Tokenization and Vectorization

Load in required libraries

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, "(https?\://)\S+", "") # Remove links
  c = regexp_replace(c, "(\\n)|\n|\r|\t", "") # Remove CR, tab, and LR
  c = regexp_replace(c, "(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "") # Remove dates
  c = regexp_replace(c, "@([A-Za-z0-9_]+)", "") # Remove usernames
  c = regexp_replace(c, "[0-9]", "") # Remove numbers
  c = regexp_replace(c, "\:|\/|\#|\.|\?|\!|\&|\"|\,", "") # Remove symbols
  #c = regexp_replace(c, "(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "")
  return c

dataset = dataset.withColumn("text", clean_text(col("text")))

RegEx tokenization

regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

Remove stop words

# Add Stop words
add_stopwords = ["http","https","amp","rt","t","c","the","@","/",":"] # standard web stop words

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

Count words

# Bag of Words Count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

Index strings

# String Indexer
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "class", outputCol = "label")

Create transformation pipeline

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

Once the transformation pipeline has been fit, you can use normal classification algorithms for classifying the text.


Get label numbers for each class

from pyspark.sql import *
from pyspark.sql.functions import col
labelset = dataset.select(col("class"),

Split text body into sentences

from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql.functions import col, split, explode, row_number
# Split text by sentence and convert to array
array_df = data.withColumn("text", split(col("text"), "\.").cast("array<string>"))
# Explode array into separate rows in the dataset
split_df = array_df.withColumn("text", explode(col("text")))\
                   .withColumn("part_number", row_number().over(Window.partitionBy("internet_message_id").orderBy("id")))
data = split_df

Create `part_number` for the split sentences

from pyspark.sql.window import *
from pyspark.sql.functions import row_number

data.withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id"))).show()

Last updated