train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)
## Rename columns by prepending a prefix
column_list = data.columns
prefix = "my_prefix"
new_column_list = [prefix + s for s in column_list]
# new_column_list = [prefix + s if s != "ID" else s for s in column_list]  ## Use if you plan on joining on an ID later
column_mapping = [[o, n] for o, n in zip(column_list, new_column_list)]
# print(column_mapping)
# data = data.select(list(map(lambda old, new: col(old).alias(new), *zip(*column_mapping))))
import numpy as np

## Convert `train` DataFrame to NumPy
pdtrain = train.toPandas()
trainseries = pdtrain['features'].apply(lambda x: np.array(x.toArray())).to_numpy().reshape(-1, 1)
X_train = np.apply_along_axis(lambda x: x[0], 1, trainseries)
y_train = pdtrain['label'].values.reshape(-1, 1).ravel()

## Convert `test` DataFrame to NumPy
pdtest = test.toPandas()
testseries = pdtest['features'].apply(lambda x: np.array(x.toArray())).to_numpy().reshape(-1, 1)
X_test = np.apply_along_axis(lambda x: x[0], 1, testseries)
y_test = pdtest['label'].values.reshape(-1, 1).ravel()
print(y_test)
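The resulting NumPy arrays can now be fed to a non-Spark library. As a minimal sketch only, assuming a scikit-learn classifier is the next step (LogisticRegression is an illustrative choice, not part of this pipeline):

## Illustrative only: the converted arrays plug straight into scikit-learn (assumed downstream step)
from sklearn.linear_model import LogisticRegression

clf = LogisticRegression(max_iter = 1000)   # hypothetical estimator choice
clf.fit(X_train, y_train)
print(clf.score(X_test, y_test))            # accuracy on the 25% holdout split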
The Cognitive Services APIs can only take a limited number of observations at a time (1,000, to be exact) and a limited amount of data in a single call. So, we create a chunker function that we will use to split the dataset up into smaller chunks.
## Define Chunking Logic
import pandas as pd
import numpy as np

# Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
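To illustrate, chunker returns a generator of consecutive positional slices; with a hypothetical 2,500-row DataFrame (the example below is made up) it yields chunks of 1,000, 1,000, and 500 rows:

## Illustrative only: chunker yields consecutive, positionally sliced chunks
example_df = pd.DataFrame({"text": ["sample text"] * 2500})   # hypothetical 2,500-row frame
for chunk in chunker(example_df, 1000):
    print(len(chunk))   # prints 1000, 1000, 500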
## Convert the Spark DataFrame to pandas so it can be chunked
sentiment_df_pd = sentiment_df.toPandas()
# pprint is used to format the JSON response
from pprint import pprint
import json
import requests

subscription_key = '<SUBSCRIPTIONKEY>'
endpoint = 'https://<SERVICENAME>.cognitiveservices.azure.com'
sentiment_url = endpoint + "/text/analytics/v2.1/sentiment"
headers = {"Ocp-Apim-Subscription-Key": subscription_key}
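The v2.1 sentiment endpoint expects a documents array in which each record carries an id and a text field (language is optional), which is what chunk.to_json(orient='records') will produce from the pandas DataFrame later on. A quick one-document check against the endpoint might look like the sketch below; the sample_payload name and the sample text are purely illustrative, and it assumes the placeholders above have been filled in.

## Illustrative one-document request to verify the key and endpoint (sample text is made up)
sample_payload = {"documents": [{"id": "1", "language": "en", "text": "The service was quick and friendly."}]}
sample_response = requests.post(sentiment_url, headers = headers, json = sample_payload)
pprint(sample_response.json())   # expected shape: {"documents": [{"id": "1", "score": ...}], "errors": [...]}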
from pyspark.sql.types import *

sentiment_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("score", FloatType(), True)
])

sentiments_df = spark.createDataFrame([], sentiment_schema)
display(sentiments_df)
## Score each chunk against the sentiment endpoint and append the results
for chunk in chunker(sentiment_df_pd, 1000):
    print("Scoring", len(chunk), "rows.")
    sentiment_df_json = json.loads('{"documents":' + chunk.to_json(orient='records') + '}')
    response = requests.post(sentiment_url, headers = headers, json = sentiment_df_json)
    sentiments = response.json()
    # pprint(sentiments)
    sentiments_pd = pd.read_json(json.dumps(sentiments['documents']))
    sentiments_df_chunk = spark.createDataFrame(sentiments_pd)
    sentiments_df = sentiments_df.unionAll(sentiments_df_chunk)

display(sentiments_df)
sentiments_df.count()
sentiments_df.coalesce(1).write.csv("/mnt/textanalytics/sentimentanalysis/")
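If you want to confirm the export, the folder can be read back with the same schema; this verification step is an optional addition and not part of the original flow:

## Optional sanity check (not in the original flow): read the exported folder back with the same schema
check_df = spark.read.csv("/mnt/textanalytics/sentimentanalysis/", schema = sentiment_schema)
print(check_df.count())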