Other Common Tasks

Split Data into Training and Test Datasets

train, test = dataset.randomSplit([0.75, 0.25], seed=1337)
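
As a quick sanity check (not part of the original snippet), you can compare the row counts of the two splits. The 75/25 proportion is approximate, because randomSplit assigns rows probabilistically:

## Rough check of the split proportions (randomSplit is approximate, not exact)
total = dataset.count()
print("Train fraction:", train.count() / total)
print("Test fraction:", test.count() / total)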

Rename all columns

from pyspark.sql.functions import col

column_list = data.columns
prefix = "my_prefix"
new_column_list = [prefix + s for s in column_list]
# new_column_list = [prefix + s if s != "ID" else s for s in column_list]  ## Use if you plan on joining on an ID later

column_mapping = [[o, n] for o, n in zip(column_list, new_column_list)]

# print(column_mapping)

data = data.select(list(map(lambda old, new: col(old).alias(new), *zip(*column_mapping))))
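
If you simply want to replace every column name and don't need per-column expressions, a shorter alternative is DataFrame.toDF, sketched here with the same new_column_list:

## Equivalent rename using toDF (keeps column order; no select/alias needed)
data = data.toDF(*new_column_list)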

Convert PySpark DataFrame to NumPy array

import numpy as np

## Convert `train` DataFrame to NumPy
pdtrain = train.toPandas()
trainseries = pdtrain['features'].apply(lambda x: np.array(x.toArray())).to_numpy().reshape(-1, 1)
X_train = np.apply_along_axis(lambda x: x[0], 1, trainseries)
y_train = pdtrain['label'].values.reshape(-1, 1).ravel()

## Convert `test` DataFrame to NumPy
pdtest = test.toPandas()
testseries = pdtest['features'].apply(lambda x: np.array(x.toArray())).to_numpy().reshape(-1, 1)
X_test = np.apply_along_axis(lambda x: x[0], 1, testseries)
y_test = pdtest['label'].values.reshape(-1, 1).ravel()

print(y_test)
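
The resulting arrays can be fed to any NumPy-based library. A minimal sketch, assuming scikit-learn is installed and that a LogisticRegression classifier is an appropriate choice for your label (both assumptions, not part of the original snippet):

## Hypothetical example: fit a scikit-learn model on the converted arrays
from sklearn.linear_model import LogisticRegression

clf = LogisticRegression(max_iter=1000)
clf.fit(X_train, y_train)
print("Test accuracy:", clf.score(X_test, y_test))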

Call Cognitive Service API using PySpark

Create `chunker` function

The Cognitive Services APIs accept only a limited number of records per call (1,000 documents, to be exact) and a limited payload size. To stay within those limits, we create a chunker function that splits the dataset into smaller chunks.
## Define chunking logic
import pandas as pd
import numpy as np

# Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
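
For illustration, here is how chunker behaves on a small pandas DataFrame (the 10-row DataFrame and chunk size of 3 are only for the example):

## Example only: split a 10-row DataFrame into chunks of 3
example_df = pd.DataFrame({'id': range(10)})
for chunk in chunker(example_df, 3):
    print(len(chunk), "rows")   # prints 3, 3, 3, 1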

Convert Spark DataFrame to Pandas

sentiment_df_pd = sentiment_df.toPandas()

Set up API requirements

# pprint is used to format the JSON response
from pprint import pprint
import json
import requests

subscription_key = '<SUBSCRIPTIONKEY>'
endpoint = 'https://<SERVICENAME>.cognitiveservices.azure.com'
sentiment_url = endpoint + "/text/analytics/v2.1/sentiment"
headers = {"Ocp-Apim-Subscription-Key": subscription_key}
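
Before looping over chunks, it can help to verify credentials and payload shape with a single request. A hedged sketch (the document below is illustrative; the v2.1 sentiment endpoint expects records with an id and text, and typically a language):

## Illustrative single request to check credentials and payload format
sample_payload = {"documents": [
    {"id": "1", "language": "en", "text": "PySpark makes this workflow much easier."}
]}
sample_response = requests.post(sentiment_url, headers=headers, json=sample_payload)
pprint(sample_response.json())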

Create DataFrame for incoming scored data

from pyspark.sql.types import *

sentiment_schema = StructType([StructField("id", IntegerType(), True),
                               StructField("score", FloatType(), True)])

sentiments_df = spark.createDataFrame([], sentiment_schema)

display(sentiments_df)

Loop through chunks of the data and call the API

for chunk in chunker(sentiment_df_pd, 1000):
    print("Scoring", len(chunk), "rows.")
    sentiment_df_json = json.loads('{"documents":' + chunk.to_json(orient='records') + '}')

    response = requests.post(sentiment_url, headers=headers, json=sentiment_df_json)
    sentiments = response.json()
    # pprint(sentiments)

    sentiments_pd = pd.read_json(json.dumps(sentiments['documents']))
    sentiments_df_chunk = spark.createDataFrame(sentiments_pd)
    sentiments_df = sentiments_df.unionAll(sentiments_df_chunk)

display(sentiments_df)
sentiments_df.count()
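
If the original sentiment_df carries the same id values, the scores can be joined back onto it. A sketch, assuming both DataFrames have a compatible "id" column (verify the column name and type in your data):

## Assumption: sentiment_df and sentiments_df share an 'id' column
scored_df = sentiment_df.join(sentiments_df, on="id", how="left")
display(scored_df)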

Write the results out to mounted storage

sentiments_df.coalesce(1).write.csv("/mnt/textanalytics/sentimentanalysis/")
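
coalesce(1) forces a single output file. If you want a header row, or need to re-run the cell over existing output, standard DataFrameWriter options such as the following may be more convenient:

## Single CSV file with a header, overwriting any previous output
sentiments_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("/mnt/textanalytics/sentimentanalysis/")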

Find All Columns of a Certain Type

import pandas as pd
from pyspark.sql.functions import col

def get_nonstring_cols(df):
    types = spark.createDataFrame(pd.DataFrame({'Column': df.schema.names,
                                                'Type': [str(f.dataType) for f in df.schema.fields]}))
    result = types.filter(col('Type') != 'StringType').select('Column').rdd.flatMap(lambda x: x).collect()
    return result

get_nonstring_cols(df)
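
One use for the returned list is bulk casting. For example, converting every non-string column to a string (a sketch, assuming that is the conversion you actually want):

## Cast every non-string column to StringType
from pyspark.sql.types import StringType

for c in get_nonstring_cols(df):
    df = df.withColumn(c, col(c).cast(StringType()))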

Change a Column's Type

from pyspark.sql.types import *
from pyspark.sql.functions import col

df = df.withColumn('col1', col('col1').cast(IntegerType()))
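
To change several columns at once, the same pattern can be driven from a dictionary (the column names and target types below are placeholders for your own):

## Hypothetical mapping of column names to target types
type_map = {"col1": IntegerType(), "col2": DoubleType(), "col3": DateType()}

for c, t in type_map.items():
    df = df.withColumn(c, col(c).cast(t))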

Generate StructType Schema Printout (Manual Execution)

## Fill in the list with your desired column names
cols = ["col1", "col2", "col3"]
i = 1

for col in cols:
    if i == 1:
        print("schema = StructType([")
        print("\tStructField('" + col + "', StringType(), True),")

    elif i == len(cols):
        print("\tStructField('" + col + "', StringType(), True)])")

    else:
        print("\tStructField('" + col + "', StringType(), True),")

    i += 1

## Once the output has printed, copy and paste it into a new cell
## and change the column types and nullability as needed
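
With the example cols list above, the printed output looks like this, ready to paste into a new cell and edit:

schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    StructField('col3', StringType(), True)])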

Generate StructType Schema from List (Automatic Execution)

"""
Struct Schema Creator for PySpark

[<Column Name>, <Column Type>, <Column Nullable>]

Types: binary, boolean, byte, date,
       double, integer, long, null,
       short, string, timestamp, unknown
"""
from pyspark.sql.types import *

## Fill in with your desired column names, types, and nullability
cols = [["col1", "string", False],
        ["col2", "date", True],
        ["col3", "integer", True]]

## Loop to build a list of StructFields
schema_set = ["schema = StructType(["]

for i, col in enumerate(cols):
    colname = col[0]
    coltype = col[1].title() + "Type()"
    colnull = col[2]

    if i == len(cols) - 1:
        iter_structfield = "StructField('" + colname + "', " + coltype + ", " + str(colnull) + ")])"
    else:
        iter_structfield = "StructField('" + colname + "', " + coltype + ", " + str(colnull) + "),"

    schema_set.append(iter_structfield)

## Convert the list to a single string
schema_string = ''.join(map(str, schema_set))

## This will execute the generated command string
exec(schema_string)
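
After exec runs, schema is an ordinary StructType object and can be passed anywhere a schema is accepted; for example, when reading a CSV (the path below is a placeholder):

## Hypothetical usage of the generated schema
df = spark.read.schema(schema).option("header", True).csv("/mnt/mydata/myfile.csv")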

Make a DataFrame of Consecutive Dates

from pyspark.sql.functions import sequence, to_date, explode, col

date_dim = spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2019-12-31'), interval 1 day) AS DATE") \
    .withColumn("DATE", explode(col("DATE")))

display(date_dim)
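
A date dimension usually carries a few derived columns; these can be added with the built-in date functions, for example:

## Add common date-part columns to the date dimension
from pyspark.sql.functions import year, month, dayofweek

date_dim = date_dim.withColumn("YEAR", year(col("DATE"))) \
                   .withColumn("MONTH", month(col("DATE"))) \
                   .withColumn("DAY_OF_WEEK", dayofweek(col("DATE")))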

Unpivot a DataFrame Dynamically (Longer)

Pivot a wide dataset into a longer form, similar to the pivot_longer() function from the tidyr R package or the wide_to_long() function from pandas.
## UnpivotDF function
def UnpivotDF(df, columns, pivotCol, unpivotColName, valueColName):
    columnsValue = list(map(lambda x: str("'") + str(x) + str("',") + str(x), columns))
    stackCols = ','.join(x for x in columnsValue)

    df_unpvt = df.selectExpr(pivotCol, f"stack({str(len(columns))}, {stackCols}) as ({unpivotColName}, {valueColName})") \
                 .select(pivotCol, unpivotColName, valueColName)

    return df_unpvt
df_unpvt = UnpivotDF(df = df,
                     columns = df.columns[1:],     ## The columns to transpose into a single, longer column
                     pivotCol = "ID",              ## The column to leave in place (usually an ID)
                     unpivotColName = "Category",  ## The name of the new category column
                     valueColName = "value")       ## The name of the new value column
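
To see the reshaping concretely, here is a small made-up example (the data is illustrative only): a 2-row, 3-measure wide DataFrame becomes a 6-row long DataFrame with one row per ID/Category pair.

## Illustrative input: two IDs, three measure columns
wide_df = spark.createDataFrame([(1, 10, 20, 30), (2, 40, 50, 60)],
                                ["ID", "A", "B", "C"])

long_df = UnpivotDF(df = wide_df,
                    columns = wide_df.columns[1:],
                    pivotCol = "ID",
                    unpivotColName = "Category",
                    valueColName = "value")

long_df.show()   ## 6 rows: (1, A, 10), (1, B, 20), ... (2, C, 60)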