Build Text Categorization Model with Spark NLP



Natural Language Processing (NLP) is a core task for data science teams across the globe. As data volumes keep growing, most organizations have moved to big data platforms such as Apache Hadoop and to cloud offerings such as AWS, Azure, and GCP. These platforms are more than capable of handling big data, which lets organizations run analytics at scale on unstructured data, for example text categorization. When it comes to machine learning, however, there is still a gap between big data systems and machine learning tools.


Setting up the environment

Let’s go ahead and see how to set up Spark NLP on AWS EMR.

#!/bin/bash
# Bootstrap action: install Python 3.6 and the required libraries on every node
sudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenv
sudo python36 -m pip install --upgrade pip
sudo python36 -m pip install pandas
sudo python36 -m pip install boto3
# "re" ships with the Python standard library, so it needs no pip install
sudo python36 -m pip install spark-nlp==2.4.5

# Launch the EMR cluster with boto3
import boto3

region_name = 'region_name'  # update the value

def get_security_group_id(group_name, region_name):
    ec2 = boto3.client('ec2', region_name=region_name)
    response = ec2.describe_security_groups(GroupNames=[group_name])
    return response['SecurityGroups'][0]['GroupId']

emr = boto3.client('emr', region_name=region_name)

cluster_response = emr.run_job_flow(
    Name='cluster_name',  # update the value
    ReleaseLabel='emr_release_label',  # placeholder, missing from the original listing; pick a release that ships Spark 2.4
    LogUri='s3_path_for_logs',  # update the value
    Instances={
        'InstanceGroups': [
            {
                'Name': "Master nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.2xlarge',  # change according to the requirement
                'InstanceCount': 1  # for master node high availability, set count to 3
            },
            {
                'Name': "Slave nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.2xlarge',  # change according to the requirement
                'InstanceCount': 2
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': 'key_pair_name',  # update the value
        'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name),
        'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-slave', region_name=region_name)
    },
    BootstrapActions=[{
        'Name': 'install_dependencies',  # assumed label for the bootstrap action
        'ScriptBootstrapAction': {
            'Path': 'path_to_bootstrapaction_on_s3'  # update the value
        }
    }],
    Steps=[],
    JobFlowRole='EMR_EC2_DefaultRole',  # assumed: the default EMR instance profile
    ServiceRole='EMR_DefaultRole',  # assumed: the default EMR service role
    Applications=[
        {'Name': 'hadoop'},
        {'Name': 'spark'},
        {'Name': 'hive'},
        {'Name': 'zeppelin'},
        {'Name': 'presto'}
    ],
    Configurations=[
        {
            "Classification": "yarn-site",
            "Properties": {"yarn.nodemanager.vmem-pmem-ratio": "4",
                           "yarn.nodemanager.pmem-check-enabled": "false",
                           "yarn.nodemanager.vmem-check-enabled": "false"}
        },
        {
            "Classification": "hadoop-env",
            "Configurations": [{
                "Classification": "export",
                "Configurations": [],
                "Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
            }],
            "Properties": {}
        },
        {
            "Classification": "spark-env",
            "Configurations": [{
                "Classification": "export",
                "Configurations": [],
                "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3",
                               "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
            }],
            "Properties": {}
        },
        {
            "Classification": "spark",
            "Properties": {"maximizeResourceAllocation": "true"},
            "Configurations": []
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.dynamicAllocation.enabled": "true"  # default is also true
            }
        }
    ]
)

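Cluster creation is asynchronous, so it can help to wait until the cluster is actually up before connecting to it. The helper below is a minimal sketch, not part of the original setup: the function name wait_until_running is hypothetical, and it assumes a boto3 EMR client and the cluster ID returned by run_job_flow under the 'JobFlowId' key.

```python
def wait_until_running(emr_client, cluster_id):
    """Block until the EMR cluster is up, then return its current state."""
    # boto3 ships a built-in EMR waiter that polls describe_cluster for us
    waiter = emr_client.get_waiter('cluster_running')
    waiter.wait(ClusterId=cluster_id)
    description = emr_client.describe_cluster(ClusterId=cluster_id)
    return description['Cluster']['Status']['State']

# Usage with the client and response from the cluster creation script:
# state = wait_until_running(emr, cluster_response['JobFlowId'])
# print(state)
```

The waiter raises an error if the cluster terminates instead of starting, which surfaces bootstrap failures early.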
Text categorization of BBC articles using Spark NLP

Now that we have our cluster ready, let’s build a simple text categorization example on BBC data using Spark NLP and Spark MLlib.

1 — Initialize Spark

We will import the required libraries and initialize the Spark session with several configuration parameters. The values below reflect my local environment, so adjust them to suit yours.

# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

# Start Spark Session with Spark NLP
# spark = sparknlp.start()
# Driver memory is set to 8G below; change accordingly
spark = SparkSession.builder \
    .appName("BBC Text Categorization") \
    .config("spark.driver.memory", "8G") \
    .config("spark.memory.offHeap.size", "8G") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5") \
    .config("spark.kryoserializer.buffer.max", "1000M") \
    .getOrCreate()

2 — Load the Text Data

We will be using the BBC data set. You can download the data from this link. After downloading it, load the data into Spark using the code below:

# File location and type
file_location = r'path\to\bbc-text.csv'
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the CSV file into a Spark DataFrame
df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)

3 — Split the dataset into train and test sets

In plain Python we would typically use scikit-learn to split the data. A Spark DataFrame instead has a built-in method, randomSplit(), that performs the same operation.
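A minimal sketch of the split is shown here. The helper name split_train_test, the 70/30 ratio, and the seed are assumptions chosen for illustration; df is the DataFrame loaded in step 2.

```python
def split_train_test(df, train_fraction=0.7, seed=100):
    """Randomly split a Spark DataFrame into training and test DataFrames."""
    # randomSplit takes a list of weights and an optional seed for reproducibility
    train_df, test_df = df.randomSplit(
        [train_fraction, 1.0 - train_fraction], seed=seed)
    return train_df, test_df

# Usage with the DataFrame loaded in step 2:
# trainingData, testData = split_train_test(df)
# print(trainingData.count(), testData.count())
```

Note that randomSplit samples each row independently, so the resulting sizes only approximate the requested fractions.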

4 — NLP Pipeline using Spark NLP

Let’s go ahead and build the NLP pipeline using Spark NLP. One of the biggest advantages of Spark NLP is that it integrates natively with the Spark MLlib modules, which makes it possible to build a comprehensive ML pipeline consisting of transformers and estimators. Such a pipeline can include feature extraction modules like CountVectorizer or HashingTF and IDF, and it can also include a machine learning model. The pipeline in this example combines the NLP stages with feature extraction and a machine learning model.
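One way such a pipeline could look is sketched here. It is an illustration under stated assumptions rather than the exact pipeline used for this article: the function name, the column names "text" and "category", the choice of stages, and all hyperparameter values (numFeatures, minDocFreq, maxIter, regParam) are assumptions. The imports sit inside the function so that the definition loads even where Spark is not installed; calling it requires an active Spark session with Spark NLP on the classpath.

```python
def build_text_classification_pipeline(text_col="text", label_col="category"):
    """Build an unfitted Pipeline: Spark NLP cleaning, TF-IDF, logistic regression."""
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, IDF, StringIndexer
    from sparknlp.annotator import Normalizer, Stemmer, StopWordsCleaner, Tokenizer
    from sparknlp.base import DocumentAssembler, Finisher

    # Spark NLP stages: raw text -> document -> tokens -> cleaned, stemmed tokens
    document_assembler = DocumentAssembler() \
        .setInputCol(text_col).setOutputCol("document")
    tokenizer = Tokenizer() \
        .setInputCols(["document"]).setOutputCol("token")
    normalizer = Normalizer() \
        .setInputCols(["token"]).setOutputCol("normalized")
    stopwords_cleaner = StopWordsCleaner() \
        .setInputCols(["normalized"]).setOutputCol("cleanTokens") \
        .setCaseSensitive(False)
    stemmer = Stemmer() \
        .setInputCols(["cleanTokens"]).setOutputCol("stem")
    # Finisher converts annotations into a plain array of strings for MLlib
    finisher = Finisher() \
        .setInputCols(["stem"]).setOutputCols(["token_features"]) \
        .setOutputAsArray(True).setCleanAnnotations(False)

    # Spark MLlib stages: TF-IDF features, numeric labels, classifier
    hashing_tf = HashingTF(inputCol="token_features", outputCol="rawFeatures",
                           numFeatures=10000)
    idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
    label_indexer = StringIndexer(inputCol=label_col, outputCol="label")
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)

    return Pipeline(stages=[
        document_assembler, tokenizer, normalizer, stopwords_cleaner,
        stemmer, finisher, hashing_tf, idf, label_indexer, lr])

# Usage (needs an active Spark session):
# nlp_pipeline = build_text_classification_pipeline()
```

Because the Finisher emits ordinary string arrays, the MLlib feature stages can consume the Spark NLP output without any custom conversion code.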

5 — Train the Model

Now that our NLP pipeline is ready, let’s train our model on training data.

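# fit the pipeline on training data
# (nlp_pipeline and trainingData are assumed names for the pipeline from step 4
# and the training split from step 3)
pipeline_model = nlp_pipeline.fit(trainingData)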

6 — Perform Predictions

Once the training is done, we can predict the class labels on test (unseen) data.

# perform predictions on test data
predictions = pipeline_model.transform(testData)

7 — Evaluate the Model

Evaluating the trained model is essential to understand how it performs on unseen data. We will look at three popular evaluation metrics: accuracy, precision, and recall (also called sensitivity). For this multiclass problem, precision and recall are reported as class-weighted averages.

# import evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

# Weighted precision
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Weighted Precision = %g" % (precision))

# Weighted recall
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Weighted Recall = %g" % (recall))
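To make the weighted metrics concrete, here is a small pure-Python sketch that computes them by hand on a toy example. It is not Spark code: the function name weighted_metrics and the sample labels are made up for illustration. Each class's precision and recall is weighted by that class's share of the true labels.

```python
from collections import Counter

def weighted_metrics(labels, predictions):
    """Return (accuracy, weighted precision, weighted recall)."""
    total = len(labels)
    support = Counter(labels)           # number of true examples per class
    predicted = Counter(predictions)    # number of predictions per class
    correct = Counter(l for l, p in zip(labels, predictions) if l == p)

    accuracy = sum(correct.values()) / total
    w_precision = 0.0
    w_recall = 0.0
    for cls, n in support.items():
        weight = n / total              # class share of the true labels
        precision = correct[cls] / predicted[cls] if predicted[cls] else 0.0
        recall = correct[cls] / n
        w_precision += weight * precision
        w_recall += weight * recall
    return accuracy, w_precision, w_recall

acc, wp, wr = weighted_metrics([0, 0, 1, 1, 2], [0, 1, 1, 1, 2])
print(round(acc, 4), round(wp, 4), round(wr, 4))  # 0.8 0.8667 0.8
```

With this weighting, weighted recall always coincides with accuracy, because the class weights cancel the per-class denominators. Weighted precision is therefore the more informative of the two when read alongside accuracy.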

8 — Save the Pipeline Model

After successfully training, testing, and evaluating the model, you can save it to disk and reuse it in other Spark applications. The fitted pipeline model can be written to disk with its save() method.
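A minimal sketch of saving and reloading follows. The helper names and the path are hypothetical, and pipeline_model is assumed to be the fitted model from step 5. The pyspark import sits inside the loader so the definitions load even where Spark is not installed.

```python
def save_pipeline_model(pipeline_model, path):
    """Write a fitted PipelineModel to the given path, replacing any existing copy."""
    pipeline_model.write().overwrite().save(path)

def load_pipeline_model(path):
    """Load a previously saved PipelineModel (needs an active Spark session)."""
    from pyspark.ml import PipelineModel
    return PipelineModel.load(path)

# Usage (the path is a placeholder):
# save_pipeline_model(pipeline_model, "path/to/bbc_pipeline_model")
# restored_model = load_pipeline_model("path/to/bbc_pipeline_model")
# restored_model.transform(testData)
```

The application that loads the model needs the same Spark NLP package on its classpath, since the saved pipeline contains Spark NLP stages.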


Spark NLP provides a wide range of annotators and transformers for building a production-grade data pre-processing pipeline. It integrates seamlessly with Spark MLlib, which enables us to build an end-to-end Natural Language Processing project in a distributed environment. In this article, we looked at how to install Spark NLP on AWS EMR and implemented text categorization of BBC data. We also examined different evaluation metrics in Spark MLlib and saw how to store a model for later use.