Welcome to Sparkitecture!

Created by: Colby T. Ford, Ph.D.

PySpark Edition | A work in progress... | Created using GitBook.com

About

Sparkitecture is a collection of “cookbook-style” scripts for simplifying data engineering and machine learning in Apache Spark.

This is an open source project (GPL v3.0) for the Spark community. If you have ideas or contributions you'd like to add, submit a Feature Request, or write your code/tutorial/page and create a Pull Request in the GitHub repo.

How to Cite

BibTeX

@misc{sparkitecture,
  author = {Colby T. Ford},
  title = {Sparkitecture - {A} collection of "cookbook-style" scripts for simplifying data engineering and machine learning in {Apache Spark}.},
  month = oct,
  year = 2019,
  doi = {10.5281/zenodo.3468502},
  url = {https://doi.org/10.5281/zenodo.3468502}
}

Text Citation

Colby T. Ford. (2019, October). Sparkitecture - A collection of "cookbook-style" scripts for simplifying data engineering and machine learning in Apache Spark (Version v1.0.0). Zenodo. http://doi.org/10.5281/zenodo.3468502

Azure Storage

Azure Storage is a managed service in Azure that provides highly available, secure, durable, scalable, and redundant storage for your data. It includes Blob Storage, Data Lake Storage, and other services.

Warning: Databricks-Specific Functionality. The dbutils mounting commands on this page only work inside Databricks.

Mounting Blob Storage

Once you create your Blob Storage account in Azure, you will need to gather a few pieces of information from the Azure Portal before you mount your storage.

  • You can find your Storage Account Name (which replaces <STORAGEACCOUNT> below) and your Key (which replaces <KEYGOESHERE> below) under Access Keys in your Storage Account resource in Azure.

  • Go into your Storage Account resource in Azure and click on Blobs. Here, you will find all of your containers. Pick the one you want to mount and copy its name into <CONTAINERNAME> below.

  • As for the mount point (/mnt/<FOLDERNAME> below), you can name it whatever you'd like, but it will help you in the long run to give it a descriptive name such as storageaccount_container.

Once you have the required information, you can use the following code to mount the storage location inside the Databricks environment:

## Mount a Blob Storage container using the storage account access key
dbutils.fs.mount(
  source = "wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net",
  mount_point = "/mnt/<FOLDERNAME>",
  extra_configs = {"fs.azure.account.key.<STORAGEACCOUNT>.blob.core.windows.net": "<KEYGOESHERE>"})

You can then test whether you can list the files in your mounted location:

display(dbutils.fs.ls("/mnt/<FOLDERNAME>"))

Resources:

  • To learn how to create an Azure Storage service, visit https://docs.microsoft.com/en-us/azure/storage/

Mounting Data Lake Storage

For finer-grained access control over your data, you may opt to use Azure Data Lake Storage. In Databricks, you can connect to your data lake in a similar manner to Blob Storage. Instead of an access key, your user credentials are passed through, so you only see the data that you have been granted access to.

Pass-through Azure Active Directory Credentials

To pass your Azure Active Directory credentials from Databricks to Azure Data Lake Storage, you will need to enable credential pass-through in Databricks under New Cluster > Advanced Options. You can then mount the data lake using the cluster's pass-through token provider:

## Authenticate with the pass-through token provider instead of an account key
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}
dbutils.fs.mount(
  source = "abfss://<CONTAINERNAME>@<STORAGEACCOUNT>.dfs.core.windows.net/",
  mount_point = "/mnt/<FOLDERNAME>",
  extra_configs = configs)

Note: If you create a High Concurrency cluster, multiple users can use the same cluster. The Standard cluster mode only allows a single user's credentials at a time.

Azure SQL Data Warehouse / Synapse

Set up Azure SQL DW connection parameters

dwDatabase = "<DATABASENAME>" ## The Azure SQL Data Warehouse database name
dwServer = "<DWNAME>.database.windows.net" ## The Azure SQL Server
dwUser = "<USERNAME>" ## The dedicated loading user login
dwPass = dbutils.secrets.get(scope = "<SECRETNAME>", key = "<KEYNAME>") ## The dedicated loading user login password
dwJdbcPort = "1433"
sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass

## Blob Storage staging folder the connector uses to move data (placeholder path)
tempDir = "wasbs://<CONTAINERNAME>@<STORAGEACCOUNT>.blob.core.windows.net/<TEMPFOLDER>"
## Put the storage account key in the session so it can be forwarded to SQL DW
spark.conf.set("fs.azure.account.key.<STORAGEACCOUNT>.blob.core.windows.net", "<KEYGOESHERE>")

Define a query

sqlQuery = """
  SELECT *, 'AzureSqlDw' AS SourceSystem
  FROM dbo.<TABLENAME>
"""

Create a Spark DataFrame using the SQL DW data

data = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", sqlDwUrlSmall) \
  .option("tempDir", tempDir) \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", sqlQuery) \
  .load()

## Register the DataFrame as a temporary view (or persist it with data.write.saveAsTable("<TABLENAME>"))
data.createOrReplaceTempView("<TEMPVIEWNAME>")

Reading and Writing Data

Reading in Data

...from Mounted Storage

dataset = spark.read.format('csv') \
                    .options(header='true', inferSchema='true', delimiter=',') \
                    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')

## or spark.read.csv(...), spark.read.parquet(...), etc.
## Formats: json, parquet, jdbc, orc, libsvm, csv, text, avro

...when Schema Inference Fails

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

## Declare the schema explicitly instead of relying on inferSchema
schema = StructType([StructField('ID', IntegerType(), True),
                     StructField('Value', DoubleType(), True),
                     StructField('Category', StringType(), True),
                     StructField('Date', DateType(), True)])

dataset = spark.read.format('csv') \
                    .schema(schema) \
                    .options(header='true', delimiter=',') \
                    .load('/mnt/<FOLDERNAME>/<FILENAME>.csv')

Writing out Data

## coalesce(1) produces a single part file inside the output directory "file.csv"
df.coalesce(1) \
   .write.format("csv") \
   .option("header", "true") \
   .save("file.csv")

Other Resources

Apache Spark Data Sources Documentation: https://spark.apache.org/docs/latest/sql-data-sources.html
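Besides a StructType, DataFrameReader.schema() also accepts a DDL-formatted string such as "ID INT, Value DOUBLE". The plain-Python sketch below builds such a string from (name, type) pairs matching the explicit schema used for CSV reading; the helper name to_ddl is hypothetical, and column names containing spaces would additionally need backtick quoting.

```python
def to_ddl(columns):
    """Build a DDL schema string from (column_name, spark_sql_type) pairs (hypothetical helper)."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in columns)

ddl = to_ddl([("ID", "INT"), ("Value", "DOUBLE"),
              ("Category", "STRING"), ("Date", "DATE")])
print(ddl)  # ID INT, Value DOUBLE, Category STRING, Date DATE

# Inside a Spark session the string could then be passed to the reader, e.g.:
# spark.read.format('csv').schema(ddl).options(header='true').load(path)
```

The DDL form is handy when a schema is stored in configuration as text rather than built in code.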

About Spark MLlib

MLlib is Apache Spark's scalable machine learning library.

MLlib fits into Spark's APIs and interoperates with NumPy in Python and with R libraries. Because Spark excels at iterative computation, MLlib's high-quality algorithms, many of which are iterative, run quickly and scale to large datasets.

Included Functionality:

ML algorithms include:

  • Classification: logistic regression, naive Bayes,...

  • Regression: generalized linear regression, survival regression,...

  • Decision trees, random forests, and gradient-boosted trees

  • Recommendation: alternating least squares (ALS)

  • Clustering: K-means, Gaussian mixtures (GMMs),...

  • Topic modeling: latent Dirichlet allocation (LDA)

  • Frequent itemsets, association rules, and sequential pattern mining

ML workflow utilities include:

  • Feature transformations: standardization, normalization, hashing,...

  • ML Pipeline construction

  • Model evaluation and hyper-parameter tuning

  • ML persistence: saving and loading models and Pipelines

Other utilities include:

  • Distributed linear algebra: SVD, PCA,...

  • Statistics: summary statistics, hypothesis testing,...

Resources

  • Spark MLlib Website

  • Getting Started Guide

Classification

Description:

Classification algorithms are used to identify the class into which each observation should fall. This problem can be considered part of pattern recognition: we use training data (historical information) to recognize patterns and predict how new data should be categorized.

Common Use Cases:

  • Fraudulent activity detection

  • Loan default prediction

  • Spam vs. ham

  • Customer segmentation

  • Benign vs. malignant tumor classification

  • and many more...

Classification Algorithms included in MLlib:

  • Logistic regression (both binomial and multiclass)

  • Decision trees

  • Random forests

  • Gradient-boosted trees

  • Multilayer perceptron

  • Linear Support Vector Machine

  • One-vs-Rest classifier (a.k.a. One-vs-All)

  • Naïve Bayes

Logistic Regression

Setting Up a Logistic Regression Classifier

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Load in required libraries

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

Initialize Logistic Regression object

lr = LogisticRegression(labelCol="label", featuresCol="features")

Create a parameter grid for tuning the model

lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
             .build())

Define how you want the model to be evaluated

lrevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

Define the type of cross-validation you want to perform

# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                      estimatorParamMaps = lrparamGrid,
                      evaluator = lrevaluator,
                      numFolds = 5)

Fit the model to the data

lrcvModel = lrcv.fit(train)
print(lrcvModel)

Score the testing dataset using your fitted model for evaluation purposes

lrpredictions = lrcvModel.transform(test)

Evaluate the model

## The evaluator's metric is areaUnderROC, so this value is the ROC AUC (not accuracy)
print('AUC (ROC):', lrevaluator.evaluate(lrpredictions))
## Override the metric for a single call to get the area under the precision-recall curve
print('AUC (PR):', lrevaluator.evaluate(lrpredictions, {lrevaluator.metricName: "areaUnderPR"}))

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.

Decision Tree

Setting Up a Decision Tree Classifier

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Load in required libraries

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

Initialize Decision Tree object

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

Create a parameter grid for tuning the model

dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10])
             .addGrid(dt.maxBins, [10, 20])
             .build())

Define how you want the model to be evaluated

## The default metric is areaUnderROC
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

Define the type of cross-validation you want to perform

# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)

Fit the model to the data

dtcvModel = dtcv.fit(train)
print(dtcvModel)

Score the testing dataset using your fitted model for evaluation purposes

dtpredictions = dtcvModel.transform(test)

Evaluate the model

print('AUC (ROC):', dtevaluator.evaluate(dtpredictions))
print('AUC (PR):', dtevaluator.evaluate(dtpredictions, {dtevaluator.metricName: "areaUnderPR"}))

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.

Model Saving and Loading

Model Saving

Save model(s) to mounted storage

## Assumes lrcvModel, rfcvModel, and dtcvModel were trained in the classification recipes
## write().overwrite() replaces any model already saved at the same path
lrcvModel.write().overwrite().save("/mnt/trainedmodels/lr")
rfcvModel.write().overwrite().save("/mnt/trainedmodels/rf")
dtcvModel.write().overwrite().save("/mnt/trainedmodels/dt")
display(dbutils.fs.ls("/mnt/trainedmodels/"))

Remove a model

Spark MLlib models are saved as a directory of files rather than a single file, so you need to delete the model's directory recursively. Passing True as the second (recurse) argument to dbutils.fs.rm removes the files and then the directory itself:

dbutils.fs.rm("/mnt/trainedmodels/dt", True)

Score new data using a trained model

Load in required libraries

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

Load in the transformation pipeline

pipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
## Apply the already-fitted pipeline to the new data (transform only, no refitting)
transformeddataset = pipeline.transform(dataset)

Load in the trained model

model = CrossValidatorModel.load("/mnt/trainedmodels/lr/")
## Score the data using the best model found during cross-validation
scoreddataset = model.bestModel.transform(transformeddataset)

Remove unnecessary columns from the scored data

## Function to extract the positive-class probability from the probability vector
getprob = udf(lambda v: float(v[1]), FloatType())

## Select out the necessary columns
output = scoreddataset.select(col("ID"),
                              col("label"),
                              col("rawPrediction"),
                              getprob(col("probability")).alias("probability"),
                              col("prediction"))

MLflow

MLflow is an open source library by the Databricks team designed for managing the machine learning lifecycle. It allows for the creation of projects, tracking of metrics, and model versioning.

Install mlflow using pip

pip install mlflow

Note: MLflow can be used in any Spark environment, but the automated tracking and UI of MLflow described here are Databricks-specific functionality.

Track metrics and parameters

import mlflow

## Log parameters and metrics from your normal MLlib run
with mlflow.start_run():
  # Log a parameter (key-value pair)
  mlflow.log_param("alpha", 0.1)

  # Log a metric; metrics can be updated throughout the run
  mlflow.log_metric("AUC", 0.871827)
  mlflow.log_metric("F1", 0.726153)
  mlflow.log_metric("Precision", 0.213873)

MLflow GitHub: https://github.com/mlflow/mlflow/

Model Evaluation

Multiclass classification evaluator

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

## The evaluator's default metric is F1, so request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

## Assumes lrpredictions, nbpredictions, and rfpredictions exist from the classification recipes
for model in ["lrpredictions", "nbpredictions", "rfpredictions"]:

    df = globals()[model]
    ########################################
    # Compute raw scores on the test set as (prediction, label) pairs of floats
    predictionAndLabels = df.select("prediction", "label").rdd.map(lambda row: (float(row[0]), float(row[1])))

    # Instantiate metrics object
    metrics = MulticlassMetrics(predictionAndLabels)

    # Overall statistics, weighted by label frequency
    precision = metrics.weightedPrecision
    recall = metrics.weightedRecall
    f1Score = metrics.weightedFMeasure()
    print("Summary Stats for: ", model)
    #print(metrics.confusionMatrix())
    print("Accuracy = %s" % evaluator.evaluate(df))
    print("Weighted Precision = %s" % precision)
    print("Weighted Recall = %s" % recall)
    print("Weighted F1 Score = %s" % f1Score)

    # Other weighted stats
    #print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    #print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
    print("\n")

Azure Data Factory

Transformation with Azure Databricks

Using Azure Databricks with Azure Data Factory, you can run notebooks from an end-to-end pipeline that contains the Validation, Copy data, and Notebook activities:

  • Validation ensures that your source dataset is ready for downstream consumption before you trigger the copy and analytics job.

  • Copy data duplicates the source dataset to the sink storage, which is mounted as DBFS in the Azure Databricks notebook. In this way, the dataset can be directly consumed by Spark.

  • Notebook triggers the Databricks notebook that transforms the dataset. It also adds the dataset to a processed folder or Azure SQL Data Warehouse.

Import a notebook for Transformation

To import a Transformation notebook to your Databricks workspace:

1. Sign in to your Azure Databricks workspace, and then select Import. Your workspace path may differ from the example, but remember it for later.

2. Select Import from: URL. In the text box, enter https://adflabstaging1.blob.core.windows.net/share/Transformations.html.

3. Now update the Transformation notebook with your storage connection information.

  In the imported notebook, go to command 5, which sets storageName and accessKey and mounts the sink storage with dbutils.fs.mount.

  • Replace storageName and accessKey with your own storage connection information.

  • Use the storage account with the sinkdata container.

4. Generate a Databricks access token for Data Factory to access Databricks.

  1. In your Databricks workspace, select your user profile icon in the upper right.

  2. Select User Settings.

How to use this template

1. Go to the Transformation with Azure Databricks template and create new linked services for the following connections.

  • Source Blob Connection - to access the source data.

    For this exercise, you can use the public blob storage that contains the source files. Use the following SAS URL to connect to source storage (read-only access):

    https://storagewithdata.blob.core.windows.net/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D

  • Destination Blob Connection - to store the copied data.

    In the New linked service window, select your sink storage blob.

  • Azure Databricks - to connect to the Databricks cluster.

    Create a Databricks linked service by using the access token that you generated previously. You can opt to select an interactive cluster if you have one. This example uses the New job cluster option.

2. Select Use this template. You'll see a pipeline created.

Pipeline introduction and configuration

In the new pipeline, most settings are configured automatically with default values. Review the configurations of your pipeline and make any necessary changes.

1. In the Validation activity Availability flag, verify that the source Dataset value is set to the SourceAvailabilityDataset that you created earlier.

2. In the Copy data activity file-to-blob, check the Source and Sink tabs. Change settings if necessary.

  • Source tab

  • Sink tab

3. In the Notebook activity Transformation, review and update the paths and settings as needed.

  The Databricks linked service should be pre-populated with the value from a previous step.

  To check the Notebook settings:

  1. Select the Settings tab.

4. Verify that the Pipeline Parameters are set as expected.

5. Connect to your datasets.

Note: In the datasets below, the file path has been specified automatically in the template. If any changes are required, make sure that you specify the path for both the container and the directory to avoid connection errors.

• SourceAvailabilityDataset - to check that the source data is available.

• SourceFilesDataset - to access the source data.

• DestinationFilesDataset - to copy the data into the sink destination location. Use the following values:

  • Linked service - sinkBlob_LS, created in a previous step.

  • File path - sinkdata/staged_sink.

6. Select Debug to run the pipeline. You can find the link to the Databricks logs for more detailed Spark logs.

  You can also verify the data file by using Azure Storage Explorer.

Note: To correlate with Data Factory pipeline runs, this example appends the pipeline run ID from the data factory to the output folder. This helps keep track of the files generated by each run.

Next steps

• Introduction to Azure Data Factory

• Transformation with Azure Databricks

• Run a Databricks notebook with the Databricks Notebook Activity in Azure Data Factory

Linear Regression

Setting Up Linear Regression

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Load in required libraries

Initialize Linear Regression object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Get model information

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.
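ParamGridBuilder.build() returns every combination of the values you add, and CrossValidator trains one model per combination per fold, then refits the best combination on the full training set. The plain-Python sketch below counts the fits implied by the Linear Regression grid used in this section (6 regParam, 5 elasticNetParam, and 5 maxIter values with 5 folds); it only mimics the Cartesian product and does not call Spark.

```python
from itertools import product

# Values taken from the Linear Regression parameter grid in this section
reg_params = [0.001, 0.01, 0.1, 0.5, 1.0, 2.0]
elastic_net_params = [0.0, 0.25, 0.5, 0.75, 1.0]
max_iters = [1, 5, 10, 20, 50]
num_folds = 5

# Like ParamGridBuilder.build(): one param map per combination of values
param_maps = [
    {"regParam": r, "elasticNetParam": e, "maxIter": m}
    for r, e, m in product(reg_params, elastic_net_params, max_iters)
]

fold_fits = len(param_maps) * num_folds  # one fit per param map per fold
total_fits = fold_fits + 1               # plus the final refit of the best param map

print(len(param_maps), fold_fits, total_fits)  # 150 750 751
```

Switching to the commented-out 3-value alternatives would cut this to 27 parameter maps (135 fold fits), which is worth considering on large datasets.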

Decision Tree

Setting Up Decision Tree Regression

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Load in required libraries

Initialize Decision Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.
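The regression recipes evaluate with RegressionEvaluator(metricName="rmse"); the evaluator also accepts "mse", "mae", and "r2". The plain-Python sketch below spells out those formulas on made-up numbers, assuming their standard definitions; regression_metrics is a hypothetical helper, not part of Spark.

```python
import math

def regression_metrics(labels, predictions):
    """RMSE, MSE, MAE, and R^2 for paired labels and predictions (hypothetical helper)."""
    n = len(labels)
    errors = [p - y for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    total_ss = sum((y - mean_label) ** 2 for y in labels)
    residual_ss = mse * n
    return {"rmse": math.sqrt(mse), "mse": mse, "mae": mae, "r2": 1 - residual_ss / total_ss}

metrics = regression_metrics([3.0, 5.0, 7.0], [2.0, 5.0, 8.0])
print(metrics)
```

RMSE is in the same units as the label, which is why it is a common default for tuning; R² is easier to compare across datasets.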

Naïve Bayes

Setting Up a Naïve Bayes Classifier

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Load in required libraries

Initialize Naïve Bayes object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.
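Spark's NaiveBayes defaults to the multinomial model with additive smoothing (smoothing=1.0). To make the "Initialize Naïve Bayes object" step less of a black box, here is a plain-Python sketch of multinomial naive Bayes with Laplace smoothing on toy count features. It illustrates the general technique rather than Spark's exact code; train_multinomial_nb and predict_nb are hypothetical helpers.

```python
import math
from collections import defaultdict

def train_multinomial_nb(rows, smoothing=1.0):
    """rows: list of (label, feature_counts). Returns (log_priors, log_thetas) per label."""
    num_features = len(rows[0][1])
    doc_counts = defaultdict(int)
    term_totals = defaultdict(lambda: [0.0] * num_features)
    for label, counts in rows:
        doc_counts[label] += 1
        for j, count in enumerate(counts):
            term_totals[label][j] += count

    num_docs, num_labels = len(rows), len(doc_counts)
    log_priors, log_thetas = {}, {}
    for label, n_label in doc_counts.items():
        # Additive smoothing on the class prior and on each feature likelihood
        log_priors[label] = math.log((n_label + smoothing) / (num_docs + num_labels * smoothing))
        total = sum(term_totals[label])
        log_thetas[label] = [math.log((c + smoothing) / (total + num_features * smoothing))
                             for c in term_totals[label]]
    return log_priors, log_thetas

def predict_nb(log_priors, log_thetas, counts):
    """Return the label with the highest log posterior score for one count vector."""
    def score(label):
        return log_priors[label] + sum(x * t for x, t in zip(counts, log_thetas[label]))
    return max(log_priors, key=score)

training = [(0.0, [3, 0]), (0.0, [2, 1]), (1.0, [0, 3]), (1.0, [1, 2])]
priors, thetas = train_multinomial_nb(training)
print(predict_nb(priors, thetas, [4, 0]))  # 0.0
print(predict_nb(priors, thetas, [0, 4]))  # 1.0
```

Because the multinomial model works with counts, its features must be non-negative, which is worth checking before fitting Spark's NaiveBayes.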

Random Forest

Setting Up Random Forest Regression

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Load in required libraries

Initialize Random Forest object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.
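Which run counts as "best" depends on the evaluator: CrossValidatorModel.avgMetrics holds one cross-validated average per parameter map, and the evaluator's isLargerBetter() decides whether the highest or lowest average wins (RMSE is minimized, areaUnderROC is maximized). The plain-Python sketch below mirrors that selection; best_index and the metric values are hypothetical.

```python
def best_index(avg_metrics, larger_is_better):
    """Index of the param map CrossValidator would keep, given per-map average metrics."""
    pick = max if larger_is_better else min
    return pick(range(len(avg_metrics)), key=lambda i: avg_metrics[i])

rmse_per_map = [4.2, 3.7, 3.1]    # hypothetical avgMetrics for an "rmse" evaluator
auc_per_map = [0.81, 0.86, 0.84]  # hypothetical avgMetrics for "areaUnderROC"

print(best_index(rmse_per_map, larger_is_better=False))  # 2
print(best_index(auc_per_map, larger_is_better=True))    # 1
```

In Spark you can inspect the same information by zipping cvModel.getEstimatorParamMaps() with cvModel.avgMetrics.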

Random Forest

Setting Up a Random Forest Classifier

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Load in required libraries

Initialize Random Forest object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.
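The binary classifiers here are evaluated with BinaryClassificationEvaluator, whose default metric is areaUnderROC. For intuition, ROC AUC equals the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counting half. The plain-Python sketch below computes it that way; roc_auc is a hypothetical helper, and since Spark integrates the ROC curve (possibly down-sampling thresholds on large data), results on real data may differ slightly.

```python
def roc_auc(scores, labels):
    """ROC AUC as P(positive score > negative score), counting ties as 1/2."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical positive-class scores and true labels
print(roc_auc([0.9, 0.4, 0.6, 0.1], [1, 1, 0, 0]))  # 0.75
```

The pairwise loop is quadratic, so it is only meant for small illustrative inputs.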

Structured Streaming

Read in Streaming Data

Reading JSON files from storage

from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType

inputPath = "/mnt/data/jsonfiles/"

# Define your schema if it's known (rather than relying on Spark to infer the schema)
jsonSchema = StructType([StructField("time", TimestampType(), True),
                         StructField("id", IntegerType(), True),
                         StructField("value", StringType(), True)])

# maxFilesPerTrigger = 1 treats a sequence of files as a stream by picking up one file per trigger
streamingInputDF = spark.readStream \
                        .schema(jsonSchema) \
                        .option("maxFilesPerTrigger", 1) \
                        .json(inputPath)
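maxFilesPerTrigger caps how many new files each micro-batch reads, which is how this recipe replays a folder of existing files as a stream. The plain-Python sketch below shows that batching, assuming the pending files are already in the order Spark would process them (by default, oldest modification time first); plan_micro_batches is a hypothetical helper and does not model triggers or checkpoints.

```python
def plan_micro_batches(pending_files, max_files_per_trigger):
    """Split pending files into successive micro-batches of at most N files each."""
    return [pending_files[i:i + max_files_per_trigger]
            for i in range(0, len(pending_files), max_files_per_trigger)]

files = ["part-0.json", "part-1.json", "part-2.json"]
print(plan_micro_batches(files, 1))  # [['part-0.json'], ['part-1.json'], ['part-2.json']]
```

Raising maxFilesPerTrigger trades per-batch latency for fewer, larger batches.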

References

• Databricks Structured Streaming: https://docs.databricks.com/spark/latest/structured-streaming/index.html

Gradient-Boosted Trees

Setting Up Gradient-Boosted Tree Classifier

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

Gradient-Boosted Trees

Setting Up Gradient-Boosted Tree Regression

Note: Make sure your training and test data are already vectorized and ready to go before you try to fit the model.

  • Select Generate New Token under the Access Tokens tab.

  • Select Generate.

  • Save the access token for later use in creating a Databricks linked service. The access token looks something like dapi32db32cbb4w6eee18b7d87e45exxxxxx.

  • For Notebook path, verify that the default path is correct. You might need to browse and choose the correct notebook path.

  • Expand the Base Parameters selector and verify that the parameters are set as expected. These parameters are passed to the Databricks notebook from Data Factory.

Load in required libraries

Initialize Gradient-Boosted Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.
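For multiclass evaluation, pyspark.mllib's MulticlassMetrics exposes weightedPrecision, weightedRecall, and weightedFMeasure(), which average the per-label scores weighted by how often each label occurs. The plain-Python sketch below reproduces those definitions on hypothetical (prediction, label) pairs; weighted_metrics is a hypothetical helper, not the Spark implementation.

```python
from collections import Counter

def weighted_metrics(prediction_and_labels):
    """Label-frequency-weighted precision, recall, and F1 from (prediction, label) pairs."""
    label_counts = Counter(label for _, label in prediction_and_labels)
    predicted_counts = Counter(pred for pred, _ in prediction_and_labels)
    true_positives = Counter(label for pred, label in prediction_and_labels if pred == label)
    n = len(prediction_and_labels)

    w_precision = w_recall = w_f1 = 0.0
    for label, count in label_counts.items():
        tp = true_positives[label]
        precision = tp / predicted_counts[label] if predicted_counts[label] else 0.0
        recall = tp / count
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = count / n  # each label counts in proportion to its frequency
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return w_precision, w_recall, w_f1

pairs = [(0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (1.0, 1.0)]  # hypothetical (prediction, label)
precision, recall, f1 = weighted_metrics(pairs)
print(precision, recall, f1)
```

Note that weighted recall always equals overall accuracy (3 of 4 correct here), so it adds little beyond the accuracy metric.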

Load in required libraries

Initialize Gradient-Boosted Tree object

Create a parameter grid for tuning the model

Define how you want the model to be evaluated

Define the type of cross-validation you want to perform

Fit the model to the data

Score the testing dataset using your fitted model for evaluation purposes

Evaluate the model

Note: When you use CrossValidator to set up cross-validation, the resulting model object records the average metric for every parameter combination (avgMetrics), but it uses only the best model (bestModel) when you call transform on it, for example before evaluating the predictions.
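Spark's GBTRegressor defaults to maxIter=20 trees and stepSize=0.1. The plain-Python sketch below shows least-squares gradient boosting with one-split "stump" trees using those same values: each new tree is fit to the current residuals and added with a shrunken weight. It starts from the mean label, a common textbook choice, whereas Spark fits its first tree directly to the labels; fit_stump and boost are hypothetical helpers.

```python
def fit_stump(xs, residuals):
    """Least-squares stump: one threshold split on x with a constant on each side."""
    best = None
    for threshold in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean, right_mean = sum(left) / len(left), sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

def boost(xs, ys, num_trees=20, step_size=0.1):
    """Fit num_trees stumps in sequence; returns a predictor and the training MSE per round."""
    base = sum(ys) / len(ys)  # initial prediction: the mean label
    trees = []

    def predict(x):
        return base + step_size * sum(tree(x) for tree in trees)

    training_mse = []
    for _ in range(num_trees):
        # For squared error, the negative gradient is simply the residual
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        training_mse.append(sum(r * r for r in residuals) / len(ys))
        trees.append(fit_stump(xs, residuals))
    return predict, training_mse

xs = [1, 2, 3, 4, 5, 6]
ys = [1.0, 1.0, 1.0, 5.0, 5.0, 5.0]
predict, training_mse = boost(xs, ys)
print(round(predict(1), 2), round(predict(6), 2))  # predictions move toward 1.0 and 5.0
```

A smaller stepSize needs more trees (a larger maxIter) to reach the same training error, which is the main trade-off to tune for GBTs.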

## Storage connection settings for the Transformation notebook (command 5)
## Supply storageName and accessKey values
storageName = ""
accessKey = ""

## Attempt to mount the Data Factory sink container from Azure Storage
dbutils.fs.mount(
    source = "wasbs://sinkdata@" + storageName + ".blob.core.windows.net/",
    mount_point = "/mnt/Data Factorydata",
    extra_configs = {"fs.azure.account.key." + storageName + ".blob.core.windows.net": accessKey})
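Building the mount arguments by string concatenation makes it easy to mistype the URI. The plain-Python sketch below assembles the same three arguments that the dbutils.fs.mount call expects (source, mount_point, extra_configs) so they can be printed and checked first; wasbs_mount_args and the sample account name are hypothetical.

```python
def wasbs_mount_args(container, storage_name, access_key, mount_point):
    """Assemble keyword arguments for a Blob Storage mount (hypothetical helper)."""
    host = f"{storage_name}.blob.core.windows.net"
    return {
        "source": f"wasbs://{container}@{host}/",
        "mount_point": mount_point,
        "extra_configs": {f"fs.azure.account.key.{host}": access_key},
    }

args = wasbs_mount_args("sinkdata", "mystorageacct", "<ACCESSKEY>", "/mnt/sinkdata")
print(args["source"])  # wasbs://sinkdata@mystorageacct.blob.core.windows.net/
# In a Databricks notebook the dictionary could then be unpacked: dbutils.fs.mount(**args)
```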
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    lr = LinearRegression(labelCol="label", featuresCol="features")
    lrparamGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
                 #  .addGrid(lr.regParam, [0.01, 0.1, 0.5])
                 .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
                 #  .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                 .addGrid(lr.maxIter, [1, 5, 10, 20, 50])
                 #  .addGrid(lr.maxIter, [1, 5, 10])
                 .build())
    lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    lrcv = CrossValidator(estimator = lr,
                        estimatorParamMaps = lrparamGrid,
                        evaluator = lrevaluator,
                        numFolds = 5)
    lrcvModel = lrcv.fit(train)
    print(lrcvModel)
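    ## Standard errors and p-values are only computed by the "normal" solver; they may be unavailable
    ## (and raise an error) for some best-model settings, such as those with an L1 penalty (elasticNetParam > 0)
    lrcvSummary = lrcvModel.bestModel.summary
    print("Coefficient Standard Errors: " + str(lrcvSummary.coefficientStandardErrors))
    print("P Values: " + str(lrcvSummary.pValues)) # Last element is the intercept (when fitIntercept=True)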
    lrpredictions = lrcvModel.transform(test)
    print('RMSE:', lrevaluator.evaluate(lrpredictions))
    from pyspark.ml.regression import DecisionTreeRegressor
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    dt = DecisionTreeRegressor(labelCol="label", featuresCol="features")
    dtparamGrid = (ParamGridBuilder()
                 .addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
                 #.addGrid(dt.maxDepth, [2, 5, 10])
                 .addGrid(dt.maxBins, [10, 20, 40, 80, 100])
                 #.addGrid(dt.maxBins, [10, 20])
                 .build())
    dtevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    dtcv = CrossValidator(estimator = dt,
                          estimatorParamMaps = dtparamGrid,
                          evaluator = dtevaluator,
                          numFolds = 5)
    dtcvModel = dtcv.fit(train)
    print(dtcvModel)
    dtpredictions = dtcvModel.transform(test)
    print('RMSE:', dtevaluator.evaluate(dtpredictions))
    from pyspark.ml.classification import NaiveBayes
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    nb = NaiveBayes(labelCol="label", featuresCol="features")
    nbparamGrid = (ParamGridBuilder()
                   .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
                   .build())
    nbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    # Create 5-fold CrossValidator
    nbcv = CrossValidator(estimator = nb,
                          estimatorParamMaps = nbparamGrid,
                          evaluator = nbevaluator,
                          numFolds = 5)
    nbcvModel = nbcv.fit(train)
    print(nbcvModel)
    nbpredictions = nbcvModel.transform(test)
    print('AUC (rawPrediction):', nbevaluator.evaluate(nbpredictions))
    ## BinaryClassificationMetrics expects (score, label) pairs
    print('AUC (hard predictions):', BinaryClassificationMetrics(nbpredictions['prediction','label'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(nbpredictions['prediction','label'].rdd).areaUnderPR)
    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    rf = RandomForestRegressor(labelCol="label", featuresCol="features")
    rfparamGrid = (ParamGridBuilder()
                 #.addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
                   .addGrid(rf.maxDepth, [2, 5, 10])
                 #.addGrid(rf.maxBins, [10, 20, 40, 80, 100])
                   .addGrid(rf.maxBins, [5, 10, 20])
                 #.addGrid(rf.numTrees, [5, 20, 50, 100, 500])
                   .addGrid(rf.numTrees, [5, 20, 50])
                 .build())
    rfevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    rfcv = CrossValidator(estimator = rf,
                          estimatorParamMaps = rfparamGrid,
                          evaluator = rfevaluator,
                          numFolds = 5)
    rfcvModel = rfcv.fit(train)
    print(rfcvModel)
    rfpredictions = rfcvModel.transform(test)
    print('RMSE:', rfevaluator.evaluate(rfpredictions))
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    rf = RandomForestClassifier(labelCol="label", featuresCol="features")
    rfparamGrid = (ParamGridBuilder()
                   .addGrid(rf.maxDepth, [2, 5, 10])
                   .addGrid(rf.maxBins, [5, 10, 20])
                   .addGrid(rf.numTrees, [5, 20, 50])
                   .build())
    rfevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    # Create 5-fold CrossValidator
    rfcv = CrossValidator(estimator = rf,
                          estimatorParamMaps = rfparamGrid,
                          evaluator = rfevaluator,
                          numFolds = 5)
    rfcvModel = rfcv.fit(train)
    print(rfcvModel)
    rfpredictions = rfcvModel.transform(test)
    print('AUC (rawPrediction):', rfevaluator.evaluate(rfpredictions))
    ## BinaryClassificationMetrics expects (score, label) pairs
    print('AUC (hard predictions):', BinaryClassificationMetrics(rfpredictions['prediction','label'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(rfpredictions['prediction','label'].rdd).areaUnderPR)
    from pyspark.ml.classification import GBTClassifier
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    gb = GBTClassifier(labelCol="label", featuresCol="features")
    gbparamGrid = (ParamGridBuilder()
                 .addGrid(gb.maxDepth, [2, 5, 10])
                 .addGrid(gb.maxBins, [10, 20, 40])
                 .addGrid(gb.maxIter, [5, 10, 20])
                 .build())
    gbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
    # Create 5-fold CrossValidator
    gbcv = CrossValidator(estimator = gb,
                          estimatorParamMaps = gbparamGrid,
                          evaluator = gbevaluator,
                          numFolds = 5)
    gbcvModel = gbcv.fit(train)
    print(gbcvModel)
    gbpredictions = gbcvModel.transform(test)
    print('AUC (rawPrediction):', gbevaluator.evaluate(gbpredictions))
    ## BinaryClassificationMetrics expects (score, label) pairs
    print('AUC (hard predictions):', BinaryClassificationMetrics(gbpredictions['prediction','label'].rdd).areaUnderROC)
    print('PR:', BinaryClassificationMetrics(gbpredictions['prediction','label'].rdd).areaUnderPR)
    from pyspark.ml.regression import GBTRegressor
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator
    gb = GBTRegressor(labelCol="label", featuresCol="features")
    gbparamGrid = (ParamGridBuilder()
                 .addGrid(gb.maxDepth, [2, 5, 10])
                 .addGrid(gb.maxBins, [10, 20, 40])
                 .addGrid(gb.maxIter, [5, 10, 20])
                 .build())
    gbevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    # Create 5-fold CrossValidator
    gbcv = CrossValidator(estimator = gb,
                          estimatorParamMaps = gbparamGrid,
                          evaluator = gbevaluator,
                          numFolds = 5)
    gbcvModel = gbcv.fit(train)
    print(gbcvModel)
    gbpredictions = gbcvModel.transform(test)
    print('RMSE:', gbevaluator.evaluate(gbpredictions))

    Batch Scoring

    This section is designed for use with a data orchestration tool that can call and execute Databricks notebooks. For more information on how to set up Azure Data Factory, see: https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook.
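When Data Factory passes a parameter into the notebook (here the varReportDate widget, which defaults to 19000101), it can help to validate the value before it is used to build file paths. The helper below is a hypothetical pure-Python sketch: it assumes dates arrive as YYYYMMDD strings and reuses the data_<date>.csv naming and placeholder storage path from this section, both of which you would adjust to your own pipeline.

```python
from datetime import datetime


def build_input_path(report_date,
                     base="wasbs://<container>@mystorage.blob.core.windows.net/myfiles/"):
    """Validate a YYYYMMDD report date and return the matching input file path.

    The default base path mirrors the placeholder used in this section; replace it with your own.
    """
    if len(report_date) != 8 or not report_date.isdigit():
        raise ValueError("report_date must look like YYYYMMDD, got %r" % (report_date,))
    # strptime also rejects impossible dates such as month 13
    datetime.strptime(report_date, "%Y%m%d")
    return base + "data_" + report_date + ".csv"


print(build_input_path("20200131"))
```

Failing fast on a malformed date surfaces the problem in the Data Factory run history instead of producing an empty read later.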

    Create date parameter

    Connect to storage

    Define input schema

    Read in new data

    Load in transformation pipeline and model

    Score data using the model

    Write data back out to storage

    Model Evaluation

    Evaluate model performance by probability cutoff

    Note: Extract probability values using the method found here: https://www.sparkitecture.io/machine-learning/model-saving-and-loading#remove-unnecessary-columns-from-the-scored-data

    from pyspark.sql.functions import col, when
    from pyspark.sql.types import DoubleType
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    
    ## Assumes `probability` has already been extracted to a numeric (double) column; see the note above
    results = []
    
    for cutoff in range(5, 95, 5):
      cutoff = (cutoff * 0.01)
      
      print('Testing cutoff = ', str(format(cutoff, '.2f')))
    
      lrpredictions_prob_temp = lrpredictions.withColumn('prediction_test', when(col('probability') >= cutoff, 1).otherwise(0).cast(DoubleType()))
      ## BinaryClassificationMetrics expects (score, label) pairs
      metrics_temp = BinaryClassificationMetrics(lrpredictions_prob_temp['prediction_test', 'label'].rdd)
      aupr_temp = metrics_temp.areaUnderPR
      auc_temp = metrics_temp.areaUnderROC
      print('\tAUPR:', aupr_temp,'\tAUC:', auc_temp)
      results.append((cutoff, aupr_temp, auc_temp))
    
    ## Build the results table once, without a placeholder (0, 0, 0) row
    performance_df = spark.createDataFrame(results, ['cutoff', 'AUPR', 'AUC'])
    display(performance_df)
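To see what the cutoff loop is doing at a single threshold, the sketch below applies the same rule (predict 1 when the probability is at least the cutoff) to a small list of hypothetical (probability, label) pairs in plain Python and reports precision and recall. It is an illustration only; raising the cutoff usually trades recall for precision, which is what the Spark loop lets you scan for.

```python
def metrics_at_cutoff(prob_label_pairs, cutoff):
    """Return (precision, recall) when predicting 1 for probability >= cutoff."""
    tp = fp = fn = 0
    for prob, label in prob_label_pairs:
        pred = 1 if prob >= cutoff else 0
        if pred == 1 and label == 1:
            tp += 1
        elif pred == 1 and label == 0:
            fp += 1
        elif pred == 0 and label == 1:
            fn += 1
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    return precision, recall


# Hypothetical scored rows: (probability of class 1, true label)
scored = [(0.95, 1), (0.80, 1), (0.65, 0), (0.40, 1), (0.20, 0), (0.10, 0)]
for cutoff in [0.3, 0.5, 0.7]:
    p, r = metrics_at_cutoff(scored, cutoff)
    print(f"cutoff={cutoff:.2f} precision={p:.2f} recall={r:.2f}")
```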

    Evaluate multiclass classification models

    Evaluate binary classification models

    Shaping Data with Pipelines

    Load in required libraries

    Define which columns are numerical versus categorical (and which is the label column)

    Set up stages

    Index the categorical columns and perform One Hot Encoding

    One Hot Encoding converts an indexed categorical column into a binary vector with one slot per class, stored in a single vector column. (This process is similar to dummy coding.)
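As a rough illustration of what the StringIndexer plus OneHotEncoder stages produce, the plain-Python sketch below maps a category index to a 0/1 vector. It mimics Spark's default dropLast=True, under which the last category is represented by all zeros, so k classes give vectors of length k - 1. Spark stores these as sparse vectors; plain lists are used here only for readability.

```python
def one_hot(index, num_classes, drop_last=True):
    """Encode a category index as a 0/1 list, mimicking OneHotEncoder's dropLast behavior."""
    if not 0 <= index < num_classes:
        raise ValueError("index out of range")
    size = num_classes - 1 if drop_last else num_classes
    vector = [0.0] * size
    # With drop_last=True the final category has no slot and stays all zeros
    if index < size:
        vector[index] = 1.0
    return vector


# Three classes indexed 0, 1, 2 by StringIndexer
for idx in range(3):
    print(idx, one_hot(idx, 3))
```

Dropping the last slot avoids perfectly collinear columns, which matters for linear models much like it does in dummy coding.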

    Index the label column and perform One Hot Encoding


    Note: If you are preparing the data for use in regression algorithms, there's no need to One Hot Encode the label column (since it should be numerical).

    Assemble the data together as a vector

    This step combines all the numerical data and the encoded categorical data into a single feature vector per row using the VectorAssembler transformer.
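VectorAssembler concatenates its inputCols in the order they are listed, so the position of each value in the features vector is fixed by assemblerInputs. The hypothetical helper below sketches that layout in plain Python (it is not Spark's implementation) and records which input column each slot came from, which is useful later when mapping coefficients or importances back to columns.

```python
def assemble(row, input_cols):
    """Concatenate vector-valued and scalar inputs in input_cols order.

    Returns the flat feature list and, for each position, the source column name.
    """
    features, sources = [], []
    for name in input_cols:
        value = row[name]
        # Encoded categorical columns are vectors; numeric columns are scalars
        values = list(value) if isinstance(value, (list, tuple)) else [value]
        features.extend(float(v) for v in values)
        sources.extend([name] * len(values))
    return features, sources


# Hypothetical row: col1 one-hot encoded to length 2, plus two numeric columns
row = {"col1classVec": [0.0, 1.0], "num1": 3, "num2": 4.5}
features, sources = assemble(row, ["col1classVec", "num1", "num2"])
print(features)
print(sources)
```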

    Scale features using standardization

    Set up the transformation pipeline using the stages you've created along the way

    Pipeline Saving and Loading

    Once your transformation pipeline has been fit on your training dataset, it's a good idea to save these transformation steps for future use. For example, we can save the pipeline so that new data is transformed in exactly the same way before it is scored by a trained machine learning model. This also helps cut down on errors when new data contains classes (in categorical variables) that were not seen during training, or columns that were not previously used.
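The reason to reuse the fitted pipeline is that stages such as StringIndexer learn a mapping from the training data (by default, the most frequent label gets index 0). The plain-Python sketch below imitates that idea, including what the handle_invalid options do with a class that only appears in new data. It is a simplified model for illustration: ties in frequency are broken alphabetically here, and "keep" sends unseen values to one extra index, in the spirit of Spark's handleInvalid="keep".

```python
from collections import Counter


def fit_indexer(values):
    """Learn a label -> index mapping, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform(values, mapping, handle_invalid="error"):
    """Apply a fitted mapping to new values."""
    out = []
    for v in values:
        if v in mapping:
            out.append(mapping[v])
        elif handle_invalid == "skip":
            continue  # drop rows with unseen labels
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))  # one extra bucket for unseen labels
        else:
            raise ValueError("Unseen label: %r" % (v,))
    return out


mapping = fit_indexer(["red", "blue", "red", "green", "red", "blue"])
print(mapping)
print(transform(["blue", "purple", "red"], mapping, handle_invalid="skip"))
print(transform(["blue", "purple", "red"], mapping, handle_invalid="keep"))
```

Refitting the indexer on scoring data would produce a different mapping whenever class frequencies change, which is exactly the mismatch that loading the saved pipeline avoids.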

    Save the transformation pipeline

    Load in the transformation pipeline

    Feature Importance

    Extract important features using Gini

    ## Based on: https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/
    import pandas as pd
    
    def ExtractFeatureImportance(featureImp, dataset, featuresCol):
        ## Flatten the ML attribute metadata (numeric, binary, nominal) attached to the features column
        list_extract = []
        for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
            list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
        varlist = pd.DataFrame(list_extract)
        ## Look up each feature's importance score by its vector index
        varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
        return(varlist.sort_values('score', ascending = False))
    
    # ExtractFeatureImportance(model.stages[-1].featureImportances, dataset, "features")
    dataset_fi = ExtractFeatureImportance(model.bestModel.featureImportances, dataset, "features")
    dataset_fi = spark.createDataFrame(dataset_fi)
    display(dataset_fi)
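The function above relies on the ml_attr metadata attached to the features column: a dict whose attrs entry groups attributes by type (for example numeric and binary), each with an idx and a name. The dependency-free sketch below flattens that structure and ranks names by importance. The metadata dict and scores are made up for illustration; on a cluster you would pass dataset.schema["features"].metadata and model.bestModel.featureImportances.toArray().

```python
def rank_features(metadata, importances):
    """Map each vector index in ML attribute metadata to its name and sort by importance."""
    attrs = []
    for group in metadata["ml_attr"]["attrs"].values():  # e.g. "numeric", "binary", "nominal"
        attrs.extend(group)
    ranked = [(a["name"], importances[a["idx"]]) for a in attrs]
    return sorted(ranked, key=lambda pair: pair[1], reverse=True)


# Hypothetical metadata for a 3-slot features vector
metadata = {"ml_attr": {"attrs": {
    "binary": [{"idx": 0, "name": "col1classVec_a"}, {"idx": 1, "name": "col1classVec_b"}],
    "numeric": [{"idx": 2, "name": "num1"}],
}}}
for name, score in rank_features(metadata, [0.1, 0.3, 0.6]):
    print(name, score)
```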

    Extract important features using p-values

    Extract coefficients from a model

    Glow

    About Glow

    Glow is an open-source and independent Spark library that brings even more flexibility and functionality to Azure Databricks. This toolkit is natively built on Apache Spark, enabling the scale of the cloud for genomics workflows.

    Glow lets you work with genomic data through Spark SQL, so you can interact with common genetic data types as easily as you can with a .csv file.

    API Serving

    Use MMLSpark

    Load in required libraries

    Text Data Preparation

    Tokenization and Vectorization

    Load in required libraries

    dbutils.widgets.text("varReportDate", "19000101")
    ReportDate = dbutils.widgets.get("varReportDate")
    print(ReportDate)
    from pyspark.ml import Pipeline
    ## OneHotEncoderEstimator was renamed to OneHotEncoder in Spark 3.0, so only OneHotEncoder is imported here
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
    label = "dependentvar"
    categoricalColumns = ["col1",
                         "col2"]
    
    numericalColumns = ["num1",
                        "num2"]
    
    #categoricalColumnsclassVec = ["col1classVec",
    #                              "col2classVec"]
    categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]
    ## Based on: https://stackoverflow.com/questions/42935914/how-to-map-features-from-the-output-of-a-vectorassembler-back-to-the-column-name
    lrm = model.stages[-1]
    ## Transform the data:
    transformed =  model.transform(df)
    #Extract and flatten ML attributes:
    from itertools import chain
    
    attrs = sorted(
        (attr["idx"], attr["name"]) for attr in (chain(*transformed
            .schema[lrm.summary.featuresCol]
            .metadata["ml_attr"]["attrs"].values())))
    # and map to the output:
    
    [(name, lrm.summary.pValues[idx]) for idx, name in attrs]
    # [(name, lrm.coefficients[idx]) for idx, name in attrs]
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics
    
    # Evaluate best model
    lrevaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
    print('Accuracy:', lrevaluator.evaluate(lrpredictions))
    ## MulticlassMetrics expects (prediction, label) pairs
    lrmetrics = MulticlassMetrics(lrpredictions['prediction','label'].rdd)
    print('Confusion Matrix:\n', lrmetrics.confusionMatrix())
    print('F1 Score:', lrmetrics.fMeasure(1.0,1.0))
    import pandas as pd
    
    ## Flatten the binary (one-hot) and numeric attribute metadata and order it by vector index
    attrs = dataset.schema["features"].metadata["ml_attr"]["attrs"]
    featurelist = pd.DataFrame(attrs["binary"] + attrs["numeric"]).sort_values("idx").reset_index(drop=True)
    ## Assign coefficients positionally so row i gets the coefficient at vector index i
    featurelist["Coefficient"] = model.bestModel.coefficients.toArray()
    featurelist = spark.createDataFrame(featurelist)
    
    display(featurelist)

    Load in transformation pipeline and trained model

    Define username, key, and IP address

    Define input schema

    Set up streaming DataFrame

    Set up server

    Test the webservice


    You may need to run sudo netstat -tulpn to see what port is open if you're running inside Databricks.

    Use this command to look for the port that was opened by the server.

    Resources:

    Microsoft MMLSpark on GitHub: https://github.com/Azure/mmlspark

    Remove usernames, dates, links, etc.

    RegEx tokenization

    Remove stop words

    Count words

    Index strings

    Create transformation pipeline


    Once the transformation pipeline has been fit, you can use normal classification algorithms for classifying the text.
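As a rough picture of what the Count words step (CountVectorizer) hands to those classifiers, here is a plain-Python sketch of the idea behind its minDF and vocabSize settings: terms found in fewer than minDF documents are dropped, the vocabulary keeps at most vocabSize terms ordered by corpus frequency, and each document becomes a vector of term counts. It is a simplified illustration with made-up documents, not Spark's implementation (for example, tie ordering here is alphabetical).

```python
from collections import Counter


def fit_vocab(docs, vocab_size, min_df):
    """Keep terms found in at least min_df documents, ordered by total count (ties alphabetical)."""
    doc_freq = Counter(term for doc in docs for term in set(doc))
    term_freq = Counter(term for doc in docs for term in doc)
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-term_freq[t], t))
    return kept[:vocab_size]


def count_vector(doc, vocab):
    """Count how often each vocabulary term appears in one tokenized document."""
    counts = Counter(doc)
    return [counts[t] for t in vocab]


docs = [["spark", "data", "spark"], ["data", "lake"], ["spark", "ml", "data"]]
vocab = fit_vocab(docs, vocab_size=10, min_df=2)
print(vocab)
print([count_vector(d, vocab) for d in docs])
```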

    Extras

    Get label numbers for each class

    Split text body into sentences

    Create `part_number` for the split sentences

    storage_account_name = "mystorage"
    storage_account_access_key = ""
    
    file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
    file_type = "csv"
    
    spark.conf.set(
      "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
      storage_account_access_key)
    from pyspark.sql.types import *
    
    schema = StructType([
        StructField("ReportingDate", DateType(), True),
        StructField("id", StringType(), True),
        StructField("x1", IntegerType(), True),
        StructField("x2", DoubleType(), True)
    ])
    dataset = spark.read\
                   .format(file_type)\
                   .option("header", "true")\
                   .schema(schema)\
                   .load(file_location)
    
    ## You can avoid defining a schema by having spark infer it from your data
    ## This doesn't always work and can be slow
    #.option("inferSchema", "true")
    
    ## Fill in na's, if needed
    # dataset = dataset.na.fill(0)
    display(dataset)
    from pyspark.ml.tuning import CrossValidatorModel
    from pyspark.ml import PipelineModel
    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import col, round
    from pyspark.ml.regression import GeneralizedLinearRegressionModel
    
    mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
    mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
    ## Transform new data using the pipeline
    mydataset = mypipeline.transform(dataset)
    ## Score new data using a trained model
    scoreddataset = mymodel.bestModel.transform(mydataset)
    
    output = scoreddataset.select(col("id"),
                                  col("ReportingDate"),
                                  col("prediction").alias("MyForecast"))
    display(output)
    fileloc = "/mnt/output/" + str(ReportDate) # one output folder per report date
    ## .csv() already sets the output format, so no separate .format() call is needed
    output.write.mode("overwrite").option("header","true").csv(fileloc)
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    
    for pred_name in ["lrpredictions", "dtpredictions", "rfpredictions", "nbpredictions", "gbpredictions"]:
        df = globals()[pred_name]
        
        tp = df[(df.label == 1) & (df.prediction == 1)].count()
        tn = df[(df.label == 0) & (df.prediction == 0)].count()
        fp = df[(df.label == 0) & (df.prediction == 1)].count()
        fn = df[(df.label == 1) & (df.prediction == 0)].count()
        total = df.count()
        a = (tp + tn) / total
        
        ## Guard each ratio against division by zero
        r = float(tp) / (tp + fn) if (tp + fn) > 0 else 0.0
        p = float(tp) / (tp + fp) if (tp + fp) > 0 else 0.0
        f1 = 2 * (p * r) / (p + r) if (p + r) > 0 else 0.0
        
        print("Model:", pred_name)
        print("True Positives:", tp)
        print("True Negatives:", tn)
        print("False Positives:", fp)
        print("False Negatives:", fn)
        print("Total:", total)
        print("Accuracy:", a)
        print("Recall:", r)
        print("Precision:", p)
        print("F1 score:", f1)
        ## BinaryClassificationMetrics expects (score, label) pairs
        print('AUC:', BinaryClassificationMetrics(df['prediction','label'].rdd).areaUnderROC)
        print("\n")
    stages = []
    for categoricalColumn in categoricalColumns:
      print(categoricalColumn)
      ## Category Indexing with StringIndexer
      stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
      ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
      encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
      ## Add stages
      stages += [stringIndexer, encoder]
    ## Convert label into label indices using the StringIndexer
    label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
    stages += [label_stringIndexer]
    assemblerInputs = categoricalColumnsclassVec + numericalColumns
    assembler = VectorAssembler(inputCols = assemblerInputs,
                                outputCol = "features")
    stages += [assembler]
    from pyspark.ml.feature import StandardScaler
    
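    ## withMean=True centers each feature, which produces dense output; take care with large sparse feature vectors
    scaler = StandardScaler(inputCol = "features",
                            outputCol = "scaledFeatures",
                            withStd = True,
                            withMean = True)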
    stages += [scaler]
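With withStd=True and withMean=True, the scaler stage transforms each feature to (x - mean) / std, and Spark's StandardScaler documentation states that the standard deviation is the corrected sample standard deviation (dividing by n - 1), which statistics.stdev also uses. The sketch below applies that formula to one feature column in plain Python for illustration; mapping a constant column to zeros is a choice made for this sketch, not a claim about Spark.

```python
import statistics


def standardize(values, with_mean=True, with_std=True):
    """Scale one feature column to zero mean and unit (sample) standard deviation."""
    mean = statistics.mean(values) if with_mean else 0.0
    std = statistics.stdev(values) if with_std else 1.0  # sample std, n - 1 in the denominator
    if std == 0:
        # Choice made for this sketch: a constant column maps to zeros
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


print(standardize([2.0, 4.0, 6.0]))
```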
    prepPipeline = Pipeline().setStages(stages)
    pipelineModel = prepPipeline.fit(dataset)
    dataset = pipelineModel.transform(dataset)
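    ## Use write().overwrite() so the save does not fail if a pipeline already exists at this path
    pipelineModel.write().overwrite().save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")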
    display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))
    from pyspark.ml import PipelineModel
    pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
    dataset = pipelineModel.transform(dataset)
    display(dataset)
    from pyspark.ml.tuning import CrossValidatorModel
    from pyspark.ml import PipelineModel
    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import col, round
    
    import sys
    import numpy as np
    import pandas as pd
    import mmlspark
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import *
    import uuid
    from mmlspark import request_to_string, string_to_response
    ## Load in the transformation pipeline
    mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
    
    ## Load in trained model
    mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
    username = "admin"
    ip = "10.0.0.4" #Internal IP
    sas_url = "" # SAS Token for your VM's Private Key in Blob
    input_schema = StructType([
      StructField("id", IntegerType(), True),
      StructField("x1", IntegerType(), True),
      StructField("x2", DoubleType(), True),
      StructField("x3", StringType(), True),
     ])
    serving_inputs = spark.readStream.continuousServer() \
                          .option("numPartitions", 1) \
                          .option("name", "http://10.0.0.4:8898/my_api") \
                          .option("forwarding.enabled", True) \
                          .option("forwarding.username", username) \
                          .option("forwarding.sshHost", ip) \
                          .option("forwarding.keySas", sas_url) \
                          .address("localhost", 8898, "my_api") \
                          .load()\
                          .parseRequest(input_schema)
    
    mydataset = mypipeline.transform(serving_inputs)
    
    serving_outputs = mymodel.bestModel.transform(mydataset) \
      .makeReply("prediction")
    
    # display(serving_inputs)
    server = serving_outputs.writeStream \
        .continuousServer() \
        .trigger(continuous="1 second") \
        .replyTo("my_api") \
        .queryName("my_query") \
        .option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1())) \
        .start()
    import requests
    data = u'{"id":0,"x1":1,"x2":2.0,"x3":"3"}'
    
    #r = requests.post(data=data, url="http://localhost:8898/my_api") # Locally
    r = requests.post(data=data, url="http://102.208.216.32:8902/my_api") # Via the VM IP
    
    print("Response {}".format(r.text))
    from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
    from pyspark.sql.functions import col, lower, regexp_replace
    
    def clean_text(c):
      c = lower(c)
      c = regexp_replace(c, "(https?\://)\S+", "") # Remove links
      c = regexp_replace(c, "(\\n)|\n|\r|\t", "") # Remove newlines (LF), carriage returns (CR), and tabs
      c = regexp_replace(c, "(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "") # Remove dates
      c = regexp_replace(c, "@([A-Za-z0-9_]+)", "") # Remove usernames
      c = regexp_replace(c, "[0-9]", "") # Remove numbers
      c = regexp_replace(c, "\:|\/|\#|\.|\?|\!|\&|\"|\,", "") # Remove symbols
      #c = regexp_replace(c, "(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", "")
      return c
    
    dataset = dataset.withColumn("text", clean_text(col("text")))
    regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
    # Add stop words to Spark's default English list
    add_stopwords = ["http","https","amp","rt","t","c","the","@","/",":"] # standard web stop words
    
    stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(StopWordsRemover.loadDefaultStopWords("english") + add_stopwords)
    # Bag of Words Count
    countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
    # String Indexer
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
    label_stringIdx = StringIndexer(inputCol = "class", outputCol = "label")
    from pyspark.ml import Pipeline
    
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
    
    # Fit the pipeline to the cleaned training documents.
    pipelineFit = pipeline.fit(dataset)
    dataset = pipelineFit.transform(dataset)
    from pyspark.sql import *
    from pyspark.sql.functions import col
    labelset = dataset.select(col("class"),
                              col("label")).distinct()
    display(labelset)
    from pyspark.sql.types import *
    from pyspark.sql.window import *
    from pyspark.sql.functions import col, split, explode, row_number
    # Split text by sentence and convert to array
    array_df = data.withColumn("text", split(col("text"), "\.").cast("array<string>"))
      
    # Explode array into separate rows in the dataset
    split_df = array_df.withColumn("text", explode(col("text")))\
                       .withColumn("part_number", row_number().over(Window.partitionBy("internet_message_id").orderBy("id")))
    data = split_df
    display(data)
    from pyspark.sql.window import *
    from pyspark.sql.functions import row_number
    
    data.withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id"))).show()
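Because every sentence exploded from one row shares the same id, ordering row_number() by id does not guarantee that part_number follows the sentence order. Spark's posexplode returns each array element together with its position, which avoids that. The plain-Python sketch below shows the intended result, numbering parts from 1; skipping empty fragments (such as the one after a trailing period) is a choice of this sketch, not something the code above does.

```python
import re


def split_into_parts(message_id, text):
    """Split text on periods and return (message_id, part_number, sentence) rows, numbered from 1."""
    sentences = [s.strip() for s in re.split(r"\.", text)]
    return [(message_id, i + 1, s) for i, s in enumerate(s for s in sentences if s)]


rows = split_into_parts("m1", "First sentence. Second one. Third.")
for row in rows:
    print(row)
```

In PySpark the equivalent might look like data.select("internet_message_id", posexplode(split(col("text"), "\\.")).alias("pos", "text")), followed by part_number = pos + 1.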

    Learn more about Project Glow at projectglow.io.

    Read the full documentation: glow.readthedocs.io

    Features:

    • Genomic datasources: To read datasets in common file formats such as VCF, BGEN, and Plink into Spark DataFrames.

    • Genomic functions: Common operations such as computing quality control statistics, running regression tests, and performing simple transformations are provided as Spark functions that can be called from Python, SQL, Scala, or R.

    • Data preparation building blocks: Glow includes transformations such as variant normalization and lift over to help produce analysis ready datasets.

    • Integration with existing tools: With Spark, you can write user-defined functions (UDFs) in Python, R, SQL, or Scala. Glow also makes it easy to run DataFrames through command line tools.

    • Integration with other data types: Genomic data can generate additional insights when joined with data sets such as electronic health records, real world evidence, and medical images. Since Glow returns native Spark SQL DataFrames, it's simple to join multiple data sets together.
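The quality-control example later in this section keeps variants whose first allele frequency lies between allele_freq_cutoff and 1 - allele_freq_cutoff. The plain-Python sketch below shows that arithmetic for one biallelic variant, computing frequencies from diploid genotype calls under these assumed encodings: 0 is the reference allele, 1 the alternate allele, and -1 a missing call. It illustrates the calculation only; it is not Glow's implementation.

```python
def allele_frequencies(genotype_calls):
    """Return [ref_freq, alt_freq] for a biallelic variant from diploid calls.

    Each call is a pair of allele indices; -1 marks a missing allele and is ignored.
    """
    alleles = [a for call in genotype_calls for a in call if a != -1]
    if not alleles:
        return [0.0, 0.0]
    alt = sum(1 for a in alleles if a == 1)
    return [(len(alleles) - alt) / len(alleles), alt / len(alleles)]


def passes_frequency_filter(freqs, cutoff):
    """Keep the variant if the first allele frequency is in [cutoff, 1 - cutoff]."""
    return cutoff <= freqs[0] <= 1.0 - cutoff


# Four hypothetical samples: hom-ref, het, hom-alt, and one half-missing call
calls = [[0, 0], [0, 1], [1, 1], [0, -1]]
freqs = allele_frequencies(calls)
print(freqs, passes_frequency_filter(freqs, 0.1))
```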

    How To Install

    If you're using Databricks, make sure you enable the Databricks Runtime for Genomics. Glow is already included and configured in this runtime.

    pip Installation

    Using pip, install by simply running pip install glow.py and then start the Spark shell with the Glow Maven package.

    Maven Installation

    Install the Maven package io.projectglow:glow_2.11:${version} and optionally the Python frontend glow.py. Set the Spark configuration spark.hadoop.io.compression.codecs to io.projectglow.sql.util.BGZFCodec in order to read and write BGZF-compressed files.

    Load in Glow

    Read in Data

    Summary Statistics and Quality Control

    Split Multiallelic Variants to Biallelic

    Write out Data

    ./bin/pyspark --packages io.projectglow:glow_2.11:0.5.0 \
     --conf spark.hadoop.io.compression.codecs=io.projectglow.sql.util.BGZFCodec
    import glow
    from pyspark.sql.functions import col, expr, when, log10
    glow.register(spark)
    vcf_path = "/databricks-datasets/genomics/1kg-vcfs/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz"
    
    df = spark.read.format("vcf")\
              .option("includeSampleIds", False)\
              .option("flattenInfoFields", False)\
              .load(vcf_path)\
              .withColumn("first_genotype", expr("genotypes[0]"))
              
    # bgen_path = "/databricks-datasets/genomics/1kg-bgens/1kg_chr22.bgen"
    
    # df = spark.read.format("bgen") \
    #           .load(bgen_path)
    ## Example threshold (an assumption; choose one suited to your study)
    allele_freq_cutoff = 0.1
    
    ## hardy_weinberg and call_summary_stats return structs, so their fields are accessed with dot notation
    df = df.withColumn("hardyweinberg", expr("hardy_weinberg(genotypes)")) \
           .withColumn("summarystats", expr("call_summary_stats(genotypes)")) \
           .withColumn("depthstats", expr("dp_summary_stats(genotypes)")) \
           .withColumn("genotypequalitystats", expr("gq_summary_stats(genotypes)")) \
           .filter(col("qual") >= 98) \
           .filter((col("start") >= 16000000) & (col("end") >= 16050000)) \
           .where((col("summarystats.alleleFrequencies").getItem(0) >= allele_freq_cutoff) & 
                  (col("summarystats.alleleFrequencies").getItem(0) <= (1.0 - allele_freq_cutoff))) \
           .withColumn("log10pValueHwe", when(col("hardyweinberg.pValueHwe") == 0, 26).otherwise(-log10(col("hardyweinberg.pValueHwe"))))
    split_df = glow.transform("split_multiallelics", df)
    df.coalesce(1) \
      .write \
      .mode("overwrite") \
      .format("vcf") \
      .save("/tmp/vcf_output")

    Other Common Tasks

    Split Data into Training and Test Datasets

    Rename all columns

    Convert PySpark DataFrame to NumPy array

    Call Cognitive Service API using PySpark

    Create `chunker` function

    The Cognitive Services APIs can only accept a limited number of documents (1,000 per call for the endpoint used here) or a limited amount of data in a single call. So, we can create a chunker function that splits the dataset into smaller chunks.
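Applied to a list of records, the chunker yields consecutive slices of at most size items, so 2,500 records become batches of 1,000, 1,000, and 500. The sketch below repeats the chunker definition used later in this section so it runs on its own, and wraps each batch in the {"documents": [...]} body that the sentiment call sends; the record fields are hypothetical.

```python
def chunker(seq, size):
    # Yield consecutive slices of `seq` with at most `size` items each
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


# Hypothetical records shaped like the documents sent to the Text Analytics API
records = [{"id": i, "text": "example text %d" % i} for i in range(2500)]
payloads = [{"documents": batch} for batch in chunker(records, 1000)]
print([len(p["documents"]) for p in payloads])
```

Because the chunker only slices, it works equally on a list or a pandas DataFrame, which is how it is applied to sentiment_df_pd below.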

    Convert Spark DataFrame to Pandas

    Set up API requirements

    Create DataFrame for incoming scored data

    Loop through chunks of the data and call the API

    Write the results out to mounted storage

    Find All Columns of a Certain Type

    Change a Column's Type

    Generate StructType Schema Printout (Manual Execution)

    Generate StructType Schema from List (Automatic Execution)

    Make a DataFrame of Consecutive Dates
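One way to do this, sketched under the assumption that the date range is small enough to build on the driver: generate the dates in plain Python and hand them to spark.createDataFrame. The function name and the date column name are hypothetical. Spark SQL's sequence function, for example explode(sequence(to_date('2020-01-01'), to_date('2020-01-31'), interval 1 day)), is an alternative that keeps the work on the cluster.

```python
from datetime import date, timedelta


def consecutive_dates(start, end):
    """Return one (date,) tuple per day from start to end, inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    return [(start + timedelta(days=i),) for i in range((end - start).days + 1)]


rows = consecutive_dates(date(2020, 2, 27), date(2020, 3, 1))
print(rows)
# On a cluster (assumed `spark` session): spark.createDataFrame(rows, ["date"])
```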

    Unpivot a DataFrame Dynamically (Longer)

    Pivot a wide dataset into a longer form. (Similar to the pivot_longer() function from the tidyr R package or the wide_to_long() function from pandas.)
    
    train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)
    from pyspark.sql.functions import col
    
    column_list = data.columns
    prefix = "my_prefix"
    new_column_list = [prefix + s for s in column_list]
    #new_column_list = [prefix + s if s != "ID" else s for s in column_list] ## Use if you plan on joining on an ID later
    
    column_mapping = [[o, n] for o, n in zip(column_list, new_column_list)]
    
    # print(column_mapping)
    
    ## Select every column under its new, prefixed name
    data = data.select([col(old).alias(new) for old, new in column_mapping])
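For the Unpivot a DataFrame Dynamically (Longer) recipe, one common PySpark approach is the SQL stack() generator: stack(n, 'col_a', col_a, 'col_b', col_b, ...) emits n rows per input row. The hypothetical helper below only builds that expression string from a list of column names, so it runs without Spark. It assumes all value columns share a compatible type, which stack needs, and escapes names for the string literal and the backtick identifier.

```python
def stack_expr(value_cols, name_col="variable", value_col="value"):
    """Build a Spark SQL stack() expression that unpivots value_cols into name/value pairs."""
    parts = []
    for c in value_cols:
        label = c.replace("\\", "\\\\").replace("'", "\\'")  # escape for a SQL string literal
        ident = c.replace("`", "``")                           # escape for a backtick identifier
        parts.append("'%s', `%s`" % (label, ident))
    return "stack(%d, %s) as (%s, %s)" % (len(value_cols), ", ".join(parts), name_col, value_col)


print(stack_expr(["x1", "x2"]))
```

A possible usage, assuming an id column to keep: long_df = df.selectExpr("id", stack_expr([c for c in df.columns if c != "id"])).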
    import numpy as np
    
    ## Convert `train` DataFrame to NumPy
    pdtrain = train.toPandas()
    ## Stack each row's feature vector into a 2-D array (rows x features)
    X_train = np.stack(pdtrain['features'].apply(lambda x: x.toArray()).to_numpy())
    y_train = pdtrain['label'].to_numpy()
    
    ## Convert `test` DataFrame to NumPy
    pdtest = test.toPandas()
    X_test = np.stack(pdtest['features'].apply(lambda x: x.toArray()).to_numpy())
    y_test = pdtest['label'].to_numpy()
    
    print(y_test)
    ## Define Chunking Logic
    import pandas as pd
    import numpy as np
    # Based on: https://stackoverflow.com/questions/25699439/how-to-iterate-over-consecutive-chunks-of-pandas-dataframe-efficiently
    def chunker(seq, size):
        return (seq[pos:pos + size] for pos in range(0, len(seq), size))
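To see how `chunker` splits the data, here is a self-contained run on a hypothetical 2,500-row pandas DataFrame (the function is repeated so the block stands alone). With the chunk size of 1,000 used in the scoring loop, the last chunk simply holds the remainder.

```python
import pandas as pd

def chunker(seq, size):
    # Yield consecutive slices of `seq` with at most `size` rows each
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

# Hypothetical 2,500-row DataFrame of documents
docs = pd.DataFrame({"id": range(1, 2501), "text": ["sample text"] * 2500})

chunk_sizes = [len(chunk) for chunk in chunker(docs, 1000)]
print(chunk_sizes)  # → [1000, 1000, 500]
```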
    ## Collect the Spark DataFrame of documents to pandas so it can be chunked
    sentiment_df_pd = sentiment_df.toPandas()
    # pprint is used to format the JSON response
    from pprint import pprint
    import json
    import requests
    
    subscription_key = '<SUBSCRIPTIONKEY>'
    endpoint = 'https://<SERVICENAME>.cognitiveservices.azure.com'
    sentiment_url = endpoint + "/text/analytics/v2.1/sentiment"
    headers = {"Ocp-Apim-Subscription-Key": subscription_key}
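The request body is a JSON object with a `documents` array holding one object per row. As an alternative to round-tripping through `to_json` and `json.loads`, a chunk can be converted directly with `DataFrame.to_dict(orient="records")`. This is a sketch: `build_payload` is a hypothetical helper, the `id`/`language`/`text` columns are assumed to match what your endpoint expects, and ids are converted to strings on the assumption that the service expects string ids.

```python
import json
import pandas as pd

def build_payload(chunk):
    # Hypothetical helper: one document object per row, ids sent as strings
    records = chunk.assign(id=chunk["id"].astype(str)).to_dict(orient="records")
    return {"documents": records}

# Made-up chunk of documents
chunk = pd.DataFrame({"id": [1, 2],
                      "language": ["en", "en"],
                      "text": ["I love it", "Not great"]})
payload = build_payload(chunk)
print(json.dumps(payload))
```

The resulting dict could be passed as `json = payload` to `requests.post`.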
    from pyspark.sql.types import *
    
    sentiment_schema = StructType([StructField("id", IntegerType(), True),
                                   StructField("score", FloatType(), True)])
    
    sentiments_df = spark.createDataFrame([], sentiment_schema)
    
    display(sentiments_df)
    for chunk in chunker(sentiment_df_pd, 1000):
      print("Scoring", len(chunk), "rows.")
      sentiment_df_json = json.loads('{"documents":' + chunk.to_json(orient='records') + '}')
      
      response = requests.post(sentiment_url, headers = headers, json = sentiment_df_json)
      response.raise_for_status()  # Stop on HTTP errors (bad key, throttling, etc.)
      sentiments = response.json()
      # pprint(sentiments)
      
      ## Keep id and score in schema order and cast id to int to match sentiment_schema
      sentiments_pd = pd.DataFrame(sentiments['documents'])[['id', 'score']].astype({'id': int})
      sentiments_df_chunk = spark.createDataFrame(sentiments_pd)
      sentiments_df = sentiments_df.union(sentiments_df_chunk)
      
    display(sentiments_df)
    sentiments_df.count()
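Rather than assuming every document was scored, the decoded response can be split into scores and per-document errors. The sketch below assumes a response shaped like `{"documents": [{"id": ..., "score": ...}], "errors": [...]}`; `parse_sentiment_response` is a hypothetical helper and the example response is made up, so check the actual body your API version returns.

```python
import pandas as pd

def parse_sentiment_response(sentiments):
    # Hypothetical helper (assumed response shape, see lead-in):
    # returns a DataFrame of (id, score) rows plus the list of errors.
    scores = pd.DataFrame(sentiments.get("documents", []), columns=["id", "score"])
    scores["id"] = scores["id"].astype(int)  # match IntegerType in sentiment_schema
    errors = sentiments.get("errors", [])
    return scores, errors

# Made-up response body for illustration
example = {
    "documents": [{"id": "1", "score": 0.95}, {"id": "2", "score": 0.12}],
    "errors": [{"id": "3", "message": "Document text is empty."}],
}
scores, errors = parse_sentiment_response(example)
print(scores)
print(errors)
```

Inside the loop, `scores` could be passed to `spark.createDataFrame` in place of `sentiments_pd`, while `errors` is logged or kept for a retry.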
    sentiments_df.coalesce(1).write.csv("/mnt/textanalytics/sentimentanalysis/")
    from pyspark.sql.types import StringType
    
    def get_nonstring_cols(df):
        ## Return the names of all columns whose data type is not StringType
        return [f.name for f in df.schema.fields if not isinstance(f.dataType, StringType)]
        
    get_nonstring_cols(df)
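`get_nonstring_cols` only separates strings from everything else. To find columns of any particular type, one option is to filter `df.dtypes`, which returns (column name, type string) pairs such as `('id', 'int')`. `get_cols_of_type` is a hypothetical helper; the block uses a literal list in place of `df.dtypes` so it runs without Spark.

```python
def get_cols_of_type(dtypes, type_name):
    # `dtypes` is a list of (column name, type string) pairs,
    # the format returned by PySpark's DataFrame.dtypes
    return [name for name, dtype in dtypes if dtype == type_name]

# Literal list standing in for `df.dtypes`
example_dtypes = [("id", "int"), ("name", "string"), ("city", "string"), ("score", "double")]
print(get_cols_of_type(example_dtypes, "string"))  # → ['name', 'city']
```

On a real DataFrame this would be `get_cols_of_type(df.dtypes, "double")`. Types are matched by their exact simple string, so parameterized types must be written out in full, e.g. `'decimal(10,2)'` or `'array<string>'`.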
    from pyspark.sql.types import *
    from pyspark.sql.functions import col
    
    df = df.withColumn('col1', col('col1').cast(IntegerType()))
    ## Fill in list with your desired column names
    cols = ["col1", "col2", "col3"]
    
    print("schema = StructType([")
    for i, c in enumerate(cols):
        ## The last field also closes the field list and the StructType call
        ending = "])" if i == len(cols) - 1 else ","
        print("\tStructField('" + c + "', StringType(), True)" + ending)
        
    ## Once the output has printed, copy and paste into a new cell
    ## and change column types and nullability
    """
    Struct Schema Creator for PySpark
    
    [<Column Name>, <Column Type>, <Column Nullable>]
    
    Types:  binary, boolean, byte, date,
            double, float, integer, long,
            null, short, string, timestamp
    """
    from pyspark.sql.types import *
    
    ## Fill in with your desired column names, types, and nullability
    cols = [["col1", "string", False],
            ["col2", "date", True],
            ["col3", "integer", True]]
    
    ## Loop to build list of StructFields
    schema_set = ["schema = StructType(["]
    
    for i, (colname, typename, colnull) in enumerate(cols):
        ## e.g. "integer" -> "IntegerType()"; avoids shadowing pyspark.sql.functions.col
        coltype = typename.title() + "Type()"
        
        if i == len(cols)-1:
            iter_structfield = "StructField('" + colname +  "', " + coltype + ", " + str(colnull) + ")])"
        else:
            iter_structfield = "StructField('" + colname +  "', " + coltype + ", " + str(colnull) + "),"
        
        schema_set.append(iter_structfield)
    
    ## Convert list to single string
    schema_string = ''.join(map(str, schema_set))
    
    ## This will execute the generated command string
    exec(schema_string)
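If you would rather avoid `exec`, the same `[name, type, nullable]` list can be turned into a DDL-formatted schema string, which `spark.read.schema()` and `spark.createDataFrame(..., schema=...)` also accept. This is a sketch under assumptions: `to_ddl` is a hypothetical helper, it assumes the upper-cased type names (STRING, DATE, INTEGER, ...) are valid Spark SQL type keywords (some, like `null`, may not map directly), and it assumes your Spark version's DDL parser accepts `NOT NULL` on a column, so confirm the result with `printSchema()`.

```python
def to_ddl(cols):
    # Hypothetical helper: [name, type, nullable] entries -> DDL schema string.
    # Backticks quote the names; non-nullable columns get NOT NULL (assumed supported).
    parts = []
    for name, coltype, nullable in cols:
        part = f"`{name}` {coltype.upper()}"
        if not nullable:
            part += " NOT NULL"
        parts.append(part)
    return ", ".join(parts)

cols = [["col1", "string", False],
        ["col2", "date", True],
        ["col3", "integer", True]]
print(to_ddl(cols))
# → `col1` STRING NOT NULL, `col2` DATE, `col3` INTEGER
```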
    from pyspark.sql.functions import sequence, to_date, explode, col
    date_dim = spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2019-12-31'), interval 1 day) as DATE").withColumn("DATE", explode(col("DATE")))
    display(date_dim)
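For small ranges, the same consecutive dates can also be generated on the driver with Python's `datetime` module, which makes the expected row count easy to check: 2018 and 2019 each have 365 days, so the inclusive range has 730 rows. This is an alternative sketch, not a replacement for `sequence`, which does the work inside Spark; `consecutive_dates` is a hypothetical helper.

```python
from datetime import date, timedelta

def consecutive_dates(start, end):
    # Every calendar day from `start` to `end`, inclusive
    n_days = (end - start).days + 1
    return [start + timedelta(days=i) for i in range(n_days)]

dates = consecutive_dates(date(2018, 1, 1), date(2019, 12, 31))
print(len(dates), dates[0], dates[-1])  # → 730 2018-01-01 2019-12-31
```

`spark.createDataFrame([(d,) for d in dates]).toDF("DATE")` would turn the list into a one-column DataFrame.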
    ## UnpivotDF Function
    def UnpivotDF(df, columns, pivotCol, unpivotColName, valueColName):
      columnsValue = list(map(lambda x: str("'") + str(x) + str("',")  + str(x), columns))
      stackCols = ','.join(x for x in columnsValue)
    
      df_unpvt = df.selectExpr(pivotCol, f"stack({str(len(columns))}, {stackCols}) as ({unpivotColName}, {valueColName})")\
                   .select(pivotCol, unpivotColName, valueColName)
      
      return(df_unpvt)
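`UnpivotDF` works by handing `stack()` a row count followed by alternating pairs of a column-name literal (the new category value) and a column reference (its value). The sketch below builds that expression on its own so it can be inspected before running. `build_stack_expr` is a hypothetical helper, and unlike `UnpivotDF` it wraps the column references in backticks so that names containing spaces still parse; names containing a single quote would still need escaping.

```python
def build_stack_expr(columns, unpivot_col_name, value_col_name):
    # Each column contributes: its name as a string literal, then a
    # backtick-quoted reference to its values
    pairs = ", ".join(f"'{c}', `{c}`" for c in columns)
    return f"stack({len(columns)}, {pairs}) as ({unpivot_col_name}, {value_col_name})"

print(build_stack_expr(["Q1", "Q2", "Q3"], "Category", "value"))
# → stack(3, 'Q1', `Q1`, 'Q2', `Q2`, 'Q3', `Q3`) as (Category, value)
```

It could be used as `df.selectExpr("ID", build_stack_expr(df.columns[1:], "Category", "value"))`. Because `stack()` places all unpivoted values in one column, those columns should generally share the same type; cast them first if needed.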
    df_unpvt = UnpivotDF(df = df,
                         columns = df.columns[1:], ## The columns to transpose into a single, longer column
                         pivotCol = "ID", ## The column to leave in place (usually an ID)
                         unpivotColName = "Category", ## The name of the new column
                         valueColName = "value") ## The name of the column of values