11 min read

Recommendation systems can be defined as software applications that draw out and learn from data such as preferences, their actions (clicks, for example), browsing history, and generated recommendations, which are products that the system determines are appealing to the user in the immediate future. In this tutorial, we will learn to build a recommendation system with Scala and Apache Spark.

This article is an excerpt taken from Modern Scala Projects written Ilango Gurusamy.

What does a recommendation system look like

The following diagram is representative of a typical recommendation system:

Recommendation system

In the preceding diagram, can be thought of as a recommendation ecosystem, where the recommendation system is at the heart of it. This system needs three entities:

  • Users
  • Products
  • Transactions between users and products where transactions contain feedback from users about products

Implementation and deployment

Implementation is documented in the following subsections. All code is developed in an Intellij code editor. The very first step is to create an empty Scala project called Chapter7.

Step 1 – creating the Scala project

Let’s create a Scala project called Chapter7 with the following artifacts:

  • RecommendationSystem.scala
  • RecommendationWrapper.scala

Let’s break down the project’s structure:

  • .idea: Generated IntelliJ configuration files.
  • project: Contains build.properties and plugins.sbt.
  • project/assembly.sbt: This file specifies the sbt-assembly plugin needed to build a fat JAR for deployment.
  • src/main/scala: This is a folder that houses Scala source files in the com.packt.modern.chapter7 package.
  • target: This is where artifacts of the compile process are stored. The generated assembly JAR file goes here.
  • build.sbt: This is the main SBT configuration file. Spark and its dependencies are specified here.

At this point, we will start developing code in the IntelliJ code editor. We will start with the AirlineWrapper Scala file and end with the deployment of the final application JAR into Spark with spark-submit.

Step 2 – creating the AirlineWrapper definition

Let’s create the trait definition. The trait will hold the SparkSession variable, schema definitions for the datasets, and methods to build a dataframe:

trait RecWrapper {  }

Next, let’s create a schema for past weapon sales orders.

Step 3 – creating a weapon sales orders schema

Let’s create a schema for the past sales order dataset:

val salesOrderSchema: StructType = StructType(Array(
  StructField("sCustomerId", IntegerType,false),
  StructField("sCustomerName", StringType,false),
  StructField("sItemId", IntegerType,true),
  StructField("sItemName",  StringType,true),
  StructField("sItemUnitPrice",DoubleType,true),
  StructField("sOrderSize", DoubleType,true),
  StructField("sAmountPaid",  DoubleType,true)
))

Next, let’s create a schema for weapon sales leads.

Step 4 – creating a weapon sales leads schema

Here is a schema definition for the weapon sales lead dataset:

val salesLeadSchema: StructType = StructType(Array(
  StructField("sCustomerId", IntegerType,false),
  StructField("sCustomerName", StringType,false),
  StructField("sItemId", IntegerType,true),
  StructField("sItemName",  StringType,true)
))

Next, let’s build a weapon sales order dataframe.

Step 5 – building a weapon sales order dataframe

Let’s invoke the read method on our SparkSession instance and cache it. We will call this method later from the RecSystem object:

def buildSalesOrders(dataSet: String): DataFrame = {
  session.read
    .format("com.databricks.spark.csv")
    .option("header", true).schema(salesOrderSchema).option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .load(dataSet).cache()
}

Next up, let’s build a sales leads dataframe:

def buildSalesLeads(dataSet: String): DataFrame = {
  session.read
    .format("com.databricks.spark.csv")
    .option("header", true).schema(salesLeadSchema).option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .load(dataSet).cache()
}

This completes the trait. Overall, it looks like this:

trait RecWrapper {

1) Create a lazy SparkSession instance and call it session.
2) Create a schema for the past sales orders dataset
3) Create a schema for sales lead dataset
4) Write a method to create a dataframe that holds past sales order
data. This method takes in sales order dataset and
returns a dataframe
5) Write a method to create a dataframe that holds lead sales data

}

Bring in the following imports:

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

Create a Scala object called RecSystem:

object RecSystem extends App with RecWrapper {   }

Before going any further, bring in the following imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

Inside this object, start by loading the past sales order data. This will be our training data. Load the sales order dataset, as follows:

val salesOrdersDf = buildSalesOrders("sales\\PastWeaponSalesOrders.csv")

Verify the schema. This is what the schema looks like:

salesOrdersDf.printSchema()
root
 |-- sCustomerId: integer (nullable = true)
 |-- sCustomerName: string (nullable = true)
 |-- sItemId: integer (nullable = true)
 |-- sItemName: string (nullable = true)
 |-- sItemUnitPrice: double (nullable = true)
 |-- sOrderSize: double (nullable = true)
 |-- sAmountPaid: double (nullable = true)

