Build Text Categorization Model with Spark NLP

Satish Silveri
7 min readJul 8, 2020

Overview

Introduction

Natural Language Processing is one of the important processes for data science teams across the globe. With ever-growing data, most of the organizations have already moved to big data platforms like Apache Hadoop and cloud offerings like AWS, Azure, and GCP. These platforms are more than capable of handling big data that enables organizations to perform analytics at scale for unstructured data like text categorization. But when it comes to machine learning, there is still a gap between big data systems and machine learning tools.

Popular machine learning python libraries like sci-kit-learn and Gensim are highly optimized to perform on single node machines and not designed for distributed environments. Apache Spark MLlib is one of the many tools that help to bridge this gap by offering most machine learning models like Linear Regression, Logistic Regression, SVM, Random Forest, K-means, LDA, and many more to carry out the most common machine learning tasks.

  1. Comprehensive Introduction to Apache Spark, RDDs & Dataframes (using PySpark)
  2. Using PySpark to perform Transformations and Actions on RDD
  3. Complete Guide on DataFrame Operations in PySpark

If you are new to Spark, I strongly recommend you to go through the below articles:

Apart from machine learning algorithms, Spark MLlib also offers a plethora of feature transformers like Tokenizer, StopWordRemover, n-grams, and features extractors like CountVectorizer, TF-IDF, and Word2Vec. Although these transformers and extractors are sufficient to build a basic NLP pipeline but to build a more comprehensive and production-grade pipeline, we need more advanced techniques like stemming, lemmatization, Part-of-speech tagging, and Named Entity Recognition.

John Snow Labs Spark NLP offers a variety of annotators to perform advanced NLP tasks. For more information, check out the list of annotator and their usage on the website

https://nlp.johnsnowlabs.com/docs/en/annotators.

Setting up the environment

Let’s go ahead and see how to set up Spark NLP on AWS EMR.

1. Before we spin up the EMR cluster, we need to create a bootstrap action. Bootstrap actions are used to set up additional software or customize the configuration of cluster nodes. Following is the bootstrap action that can be used to set up Spark NLP on the EMR cluster,

#!/bin/bashsudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenvsudo python36 -m pip install --upgrade pip
#
sudo python36 -m pip install pandas
#
sudo python36 -m pip install boto3
#
sudo python36 -m pip install re
#
sudo python36 -m pip install spark-nlp==2.4.5

Once you create the shell script, copy this script to a location in AWS S3. You can also install additional python packages as per your requirement.

2. We can spin up the EMR cluster using AWS console, API, or boto3 library in python. The advantage of using Python is that you can reuse the code whenever you want to instantiate the cluster or add it to workflows.

Following is the python code to instantiate an EMR cluster.

import boto3region_name='region_name'def get_security_group_id(group_name, region_name):
ec2 = boto3.client('ec2', region_name=region_name)
response = ec2.describe_security_groups(GroupNames=[group_name])
return response['SecurityGroups'][0]['GroupId']emr = boto3.client('emr', region_name=region_name)cluster_response = emr.run_job_flow(
Name='cluster_name', # update the value
ReleaseLabel='emr-5.27.0',
LogUri='s3_path_for_logs', # update the value
Instances={
'InstanceGroups': [
{
'Name': "Master nodes",
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.2xlarge', # change according to the requirement
'InstanceCount': 1 #for master node High Availabiltiy, set count more than 1
},
{
'Name': "Slave nodes",
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.2xlarge', # change according to the requirement
'InstanceCount': 2
}
],
'KeepJobFlowAliveWhenNoSteps': True,
'Ec2KeyName' : 'key_pair_name', # update the value
'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
},
BootstrapActions=[ {
'Name':'install_dependencies',
'ScriptBootstrapAction':{
'Args':[],
'Path':'path_to_bootstrapaction_on_s3' # update the value
}
}],
Steps = [],
VisibleToAllUsers=True,
JobFlowRole='EMR_EC2_DefaultRole',
ServiceRole='EMR_DefaultRole',
Applications=[
{ 'Name': 'hadoop' },
{ 'Name': 'spark' },
{ 'Name': 'hive' },
{ 'Name': 'zeppelin' },
{ 'Name': 'presto' }
],
Configurations=[
# YARN
{
"Classification": "yarn-site",
"Properties": {"yarn.nodemanager.vmem-pmem-ratio": "4",
"yarn.nodemanager.pmem-check-enabled": "false",
"yarn.nodemanager.vmem-check-enabled": "false"}
},

# HADOOP
{
"Classification": "hadoop-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
}
],
"Properties": {}
},