Here is a partial view of a dataframe displaying past weapon sales order data:

Partial view of dataframe displaying past weapon sales order data

Now, we have what we need to create a dataframe of ratings:

 val ratingsDf: DataFrame = salesOrdersDf.map( salesOrder =>
 Rating( salesOrder.getInt(0),
 salesOrder.getInt(2),
 salesOrder.getDouble(6)
 ) ).toDF("user", "item", "rating")

Save all and compile the project at the command line:

C:\Path\To\Your\Project\Chapter7>sbt compile

You are likely to run into the following error:

[error] C:\Path\To\Your\Project\Chapter7\src\main\scala\com\packt\modern\chapter7\RecSystem.scala:50:50: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] val ratingsDf: DataFrame = salesOrdersDf.map( salesOrder =>
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed

To fix this, place the following statement at the top of the declarations of the rating dataframe. It should look like this:

import session.implicits._
 val ratingsDf: DataFrame = salesOrdersDf.map( salesOrder => UserRating( salesOrder.getInt(0), salesOrder.getInt(2), salesOrder.getDouble(6) ) ).toDF("user", "item", "rating")

Save and recompile the project. This time, it compiles just fine. Next, import the Rating class from the org.apache.spark.mllib.recommendation package. This transforms the rating dataframe that we obtained previously to its RDD equivalent:

val ratings: RDD[Rating] = ratingsDf.rdd.map( row => Rating( row.getInt(0), row.getInt(1), row.getDouble(2) ) )
 println("Ratings RDD is: " + ratings.take(10).mkString(" ") )

The following few lines of code are very important. We will be using the ALS algorithm from Spark MLlib to create and train a MatrixFactorizationModel, which takes an RDD[Rating] object as input. The ALS train method may require a combination of the following training hyperparameters:

  • numBlocks: Preset to -1 in an auto-configuration setting. This parameter is meant to parallelize computation.
  • custRank: The number of features, otherwise known as latent factors.
  • iterations: This parameter represents the number of iterations for ALS to execute. For a reasonable solution to converge on, this algorithm needs roughly 20 iterations or less.
  • regParam: The regularization parameter.
  • implicitPrefs: This hyperparameter is a specifier. It lets us use either of the following:
    • Explicit feedback
    • Implicit feedback
  • alpha: This is a hyperparameter connected to an implicit feedback variant of the ALS algorithm. Its role is to govern the baseline confidence in preference observations.

We just explained the role played by each parameter needed by the ALS algorithm’s train method.

Let’s get started by bringing in the following imports:

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

Now, let’s get down to training the matrix factorization model using the ALS algorithm.
Let’s train a matrix factorization model given an RDD of ratings by customers (users) for certain items (products). Our train method on the ALS algorithm will take the following four parameters:

  • Ratings.
  • A rank.
  • A number of iterations.
  • A Lambda value or regularization parameter:
val ratingsModel: MatrixFactorizationModel = ALS.train(ratings, 
   6, /* THE RANK */ 
  10, /* Number of iterations */
  15.0 /* Lambda, or regularization parameter */
 )

Next, we load the sales lead file and convert it into a tuple format:

 val weaponSalesLeadDf = buildSalesLeads("sales\\ItemSalesLeads.csv")

In the next section, we will display the new weapon sales lead dataframe.

Step 6 – displaying the weapons sales dataframe

First, we must invoke the show method:

println("Weapons Sales Lead dataframe is: ")
weaponSalesLeadDf.show

 

Here is a view of the weapon sales lead dataframe:

View of weapon sales lead dataframe

Next, create a version of the sales lead dataframe structured as (customer, item) tuples:

val customerWeaponsSystemPairDf: DataFrame = weaponSalesLeadDf.map(salesLead => ( salesLead.getInt(0), salesLead.getInt(2) )).toDF("user","item")

In the next section, let’s display the dataframe that we just created.

Step 7 – displaying the customer-weapons-system dataframe

Let’s the show method, as follows:

println("The Customer-Weapons System dataframe as tuple pairs looks like: ")
customerWeaponsSystemPairDf.show

 

Here is a screenshot of the new customer-weapons-system dataframe as tuple pairs:

New customer-weapons-system dataframe as tuple pairs

Next, we will convert the preceding dataframe into an RDD:

val customerWeaponsSystemPairRDD: RDD[(Int, Int)] = customerWeaponsSystemDf.rdd.map(row => 
                                                               (row.getInt(0),          
                                                                row.getInt(1)) 
                                                       )
/*
Notes: As far as the algorithm is concerned, customer corresponds to "user" and "product" or item corresponds to a "weapons system"
*/

We previously created a MatrixFactorization model that we trained with the weapons system sales orders dataset. We are in a position to predict how each customer country may rate a weapon system in the future. In the next section, we will generate predictions.

 

Step 8 – generating predictions

Here is how we will generate predictions. The predict method of our model is designed to do just that. It will generate a predictions RDD that we call weaponRecs. It represents the ratings of weapons systems that were not rated by customer nations (listed in the past sales order data) previously:

val weaponRecs: RDD[Rating] = ratingsModel.predict(customerWeaponsSystemPairRDD).distinct()

Next up, we will display the final predictions.

Step 9 – displaying predictions

Here is how to display the predictions, lined up in tabular format:

println("Future ratings are: " + weaponRecs.foreach(rating => { println( "Customer: " + rating.user + " Product:  " + rating.product + " Rating: " + rating.rating ) } ) )

The following table displays how each nation is expected to rate a certain system in the future, that is, a weapon system that they did not rate earlier:

System rating by each nation

Our recommendation system proved itself capable of generating future predictions.

Up until now, we did not say how all of the preceding code is compiled and deployed. We will look at this in the next section.

Compilation and deployment

Compiling the project

Invoke the sbt compile project at the root folder of your Chapter7 project. You should get the following output:

Output on compiling the project

Besides loading build.sbt, the compile task is also loading settings from assembly.sbt which we will create below.

What is an assembly.sbt file?

We have not yet talked about the assembly.sbt file. Our scala-based Spark application is a Spark job that will be submitted to a (local) Spark cluster as a JAR file. This file, apart from Spark libraries, also needs other dependencies in it for our recommendation system job to successfully complete. The name fat JAR is from all dependencies bundled in one JAR. To build such a fat JAR, we need an sbt-assembly plugin. This explains the need for creating a new assembly.sbt and the assembly plugin.

Creating assembly.sbt

Create a new assembly.sbt in your IntelliJ project view and save it under your project folder, as follows:

Creating assembly.sbt

Contents of assembly.sbt

Paste the following contents into the newly created assembly.sbt (under the project folder). The output should look like this:

Output on placing contents of assembly.sbt

The sbt-assembly plugin, version 0.14.7, gives us the ability to run an sbt-assembly task. With that, we are one step closer to building a fat or Uber JAR. This action is documented in the next step.

Running the sbt assembly task

Issue the sbt assembly command, as follows:

Running the sbt assembly command

This time, the assembly task loads the assembly-plugin in assembly.sbt. However, further assembly halts because of a common duplicate error. This error arises due to several duplicates, multiple copies of dependency files that need removal before the assembly task can successfully complete. To address this situation, build.sbt needs an upgrade.

Upgrading the build.sbt file

The following lines of code need to be added in, as follows:

Code lines for upgrading the build.sbt file

To test the effect of your changes, save this and go to the command line to reissue the sbt assembly task.

Rerunning the assembly command

Run the assembly task, as follows:

Rerunning the assembly task

This time, the settings in the assembly.sbt file are loaded. The task completes successfully. To verify, drill down to the target folder. If everything went well, you should see a fat JAR, as follows:

Output as a JAR file

Our JAR file under the target folder is the recommendation system application’s JAR file that needs to be deployed into Spark. This is documented in the next step.

Deploying the recommendation application

The spark-submit command is how we will deploy the application into Spark. Here are two formats for the spark-submit command. The first one is a long one which sets more parameters than the second one:

spark-submit --class "com.packt.modern.chapter7.RecSystem" --master local[2] --deploy-mode client --driver-memory 16g -num-executors 2 --executor-memory 2g --executor-cores 2  

Leaning on the preceding format, let’s submit our Spark job, supplying various parameters to it:

Parameters for Spark

The different parameters are explained as follows:

   Tabular explanation of parameters for Spark Job

We used Spark’s support for recommendations to build a prediction model that generated recommendations and leveraged Spark’s alternating least squares algorithm to implement our collaborative filtering recommendation system.

If you’ve enjoyed reading this post, do check out the book  Modern Scala Projects
to gain insights into data that will help organizations have a strategic and competitive advantage.

Read Next

How to Build a music recommendation system with PageRank Algorithm

Recommendation Systems

Building A Recommendation System with Azure


Subscribe to the weekly Packt Hub newsletter. We'll send you the results of our AI Now Survey, featuring data and insights from across the tech landscape.

* indicates required