# SPARK
{
"Classification": "spark-env",
"Configurations": [
{
"Classification": "export",
"Configurations": [],
"Properties": {"PYSPARK_PYTHON":"/usr/bin/python3",
"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
}
],
"Properties": {}
},
{
"Classification": "spark",
"Properties": {"maximizeResourceAllocation": "true"},
"Configurations": []
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.dynamicAllocation.enabled": "true" #default is also true
}
}
]
)

Note: Ensure that you have proper access to S3 bucket(s) that are used for logging and for storing bootstrap action script.

Text categorization of BBC articles using Spark NLP

Now that we have our cluster ready, let’s build a simple text categorization example on BBC data using Spark NLP and Spark MLlib.

1 — Initialize Spark

We will import the required libraries and initialize spark-session using different configuration parameters. The configuration values depend upon my local environment. Adjust the parameters accordingly.

# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline# Start Spark Session with Spark NLP
#spark = sparknlp.start()spark = SparkSession.builder \
.appName("BBC Text Categorization")\
.config("spark.driver.memory","8G")\ change accordingly
.config("spark.memory.offHeap.enabled",True)\
.config("spark.memory.offHeap.size","8G") \
.config("spark.driver.maxResultSize", "2G") \
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")\
.config("spark.kryoserializer.buffer.max", "1000M")\
.config("spark.network.timeout","3600s")\
.getOrCreate()

2 — Load the Text Data

We will be using BBC data. You can download the data from this link. After downloading the data, load the data in spark using the below code;

# File location and type
file_location = r'path\to\bbc-text.csv'
file_type = "csv"# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)df.count()

3 — Split the dataset into train and test sets

Unlike python where we use sci-kit learn to split the data, Spark Dataframe has an inbuilt function called randomSplit() to perform the same operation.

(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)

The randomSplit() function requires 2 parameters viz. weights array and seed. For our example, we will be using a 70–30 split where 70% will be the training data and 30% will be the test data.

4 — NLP Pipeline using Spark NLP

Let’s go ahead and build the NLP pipeline using Spark NLP. One of the biggest advantages of Spark NLP is that it natively integrates with Spark MLLib modules that help to build a comprehensive ML pipeline consisting of transformers and estimators. This pipeline can include feature extraction modules like CountVectorizer or HashingTF and IDF. We can also include a machine learning model in this pipeline. Below is the example consisting of the NLP pipeline with feature extraction and machine learning model;

5 — Train the Model

Now that our NLP pipeline is ready, let’s train our model on training data.

# fit the pipeline on training data
pipeline_model = nlp_pipeline.fit(trainingData)

6 — Perform Predictions

Once the training is done, we can predict the class labels on test (unseen) data.

# perform predictions on test data
predictions = pipeline_model.transform(testData)

7 — Evaluate the Model

Evaluating the trained model is very important to understand how the model performed on unseen data. We will look at 3 popular evaluation metrics viz. Accuracy, Precision, and Recall (also called Sensitivity).

7.1 — Accuracy

# import evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluatorevaluator = MulticlassClassificationEvaluator(
labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

7.2 — Precision

evaluator = MulticlassClassificationEvaluator(
labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

7.3 — Recall

evaluator = MulticlassClassificationEvaluator(
labelCol="label", predictionCol="prediction", metricName="weightedRecall")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

Depending on the business use case, you can decide which metric to use for evaluating the model. E.g. If a machine learning model is designed to detect cancer based on certain parameters, it’s better to use recall or sensitivity because the company cannot afford false negatives (a person having cancer but the model did not detect it) whereas if a machine learning model is designed to generate users recommendations, the company can afford a few false negatives (8 out of 10 recommendations match the user profile) and hence can use precision as the evaluation metric.

8 — Save the Pipeline Model

After successfully training, testing, and evaluating the model, you can save the model to disc and use it in different Spark applications. To save the model to the disc, use the below code:

pipeline_model.save(‘/path/to/storage_location’)

Conclusion

Spark NLP provides a plethora of annotators and transformers to build a production-grade data pre-processing pipeline. Spark NLP seamlessly integrates with Spark MLLib that enables us to build an end-to-end Natural Language Processing Project in a distributed environment. In this article, we looked at how to install Spark NLP on AWS EMR and implemented text categorization of BBC data. We also examined different evaluation metrics in Spark MLlib and saw how to store a model for further usage.

I hope you enjoyed the article. Keep learning!

--

--